Building ETL data integration based on Flink SQL for streaming and batch integration

Data warehouse and data integration

The data warehouse is an integrated (Integrated), subject-oriented (Subject-Oriented), time-varying (Time-Variant), non-modifiable (Nonvolatile) data collection, used to support management decisions. This is the data warehouse concept proposed by the father of data warehouse Bill Inmon in 1990. The most important point in this concept is "integrated", and the rest of the features are methodological. Because the first problem to be solved by the data warehouse is data integration, which is to integrate multiple scattered and heterogeneous data sources together to eliminate data islands and facilitate subsequent analysis. This applies not only to traditional offline data warehouses, but also to real-time data warehouses, or the current hot data lakes. The first thing to solve is the problem of data integration. If the business data is all in one database, and this database can also provide very efficient query and analysis capabilities, then there is no need for data warehouses and data lakes.

Data integration is a process we often call ETL, which are data access, data cleaning and conversion, and data storage and storage. They correspond to the initials of three English words, so they are called ETL. The ETL process is also the most work-intensive link in the data warehouse construction. So how does Flink improve this ETL process? Let's first take a look at the architecture of a traditional data warehouse.

In traditional data warehouses, the real-time and offline data warehouses are two separate sets of links. For example, the real-time link synchronizes log and database data to Kafka in real time through Flume and Canal, and then performs data cleaning and widening in Kafka. The offline link periodically synchronizes log and database data to HDFS and Hive through Flume and Sqoop. Then do data cleaning and widening in Hive.

Here we mainly focus on the construction of the first half of the data warehouse, that is, to the ODS and DWD layers. We regard this area as the scope of ETL data integration in a broad sense. So in this area, the main problem with the traditional architecture is that this kind of split data warehouse construction will cause a lot of repetitive work, repeated resource consumption, and the real-time and offline underlying data models are inconsistent, which will lead to difficulty in guaranteeing data consistency and quality. . At the same time, the data of the two links is isolated, and the data has not been connected and shared.

So what changes can Flink bring to this architecture?

Based on Flink SQL, we can now easily build stream-batch integrated ETL data integration. The core differences from the traditional data warehouse architecture are mainly in the following points:

Flink SQL natively supports CDC, so now you can easily synchronize database data, whether it is directly connected to the database or connected to common CDC tools.

In recent versions, Flink SQL continues to enhance the ability to join dimension tables. Not only can it associate dimension table data in the database in real time, it can now also associate dimension table data in Hive and Kafka, which can flexibly meet the needs of different workloads and timeliness. need.

Based on Flink's powerful streaming ETL capabilities, we can uniformly perform data access and data conversion at the real-time layer, and then return the data at the detailed layer to the offline data warehouse.

Now that Flink is streaming into Hive, it already supports the function of automatically merging small files, which solves the pain of small files.

Therefore, based on the stream-batch integrated architecture, the benefits we can obtain are:

*Unified basic public data
*Guaranteed the consistency of stream batch results
*Improved the timeliness of offline data warehouse
* Reduced maintenance costs for components and links

Next, we will introduce each part of this architecture in combination with scenarios and cases, including data access, data storage and lake storage, and data widening.

data access

Now the typical data sources of data warehouses mainly come from logs and databases. Log access is very mature at this stage, and there are also very rich open source products to choose from, including Flume, Filebeat, Logstash, etc., which can easily collect logs to Kafka . We will not expand too much here.

Database access will be much more complicated. Several common CDC synchronization tools include Canal, Debezium, and Maxwell. Flink integrates well with these synchronization tools through the CDC format, and can directly consume the data generated by these synchronization tools. At the same time, Flink also launched a native CDC connector, which directly connects to the database, lowers the access threshold, and simplifies the data synchronization process.

Let's first look at an example using the CDC format. A common solution now is to use Debezium or Canal to collect the binlog of the MySQL database in real time, and synchronize row-level change events to Kafka for Flink analysis and processing. Before Flink launched the CDC format, it would be very troublesome for users to consume this kind of data. Users need to understand the data format of CDC tools, declare fields such as before and after, and then use ROW_NUMBER to deduplicate to ensure real-time retention of the last The semantics of a line. But this is expensive to use and doesn't support DELETE events.

