Real-time CDC replications between MySQL and PostgreSQL using Debezium connectors

Timothy Zhang
22 min readJun 18, 2023

This article introduces how to set up Debezium connectors so that real-time CDC incremental load can be realized between MySQL and PostgreSQL. As an example of my own practice, it demonstrates two-way replications: not only the replication from MySQL to PostgreSQL is set up, but also the replication from PostgreSQL to MySQL is implemented. So, I used both Debezium’s MySQL and PostgreSQL source connectors. Since version 2.2 of Debezium, Debezium’s own JDBC sink connector is added to replace Kafka’s JDBC sink connector, and it can also support multiple types of databases.

The figure above shows all components in the two-way CDC replications:

  • First of all, five boxes with red letters and orange backgrounds represent running Containers. MySQL and Postgres are two databases, each serving as sources and destinations respectively; Kafka relies on Zookeeper to provide a channel as a whole; Connect is the carrier of two kinds of connectors, depending on Kafka.
  • The four large boxes in Connect represent the four groups of processes, namely the connection stack. They are respectively used to establish the connection between Kafka and the source or destination of two-way replications. Each connection stack includes a connector and a converter, and multiple transform units can be added between the connector and the converter as needed.
  • The dark green arrow lines show replication steps from MySQL as the source to PostgreSQL as the destination; the dark purple arrow lines show replication steps from PostgreSQL as the source to MySQL as the destination.

1. System and Environment

I built up this system using Docker on my Mac laptop.

1.1 System Preparation and Startup

Here is the file to create the system: docker-compose-debezium-mp.yaml:

The five images used here are officially provided by Debezium, but the environment variable DEBEZIUM_VERSION needs to be set before starting. As can be seen from the diagram of the system configuration, we need to use two kinds of source connectors and JDBC sink connectors, both of which are developed by Debezium and have been included in the Debezium Connect image. Perform the following steps to quickly start the entire system:

export DEBEZIUM_VERSION=2.3
export DC_FILE=docker-compose-debezium-mp.yaml
docker compose -f ${DC_FILE} up -d

We see that all containers started successfully:

➜ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e7055ad9515c quay.io/debezium/connect:2.3 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:8083->8083/tcp, 9092/tcp connect
4fc527dfde4e quay.io/debezium/kafka:2.3 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:9092->9092/tcp kafka
0f29d8a1ab8a quay.io/debezium/zookeeper:2.3 "/docker-entrypoint.…" About an hour ago Up About an hour 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp zookeeper
ba7a7310f28e quay.io/debezium/example-postgres:2.3 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:5432->5432/tcp postgres
ed091ecbf5db quay.io/debezium/example-mysql:2.3 "docker-entrypoint.s…" About an hour ago Up About an hour 0.0.0.0:3306->3306/tcp, 33060/tcp mysql

1.2 Explore Sample Databases

Both MySQL and PostgreSQL databases here use the Debezium example images. They already contain instance data. First, explore Debezium’s MySQL example container and the table inventory.customers we work on.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'show tables in inventory;'
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

Take a look at Debezium’s PostgreSQL example container, and the table inventory.customers it contains.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c '\dt inventory.*;'
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)

Although the SQL statement “select * from inventory.customers;” is used to list all records of the table —

the inventory in MySQL is a Database, while the inventory in PostgreSQL is a Schema.

This also decides the way we create the receiving tables in the following examples.

1.3 Check out Kafka and Connect

In addition, we can also check Kafka and Connect:

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
my_connect_configs
my_connect_offsets
my_connect_statuses

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list

Currently, Kafka only has 4 available topics, but no consumer groups yet.

➜ curl -s -XGET http://localhost:8083/connector-plugins | jq '.[].class'
"io.debezium.connector.jdbc.JdbcSinkConnector"
"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"
"org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
"org.apache.kafka.connect.mirror.MirrorSourceConnector"

➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'

