概要
搭建从MySQL -> Debezium Source Connector -> Kafka -> Debezium JDBC Sink Connector -> PostgreSQL 的服务。架构如下
搭建过程
- 启动ZooKeeper, Kafka, 作为数据源的MySQL, Kafka Connector。这一步使用Docker Composer启动即可。具体参考debezium-examples/tutorial at main · debezium/debezium-examples · GitHub 启动成功后应当有以下容器:
- 启动PostgreSQL:
- 这里使用
postgres:17
进行部署 $ docker run --name debezium_pg -p 5432:5432 -e POSTGRES_PASSWORD=debeziumpg_passwd -e POSTGRES_DB=inventory -d postgres:17
- 这里
-e POSTGRES_DB=inventory
继续使用初始教程中的示例数据库。 下载pgJDBC Download | pgJDBCDebezium提供的archive 里面已经有了pgJDBC。 `
- 这里使用
- 部署SinkConnector 这一步主要参考 Debezium connector for JDBC :: Debezium Documentation 进行。 首先下载Debezium Connector JDBC 到自己的环境中。并进行解压,如下图所示。
并重新启动一个connector。并将该工作目录挂载进去。这里我使用了apache/kafka的镜像。
docker run -it \
-p 8084:8083 \
-e CONFIG_STORAGE_TOPIC=pg_sink_configs \
-e OFFSET_STORAGE_TOPIC=pg_sink_offsets \
-e STATUS_STORAGE_TOPIC=pg_sink_statuses \
--link kafka \
--link debezium_pg \
-v $PWD:/kafka \
--name sink_connector \
apache/kafka:latest bash
参数解析:
-p 8084:8083
该connector使用宿主机的8084端口,避免和第一个mysql source connector冲突。--link kafka --link debezium_pg
等会和kafka、pg连接使用。-v $PWD:/kafka
把下载的debezium-connector挂载进去。 编辑该镜像内的配置文件/opt/kafka/config/connect-standalone.properties
, 把刚才挂载的plugin目录/kafka
加入配置,这里需要是/kafka/debezium-connector-jdbc
的父目录 。注意这里也要修改kafka的地址为kafka:9092
启动connector:bin/connect-standalone.sh config/connect-standalone.properties
启动时应当可以看到加载debezium plugin的信息,并且没有报错
注册connector,配置如下。
{
"name": "pg-jdbc-connector-orders",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://debezium_pg/inventory",
"connection.username": "postgres",
"connection.password": "debeziumpg_passwd",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic",
"use.time.zone": "UTC",
"topics": "dbserver1.inventory.orders"
}
}
这里"topics"中,dbserver1为官方教程配置中的前缀,inventory为示例数据库,orders是我们要跟踪的表。
使用curl工具进行注册。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8084/connectors/ -d '
{
"name": "pg-jdbc-connector-orders",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://debezium_pg/inventory",
"connection.username": "postgres",
"connection.password": "debeziumpg_passwd",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic",
"use.time.zone": "UTC",
"topics": "dbserver1.inventory.orders"
}
}'
访问http://localhost:8084/connectors/pg-jdbc-connector-orders
, 可以看到成功注册的sink_connector 信息
测试
分别启动MYSQL和PSQL的命令行。
mysql中使用use inventory
, psql输出\c inventory
切换到inventory数据库。可以看到mysql中已经有了示例数据,pg中有表dbserver1_inventory_orders
。该表是在pg-jdbc-connector-orders
启动时被创建的,并且已经同步了已有的数据
create
MYSQL中插入示例数据。
insert into orders values(10008, "2024-12-24", 1002, 3, 102);
psql增加相应行。
update
我们修改所有order_date为2024-12-24的订单数量。update orders set quantity = quantity + 100 where order_date = '2024-12-24';
可以看到修改被同步
delete
DELETE FROM orders WHERE EXTRACT(YEAR FROM order_date) = 2016;
删除所有2016年的订单记录。
upsert
upsert用于当insert主键冲突时,执行update
Dialect | Upsert Syntax |
---|---|
Db2 | MERGE … |
MySQL | INSERT … ON DUPLICATE KEY UPDATE … |
Oracle | MERGE … |
PostgreSQL | INSERT … ON CONFLICT … DO UPDATE SET … |
SQL Server | MERGE … |
psql首先插入一条主键为114514的记录。
INSERT INTO dbserver1_inventory_orders(order_number, order_date, purchaser, quantity, product_id)
VALUES (114514, '2024-01-01', 1001, 3, 101);
MYSQL插入相同主键,其他列不同的记录
INSERT INTO orders(order_number, order_date, purchaser, quantity, product_id)
VALUES (114514, '2023-01-01', 1001, 3, 101);
可以看到pg中没有产生两条记录,而是同一条记录被修改。sink_connector在第一次同步数据时,也自动创建了orders表上的主键。
更多
同步整个inventory数据库
我们先使用REST API删除刚才创建的connector。
curl -X DELETE http://localhost:8084/connectors/pg-jdbc-connector-orders
使用topics-regex
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8084/connectors/ -d '
{
"name": "pg-jdbc-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://debezium_pg/inventory",
"connection.username": "postgres",
"connection.password": "debeziumpg_passwd",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.mode": "record_key",
"schema.evolution": "basic",
"use.time.zone": "UTC",
"topics.regex": "dbserver1.inventory.(.*)",
}
}'
connecotr log中会出现报错:psql中没有public.geomtry , 导致该表以及之后的表都无法进行同步。
该问题通过安装 PostGIS插件解决。
可以直接在postgres容器中使用命令apt install postgresql-17-postgis-3
,再创建插件
完成安装后重新启动connector,geom表被成功同步