Now Flink supports the CDC format. For example, here we can directly specify format = 'debezium-json' in the with parameter, and then the schema part only needs to fill in the schema of the table in the database. Flink can automatically recognize Debezium's INSERT/UPDATE/DELETE events and convert them into Flink's internal INSERT/UPDATE/DELETE messages. Afterwards, the user can directly perform operations such as aggregation and join on the table, just like operating a MySQL real-time materialized view, which is very convenient.

In version 1.12 of Flink, Flink already natively supports most of the common CDC formats, such as Canal json, Debezium json, Debezium avro, Maxwell, etc. At the same time, Flink also opens up the CDC format interface, and users can implement their own CDC format plug-ins to connect with their own company's synchronization tools.

In addition, Flink internally supports the semantics of CDC, so it is natural to directly read MySQL binlog data and convert it into Flink internal change messages. So we launched the MySQL CDC connector. You only need to specify connector=mysql-cdc in the with parameter, and then select this table to read the full amount + CDC incremental data in MySQL in real time without deploying other components and services. You can understand the table defined in Flink as a real-time materialized view of MySQL, so the results of aggregation and join on this table are consistent with the results of real-time running in MySQL. Compared with the Debezium and Canal architectures just introduced, the CDC connector is easier to use. There is no need to learn and maintain additional components. The data does not need to go through Kafka, which reduces the end-to-end delay. Moreover, it supports reading the full amount of data first, and seamlessly switching to CDC incremental reading, that is to say, we are talking about the stream-batch integration and stream-batch fusion architecture.

We found that the MySQL CDC connector is very popular with users, especially when combined with the OLAP engine, it can quickly build a real-time OLAP architecture. One of the characteristics of the real-time OLAP architecture is to synchronize database data to OLAP for ad hoc query, so that there is no need for offline data warehouses.

How did you do it before?

In the past, users generally used datax to do a full amount of synchronization, and then used canal to synchronize real-time increments to Kafka, and then synchronized from Kafka to OLAP. This architecture is more complicated and the link is also very long. Now many companies are using Flink+ClickHouse to quickly build real-time OLAP architecture. We only need to define a mysql-cdc source and a ClickHouse sink in Flink, and then submit an insert into query to complete the real-time synchronization from MySQL to ClickHouse, which is very convenient. Moreover, one of the pain points of ClickHouse is that the join is relatively slow, so generally we will format the MySQL data into a large detailed wide table data, and then write it into ClickHouse. This is done in one join operation in Flink. Before Flink provided the MySQL CDC connector, it was very troublesome to do join during the full + incremental real-time synchronization process.

Of course, here we can also replace ClickHouse with other common OLAP engines, such as Alibaba Cloud's Hologres. We found that many users on Alibaba Cloud have adopted this link and architecture, because it can save the cost of data synchronization services and message middleware. For many small and medium-sized companies, in today's epidemic era, cost control is very important.

Of course, other OLAP engines, such as TiDB, can also be used here. TiDB official also recently published an article introducing this real-time OLAP architecture of Flink+TiDB.

data storage lake

Just now we introduced that data access can be done very conveniently based on Flink SQL, which is the Extract part of ETL. Next, let's introduce Flink SQL's ability to store data into the lake, which is the Load part.

Let's review the architecture diagram of stream-batch integration just now, the core part of which is the streaming of Kafka data into the warehouse. It is this process that opens up real-time and offline data warehouses, unifies the basic public data of data warehouses, and improves The timeliness of offline data warehouses, so we will talk about this one.

It is very convenient to use Flink SQL for streaming data storage, and version 1.12 already supports the automatic merging of small files, which solves the pain points of small files. You can look at the code on the right. First, use the Hive dialect in Flink SQL to create a Hive result table, and then use a simple query such as select from kafka table insert into Hive table to submit a task to stream Kafka data in real time. Enter Hive.