Here is a list of all the connector plugins currently supported by Connect, including the ones we need:

  • io.debezium.connector.jdbc.JdbcSinkConnector
  • io.debezium.connector.mysql.MySqlConnector
  • io.debezium.connector.postgresql.PostgresConnector

However, there is no running connector yet.

2. From MySQL To PostgreSQL

2.1 Create schema “target” in PostgreSQL

As mentioned earlier, in MySQL and PostgreSQL example containers, there are already similar databases/schemas and tables. In order to distinguish existing tables, we need to create a new Schema target in PostgreSQL.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'create schema target;'
CREATE SCHEMA

2.2 Configure source and sink connectors

With the Schematarget, we can create the source and sink connectors separately:

  • Debezium’s MySQL source connector
  • Debezium’s JDBC sink connector — Because connection.url has the protocol postgresql, this JDBC sink connector can generate adaptive DDL and DML operations for the PostgreSQL database.
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "inventory-connector-from-mysql",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "from_mysql",
"table.include.list": "inventory.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "from_mysql_$3"
}
}'



➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "jdbc-sink-to-postgres",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"topics": "from_mysql_customers",
"connection.url": "jdbc:postgresql://postgres:5432/postgres?currentSchema=target",
"connection.username": "postgres",
"connection.password": "postgres",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"schema.evolution": "basic"
}
}'

As can be seen from the configuration of the source connector, we only choose the table customers as an example here, which is in the MySQL database inventory. In addition, we also added a route transform unit in the configuration to decompose and reorganize the table names in the original database to generate a new topic named from_mysql_customers.

In the configuration of the sink connector, the connection.url property is the JDBC connection information of the destination PostgreSQL and currentSchema=target is specially added as the default schema we want to write. We did not set the table.name.format property, and the topic name is used as the table name by default, so the destination table uses the same table name. The full name of the subsequent table in the select query is target.from_mysql_customers.

Let’s check the two connectors created:

➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'
"inventory-connector-from-mysql"
"jdbc-sink-to-postgres"

2.3 Check the Kafka pipeline after initialization

Let’s take a look at Kafka’s topics and consumer groups

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
from_mysql
from_mysql_customers
my_connect_configs
my_connect_offsets
my_connect_statuses
schema-changes.inventory

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
connect-jdbc-sink-to-postgres

Compared with the previous check, there are 3 more topics: from_mysql and schema-changes.inventory are auxiliary topics, and we focus on the topic from_mysql_customers as a data pipeline. Here you can see the new customer group connect-jdbc-sink-to-postgres, which is used to consume the data in the topic to the PostgreSQL database. Therefore, the live channel replicating the table customers from MySQL to PostgreSQL is successfully established.

2.4 Check the initialized destination table data

Next, we verify the data replication.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)

Apparently, the entire table customers has been successfully replicated to the PostgreSQL database.

2.5 Check the Kafka pipeline after initialization

Further, examine how they are passed from Kafka’s topic and consumer group.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-postgres
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-postgres from_mysql_customers 0 4 4 0 connector-consumer-jdbc-sink-to-postgres-0-bd341f24-6763-4db2-b30a-15c329f139b1 /192.168.32.6 connector-consumer-jdbc-sink-to-postgres-0

This command shows that the consumer group connect-jdbc-sink-to-postgres consumes 4 messages from the topic from_mysql_customers , that is 4 records in the table customers.

We also open another terminal window to monitor messages on the topic from_mysql_customers in real-time.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_mysql_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"first","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129088,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"last","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}

These are the 4 messages currently stored in Kafka’s topic, which can be regarded as full replication of table records when the pipeline is initialized. Here the operation type op for each message is “r”.

Keep this newly created terminal window, and we return to the window we were working on before.

2.6 Addition, modification, and deletion of source table then changes of sink table

Further verify the changes in the sink table in PostgreSQL after adding, modifying, and deleting the source table in MySQL.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"insert into inventory.customers (id,first_name,last_name,email) values (1005, 'Tim','Zhang','zhangyi2.zb@ccbft.com');"


➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi2.zb@ccbft.com |
+------+------------+-----------+-----------------------+

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi2.zb@ccbft.com
(5 rows)

After adding a record id=1005 onto the table, verify that the data in the MySQL and PostgreSQL tables are consistent.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"update inventory.customers set email='zhangyi3.zb@ccbft.com' where id=1005;"


➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi3.zb@ccbft.com |
+------+------------+-----------+-----------------------+

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi3.zb@ccbft.com
(5 rows)

After changing the email field value of record id=1005, the data in the PostgreSQL table is updated along with the data in the MySQL table.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e \
"delete from inventory.customers where id=1005;"


➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from inventory.customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from target.from_mysql_customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)

Here, the record id=1005 in the MySQL source table is deleted, and the PostgreSQL table also deletes this record accordingly

It can be seen that the pipeline we have established truly achieves real-time replication between source and destination table data.

2.7 Check the Kafka pipeline after adding, modifying, and deleting

Look at the status of Kafka’s topic and consumer group:

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-postgres
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-postgres from_mysql_customers 0 8 8 0 connector-consumer-jdbc-sink-to-postgres-0-bd341f24-6763-4db2-b30a-15c329f139b1 /192.168.32.6 connector-consumer-jdbc-sink-to-postgres-0

Here we can see that the operations of adding, modifying, and deleting have added 4 consumption messages. Go back to the terminal window that was monitored in real-time, and take a closer look at the contents of these 4 messages.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_mysql_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"first","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129088,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680317129000,"snapshot":"last","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1680317129089,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319215000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":383,"row":0,"thread":12,"query":null},"op":"c","ts_ms":1680319215658,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319557000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":724,"row":0,"thread":14,"query":null},"op":"u","ts_ms":1680319557887,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_mysql.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_mysql.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"after":null,"source":{"version":"2.1.2.Final","connector":"mysql","name":"from_mysql","ts_ms":1680319739000,"snapshot":"false","db":"inventory","sequence":null,"table":"customers","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1097,"row":0,"thread":16,"query":null},"op":"d","ts_ms":1680319739856,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"from_mysql.inventory.customers.Key"},"payload":{"id":1005}} null

In addition to the previous 4 initialization messages whose operation type op is “r”, the operation types of the newly added 3 messages are “c”, “u”, and “d”, which correspond to our addition, modification, and deletion respectively. The last message only has the content of the key, and the value is null. This is called a tombstone message, corresponding to the last “d” operation.

3. From PostgreSQL to MySQL

3.1 Create database “target” in MySQL

Unlike PostgreSQL, if MySQL is used as the database for the destination table, in order to distinguish it from the existing tables, a new database target needs to be created.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e "create database target;grant all privileges on target.* to 'mysqluser'@'%';"

Please note that you need to write data into MySQL here, so you must change the account to root, and set the password in ${DC_FILE}. It is easy to check the permissions of each MySQL account:

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from mysql.user;'

| Host | User | Select_priv | Insert_priv | Update_priv | Delete_priv | Create_priv | Drop_priv | Reload_priv | Shutdown_priv | Process_priv | File_priv | Grant_priv | References_priv | Index_priv | Alter_priv | Show_db_priv | Super_priv | Create_tmp_table_priv | Lock_tables_priv | Execute_priv | Repl_slave_priv | Repl_client_priv | Create_view_priv | Show_view_priv | Create_routine_priv | Alter_routine_priv | Create_user_priv | Event_priv | Trigger_priv | Create_tablespace_priv | ssl_type | ssl_cipher | x509_issuer | x509_subject | max_questions | max_updates | max_connections | max_user_connections | plugin | authentication_string | password_expired | password_last_changed | password_lifetime | account_locked | Create_role_priv | Drop_role_priv | Password_reuse_history | Password_reuse_time | Password_require_current | User_attributes |

| % | debezium | Y | N | N | N | N | N | Y | N | N | N | N | N | N | N | Y | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *A06791A44B817A1A84D38FABE76938F0183EE3F0 | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | mysqluser | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *FBC02A898D66B9181D6F8826C045C11FD2B364A4 | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | replicator | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | Y | Y | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *D98280F03D0F78162EBDBB9C883FC01395DEA2BF | N | 2023-04-10 04:22:25 | NULL | N | N | N | NULL | NULL | NULL | NULL |
| % | root | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *9A6A327AAD02A999D3D87D94ECA40EA6F02D9E92 | N | 2023-04-10 04:22:25 | NULL | N | Y | Y | NULL | NULL | NULL | NULL |
| localhost | mysql.infoschema | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | mysql.session | N | N | N | N | N | N | N | Y | N | N | N | N | N | N | N | Y | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | mysql.sys | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | N | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | caching_sha2_password | $A$005$THISISACOMBINATIONOFINVALIDSALTANDPASSWORDTHATMUSTNEVERBRBEUSED | N | 2023-04-10 04:22:22 | NULL | Y | N | N | NULL | NULL | NULL | NULL |
| localhost | root | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | Y | | 0x | 0x | 0x | 0 | 0 | 0 | 0 | mysql_native_password | *9A6A327AAD02A999D3D87D94ECA40EA6F02D9E92 | N | 2023-04-10 04:22:25 | NULL | N | Y | Y | NULL | NULL | NULL | NULL |
+-----------+------------------+-------------+-------------+-------------+-------------+-------------+-----------+-------------+---------------+--------------+-----------+------------+-----------------+------------+------------+--------------+------------+-----------------------+------------------+--------------+-----------------+------------------+------------------+----------------+---------------------+--------------------+------------------+------------+--------------+------------------------+----------+------------------------+--------------------------+----------------------------+---------------+-------------+-----------------+----------------------+-----------------------+------------------------------------------------------------------------+------------------+-----------------------+-------------------+----------------+------------------+----------------+------------------------+---------------------+--------------------------+-----------------+

In Section 2 from MySQL to PostgreSQL, we set the MySQL source connector to use the account “debezium”, since the Debezium MySQL connector needs an account with Reload privilege to capture changes.

On the one hand, we can get all grant privileges for the account mysqluser by:

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'show grants for mysqluser;'
+----------------------------------------------------------+
| Grants for mysqluser@% |
+----------------------------------------------------------+
| GRANT USAGE ON *.* TO `mysqluser`@`%` |
| GRANT ALL PRIVILEGES ON `inventory`.* TO `mysqluser`@`%` |
| GRANT ALL PRIVILEGES ON `target`.* TO `mysqluser`@`%` |
+----------------------------------------------------------+

On the other hand, here are grant privileges for the account debezium:

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'show grants for debezium;'
+------------------------------------------------------------------------------------------------------+
| Grants for debezium@% |
+------------------------------------------------------------------------------------------------------+
| GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `debezium`@`%` |
+------------------------------------------------------------------------------------------------------+

3.2 Configure source and sink connectors

With the database target in place, we can create the source and sink connectors separately:

  • Debezium’s PostgreSQL source connector
  • Debezium’s JDBC sink connector — Because connection.url has the protocol mysql, this JDBC sink connector can generate adaptive DDL and DML operations for the MySQL database.
➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "inventory-connector-from-postgres",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "from_postgres",
"table.include.list": "inventory.customers",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "from_postgres_$3"
}
}'


