Featured image of post Debezium Demo搭建

Debezium Demo搭建

使用Debezium单机搭建从MySQL到PG的CDC同步系统

概要

搭建从MySQL -> Debezium Source Connector -> Kafka -> Debezium JDBC Sink Connector -> PostgreSQL 的服务。架构如下

搭建过程

  1. 启动ZooKeeper, Kafka, 作为数据源的MySQL, Kafka Connector。这一步使用Docker Composer启动即可。具体参考debezium-examples/tutorial at main · debezium/debezium-examples · GitHub 启动成功后应当有以下容器: containers
  2. 启动PostgreSQL
    • 这里使用postgres:17 进行部署
    • $ docker run --name debezium_pg -p 5432:5432 -e POSTGRES_PASSWORD=debeziumpg_passwd -e POSTGRES_DB=inventory -d postgres:17
    • 这里-e POSTGRES_DB=inventory 继续使用初始教程中的示例数据库。
    • 下载pgJDBC Download | pgJDBC Debezium提供的archive 里面已经有了pgJDBC。 `
  3. 部署SinkConnector 这一步主要参考 Debezium connector for JDBC :: Debezium Documentation 进行。 首先下载Debezium Connector JDBC 到自己的环境中。并进行解压,如下图所示。

并重新启动一个connector。并将该工作目录挂载进去。这里我使用了apache/kafka的镜像。

docker run -it \
-p 8084:8083 \
-e CONFIG_STORAGE_TOPIC=pg_sink_configs \
-e OFFSET_STORAGE_TOPIC=pg_sink_offsets \
-e STATUS_STORAGE_TOPIC=pg_sink_statuses \
--link kafka \
--link debezium_pg \
-v $PWD:/kafka \
--name sink_connector \
apache/kafka:latest bash

参数解析:

  • -p 8084:8083 该connector使用宿主机的8084端口,避免和第一个mysql source connector冲突。
  • --link kafka --link debezium_pg 等会和kafka、pg连接使用。
  • -v $PWD:/kafka 把下载的debezium-connector挂载进去。 编辑该镜像内的配置文件/opt/kafka/config/connect-standalone.properties, 把刚才挂载的plugin目录/kafka 加入配置,这里需要是/kafka/debezium-connector-jdbc的父目录 。注意这里也要修改kafka的地址为kafka:9092 启动connector: bin/connect-standalone.sh config/connect-standalone.properties 启动时应当可以看到加载debezium plugin的信息,并且没有报错

注册connector,配置如下。

{ 
	"name": "pg-jdbc-connector-orders", 
	"config": { 
		"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 
		"tasks.max": "1", 
		"connection.url": "jdbc:postgresql://debezium_pg/inventory", 
		"connection.username": "postgres",
		"connection.password": "debeziumpg_passwd",
		"insert.mode": "upsert",
		"delete.enabled": "true",
		"primary.key.mode": "record_key",
		"schema.evolution": "basic",
		"use.time.zone": "UTC",
		"topics": "dbserver1.inventory.orders" 
	}
}

这里"topics"中,dbserver1为官方教程配置中的前缀,inventory为示例数据库,orders是我们要跟踪的表。

使用curl工具进行注册。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8084/connectors/ -d '
{ 
	"name": "pg-jdbc-connector-orders", 
	"config": { 
		"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 
		"tasks.max": "1", 
		"connection.url": "jdbc:postgresql://debezium_pg/inventory", 
		"connection.username": "postgres",
		"connection.password": "debeziumpg_passwd",
		"insert.mode": "upsert",
		"delete.enabled": "true",
		"primary.key.mode": "record_key",
		"schema.evolution": "basic",
		"use.time.zone": "UTC",
		"topics": "dbserver1.inventory.orders" 
	}
}'

访问http://localhost:8084/connectors/pg-jdbc-connector-orders , 可以看到成功注册的sink_connector 信息

测试

分别启动MYSQL和PSQL的命令行。

mysql中使用use inventory, psql输出\c inventory 切换到inventory数据库。可以看到mysql中已经有了示例数据,pg中有表dbserver1_inventory_orders。该表是在pg-jdbc-connector-orders 启动时被创建的,并且已经同步了已有的数据

create

MYSQL中插入示例数据。 insert into orders values(10008, "2024-12-24", 1002, 3, 102); psql增加相应行。

update

我们修改所有order_date为2024-12-24的订单数量。update orders set quantity = quantity + 100 where order_date = '2024-12-24';

可以看到修改被同步

delete

DELETE FROM orders WHERE EXTRACT(YEAR FROM order_date) = 2016;

删除所有2016年的订单记录。

upsert

upsert用于当insert主键冲突时,执行update

DialectUpsert Syntax
Db2MERGE …​
MySQLINSERT …​ ON DUPLICATE KEY UPDATE …​
OracleMERGE …​
PostgreSQLINSERT …​ ON CONFLICT …​ DO UPDATE SET …​
SQL ServerMERGE …​

psql首先插入一条主键为114514的记录。

INSERT INTO dbserver1_inventory_orders(order_number, order_date, purchaser, quantity, product_id)
VALUES (114514, '2024-01-01', 1001, 3, 101);

MYSQL插入相同主键,其他列不同的记录

INSERT INTO orders(order_number, order_date, purchaser, quantity, product_id)
VALUES (114514, '2023-01-01', 1001, 3, 101);

可以看到pg中没有产生两条记录,而是同一条记录被修改。sink_connector在第一次同步数据时,也自动创建了orders表上的主键。

更多

同步整个inventory数据库

我们先使用REST API删除刚才创建的connector。

curl -X DELETE http://localhost:8084/connectors/pg-jdbc-connector-orders

使用topics-regex

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8084/connectors/ -d '
{ 
	"name": "pg-jdbc-connector", 
	"config": { 
		"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector", 
		"tasks.max": "1", 
		"connection.url": "jdbc:postgresql://debezium_pg/inventory", 
		"connection.username": "postgres",
		"connection.password": "debeziumpg_passwd",
		"insert.mode": "upsert",
		"delete.enabled": "true",
		"primary.key.mode": "record_key",
		"schema.evolution": "basic",
		"use.time.zone": "UTC",
		"topics.regex": "dbserver1.inventory.(.*)",
	}
}'

connecotr log中会出现报错:psql中没有public.geomtry , 导致该表以及之后的表都无法进行同步。

该问题通过安装 PostGIS插件解决。

可以直接在postgres容器中使用命令apt install postgresql-17-postgis-3 ,再创建插件

完成安装后重新启动connector,geom表被成功同步