How Flink CDC simplifies data entry into lakes and warehouses

1. Introduction to Flink CDC

Broadly speaking, any technology that captures data changes can be called CDC (Change Data Capture). In practice, the term usually refers to capturing data changes in a database. CDC has a wide range of application scenarios, including:

* Data distribution: distributing one data source to multiple downstream systems, often used for business decoupling and microservices.
* Data integration: integrating scattered, heterogeneous data sources into the data warehouse, eliminating data silos and facilitating subsequent analysis.
* Data migration: often used for database backup, disaster recovery, and so on.

Flink CDC is a Change Data Capture technology based on database logs. It reads full and incremental data in an integrated way and, with the help of Flink's pipeline capabilities and rich upstream and downstream ecosystem, captures changes from a variety of databases and synchronizes them to downstream storage in real time.

At present, Flink CDC already supports a rich set of upstream data sources such as MySQL, MariaDB, PostgreSQL, and MongoDB. Support for databases such as OceanBase, TiDB, and SQL Server is on the community's roadmap.

The downstream side of Flink CDC is even richer. It supports writing to message queues such as Kafka and Pulsar, to data lakes such as Hudi and Iceberg, and to various data warehouses.

At the same time, the Changelog mechanism natively supported by Flink SQL makes processing CDC data very simple. Users can clean, widen, and aggregate the full and incremental data of a database with SQL alone, which greatly lowers the barrier to entry. In addition, the Flink DataStream API lets users write code to implement custom logic, giving them the freedom to customize their pipelines in depth.

The core of Flink CDC is real-time, consistent synchronization and processing of both the full data and the incremental data of a table, so that users can easily obtain a real-time, consistent snapshot of each table. For example, suppose a table holds a full set of historical business data while incremental business data is continuously written and updated. Flink CDC captures the incremental change records in real time and maintains a snapshot that stays consistent with the database. An update record overwrites the existing row, and an insert record is appended to the existing data. Throughout the process, Flink CDC provides a consistency guarantee: no record is duplicated or lost.
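The insert and update behaviour described above can be illustrated with a minimal Python sketch. It is not Flink CDC code: the event tuple layout, the operation names, and the `apply_changes` helper are assumptions made for illustration, and the delete branch is added only for completeness.

```python
from typing import Dict, Iterable, Tuple

# Hypothetical change event: (operation, primary_key, row).
# The operation names are illustrative and are not Flink's RowKind values.
ChangeEvent = Tuple[str, int, dict]


def apply_changes(snapshot: Dict[int, dict], events: Iterable[ChangeEvent]) -> Dict[int, dict]:
    """Apply change events to a keyed snapshot and return the updated copy."""
    result = dict(snapshot)
    for op, key, row in events:
        if op in ("insert", "update"):
            # An insert adds a new row; an update overwrites the existing one.
            result[key] = row
        elif op == "delete":
            # Not discussed in the text above; included for completeness.
            result.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return result


# Full (historical) data already in the table.
full_data = {1: {"id": 1, "status": "created"}, 2: {"id": 2, "status": "paid"}}

# Incremental changes captured after the full data was read.
changes = [
    ("update", 1, {"id": 1, "status": "paid"}),
    ("insert", 3, {"id": 3, "status": "created"}),
]

current = apply_changes(full_data, changes)
print(current)
```

Because rows are keyed by primary key, replaying the same event twice leaves the snapshot unchanged, which is one simple way to think about the "no duplication" half of the guarantee.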

So what changes can Flink CDC bring to existing data warehousing and lake ingestion architectures? Let's first look at the traditional data warehousing architecture.

In early data warehousing architectures, the full data set was usually loaded into the data warehouse every day with SELECT queries and then analyzed offline. This architecture has several obvious disadvantages:

* Querying the full business tables every day affects the stability of the business itself.
* With offline, day-level scheduling, results are only produced once a day, so timeliness is poor.
* Because the approach is query-based, the pressure on the database keeps increasing as the data volume grows, and the performance bottleneck of the architecture is obvious.

In the 2.0 era of data warehouses, data warehousing evolved to the Lambda architecture, which adds a pipeline for importing incremental data in real time. Overall, the Lambda architecture scales better and no longer affects business stability, but some problems remain:

* It relies on scheduled offline merges, so output is hourly at best and the delay is still relatively large.
* Full and incremental synchronization are two separate pipelines.
* The architecture has long pipelines and many components to maintain. The full pipeline requires DataX or Sqoop, the incremental pipeline requires Canal and Kafka, and on top of that a scheduled job that merges full and incremental data must also be maintained.

Flink CDC offers new ideas for addressing these problems of the traditional data warehousing architecture. With Flink CDC's integrated full and incremental real-time synchronization, combined with the update capability provided by the data lake, the entire architecture becomes very simple. We can use Flink CDC to read the full and incremental data of MySQL and write and update it directly in Hudi.

This simple architecture has clear advantages. First, it does not affect business stability. Second, it provides minute-level output, which meets the needs of near-real-time business. Third, the full and incremental pipelines are unified into one integrated synchronization. Finally, the architecture has shorter pipelines and fewer components to maintain.

2. The core features of Flink CDC

The core features of Flink CDC can be divided into four parts:

* The incremental snapshot reading algorithm, which provides lock-free reading, parallel reading, and resuming from the point of interruption.
* A lake-friendly design, which improves the stability of writing CDC data into the lake.
* Support for integrating heterogeneous data sources, which makes Streaming ETL convenient.
* Support for merging sharded databases and tables into the lake.

Next, we will introduce these features one by one.

In Flink CDC 1.x, MySQL CDC had three major pain points that affected its usability in production:

* MySQL CDC needed a global lock to guarantee consistency between full and incremental data, and MySQL's global lock affects online business.
* It only supported reading with a parallelism of one, so reading large tables was very time-consuming.
* If the job failed during the full synchronization phase, it could only start over from the beginning, so stability was poor.

In response to these problems, the Flink CDC community proposed the "Incremental Snapshot Reading Algorithm", which provides lock-free reading, parallel reading, and resuming from the point of interruption, and solves all of the above pain points together.

To put it simply, the core idea of the incremental snapshot reading algorithm is to divide the table into chunks that are read in parallel during the full reading phase. After entering the incremental phase, a single task is enough to read the binlog. The automatic switch from the full phase to the incremental phase is kept consistent by a lock-free algorithm. This design saves resources while improving reading efficiency, and it achieves integrated full and incremental data synchronization. It is also an important milestone on the road to stream-batch unification.
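The chunk-splitting idea can be sketched in Python as follows. This is a simplified illustration rather than Flink CDC's actual implementation: it assumes an integer primary key with roughly even distribution, and the function names, the half-open range convention, and the round-robin assignment are all assumptions.

```python
from typing import List, Optional, Tuple

# A chunk is a half-open primary-key range [start, end); None means unbounded.
Chunk = Tuple[Optional[int], Optional[int]]


def split_into_chunks(min_key: int, max_key: int, chunk_size: int) -> List[Chunk]:
    """Split the key range of a table into consecutive chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if min_key > max_key:
        raise ValueError("min_key must not exceed max_key")
    chunks: List[Chunk] = []
    start: Optional[int] = None  # first chunk is open at the lower end
    boundary = min_key + chunk_size
    while boundary <= max_key:
        chunks.append((start, boundary))
        start = boundary
        boundary += chunk_size
    chunks.append((start, None))  # last chunk is open at the upper end
    return chunks


def assign_chunks(chunks: List[Chunk], parallelism: int) -> List[List[Chunk]]:
    """Distribute chunks over parallel readers in round-robin order."""
    return [chunks[i::parallelism] for i in range(parallelism)]


chunks = split_into_chunks(min_key=1, max_key=10, chunk_size=4)
assignment = assign_chunks(chunks, parallelism=2)
print(chunks)
print(assignment)
```

Each chunk can be read and tracked independently, so a failed job only needs to re-read unfinished chunks instead of the whole table. The lock-free consistency step between the full and incremental phases is not covered by this sketch.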

Flink CDC is a framework that is friendly to streaming data into the lake. Early versions of Flink CDC were not designed with the data lake scenario in mind: checkpoints were not supported during the full phase, so all of the full data was processed within a single checkpoint, which was very unfriendly to data lakes that rely on checkpoints to commit data. Flink CDC 2.0 considered the data lake scenario from the start. Because the full data is split into chunks, Flink CDC can refine the checkpoint granularity from the table level to the chunk level, which greatly reduces buffer usage during data lake writes.

One of the core points that distinguishes Flink CDC from other data integration frameworks is the unified stream-batch computing capability provided by Flink. This makes Flink CDC a complete ETL tool, with not only excellent Extract and Load capabilities but also powerful Transformation capabilities. As a result, we can easily build data lakes on top of heterogeneous data sources.

In the SQL on the left side of the figure above, we join the real-time product table and real-time order table in MySQL with the real-time logistics information table in PostgreSQL, that is, a Streaming Join. The join results are updated to Hudi in real time, which makes it very easy to build a data lake from heterogeneous data sources.

In OLTP systems, a single table that holds too much data is usually split through database and table sharding to improve system throughput. For data analysis, however, the sharded tables usually need to be merged back into one large table when they are synchronized to the data warehouse or data lake. Flink CDC can easily accomplish this task.

In the SQL on the left side of the figure above, we declare a user_source table to capture the data of all sharded user databases and tables. The table options database-name and table-name take regular expressions that match these tables. The user_source table also defines two metadata columns that record which database and table each row comes from. In the declaration of the Hudi table, we declare the database name, the table name, and the primary key of the original table as the composite primary key in Hudi. After declaring the two tables, a simple INSERT INTO statement merges the data of all sharded databases and tables into one Hudi table. This completes the construction of a data lake from sharded databases and tables and facilitates unified analysis on the lake.
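The matching and merging logic described above can be mimicked in a short Python sketch. It is only an illustration of the idea, not the connector's code: the shard names, the regular expressions, the `id` primary key, and the helper names are hypothetical.

```python
import re
from typing import Dict, Iterable, Tuple

# Hypothetical source row: (database name, table name, row with primary key "id").
SourceRow = Tuple[str, str, dict]


def is_captured(db_pattern: str, table_pattern: str, database: str, table: str) -> bool:
    """Return True when both names fully match their regular expressions."""
    return (
        re.fullmatch(db_pattern, database) is not None
        and re.fullmatch(table_pattern, table) is not None
    )


def merge_shards(rows: Iterable[SourceRow], db_pattern: str, table_pattern: str) -> Dict[tuple, dict]:
    """Merge rows from all matching shards into one table keyed by a composite key."""
    merged: Dict[tuple, dict] = {}
    for database, table, row in rows:
        if not is_captured(db_pattern, table_pattern, database, table):
            continue
        # Composite key: database name + table name + original primary key.
        key = (database, table, row["id"])
        merged[key] = {"database_name": database, "table_name": table, **row}
    return merged


rows = [
    ("user_db_1", "user_01", {"id": 1, "name": "Ann"}),
    ("user_db_2", "user_01", {"id": 1, "name": "Bob"}),  # same id, different shard
    ("order_db", "orders", {"id": 1, "total": 30}),      # does not match the patterns
]

merged = merge_shards(rows, db_pattern=r"user_db_\d+", table_pattern=r"user_\d+")
print(merged)
```

Including the database and table names in the key is what keeps two rows with the same original primary key from overwriting each other after the merge.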

3. The open source ecosystem of Flink CDC

Flink CDC is an independent open source project, and its code is hosted on GitHub. Following a rhythm of small, frequent releases, the community has released 5 versions this year. The three versions of the 1.x series introduced some small features. Version 2.0 brought advanced MySQL CDC features such as lock-free reading, parallel reading, and resuming from the point of interruption, with 91 commits and 15 contributors. Version 2.1 added support for MongoDB, with 115 commits and 28 contributors. Both the number of commits and the number of contributors have grown significantly.

Documentation and help manuals are also a very important part of an open source community. To better help users, the Flink CDC community has launched a versioned documentation website, for example the documentation for version 2.1. The documentation also provides many quick-start tutorials, so users can get started with Flink CDC as long as they have a Docker environment. In addition, an FAQ guide is provided to quickly resolve common problems.

In 2021, the Flink CDC community grew rapidly. PRs and issues on GitHub were quite active, and the number of GitHub stars increased by 330% year-on-year.

4. Practice and improvement of Flink CDC in Alibaba

Using Flink CDC to write data into lakes and warehouses has also been practiced and deployed at large scale in Alibaba, and some pain points and challenges were encountered along the way. This section describes how we improved the system and solved them.

Let's first look at some of the pain points and challenges of writing CDC data into the lake. The following is a user's original CDC lake ingestion architecture, which is divided into two pipelines:

* A full synchronization job that performs a one-time pull of the full data.
* An incremental job that synchronizes binlog data to the Hudi table in near real time through Canal and a processing engine.

Because this architecture takes advantage of Hudi's update capability, it does not need to periodically schedule full merge tasks and can achieve minute-level latency. However, full and incremental synchronization are still two separate jobs, switching from full to incremental still requires manual intervention, and an accurate incremental start position must be specified, otherwise there is a risk of data loss. In other words, this architecture separates stream and batch rather than forming a unified whole. As Xuejin introduced earlier, one of the biggest advantages of Flink CDC is the automatic switch between full and incremental reading, so we replaced the user's original lake ingestion architecture with Flink CDC.

However, the first pain point users run into with Flink CDC is the need to manually map MySQL DDL to Flink DDL. Mapping table structures by hand is cumbersome, especially when there are many tables and fields, and it is also error-prone. For example, MySQL's BIGINT UNSIGNED cannot be mapped to Flink's BIGINT and must be mapped to DECIMAL(20). If the system could map table structures automatically, the process would be much simpler and safer.
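The BIGINT UNSIGNED example follows from simple arithmetic: an unsigned 64-bit integer can hold values up to 2^64 - 1, which has 20 decimal digits and exceeds the signed 64-bit maximum of 2^63 - 1. The Python sketch below checks this and shows what a small automatic mapping table might look like. The `MYSQL_TO_FLINK` dictionary and `map_type` helper are hypothetical and cover only a handful of integer types; `DECIMAL(20, 0)` is the explicit-scale spelling of `DECIMAL(20)`.

```python
SIGNED_BIGINT_MAX = 2**63 - 1    # largest value of a signed 64-bit BIGINT
UNSIGNED_BIGINT_MAX = 2**64 - 1  # largest value of MySQL BIGINT UNSIGNED

# Hypothetical, deliberately incomplete mapping of MySQL integer types
# to Flink SQL types, for illustration only.
MYSQL_TO_FLINK = {
    "TINYINT": "TINYINT",
    "SMALLINT": "SMALLINT",
    "INT": "INT",
    "BIGINT": "BIGINT",
    "INT UNSIGNED": "BIGINT",
    "BIGINT UNSIGNED": "DECIMAL(20, 0)",
}


def map_type(mysql_type: str) -> str:
    """Look up the Flink type for a MySQL type name, ignoring case and extra spaces."""
    normalized = " ".join(mysql_type.upper().split())
    if normalized not in MYSQL_TO_FLINK:
        raise KeyError(f"no mapping defined for MySQL type: {mysql_type!r}")
    return MYSQL_TO_FLINK[normalized]


# The unsigned maximum does not fit into a signed 64-bit integer ...
print(UNSIGNED_BIGINT_MAX > SIGNED_BIGINT_MAX)
# ... and it needs 20 decimal digits, hence a precision of 20.
print(len(str(UNSIGNED_BIGINT_MAX)))
print(map_type("bigint unsigned"))
```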

Another pain point is that table structure changes make the lake ingestion pipeline hard to maintain. For example, a user has a table that originally had two columns, id and name, and a column named Address is suddenly added. The data in the new column may not be synchronized to the data lake, and the change may even cause the ingestion pipeline to fail, which affects stability. Besides added columns, there may also be dropped columns, type changes, and so on. A research report by Fivetran found that at 60% of companies the schema changes every month, and at 30% it changes every week. This shows that basically every company faces data integration challenges caused by schema changes.
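The added-column example can be made concrete with a small Python sketch. It only illustrates the problem and a naive way to detect it; it is not how any Flink component handles schema changes, and the `diff_schema` and `project` helpers and the type names are assumptions.

```python
from typing import Dict, List, Tuple

Schema = Dict[str, str]  # column name -> type name


def diff_schema(old: Schema, new: Schema) -> Tuple[Schema, List[str], Dict[str, Tuple[str, str]]]:
    """Return (added columns, dropped columns, columns whose type changed)."""
    added = {name: typ for name, typ in new.items() if name not in old}
    dropped = [name for name in old if name not in new]
    changed = {
        name: (old[name], new[name])
        for name in old
        if name in new and old[name] != new[name]
    }
    return added, dropped, changed


def project(row: dict, schema: Schema) -> dict:
    """Keep only the columns the schema knows about; missing columns become None."""
    return {name: row.get(name) for name in schema}


old_schema = {"id": "BIGINT", "name": "STRING"}
new_schema = {"id": "BIGINT", "name": "STRING", "address": "STRING"}
row = {"id": 1, "name": "Ann", "address": "1 Main St"}

# A pipeline that still uses the old schema silently drops the new column.
stale = project(row, old_schema)

# Detecting the change and evolving the target schema keeps the new data.
added, dropped, changed = diff_schema(old_schema, new_schema)
evolved_schema = {**old_schema, **added}
fresh = project(row, evolved_schema)

print(stale)
print(fresh)
```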

The last challenge is synchronizing an entire database into the lake. Because users mainly work with SQL, an INSERT INTO statement has to be defined for each table's synchronization pipeline. Some users have thousands of business tables in their MySQL instances and would have to write thousands of INSERT INTO statements. Even more daunting, each INSERT INTO job creates at least one database connection and reads the binlog once. If thousands of tables are written into the lake, thousands of connections and thousands of repeated binlog reads are required, which puts a lot of pressure on MySQL and the network.

We have just introduced many pain points and challenges of writing CDC data into the lake. Now let's think from the user's perspective: what exactly do users want when moving a database into the lake? We can first treat the data integration system in the middle as a black box. What capabilities do users expect this black box to provide in order to simplify lake ingestion?

First, users want to synchronize both the full and the incremental data of the database. This requires the system to integrate full and incremental reading and switch between them automatically, rather than splitting the work into a full pipeline plus an incremental pipeline.

Second, users do not want to manually map the schema for each table. This requires the system to discover metadata automatically, sparing users from writing DDL in Flink, and even to create the target tables in Hudi for them.

In addition, users hope that structure changes of the source tables are synchronized automatically as well. Whether columns are added, dropped, or changed, or tables are added, dropped, or changed, the changes should be synchronized to the target in real time so that no new data produced at the source is lost, and an ODS layer that stays consistent with the source database is built automatically.

Finally, whole-database synchronization must be production-ready, so that it does not put too much pressure on the source and affect online business.

These four core capabilities basically make up the ideal data integration system that users expect, and it would be even better if all of this could be done with a single line of SQL and a single job. We call the system in the middle "fully automated data integration" because it automatically moves the database into the lake and solves the core pain points encountered so far. Flink appears to be a very suitable engine for achieving this goal.

So we invested a lot of effort in building this "fully automated data integration" on top of Flink. The work mainly revolves around the four points just mentioned.

First, Flink CDC already switches automatically between full and incremental reading, which is one of its highlights.

For automatic metadata discovery, the system integrates seamlessly through Flink's Catalog interface. We developed a MySQL Catalog to automatically discover tables and schemas in MySQL, and a Hudi Catalog to automatically create the metadata of target tables in Hudi.

For automatic synchronization of table structure changes, we introduced a Schema Evolution kernel, which enables Flink jobs to synchronize schema changes in real time without relying on external services.

For whole-database synchronization, we introduced the CDAS syntax, which defines a whole-database synchronization job with one line of SQL, and a source merging optimization that reduces the pressure on the source database.

To support whole-database synchronization, we introduced the CDAS and CTAS data synchronization syntax, which is very simple. CDAS stands for "create database as database" and is mainly used to synchronize an entire database. The statement shown here synchronizes the entire tpc_ds database in MySQL to the ods database in Hudi. Similarly, the CTAS syntax conveniently supports table-level synchronization. It can also specify the database name and table name through regular expressions to merge sharded databases and tables during synchronization. In the example here, the sharded MySQL user databases and tables are merged into the users table in Hudi. A CDAS or CTAS statement automatically creates the target table on the target side and then starts a Flink job that synchronizes the full and incremental data and also synchronizes table structure changes in real time.

As mentioned earlier, when thousands of tables are written into the lake, the large number of database connections and the repeated binlog reads put huge pressure on the source database. To solve this problem, we introduced the source merging optimization. We try to merge the sources within the same job: if they all read from the same data source, they are merged into a single source node. The database then only needs to establish one connection and the binlog only needs to be read once, which makes it possible to read the entire database while reducing the pressure on it.
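The effect of source merging on the number of connections can be illustrated with a short Python sketch. It models only the grouping idea, under the assumption that sources are merged when they read from the same database instance; the instance name `mysql-a:3306`, the table names, and the `merge_sources` helper are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Hypothetical per-table source definition: (database instance, qualified table name).
TableSource = Tuple[str, str]


def merge_sources(table_sources: Iterable[TableSource]) -> Dict[str, List[str]]:
    """Group table sources by instance so that each instance is read only once."""
    merged: Dict[str, List[str]] = defaultdict(list)
    for instance, table in table_sources:
        merged[instance].append(table)
    return dict(merged)


# One source per table, all reading from the same MySQL instance.
table_sources = [("mysql-a:3306", f"tpc_ds.table_{i}") for i in range(1000)]

connections_before = len(table_sources)  # one connection and binlog read per table
merged = merge_sources(table_sources)
connections_after = len(merged)          # one connection and binlog read per instance

print(connections_before, connections_after)
```

The merged source still has to route each change record to the right target table, which this sketch represents only as the list of table names kept per instance.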

To show more intuitively how we simplify writing data into lakes and warehouses, we also provide a demo video. Interested readers can watch the talk "How Flink CDC Simplifies Real-time Data Entry into the Lake and Warehouse" from the Flink Forward Asia 2021 conference.

5. Future planning of Flink CDC

Finally, there are three main plans for the future of Flink CDC.

First, we will continue to improve the syntax and interface of CDAS and CTAS, polish the Schema Evolution kernel, and prepare for open source.

Second, we will add more CDC data sources, including TiDB, OceanBase, and SQL Server, which are already planned.

Third, we will abstract the current incremental snapshot reading algorithm into a general framework, so that more databases can plug into it through a few simple interfaces and gain integrated full and incremental reading.
