
Change Data Capture (CDC) is becoming a popular technique for interconnecting disparate systems, replicating state across traditional boundaries, decomposing existing monoliths into microservices, and recording audit trails. CDC is the practice of emitting a changelog of every INSERT, UPDATE, and DELETE, as well as every schema change, performed on a database.
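To make the idea of a changelog concrete, here is a minimal Python sketch of replaying one to rebuild a table's current state. It assumes each change is a dict shaped like the "payload" of a Debezium change event, with an "op" code ("c" for INSERT, "u" for UPDATE, "d" for DELETE, "r" for snapshot reads) and "before"/"after" row images. The replay function and the sample log are illustrative only, and schema changes are out of scope.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def replay(changelog: List[Dict[str, Any]], key: str = "id") -> Dict[Any, Row]:
    """Apply change events in order; return the surviving rows keyed by primary key."""
    table: Dict[Any, Row] = {}
    for change in changelog:
        op = change["op"]
        if op in ("c", "r", "u"):
            if op == "u" and change.get("before"):
                # An UPDATE may change the primary key, so drop the old row first.
                table.pop(change["before"][key], None)
            # INSERTs, snapshot reads, and UPDATEs carry the new row in "after".
            row = change["after"]
            table[row[key]] = dict(row)
        elif op == "d":
            # A DELETE carries the removed row in "before".
            table.pop(change["before"][key], None)
        else:
            raise ValueError(f"unsupported op: {op!r}")
    return table


# Hypothetical changelog for a customers table.
log = [
    {"op": "c", "before": None, "after": {"id": 1, "name": "John Doe", "location": "NYC"}},
    {"op": "c", "before": None, "after": {"id": 2, "name": "Jane Doe", "location": "LA"}},
    {"op": "u", "before": {"id": 1, "name": "John Doe", "location": "NYC"},
     "after": {"id": 1, "name": "John Doe", "location": "Seattle"}},
    {"op": "d", "before": {"id": 2, "name": "Jane Doe", "location": "LA"}, "after": None},
]
print(replay(log))
# {1: {'id': 1, 'name': 'John Doe', 'location': 'Seattle'}}
```

Replaying the same log from the start always yields the same state, which is what makes a recorded changelog useful for audit trails and for rebuilding downstream copies.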

Debezium (debezium.io) is an open-source CDC technology developed by Red Hat. Debezium can connect several database systems to several messaging substrates. As of version 1.6, Debezium can emit database changelogs to Pravega through a new sink in Debezium Server.

At the time of writing, the Pravega sink is the only Debezium Server sink that supports both transactional and non-transactional modes. See Debezium’s Pravega sink documentation for more details.

Debezium Server allows Debezium to be deployed standalone without Kafka Connect, which was previously a dependency of any Debezium deployment.

In this blog, we will look at how to set up a local development environment with PostgreSQL + Debezium + Pravega as a solution for recording and replaying database changelogs.

Pravega + Debezium + PostgreSQL Walkthrough

This tutorial assumes you are using a Windows computer. Users of a Unix-like OS are assumed to have enough experience to adapt these steps to their environment.

We will start by going end-to-end: from PostgreSQL, through Debezium, to Pravega, and then to the console (stdout) via pravega-cli.

In a future blog post, we will add Flink and perform some changelog analytics queries using Flink SQL.

Theory of Operation

When Debezium processes a change from the database, it produces a ChangeEvent<K, V> object, which is sent to the Pravega sink. The ChangeEvent object has three properties: key, value, and destination. If the changed record has a primary key, key is populated, and the sink uses it as the routing key when writing the ChangeEvent's value to a stream; Pravega preserves the order of events that share a routing key, so all changes to a given row stay in order. If the changed record has no primary key, key is left null, and the event is written to the stream without a routing key. The destination property names the stream that the value will be written to. By default, destination is the database server name configured in Debezium, the schema name, and the name of the table from which the record originated, joined by dots (<server name>.<schema>.<table>). In our case, the destination stream name will be “local.public.customers”.
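The routing rules above can be sketched in Python as follows. This is an illustration, not Debezium's Java implementation: the ChangeEvent dataclass is a hypothetical stand-in for Debezium's ChangeEvent<K, V>, the key is simplified to a string such as '{"id":1}', and the audit_log table is hypothetical, used only to show a table without a primary key.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical Python stand-in for Debezium's Java ChangeEvent<K, V>."""
    key: Optional[str]  # serialized primary key; None if the table has no primary key
    value: str          # serialized change event, e.g. in debezium-json format
    destination: str    # name of the stream the value is written to


def make_event(server_name: str, schema: str, table: str,
               key: Optional[str], value: str) -> ChangeEvent:
    # Default destination: server name, schema name, and table name joined by dots.
    destination = ".".join((server_name, schema, table))
    return ChangeEvent(key=key, value=value, destination=destination)


def describe_write(event: ChangeEvent, scope: str) -> str:
    # The key, when present, becomes the Pravega routing key for the value.
    routing = f"routing key {event.key}" if event.key is not None else "no routing key"
    return f"write to {scope}/{event.destination} with {routing}"


with_pk = make_event("local", "public", "customers", '{"id":1}', "{...}")
print(describe_write(with_pk, "dbz"))
# write to dbz/local.public.customers with routing key {"id":1}

no_pk = make_event("local", "public", "audit_log", None, "{...}")
print(describe_write(no_pk, "dbz"))
# write to dbz/local.public.audit_log with no routing key
```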

Install Prerequisites

Install Pravega

Download and unzip the Pravega distribution: pravega-0.11.0.zip

Install Debezium

Download and unzip the Debezium Server distribution: debezium-server-dist-1.9.5.Final.zip

Install PostgreSQL

Download and run the PostgreSQL installer: postgresql-14.4-1-windows-x64.exe

You will set the postgres user’s password during installation.

Install Java 11

Download and install Java 11: https://www.oracle.com/java/technologies/javase-jdk11-downloads.html

You may already have Java installed. Run java -version on the command line to see if you have Java 11 or newer.

Configure the PostgreSQL Server

We will configure Debezium to use PostgreSQL’s built-in logical decoding output plug-in. This pgoutput plug-in is available by default, so no additional libraries need to be installed. More details on this setup can be found in Debezium’s “PostgreSQL 10+ logical decoding support (pgoutput)” documentation.

  1. Edit C:\Program Files\PostgreSQL\14\data\postgresql.conf, then find, uncomment, and set these parameters:
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
  2. Restart PostgreSQL: press Windows+R, run services.msc, and restart the “postgresql-x64-14” service.
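If you want to double-check the edited file, a small Python sketch like the following can flag problems. check_conf is a hypothetical helper: it assumes simple "name = value" lines, flags a missing wal_level because that parameter defaults to replica rather than logical, and accepts an absent max_wal_senders or max_replication_slots because PostgreSQL's defaults for both are already above zero.

```python
from typing import Dict, List


def parse_conf(text: str) -> Dict[str, str]:
    """Parse simple "name = value" lines; later lines override earlier ones."""
    settings: Dict[str, str] = {}
    for raw in text.splitlines():
        # Drop comments, including commented-out settings such as "#wal_level = replica".
        line = raw.split("#", 1)[0].strip()
        name, sep, value = line.partition("=")
        if sep:
            settings[name.strip()] = value.strip().strip("'")
    return settings


def check_conf(text: str) -> List[str]:
    """Return a list of problems with the logical-replication settings."""
    settings = parse_conf(text)
    problems = []
    if settings.get("wal_level") != "logical":
        # wal_level defaults to "replica", so it must be set explicitly.
        problems.append("wal_level must be 'logical'")
    for name in ("max_wal_senders", "max_replication_slots"):
        # When absent, PostgreSQL's defaults (10) already satisfy Debezium.
        if name in settings and int(settings[name]) < 1:
            problems.append(f"{name} must be at least 1")
    return problems


sample = """
#wal_level = replica
wal_level = logical   # set for Debezium
max_wal_senders = 1
max_replication_slots = 1
"""
print(check_conf(sample))  # []
```

To check your real file, pass check_conf the result of pathlib.Path(r"C:\Program Files\PostgreSQL\14\data\postgresql.conf").read_text().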

Create database table

  1. Open pgAdmin 4 and set a master password on first launch.
  2. Expand Servers in the left “Browser” pane and enter the postgres user’s password that you set during the PostgreSQL installation.
  3. Expand Databases, then expand postgres.
  4. Open the Query Tool (Tools menu -> Query Tool, or the Query Tool icon in the Browser toolbar).
  5. Execute the CREATE TABLE statement:
CREATE TABLE customers
(
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
location TEXT
);
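  6. Execute the ALTER TABLE statement below. REPLICA IDENTITY FULL makes PostgreSQL record the complete previous row for UPDATEs and DELETEs, so change events carry a full "before" image: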
ALTER TABLE customers REPLICA IDENTITY FULL;

Configure Debezium Server’s Pravega sink & PostgreSQL source

  1. Create a Downloads/debezium-server/conf/application.properties file containing:
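# Sink: write change events to streams in the Pravega scope "dbz"
debezium.sink.type=pravega
debezium.sink.pravega.scope=dbz
# Source: Debezium's PostgreSQL connector
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
# Store source offsets (how far Debezium has read) in a local file; 0 ms flush interval
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
# Use PostgreSQL's built-in pgoutput logical decoding plug-in
debezium.source.plugin.name=pgoutput
# Connection details for the local PostgreSQL server
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=[password set during PostgreSQL install]
debezium.source.database.dbname=postgres
# Logical server name: the first part of each destination stream name
debezium.source.database.server.name=local
# Capture changes only from tables in the public schema
debezium.source.schema.include.list=public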

Set the debezium.source.database.password property to the password that you provided for the postgres user during PostgreSQL installation.
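The stream you create in the next step is the one Debezium will write to, so its name must match what application.properties implies. The following Python sketch is a hypothetical helper, not part of Debezium: it handles only simple key=value lines (full Java .properties syntax also allows ":" separators, escapes, and line continuations) and uses a simple heuristic to flag the password placeholder if it was left in place.

```python
from typing import Dict


def parse_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and # or ! comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def expected_stream(props: Dict[str, str], schema: str, table: str) -> str:
    """Default scoped stream name: <scope>/<server name>.<schema>.<table>."""
    scope = props["debezium.sink.pravega.scope"]
    server = props["debezium.source.database.server.name"]
    return f"{scope}/{server}.{schema}.{table}"


def password_is_placeholder(props: Dict[str, str]) -> bool:
    """Heuristic: True if the bracketed placeholder password was never replaced."""
    password = props.get("debezium.source.database.password", "")
    return password.startswith("[") and password.endswith("]")


conf = """
debezium.sink.type=pravega
debezium.sink.pravega.scope=dbz
debezium.source.database.password=[password set during PostgreSQL install]
debezium.source.database.server.name=local
"""
props = parse_properties(conf)
print(expected_stream(props, "public", "customers"))  # dbz/local.public.customers
print(password_is_placeholder(props))  # True
```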

Start Pravega standalone server and create scope & stream

  1. Open a terminal and launch the standalone server:
cd Downloads/pravega-0.11.0
./bin/pravega-standalone
  2. Open another terminal and create the scope and stream with pravega-cli:
cd Downloads/pravega-0.11.0
./bin/pravega-cli
> scope create dbz
> stream create dbz/local.public.customers

Start Debezium Server

  1. Open a third terminal and launch Debezium:
cd Downloads/debezium-server
java -cp 'debezium-server-dist-1.9.5.Final-runner.jar;conf;lib/*' io.debezium.server.Main
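The launch command above uses ";" as the classpath separator, which is the Windows convention; Unix-like systems use ":". A small Python sketch can build the command with the platform's separator via os.pathsep. Both functions are hypothetical helpers, and launch_debezium assumes it is given the Downloads/debezium-server directory.

```python
import os
import subprocess
from typing import List

# File name from the Debezium Server 1.9.5.Final distribution used in this tutorial.
RUNNER_JAR = "debezium-server-dist-1.9.5.Final-runner.jar"


def debezium_command(sep: str = os.pathsep) -> List[str]:
    """Build the java command line; sep is ";" on Windows and ":" elsewhere."""
    classpath = sep.join([RUNNER_JAR, "conf", "lib/*"])
    return ["java", "-cp", classpath, "io.debezium.server.Main"]


def launch_debezium(server_dir: str) -> None:
    """Hypothetical helper: run Debezium Server from its install directory.

    Passing a list (no shell) keeps "lib/*" unexpanded, so the java launcher
    expands the wildcard itself, just as the quotes do in the shell command.
    """
    subprocess.run(debezium_command(), cwd=server_dir, check=True)
```

For example, launch_debezium("Downloads/debezium-server") starts the server in the foreground, like the java command above.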

Perform activity in the customers table

  1. In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('John Doe', 'NYC');

Read activity from Pravega using pravega-cli

  1. In your pravega-cli terminal, run:
> stream read dbz/local.public.customers

A single change event in debezium-json format will appear in the console. Its "payload" has a "before" value of null and an "after" value of {"id":1,"name":"John Doe","location":"NYC"}.
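To consume these events programmatically instead of reading them in pravega-cli, you would first parse the JSON. The following Python sketch extracts the operation and row images. The sample event is trimmed (real events also include fields such as "source" and "ts_ms", plus a "schema" section when schemas are enabled), and the fallback for events without a "payload" wrapper is an assumption for setups that disable schemas.

```python
import json
from typing import Any, Dict, Optional, Tuple


def summarize(event_json: str) -> Tuple[str, Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """Return (op, before, after) from a debezium-json change event."""
    event = json.loads(event_json)
    # With schemas enabled the change sits under "payload"; otherwise (assumed)
    # the top-level object is the change itself.
    payload = event.get("payload", event)
    return payload["op"], payload["before"], payload["after"]


# Trimmed sample event for the first INSERT.
sample = '{"payload": {"before": null, "after": {"id": 1, "name": "John Doe", "location": "NYC"}, "op": "c"}}'
op, before, after = summarize(sample)
print(op, before, after)
# c None {'id': 1, 'name': 'John Doe', 'location': 'NYC'}
```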

Perform more activity in the customers table

  1. In pgAdmin 4’s Query Tool, execute:
INSERT INTO customers (name, location) VALUES ('Jane Doe', 'LA');

As soon as you insert this second customer, a new change event appears in your pravega-cli terminal. Its "payload" has a "before" value of null and an "after" value of {"id":2,"name":"Jane Doe","location":"LA"}.

Conclusion

In this blog post, we installed Pravega 0.11.0, PostgreSQL 14, and Debezium Server 1.9.5 and built a pipeline for recording an audit trail of CDC events. We used tools that ship with Pravega and PostgreSQL to change data and to view those changes in real time in the CDC stream.

Next, try executing some UPDATE statements and notice that the debezium-json change events now include the record's previous state in the "before" field of the "payload". For example, run:

UPDATE customers SET location = 'Seattle' WHERE id = 1;
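Because the customers table uses REPLICA IDENTITY FULL, the "before" image of an UPDATE holds the complete previous row, so a consumer can work out exactly which columns changed. A minimal Python sketch, using the row values you would expect from the UPDATE above if customer 1 is still in NYC (changed_columns is a hypothetical helper):

```python
from typing import Any, Dict, Tuple


def changed_columns(before: Dict[str, Any], after: Dict[str, Any]) -> Dict[str, Tuple[Any, Any]]:
    """Map each column whose value differs to its (old, new) pair."""
    columns = before.keys() | after.keys()
    return {c: (before.get(c), after.get(c))
            for c in sorted(columns) if before.get(c) != after.get(c)}


before = {"id": 1, "name": "John Doe", "location": "NYC"}
after = {"id": 1, "name": "John Doe", "location": "Seattle"}
print(changed_columns(before, after))
# {'location': ('NYC', 'Seattle')}
```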

In a future blog post, we will connect these debezium-json change event streams in Pravega to Flink in order to run Flink SQL analytics queries against the change event streams. In the meantime, to explore more about Flink SQL with Pravega and Debezium, check out our Change Data Capture Demo in Pravega Samples.
