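Real-Time Incremental CDC Replication Between MySQL and PostgreSQL with Debezium Connectors
This article shows how to configure Debezium connectors to replicate changes between MySQL and PostgreSQL in real time through CDC (change data capture). As a hands-on example, it demonstrates replication in both directions: from MySQL as the source to PostgreSQL as the target, and from PostgreSQL as the source to MySQL as the target. This exercises two of Debezium's source connectors, the MySQL one and the PostgreSQL one. On the sink side, Debezium 2.2 added its own JDBC sink connector, which can be used in place of the Kafka JDBC sink connector and likewise supports multiple target databases.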
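The figure above shows the layout of the system used in this exercise:
- First, the five boxes with red text on an orange background represent the running containers. MySQL and Postgres are the two databases, each acting as both a source and a target for the other; Kafka, which depends on Zookeeper, serves together with it as the transport bus; Kafka Connect hosts the connection pipelines and in turn depends on Kafka.
- The four large boxes inside Kafka Connect represent the four groups of processes running in it, that is, the connection pipelines. Each one connects Kafka to a source or a target of the bidirectional replication. Every pipeline contains one connector and one converter, and any number of transforms can be added between the connector and the converter as needed.
- The dark green arrows show the connections and data flow with MySQL as the source and PostgreSQL as the target; the dark purple arrows show the connections and data flow with PostgreSQL as the source and MySQL as the target.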
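1. Preparing the system and environment
I built this system with Docker on my own Mac laptop.
1.1 Preparing and starting the system
The system is created from the following docker-compose-debezium-mp.yaml file:
All five images used here are provided by the Debezium project; the only requirement is to set the environment variable DEBEZIUM_VERSION before startup. As the system diagram shows, the two source connectors and the JDBC sink connector we need are all developed by Debezium and are already included in the Debezium connect image. Run the following steps to bring up the whole system quickly: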
export DEBEZIUM_VERSION=2.2
export DC_FILE=docker-compose-debezium-mp.yaml
docker compose -f ${DC_FILE} up -d
All containers started normally:
➜ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e7055ad9515c quay.io/debezium/connect:2.2 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:8083->8083/tcp, 9092/tcp connect
4fc527dfde4e quay.io/debezium/kafka:2.2 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:9092->9092/tcp kafka
0f29d8a1ab8a quay.io/debezium/zookeeper:2.2 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp zookeeper
ba7a7310f28e quay.io/debezium/example-postgres:2.2 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:5432->5432/tcp postgres
ed091ecbf5db quay.io/debezium/example-mysql:2.2 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
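1.2 Exploring the example databases
Both the MySQL and the PostgreSQL databases here use Debezium's example images, which already contain sample data. First, let us look at Debezium's MySQL example container and the tables it contains.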
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'show tables in inventory;'
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
Next, let us look at Debezium's PostgreSQL example container and the schema and tables it contains.
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c '\dt inventory.*;'
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)
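Although both listings end with the same SQL statement, "select * from inventory.customers;", inventory is a database in MySQL but a schema in PostgreSQL. This also affects how we create the receiving tables later.
1.3 Checking Kafka and Kafka Connect
We can also check the state of Kafka and Kafka Connect: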
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
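At this point Kafka has only 4 housekeeping topics (the consumer offsets topic and the three Kafka Connect storage topics) and no consumer groups yet.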
➜ curl -s -XGET http://localhost:8083/connector-plugins | jq '.[].class'
"io.debezium.connector.jdbc.JdbcSinkConnector"
"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"
➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'
This lists all connector plugins currently available in Kafka Connect, including the ones we need:
- io.debezium.connector.jdbc.JdbcSinkConnector
- io.debezium.connector.mysql.MySqlConnector
- io.debezium.connector.postgresql.PostgresConnector
No connector is running yet, however.
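The plugin check above can also be scripted. The following is a minimal Python sketch, not part of the original walkthrough: it assumes the JSON returned by /connector-plugins is a list of objects that each carry a "class" field (the field the jq filter above reads), and the names PLUGINS_JSON, REQUIRED_PLUGINS, and missing_plugins are hypothetical.

```python
import json

# Shortened sample of the /connector-plugins response. Only the "class"
# field is used, the same field the jq filter above extracts.
PLUGINS_JSON = """
[
  {"class": "io.debezium.connector.jdbc.JdbcSinkConnector"},
  {"class": "io.debezium.connector.mysql.MySqlConnector"},
  {"class": "io.debezium.connector.postgresql.PostgresConnector"},
  {"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector"}
]
"""

# The three plugins this walkthrough relies on.
REQUIRED_PLUGINS = {
    "io.debezium.connector.jdbc.JdbcSinkConnector",
    "io.debezium.connector.mysql.MySqlConnector",
    "io.debezium.connector.postgresql.PostgresConnector",
}


def missing_plugins(plugins_json: str, required: set) -> list:
    """Return the required plugin classes absent from the listing, sorted."""
    installed = {plugin["class"] for plugin in json.loads(plugins_json)}
    return sorted(required - installed)


if __name__ == "__main__":
    # An empty list means every required plugin is installed.
    print(missing_plugins(PLUGINS_JSON, REQUIRED_PLUGINS))
```

In a real run, the JSON string would come from the curl command shown above rather than from an embedded sample.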
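2. From MySQL to PostgreSQL
2.1 Creating the target schema
As noted above, the MySQL and PostgreSQL example containers already contain similar databases/schemas and tables. To keep the replicated table apart from the existing ones, we create a new schema in PostgreSQL.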
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'create schema target;'
CREATE SCHEMA
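2.2 Configuring the source and sink connectors
With the target schema in place, we can create the source and sink connectors:
- Debezium's MySQL source connector
- Debezium's JDBC sink connector. Because the connection URL specifies postgresql as the protocol, this JDBC sink connector generates DDL and DML statements suited to the PostgreSQL database.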
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "inventory-connector-from-mysql",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "from_mysql",
"table.include.list": "inventory.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "from_mysql_$3"
}
}'
HTTP/1.1 201 Created
Date: Mon, 10 Apr 2023 05:41:18 GMT
Location: http://localhost:8083/connectors/inventory-connector-from-mysql
Content-Type: application/json
Content-Length: 728
Server: Jetty(9.4.48.v20220622)
{"name":"inventory-connector-from-mysql","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"mysql","database.port":"3306","database.user":"debezium","database.password":"dbz","database.server.id":"184054","topic.prefix":"from_mysql","table.include.list":"inventory.customers","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","schema.history.internal.kafka.topic":"schema-changes.inventory","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement":"from_mysql_$3","name":"inventory-connector-from-mysql"},"tasks":[],"type":"source"}%
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "jdbc-sink-to-postgres",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"topics": "from_mysql_customers",
"connection.url": "jdbc:postgresql://postgres:5432/postgres?currentSchema=target",
"connection.username": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"schema.evolution": "basic"
}
}'
HTTP/1.1 201 Created
Date: Mon, 10 Apr 2023 05:42:02 GMT
Location: http://localhost:8083/connectors/jdbc-sink-to-postgres
Content-Type: application/json
Content-Length: 496
Server: Jetty(9.4.48.v20220622)
{"name":"jdbc-sink-to-postgres","config":{"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector","topics":"from_mysql_customers","connection.url":"jdbc:postgresql://postgres:5432/postgres?currentSchema=target","connection.username":"postgres","connection.password":"postgres","auto.create":"true","insert.mode":"upsert","delete.enabled":"true","primary.key.fields":"id","primary.key.mode":"record_key","schema.evolution":"basic","name":"jdbc-sink-to-postgres"},"tasks":[],"type":"sink"}%
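The source connector configuration shows that we selected only the customers table as the example, which lives in the inventory database in MySQL. We also added a route transform, which splits the original topic name (topic prefix, database, and table, separated by dots) into its parts and recombines them into the new topic name from_mysql_customers.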
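To make the effect of the route transform concrete, here is a small Python sketch that imitates it with the standard re module. It is only an approximation under stated assumptions: RegexRouter is a Java class, so the "$3" group reference from the configuration is written as "\3" here, and the sketch assumes the transform rewrites a topic only when the regex matches the entire topic name. The function name route is hypothetical.

```python
import re

# Regex from the connector config after JSON unescaping ("\\." becomes "\.").
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
# Java-style "$3" is written as "\3" in Python's replacement syntax.
ROUTE_REPLACEMENT = r"from_mysql_\3"


def route(topic: str) -> str:
    """Imitate the transform: rewrite the topic only on a full match."""
    if re.fullmatch(ROUTE_REGEX, topic):
        return re.sub(ROUTE_REGEX, ROUTE_REPLACEMENT, topic, count=1)
    return topic  # assumption: non-matching topic names pass through unchanged


if __name__ == "__main__":
    for name in ("from_mysql.inventory.customers", "from_mysql"):
        print(name, "->", route(name))
```

Under these assumptions the three-part name from_mysql.inventory.customers becomes from_mysql_customers, while a name without two dots, such as from_mysql, is left alone.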
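In the sink connector configuration, "connection.url" holds the JDBC connection information for the target PostgreSQL database; we added currentSchema=target so that target is the default schema we write to. We did not set the "table.name.format" property, so the topic name is used as the table name by default, and the target table gets the same name as the topic. In later select queries the fully qualified table name is therefore target.from_mysql_customers.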
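The way the target table name falls out of the sink configuration can be sketched in a few lines of Python. This is an illustration only, not the connector's actual code: it assumes the default table name format is "${topic}", as described above, and it falls back to a hypothetical default schema of "public" when the URL carries no currentSchema parameter. The function name target_table is made up for this sketch.

```python
from urllib.parse import parse_qs, urlsplit


def target_table(connection_url: str, topic: str,
                 table_name_format: str = "${topic}",
                 default_schema: str = "public") -> str:
    """Derive the schema-qualified table name the sink would write to."""
    prefix = "jdbc:"
    # Drop the "jdbc:" prefix so the remainder parses like an ordinary URL.
    if connection_url.startswith(prefix):
        connection_url = connection_url[len(prefix):]
    query = parse_qs(urlsplit(connection_url).query)
    # Assumption: without currentSchema the default schema applies.
    schema = query.get("currentSchema", [default_schema])[0]
    table = table_name_format.replace("${topic}", topic)
    return f"{schema}.{table}"


if __name__ == "__main__":
    url = "jdbc:postgresql://postgres:5432/postgres?currentSchema=target"
    print(target_table(url, "from_mysql_customers"))
```

With the URL and topic from this walkthrough, the sketch yields target.from_mysql_customers, the name used in the select queries that follow.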
Let us check the two connectors we just created:
➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'
"inventory-connector-from-mysql"
"jdbc-sink-to-postgres"
2.3 Checking the Kafka pipeline entities after initialization
Now let us look at the Kafka topics and consumer groups again:
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
from_mysql
from_mysql_customers
my_connect_configs
my_connect_offsets
my_connect_statuses
schema-changes.inventory
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
connect-jdbc-sink-to-postgres
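Compared with the earlier check, there are 3 more topics: from_mysql and schema-changes.inventory are auxiliary topics, while from_mysql_customers is the topic we care about as the data pipeline. We also see a newly created consumer group, connect-jdbc-sink-to-postgres, which consumes the data in the topic and writes it to the PostgreSQL database. The real-time channel that replicates the customers table from MySQL to PostgreSQL has thus been established.
2.4 Checking the target table data after initialization
Next we verify the data transfer.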
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)
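Clearly, the whole customers table has been copied to the PostgreSQL database.
2.5 Checking the Kafka pipeline state and data after initialization
Let us look further at how the records travelled through the Kafka topic and consumer group.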
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-postgres
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-postgres from_mysql_customers 0 4 4 0 connector-consumer-jdbc-sink-to-postgres-0-bd341f24-6763-4db2-b30a-15c329f139b1 /192.168.32.6 connector-consumer-jdbc-sink-to-postgres-0
This command shows that the consumer group connect-jdbc-sink-to-postgres has consumed 4 messages from the topic from_mysql_customers, which correspond to the 4 records in the customers table.
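When this check needs to be repeated, the describe output can be read programmatically. The following Python sketch parses one data row of the output above; it assumes whitespace-separated columns in the order shown in the header, and the function name parse_group_row is hypothetical. A row whose offsets are not plain integers would make it raise a ValueError.

```python
def parse_group_row(row: str) -> dict:
    """Parse one data row of `kafka-consumer-groups.sh --describe` output."""
    # Column order as in the header above: GROUP TOPIC PARTITION
    # CURRENT-OFFSET LOG-END-OFFSET LAG, followed by consumer details.
    group, topic, partition, current, end, lag = row.split()[:6]
    return {
        "group": group,
        "topic": topic,
        "partition": int(partition),
        "current_offset": int(current),
        "log_end_offset": int(end),
        "lag": int(lag),
    }


# The data row from the output above, split over several literals.
ROW = (
    "connect-jdbc-sink-to-postgres from_mysql_customers 0 4 4 0 "
    "connector-consumer-jdbc-sink-to-postgres-0-bd341f24-6763-4db2-b30a-15c329f139b1 "
    "/192.168.32.6 connector-consumer-jdbc-sink-to-postgres-0"
)

if __name__ == "__main__":
    info = parse_group_row(ROW)
    # The lag is the distance between the log end and the committed offset.
    print(info["log_end_offset"] - info["current_offset"] == info["lag"])
```

A lag of 0 means the sink connector has consumed everything currently in the topic.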
We open another terminal window to monitor the messages in the topic from_mysql_customers in real time.
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_mysql_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"first","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129088,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"last","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
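These are the 4 messages currently stored in the Kafka topic; they can be regarded as the full copy of the table records made when the pipeline was initialized. The operation type "op" of every message is "r" (read), the value Debezium uses for snapshot records.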
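Because each message carries a long schema section, the interesting part is easy to miss. The Python sketch below pulls the essentials out of the payload of a message value. The sample is a hand-shortened version of the first message above (schema section and most source fields omitted), and the names OP_NAMES and summarize are hypothetical helpers, not part of Debezium.

```python
import json

# Debezium operation codes: create, update, delete, and snapshot read.
OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}

# Hand-shortened value of the first message above.
MESSAGE_VALUE = """
{"payload": {"before": null,
             "after": {"id": 1001, "first_name": "Sally", "last_name": "Thomas",
                       "email": "sally.thomas@acme.com"},
             "source": {"connector": "mysql", "db": "inventory",
                        "table": "customers", "snapshot": "first"},
             "op": "r", "ts_ms": 1680317129088}}
"""


def summarize(value_json: str) -> dict:
    """Extract operation, source table, and row image from a message value."""
    payload = json.loads(value_json)["payload"]
    source = payload["source"]
    # Delete events carry the row in "before"; all others carry it in "after".
    row = payload["after"] if payload["after"] is not None else payload["before"]
    return {
        "operation": OP_NAMES.get(payload["op"], payload["op"]),
        "table": f'{source["db"]}.{source["table"]}',
        "row": row,
    }


if __name__ == "__main__":
    print(summarize(MESSAGE_VALUE))
```

The same function can be applied to each line printed by the console consumer once the key part in front of the value has been split off.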
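Leave this new terminal window open and return to the window we were working in before.
2.6 Inserts, updates, and deletes on the source table and the resulting changes in the target table
Next we verify how the target table in PostgreSQL changes after insert, update, and delete operations on the source table in MySQL.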
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"insert into inventory.customers (id,first_name,last_name,email) values (1005, 'Tim','Zhang','zhangyi2.zb@ccbft.com');"
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi2.zb@ccbft.com |
+------+------------+-----------+-----------------------+
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi2.zb@ccbft.com
(5 rows)
After inserting a new record with id=1005, we can verify that the data in the MySQL and PostgreSQL tables is identical.
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"update inventory.customers set email='zhangyi3.zb@ccbft.com' where id=1005;"
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi3.zb@ccbft.com |
+------+------------+-----------+-----------------------+
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi3.zb@ccbft.com
(5 rows)
After changing the email field of the record with id=1005, the data in the PostgreSQL table was updated along with the data in the MySQL table.
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"delete from inventory.customers where id=1005;"
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)
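Here the record with id=1005 was deleted from the MySQL source table, and the PostgreSQL table dropped the record accordingly.
This shows that the pipeline we built keeps the source and target tables synchronized in real time.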
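The behaviour observed above follows from the sink settings "insert.mode": "upsert" and "delete.enabled": "true". The Python sketch below is a simplified in-memory model of that behaviour, not the connector's implementation: the table is a dict keyed by the primary key, and the events are hand-written stand-ins for the insert, update, and delete performed above. It also assumes that a delete is followed by a tombstone (a message with a null value), which is Debezium's default behaviour.

```python
def apply_event(table: dict, key: int, value) -> None:
    """Apply one change event to an in-memory table keyed by primary key."""
    if value is None:
        # Tombstone: in this model the row was already removed by the
        # preceding delete event, so there is nothing left to do.
        return
    op = value["op"]
    if op in ("c", "r", "u"):
        table[key] = value["after"]   # upsert: insert or overwrite the row
    elif op == "d":
        table.pop(key, None)          # delete the row if it exists


# Hand-written events mirroring the operations on id=1005 above.
EVENTS = [
    (1005, {"op": "c", "after": {"id": 1005, "email": "zhangyi2.zb@ccbft.com"}}),
    (1005, {"op": "u", "after": {"id": 1005, "email": "zhangyi3.zb@ccbft.com"}}),
    (1005, {"op": "d", "after": None}),
    (1005, None),  # tombstone (assumed default behaviour after a delete)
]

if __name__ == "__main__":
    target = {1004: {"id": 1004, "email": "annek@noanswer.org"}}
    for key, value in EVENTS:
        apply_event(target, key, value)
        print(sorted(target))
```

Keying every write on the record key is what makes replaying the same event harmless in this model: an upsert of an existing row overwrites it, and a delete of a missing row is a no-op.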
2.7 Checking the Kafka pipeline state and data after the inserts, updates, and deletes
Let us look at the Kafka topic and consumer group again:
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-postgres
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-postgres from_mysql_customers 0 8 8 0 connector-consumer-jdbc-sink-to-postgres-0-bd341f24-6763-4db2-b30a-15c329f139b1 /192.168.32.6 connector-consumer-jdbc-sink-to-postgres-0
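Here we see that the insert, update, and delete operations added 4 consumed messages. Back in the terminal window that has been monitoring the topic in real time, we can look at the contents of these 4 messages.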
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_mysql_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"first","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129088,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"ty
pe":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"last","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"ty
pe":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319215000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":383,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1680319215658,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"ty
pe":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319557000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":724,"row":0,"thread":14,"query":null},"op":"u","ts_ms":1680319557887,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"ty
pe":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"after":null,"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319739000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1097,"row":0,"thread":16,"query":null},"op":"d","ts_ms":1680319739856,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} null
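Besides the four earlier snapshot messages whose operation type "op" is "r", the three new change messages carry the operation types "c", "u", and "d", which correspond to our insert, update, and delete operations. The final message has only a key, and its value is null. This is called a tombstone message, and it follows the last "d" operation.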
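The reading of "op" and of the tombstone described above can be sketched in Python. This is only an illustration under stated assumptions: the helper name `describe` is hypothetical, and the sample messages are trimmed copies of the console output with the "schema" part left out, so only the "payload" part is inspected.

```python
import json

# Debezium operation codes as they appear in the "op" field of the payload.
OP_NAMES = {"r": "snapshot read", "c": "insert", "u": "update", "d": "delete"}


def describe(key_json, value_json):
    """Return (kind, key) for one key/value pair printed by the console consumer.

    `describe` is a hypothetical helper written for this sketch; it is not
    part of Debezium or Kafka.
    """
    key = json.loads(key_json)["payload"]
    # The console consumer prints a null value as the literal text "null".
    if value_json is None or value_json.strip() == "null":
        return ("tombstone", key)
    op = json.loads(value_json)["payload"]["op"]
    return (OP_NAMES.get(op, "unknown"), key)


# Trimmed samples: the real messages also carry a large "schema" section.
KEY = '{"payload": {"id": 1005}}'
DELETE_VALUE = '{"payload": {"before": {"id": 1005}, "after": null, "op": "d"}}'

print(describe(KEY, DELETE_VALUE))
print(describe(KEY, "null"))
```

A sink that honors deletes needs the "d" event (or the tombstone) together with the record key, which is why the key is returned alongside the kind.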
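3. From PostgreSQL to MySQL
3.1 Creating the target database
Unlike the PostgreSQL case, when MySQL hosts the target table we create a new database named target, so that the target table stays apart from the existing tables.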
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'create database target;'
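Note that this step writes to MySQL, so the account is switched to root, whose password is set in ${DC_FILE}. The privileges of each MySQL account are easy to check: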
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from mysql.user;'
+-----------+------------------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------------------+--------------------------+----------------------------+---------------+-------------+-----------------+----------------------+-----------------------+------------------------------------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |
+-----------+------------------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------------------+--------------------------+----------------------------+---------------+-------------+-----------------+----------------------+-----------------------+------------------------------------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
| % | debezium | Y | N | N | N | N | N | Y | N | N | N | N | N | N | N | Y | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *A06791A44B817A1A84D38FABE76938F0183EE3F0 | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | mysqluser | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *FBC02A898D66B9181D6F8826C045C11FD2B364A4 | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | replicator | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *D98280F03D0F78162EBDBB9C883FC01395DEA2BF | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | root | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *9A6A327AAD02A999D3D87D94ECA40EA6F02D9E92 | N | 2023-04-10 04:22:25 | NULL | N | Y | Y | NULL | NULL | NULL | NULL |
| localhost | mysql.infoschema | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | mysql.session | N | N | N | N | N | N | N | Y | N | N | N | N | N | N | N | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | mysql.sys | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | root | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *9A6A327AAD02A999D3D87D94ECA40EA6F02D9E92 | N | 2023-04-10 04:22:25 | NULL | N | Y | Y | NULL | NULL | NULL | NULL |
+-----------+------------------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------------------+--------------------------+----------------------------+---------------+-------------+-----------------+----------------------+-----------------------+------------------------------------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+
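In part 2, from MySQL to PostgreSQL, we configured the MySQL source connector with the account debezium. As the table above shows, of the data privileges it holds only Select (alongside Reload, Show_db, and the two replication privileges), so it cannot write to MySQL.
3.2 Configuring the source and sink connectors
With the target database in place, we can create the source and sink connectors:
- Debezium's PostgreSQL source connector
- Debezium's JDBC sink connector. Because the connection URL names mysql as the protocol, this JDBC sink connector can generate DDL and DML statements adapted to MySQL.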
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "inventory-connector-from-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "from_postgres",
"table.include.list": "inventory.customers",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "from_postgres_$3"
}
}'
HTTP/1.1 201 Created
Date: Mon, 10 Apr 2023 01:26:41 GMT
Location: http://localhost:8083/connectors/inventory-connector-from-postgres
Content-Type: application/json
Content-Length: 627
Server: Jetty(9.4.48.v20220622)
{"name":"inventory-connector-from-postgres","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","tasks.max":"1","database.hostname":"postgres","database.port":"5432","database.user":"postgres","database.password":"postgres","database.dbname":"postgres","topic.prefix":"from_postgres","table.include.list":"inventory.customers","transforms":"route","transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter","transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)","transforms.route.replacement":"from_postgres_$3","name":"inventory-connector-from-postgres"},"tasks":[],"type":"source"}%
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "jdbc-sink-to-mysql",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"topics": "from_postgres_customers",
"connection.url": "jdbc:mysql://mysql:3306/target",
"connection.username": "root",
"connection.password": "debezium",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"schema.evolution": "basic"
}
}'
HTTP/1.1 201 Created
Date: Mon, 10 Apr 2023 04:26:10 GMT
Location: http://localhost:8083/connectors/jdbc-sink-to-mysql
Content-Type: application/json
Content-Length: 458
Server: Jetty(9.4.48.v20220622)
{"name":"jdbc-sink-to-mysql","config":{"connector.class":"io.debezium.connector.jdbc.JdbcSinkConnector","topics":"from_postgres_customers","connection.url":"jdbc:mysql://mysql:3306/target","connection.username":"root","connection.password":"debezium","auto.create":"true","insert.mode":"upsert","delete.enabled":"true","primary.key.fields":"id","primary.key.mode":"record_key","schema.evolution":"basic","name":"jdbc-sink-to-mysql"},"tasks":[],"type":"sink"}%
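Again we pick only the table customers as the example. In PostgreSQL its schema is inventory. As before, the configuration also adds a route transform, which splits the original topic name (prefix, schema, and table, i.e. from_postgres.inventory.customers) and recombines the parts into the new topic name from_postgres_customers.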
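The effect of the route transform can be reproduced outside Kafka Connect. The sketch below mimics it in Python, on the understanding that RegexRouter renames a topic only when the regex matches the whole topic name. The function name `route` is hypothetical, and the Java replacement `from_postgres_$3` is written as `from_postgres_\3` in Python syntax.

```python
import re

# Same pattern as "transforms.route.regex" once the JSON escaping (\\.) is decoded.
ROUTE_REGEX = r"([^.]+)\.([^.]+)\.([^.]+)"
# Python spelling of the Java replacement string "from_postgres_$3".
ROUTE_REPLACEMENT = r"from_postgres_\3"


def route(topic):
    """Return the rewritten topic name, or the topic unchanged if it does not match.

    `route` is a hypothetical stand-in for the transform, written for this sketch.
    """
    match = re.fullmatch(ROUTE_REGEX, topic)
    if match is None:
        return topic
    return match.expand(ROUTE_REPLACEMENT)


print(route("from_postgres.inventory.customers"))
print(route("my_connect_offsets"))
```

The three capture groups are the topic prefix, the schema, and the table. Only the third group survives in the new name, so tables with the same name in different schemas would end up in the same topic.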
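To keep the target table apart from the tables MySQL already has, we place it in the database target created earlier.
In the sink connector configuration, "connection.url" holds the JDBC connection information for the target MySQL. The database name target at the end of the URL serves as the default database for writes. We did not set the "table.name.format" property, so by default the topic name is used as the table name. The fully qualified table name in the later select queries is therefore target.from_postgres_customers.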
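How the fully qualified table name follows from the configuration can be traced with a small Python sketch. It rests on assumptions: the helper `target_table` is hypothetical, and the default value `${topic}` for "table.name.format" is assumed here rather than taken from this article, so it is worth checking against the sink connector documentation.

```python
from urllib.parse import urlparse


def target_table(connection_url, topic, table_name_format="${topic}"):
    """Derive "<database>.<table>" from a JDBC URL and a topic name.

    Hypothetical helper for illustration; the "${topic}" default is an assumption.
    """
    if not connection_url.startswith("jdbc:"):
        raise ValueError("expected a JDBC URL starting with 'jdbc:'")
    # Drop the "jdbc:" prefix so the rest parses like an ordinary URL.
    parsed = urlparse(connection_url[len("jdbc:"):])
    database = parsed.path.lstrip("/")
    table = table_name_format.replace("${topic}", topic)
    return f"{database}.{table}"


print(target_table("jdbc:mysql://mysql:3306/target", "from_postgres_customers"))
```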
We check the two connectors just created:
➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'
"inventory-connector-from-postgres"
"jdbc-sink-to-mysql"
3.3 Checking the Kafka pipeline entities after initialization
As before, we first look at Kafka's topics and consumer groups:
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
from_postgres_customers
my_connect_configs
my_connect_offsets
my_connect_statuses
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
connect-jdbc-sink-to-mysql
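As we can see, Debezium's PostgreSQL source connector did not create any auxiliary topics. The only addition is the data topic from_postgres_customers. We also see the newly created consumer group connect-jdbc-sink-to-mysql, which consumes the data in the topic into the MySQL database. The real-time channel that replicates the table customers from PostgreSQL to MySQL is thus established.
3.4 Checking the target table data after initialization
Next we verify the data transfer.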
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
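Clearly, the whole customers table, all four rows, has been copied into the MySQL database.
3.5 Checking the Kafka pipeline state and data after initialization
We now look further at how the rows travelled through the Kafka topic and consumer group.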
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-mysql
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-mysql from_postgres_customers 0 4 4 0 connector-consumer-jdbc-sink-to-mysql-0-4afa84dc-1199-4e7f-a18d-75be0e367c81 /172.29.0.6 connector-consumer-jdbc-sink-to-mysql-0
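This command shows that the consumer group connect-jdbc-sink-to-mysql has consumed 4 messages from the topic from_postgres_customers (CURRENT-OFFSET 4, LOG-END-OFFSET 4, LAG 0), that is, the 4 records of the table customers.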
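The columns of that output are related by LAG = LOG-END-OFFSET - CURRENT-OFFSET. A minimal Python sketch, assuming the whitespace-separated column order shown above and using a hypothetical helper `parse_group_row`, makes the check explicit:

```python
def parse_group_row(row):
    """Parse one data row printed by kafka-consumer-groups.sh --describe.

    Hypothetical helper for illustration. It assumes the column order
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ... shown above.
    """
    fields = row.split()
    return {
        "group": fields[0],
        "topic": fields[1],
        "partition": int(fields[2]),
        "current_offset": int(fields[3]),
        "log_end_offset": int(fields[4]),
        "lag": int(fields[5]),
    }


# The row from the output above, shortened after the LAG column.
ROW = "connect-jdbc-sink-to-mysql from_postgres_customers 0 4 4 0"

info = parse_group_row(ROW)
# The sink has caught up when nothing is left between the two offsets.
caught_up = info["log_end_offset"] - info["current_offset"] == info["lag"] == 0
print(info["topic"], info["current_offset"], caught_up)
```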
➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_postgres_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"first","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570466,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570467,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"last","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
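These are the 4 messages currently stored in the Kafka topic. They can be regarded as the full copy of the table's rows made when the pipeline was initialized (the initial snapshot). The operation type "op" of every one of these messages is "r" (read).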
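As a reading aid for the long JSON lines above, here is a minimal Python sketch that pulls out the fields worth looking at in one change-event value: "op", "source.snapshot", and the row id. The function name `summarize`, the `OP_NAMES` table, and the trimmed sample are illustrative assumptions; the sample copies only a few fields of the last snapshot message and leaves out the "schema" part.

```python
import json

# Operation codes as they appear in the "op" field of the messages in this article.
OP_NAMES = {"r": "read (snapshot)", "c": "create", "u": "update", "d": "delete"}


def summarize(value_json):
    """Return (operation name, snapshot marker, row id) for one event value."""
    payload = json.loads(value_json)["payload"]
    # For a delete, "after" is null, so fall back to the "before" image.
    row = payload["after"] or payload["before"]
    return OP_NAMES[payload["op"]], payload["source"]["snapshot"], row["id"]


# Trimmed copy of the last snapshot message (illustrative, schema omitted).
sample = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1004, "first_name": "Anne", "last_name": "Kretchmar",
                  "email": "annek@noanswer.org"},
        "source": {"connector": "postgresql", "snapshot": "last",
                   "schema": "inventory", "table": "customers"},
        "op": "r",
    }
})

print(summarize(sample))
```

In the output above, the snapshot marker is "last" on the final row of the snapshot, which is one way to tell where the initial copy ends.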
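Leave this new terminal window open and return to the window we were working in before.
3.6 Inserts, updates, and deletes on the source table and the resulting changes in the target table
Next we verify how the target table in MySQL changes after insert, update, and delete operations on the source table in PostgreSQL.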
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"insert into inventory.customers (id,first_name,last_name,email) values (1005, 'Tim','Zhang','zhangyi2.zb@ccbft.com');"
INSERT 0 1
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi2.zb@ccbft.com
(5 rows)
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi2.zb@ccbft.com |
+------+------------+-----------+-----------------------+
After inserting a new row with id=1005, we can see that the data in the PostgreSQL and MySQL tables is identical.
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"update inventory.customers set email='zhangyi3.zb@ccbft.com' where id=1005;"
UPDATE 1
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi3.zb@ccbft.com
(5 rows)
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi3.zb@ccbft.com |
+------+------------+-----------+-----------------------+
After changing the email field of the row with id=1005, the data in the MySQL table was updated to follow the PostgreSQL table.
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"delete from inventory.customers where id=1005;"
DELETE 1
➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)
➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
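Here the row with id=1005 was deleted from the PostgreSQL source table, and the same row was removed from the MySQL table.
This shows that the pipeline we built keeps the source and target tables synchronized in near real time.
3.7 Checking the Kafka pipeline state and data after the insert, update, and delete
Let's look again at the Kafka topic and the consumer group: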
➜ docker compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-mysql
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-mysql from_postgres_customers 0 8 8 0 connector-consumer-jdbc-sink-to-mysql-0-4afa84dc-1199-4e7f-a18d-75be0e367c81 /172.29.0.6 connector-consumer-jdbc-sink-to-mysql-0
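Here we see that the insert, update, and delete operations added 4 consumed messages: CURRENT-OFFSET is now 8, compared with the 4 snapshot messages before. Back in the terminal window that has been monitoring the topic in real time, let's look at the content of these 4 messages.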
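The arithmetic behind that count can be sketched in a few lines of Python. This is only an illustration: `parse_describe_row` and `SNAPSHOT_MESSAGES` are hypothetical names, the row is copied from the `--describe` output above, and the sketch assumes the numeric columns really contain numbers (they may not when a group has no committed offset yet).

```python
def parse_describe_row(row):
    """Split one data row of the `--describe` output into named fields."""
    fields = row.split()
    return {
        "group": fields[0],
        "topic": fields[1],
        "partition": int(fields[2]),
        "current_offset": int(fields[3]),
        "log_end_offset": int(fields[4]),
        "lag": int(fields[5]),
    }


# Data row copied from the consumer group description above.
row = ("connect-jdbc-sink-to-mysql from_postgres_customers 0 8 8 0 "
       "connector-consumer-jdbc-sink-to-mysql-0-4afa84dc-1199-4e7f-a18d-75be0e367c81 "
       "/172.29.0.6 connector-consumer-jdbc-sink-to-mysql-0")

info = parse_describe_row(row)

# Assumption: the topic held exactly the 4 snapshot messages before the
# insert, update, and delete were executed.
SNAPSHOT_MESSAGES = 4
new_messages = info["log_end_offset"] - SNAPSHOT_MESSAGES

print(new_messages, info["lag"])  # 4 new messages, lag 0
```

A lag of 0 means the sink connector has consumed everything that the source connector has written to the topic so far.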
➜ docker compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_postgres_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"first","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570466,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570467,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"last","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117004992,"snapshot":"false","db":"postgres","sequence":"[\"34442224\",\"34466344\"]","schema":"inventory","table":"customers","txId":767,"lsn":34466344,"xmin":null},"op":"c","ts_ms":1681117005506,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117218489,"snapshot":"false","db":"postgres","sequence":"[\"34467376\",\"34467664\"]","schema":"inventory","table":"customers","txId":768,"lsn":34467664,"xmin":null},"op":"u","ts_ms":1681117218797,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_coll
ection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"after":null,"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117304928,"snapshot":"false","db":"postgres","sequence":"[\"34468808\",\"34468864\"]","schema":"inventory","table":"customers","txId":769,"lsn":34468864,"xmin":null},"op":"d","ts_ms":1681117305316,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} null
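Besides the 4 initialization messages seen earlier, whose operation type "op" is "r", the 3 new change events have the operation types "c", "u", and "d", which correspond to our insert, update, and delete. The last message has only a key and its value is null; this is called a tombstone message, and it belongs to the final "d" operation.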
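To make the meaning of these operation types concrete, the following Python sketch replays the 8 messages against an in-memory dictionary keyed by id. It is a toy model written for this article, not the code of the Debezium JDBC sink connector; `apply_event`, `replay`, and the shortened row contents are illustrative assumptions.

```python
def apply_event(table, key, value):
    """Apply one (key, value) change event to `table`, a dict keyed by id."""
    if value is None:
        # Tombstone: key only, null value. In this toy model the row was
        # already removed by the preceding "d" event, so nothing changes.
        return
    op = value["op"]
    if op in ("r", "c", "u"):
        # Snapshot read, insert, and update all become an upsert of "after".
        table[key] = value["after"]
    elif op == "d":
        table.pop(key, None)


def replay(events):
    """Replay a list of (key, value) events and return the resulting table."""
    table = {}
    for key, value in events:
        apply_event(table, key, value)
    return table


def row(row_id, email):
    # Shortened row image: only id and email are kept for this illustration.
    return {"id": row_id, "email": email}


events = [
    (1001, {"op": "r", "before": None, "after": row(1001, "sally.thomas@acme.com")}),
    (1002, {"op": "r", "before": None, "after": row(1002, "gbailey@foobar.com")}),
    (1003, {"op": "r", "before": None, "after": row(1003, "ed@walker.com")}),
    (1004, {"op": "r", "before": None, "after": row(1004, "annek@noanswer.org")}),
    (1005, {"op": "c", "before": None, "after": row(1005, "zhangyi2.zb@ccbft.com")}),
    (1005, {"op": "u", "before": row(1005, "zhangyi2.zb@ccbft.com"),
            "after": row(1005, "zhangyi3.zb@ccbft.com")}),
    (1005, {"op": "d", "before": row(1005, "zhangyi3.zb@ccbft.com"), "after": None}),
    (1005, None),  # tombstone
]

print(sorted(replay(events)))  # [1001, 1002, 1003, 1004]
```

Replaying only the first 6 events leaves id 1005 in the table with the updated email, which matches the intermediate state of the MySQL table shown in section 3.6.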
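4. Summary
Sections 2 and 3 described in full how to build CDC real-time incremental synchronization channels between heterogeneous databases in both directions, from MySQL to PostgreSQL and from PostgreSQL to MySQL, using Debezium's connectors. Debezium currently provides official source connectors for 8 databases: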
"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"
Debezium 2.2 also added a JDBC sink connector, so that Debezium now covers both the source side and the sink side.
"io.debezium.connector.jdbc.JdbcSinkConnector"
Although it is a single JDBC connector, it adapts to the following target databases according to the protocol in the JDBC URL:
- Db2
- MySQL
- Oracle
- PostgreSQL
- SQL Server
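The idea of choosing the target database from the JDBC URL can be illustrated with a short Python sketch. It only shows the principle and says nothing about how the connector resolves the database internally. `guess_database` and the example URLs (host names, ports, database names) are hypothetical; the URL prefixes are the ones commonly used by the JDBC drivers of these five databases.

```python
# Common JDBC URL prefixes of the five target databases listed above.
JDBC_PREFIXES = {
    "jdbc:db2:": "Db2",
    "jdbc:mysql:": "MySQL",
    "jdbc:oracle:": "Oracle",
    "jdbc:postgresql:": "PostgreSQL",
    "jdbc:sqlserver:": "SQL Server",
}


def guess_database(jdbc_url):
    """Return the database name implied by the prefix of a JDBC URL."""
    lowered = jdbc_url.lower()
    for prefix, name in JDBC_PREFIXES.items():
        if lowered.startswith(prefix):
            return name
    raise ValueError("unsupported JDBC URL: " + jdbc_url)


# Hypothetical URLs in the style of the containers used in this article.
print(guess_database("jdbc:mysql://mysql:3306/target"))
print(guess_database("jdbc:postgresql://postgres:5432/postgres"))
```

With these two URLs the sketch prints MySQL and PostgreSQL, the two sink targets exercised in this article.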