Apache Flink Table Store: Full + Incremental Integrated Lake Ingestion in Real Time


1. Stages and pain points of data lake (warehouse) ingestion



Database-based data integration has, broadly, gone through the following stages.

1.1 One-time full load + periodic incremental data warehousing



Fig. 1 Data warehousing: one-time full load + periodic increments

The full data is imported once through a bulk load; an incremental synchronization task is then scheduled to copy incremental data from the database into a temporary table and merge it with the full data. Although this method can meet certain business needs, it has the following problems.

Full and incremental data must be merged periodically to obtain the latest snapshot. Since record-level updates are not supported, users need additional SQL tasks to deduplicate, and data freshness depends on the scheduling cycle. If data arrives late, data drift also has to be handled: a common approach is to schedule the merge task for partition T (event time) at processing time T+N, read the T~T+N partitions (processing time), and keep only the records whose business time is less than or equal to T (event time) before merging, which further reduces data freshness.
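As a concrete illustration (not from the original article), the sketch below shows what such a scheduled merge typically looks like in Hive-style SQL. The tables ods_orders_full and ods_orders_inc, their columns, and the ${T}/${T+N} scheduler placeholders are all hypothetical; the query keeps the latest version of each order and applies the event-time filter described above.

-- Hypothetical T+1 merge: combine the previous full snapshot with the
-- incremental partitions and keep only the latest version of each order.
INSERT OVERWRITE TABLE ods_orders_full PARTITION (dt = '${T}')
SELECT order_id, amount, update_time
FROM (
    SELECT order_id, amount, update_time,
           ROW_NUMBER() OVER (PARTITION BY order_id
                              ORDER BY update_time DESC) AS rn
    FROM (
        -- previous full snapshot
        SELECT order_id, amount, update_time
        FROM ods_orders_full WHERE dt = '${T-1}'
        UNION ALL
        -- incremental partitions T ~ T+N (processing time),
        -- filtered by business/event time to handle data drift
        SELECT order_id, amount, update_time
        FROM ods_orders_inc
        WHERE dt BETWEEN '${T}' AND '${T+N}'
          AND to_date(update_time) <= '${T}'
    ) unioned
) deduped
WHERE rn = 1;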

Although downstream layers usually rely on interactive analysis engines to accelerate queries, for cost reasons the detailed data in the base table generally is not treated this way. As a result, correctness troubleshooting requires querying the details directly, in particular the full and incremental data before and after a merge, to locate the problem. And if a business change revises a batch of order data and corrected metrics have to be produced, the original table and all of its downstream dependent tables must be revised manually in a cascade.

1.2 One-time full load + real-time incremental lake ingestion



Compared with the traditional data warehouse, the data lake greatly improves data freshness while keeping storage costs low. Take Apache Hudi as an example: it supports a one-time full bootstrap to build the base table and then applies newly ingested CDC data in real time (see the practice of Hudi-based lakehouse integration at Shopee [1]), as shown in Figure 2.

Fig. 2 Lake ingestion: one-time full load + real-time increments

Because record-level updates and deletes are supported, primary-key deduplication can be done on the storage side and no extra merge tasks are required. In terms of data freshness, since the streaming job periodically triggers checkpoints that produce snapshots of the merged full and incremental data, freshness is greatly improved compared with the first method (producing merged snapshots by daily or hourly scheduling).

On the other hand, we also found the following problems with this method.

🤔 Bootstrap Index timeouts and state bloat

After the Flink incremental synchronization job starts in streaming mode, it first loads the full dataset into Flink state to build an index from the Hoodie key (primary key + partition path) to the file group the record is written to. This process blocks checkpoint completion, and written data only becomes readable after a checkpoint succeeds, so when the full dataset is large, checkpoints may keep timing out and the downstream reads no data at all. In addition, because the index is kept in state and is updated whenever insert-type records arrive during the incremental phase, the state TTL has to be estimated carefully: if it is too small, data may be lost; if it is too large, the state may bloat.
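For reference, a hedged sketch of the Hudi Flink SQL options that govern this behaviour is shown below. It is not from the original article; the table schema, path and values are invented, and the option names ('table.type', 'index.bootstrap.enabled', 'index.state.ttl') should be checked against the Hudi release in use.

-- Hypothetical Hudi sink table; schema, path and option values are illustrative.
CREATE TABLE hudi_orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    update_time TIMESTAMP(3),
    dt STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'connector' = 'hudi',
    'path' = 'oss://my-bucket/warehouse/hudi_orders',
    'table.type' = 'MERGE_ON_READ',
    -- load the existing full data into Flink state on startup to build the index
    'index.bootstrap.enabled' = 'true',
    -- index state TTL in days: too small risks losing updates, too large bloats state
    'index.state.ttl' = '7'
);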

🤔 The pipeline is still complex, incremental offsets are hard to align, and automated operations are costly

The full + real-time incremental method does not really simplify the pipeline, because it additionally introduces the operation and maintenance of Kafka, and the offsets of incremental consumption have to be aligned manually to prevent data loss (see Change Data Capture with Debezium and Apache Hudi [2]). After the incremental CDC job starts, the user has to wait and watch its running status; once the first checkpoint succeeds, the job is stopped with a savepoint, the configuration is changed to disable the Bootstrap Index, and the job is restarted from the savepoint. The whole procedure is complex to operate, and the cost of automating it is relatively high.

In addition, we reviewed some industry practices with Hudi and found that users must pay close attention to a variety of configurations to meet different needs, which hurts usability. For example, the practice of Hudi-based integrated lakehouse technology at Shopee [1] mentions monitoring users' table-creation statements at the platform level to prevent tables in large-scale write scenarios from being configured as COW (Copy-on-Write); during the full-to-incremental switch, users must pay special attention to Kafka consumption offsets to ensure data correctness. Parameter configuration has a large impact on both the correctness and the performance of the job.

2. Full + incremental integrated lake ingestion based on Apache Flink Table Store



As log-based CDC gradually replaces query-based CDC, and especially now that Flink SQL CDC supports full + incremental all-in-one synchronization on the source side, full + incremental integrated synchronization (one streaming job that completes the full sync and then continuously consumes the incremental changelog) has become a new direction to explore. This approach reduces pipeline complexity, and the tedious work of manually aligning offsets during the full-to-incremental switch is delegated to Flink CDC and the checkpoint mechanism, so eventual consistency of the data is guaranteed at the framework level. However, our investigation showed that attempting this with Hudi runs into the following challenges.

🤔 Data is severely out of order during the full synchronization phase, making write performance and stability hard to guarantee

One problem in the full synchronization phase is that reading chunks with multiple concurrent readers produces severely out-of-order data, so many partitions are written at the same time. The resulting large volume of random writes causes performance regressions and throughput glitches, and since the writer for each partition maintains its own cache, OOM errors easily make the job unstable. Although Hudi supports a rate limit (Apache Hudi DeltaStreamer # Rate Limit [3]) to cap the amount of data written per minute and smooth the load somewhat, balancing job stability against throughput is still a high bar for ordinary users.

2.1 Why Flink Table Store



Apache Flink Table Store [4] (https://github.com/apache/flink-table-store) was open-sourced as an Apache Flink subproject in early 2022, with the goal of building a lake storage that supports updates, for real-time streaming changelog ingestion and high-performance queries.

🚀 High-throughput ingestion of updates, supporting full + incremental integrated lake ingestion with a single Flink job

Fig. 3 Flink Table Store: full + incremental integrated synchronization

Recalling the previous section, the main challenge of full + incremental integrated synchronization is the large number of random writes caused by out-of-order data in the full synchronization phase, which leads to performance regressions, throughput glitches and instability. The Table Store's storage layout is partition first, then bucket, and each bucket maintains an LSM (Log-Structured Merge-Tree) (see Fig. 4 and Fig. 5). When a record is hashed by primary key into its bucket, the write path (directory) is determined and the record is written to a MemTable as a key-value pair (similar to a HashMap, where the key is the primary key and the value is the record). When flushing to disk, records are sorted and merged (deduplicated) by primary key and written out by appending. Because the sort-merge happens in the buffer, there is no need to look up an index to tag each record as an insert or an update and locate the file group it belongs to (Apache Hudi Technical Specification # Writer Expectations [5]). Moreover, a MemTable flush is triggered when the buffer is full, so auto file sizing (Apache Hudi Write Operations # Writing path [6]), which slows down writes (Apache Hudi File Sizing # Auto-Size During ingestion [7]), is not needed to avoid small files. The entire write path is append-only and sequential (On Disk IO, Part 3: LSM Trees [8]), which avoids random IO.

When the Table Store is used as the lake storage, a single INSERT INTO statement is enough to complete full + incremental synchronization. The SQL below shows how one Flink streaming job synchronizes the orders table of a MySQL database into a Table Store table and keeps consuming the incremental data.
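A minimal sketch of such a job is shown below, assuming a Table Store catalog is the current catalog and the flink-sql-connector-mysql-cdc dependency is on the classpath; the schema, connection options and bucket count are illustrative.

-- MySQL CDC source: reads the full snapshot first, then the binlog increments.
CREATE TEMPORARY TABLE mysql_orders (
    order_id BIGINT,
    dt STRING,
    amount DECIMAL(10, 2),
    update_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flink',
    'password' = '******',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- Table Store sink: partitioned by dt, N buckets per partition (here N = 4).
-- The primary key must contain the partition field.
CREATE TABLE orders (
    order_id BIGINT,
    dt STRING,
    amount DECIMAL(10, 2),
    update_time TIMESTAMP(3),
    PRIMARY KEY (dt, order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
    'bucket' = '4'
);

-- One streaming job handles both the full and the incremental phases.
INSERT INTO orders SELECT * FROM mysql_orders;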

Figure 4 shows the storage layout of the orders table partitioned by dt. After the user specifies the total number of buckets N, a bucket-${n} directory is created under each partition, and the records satisfying hash_func(pk) % N == ${n} are stored under that directory in columnar format (ORC or Parquet).

Metadata and data are stored under the same table directory, which contains a manifest directory and a snapshot directory.

- The manifest directory records the data file changes of each commit triggered by a checkpoint, including added and deleted data files.

- The snapshot directory records the snapshot file produced by each commit; its content references the manifests of previous commits plus the manifest produced by this commit as an increment.

Figure 5 shows how the LSM inside each bucket of Figure 4 works. Taking a Flink streaming job as an example, at every checkpoint Flink Table Store produces a commit that includes the following information:

- A snapshot of the current table is generated; the system tracks the earliest and the current snapshot files through pointer-like snapshot hint files.

- The snapshot file records which manifest files this commit adds and which it deletes.

- Each manifest file records which sst files were created and which were deleted, together with the primary key range of each sst file and the min/max/null count of every field.

- Each sst file contains records sorted by primary key in columnar format. For level-0 files, the Table Store asynchronously triggers a compaction thread to eliminate the read-side merge cost caused by overlapping primary key ranges.

Fig. 5 LSM Implementation of Flink Table Store Table

When Flink Table Store 0.2.0 was released, we benchmarked a real-time update scenario writing 500 million records over 100 million primary keys (inserts and updates) and compared it with Apache Hudi MOR and COW (see the Apache Flink Table Store 0.2.0 release announcement [9]). The Table Store shows good write performance in large-scale real-time update scenarios.



🚀 Efficient data skipping for filter pushdown, providing high-performance point and range queries

Although there is no additional index, filtering benefits from metadata management and the columnar format:

- The manifest stores the min/max of each file's primary key and statistics for every field, so some predicates can be evaluated without reading the file at all.

- The ORC/Parquet formats record a sparse index at the end of each file; the per-chunk statistics and offsets in the footer can be used for further filtering.

When data is read with a filter, the following pruning can be performed based on the information above:



1. Read the manifests: apply the partition and field predicates against each file's partition and min/max statistics, and prune files that cannot match.

2. Read the file footers: skip the chunks whose min/max statistics rule them out.

3. Read the remaining files and the remaining chunks inside them.

Take the orders table above as an example. Suppose the user wants to query all orders in partition dt=2022-01-01 whose order_id is between 100 and 200:

SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;

Flink Table Store first locates the latest committed snapshot file via the LATEST snapshot hint file (read committed), then reads the corresponding manifest metadata from that snapshot. The partition condition dt = '2022-01-01' filters the manifest entries down to that partition, and because the manifest also records the key range of each sst file, the order_id range predicate [100, 200] narrows the read down to only the matching sst files under the dt=2022-01-01 directory.

We also tested query performance on the same dataset (see the Apache Flink Table Store 0.2.0 release announcement [9]); Flink Table Store performs well for both point and range queries. By construction, an MOR table queries slower than a COW table, and a COW table writes slower than an MOR table; in practice, an MOR table created for a large-scale write scenario is also hard to convert to COW for reading with one click. Under the premise that what is being queried is a write-optimized (MOR-style) table, the query performance of Flink Table Store is good.