➜ curl -i -X POST http://localhost:8083/connectors/ \
-H "Accept:application/json" \
-H "Content-Type:application/json" \
-d \
'{
"name": "jdbc-sink-to-mysql",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"topics": "from_postgres_customers",
"connection.url": "jdbc:mysql://mysql:3306/target",
"connection.username": "mysqluser",
"connection.password": "mysqlpw",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"schema.evolution": "basic"
}
}'

Similarly, we only selected the table customers as an example, and its schema is inventory in PostgreSQL. We also added a route transform unit in the configuration to decompose and reorganize the table name in the original database to generate a new topic name from_postgres_customers.

The destination table uses the same table name. In order to distinguish it from the existing tables in MySQL, we will put this table into the database target newly added, which has been created before.

In the configuration of the sink connector, the property connection.url is the JDBC connection information of the destination MySQL, and the database name target is included at the end of the url as the default database. We did not set the table.name.format property, and the topic name is used as the table name by default, so the destination table uses the same table name. The full name of the subsequent table in the select query is target.from_postgres_customers.

We check the two connectors:

➜ curl -s -XGET http://localhost:8083/connectors | jq '.[]'
"inventory-connector-from-postgres"
"jdbc-sink-to-mysql"

3.3 Check the Kafka pipeline after initialization