If you want to enable the compaction of small files, you only need to add auto-compaction = true to the Hive table parameters, then the compaction of small files will be automatically performed when streaming into this Hive table. The principle of merging small files is that Flink's streaming sink will create a small topology, in which the temp writer node is responsible for continuously writing the received data into temporary files. When a checkpoint is received, the compact coordinator is notified to start merging small files. The coordinator will distribute the compaction tasks to multiple compact operators to merge small files concurrently. When the compaction is completed, notify the partition committer to commit the entire partition file to be visible. The whole process uses Flink's own checkpoint mechanism to complete the automation of compaction, without requiring additional compaction services. This is also a core advantage of Flink's streaming warehousing compared to other warehousing tools.

In addition to streaming into the warehouse, Flink now also supports streaming into the lake. Taking Iceberg as an example, based on Iceberg 0.10, you can now directly create an Iceberg catalog in Flink SQL, and create an Iceberg table directly under the Iceberg catalog by creating table. Then submit the insert into query to import the streaming data into Iceberg. Then the Iceberg table can be read in batch mode in Flink for offline analysis. However, Iceberg's small file automatic merging function has not yet been released and is still being supported.

What I just introduced is the ability to stream pure append data into warehouses and lakes. Next, I will introduce the ability of CDC to stream data into warehouses and lakes. Let's first introduce CDC data into Kafka's real-time data warehouse. In fact, this requirement is very common in the construction of real-time data warehouses, such as synchronizing database binlog data to Kafka, or joining, the result of aggregation is an update stream, and the user wants to write this update stream to Kafka as intermediate data for downstream consumption .

This would have been very troublesome in the past. In Flink 1.12, Flink introduced a new connector called upsert-kafka, which natively supports Kafka as an efficient CDC streaming storage.

Why is it efficient? Because the storage form is highly integrated with the Kafka log compaction mechanism, Kafka will automatically clean up the compacted topic data, and Flink can still ensure semantic consistency when reading the cleaned data. And like Canal, Debezium will store a lot of useless metadata information such as before, op_type, etc., upsert-kafka will only store the content of the data itself, saving a lot of storage costs. In terms of use, you only need to declare connector = upsert-kafka in the DDL and define the PK.

For example, here we define the MySQL CDC live room table and an upsert-kafka result table to synchronize the live room database to Kafka. Then INSERT and UPDATE written to Kafka are common data with a key, and DELETE is NULL data with a key. When Flink reads the data in this upsert-kafka, it can automatically recognize INSERT/UPDATE/DELETE messages. Consuming this upsert-kafka table has the same semantics as consuming the MySQL CDC table. And when Kafka performs compaction cleaning on topic data, Flink can still ensure semantic consistency by reading the cleaned data.

Entering CDC data into the Hive data warehouse will be troublesome, because Hive itself does not support the semantics of CDC. A common way now is to stream write CDC data to HDFS in changelog-json format. Then start a batch task to periodically divide the CDC data on HDFS into three tables of INSERT, UPDATE, and DELETE according to the op type, and then do a batch merge.

data widening

The Extract and Load of the Flink SQL-based ETL process were introduced earlier, and then the most common data widening operations in Transformation are introduced.

Data widening is the most common business processing scenario in data integration. The main means of data widening is Join. Flink SQL provides a wealth of Join support, including Regular Join, Interval Join, and Temporal Join.

Regular Join is the well-known dual-stream Join, and its syntax is the common JOIN syntax. The example in the figure is to widen the advertising data by associating the advertising exposure stream with the advertising click stream. After widening, the advertising cost can be further calculated. It can be seen from the figure that both the exposure stream and the click stream will be stored in the state of the join node, and the join operator realizes data widening by associating the state of the exposure stream and the click stream. The characteristic of Regular Join is that any side stream will trigger the update of the result, such as the exposure stream and click stream in the case. At the same time, the syntax of Regular Join is consistent with traditional batch SQL, and the learning threshold for users is low. But it should be noted that regular join uses state to store the data that has already arrived in the dual stream, and the state is permanently reserved by default, so one problem with regular join is that the state will continue to grow by default, and we generally use it in combination with state TTL.

