
If you missed our previous post, Pravega Flink connector 101, we strongly recommend you take the time to read that one first. It introduced how the Flink DataStream API reads from and writes to Pravega streams, which lays the necessary foundation for the topics we’ll cover in this post.

To briefly recap the last post, Pravega integrates with the Flink DataStream source and sink APIs, which enables users to read, process and write data in Pravega streams with fault tolerance and end-to-end exactly-once semantics. The Flink DataStream API is very expressive, with windowing, timers, state processing and user-defined functions, which makes it easy to build a streaming pipeline. However, using it in production in the enterprise is still demanding and complex for data analysts because of these limitations:

  • Most data analysts prefer to use Python or declarative languages to develop applications,
  • Java/Scala-based applications come with common drawbacks, such as complex dependency management and the need to package and upload artifacts, and
  • Data analysts need to learn the Flink APIs from scratch and understand Flink well to write a runnable application, which takes time and effort.

There is a way to solve all of these problems with a single extension: Flink SQL / the Table API. The Table API in Flink is becoming more and more essential for integrated processing of streaming and batch data, as it is easier to understand and write. In this blog, we will show how the Pravega connector integrates with it and enables users to use Flink SQL and the Table API to process structured and semi-structured data in Pravega streams.

Duality of Streams and Tables

First, let’s start with some basic concepts.

When implementing stream processing use cases in practice, you typically need both streams and databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of transactions with product information from a database table. Both the table and the stream abstractions are equally important. Interestingly, if we think a bit more about these two abstractions, each one can be transformed into the other:

  • Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a row change of the table. Replaying the changelog from beginning to end reconstructs the table. Similarly, aggregating the data records in a stream returns a table as well. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table with the user as the key and the corresponding pageview count as the value (see the SQL sketch after this list).
  • Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream. A table is thus a stream in disguise, and it can easily be turned into a “real” stream by iterating over the row-level INSERT, UPDATE and DELETE operations in the table.
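
To make the “stream as table” direction concrete, here is a minimal Flink SQL sketch (the pageviews table and its columns are hypothetical) that continuously aggregates a stream of pageview events into a table keyed by user:

-- Continuously maintained result table: one row per user_id,
-- updated as new pageview events arrive on the stream.
SELECT
  user_id,
  COUNT(*) AS pageview_count
FROM pageviews
GROUP BY user_id;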

Flink provides first-class support for streams and tables, as we’ll discuss shortly. A stream can be viewed as a table, and a table can be viewed as a stream. This is the so-called stream-table duality.

FLIP-95 new table API support

Motivation

To remove the complexity of the legacy Flink table sources and sinks, the Flink community proposed FLIP-95, a new, generalized Table API that unifies many different use cases, such as upsert + insert-only writes and batch + streaming execution.

The Pravega Flink connector had implemented the legacy Table API to support SQL queries on Pravega streams, but the configuration was complicated and hard to use, and the implementation was deeply entangled with the DataStream API, which made it hard to maintain. This motivated us to take the opportunity to rework our Table API so that we could simplify usage for the end user.

Pravega implementations

The new Flink API introduces the DynamicTableSource and DynamicTableSink interfaces, which let downstream connectors integrate with the table abstraction more easily by separating the table format from the I/O path. Our implementation builds directly on the DataStream source and sink implementations that we covered in the previous blog.

Meanwhile, thanks to the associated FLIP-122, all configurations are uniformly managed through Flink’s standard ConfigOption class, which makes the properties more concise and readable.

The Pravega Flink connector team integrated the new Table API in the 0.9.0 release. Since then, semi-structured data in formats such as CSV, JSON or Avro stored in Pravega streams can simply be mapped to a dynamic table in Flink with a CREATE TABLE SQL DDL statement like the sketch below.
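
As a hedged example (the table name, columns and exact option keys are illustrative; please check the connector documentation of your release for the authoritative option names), a JSON-encoded stream could be exposed as a dynamic table like this:

CREATE TABLE user_events (
  user_id STRING,
  action STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pravega',
  'controller-uri' = 'tcp://localhost:9090',
  'scope' = 'my_scope',
  'scan.streams' = 'user_events',
  'format' = 'json'
);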

Moreover, several more optimizations have been made on top of the table abstraction, such as:

  • Simplified usage for batch and streaming reads,
  • Support for authentication configuration, and
  • Support for reading from multiple streams and for specifying starting and ending StreamCuts in the table source (see the sketch after this list).
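
Building on the previous DDL, the following sketch illustrates a bounded (batch) read over multiple streams with explicit StreamCuts and authentication. The option keys shown here are assumptions modeled on the connector’s ConfigOption-based properties, and the StreamCut and token values are placeholders, so please verify everything against the connector documentation:

CREATE TABLE events_history (
  user_id STRING,
  action STRING
) WITH (
  'connector' = 'pravega',
  'controller-uri' = 'tcp://localhost:9090',
  'scope' = 'my_scope',
  'scan.streams' = 'stream1;stream2',
  'scan.execution.type' = 'batch',
  'scan.start-streamcuts' = '<base64-streamcut-1>;<base64-streamcut-2>',
  'scan.end-streamcuts' = '<base64-streamcut-3>;<base64-streamcut-4>',
  'security.auth-type' = 'Basic',
  'security.auth-token' = '<token>',
  'format' = 'json'
);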

Because the Table API is layered on top of the DataStream API, it automatically inherits its low latency, ordering guarantees per routing key, large-scale parallel processing, and fault tolerance with exactly-once guarantees. Between sources and sinks, Flink also implements a great variety of relational operators, such as selection, filter and join, which let users run continuous queries with millisecond latencies and build complex processing pipelines such as time-windowed joins. The unified implementation of batch and streaming even allows users to union disparate data sources in a single query. Flink also integrates with Apache Calcite, which supports and extends the ANSI SQL standard, further lowering the learning bar for data analysts: they can use SQL to get insights from the data while the complexity of big data handling is concealed behind the scenes.
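
As a small illustration of such a continuous query (reusing the hypothetical user_events table from the earlier sketch), the following statement counts actions per user over one-minute tumbling event-time windows:

-- Requires event_time to be declared as an event-time attribute
-- (WATERMARK) in the table DDL, as in the earlier sketch.
SELECT
  user_id,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS action_count
FROM user_events
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' MINUTE);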

Here are some samples showing Flink SQL usage with Pravega on NY taxi data: https://github.com/pravega/pravega-samples/tree/master/scenarios/pravega-flink-connector-sql-samples

Changelog Table API support

First, let’s take a look at the basic concept of CDC, which stands for Change Data Capture. It is a set of software design patterns used to determine and track data that has changed so that action can be taken using the changed data. Here we focus on its narrower definition, which targets database environments: capturing the change log of a database system and emitting these changes as messages in a specific format. This technology can eliminate the need for bulk-load updates and inconvenient batch windows by enabling incremental loading or real-time streaming analytics of data changes into your target repository.

Debezium + Pravega

As the state-of-the-art and most popular CDC solution, Debezium is an open source distributed platform that records all row-level changes within each database table in a change event stream. Applications can simply read these streams to see the change events in the same order in which they occurred. It is widely used in cases like replicating data into data lakes, feeding search indexes, updating caches, synchronizing data between microservices, auditing logs and so on.

Thanks to a contribution from Derek Moore, Debezium Server supports Pravega since the Debezium 1.6.0 release, which means Debezium-format data can be written and stored into Pravega by the Debezium services. The Debezium Pravega sink adapter that we contributed offers two modes: non-transactional and transactional. The non-transactional mode writes each event in a Debezium batch individually to Pravega, while the transactional mode writes the Debezium batch to a Pravega transaction that commits when the batch completes, achieving exactly-once semantics.

Connector integration

Flink’s table abstraction is deliberately designed to serve the changelog use case by adding an operation flag to each table row. With FLINK-17149, introduced in Flink 1.11, Flink gained the ability to read Debezium changelogs from Kafka directly and materialize them into a Flink table.

After addressing a multi-record deserialization issue in the integration with the aforementioned Table API, the Pravega Flink connector can read debezium-json format data from Pravega streams as well. With #534, it is aligned with the complete support from the community, which enables reading metadata such as the operation timestamp from the Debezium format.
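
As a hedged sketch (the stream name, columns and Pravega option keys are illustrative, as in the earlier examples), a Debezium changelog stored in a Pravega stream could be mapped to a Flink table by simply switching the format to debezium-json:

CREATE TABLE mysql_orders (
  order_id BIGINT,
  customer STRING,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'pravega',
  'controller-uri' = 'tcp://localhost:9090',
  'scope' = 'cdc_scope',
  'scan.streams' = 'orders_changelog',
  'format' = 'debezium-json'
);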

Consider a scenario where a MySQL table is replicated to Hive. With the help of Debezium, Pravega and Flink SQL, it can be implemented in a single line, INSERT INTO hive_table SELECT * FROM pravega_table, without any Java coding, and these systems guarantee that the job runs in real time and in parallel, without duplicated, lost or out-of-order data.

Catalog support

In the previous sections, we talked about the relationship between tables and streams, mainly their duality on the I/O path. At a higher level, we can also manage Pravega streams as tables with CRUD operations, provided each stream has its own schema. In SQL, such operations belong to DDL (Data Definition Language), as opposed to DQL (Data Query Language), which focuses on the data itself. For example, a DROP TABLE my_table DDL statement can delete the stream my_table in Pravega. The metadata of databases and tables can be managed with Flink’s Catalog abstraction, but we still need to introduce the Schema Registry service as the final piece of the puzzle.

Schema registry introduction

Pravega Schema Registry is a service offering from the Pravega family; its first release shipped along with the Pravega 0.8.0 release in 2020. The registry service is designed to store and manage schemas for the unstructured data stored in Pravega streams. It is designed not only to serve the data stored in Pravega, but also to act as a general-purpose management solution for storing and evolving schemas in a wide variety of streaming and non-streaming use cases.

It provides a RESTful interface to store and manage schemas under schema groups. Users can safely evolve their schemas within the context of a schema group based on a compatibility policy configured at the group level. The service has built-in support for the popular serialization formats Avro, Protobuf and JSON schema; however, users can also store and manage schemas from any serialization system. The service allows users to specify the desired compatibility policy for the evolution of their schemas, but these policies are enforced only for the natively supported serialization systems.

Along with providing a storage layer for schemas, the service also stores and manages additional encoding information in the form of codec information. Codecs can correspond to different compression or encryption schemes used while encoding the serialized data at rest. The service generates unique identifiers for schema and codec information pairs that users may use to tag their data.

With the Schema Registry involved, each stream’s schema is managed under a group with the same name as the stream, which enables Flink to correctly deserialize the events in the stream and turn them into table rows.

Connector integration

The below table shows the mapping among these systems.

Flink Catalog | Pravega Schema Registry | Pravega
Database      | Namespace               | Scope
Table         | Group                   | Stream (with schema registered)

The connector implements the Flink Catalog interface on top of connections to Pravega and the Schema Registry. Users can use the DDL below to establish the connection to Pravega and the Schema Registry service and manipulate Pravega as a database.

CREATE CATALOG pravega_catalog WITH(
  'type' = 'pravega',
  'default-database' = 'scope1',
  'controller-uri' = 'tcp://localhost:9090',
  'schema-registry-uri' = 'http://localhost:9092'
);

CREATE DATABASE pravega_catalog.pravega_scope;

Afterwards, SQL DDL statements can be used to create, modify or delete database structures and schemas. All of these operations are translated by Flink and ultimately take effect on Pravega and the Schema Registry. With such a catalog and the help of Flink SQL, SQL queries and SQL-based ETL jobs can also run on top of it: Flink handles the query planning, and the resulting data source ultimately lands on the Pravega table source with the serializers provided by the Schema Registry.
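
To give a feel for day-to-day usage, here is a hedged sketch built on the catalog created above (the table name and columns are made up; the intended effects follow the mapping table in the previous section):

USE CATALOG pravega_catalog;
USE pravega_scope;

-- Creates the stream user_events in the Pravega scope and registers its schema
CREATE TABLE user_events (
  user_id STRING,
  action STRING
);

-- The query is planned by Flink and served by the Pravega table source,
-- using the serializers provided by the Schema Registry
SELECT action, COUNT(*) AS action_count FROM user_events GROUP BY action;

-- Deletes the stream user_events in Pravega (and its associated schema group)
DROP TABLE user_events;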

Once users have set up Pravega with the Schema Registry service, they can follow the documentation to get more details on catalog usage and configuration.

Conclusion

This blog post provides an introduction to the Table API in the Pravega Flink connector. The Pravega community will continue to work on the Flink integration to simplify the user experience.

About the Authors

Yumin (Brian) Zhou is a Software Engineer at Dell and part of the DellEMC Streaming Data Platform development team. He holds an M.Sc. in Computer Science and Technology from Fudan University and is interested in big data analytics, Docker/Kubernetes and real-time processing. He is one of the main developers of the analytics component of the DellEMC Streaming Data Platform. He is also an Apache Flink contributor and a maintainer of the Pravega Flink connector project.