Let’s take a look at Kafka’s topics and consumer groups:

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
__consumer_offsets
from_postgres_customers
my_connect_configs
my_connect_offsets
my_connect_statuses


➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
connect-jdbc-sink-to-mysql

It can be seen that Debezium’s PostgreSQL source connector does not generate auxiliary topics, but there is only one more data pipeline from_postgres_customers. Here you can see the customer group connect-jdbc-sink-to-mysqlwhich is used to consume the data in the topic to the MySQL database. Therefore, the live channel to replicate the table customers from PostgreSQL to MySQL was established successfully.

3.4 Check the initialized destination table data

Next, we verify the data replication.

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

Apparently, the entire table customers has been successfully replicated to the MySQL database.

3.5 Check the Kafka pipeline after initialization

Further, examine how they are passed from Kafka’s topic and consumer group.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-mysql
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-mysql from_postgres_customers 0 4 4 0 connector-consumer-jdbc-sink-to-mysql-0-4afa84dc-1199-4e7f-a18d-75be0e367c81 /172.29.0.6 connector-consumer-jdbc-sink-to-mysql-0

This command shows that the consumer group connect-jdbc-sink-to-mysql consumes 4 messages from the topic from_postgres_customers, that is, 4 records in the table customers.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_postgres_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"first","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570466,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570467,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"last","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}

These are the 4 messages currently stored in Kafka’s topic, which can be regarded as full replication of table records when the pipeline is initialized. Here the operation type op for each message is “r”.

Keep this newly created terminal window, and we return to the window we were working on before.

3.6 Addition, modification, and deletion of source table then changes of sink table

Further verify the changes in the sink table in MySQL after adding, modifying, and deleting the source table in PostgreSQL.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"insert into inventory.customers (id,first_name,last_name,email) values (1005, 'Tim','Zhang','zhangyi2.zb@ccbft.com');"
INSERT 0 1


➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi2.zb@ccbft.com
(5 rows)

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=mysqlpw mysql \
mysql -umysqluser -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi2.zb@ccbft.com |
+------+------------+-----------+-----------------------+

After adding a record id=1005 into the table, verify that the data in the PostgreSQL and MySQL tables are consistent.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"update inventory.customers set email='zhangyi3.zb@ccbft.com' where id=1005;"
UPDATE 1


➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
1005 | Tim | Zhang | zhangyi3.zb@ccbft.com
(5 rows)

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
| 1005 | Tim | Zhang | zhangyi3.zb@ccbft.com |
+------+------------+-----------+-----------------------+

After changing the email field value of record id=1005, the data in the MySQL table is updated along with the data in the PostgreSQL table.

➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c \
"delete from inventory.customers where id=1005;"
DELETE 1


➜ docker compose -f ${DC_FILE} exec -u postgres postgres \
psql -c 'select * from inventory.customers;'
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)

➜ docker compose -f ${DC_FILE} exec -e MYSQL_PWD=debezium mysql \
mysql -uroot -e 'select * from target.from_postgres_customers;'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