Interval Join is a join that requires a time interval on a stream. For example, in the advertising billing case just now, it has a very typical business feature in it, that is, clicks generally occur within 10 minutes after exposure. Therefore, compared with Regular Join, we actually only need to correlate the exposure data within 10 minutes, so the state does not need to store the full amount of exposure data. It is an optimization based on Regular Join. To convert to an Interval Join, time attribute fields (click_time and show_time in the figure) need to be defined on both streams. And define the time interval of the left and right streams in the join condition. For example, here we add a condition: the click time must be greater than or equal to the exposure time, and at the same time be less than or equal to 10 minutes after exposure. Similar to Regular Join, any stream of Interval Join will trigger the result update, but compared with Regular Join, the biggest advantage of Interval Join is that the state can be automatically cleaned up, data is retained according to the time interval, and the state occupation is greatly reduced. Interval Join is applicable to businesses with a clear time interval, for example, exposure flow is associated with click flow, click flow is associated with order flow, and order flow is associated with communication.

Temporal join (temporal table association) is the most commonly used data widening method, and it is often used to do the well-known dimension table join. Syntactically, it requires an explicit FOR SYSTEM_TIME AS OF statement. The biggest difference between it and Regular Join and Interval Join is that the change of dimensional data will not trigger the result update, so the dimensional data on the mainstream association will not change. Flink supports a very rich Temporal join function, including associated lookup DB, associated changelog, and associated Hive tables. In the past, the well-known dimension table join was generally associated with a database that can be queried, because the dimension data is in the database, but in fact the dimension data may have various physical forms, such as binlog form, or periodically synchronized to Hive to become In the form of Hive partition table. In Flink 1.12, it is now supported to associate these two new dimension table forms.

Temporal Join Lookup DB is the most common dimension table join method. For example, in the case of user click stream associated with user portrait, user click stream is in Kafka, user real-time portrait is stored in HBase database, and each click event is queried and associated with HBase The real-time portrait of the user in the data widening is completed. The feature of Temporal Join Lookup DB is that the update of the dimension table will not trigger the update of the result, and the dimension data is stored in the database, which is suitable for scenarios with high real-time requirements. When using it, we generally enable Async IO and memory cache to improve query efficiency .

Before introducing the Temporal Join Changelog, let's look at another example of Lookup DB, which is a case where live interactive data is associated with the live room dimension. In this case, the live interactive data (such as likes and comments) is stored in Kafka, and the real-time dimensional data of the live broadcast room (such as the anchor, the title of the live broadcast room) is stored in MySQL. The amount of live interactive data is very large. In order to speed up the access , the common solution is to add a cache, such as synchronizing the dimension data of the live room through CDC, then storing it in Redis, and then making dimension table association. The problem with this solution is that the business data of the live broadcast is special. The creation of the live broadcast room and the interactive data of the live broadcast are basically generated at the same time, so the interactive data may arrive at Kafka early and be consumed by Flink, but the creation message of the live broadcast room passes through Canal , Kafka, Redis, this link is relatively long, and the data delay is relatively large, which may cause the live room data to not be synchronized when the interactive data query Redis, resulting in the inability to associate the live room data, resulting in deviations in downstream statistical analysis.

For such scenarios, Flink 1.12 supports Temporal Join Changelog, which implements dimension table association by materializing dimension tables from the changelog in the Flink state. The scene just now has a more concise solution. We can synchronize the changelog of the live room database table to Kafka through the Flink CDC connector. Note that we look at the SQL on the right. We use the upsert-kafka connector to write the MySQL binlog Kafka is imported, that is, the upsert stream that stores the changed data in the live broadcast room in Kafka. Then we temporally join the interactive data to the upsert stream of the live room, thus realizing the function of widening the live data.

Note that here FOR SYSTEM_TIME AS OF is not related to a processing time, but the event time of the left stream. Its meaning is to associate the data of the live room at this event time. At the same time, we also define watermark on the upsert stream of the live room, so Temporal join changelog will perform watermark waiting and alignment during execution to ensure that the exact version of the result is associated, so as to solve the problem of not being associated in the previous solution.

