How does Flink analyze the CDC data of Iceberg data lake in real time

1. Common CDC analysis schemes

Let's first frame the problem for today's topic: the input is CDC or upsert data, and the output is a database or storage system that supports big data OLAP analysis.

Our common input mainly includes two types of data. The first type of data is the CDC data of the database, which continuously generates changelogs; the other scenario is the upsert data generated by stream computing. The latest Flink 1.12 version already supports upsert data.

1.1 Offline HBase cluster analysis CDC data
The first solution that usually comes to mind is to write the CDC/upsert data into HBase in real time after some processing in Flink. HBase is an online database that provides online query capability; it has very low latency and is very friendly to writes. It can also support some small-scale queries, and the cluster is scalable.

This solution is essentially the same as an ordinary real-time lookup pipeline, so what's wrong with using HBase for OLAP analysis of big data?

First of all, HBase is a database designed for point lookups, and it is an online service. Its row-oriented storage and indexes are not suitable for analytical workloads. A typical data warehouse design uses columnar storage, so that both compression efficiency and query efficiency are high. Second, the maintenance cost of an HBase cluster is relatively high. Finally, HBase stores data as HFiles, which are inconvenient to combine with the Parquet, Avro, ORC, and other formats typical of big data warehouses.

1.2 Apache Kudu maintains the CDC dataset

In response to HBase's relatively weak analysis capabilities, a new project appeared in the community a few years ago: Apache Kudu. While Kudu retains HBase's point-lookup capability, it also uses columnar storage, which makes it well suited for OLAP analysis.


What's wrong with this scheme?

First of all, Kudu runs as a relatively niche, independent cluster with a fairly high maintenance cost, and it is relatively separate from HDFS, S3, and OSS. Secondly, because Kudu retains point-lookup capability in its design, its batch scan performance is not as good as Parquet's. In addition, Kudu's support for delete is relatively weak, and finally it does not support incremental pull.

1.3 Directly import CDC to Hive analysis
The third solution, also commonly used in data warehouses, is to write MySQL data into Hive. The process is: maintain a full partition, build an incremental partition every day, and after the incremental partition is written, merge the two and write the result into a new partition. This process works: the previous full partition in Hive is not affected by the increment, and only after the incremental merge succeeds does the new partition become queryable as a brand-new full data set. This pure-columnar append data is very friendly for analysis.
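The T+1 merge just described can be sketched in a few lines. This is a minimal illustration, not the actual Hive implementation: the full partition is keyed by primary key, the daily incremental changelog is replayed on top of it, and the result is written out as a brand-new full partition.

```python
def merge_partitions(full_rows, incremental_changelog):
    """full_rows: list of (pk, value); incremental_changelog: list of (op, pk, value)."""
    merged = dict(full_rows)
    for op, pk, value in incremental_changelog:
        if op in ("INSERT", "UPDATE"):
            merged[pk] = value          # the upsert wins over the old full copy
        elif op == "DELETE":
            merged.pop(pk, None)
    # The new partition is pure append-only columnar data again.
    return sorted(merged.items())

full = [(1, "a"), (2, "b"), (3, "c")]
incr = [("UPDATE", 2, "b2"), ("DELETE", 3, None), ("INSERT", 4, "d")]
print(merge_partitions(full, incr))  # [(1, 'a'), (2, 'b2'), (4, 'd')]
```

Note that every run rereads and rewrites the entire full data set, which is exactly the inefficiency discussed next.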

What's wrong with this scheme?

The merge of incremental and full data has a delay; data is not written in real time. Typically the merge is performed once a day, producing T+1 data, so the timeliness is very poor and real-time upsert is not supported. Moreover, every merge needs to reread and rewrite all the data, which is inefficient and wastes resources.

1.4 Spark + Delta to analyze CDC data

In response to this problem, Spark + Delta provides the MERGE INTO syntax for analyzing CDC data. This is not just a simplification of Hive data warehouse syntax: Spark + Delta is a new data lake architecture (like Iceberg and Hudi) that manages data at the file level rather than the partition level. Delta therefore optimizes MERGE INTO to scan and rewrite only the changed files, which is much more efficient.
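The file-granular behavior is the key difference from the Hive merge. A toy sketch (file layout and function names are invented for illustration) of why file-level MERGE INTO is cheaper: only files whose keys overlap the changed keys are rewritten, and untouched files are reused as-is.

```python
def merge_into(files, updates):
    """files: {filename: {pk: value}}; updates: {pk: new_value}.
    Returns (new_files, rewritten_filenames)."""
    rewritten = []
    new_files = {}
    for name, rows in files.items():
        touched = updates.keys() & rows.keys()
        if touched:                       # this file is affected by the merge
            new_rows = dict(rows)
            for pk in touched:
                new_rows[pk] = updates[pk]
            new_files[name + ".v2"] = new_rows
            rewritten.append(name)
        else:                             # untouched file is kept unchanged
            new_files[name] = rows
    return new_files, rewritten

files = {"f1": {1: "a", 2: "b"}, "f2": {3: "c"}, "f3": {4: "d"}}
new_files, rewritten = merge_into(files, {2: "b2"})
print(rewritten)  # ['f1'] -- only one of the three files is rewritten
```

This is the Copy On Write pattern evaluated below: cheap when updates touch few files, but each touched file is still fully rewritten.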

Let's evaluate this solution. Its advantage is that it only relies on Spark + Delta, which has a simple architecture, no online services, and column storage, and the analysis speed is very fast. The optimized MERGE INTO syntax is also fast enough.

This solution is essentially Copy On Write. It only needs to copy a small number of files, which keeps latency relatively low. In theory, if the updated data does not overlap much with the existing stock, the day-level delay can be reduced to an hour-level delay, and performance can keep up.

This solution is a small step forward from the Hive warehouse's way of processing upsert data. However, an hour-level delay is still not real time. The biggest disadvantage of this solution is therefore that the Copy On Write merge has a certain overhead, so the delay cannot be made very low.

Those are roughly the existing solutions covered in this first part. It is worth emphasizing again why upsert matters so much: in data lake solutions, upsert is the key technical point for achieving quasi-real-time and real-time database ingestion into the lake.

2. Why choose Flink + Iceberg

2.1 Flink's support for CDC data consumption
First, Flink natively supports CDC data consumption. In the previous Spark + Delta solution with MERGE INTO syntax, the user needs to be aware of the CDC attributes of the data and express them in the merge statement. But Flink natively supports CDC data: users only need to declare a Debezium or other CDC format, and the SQL on top of Flink does not need to be aware of any CDC or upsert attribute. Flink uses a built-in hidden column to identify the CDC type of the data, so it is quite concise for users.

As shown in the figure below, when processing CDC data, Flink only needs to declare a DDL statement for the MySQL binlog source; the subsequent SELECT does not need to be aware of any CDC attribute.
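The hidden-column mechanism mentioned above can be illustrated with a simplified sketch of how a Debezium-style change envelope is translated into Flink's internal row kinds (+I / -U / +U / -D). In real Flink SQL this translation happens inside the `debezium-json` format; this standalone function only models the mapping.

```python
def debezium_to_rowkinds(record):
    """record: dict with Debezium-style 'op', 'before', 'after' fields."""
    op = record["op"]
    if op in ("c", "r"):                 # create / snapshot read
        return [("+I", record["after"])]
    if op == "u":                        # update emits retraction + new value
        return [("-U", record["before"]), ("+U", record["after"])]
    if op == "d":
        return [("-D", record["before"])]
    raise ValueError(f"unknown op {op!r}")

update = {"op": "u", "before": {"id": 1, "v": 2}, "after": {"id": 1, "v": 3}}
print(debezium_to_rowkinds(update))
# [('-U', {'id': 1, 'v': 2}), ('+U', {'id': 1, 'v': 3})]
```

Downstream SQL operators consume these row kinds transparently, which is why user queries never have to mention CDC flags.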

2.2 Flink's support for Change Log Stream

The figure below shows that Flink natively supports Change Log streams. After Flink is connected to a Change Log stream, the topology does not need to care about the Change Log flags in SQL. The topology is defined entirely by the business logic and writes to Iceberg at the end, without perceiving the Change Log flags anywhere in between.

2.3 Flink + Iceberg CDC Import Scheme Evaluation

Finally, what are the advantages of the CDC import scheme of Flink + Iceberg?

Compared with the previous solutions, Copy On Write and Merge On Read each have applicable scenarios with different emphases. Copy On Write is very efficient in scenarios where only part of the files need to be rewritten, and the generated data is a pure-append full data set, which is also the fastest to analyze. That is the advantage of Copy On Write.

The other is Merge On Read: the data, together with the CDC flags, is appended directly to Iceberg. At merge time, these incremental data are combined with the previous full data according to a certain organizational format and an efficient computation method. The advantage is that it supports near-real-time import and real-time data reading; moreover, the Flink SQL of this solution natively supports CDC ingestion, requiring no extra business-field design.

Iceberg is a unified data lake storage that supports diverse computing models and various engines (including Spark, Presto, and Hive) for analysis. The generated files are all stored in pure columnar formats, which makes subsequent analysis very fast. As a data lake, Iceberg is designed around snapshots and supports incremental reading. And Iceberg's architecture is simple enough: there are no online service nodes, and it is a pure table format, which gives upstream platforms enough room to customize their own logic and services.

3. How to write and read in real time

3.1 Batch update scenario and CDC write scenario
First, let's take a look at the two scenarios of batch updates in the entire data lake.

In the first batch update scenario, we use a single SQL statement to update tens of thousands of rows of data, for example to comply with the European GDPR policy: when a user deactivates their account, the background system must physically delete all of that user's data.
The second scenario is to delete data sharing some common characteristic from the data lake. This is also a batch update scenario. Here the delete conditions may be arbitrary and have nothing to do with the primary key, and the data set to be updated is very large. Such jobs are long-running and low-frequency.
The other is the CDC writing scenario. For Flink there are generally two common cases. The first is that the upstream binlog is written quickly into the data lake and then analyzed by different engines. The second is to use Flink for aggregation operations: the output stream is an upsert-type data stream that also needs to be written to the data lake or a downstream system in real time for analysis. In the CDC write scenario, as the SQL in the figure below shows, we use a single SQL statement to update one row of data. This computing mode is streaming incremental import with high-frequency updates.

3.2 Issues to be considered in Apache Iceberg's CDC writing scheme design

Next, let's look at the issues Iceberg needs to consider in the design of the CDC writing scenario.

The first is correctness: the semantics and the data must be guaranteed correct. For example, when upstream data is upserted into Iceberg and the upstream upserts stop, the data in Iceberg must be consistent with the data in the upstream system.

The second is efficient writing. Since the writing frequency of upsert is very high, we need to maintain high throughput and high concurrent writing.

The third is fast reading. Once the data is written, we need to analyze it, which involves two issues. First, fine-grained concurrency must be supported: when a job reads with multiple tasks, each task must be guaranteed a balanced share of the data to speed up computation. Second, we must fully exploit the advantages of columnar storage to speed up reading.

The fourth is to support incremental reading, such as ETL in some traditional data warehouses, which perform further data conversion through incremental reading.

3.3 Apache Iceberg Basic

Before introducing the details of the specific solution, let's first understand the layout of Iceberg in the file system. Generally speaking, Iceberg's data is divided into two parts. The first part is the data files, such as the Parquet files in the figure below; each data file has a corresponding checksum file (.crc file). The second part is the table metadata files, including snapshot files (snap-*.avro), manifest files (*.avro), and TableMetadata files (*.json).

The figure below shows the correspondence between snapshots, manifests, and partitions in Iceberg. The figure contains three partitions: the first partition has two files, f1 and f3; the second partition has two files, f4 and f5; and the third partition has one file, f2. Each write generates a manifest file, which records the correspondence between the files written in that commit and their partitions. Above that sits the concept of a snapshot, which helps to quickly access the full data of the entire table. A snapshot records multiple manifests; for example, the second snapshot contains manifest2 and manifest3.
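This layout can be modeled in a few lines. The sketch below is a toy version of the figure, assuming (as in real Iceberg) that a snapshot lists all the manifests that make up the table at that point, with manifest2 and manifest3 being the ones added by the second commit; reading a snapshot expands its manifests into the full file list.

```python
manifests = {
    "manifest1": [("f1", "partition-1"), ("f3", "partition-1")],
    "manifest2": [("f4", "partition-2"), ("f5", "partition-2")],
    "manifest3": [("f2", "partition-3")],
}
snapshots = {
    # Each snapshot references every manifest that is live at that point.
    "snap1": ["manifest1"],
    "snap2": ["manifest1", "manifest2", "manifest3"],
}

def files_in_snapshot(snapshot_id):
    """Expand a snapshot into the sorted list of data files it covers."""
    return sorted(f for m in snapshots[snapshot_id] for f, _ in manifests[m])

print(files_in_snapshot("snap2"))  # ['f1', 'f2', 'f3', 'f4', 'f5']
```

Incremental reading, mentioned later, amounts to diffing two snapshots' manifest lists instead of expanding one in full.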


After understanding the basic concepts, the following introduces the design of insert, update, and delete operations in Iceberg.

The table in the SQL example shown in the figure below contains two fields, id and data, both of type int. In one transaction we perform the data flow operations in the diagram: first insert a record (1, 2), then update it to (1, 3). In Iceberg, the update operation is split into a delete and an insert.

The reason is that Iceberg is a unified storage layer for both streaming and batch. Splitting an update into delete and insert ensures a unified read path for updates in both scenarios. Take batch deletion in Hive as an example: Hive writes the file offsets of the rows to be deleted into a delta file and then does a merge on read, because this is faster. When merging, the original file and the delta file are mapped by position, quickly yielding all records except the deleted ones.

Next, record (3, 5) is inserted, record (1, 3) is deleted, and record (2, 5) is inserted. The final query returns records (3, 5) and (2, 5).
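Replaying the transaction above as a delete/insert log makes the split concrete. This is a pure simulation of the semantics, not Iceberg's actual storage: the UPDATE becomes a DELETE of the old row plus an INSERT of the new row, and the visible table is whatever survives the log.

```python
def apply_log(ops):
    """Replay a changelog of ('INSERT'|'DELETE', row) pairs into a row list."""
    rows = []
    for op, row in ops:
        if op == "INSERT":
            rows.append(row)
        elif op == "DELETE":
            rows.remove(row)
    return rows

log = [
    ("INSERT", (1, 2)),
    ("DELETE", (1, 2)), ("INSERT", (1, 3)),   # UPDATE (1,2) -> (1,3)
    ("INSERT", (3, 5)),
    ("DELETE", (1, 3)),
    ("INSERT", (2, 5)),
]
print(apply_log(log))  # [(3, 5), (2, 5)]
```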

The above operations look simple, but there are some semantic problems in the implementation. As shown in the figure below, within one transaction we first insert record (1, 2), which writes INSERT (1, 2) into data file1; then delete record (1, 2), which writes DELETE (1, 2) into equality delete file1; then insert record (1, 2) again, which writes another INSERT (1, 2) into data file1; and then we query.

Normally the query should return the record INSERT (1, 2). But in this implementation, the DELETE (1, 2) operation cannot know which row in data file1 it should delete, so both INSERT (1, 2) rows end up deleted.

How do we solve this problem? The community's current approach is to mix position-delete and equality-delete. An equality-delete deletes by the values of one or more specified columns; a position-delete deletes by file path and row number. Combining the two methods ensures the correctness of the delete operation.

As shown in the figure below, in the first transaction we inserted three rows, INSERT (1, 2), INSERT (1, 3), and INSERT (1, 4), and then committed. Next we start a new transaction and insert one row (1, 5). Since this is a new transaction, a new data file2 is created and INSERT (1, 5) is written into it. We then delete record (1, 5); what is actually written for this delete is:

A record (file2, 0) in position delete file1, meaning the row at position 0 of data file2 is deleted. This solves the semantic problem of repeatedly inserting and deleting the same row within the same transaction.
A record DELETE (1, 5) in equality delete file1. This delete ensures that any (1, 5) written before this transaction is deleted correctly.

Then we delete record (1, 4). Since (1, 4) was not inserted in the current transaction, this delete uses an equality-delete, writing the record (1, 4) into equality delete file1. From the above process we can see that the current solution has three types of files: data files, position delete files, and equality delete files.

Having understood the write path, how is the data read back? As shown in the figure below, the record (file2, 0) in the position delete file only needs to be joined with the data files of the current transaction, while the record (1, 4) in the equality delete file is joined with the data files of previous transactions. In the end the records INSERT (1, 3) and INSERT (1, 2) remain, which guarantees the correctness of the process.
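The mixed-delete read path from this example can be sketched directly. This is a simplified simulation (file names and structures follow the example above, not Iceberg's actual data structures): position deletes apply to data files written in the same transaction, equality deletes apply to data files from earlier transactions.

```python
txn1_file1 = [(1, 2), (1, 3), (1, 4)]        # committed in transaction 1
txn2_file2 = [(1, 5)]                        # written in transaction 2
pos_deletes = [("file2", 0)]                 # delete row 0 of data file2
eq_deletes = [(1, 5), (1, 4)]                # delete by column values

# Position deletes: drop the flagged rows from the same-transaction file.
file2_rows = [r for i, r in enumerate(txn2_file2)
              if ("file2", i) not in pos_deletes]

# Equality deletes: filter earlier-transaction files by value.
file1_rows = [r for r in txn1_file1 if r not in eq_deletes]

print(file1_rows + file2_rows)  # [(1, 2), (1, 3)]
```

The result matches the example: only INSERT (1, 2) and INSERT (1, 3) survive.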

3.5 Manifest file design

Insert, update, and delete were introduced above. When designing the execution plan of a task, we also made some designs on the manifest, with the goal of quickly locating data files through the manifest and splitting work by data size so that the data processed by each task is distributed as evenly as possible.

The example shown in the figure below contains four transactions, the first two transactions are INSERT operations, corresponding to M1 and M2, the third transaction is a DELETE operation, corresponding to M3, and the fourth transaction is an UPDATE operation, including two manifest files, namely data manifest and delete manifest.

As for why the manifest file is split into a data manifest and a delete manifest, it is essentially to quickly find the corresponding delete-file list for each data file. Take the example in the figure below: when we read partition-2, we need to join deletefile-4 with datafile-2 and datafile-3, and likewise join deletefile-5 with datafile-2 and datafile-3.

Taking datafile-3 as an example, its delete-file list includes deletefile-4 and deletefile-5. How can this list be found quickly? We can query through the upper-level manifests: once the manifest file is split into data manifests and delete manifests, M2 (data manifest) can be joined with M3 and M4 (delete manifests) first, so the delete-file list for each data file can be obtained quickly.
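A toy version of this manifest join, using the names from the example: data manifests and delete manifests are joined on partition to give each data file its candidate delete-file list without opening every manifest. The sequence-number checks that real Iceberg also applies are omitted here for brevity.

```python
data_manifests = {
    "M2": [("datafile-2", "partition-2"), ("datafile-3", "partition-2")],
}
delete_manifests = {
    "M3": [("deletefile-4", "partition-2")],
    "M4": [("deletefile-5", "partition-2")],
}

def delete_files_for(data_file):
    """Join data manifests with delete manifests on partition."""
    partitions = dict(e for m in data_manifests.values() for e in m)
    partition = partitions[data_file]
    return sorted(df for m in delete_manifests.values()
                  for df, p in m if p == partition)

print(delete_files_for("datafile-3"))  # ['deletefile-4', 'deletefile-5']
```

Because the join runs over small manifest entries rather than the data files themselves, planning stays cheap even for large tables.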

3.6 File-level concurrency

Another issue is ensuring sufficiently high read concurrency, which Iceberg does very well. Iceberg supports file-level concurrent reads and even finer-grained concurrent reads of segments within a file; for example, a 256MB file can be split into two 128MB halves and read concurrently. Here is an example: assume the layout of insert files and delete files in two buckets is as shown in the figure below.

By comparing the manifests, we find that the delete-file list of datafile-2 contains only deletefile-4, so these two files can be executed as a single task (Task-2 in the figure). The other files are handled similarly, which ensures that the merge work is distributed across tasks in a relatively balanced way.

We briefly summarize this scheme, as shown in the figure below. Its advantages are that it satisfies correctness and achieves high-throughput writing and concurrent, efficient reading. In addition, it supports snapshot-level incremental pull.

The current scheme is still relatively rough, and there are some points that can be optimized below.

First, if the same delete file is referenced multiple times within one task, it can be cached, which improves join efficiency.

Second, when a delete file is large and needs to be spilled to disk, a KV library can be used for optimization, without depending on external services or other heavy indexes.

Third, a Bloom filter can be designed to filter out invalid I/O. The commonly used upsert operation in Flink generates one delete and one insert, so the data files and delete files in Iceberg end up similar in size, and join efficiency suffers. With a Bloom filter, when upsert data arrives and is split into insert and delete operations, deletes for keys that were never inserted before can be filtered out (that is, if the data was never inserted, there is no need to write a record into the delete file), which greatly improves upsert efficiency.

Fourth, some background compaction strategies are needed to control the size of delete files; the fewer the delete files, the faster the analysis. Of course, these strategies do not affect normal reading and writing.
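The Bloom-filter idea from the third point can be sketched as follows. A plain Python set stands in for the Bloom filter here (a real Bloom filter uses far less memory, allows false positives, but never misses a key that was inserted): an upsert becomes DELETE + INSERT, but the DELETE record is only written when the key may have been inserted before.

```python
def upsert_stream(keys):
    """Split an upsert key stream into delete records and insert records."""
    seen = set()           # stand-in for a Bloom filter over inserted keys
    delete_records = []
    insert_records = []
    for k in keys:
        if k in seen:      # only emit the delete when the key may already exist
            delete_records.append(k)
        insert_records.append(k)
        seen.add(k)
    return delete_records, insert_records

deletes, inserts = upsert_stream([1, 2, 1, 3, 2])
print(deletes, inserts)  # [1, 2] [1, 2, 1, 3, 2]
```

With mostly-new keys, the delete file shrinks dramatically relative to the data file, which is exactly what restores join efficiency.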

3.7 Transaction submission of incremental file set

The writing of files was introduced above. The figure below shows how writes follow Iceberg's semantics and become readable to users. The path is divided into two parts: data and metadata. First, IcebergStreamWriter writes the data, but at this point the metadata for the written data has not yet been written into the metastore, so it is not visible externally. The second operator, IcebergFileCommitter, collects the data files and finally makes them visible by committing the transaction.
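A toy model of this two-operator write path (class and method names are invented for illustration; in Flink the commit happens on checkpoint): writers produce data files continuously, but the files only become visible to readers when the committer writes them into table metadata in one transaction.

```python
class Table:
    def __init__(self):
        self.visible_files = []     # what readers see (committed metadata)
        self.pending = []           # files written but not yet committed

    def writer_emit(self, data_file):
        """IcebergStreamWriter role: write a file, invisible until commit."""
        self.pending.append(data_file)

    def commit(self):
        """IcebergFileCommitter role: publish pending files atomically."""
        self.visible_files.extend(self.pending)
        self.pending = []

t = Table()
t.writer_emit("data-001.parquet")
t.writer_emit("data-002.parquet")
print(t.visible_files)  # [] -- written but not yet committed
t.commit()
print(t.visible_files)  # ['data-001.parquet', 'data-002.parquet']
```

Separating writing from committing is what lets many concurrent writers exist while readers always see a consistent snapshot.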

Iceberg does not depend on any other third-party service, whereas Hudi abstracts some components into services, for example abstracting the metadata into an independent timeline, which may rely on independent indexes or even other external services.

4. Future planning

The following are some of our future plans. The first is optimization of the Iceberg core, including the full-link stability testing and performance optimization involved in this plan, as well as providing some related Table API interfaces for CDC incremental pull.

For the Flink integration, we will implement automatic and manual compaction of CDC data files, and provide the ability for Flink to incrementally pull CDC data.

In terms of other ecological integrations, we will integrate engines such as Spark and Presto, and use Alluxio to accelerate data queries.