Here, the record id=1005 in the PostgreSQL source table is deleted, and the MySQL table also deletes this record accordingly.

It can be seen that the pipeline we have established truly achieves real-time replication of source and destination table data.

3.7 Check the Kafka pipeline after adding, modifying, and deleting

Look at the situation of Kafka’s topic and consumer group:

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe --group connect-jdbc-sink-to-mysql
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-jdbc-sink-to-mysql from_postgres_customers 0 8 8 0 connector-consumer-jdbc-sink-to-mysql-0-4afa84dc-1199-4e7f-a18d-75be0e367c81 /172.29.0.6 connector-consumer-jdbc-sink-to-mysql-0

Here we can see that the operations of adding, modifying, and deleting have added 4 consumption messages. Go back to the terminal window that was monitored in real-time, and take a closer look at the contents of these 4 messages.

➜ docker-compose -f ${DC_FILE} exec kafka \
/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic from_postgres_customers
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"first","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570466,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570467,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"true","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681115570426,"snapshot":"last","db":"postgres","sequence":"[null,\"34073024\"]","schema":"inventory","table":"customers","txId":763,"lsn":34073024,"xmin":null},"op":"r","ts_ms":1681115570468,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117004992,"snapshot":"false","db":"postgres","sequence":"[\"34442224\",\"34466344\"]","schema":"inventory","table":"customers","txId":767,"lsn":34466344,"xmin":null},"op":"c","ts_ms":1681117005506,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi2.zb@ccbft.com"},"after":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117218489,"snapshot":"false","db":"postgres","sequence":"[\"34467376\",\"34467664\"]","schema":"inventory","table":"customers","txId":768,"lsn":34467664,"xmin":null},"op":"u","ts_ms":1681117218797,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"from_postgres.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"from_postgres.inventory.customers.Envelope","version":1},"payload":{"before":{"id":1005,"first_name":"Tim","last_name":"Zhang","email":"zhangyi3.zb@ccbft.com"},"after":null,"source":{"version":"2.2.0.Beta1","connector":"postgresql","name":"from_postgres","ts_ms":1681117304928,"snapshot":"false","db":"postgres","sequence":"[\"34468808\",\"34468864\"]","schema":"inventory","table":"customers","txId":769,"lsn":34468864,"xmin":null},"op":"d","ts_ms":1681117305316,"transaction":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"from_postgres.inventory.customers.Key"},"payload":{"id":1005}} null

In addition to the previous 4 initialization messages whose operation type op is “r”, the operation types of the newly added 3 messages are “c”, “u”, and “d”, which correspond to our addition, modification, and deletion respectively. The last message only has the content of the key, and the value is null. This is called a tombstone message, corresponding to the last “d” operation.

4. Summary

Sections 2 and 3 fully describe the CDC real-time incremental replication channel built through the Debezium connectors from MySQL to PostgreSQL, and from PostgreSQL to MySQL two-way in these heterogeneous databases. Currently, Debezium officially provides connectors for 8 source databases:

"io.debezium.connector.db2.Db2Connector"
"io.debezium.connector.mongodb.MongoDbConnector"
"io.debezium.connector.mysql.MySqlConnector"
"io.debezium.connector.oracle.OracleConnector"
"io.debezium.connector.postgresql.PostgresConnector"
"io.debezium.connector.spanner.SpannerConnector"
"io.debezium.connector.sqlserver.SqlServerConnector"
"io.debezium.connector.vitess.VitessConnector"

Version 2.2 of Debezium adds its own JDBC sink connector, thus fully covering the source and sink connectors.

"io.debezium.connector.jdbc.JdbcSinkConnector"

Although it is only a JDBC connector, according to the JDBC URL protocol, it can adaptively support the following destination databases:

  • Db2
  • MySQL
  • Oracle
  • PostgreSQL
  • SQL Server

--

--

Responses (3)