Exploring the advanced features of the Pravega Flink Connector Table API

At Flink Forward Asia 2020, our team presented "The Past, Present and Future of the Pravega Flink Connector", describing how the connector implemented the FLIP-95 Table API from zero to one, making Pravega streams and Flink tables interoperable so that users can query and write data in a Pravega stream with simple SQL statements. More than a year has passed, and on top of this stream-table integration we have built several advanced features that further enrich the usage scenarios and simplify usage. Taking two of these new features, Catalog API integration and Debezium support, as examples, this article explores the technical details behind them.

1. Introduction to the Pravega Schema Registry project

Pravega Schema Registry is a new feature introduced by the Pravega ecosystem community in 2020. Similar to Confluent Schema Registry and AWS Glue, it is a general solution for storing and managing the schemas of data written to Pravega. Building on the Schema Registry, the connector can implement the Catalog API to provide a Pravega catalog, so that users can access data directly with Flink SQL without writing CREATE TABLE DDL to establish the connection to a Pravega stream.

1. Project motivation

A Pravega stream stores serialized raw binary data, which requires the reader and the writer to agree on the schema of the written data. This is easy to manage while the data volume and team are small, but as the workload grows with the business, with thousands of streams being read and written and multiple development teams collaborating, it becomes much harder. The Pravega Schema Registry component exists to guarantee this schema consistency.

As shown in the figure above, when a writer writes an event, the reader must deserialize it with the same schema in order to recover accurate data from the binary payload. Sensitive information may additionally be encrypted or encoded on top of this metadata.

We need centralized schema metadata for a Pravega stream, including its encoding format, which the writer defines and registers, and which every reader can then look up to obtain the structure and deserialization method of the data, reaching a consensus on the schema.

We also want this metadata store to avoid depending on an additional external component. Ideally it should use Pravega itself, which saves operation and maintenance cost while taking advantage of Pravega's efficient and durable storage.

Furthermore, schema evolution is a very common business scenario. Within the same stream, as the business grows, the semi-structured data being written may add new fields or change existing ones to support more business processes and jobs. Many formats, such as Avro, have corresponding compatibility configurations for this. Concretely, when a new writer starts writing data with a new schema, we must not only ensure that the existing online readers keep working, but also allow newly added readers that need the new fields to read the data. The Pravega Schema Registry therefore should not wait until a reader actually fails to deserialize an event to discover the change; instead, it lets the writer configure format compatibility and intervene when a new schema is registered, so that reading and writing client applications can be managed more quickly.

2. Project introduction
With these motivations in mind, we developed the Pravega Schema Registry project.

It stores and manages the schemas of semi-structured data in Pravega and provides a RESTful interface for managing the stored schemas, data encoding formats, and compatibility policies. The interface abstraction we provide is very open: besides built-in common serialization formats such as Avro, Protobuf, and JSON and common compression algorithms such as LZ4 and Snappy, it also supports custom serialization methods. This abstraction makes it a more general solution than similar projects in the industry. For any serialization under its management, we can customize the corresponding encoding format and compatibility policy, so users are free to process data with whatever serialization method they choose. The whole project is backed by the Pravega Key Value Table, which serves as the underlying storage of the Schema Registry. The Pravega Key Value Table was originally just an internal component for Pravega's metadata storage; we gradually developed a public API for it, made it publicly available in version 0.8, and promoted it to a beta-stable state in version 0.10.

The Pravega Schema Registry project is relatively independent: apart from the underlying storage using the Pravega Key Value Table, all upper-layer abstractions are independent and not limited to Pravega. The project is open-sourced not inside the Pravega repository itself, but as a separate project within the ecosystem. In this way, more general storage systems, including common file and object stores, could also be used to back a schema management solution.

3. System architecture

The system architecture of the whole project is shown in the figure.

The Schema Registry interacts with clients through a RESTful API and the gRPC protocol. A Group is the unit of schema management and corresponds to a Stream in Pravega; it stores the default serialization format, the compatibility configuration, and multiple versions of serialization and encoding information. As mentioned before, this information is stored as key-value pairs on Pravega segments.

From the data-path point of view, the writer needs to use a special event serializer that prepends header bytes containing a protocol version and an encoding id. This allows the Schema Registry to intervene, registering or validating the schema of the data and checking that it satisfies the encoding and compatibility requirements before data with a legal schema is allowed into the Pravega stream. Likewise, the reader needs to read with the matching special deserializer.
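As a rough illustration (not the Schema Registry's actual serializer code), the framing can be pictured as a small fixed header in front of the format-specific payload; the 1-byte-version-plus-4-byte-encoding-id layout below is an assumption chosen to match the five-byte header mentioned later in this article.

// Sketch only: frame/unframe an event payload with a small schema-registry header.
import java.nio.ByteBuffer;

public final class FramedEventExample {

    static ByteBuffer frame(byte protocolVersion, int encodingId, byte[] payload) {
        // header (assumed layout): 1 byte protocol version + 4 byte encoding id
        return (ByteBuffer) ByteBuffer.allocate(1 + 4 + payload.length)
                .put(protocolVersion)
                .putInt(encodingId)
                .put(payload)
                .flip();
    }

    static byte[] unframe(ByteBuffer framed) {
        byte protocolVersion = framed.get(); // would be validated against the registry
        int encodingId = framed.getInt();    // used to look up the schema and encoding
        byte[] payload = new byte[framed.remaining()];
        framed.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] payload = "serialized-avro-or-json-bytes".getBytes();
        ByteBuffer framed = frame((byte) 1, 42, payload);
        System.out.println(new String(unframe(framed)));
    }
}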

2. Catalog API integration

With the Schema Registry in place, the mapping between a Flink catalog and Pravega becomes obvious.

As shown in the figure, by combining the stream data in Pravega with the schema stored in the Schema Registry, we can deserialize the data and assemble a table in the Flink catalog according to that table structure; the conversion from table back to stream works the same way in reverse, so the link is complete in both directions. Traditional catalog operations such as creating tables, dropping tables, and dropping databases can therefore be abstracted into metadata changes against Pravega and the Schema Registry.

Based on this idea, we implemented a first version of the catalog interface, so that users can create a catalog with a DDL like the one sketched below and then manipulate Pravega metadata with plain SQL.
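The following is a minimal, illustrative sketch of such a catalog DDL and its usage. The option keys, URIs, and the scope/stream names are assumptions for illustration; here a Pravega scope is treated as a database and a stream as a table.

CREATE CATALOG pravega_catalog WITH (
    'type' = 'pravega',
    'controller-uri' = 'tcp://localhost:9090',
    'schema-registry-uri' = 'http://localhost:9092',
    'default-database' = 'my_scope',
    'serialization.format' = 'Avro'
);

-- Once registered, Pravega streams can be queried like ordinary tables,
-- without any per-table CREATE TABLE DDL:
USE CATALOG pravega_catalog;
SELECT * FROM my_scope.my_stream;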

Implementation Difficulties
However, as we further polished this first prototype, we ran into three difficulties in the implementation details.

The first is the handling of schema and serialization. Before Flink 1.12, the conversion between Json/Avro data and Flink's internal table row abstraction RowData was implemented in internal, private classes under the format module. If we wanted to reuse that code to stay consistent with Flink's own schema and serialization conversion behavior, we would have had to copy the entire class library. We therefore proposed to the community that these conversions be abstracted into public classes. After discussion with the community, we opened the corresponding JIRA ticket FLINK-19098 and contributed the code to fix it, which unblocked the serialization path for data converted through catalog tables.
The second point is supporting both the Avro and Json formats. In existing catalog implementations the serialization method is usually fixed, which is somewhat at odds with the generality in the design of the Pravega Schema Registry. Some users are used to Avro while others prefer Json; how can we serve both so that everyone enjoys the convenience of the catalog? We introduced a serialization format option in the catalog that not only allows choosing Avro or Json, but also accepts all the additional serialization options officially supported by Flink, such as the timestamp serialization format, to further customize the serialization applied across the whole catalog.
The third difficulty is that every event processed through the Schema Registry carries a five-byte header, so Flink's off-the-shelf Json and Avro formats cannot be used directly for serialization. To use the serializer API provided by the Schema Registry, we had to develop our own format factory to call it. In the implementation, we pass the parameters required by the Schema Registry API, including the namespace, the group and other information from the catalog table, into the format factory options, which is equivalent to a table creation DDL like the following.
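For illustration, a table backed by this format factory would be declared roughly as follows; the connector and format option keys below are indicative only, and the scope, stream, namespace, and group names are placeholders.

CREATE TABLE users (
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'pravega',
    'controller-uri' = 'tcp://localhost:9090',
    'scope' = 'my_scope',
    'scan.streams' = 'users',
    -- the custom format factory backed by the Schema Registry;
    -- the format name and its option keys are illustrative here
    'format' = 'pravega-registry',
    'pravega-registry.uri' = 'http://localhost:9092',
    'pravega-registry.namespace' = 'my_scope',
    'pravega-registry.group-id' = 'users',
    'pravega-registry.format' = 'Avro'
);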

The implementation then reads this information to call the Schema Registry API. Stitching this together with the fixes for the problems mentioned above, we completed the interoperation between the raw binary data and Flink RowData, and the whole link works.

3. Debezium support

First, let us introduce the broader concept of CDC. CDC stands for Change Data Capture, a methodology for identifying and tracking data changes and then acting on them. That is the broad definition; in modern industry practice, CDC usually appears as a narrower technical term: analyzing database logs and transforming the row-level changes into a data stream in a specific format. Common implementations include Debezium and, widely used in China, Canal.

Debezium, the most widely used CDC technology in the industry, is built on Kafka Connect and is a distributed platform that converts database row-level changes into event streams. CDC now has a wide range of application scenarios, including data synchronization for backup and disaster recovery, data distribution to multiple downstream systems, and ETL integration feeding data lakes.

1. Deployment methods

Currently Debezium has three deployment methods:

The first, and the most common in the industry, is deployment through Kafka Connect. As mentioned before, this has been a supported usage of Debezium since its inception: it parses the binlog of traditional databases such as MySQL or PostgreSQL and imports the changes into Apache Kafka through the Debezium source connector for Kafka Connect. Leveraging Kafka's powerful ecosystem, the data can then be consumed by a variety of downstream engines for further aggregation and for data warehouse and data lake applications.
The second: the Debezium community has also seen the changes in the message queue and stream storage landscape and is gradually decoupling from Kafka. It now supports running a standalone Debezium server that uses sink adapters to connect to downstream messaging systems, with integrations such as Amazon Kinesis and Google Cloud Pub/Sub. In the first half of 2021, the Debezium community released version 1.6, in which it officially accepted the contribution from our Pravega community: the Pravega sink became one of these integrations, joining hands with the Debezium community.

The last way is to embed Debezium as a library in a Java program. A well-known example is the Flink CDC connectors project, maintained by Yunxie and Xuejin, which is very popular in the community. In scenarios where CDC data does not need to be stored long-term or reused, such a lightweight embedded approach removes the complexity of deploying and maintaining a message queue while still guaranteeing computational reliability and fault tolerance.

2. Write modes

In integrating Debezium with Pravega, we kept the implementation complete and provided two write modes: normal writes and transactional writes.

The Debezium server is essentially a periodic, batched pull from the source connector; in other words, the sink interface receives the upsert stream one Debezium batch at a time. As can be seen in the figure on the right, each bracket is one pulled batch, where yellow represents updates and white represents inserts.

With normal writes, we write all events sequentially as individual events, whether they are inserts or updates. Ordering in a distributed environment is not a concern, because each Debezium event carries the key of the database table row; when writing to Pravega we attach that key as the routing key, so that events with the same key land in the same segment, and Pravega guarantees ordering per routing key.
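A minimal sketch of this normal write path using the Pravega client API is shown below; the scope, stream, routing key, and JSON payload are placeholders for illustration.

// Sketch only: write one Debezium-style event with the table key as routing key.
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import java.net.URI;

public final class NormalWriteExample {
    public static void main(String[] args) {
        ClientConfig config = ClientConfig.builder()
                .controllerURI(URI.create("tcp://localhost:9090"))
                .build();
        try (EventStreamClientFactory factory =
                     EventStreamClientFactory.withScope("debezium", config);
             EventStreamWriter<String> writer = factory.createEventWriter(
                     "inventory.customers",
                     new UTF8StringSerializer(),
                     EventWriterConfig.builder().build())) {
            // The Debezium record key (e.g. the primary key) becomes the routing key,
            // so all changes of the same row keep their order within one segment.
            writer.writeEvent("customer-1001", "{\"op\":\"u\"}").join();
        }
    }
}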

Next, transactional writes. For each Debezium batch, we wrap the writes in a Pravega transaction and commit it when the batch completes. When the Debezium server fails over, the atomicity and idempotency guarantees of Pravega transactions ensure that all events can be replayed and written out without duplication, giving exactly-once semantics.
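A minimal sketch of the transactional write path, again with placeholder names and a hard-coded batch, wrapping one Debezium batch in a single Pravega transaction:

// Sketch only: one Pravega transaction per Debezium batch, committed at batch end.
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Transaction;
import io.pravega.client.stream.TransactionalEventStreamWriter;
import io.pravega.client.stream.TxnFailedException;
import io.pravega.client.stream.impl.UTF8StringSerializer;
import java.net.URI;
import java.util.List;

public final class TransactionalWriteExample {
    public static void main(String[] args) throws TxnFailedException {
        ClientConfig config = ClientConfig.builder()
                .controllerURI(URI.create("tcp://localhost:9090"))
                .build();
        try (EventStreamClientFactory factory =
                     EventStreamClientFactory.withScope("debezium", config);
             TransactionalEventStreamWriter<String> writer =
                     factory.createTransactionalEventWriter(
                             "debezium-writer",
                             "inventory.customers",
                             new UTF8StringSerializer(),
                             EventWriterConfig.builder().build())) {
            List<String> batch = List.of("{\"op\":\"c\"}", "{\"op\":\"u\"}");
            Transaction<String> txn = writer.beginTxn();  // one txn per Debezium batch
            for (String event : batch) {
                txn.writeEvent("customer-1001", event);   // routing key keeps per-key order
            }
            txn.commit();                                 // events become visible atomically
        }
    }
}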

Since version 1.6, users can configure the Debezium server with the parameters shown in the table: they only need to fill in the Pravega connection parameters and specify the scope name and the transactional-write switch, and the changes of all tables in a database such as MySQL are then synchronized in real time, in Debezium message format, into Pravega streams named after the corresponding tables.
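For illustration, a Debezium server configuration for the Pravega sink looks roughly like the application.properties sketch below; the Pravega-specific key names are indicative only, so the Debezium 1.6+ documentation should be treated as authoritative.

# Illustrative application.properties for a standalone Debezium server with the Pravega sink
debezium.sink.type=pravega
debezium.sink.pravega.controller.uri=tcp://localhost:9090
debezium.sink.pravega.scope=inventory
# switch between normal and transactional writes (key name indicative)
debezium.sink.pravega.transaction.enabled=false

# standard Debezium source settings for a MySQL database
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.hostname=mysql
debezium.source.database.port=3306
debezium.source.database.user=debezium
debezium.source.database.password=dbz
debezium.source.database.server.name=inventory
debezium.format.value=json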

3. Connector integration

Besides Pravega's contribution inside Debezium, consuming this data with Flink's stream processing capabilities also requires corresponding integration on the compute side, in the Pravega Flink connector.

On top of the existing FLIP-95 Table API implementation, we had already built the table factory to support basic reads and writes. Readers familiar with the Flink Table API may ask: the community already provides a format factory implementation such as debezium-json, which looks directly applicable and can be used simply by specifying the corresponding format, so where is the difficulty?

At first we thought so too, but things were far from that simple. We ran into two main difficulties.

The first is the need to support deserializing one message into multiple events.

Debezium provides an upsert stream. When mapping to Flink's RowData abstraction, an insert event follows the usual one-to-one deserialization, but an update has to be converted into two events: the pre-update state and the post-update state. This is completely at odds with the connector's default one-to-one (de)serialization implementation.

Pravega's own serializer interface is also a one-to-one mapping, and we had built a lot of supporting code on top of it to keep the Pravega serializer interoperable with Flink's DeserializationSchema interface for users' convenience. To support the new requirement, we had to refactor the previous deserialization path and move the deserialization step from the Pravega client into the Pravega connector, so that we could use the following collector-based deserialize method.

default void deserialize(byte[] message, Collector<T> out) throws IOException { ... }
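As a simplified illustration of why the collector is needed (this is not the connector's actual class), the sketch below fans a single parsed Debezium update out into an UPDATE_BEFORE row and an UPDATE_AFTER row:

// Sketch only: one incoming update produces two RowData records via the Collector,
// a one-to-many shape the plain one-to-one serializer interface cannot express.
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

public final class DebeziumFanOutExample {

    /** Emits UPDATE_BEFORE and UPDATE_AFTER rows for one already-parsed update. */
    static void emitUpdate(Object[] before, Object[] after, Collector<RowData> out) {
        GenericRowData beforeRow = GenericRowData.of(before);
        beforeRow.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(beforeRow);

        GenericRowData afterRow = GenericRowData.of(after);
        afterRow.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(afterRow);
    }

    public static void main(String[] args) {
        // Tiny usage example with two string columns (id, name).
        Collector<RowData> printer = new Collector<RowData>() {
            @Override public void collect(RowData row) { System.out.println(row); }
            @Override public void close() {}
        };
        emitUpdate(
                new Object[] {StringData.fromString("1"), StringData.fromString("old")},
                new Object[] {StringData.fromString("1"), StringData.fromString("new")},
                printer);
    }
}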
At the same time, the whole code change had to be made very carefully so that compatibility of the original conversion between the Pravega serializer and the Flink API is still guaranteed and the upgrade path of existing users is not affected.

The second is FLIP-107, about metadata support on the Table source side.

Pravega Table Source implements the SupportsReadingMetadata interface provided by Flink to offer this support. Users can specify metadata coming from the format factory itself using the from-format prefix, such as Debezium's ingestion timestamp, table name and other meta information, to complete and enrich the table. We also support metadata from Pravega itself, called EventPointer, which records the position of the current event in the stream; storing this information also lets users perform random reads, or even build indexes, later on. A create-table DDL including metadata columns looks like the following; these additional columns appear after the original data columns, in order, in the Flink table abstraction.
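An illustrative create-table DDL with such metadata columns is sketched below; the metadata key names (the from-format-prefixed keys and the Pravega event pointer key) are indicative rather than authoritative.

CREATE TABLE customers (
    id BIGINT,
    name STRING,
    -- metadata exposed by the debezium-json format (prefix marks it as coming
    -- from the format rather than from Pravega); key names are illustrative
    ingest_ts TIMESTAMP(3) METADATA FROM 'from-format.ingestion-timestamp' VIRTUAL,
    source_table STRING METADATA FROM 'from-format.source.table' VIRTUAL,
    -- metadata provided by Pravega itself: the position of the event in the stream
    event_pointer BYTES METADATA VIRTUAL
) WITH (
    'connector' = 'pravega',
    'controller-uri' = 'tcp://localhost:9090',
    'scope' = 'inventory',
    'scan.streams' = 'customers',
    'format' = 'debezium-json'
);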

The above is our complete Debezium support work, from Pravega itself through to the connector.

4. Joint community white paper released

Real-time CDC processing of databases is an important application scenario in the big data industry, covering common applications such as data synchronization, data distribution, and real-time data warehouses. With the Debezium integration, Pravega, used together with Apache Flink and Debezium, can serve as a unified intermediate message storage layer that meets users' database synchronization needs. In this solution, Pravega acts as the message middleware, leveraging its real-time, durable storage characteristics: while keeping millisecond-level end-to-end latency and data consistency, it can further provide reliable data storage so that other applications can reuse the data. Meanwhile, the rich downstream ecosystem of Apache Flink makes it easy to connect to more systems and meet a variety of business needs. The whole integration was completed in cooperation between the Pravega and Flink communities.
