
Introduction

Pravega is a storage system based on the stream abstraction, providing the ability to process tail data (low-latency streaming) and historical data (catchup and batch reads). Relatedly, Apache Flink is a widely-used real-time computing engine that provides unified batch and stream processing. Flink provides high-throughput, low-latency streaming data processing, as well as support for complex event processing and state management. Both Pravega and Flink share the design principle of treating the data stream as a first-class primitive, which makes them well suited to jointly construct data pipelines encompassing compute and storage while enabling both batch and streaming use cases.

Pravega provides the features that enable applications to ingest, store and serve streaming data in a durable, consistent, and scalable manner. Flink provides the compute capabilities to derive value from the data in Pravega streams. With Flink, users have access to flexible APIs for windowing, complex event processing (CEP) or table abstractions to process streaming data.

To enable Pravega and Flink to connect, we developed a connector early on in the Pravega project. The connector implementation is open-sourced in a repository in the Pravega GitHub organization, as part of the overall Pravega project hosted by the Cloud Native Computing Foundation (CNCF). The connector implements a number of relevant features, including:

  • Exactly-once processing guarantee for both Reader and Writer, supporting end-to-end exactly-once processing pipelines.
  • Seamless integration with Flink checkpoints and savepoints.
  • Parallel Readers and Writers supporting high throughput and low latency processing.
  • Batch, Streaming, and Table API support to access Pravega Streams.

These features enable the implementation of effective streaming data pipelines that are fault-tolerant and performant.

In this blog, we discuss how to use the connector to read and write Pravega streams with Flink’s basic DataStream API in a Flink application. We will cover the Table API in a follow-up blog, so stay tuned if you are interested.

The Basics

To use the Pravega connector with a Flink application, you need to select an appropriate version and add it to your project:

<dependency>
  <groupId>io.pravega</groupId>
  <artifactId>pravega-connectors-flink-1.11_2.12</artifactId>
  <version>0.10.1</version>
</dependency>

In the example above:

1.11 is the Flink major version, which appears in the middle of the artifact name. The Pravega Flink connector maintains compatibility with the three most recent major versions of Flink.

2.12 is the Scala version the connector was compiled against.

0.10.1 is the connector version, which aligns with the Pravega version.

You can find the latest releases and the support matrix on the GitHub Releases page.

We recommend packaging the application code and all its required dependencies, including the connector, into a single jar-with-dependencies, which we refer to as the application jar. The following documentation can help you build such a project:

https://github.com/pravega/flink-connectors/blob/master/documentation/src/docs/dev-guide.md#bootstrapping-the-project

Once the jar is packaged, the command below runs the application on a cluster.

 flink run -c <classname> ${your-app}.jar --controller <pravega-controller-uri> --scope <pravega-scope>

API introduction

To better illustrate the connector’s DataStream API, we will walk through a turbine sensor data monitoring example. Assume a Pravega standalone instance is running, with the source data written into the stream “iot/sensor” in CSV format with the following schema.

public class SensorEvent {
    private long timestamp;
    private int sensorId;
    private String location;
    private float temp;
}

We plan to compute a daily summary of the temperature range and write the results back to a Pravega stream. This blog will guide you through the basic reads and writes using this use case.

Configurations

The connector provides a common top-level object, PravegaConfig, for Pravega connection configuration. The config object automatically configures itself from environment variables, system properties, and program arguments.

The basic controller URI and the default scope can be set as below.

Setting          Environment Variable / System Property / Program Argument        Default Value
Controller URI   PRAVEGA_CONTROLLER_URI / pravega.controller.uri / --controller   tcp://localhost:9090
Default Scope    PRAVEGA_SCOPE / pravega.scope / --scope                           (none)

These are the recommended ways to create an instance of PravegaConfig:

// From default environment
PravegaConfig config = PravegaConfig.fromDefaults();

// From program arguments
ParameterTool params = ParameterTool.fromArgs(args);
PravegaConfig config = PravegaConfig.fromParams(params);

// From user specification
PravegaConfig config = PravegaConfig.fromDefaults()
    .withControllerURI("tcp://localhost:9090")
    .withDefaultScope("iot")
    .withCredentials(credentials)
    .withHostnameValidation(false);

Serialization/Deserialization

Pravega defines the interface io.pravega.client.stream.Serializer for serialization/deserialization, while Flink defines its own standard interfaces (SerializationSchema and DeserializationSchema) for the same purpose.

For interoperability with other Pravega client applications, we have built-in adapters PravegaSerializationSchema and PravegaDeserializationSchema to support processing Pravega stream data produced by a non-Flink application.

For example, an adapter for Pravega’s JavaSerializer:

import io.pravega.client.stream.impl.JavaSerializer;
...
DeserializationSchema<MyEvent> adapter = new PravegaDeserializationSchema<>(
   MyEvent.class, new JavaSerializer<MyEvent>());
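
The same approach works on the write path. For example, a sketch of the corresponding serialization adapter wrapping Pravega’s JavaSerializer (MyEvent is the same placeholder type as above):

import io.pravega.client.stream.impl.JavaSerializer;
...
SerializationSchema<MyEvent> adapter = new PravegaSerializationSchema<>(
   new JavaSerializer<MyEvent>());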

FlinkPravegaReader

FlinkPravegaReader is a Flink SourceFunction implementation that supports parallel reads from one or more Pravega streams. Internally, it initiates a Pravega reader group and creates Pravega EventStreamReader instances to read data from the stream(s). It provides a builder-style API for construction and allows StreamCuts to mark the start and end of the read. Here is a snippet showing how to use it:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable Flink checkpoint to make state fault tolerant
env.enableCheckpointing(60000);

// Define the Pravega configuration
ParameterTool params = ParameterTool.fromArgs(args);
PravegaConfig config = PravegaConfig.fromParams(params);

// Define the event deserializer
DeserializationSchema<String> deserializer = new SimpleStringSchema();

// Define the source
FlinkPravegaReader<String> pravegaSource = FlinkPravegaReader.<String>builder()
    .forStream(Stream.of("iot", "sensor"))
    .withPravegaConfig(config)
    .withDeserializationSchema(deserializer)
    .build();

// Define the data stream
DataStream<SensorEvent> stream = env.addSource(pravegaSource)
    .setParallelism(4)
    .map(new SensorMapper())
    .uid("pravega-source");

This part of the code reads from a Pravega stream continuously and constructs a Flink DataStream for further analysis, such as window aggregation.
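
The SensorMapper used above is not part of the connector; a minimal sketch is shown below, assuming the CSV records carry the four SensorEvent fields in order and that SensorEvent exposes the usual setters:

import org.apache.flink.api.common.functions.MapFunction;

public class SensorMapper implements MapFunction<String, SensorEvent> {
    @Override
    public SensorEvent map(String line) throws Exception {
        // Expected CSV layout (assumed): timestamp,sensorId,location,temp
        String[] fields = line.split(",");
        SensorEvent event = new SensorEvent();
        event.setTimestamp(Long.parseLong(fields[0].trim()));
        event.setSensorId(Integer.parseInt(fields[1].trim()));
        event.setLocation(fields[2].trim());
        event.setTemp(Float.parseFloat(fields[3].trim()));
        return event;
    }
}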

FlinkPravegaWriter

FlinkPravegaWriter is a Flink SinkFunction implementation which supports parallel writes to a Pravega stream.

It supports three writer modes that relate to guarantees about the persistence of events emitted by the sink to a Pravega stream.

  1. Best-effort – Any write failures will be ignored hence there could be data loss.
  2. At-least-once (default) – All events are persisted in Pravega. Duplicate events are possible, due to retries or in case of failure and subsequent recovery.
  3. Exactly-once – All events are persisted in Pravega using a transactional approach integrated with the Flink checkpointing feature.

Internally, depending on the writer mode, it initiates several Pravega EventStreamWriter or TransactionalEventStreamWriter instances to write data to the stream. It also provides a builder-style API for construction. Here is a snippet showing how to use it:

// Summarize the temperature data for each sensor
DataStream<SensorAggregate> summaries = timestamped
    .keyBy(SensorEvent::getSensorId)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(8)))
    .aggregate(new SensorAggregator()).name("summaries");

// Define the event serializer
SerializationSchema<SensorAggregate> serializer = new PravegaSerializationSchema<>(new JavaSerializer<SensorAggregate>());

// Define the sink function
FlinkPravegaWriter<SensorAggregate> pravegaSink = FlinkPravegaWriter.<SensorAggregate>builder()
    .forStream(Stream.of("iot", "sensor-agg"))
    .withPravegaConfig(config)
    .withSerializationSchema(serializer)
    .withEventRouter(event -> String.valueOf(event.getId()))
    .withWriterMode(PravegaWriterMode.EXACTLY_ONCE)
    .build();

summaries.addSink(pravegaSink)
    .setParallelism(4)
    .uid("pravega-sink");

For the complete code, please check the turbine heat processor project in pravega-samples. The repository https://github.com/pravega/pravega-samples contains further examples that can help you learn more about the connector’s usage.

Internals of reader and writer

Checkpoints on the read path, watermarking, and transactions on the write path are some unique features in the Pravega connector. We will go through the details here.

Checkpoint integration

Flink implements asynchronous periodic checkpoints[1], following the Chandy-Lamport algorithm[2], to make both Flink state and stream positions recoverable, thereby giving the application the ability to resume from a recent state despite faults. Such a mechanism is important to avoid replaying too much data or even losing state in the case of truncation due to retention policies in place.

Pravega also has its own Checkpoint concept which creates a consistent reference for a position in the stream that an application can roll back to.  A special Event (a Checkpoint Event) signals to each Reader that it needs to persist its state. Once a Checkpoint has been completed, the application can use the Checkpoint to reset all the Readers in the Reader Group to the known consistent state represented by the Checkpoint.

Our end-to-end recovery story differs from that of other messaging systems, such as Kafka, which persist their offsets in the Flink task state and let Flink do the coordination. Flink delegates Pravega source recovery to Pravega itself and only requires a lightweight hook to connect. We worked with the Flink community to add a new interface, ExternallyInducedSource (relevant Jira here), to allow such external calls for checkpointing, and the connector integrates this interface to guarantee exactly-once semantics during failure recovery.

The checkpoint mechanism works as follows:

  • The master hook handler from the job manager initiates the triggerCheckpoint request to the ReaderCheckpointHook that was registered with the Job Manager during FlinkPravegaReader source initialization.
FlinkPravegaReader.java:
    @Override
    public MasterTriggerRestoreHook<Checkpoint> createMasterTriggerRestoreHook() {
        return new ReaderCheckpointHook(this.hookUid,
            this.readerGroupName,
            this.readerGroupScope,
            this.checkpointInitiateTimeout,
            this.clientConfig,
            this.readerGroupConfig);
    }
  • The ReaderCheckpointHook handler notifies Pravega to checkpoint the current reader state. This call is non-blocking and returns a Future that completes once the Pravega readers finish the checkpoint. When the Future completes, the Pravega checkpoint is persisted in the “master state” of the Flink checkpoint.
ReaderCheckpointHook.java:
    @Override
    public CompletableFuture<Checkpoint> triggerCheckpoint(
            long checkpointId, long checkpointTimestamp, Executor executor) throws Exception {

        ensureScheduledExecutorExists();

        final String checkpointName = createCheckpointName(checkpointId);

        final CompletableFuture<Checkpoint> checkpointResult =
                this.readerGroup.initiateCheckpoint(checkpointName, scheduledExecutorService);

        // Add a timeout to the future, to prevent long blocking calls
        scheduledExecutorService.schedule(() -> checkpointResult.cancel(false), triggerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);

        return checkpointResult;
    }
  • A Checkpoint event is sent by Pravega as part of the data stream flow; on receiving this event, the FlinkPravegaReader triggers the checkpoint to effectively let Flink continue and complete the checkpoint process.
FlinkPravegaReader.java:
    …
        if (eventRead.getEvent() == null) {
            // if the read marks a checkpoint, trigger the checkpoint
            if (eventRead.isCheckpoint()) {
                triggerCheckpoint(eventRead.getCheckpointName());
            }
            continue;
        }
    …

Correctness of stream processing

Some applications can tolerate inconsistencies such as missing events or duplicates, but others are sensitive to such deviations and produce incorrect results with a highly negative impact. Correctness of results is very important for some use cases, including some in finance. This constraint mainly stems from the following two problems:

  1. Data sources that are unordered in event time
  2. Guaranteeing end-to-end exactly-once semantics

Pravega Watermark integration

The use of time in stream processing has manifested in different forms, see for example a discussion in the DataFlow model paper[3]. Flink relies on watermarks to track how time elapses for streaming data, and the configuration on how the watermark advances establishes a tradeoff between correctness and latency. The use of watermarks enables an acceptable compromise to handle the imperfections of the data source with respect to time order, addressing point 1.

Pravega also has watermark support, introduced in this blog post, and we implemented it in the connector by periodically using the time window information from the reader. Users can further provide their own implementation of the io.pravega.connectors.flink.watermark.AssignerWithTimeWindows interface.

This interface requires two parts:

  • Event time assigner for each event – To compare the event-time order of each event against the watermark, users need to tell Flink the event time of each event by providing an implementation of the timestamp assigner.
  • Watermark calculation given the TimeWindow from Pravega – For each reader, Pravega returns a TimeWindow with upper and lower bound timestamps, which provides the range over which the readers may be spread. Normally, simply using the lower bound fits most cases.

To register the assigner with the source, pass it to the reader builder with the withTimestampAssigner(new MyAssignerWithTimeWindows()) API.

FlinkPravegaReader<SensorData> source = FlinkPravegaReader.<SensorData>builder()
        .withPravegaConfig(pravegaConfig)
        .forStream(inputStream)
        .withDeserializationSchema(new PravegaDeserializationSchema<>(SensorData.class, new JavaSerializer<>()))
        // provide an implementation of AssignerWithTimeWindows<T>
        .withTimestampAssigner(new LowerBoundAssigner<SensorData>() {
            @Override
            public long extractTimestamp(SensorData sensorData, long previousTimestamp) {
                return sensorData.getTimestamp();
            }
        })
        .build();

On the sink side, the application can propagate the Flink watermark into Pravega with enableWatermark(true) in the Flink writer builder. Internally, the connector calls the noteTime() API on the Pravega event stream writer whenever the Flink watermark updates.

FlinkPravegaWriter<SensorData> writer = FlinkPravegaWriter.<SensorData>builder()
        .withPravegaConfig(pravegaConfig)
        .forStream(outputStream)
        // enable watermark propagation
        .enableWatermark(true)
        .withEventRouter(event -> String.valueOf(event.getSensorId()))
        .withSerializationSchema(new PravegaSerializationSchema<>(new JavaSerializer<>()))
        .build();

There is a sample here showing how this works.

End-to-end exactly-once semantics

We use exactly-once semantics to denote that each incoming event is reflected in the final results exactly once. Even in the case of faults, there is no duplicate data and no data that goes unprocessed (assuming the job is eventually able to make progress). This is quite hard to achieve because of the demanding message acknowledgement and recovery required during fast processing, which is why some early distributed streaming engines like Storm (without Trident) chose to support only at-least-once. Flink was one of the first streaming systems to provide exactly-once semantics in an application, thanks to its checkpoint mechanism[4].

To enable exactly-once semantics end-to-end, however, it is necessary to involve the output. The way that Flink implements exactly-once semantics requires retries, which implies that the output might be partially written. Consequently, the sink of an external system that a Flink job outputs to must support commits and rollbacks.

Pravega supports transactional writes[5], which matches the requirement of committing and rolling back (aborting a transaction). The idea with a Pravega transaction is that it allows an application to prepare a set of events that can be written atomically to a Stream. This allows an application to “commit” a number of events atomically. Pravega transactions enable a Flink job to align the checkpointing with committing the output, and to implement end-to-end exactly-once pipelines.
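
For illustration, here is a rough sketch of how a plain Pravega client application (outside Flink) uses a transaction; the writer id, scope, and stream names are placeholders, and error handling is reduced to propagating TxnFailedException:

import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.JavaSerializer;

void writeSummaryTransactionally(ClientConfig clientConfig) throws TxnFailedException {
    try (EventStreamClientFactory factory = EventStreamClientFactory.withScope("iot", clientConfig);
         TransactionalEventStreamWriter<String> writer = factory.createTransactionalEventWriter(
                 "demo-writer", "sensor-agg", new JavaSerializer<String>(), EventWriterConfig.builder().build())) {
        Transaction<String> txn = writer.beginTxn();      // begin a transaction
        txn.writeEvent("sensor-1", "daily summary ...");  // events are buffered inside the transaction
        txn.flush();                                      // ensure buffered events are durably staged
        txn.commit();                                     // make all events visible to readers atomically
        // txn.abort() would instead discard everything written to the transaction
    }
}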

Building such an end-to-end solution is still not easy; the main difficulty is the coordination between Flink and the Pravega sink. One common approach for coordinating commits and rollbacks in a distributed system is the two-phase commit protocol[6]. We followed this path, worked together with the Flink community, and implemented the sink function as a two-phase commit coordinated with Flink checkpoints.

Basically, such a sink requires the following four methods in the Pravega implementation:

  • beginTransaction – Start a Pravega transaction and persist the transaction ID inside the Flink state backend for tracking
        case EXACTLY_ONCE:
            Transaction<T> txn = transactionalWriter.beginTxn();
            return new PravegaTransactionState(txn);
  • preCommit – Flink starts a checkpoint; for all transactions recorded in the state, flush the data in the Pravega transaction to ensure all writes have been persisted
        case EXACTLY_ONCE:
            transaction.getTransaction().flush();
            break;
  • commit – The Flink checkpoint succeeds, so Flink notifies the connector to commit the transactions recorded in the state. The connector commits all open transactions, which appends their events to the stream, persists them, and finally makes them visible to readers
        case EXACTLY_ONCE:
            …
            final Transaction.Status status = txn.checkStatus();
            if (status == Transaction.Status.OPEN) {
                if (enableWatermark && transaction.watermark != null) {
                    txn.commit(transaction.watermark);
                } else {
                    txn.commit();
                }
            } else {
                log.warn("{} - Transaction {} has unexpected transaction status {} while committing",
                         writerId(), txn.getTxnId(), status);
            }
  • abort – The Flink checkpoint fails; the transaction is aborted, and data written to the transaction is discarded to avoid duplicates.
        case EXACTLY_ONCE:
            …
            txn.abort();
            break;

The Flink community extracted the common logic of the two-phase commit protocol into a general interface, TwoPhaseCommitSinkFunction (relevant Jira here), making it possible to build end-to-end exactly-once applications with other messaging systems that support transactions, including Apache Kafka versions 0.11 and beyond. There is also an official Flink blog post introducing this feature in detail.

Wrap Up and plan for the future

The Pravega Flink connector enables Pravega and Flink to build effective streaming data pipelines. Here are some key points that we covered:

  • Introduction to basic usage of the streaming source and sink in Pravega Flink connector
  • Deep dive into the internals of the checkpoint, watermark integration and end-to-end exactly-once feature

Here are some of our plans for the future connector work:

  • FlinkPravegaInputFormat and FlinkPravegaOutputFormat are currently provided to support batch reads and writes in Flink, but they are built on the legacy DataSet API. As Flink is making a major effort to unify batch and stream processing, it has introduced new source and sink interfaces in recent releases. We will continue collaborating with the Flink community to integrate these new APIs.
  • We are putting more focus on SQL / Table API support to provide a better user experience, as it is simpler to understand and in some cases even more powerful. For example, we have provided a Pravega Flink Catalog, integrated with the Pravega schema registry, that lets users treat Pravega as a database system and perform SQL queries directly on Pravega streams. The Debezium format is also supported now. These features, among others, are covered in our latest connector 0.10.1 release. A follow-up blog will introduce and take a deep dive into the Table API of the connector.

Acknowledgements

Special thanks to Flavio Junqueira, Yu Teng, Derek Moore, Ashish Batwara and Srikanth Satya for their feedback, corrections and valuable insights. I would also like to thank the Pravega team for building such a great system for the connector to depend on. This blog could not be here without help from all of you.

About the Authors

Yumin (Brian) Zhou is a Software Engineer at Dell and part of the DellEMC streaming data platform development team. He holds an M.Sc. in Computer Science and Technology from Fudan University and is interested in big data analytics, Docker/Kubernetes, and real-time processing. He is one of the main developers of the analytics component of the DellEMC streaming data platform. He is also an Apache Flink contributor and the maintainer of the Pravega Flink connector project.

References

  1. Carbone, Paris, et al. “Lightweight asynchronous snapshots for distributed dataflows.” arXiv preprint arXiv:1506.08603 (2015).
  2. Chandy, K. Mani, and Leslie Lamport. “Distributed snapshots: Determining global states of distributed systems.” ACM Transactions on Computer Systems (TOCS) 3.1 (1985): 63-75.
  3. Akidau, Tyler, et al. “The dataflow model: a practical approach to balancing correctness, latency, and cost in massive-scale, unbounded, out-of-order data processing.” (2015).
  4. https://data-artisans.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink
  5. https://cncf.pravega.io/docs/latest/transactions/
  6. https://en.wikipedia.org/wiki/Two-phase_commit_protocol
