Examples of CDC with Debezium Engine (1)

Timothy Zhang
7 min read · Jul 7, 2024

Debezium can be used in three ways, as described on its architecture page:

  1. Deploying Debezium Connectors to the Apache Kafka Connect framework: All Debezium Connectors function as Source Connectors within the Kafka Connect framework. This way, Debezium Connectors send records to Kafka, and subsequently, Sink Connectors transmit these records from Kafka topics to other systems.
  2. Deploying Debezium Connectors to Debezium Server: The Debezium Server is a configurable, ready-to-use application that streams change events from source databases to various messaging infrastructures.
  3. Using Debezium Connectors as a jar: This method embeds Debezium in your own program as a library, so that both run as a single application. It is known as the Debezium Engine. Depending on the code you integrate it with, the application can either consume database change events and pass them on to other systems, like the first method, or stream change events to messaging infrastructures, like the second method.

The third method embeds Debezium directly in your application, which in some cases is all that the business requirement calls for. Because Debezium packages CDC (Change Data Capture) as modular, reusable components, it speeds up application development while still providing adequate performance and reliability.

I found two representative examples online:

  1. Change Data Capture (CDC) with Embedded Debezium and SpringBoot, with code shared on GitHub.
  2. Change Data Capture Made Easy: Debezium Integration with Spring Boot, MongoDB, and Postgres, with code also shared on GitHub.

After studying these examples, I made some modifications to accommodate the latest versions of Debezium, Spring, and data storage solutions like MongoDB, PostgreSQL, and Elasticsearch. I will detail these modifications in two blog posts. In the third blog post, I will introduce how I captured changes in an Oracle database and transmitted them to Redis storage for use in my project.

Here, I will introduce the first example. The code for this example can be found in my GitHub repository: cdc-postgresql-elasticsearch.

1. Source and Target Data Storage Systems

First, the project directory contains a docker-compose.yml file that sets up PostgreSQL as the data source and Elasticsearch as the target data store.

services:
  # Install postgres and set up the student database.
  postgres:
    container_name: postgres
    image: debezium/postgres:16-alpine
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_DB=studentdb
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password

  # Install Elasticsearch.
  elasticsearch:
    container_name: elasticsearch
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    environment:
      - xpack.security.enabled=false
      - xpack.security.enrollment.enabled=false
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"

Because Debezium requires some special PostgreSQL configuration (logical decoding must be enabled), I chose the debezium/postgres image, which comes preconfigured for it, instead of the official postgres image from Docker Hub. For Elasticsearch, I used the official image from docker.elastic.co/elasticsearch/elasticsearch rather than the elasticsearch image from Docker Hub, because it receives updates first. For both images, I picked the latest tags available at the time.

docker-compose up -d --wait
[+] Running 3/3
✔ Network cdc-postgresql-elasticsearch_default Created 0.0s
✔ Container elasticsearch Healthy 0.7s
✔ Container postgres Healthy

After successful execution, you can check the status of containers:

docker ps --format "table {{.Image}}\t{{.Ports}}\t{{.Names}}"
IMAGE                                                  PORTS                                            NAMES
debezium/postgres:16-alpine                            0.0.0.0:5432->5432/tcp                           postgres
docker.elastic.co/elasticsearch/elasticsearch:8.13.4   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   elasticsearch

2. Prepare the environment and run the application

2.1 Terminal window layout

I recommend a Π-shaped terminal window layout for this CDC walkthrough: the upper half runs the application, the lower left pane operates on the source database, and the lower right pane verifies the results in the target storage. In this example, the lower left pane works with the PostgreSQL database, so we open the PostgreSQL CLI (psql) inside the container:

docker exec -it postgres psql -U user -d studentdb

Since Elasticsearch exposes its data through an API that we can query directly with curl, the lower right pane stays in a regular shell.

2.2 Prepare the source table and data

Create the Student table in the source PostgreSQL database:

CREATE TABLE public.student
(
    id integer NOT NULL,
    address character varying(255),
    email character varying(255),
    name character varying(255),
    CONSTRAINT student_pkey2 PRIMARY KEY (id)
);
CREATE TABLE

Insert a row of data into the table:

INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL)
VALUES('1','Jack','Dallas, TX','jack@gmail.com');
INSERT 0 1

The last line is the output of the INSERT command: the first value is the oid, which is always 0, and the second value is the number of rows inserted. Check the current data in the table:

SELECT * from STUDENT;
 id |  address   |     email      | name
----+------------+----------------+------
  1 | Dallas, TX | jack@gmail.com | Jack
(1 row)

2.3 Run the Debezium Engine application

Next, we can run the cdc-postgresql-elasticsearch application. Since the Java version is set to 21 in the pom.xml, we need to switch the command window’s Java to this version:

jenv shell 21
mvn clean package
java -jar target/cdc-0.0.1-SNAPSHOT.jar

The above commands are all executed in the upper half of the window. As the application starts running, you can observe the Spring log outputs.
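To give a feel for what such an application needs at startup, here is a minimal sketch of the kind of configuration an embedded Debezium Engine for PostgreSQL is typically given. The property keys are standard Debezium settings and the connection values mirror the docker-compose.yml above, but the engine name, topic prefix, and offset file path are assumptions made for illustration, and the class name is hypothetical. The repository's actual configuration may differ.

```java
import java.util.Properties;

// Sketch only: assembles Debezium Engine settings for the PostgreSQL container
// defined in docker-compose.yml. Values marked "assumed" are illustrative.
class EngineConfigSketch {

    static Properties postgresEngineProperties() {
        Properties props = new Properties();
        // Engine identity and the connector implementation to load.
        props.setProperty("name", "student-postgres-engine"); // assumed name
        props.setProperty("connector.class",
                "io.debezium.connector.postgresql.PostgresConnector");
        // Where the engine records how far it has read, so a restart can resume.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename",
                "/tmp/student-offsets.dat"); // assumed path
        props.setProperty("offset.flush.interval.ms", "60000");
        // Connection details, matching docker-compose.yml.
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "user");
        props.setProperty("database.password", "password");
        props.setProperty("database.dbname", "studentdb");
        // Logical decoding plugin that ships with PostgreSQL 10 and later.
        props.setProperty("plugin.name", "pgoutput");
        // Prefix used when naming the event streams.
        props.setProperty("topic.prefix", "student-cdc"); // assumed prefix
        // Capture only the table used in this walkthrough.
        props.setProperty("table.include.list", "public.student");
        return props;
    }

    public static void main(String[] args) {
        postgresEngineProperties()
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a real application, properties like these would be handed to the DebeziumEngine builder from the Debezium libraries together with a callback that receives each change event. The sketch stops short of that so it runs without any extra dependencies.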

3. Verify CDC on operations in the source database

Now that the Debezium Engine application is up and running, we begin to verify our application’s implementation of CDC.

3.1 READ

Since we have already created the Student table and inserted a record, the final log entries after startup show that the Listener has captured this record. Because the row already existed when the engine started, Debezium picks it up during its initial snapshot and marks the operation as READ.

com.tutorial.cdc.listener.CdcListener    : Data Changed: {address=Dallas, TX, name=Jack, id=1, email=jack@gmail.com} with Operation: READ
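The operation name in this log line comes from a one-letter code in the change event's op field. The sketch below shows that mapping for the four operations used in this post. Debezium ships its own enum for this purpose (io.debezium.data.Envelope.Operation); the stand-in here, including its class and method names, is hypothetical and exists only so that the example runs without the Debezium jars.

```java
import java.util.Arrays;
import java.util.Optional;

// Sketch only: a stand-in for the operation codes found in a change event's "op" field.
class OperationDemo {

    enum Operation {
        READ("r"),   // row seen during the initial snapshot
        CREATE("c"), // INSERT
        UPDATE("u"), // UPDATE
        DELETE("d"); // DELETE

        private final String code;

        Operation(String code) {
            this.code = code;
        }

        String code() {
            return code;
        }

        // Returns the operation for a code, or empty if the code is not one of the four above.
        static Optional<Operation> forCode(String code) {
            return Arrays.stream(values())
                    .filter(op -> op.code.equals(code))
                    .findFirst();
        }
    }

    public static void main(String[] args) {
        for (String code : new String[] {"r", "c", "u", "d"}) {
            System.out.println(code + " -> " + Operation.forCode(code).orElseThrow());
        }
    }
}
```

Debezium's own enum also defines codes for truncate and message events, which this walkthrough does not exercise.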

We can inspect the records in Elasticsearch:

curl -s http://localhost:9200/student/_search | jq '.hits.hits'
[
  {
    "_index": "student",
    "_id": "1",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 1,
      "name": "Jack",
      "address": "Dallas, TX",
      "email": "jack@gmail.com"
    }
  }
]

You can see that the record from PostgreSQL has been transferred to Elasticsearch.

[Screenshot: the terminal window at this point]
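If you would rather run this check from Java than from the shell, the same query can be sent with the JDK's built-in HTTP client. This is only a sketch: the class and method names are hypothetical, and it assumes Elasticsearch is listening on localhost:9200 as configured in docker-compose.yml.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch only: the Java equivalent of "curl -s http://localhost:9200/student/_search".
class SearchCheckSketch {

    // Builds a GET request against the _search endpoint of the given index.
    static HttpRequest searchRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = searchRequest("http://localhost:9200", "student");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            // Prints the raw JSON response; the curl example pipes it through jq instead.
            System.out.println(response.body());
        } catch (IOException e) {
            // Elasticsearch is not reachable, for example because the container is not running.
            System.out.println("Could not reach " + request.uri() + ": " + e);
        }
    }
}
```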

Next, let’s perform some additional data operations to validate our project.

3.2 INSERT/CREATE

We insert another row of data into the source PostgreSQL database Student table with the following command:

INSERT INTO STUDENT(ID, NAME, ADDRESS, EMAIL)
VALUES('3','Tom','Chicago, IL','tom@gmail.com');
INSERT 0 1

Running the same SELECT query again shows the current contents of the Student table:

 id |   address   |     email      | name
----+-------------+----------------+------
  1 | Dallas, TX  | jack@gmail.com | Jack
  3 | Chicago, IL | tom@gmail.com  | Tom
(2 rows)

The second entry is the new record we inserted. Correspondingly, the logs of our application also show that the Listener captured this record. Since it’s a new insertion, the operation here is marked as CREATE:

com.tutorial.cdc.listener.CdcListener    : Data Changed: {address=Chicago, IL, name=Tom, id=3, email=tom@gmail.com} with Operation: CREATE

Running the same curl query again shows the indexing status in the target Elasticsearch database:

[
  {
    "_index": "student",
    "_id": "1",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 1,
      "name": "Jack",
      "address": "Dallas, TX",
      "email": "jack@gmail.com"
    }
  },
  {
    "_index": "student",
    "_id": "3",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 3,
      "name": "Tom",
      "address": "Chicago, IL",
      "email": "tom@gmail.com"
    }
  }
]

Now there are two indexed documents.

3.3 UPDATE

Then update the first record in the Student table:

UPDATE STUDENT 
SET EMAIL='jill@gmail.com', NAME='Jill'
WHERE ID = 1;
UPDATE 1

As before, the last line is the output of the UPDATE command: the value 1 is the number of rows updated. Let's now check the current data in the table:

 id |   address   |     email      | name
----+-------------+----------------+------
  3 | Chicago, IL | tom@gmail.com  | Tom
  1 | Dallas, TX  | jill@gmail.com | Jill
(2 rows)

Clearly, the fields email and name of the record with id 1 have been updated. Our application has also added a log entry from the Listener, indicating an UPDATE operation:

com.tutorial.cdc.listener.CdcListener    : Data Changed: {address=Dallas, TX, name=Jill, id=1, email=jill@gmail.com} with Operation: UPDATE

Check the indexing status in the target Elasticsearch database again:

[
  {
    "_index": "student",
    "_id": "3",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 3,
      "name": "Tom",
      "address": "Chicago, IL",
      "email": "tom@gmail.com"
    }
  },
  {
    "_index": "student",
    "_id": "1",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 1,
      "name": "Jill",
      "address": "Dallas, TX",
      "email": "jill@gmail.com"
    }
  }
]

Correspondingly, the name and email fields of the document with id 1 have been updated to the values specified in the UPDATE command.

3.4 DELETE

Finally, delete the record with id 1 from the Student table:

DELETE FROM STUDENT WHERE ID = 1;
DELETE 1

As before, the last line is the output of the DELETE command: the value 1 is the number of rows deleted. Only one record now remains in the table:

 id |   address   |     email     | name
----+-------------+---------------+------
  3 | Chicago, IL | tom@gmail.com | Tom
(1 row)

The Listener logs this operation as DELETE, with only the id in the payload:

com.tutorial.cdc.listener.CdcListener    : Data Changed: {id=1} with Operation: DELETE
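The DELETE entry carries only the id because a deleted row has no new state to report: in a Debezium change event the after image is null for a delete, and what remains is the before image. That it contains just the key is consistent with PostgreSQL's default replica identity, under which only the primary key of the old row is logged. The sketch below shows one way a listener could choose which image to forward. The class and method names are hypothetical, and this is not necessarily how the repository's listener is written.

```java
import java.util.Map;

// Sketch only: picks the row image to forward for a change event.
// "before" and "after" mirror the fields of the same name in a change event value.
class PayloadSketch {

    // For DELETE the "after" image is null, so the "before" image is used instead.
    // Every other operation forwards the new state of the row.
    static Map<String, Object> payloadFor(String operation,
                                          Map<String, Object> before,
                                          Map<String, Object> after) {
        return "DELETE".equals(operation) ? before : after;
    }

    public static void main(String[] args) {
        Map<String, Object> deleted = payloadFor("DELETE", Map.of("id", 1), null);
        System.out.println("Data Changed: " + deleted + " with Operation: DELETE");
    }
}
```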

The corresponding document has also been deleted from the Elasticsearch index:

[
  {
    "_index": "student",
    "_id": "3",
    "_score": 1.0,
    "_source": {
      "_class": "com.tutorial.cdc.elasticsearch.entity.Student",
      "id": 3,
      "name": "Tom",
      "address": "Chicago, IL",
      "email": "tom@gmail.com"
    }
  }
]

So far, we have exercised all four CRUD operations: CREATE, READ, UPDATE, and DELETE. In each case, the Debezium Engine application correctly propagated the change from the PostgreSQL database to the Elasticsearch index.
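The whole walkthrough can be condensed into a small replay. The sketch below applies the four logged events, in order, to an in-memory map that stands in for the Elasticsearch student index. It assumes that READ, CREATE, and UPDATE are all written as an upsert keyed by id and that DELETE removes the document, which matches the results observed above. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: an in-memory stand-in for the "student" index (document id -> source).
class SinkReplaySketch {

    private final Map<Integer, Map<String, Object>> index = new LinkedHashMap<>();

    // READ, CREATE and UPDATE overwrite the document with the same id; DELETE removes it.
    void apply(String operation, Map<String, Object> payload) {
        Integer id = (Integer) payload.get("id");
        if ("DELETE".equals(operation)) {
            index.remove(id);
        } else {
            index.put(id, payload);
        }
    }

    Map<Integer, Map<String, Object>> index() {
        return index;
    }

    // Replays the events logged in sections 3.1 to 3.4, in order.
    static SinkReplaySketch replayWalkthrough() {
        SinkReplaySketch sink = new SinkReplaySketch();
        sink.apply("READ", Map.of("id", 1, "name", "Jack",
                "address", "Dallas, TX", "email", "jack@gmail.com"));
        sink.apply("CREATE", Map.of("id", 3, "name", "Tom",
                "address", "Chicago, IL", "email", "tom@gmail.com"));
        sink.apply("UPDATE", Map.of("id", 1, "name", "Jill",
                "address", "Dallas, TX", "email", "jill@gmail.com"));
        sink.apply("DELETE", Map.of("id", 1));
        return sink;
    }

    public static void main(String[] args) {
        // Only the document with id 3 is left after the replay.
        System.out.println(replayWalkthrough().index().keySet());
    }
}
```

Because every write is keyed by id, replaying the same event twice leaves the map unchanged, which is a useful property if events are ever delivered more than once.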
