Change Data Capture (CDC) is becoming a popular technique for interconnecting disparate systems, replicating state across traditional boundaries, decomposing existing monoliths into microservices, and recording audit trails. CDC is the idea of emitting a changelog of all INSERTs, UPDATEs, DELETEs, and schema changes performed on a database.
Debezium (debezium.io) is an open source CDC technology developed by Red Hat. Debezium can connect several database systems to several messaging substrates. As of version 1.6, Debezium can emit database changelogs to Pravega using a new sink in Debezium Server.
At present, the Pravega sink is the only sink for Debezium Server that supports both transactional and non-transactional modes. See Debezium’s Pravega sink documentation for more details.
Debezium Server allows Debezium to be deployed standalone without Kafka Connect, which was previously a dependency of any Debezium deployment.
In this blog, we will look at how to set up a local development environment with PostgreSQL + Debezium + Pravega as a solution for recording and replaying database changelogs.
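To make "replaying a changelog" concrete, here is a minimal Python sketch that rebuilds a table's current state from an ordered list of change events. It assumes a simplified event shape: a dict with Debezium-style "op", "before" and "after" fields ("c" = create, "u" = update, "d" = delete, "r" = snapshot read), with "id" as the primary key. The `replay` function and the sample changelog are hypothetical illustrations, not Debezium or Pravega APIs, and real Debezium events carry more fields than shown here.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def replay(events: List[Dict[str, Any]], key_column: str = "id") -> Dict[Any, Row]:
    """Rebuild a table's current state by applying change events in order (sketch)."""
    table: Dict[Any, Row] = {}
    for event in events:
        op = event["op"]
        before: Optional[Row] = event.get("before")
        after: Optional[Row] = event.get("after")
        if op in ("c", "r"):
            # Inserts ("c") and snapshot reads ("r") carry the new row in "after".
            table[after[key_column]] = dict(after)
        elif op == "u":
            # If the primary key itself changed, drop the row stored under the old key.
            if before is not None and before[key_column] != after[key_column]:
                table.pop(before[key_column], None)
            table[after[key_column]] = dict(after)
        elif op == "d":
            # Deletes identify the removed row through the "before" image.
            table.pop(before[key_column], None)
        else:
            raise ValueError(f"unknown op: {op!r}")
    return table


# Hypothetical sample changelog (illustrative data, not output from this tutorial).
changelog = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "John Doe", "location": "NYC"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Jane Doe", "location": "LA"}},
    {"op": "u",
     "before": {"id": 1, "name": "John Doe", "location": "NYC"},
     "after": {"id": 1, "name": "John Doe", "location": "Seattle"}},
    {"op": "d", "before": {"id": 2, "name": "Jane Doe", "location": "LA"}, "after": None},
]

print(replay(changelog))  # {1: {'id': 1, 'name': 'John Doe', 'location': 'Seattle'}}
```

Because the events are applied strictly in order, replaying the same changelog always yields the same table state.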
Pravega + Debezium + PostgreSQL Walkthrough
This tutorial assumes you are using a Windows computer. Users of a Unix-like OS are assumed to have enough experience to adapt these steps to their environment.
We will start by going end-to-end from PostgreSQL through Debezium to Pravega, and then to the console (stdout) via pravega-cli.
In a future blog post, we will add Flink and perform some changelog analytics queries using Flink SQL.
Theory of Operation
When Debezium processes a change from the database, it produces a ChangeEvent<K, V> object, which is sent to the Pravega sink. The ChangeEvent object has three properties: key, value, and destination. If the changed record has a primary key, then key is populated in the ChangeEvent, and this key is used as the routing key for writing the value of the ChangeEvent to a stream. If the changed record has no primary key, then key is left null, and the event is written to the stream without a routing key. The destination property on the ChangeEvent indicates the name of the stream that the value will be written to. By default, destination is the database server name as configured in Debezium, concatenated with the schema name and the table name from which the record originated, all separated by dots. In our case, the destination stream name will be “local.public.customers”.
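The routing rules above can be summarized in a few lines of Python. This is a minimal sketch, not the sink's actual Java code: `ChangeEvent` here is a hypothetical dataclass mirroring the three properties, `default_destination` and `plan_write` are hypothetical helpers, and the key and value strings are made-up placeholders.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical stand-in for Debezium's ChangeEvent<K, V> (key, value, destination)."""
    key: Optional[str]
    value: str
    destination: str


def default_destination(server_name: str, schema: str, table: str) -> str:
    """Server name, schema name, and table name joined by dots."""
    return f"{server_name}.{schema}.{table}"


def plan_write(event: ChangeEvent) -> Tuple[str, Optional[str], str]:
    """Return (stream, routing_key, payload) for a single event.

    A routing key of None models writing the event without a routing key,
    which is what happens when the source table has no primary key.
    """
    return event.destination, event.key, event.value


dest = default_destination("local", "public", "customers")
keyed = ChangeEvent(key='{"id":1}', value='{"after":{"id":1}}', destination=dest)
unkeyed = ChangeEvent(key=None, value='{"after":{"note":"x"}}', destination=dest)

print(plan_write(keyed))
print(plan_write(unkeyed))
```

Using the primary key as the routing key matters because Pravega preserves the order of events that share a routing key, so successive changes to the same row are read back in the order they were written.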
Install Prerequisites
Install Pravega
Download and unzip the Pravega distribution: pravega-0.11.0.zip
Install Debezium
Download and unzip the Debezium Server distribution: debezium-server-dist-1.9.5.Final.zip
Install PostgreSQL
Download and install PostgreSQL: postgresql-14.4-1-windows-x64.exe
You will set the postgres user’s password during installation.
Install Java 11
Download and install Java 11: https://www.oracle.com/java/technologies/javase-jdk11-downloads.html
You may already have Java installed. Run java -version on the command line to see whether you have Java 11 or newer.
Configure the PostgreSQL Server
We will configure Debezium to use PostgreSQL’s built-in logical decoding output plug-in. This pgoutput plug-in is available by default, so no additional libraries need to be installed. More details on this setup can be found in Debezium’s “PostgreSQL 10+ logical decoding support (pgoutput)” documentation.
- Edit C:\Program Files\PostgreSQL\14\data\postgresql.conf; find, uncomment, and set these configs:
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
- Restart PostgreSQL (press Windows+R, run services.msc, and restart the “postgresql-x64-14” service)
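If you want to double-check that the three settings were actually uncommented, a small script can scan the file. This is a hypothetical sketch: `parse_pg_conf` and `check_cdc_settings` are illustrative helpers, and the parser is deliberately simplified (it does not handle a '#' inside quoted values, `name value` lines without '=', or include directives).

```python
from typing import Dict, List

# Values the step above asks you to set (all compared as strings).
EXPECTED = {"wal_level": "logical", "max_wal_senders": "1", "max_replication_slots": "1"}


def parse_pg_conf(text: str) -> Dict[str, str]:
    """Parse simple `name = value` lines; commented-out lines are ignored."""
    settings: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing comments and commented-out lines
        if "=" not in line:
            continue
        name, value = (part.strip() for part in line.split("=", 1))
        settings[name] = value.strip("'")  # a later line overrides an earlier one
    return settings


def check_cdc_settings(settings: Dict[str, str]) -> List[str]:
    """Describe every expected setting that is missing or has a different value."""
    return [
        f"{name} should be {expected!r} (found {settings.get(name)!r})"
        for name, expected in EXPECTED.items()
        if settings.get(name) != expected
    ]


# Illustrative excerpt in which max_replication_slots was left commented out.
sample = """
#wal_level = replica
wal_level = logical\t\t\t# minimal, replica, or logical
max_wal_senders = 1\t\t# max number of walsender processes
#max_replication_slots = 10\t# max number of replication slots
"""
print(check_cdc_settings(parse_pg_conf(sample)))
```

To check the real file, pass it the contents of open(r"C:\Program Files\PostgreSQL\14\data\postgresql.conf").read(). This only inspects the file; after the restart, running SHOW wal_level; in pgAdmin's Query Tool reports the value the server is actually using.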
Create database table
- Open pgAdmin 4 and set a master password on first launch.
- Expand Servers in the left “Browser” pane and enter the postgres user’s password that you set during PostgreSQL installation.
- Expand Databases, expand postgres.
- Open Query Tool (Tools menu -> Query Tool or icon in Browser toolbar)
- Execute the CREATE TABLE statement:
CREATE TABLE customers
(
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
location TEXT
);
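- Execute the ALTER TABLE statement, which makes PostgreSQL log the complete previous row for UPDATE and DELETE operations so that Debezium can populate the "before" state of change events: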
ALTER TABLE customers REPLICA IDENTITY FULL;
Configure Debezium Server’s Pravega sink & PostgreSQL source
- Create a Downloads/debezium-server/conf/application.properties file containing:
debezium.sink.type=pravega
debezium.sink.pravega.scope=dbz
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.plugin.name=pgoutput
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=[password set during PostgreSQL install]
debezium.source.database.dbname=postgres
debezium.source.database.server.name=local
debezium.source.schema.include.list=public
Set the debezium.source.database.password property to the password that you provided for the postgres user during PostgreSQL installation.
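The scope and stream you create with pravega-cli must match where the sink writes: the scope comes from debezium.sink.pravega.scope, and the stream name follows the server.schema.table pattern described in Theory of Operation. The hypothetical helpers below derive that scoped stream name from the properties file as a quick consistency check. The properties parser is a simplified sketch (no escapes, line continuations, or ':' separators).

```python
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and '#'/'!' comment lines."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def expected_scoped_stream(props: Dict[str, str], schema: str, table: str) -> str:
    """Pravega scope from the sink config; stream name is server.schema.table."""
    scope = props["debezium.sink.pravega.scope"]
    server = props["debezium.source.database.server.name"]
    return f"{scope}/{server}.{schema}.{table}"


# Excerpt of the application.properties file above.
config = """
debezium.sink.type=pravega
debezium.sink.pravega.scope=dbz
debezium.source.database.server.name=local
debezium.source.schema.include.list=public
"""
print(expected_scoped_stream(parse_properties(config), "public", "customers"))  # dbz/local.public.customers
```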
Start Pravega standalone server and create scope & stream
- Open a terminal and launch the standalone server:
cd Downloads/pravega-0.11.0
./bin/pravega-standalone
- Open another terminal and create the scope and stream with pravega-cli:
cd Downloads/pravega-0.11.0
./bin/pravega-cli
> scope create dbz
> stream create dbz/local.public.customers
Start Debezium Server
- Open a third terminal and launch Debezium:
cd Downloads/debezium-server
java -cp 'debezium-server-dist-1.9.5.Final-runner.jar;conf;lib/*' io.debezium.server.Main
Perform activity in the customers table
- In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('John Doe', 'NYC');
Read activity from Pravega using pravega-cli
- In your pravega-cli terminal, run:
> stream read dbz/local.public.customers
A single change event in debezium-json format will appear in the console, with a "payload" that begins with {"before":null,"after":{"id":1,"name":"John Doe","location":"NYC"}.
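A downstream consumer usually needs only a few fields from each event. The sketch below uses Python's standard json module to pull out the operation type and the "before"/"after" row images. The `summarize` helper is hypothetical, and the sample string is an abbreviated, illustrative event rather than exact sink output.

```python
import json
from typing import Any, Dict, Optional, Tuple

Row = Optional[Dict[str, Any]]


def summarize(event_json: str) -> Tuple[Optional[str], Row, Row]:
    """Return (op, before, after) from one debezium-json change event."""
    event = json.loads(event_json)
    # With schemas enabled the envelope sits under "payload"; otherwise the
    # top-level object is the envelope itself.
    payload = event.get("payload", event) if isinstance(event, dict) else None
    if payload is None:
        # A null value (or null payload) carries no row data.
        return None, None, None
    return payload.get("op"), payload.get("before"), payload.get("after")


# Abbreviated, illustrative event: real events also carry "schema" details,
# a "source" block, "ts_ms", and more.
sample = ('{"schema": {}, "payload": {"before": null, '
          '"after": {"id": 1, "name": "John Doe", "location": "NYC"}, "op": "c"}}')
op, before, after = summarize(sample)
print(op, before, after)
```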
Perform more activity in the customers table
- In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('Jane Doe', 'LA');
Immediately upon inserting this second customer, a change event will appear in your pravega-cli terminal with a "payload" that begins with {"before":null,"after":{"id":2,"name":"Jane Doe","location":"LA"}.
Conclusion
In this blog post, we’ve installed recent versions of Pravega, PostgreSQL and Debezium to create a pipeline for recording an audit trail of CDC events. We used tools that come with Pravega and PostgreSQL to change data and to view those changes in real time in the CDC stream.
Next, try executing some UPDATE statements, and see how the debezium-json format change event now shows the "before" state of the record in the "payload" field. For example, run:
UPDATE customers SET location = 'Seattle' WHERE id = 1;
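Because the table uses REPLICA IDENTITY FULL, the "before" image of an update contains every column, so a consumer can work out exactly which columns changed. This hypothetical helper assumes the "before" and "after" objects have already been extracted from the payload as dicts.

```python
from typing import Any, Dict, Tuple


def changed_columns(before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to its (old, new) pair."""
    columns = sorted(set(before) | set(after))
    return {c: (before.get(c), after.get(c)) for c in columns if before.get(c) != after.get(c)}


# Row images corresponding to the UPDATE statement above.
before = {"id": 1, "name": "John Doe", "location": "NYC"}
after = {"id": 1, "name": "John Doe", "location": "Seattle"}
print(changed_columns(before, after))  # {'location': ('NYC', 'Seattle')}
```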
In a future blog post, we will connect these debezium-json change event streams in Pravega to Flink in order to run Flink SQL analytics queries against them. In the meantime, to explore more about Flink SQL with Pravega and Debezium, check out our Change Data Capture Demo in Pravega Samples.