Let's explain the process of temporal join changelog in detail. The left stream is the interactive stream data, and the right stream is the changelog of the live room. The changelog in the live broadcast room will be materialized in the dimension table state of the right stream. The state is equivalent to a multi-version database mirroring. The mainstream interactive data will be temporarily cached in the state of the left stream. After the watermark is aligned, check the state in the dimension table. data. For example, now that the watermarks of both the interactive stream and the live stream have reached 10:01, the 10:01 comment data of the interactive stream will be queried in the dimension table state and associated with room 103. When the comment data arrives at 10:05, it will not be output immediately, otherwise it will be associated with the room information above. It will wait until the watermarks of the left and right streams reach 10:05, and then associate the data in the dimension table state and output it. At this time, it can associate accurate 104 room information.

To sum up, Temporal Join Changelog is characterized by high real-time performance. Because it is associated with versions based on event time, it can be associated with accurate version information, and the dimension table will wait for watermark alignment, so that users can control late dimensions through watermark. table number. The dimension table data in the Temporal Join Changelog is stored in the state of the temporal join node, and the reading is very efficient, just like a local Redis, and users no longer need to maintain additional Redis components.

In data warehouse scenarios, Hive is widely used, and the integration of Flink and Hive is very friendly. Now it supports Temporal Join Hive partitioned tables and non-partitioned tables. Let's take a typical case of associating Hive partition tables: order flow is associated with store data. Store data generally changes slowly, so the business side generally synchronizes the full store table to the Hive partition on a daily basis, and a new partition is generated every day, and each partition contains the full store data of the day.

In order to associate this kind of Hive data, we only need to specify the parameters in the two red circles on the right when creating the Hive partition table, and then the function of automatically associating the latest partition of Hive can be realized. partition.include = latestb means that only the latest partition of Hive is read , partition-name indicates sorting alphabetically by partition name when selecting the newest partition. By October 3rd, a new partition on October 2nd has been generated in Hive. After Flink monitors the new partition, it will reload the data on October 2nd into the cache and replace the data on October 1st. as the latest dimension table. Subsequent order flow data will be associated with data from the October 2 partition of the cache. The feature of Temporal join Hive is that it can automatically associate the latest Hive partitions, which is suitable for business scenarios with slow update of dimension tables and high throughput.

Summarize the several joins we just introduced that are used in data widening:

The effectiveness of Regular Join is very high, and the throughput is average, because the state will retain all the arriving data, which is suitable for dual-stream association scenarios;

The timeliness of Interval Jon is very good, and the throughput is better, because the state only retains the data within the time interval, which is suitable for dual-stream association scenarios with business time intervals;

Temporal Join Lookup DB has better timeliness and poor throughput, because each piece of data needs to be queried to an external system, and there will be IO overhead, which is suitable for scenarios where dimension tables are in the database;

Temporal Join Changelog has good timeliness and good throughput, because it has no IO overhead, and is suitable for scenarios that require dimension tables to wait, or associate accurate versions;

The timeliness of Temporal Join Hive is average, but the throughput is very good, because the data of the dimension table is stored in the cache, which is suitable for the scene of slow updating of the dimension table and the scene of high throughput.


Finally, let's summarize Flink's capabilities in ETL data integration. This is the current capability matrix of Flink data integration. We divide the existing external storage systems into five types: relational database, KV database, message queue, data lake, and data warehouse. It can be seen from the figure that Flink has a very rich ecology, and has very strong integration capabilities for each storage engine.

Horizontally, we have defined 6 capabilities, namely 3 data access capabilities:

full read
streaming read
CDC streaming read
A data widening capability:

dimension association;
And two storage/lake capabilities:

stream write

CDC write

It can be seen that Flink's data access capabilities, dimension widening capabilities, and storage/lake storage capabilities for each system are already very complete. For CDC stream reading, Flink already supports mainstream databases and Kafka message queues. In the direction of data lakes, Flink's streaming read and CDC write functions for Iceberg will also be released in the next Iceberg version. From this capability matrix, we can see that Flink's data integration capabilities are very comprehensive.

Related Articles

Explore More Special Offers

  1. Short Message Service(SMS) & Mail Service

    50,000 email package starts as low as USD 1.99, 120 short messages start at only USD 1.00

phone Contact Us