Change Data Capture (CDC) is becoming a popular technique for interconnecting disparate systems, for replicating state across traditional boundaries, for decomposing existing monoliths into microservices, and for recording audit trails. CDC is the idea of emitting a changelog of all INSERTs, UPDATEs, DELETEs, and schema changes performed on a database.
Debezium.io is an open source CDC technology developed by Red Hat. Debezium can connect several database systems to several messaging substrates. As of version 1.6, Debezium can emit database changelogs to Pravega using a new sink in Debezium Server.
At present, the Pravega sink is the only sink for Debezium Server that supports both transactional and non-transactional modes. See Debezium’s Pravega sink documentation for more details.
Debezium Server allows Debezium to be deployed standalone without Kafka Connect, which was previously a dependency of any Debezium deployment.
In this blog post, we will look at how to set up a local development environment with PostgreSQL, Debezium, and Pravega as a solution for recording and replaying database changelogs.
Pravega + Debezium + PostgreSQL Walkthrough
This tutorial assumes you are using a Windows computer. Users of a Unix-like OS are assumed to have enough experience to adapt these steps to their environment.
We will start by going end to end from PostgreSQL through Debezium to Pravega, and then to the console.
In a future blog post, we will add Flink and perform some changelog analytics queries using Flink SQL.
Theory of Operation
When Debezium processes a change from the database, it produces a ChangeEvent<K, V> object, which is sent to the Pravega sink. The ChangeEvent object has three properties: key, value, and destination. If the record being changed has a primary key, then key is populated in the ChangeEvent, and this key is used as the routing key for writing the value of the ChangeEvent to a stream. If the record being changed has no primary key, then key is left null, and the event is written to the stream without a routing key. The destination property of the ChangeEvent indicates the name of the stream that the value will be written to. By default, destination is the database server name as configured in Debezium, concatenated with the schema name and the table name from which the record originated, all separated by dots. In our case, the destination stream name will be “local.public.customers”.
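To make the key/value/destination relationship concrete, here is a minimal Python sketch. It is not Debezium's or the Pravega sink's code: the ChangeEvent dataclass and the default_destination and describe_write functions are hypothetical stand-ins, and the key is shown as a simplified JSON string rather than Debezium's real serialized key.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class ChangeEvent:
    """Hypothetical stand-in for Debezium's ChangeEvent<K, V>."""
    key: Optional[str]   # serialized primary key, or None when the table has no primary key
    value: str           # serialized change event, e.g. debezium-json
    destination: str     # name of the stream the value is written to


def default_destination(server_name: str, schema: str, table: str) -> str:
    """Default destination: server name, schema name and table name joined by dots."""
    return ".".join((server_name, schema, table))


def describe_write(event: ChangeEvent) -> Tuple[str, Optional[str]]:
    """Return (stream, routing key); a None key means no routing key is used."""
    return event.destination, event.key


if __name__ == "__main__":
    stream = default_destination("local", "public", "customers")
    keyed = ChangeEvent(key='{"id": 1}', value="{...}", destination=stream)
    unkeyed = ChangeEvent(key=None, value="{...}", destination=stream)
    print(describe_write(keyed))    # ('local.public.customers', '{"id": 1}')
    print(describe_write(unkeyed))  # ('local.public.customers', None)
```

Keeping the routing key equal to the primary key means every change to the same row lands under the same routing key, which is what lets Pravega keep those changes in order.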
- Download and unzip the Pravega distribution: pravega-0.11.0.zip
- Download and unzip the Debezium Server distribution: debezium-server-dist-1.9.5.Final.zip
- Download and install PostgreSQL: postgresql-14.4-1-windows-x64.exe
  You will set the postgres user’s password during installation.
Install Java 11
You may already have Java installed. Run java -version on the command line to see whether you have Java 11 or newer. If not, download and install Java 11: https://www.oracle.com/java/technologies/javase-jdk11-downloads.html
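If you would rather script the version check, the following Python sketch runs java -version and extracts the major version. It assumes only that java is on the PATH and that the output quotes the version after the word "version", as in java version "1.8.0_333" or openjdk version "11.0.15"; the function names are hypothetical.

```python
import re
import subprocess
from typing import Optional

# Matches the quoted version after "version", e.g. "11.0.15", "1.8.0_333" or "17-ea".
VERSION_RE = re.compile(r'version "(\d+)(?:\.(\d+))?')


def java_major_version(version_output: str) -> Optional[int]:
    """Extract the Java major version from `java -version` output, or None if absent."""
    match = VERSION_RE.search(version_output)
    if match is None:
        return None
    first = int(match.group(1))
    if first == 1 and match.group(2) is not None:
        # Pre-Java 9 scheme: "1.8.0_333" means Java 8.
        return int(match.group(2))
    return first


def installed_java_major_version() -> Optional[int]:
    """Run `java -version` and parse its output; None if java cannot be started."""
    try:
        result = subprocess.run(["java", "-version"], capture_output=True, text=True)
    except OSError:
        return None  # java is not on the PATH
    # `java -version` writes its banner to stderr, not stdout.
    return java_major_version(result.stderr + result.stdout)


if __name__ == "__main__":
    major = installed_java_major_version()
    if major is None:
        print("Java was not found on the PATH")
    elif major >= 11:
        print(f"Java {major} found: OK")
    else:
        print(f"Java {major} found: Java 11 or newer is required")
```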
Configure the PostgreSQL Server
We will configure Debezium to use PostgreSQL’s built-in logical decoding output plug-in, pgoutput. This plug-in is available by default, so no additional libraries need to be installed. More details on this setup can be found in the “PostgreSQL 10+ logical decoding support” section of Debezium’s PostgreSQL connector documentation.
- Open C:\Program Files\PostgreSQL\14\data\postgresql.conf in a text editor; find, uncomment, and set these configs:
  wal_level = logical
  max_wal_senders = 1
  max_replication_slots = 1
- Restart PostgreSQL (press Windows+R, run services.msc, and restart the “postgresql-x64-14” service).
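The same edit can be scripted. This hypothetical Python helper (call it set_wal.py) rewrites every commented or uncommented occurrence of the three settings and appends any that are missing. It assumes the stock "name = value" layout of postgresql.conf, writes a postgresql.conf.bak copy first, needs rights to write under C:\Program Files, and only touches the file when given its path as an argument. You still restart the service afterwards.

```python
import re
import sys
from pathlib import Path
from typing import Dict

SETTINGS: Dict[str, str] = {
    "wal_level": "logical",
    "max_wal_senders": "1",
    "max_replication_slots": "1",
}

# A "name = value" line, optionally commented out with a leading '#'.
SETTING_RE = re.compile(r"^\s*#?\s*([A-Za-z0-9_]+)\s*=")


def apply_settings(conf_text: str, settings: Dict[str, str]) -> str:
    """Uncomment and set each setting; append any that the file does not mention."""
    seen = set()
    out = []
    for line in conf_text.splitlines():
        match = SETTING_RE.match(line)
        if match and match.group(1) in settings:
            key = match.group(1)
            # Rewrite every occurrence, since PostgreSQL uses the last one in the file.
            out.append(f"{key} = {settings[key]}")
            seen.add(key)
        else:
            out.append(line)
    out.extend(f"{k} = {v}" for k, v in settings.items() if k not in seen)
    return "\n".join(out) + "\n"


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print(r'usage: python set_wal.py "C:\Program Files\PostgreSQL\14\data\postgresql.conf"')
    else:
        conf = Path(sys.argv[1])
        original = conf.read_text(encoding="utf-8")
        conf.with_name(conf.name + ".bak").write_text(original, encoding="utf-8")  # backup copy
        conf.write_text(apply_settings(original, SETTINGS), encoding="utf-8")
        print(f"updated {conf}; restart PostgreSQL to apply")
```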
Create database table
- Open pgAdmin 4, set master password on first launch.
- Expand Servers in the left “Browser” pane and enter the postgres user’s password that you set during PostgreSQL installation.
- Expand Databases, expand postgres.
- Open the Query Tool (Tools menu -> Query Tool, or the icon in the Browser toolbar).
- Execute the CREATE TABLE statement:
CREATE TABLE customers ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, location TEXT );
- Execute the ALTER TABLE statement, which makes PostgreSQL log the previous values of all columns for updates and deletes:
ALTER TABLE customers REPLICA IDENTITY FULL;
Configure Debezium Server’s Pravega sink & PostgreSQL source
- In the debezium-server directory, create conf/application.properties with the following content:
  debezium.sink.type=pravega
  debezium.sink.pravega.scope=dbz
  debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
  debezium.source.offset.storage.file.filename=data/offsets.dat
  debezium.source.offset.flush.interval.ms=0
  debezium.source.plugin.name=pgoutput
  debezium.source.database.hostname=localhost
  debezium.source.database.port=5432
  debezium.source.database.user=postgres
  debezium.source.database.password=[password set during PostgreSQL install]
  debezium.source.database.dbname=postgres
  debezium.source.database.server.name=local
  debezium.source.schema.include.list=public
- Set the debezium.source.database.password property to the password that you provided for the postgres user during PostgreSQL installation.
Start Pravega standalone server and create scope & stream
- Open a terminal and launch the standalone server:
  cd Downloads/pravega-0.11.0
  ./bin/pravega-standalone
- Open another terminal and create the scope and stream with pravega-cli:
  cd Downloads/pravega-0.11.0
  ./bin/pravega-cli
  > scope create dbz
  > stream create dbz/local.public.customers
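Because the stream has to exist before Debezium writes to it, its name follows mechanically from the configuration. This hypothetical Python helper derives the pravega-cli commands from application.properties. Its properties parser is deliberately simplified (one key=value per line, no escapes or continuation lines), and you supply the schema-qualified table names yourself.

```python
import sys
from pathlib import Path
from typing import Dict, List


def parse_properties(text: str) -> Dict[str, str]:
    """Simplified parser: one key=value per line; '#' and '!' start comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def pravega_cli_commands(props: Dict[str, str], tables: List[str]) -> List[str]:
    """pravega-cli commands for the scope and one stream per schema-qualified table."""
    scope = props["debezium.sink.pravega.scope"]
    server = props["debezium.source.database.server.name"]
    commands = [f"scope create {scope}"]
    commands += [f"stream create {scope}/{server}.{table}" for table in tables]
    return commands


if __name__ == "__main__":
    conf = Path("conf/application.properties")
    if len(sys.argv) < 2 or not conf.exists():
        print("usage (from the debezium-server directory): python stream_cmds.py public.customers ...")
    else:
        props = parse_properties(conf.read_text(encoding="utf-8"))
        for command in pravega_cli_commands(props, sys.argv[1:]):
            print(command)
```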
Start Debezium Server
- Open a third terminal and launch Debezium Server:
  cd Downloads/debezium-server
  java -cp 'debezium-server-dist-1.9.5.Final-runner.jar;conf;lib/*' io.debezium.server.Main
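The ';' in that classpath is the Windows path separator; on Linux or macOS it is ':'. A small Python sketch can build the same command portably with os.pathsep. It assumes it is run from the debezium-server directory and only launches Java when passed --run. Because the argument list goes to the JVM without a shell, 'lib/*' needs no quoting: the Java launcher expands that classpath wildcard itself.

```python
import os
import subprocess
import sys
from typing import List

RUNNER_JAR = "debezium-server-dist-1.9.5.Final-runner.jar"


def debezium_command(runner_jar: str = RUNNER_JAR, path_sep: str = os.pathsep) -> List[str]:
    """Build the Debezium Server launch command; path_sep is ';' on Windows, ':' elsewhere."""
    classpath = path_sep.join([runner_jar, "conf", "lib/*"])
    return ["java", "-cp", classpath, "io.debezium.server.Main"]


if __name__ == "__main__":
    command = debezium_command()
    if "--run" in sys.argv[1:]:
        # No shell is involved, so "lib/*" reaches the Java launcher unexpanded.
        subprocess.run(command, check=True)
    else:
        print(" ".join(command))
```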
Perform activity in the database
- In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('John Doe', 'NYC');
Read activity from Pravega using pravega-cli
- In your pravega-cli terminal, run:
  > stream read dbz/local.public.customers
A single change event in debezium-json format will appear in the console, describing the insert of the John Doe record.
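To inspect such an event programmatically, the Python sketch below decodes a debezium-json string and pulls out the operation, table, and row images. SAMPLE_EVENT is an abbreviated, hand-written event, not captured output: a real one also carries a full "schema" section and more "source" fields. The sketch accepts events with or without the schema/payload envelope and maps Debezium's op codes c, u, d, r (snapshot read), and t (truncate).

```python
import json
from typing import Any, Dict

# Debezium's op codes.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)", "t": "truncate"}

# Abbreviated, hand-written example event (not real output).
SAMPLE_EVENT = """{"schema": {}, "payload": {"before": null,
  "after": {"id": 1, "name": "John Doe", "location": "NYC"},
  "source": {"schema": "public", "table": "customers"}, "op": "c", "ts_ms": 0}}"""


def summarize(raw: str) -> Dict[str, Any]:
    """Decode a debezium-json event into its operation, table and row images."""
    event = json.loads(raw)
    # With schemas enabled the change sits under "payload"; otherwise it is the top-level object.
    if isinstance(event, dict) and "payload" in event:
        event = event["payload"]
    if event is None:
        return {"op": "null event", "table": None, "before": None, "after": None}
    return {
        "op": OPS.get(event.get("op"), "unknown"),
        "table": (event.get("source") or {}).get("table"),
        "before": event.get("before"),
        "after": event.get("after"),
    }


if __name__ == "__main__":
    print(summarize(SAMPLE_EVENT))
```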
Perform more activity in the database
- In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('Jane Doe', 'LA');
Immediately upon inserting this second customer, a change event for the Jane Doe record will appear in your pravega-cli terminal.
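Recording the changelog is half the story; replaying it rebuilds table state. The following minimal Python sketch is not part of Debezium or Pravega: replay is a hypothetical function that applies create, snapshot-read, update, delete, and truncate events keyed on the id column. It assumes the events arrive in the order they were written, which Pravega guarantees for events that share a routing key (here, the primary key). The third sample event corresponds to an UPDATE like the one suggested at the end of this post.

```python
import json
from typing import Any, Dict, Iterable


def replay(events: Iterable[str], key_column: str = "id") -> Dict[Any, Dict[str, Any]]:
    """Rebuild a table's current rows from debezium-json change events, in order."""
    rows: Dict[Any, Dict[str, Any]] = {}
    for raw in events:
        event = json.loads(raw)
        if isinstance(event, dict) and "payload" in event:
            event = event["payload"]
        if event is None:
            continue  # nothing to apply
        op = event.get("op")
        if op in ("c", "r", "u"):
            after = event["after"]
            rows[after[key_column]] = after  # insert, snapshot read, or update: keep the new row
        elif op == "d":
            rows.pop(event["before"][key_column], None)  # delete: drop the old row
        elif op == "t":
            rows.clear()  # truncate: the table is now empty
    return rows


if __name__ == "__main__":
    # Hand-written, abbreviated events (not real output).
    events = [
        '{"payload": {"op": "c", "before": null, "after": {"id": 1, "name": "John Doe", "location": "NYC"}}}',
        '{"payload": {"op": "c", "before": null, "after": {"id": 2, "name": "Jane Doe", "location": "LA"}}}',
        '{"payload": {"op": "u", "before": {"id": 1, "name": "John Doe", "location": "NYC"}, '
        '"after": {"id": 1, "name": "John Doe", "location": "Seattle"}}}',
    ]
    for key, row in sorted(replay(events).items()):
        print(key, row)
```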
In this blog post, we installed Pravega 0.11.0, PostgreSQL 14, and Debezium Server 1.9.5 to create a pipeline for recording an audit trail of CDC events. We used tools that come with Pravega and PostgreSQL to change data and to view those changes in real time in the CDC stream.
Next, try executing some UPDATE statements, and see how the debezium-json change event now shows the "before" state of the record in the "payload" field. For example, run:
UPDATE customers SET location = 'Seattle' WHERE id = 1;
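Because the table uses REPLICA IDENTITY FULL, the "before" image of an update carries every column's previous value, so the changed columns can be found by diffing "before" against "after". A minimal Python sketch follows; changed_columns is a hypothetical helper, and UPDATE_EVENT is an abbreviated, hand-written stand-in for the event this UPDATE produces.

```python
import json
from typing import Any, Dict, Tuple

# Abbreviated, hand-written event for: UPDATE customers SET location = 'Seattle' WHERE id = 1;
UPDATE_EVENT = """{"payload": {"op": "u",
  "before": {"id": 1, "name": "John Doe", "location": "NYC"},
  "after": {"id": 1, "name": "John Doe", "location": "Seattle"}}}"""


def changed_columns(raw: str) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs between "before" and "after" to (old, new)."""
    event = json.loads(raw)
    if isinstance(event, dict) and "payload" in event:
        event = event["payload"]
    before = event.get("before") or {}
    after = event.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


if __name__ == "__main__":
    print(changed_columns(UPDATE_EVENT))  # {'location': ('NYC', 'Seattle')}
```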
In a future blog post, we will connect these debezium-json change event streams in Pravega to Flink in order to run Flink SQL analytics queries against them. In the meantime, to learn more about Flink SQL with Pravega and Debezium, check out our Change Data Capture Demo in Pravega Samples.