Technical Analysis | Doris Connector and Flink CDC Achieve Exactly-Once Ingestion of Sharded MySQL Databases and Tables

This document demonstrates how to use the Apache Doris Flink Connector together with Flink CDC and the two-phase commit of Doris Stream Load to ingest sharded MySQL databases and tables in real time and efficiently, with Exactly-Once semantics.

1. Overview

In real business systems, to solve the problems caused by a single table holding too much data, we usually shard the data across multiple databases and tables to improve system throughput.

However, this complicates later data analysis. To deal with it, we usually synchronize the sharded databases and tables of the business system to the data warehouse and merge them into a single database and a single table for analysis.

In this document, we demonstrate how to ingest sharded MySQL databases and tables into the Doris data warehouse in real time and efficiently for analysis, based on Flink CDC, the Apache Doris Flink Connector, and the two-phase commit of Doris Stream Load.
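The merge described above can be sketched in a few lines of Python. This is only an illustration, not the connector's implementation: the naming scheme (databases emp_1, emp_2, ... holding tables employees_1, employees_2, ...) and the choice to keep the shard name in the key so that equal ids from different shards do not collide are assumptions made for the example.

```python
import re

# Hypothetical naming scheme for the shards (an assumption for this sketch).
SHARD_PATTERN = re.compile(r"^emp_\d+\.employees_\d+$")


def merge_shards(shards):
    """Merge rows from every matching shard into one logical table.

    `shards` maps "database.table" to a list of row dicts. The shard name
    is kept in the key so equal ids from different shards do not collide.
    """
    merged = {}
    for name, rows in shards.items():
        if not SHARD_PATTERN.match(name):
            continue  # not part of the sharded table
        for row in rows:
            merged[(name, row["id"])] = row
    return merged


shards = {
    "emp_1.employees_1": [{"id": 1, "name": "Ann"}],
    "emp_2.employees_1": [{"id": 1, "name": "Bob"}],
    "emp_1.audit_log": [{"id": 1, "name": "ignored"}],
}
merged = merge_shards(shards)
```

Both rows with id 1 survive the merge because their keys differ by shard, and the unrelated audit_log table is skipped.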

1.1 What is CDC

CDC is the abbreviation of Change Data Capture.

The core idea is to monitor and capture changes in a database (INSERT, UPDATE, and DELETE operations on data or tables, among others), record these changes completely and in the order in which they occur, and write them to message middleware so that other services can subscribe to and consume them.

CDC has a wide range of application scenarios, including:

Data distribution: distribute one data source to multiple downstream systems, often used for business decoupling and microservices.

Data integration: integrate distributed and heterogeneous data sources into the data warehouse to eliminate data silos and facilitate subsequent analysis.

Data migration: commonly used for database backup, disaster recovery, etc.

1.2 Why Flink CDC

Flink CDC builds on log-based Change Data Capture and provides unified reading of full (snapshot) data and incremental changes. With Flink's excellent pipeline capabilities and rich upstream and downstream ecosystem, it can capture changes from many kinds of databases and synchronize them to downstream storage in real time.

At present, Flink CDC supports MySQL, MariaDB, PostgreSQL, Oracle, MongoDB, OceanBase, TiDB, SQL Server, and other databases as upstream sources.

The downstream side is even richer: Flink CDC can write to message queues such as Kafka and Pulsar, and to data lakes and data warehouses such as Hudi, Iceberg, and Doris.

At the same time, the Changelog mechanism natively supported by Flink SQL makes CDC data processing very simple. Users can clean, widen, and aggregate full and incremental database data with SQL, which greatly lowers the barrier to entry. In addition, the Flink DataStream API lets users write code to implement custom logic, giving them the freedom to customize their business logic in depth.

The core of Flink CDC is real-time, consistent synchronization and processing of both the full data and the incremental data in a table, so that users can easily obtain a real-time, consistent snapshot of each table. For example, suppose a table contains historical full business data while incremental business data is constantly being written and updated. Flink CDC captures the incremental change records in real time and provides a snapshot that stays consistent with the database. An update record updates the existing data; an insert record is added to the existing data. Throughout the process, Flink CDC guarantees consistency, that is, no duplication and no loss.

Flink CDC has the following advantages:

Flink operators and SQL modules are more mature and easy to use;

Flink jobs can easily expand processing capacity by adjusting the parallelism of operators;

Flink supports advanced state backends, allowing access to massive state data;

Flink provides a richer ecosystem of Sources and Sinks;

Flink has a larger user base and an active support community, making it easier to solve problems.

In addition, the Flink Table/SQL module treats database tables and change record streams (such as a CDC data stream) as two sides of the same thing. Therefore, the Upsert message structure it provides internally (+I for an insert, -U for the value of a record before an update, +U for the value after the update, and -D for a delete) corresponds one-to-one with the change records generated by Debezium and similar tools.
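To make the table/stream duality concrete, here is a minimal Python sketch that replays a changelog into a table. It is an illustration only, not Flink code: the event tuples, the `id` key, and the `apply_changelog` helper are assumptions made for the example.

```python
def apply_changelog(table, events):
    """Apply (kind, row) changelog events to `table`, a dict keyed by row id."""
    for kind, row in events:
        if kind in ("+I", "+U"):
            # Insert, or the new value of an updated row.
            table[row["id"]] = row
        elif kind in ("-U", "-D"):
            # Old value of an updated row, or a delete: retract the row.
            table.pop(row["id"], None)
        else:
            raise ValueError(f"unknown row kind: {kind}")
    return table


events = [
    ("+I", {"id": 1, "city": "Berlin"}),
    ("-U", {"id": 1, "city": "Berlin"}),
    ("+U", {"id": 1, "city": "Paris"}),
    ("+I", {"id": 2, "city": "Rome"}),
    ("-D", {"id": 2, "city": "Rome"}),
]
snapshot = apply_changelog({}, events)
```

After the replay, the table holds only row 1 with its updated value, which is exactly the state of the source table after the same sequence of changes.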

1.3 What is Apache Doris

Apache Doris is a modern MPP analytical database. It returns query results with sub-second response times, effectively supporting real-time data analysis. Its distributed architecture is very simple, easy to operate and maintain, and can support very large data sets of more than 10 PB.

Apache Doris can meet a variety of data analysis needs, such as fixed historical reports, real-time data analysis, interactive data analysis, and exploratory data analysis. It makes data analysis simpler and more efficient!

1.4 Two-phase commit

What is two-phase commit (2PC)?

In a distributed system, in order to enable each node to perceive the transaction execution status of other nodes, it is necessary to introduce a central node to uniformly handle the execution logic of all nodes. This central node is called the coordinator, and other business nodes scheduled by the central node are called participants.

2PC divides a distributed transaction into two phases: the commit-request (voting) phase and the commit (execution) phase. The coordinator decides whether to actually commit the transaction based on the responses of the participants. The process is as follows:

Commit-request (voting) phase

The coordinator sends a prepare request with the transaction content to all participants, asks whether the transaction is ready to be committed, and waits for their responses.

The participant executes the operations contained in the transaction and records undo logs (for rollback) and redo logs (for replay), but does not actually commit.

The participant returns the execution result of the transaction operation to the coordinator. If the operation is successful, yes is returned. Otherwise, no is returned.

Commit (execution) phase

This phase has two possible outcomes: success or failure.

If all participants return yes, the transaction can be committed:

The coordinator sends a Commit request to all participants.

After receiving the Commit request, the participant will actually commit the transaction, release the occupied transaction resources, and return Ack to the coordinator.

The coordinator receives the Ack message from all participants, and the transaction completes successfully.

If any participant returns no, or fails to respond before the timeout, the transaction is aborted and must be rolled back:

The coordinator sends a Rollback request to all participants.

After receiving the Rollback request, the participant rolls back to its pre-transaction state according to the undo log, releases the occupied transaction resources, and returns an Ack to the coordinator.

The coordinator receives the Ack message from all participants, and the transaction rollback is completed.
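The two phases above can be sketched as a small in-memory simulation. This is a toy model under stated assumptions: the `Participant` class, its `can_commit` flag, and the state names are hypothetical, and timeouts, logging, and network failures are not modeled.

```python
class Participant:
    """A toy participant; `can_commit` decides how it votes."""

    def __init__(self, name, can_commit=True):
        self.name = name
        self.can_commit = can_commit
        self.state = "INIT"

    def prepare(self):
        # Execute the work and record undo/redo logs, but do not commit yet.
        self.state = "PREPARED" if self.can_commit else "FAILED"
        return self.can_commit

    def commit(self):
        self.state = "COMMITTED"
        return "ack"

    def rollback(self):
        self.state = "ROLLED_BACK"
        return "ack"


def two_phase_commit(participants):
    """Run both phases and return the outcome decided by the coordinator."""
    # Phase 1: collect a vote from every participant.
    votes = [p.prepare() for p in participants]
    # Phase 2: commit only if every vote was yes, otherwise roll back.
    if all(votes):
        acks = [p.commit() for p in participants]
        outcome = "committed"
    else:
        acks = [p.rollback() for p in participants]
        outcome = "rolled back"
    assert all(ack == "ack" for ack in acks)
    return outcome


ok_group = [Participant("a"), Participant("b")]
bad_group = [Participant("a"), Participant("b", can_commit=False)]
ok_outcome = two_phase_commit(ok_group)
bad_outcome = two_phase_commit(bad_group)
```

A single no vote is enough to roll back every participant, including those that voted yes.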

1.5 Flink 2PC

Flink, as a stream processing engine, naturally provides Exactly-Once guarantees. End-to-end Exactly-Once semantics result from the cooperation of input, processing logic, and output. Internally, Flink relies on its checkpoint mechanism and the lightweight distributed snapshot algorithm ABS (Asynchronous Barrier Snapshotting) to guarantee Exactly-Once. For the output to be Exactly-Once as well, the sink must satisfy one of two constraints: idempotent writes or transactional writes.

Pre-commit phase

Whenever a checkpoint is needed, the JobManager inserts a barrier into the data stream to mark the checkpoint boundary. The barrier is passed downstream along the operator chain, and each operator it reaches writes a snapshot of its state to the state backend. When the barrier reaches the Kafka sink, the sink flushes the message data with KafkaProducer.flush(), but the data is not yet committed. The commit phase still has to be triggered by the checkpoint:

Commit phase

The write succeeds only if the checkpoint completes successfully on every operator. This matches the 2PC process described above, with the JobManager as the coordinator and each operator as a participant (although only the sink performs the commit). If a checkpoint fails, the notifyCheckpointComplete() method is not executed. If the retry also fails, the abort() method is eventually called to roll back the transaction.
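The pre-commit and commit phases can be sketched from the sink's point of view. This is a simplified Python model, not Flink's API: the class and method names (`TransactionalSink`, `on_barrier`, `notify_checkpoint_complete`, `abort`) are hypothetical and only loosely mirror the callbacks named in the text.

```python
class TransactionalSink:
    """Toy transactional sink driven by checkpoint events."""

    def __init__(self):
        self.committed = []  # records visible to downstream readers
        self.pending = {}    # checkpoint id -> pre-committed records
        self.current = []    # records of the currently open transaction

    def write(self, record):
        self.current.append(record)

    def on_barrier(self, checkpoint_id):
        # Pre-commit: flush the open transaction, then start a new one.
        self.pending[checkpoint_id] = self.current
        self.current = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Commit: the checkpoint succeeded, so make the data visible.
        self.committed.extend(self.pending.pop(checkpoint_id))

    def abort(self, checkpoint_id):
        # Roll back: discard data that was flushed but never committed.
        self.pending.pop(checkpoint_id, None)


sink = TransactionalSink()
sink.write("a")
sink.write("b")
sink.on_barrier(1)                  # checkpoint 1 starts
sink.write("c")                     # belongs to the next transaction
sink.notify_checkpoint_complete(1)  # checkpoint 1 succeeded
sink.on_barrier(2)                  # checkpoint 2 starts
sink.abort(2)                       # checkpoint 2 failed
```

Only the records of the successful checkpoint become visible; the record written for the failed checkpoint is discarded and would be replayed by the source after recovery.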

1.6 Doris Stream Load 2PC

1.6.1 Stream Load

Stream Load is a synchronous import method provided by Apache Doris. Users send requests over HTTP to import local files or data streams into Doris. Stream Load executes the import synchronously and returns the result, so the user can tell directly from the response body whether the import succeeded.

Stream Load is mainly suitable for importing local files, or for importing data from data streams through programs.

Usage:

Users can send the request with an HTTP client or with the curl command:

curl --location-trusted -u user:passwd [-H ""...] -T data.file -H "label:label" -XPUT http://fe_host:http_port/api/{db}/{table}/_stream_load

To prevent users from importing the same data repeatedly, each import task is identified by a Label. It is strongly recommended that users use the same Label for the same batch of data. In this way, repeated requests for the same batch of data are accepted only once, ensuring At-Most-Once.
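The effect of the Label can be sketched with a small in-memory model. This is not Doris code: the `LabelRegistry` class and the status strings it returns are illustrative assumptions, chosen only to show that a repeated Label is rejected.

```python
class LabelRegistry:
    """Toy model of label-based deduplication on the server side."""

    def __init__(self):
        self.seen = set()  # labels that have already been accepted
        self.rows = []     # rows that were actually imported

    def stream_load(self, label, batch):
        if label in self.seen:
            # The same batch was already accepted; reject the retry.
            return "Label Already Exists"
        self.seen.add(label)
        self.rows.extend(batch)
        return "Success"


registry = LabelRegistry()
first = registry.stream_load("batch-001", ["r1", "r2"])
retry = registry.stream_load("batch-001", ["r1", "r2"])
other = registry.stream_load("batch-002", ["r3"])
```

The retry with the same Label leaves the imported rows unchanged, so a client can safely resend a batch whose outcome it is unsure about.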

1.6.2 Stream Load 2PC

The earliest versions of Apache Doris Stream Load did not support two-phase commit. Data was imported directly through the Stream Load HTTP interface, and the only possible outcomes were success or failure.

This is not a problem under normal circumstances, but in a distributed environment the failure of an import task can leave the data on the two ends inconsistent. In particular, earlier versions of the Doris Flink Connector required users to control and handle import failures themselves, for example by saving the failed data to a designated place (such as Kafka) and then handling it manually.

If the Flink job suddenly fails for some other reason, part of the data has been written successfully and part has not. Moreover, because no checkpoint covers the failed data, restarting the job cannot consume the failed data again, and the data on the two ends becomes inconsistent.

To solve these problems and keep the data on both ends consistent, we implemented Doris Stream Load 2PC. It works as follows:

The commit is divided into two phases.

In the first phase, the data write task is submitted. After the data is written successfully, it is not yet visible, and the transaction status is PRECOMMITTED.

In the second phase, the user triggers the Commit operation, which changes the transaction status to VISIBLE. The data can then be queried.

If the user instead triggers Abort on the transaction, using its transaction ID, this batch of data is automatically deleted.
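The transaction states described above form a small state machine, sketched here in Python. PRECOMMITTED and VISIBLE come from the text; the ABORTED status name and the `StreamLoadTxn` class are assumptions made for the illustration.

```python
class StreamLoadTxn:
    """Toy model of a Stream Load 2PC transaction."""

    def __init__(self, txn_id, rows):
        self.txn_id = txn_id
        self.rows = list(rows)
        # The write succeeded, but the data is not yet visible.
        self.status = "PRECOMMITTED"

    def _require_precommitted(self):
        if self.status != "PRECOMMITTED":
            raise RuntimeError(f"txn {self.txn_id} is already {self.status}")

    def commit(self):
        self._require_precommitted()
        self.status = "VISIBLE"

    def abort(self):
        self._require_precommitted()
        self.rows = []           # the batch is deleted
        self.status = "ABORTED"  # assumed name for the aborted state

    def query(self):
        # Only committed data can be read.
        return self.rows if self.status == "VISIBLE" else []


committed = StreamLoadTxn(1, ["r1", "r2"])
before_commit = committed.query()
committed.commit()
after_commit = committed.query()

aborted = StreamLoadTxn(2, ["r3"])
aborted.abort()
```

A transaction leaves the PRECOMMITTED state exactly once, either through Commit or through Abort, which is what lets an external coordinator such as Flink decide its fate.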

1.6.3 Usage of Stream Load 2PC

In be.conf, set disable_stream_load_2pc=false (takes effect after a restart).

Then declare two_phase_commit=true in the HTTP header of the Stream Load request.
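As a rough sketch, the two kinds of requests could be assembled as follows. The code only builds request descriptions and sends nothing. The `two_phase_commit` header comes from the text above; the `_stream_load_2pc` endpoint and the `txn_id` and `txn_operation` headers are assumptions based on the Doris Stream Load documentation and should be verified against the Doris version in use. The helper names are hypothetical.

```python
def precommit_request(fe_host, http_port, db, table, label):
    """Describe a Stream Load request that stops at the PRECOMMITTED state."""
    return {
        "method": "PUT",
        "url": f"http://{fe_host}:{http_port}/api/{db}/{table}/_stream_load",
        "headers": {"label": label, "two_phase_commit": "true"},
    }


def finish_request(fe_host, http_port, db, table, txn_id, operation):
    """Describe the follow-up request that commits or aborts a transaction.

    Assumption: endpoint and header names follow the Doris documentation.
    """
    if operation not in ("commit", "abort"):
        raise ValueError("operation must be 'commit' or 'abort'")
    return {
        "method": "PUT",
        "url": f"http://{fe_host}:{http_port}/api/{db}/{table}/_stream_load_2pc",
        "headers": {"txn_id": str(txn_id), "txn_operation": operation},
    }


# Placeholder host, port, database, table, label, and transaction id.
load = precommit_request("fe_host", 8030, "demo_db", "demo_table", "batch-001")
commit = finish_request("fe_host", 8030, "demo_db", "demo_table", 18036, "commit")
```

The transaction ID passed to the second request would be taken from the response of the first one.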

1.7 Doris Flink Connector 2PC

We previously provided the Doris Flink Connector, which supports reading Doris table data as well as Upsert and Delete (on the Unique Key model). However, job failures or other exceptions could leave the data on the two ends inconsistent.

To solve this problem, we upgraded the Doris Connector on the basis of Flink 2PC and Doris Stream Load 2PC to guarantee Exactly-Once across both ends.

The connector maintains read and write buffers in memory. On startup, it opens a write and submits it asynchronously. From then on it keeps streaming data to the BE through HTTP chunked transfer and stops writing only when a checkpoint arrives. The advantage is that this avoids the overhead of frequent HTTP submissions. After the checkpoint completes, the next round of writing begins.

During a checkpoint interval, multiple tasks may be writing data to the same table at the same time. All of these writes correspond to one global label for that checkpoint. At checkpoint time, the transactions associated with this label are committed together, which makes the data visible.

If the job fails, Flink replays the data from the checkpoint when it restarts.

This ensures that the data on both ends stays consistent.
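The interplay of buffering, checkpoint-time commit, and replay can be sketched with a toy simulation. It is not the connector's code: the `run_job` function, the single simulated crash at `fail_at`, and the use of a list index as the source offset are assumptions made for the illustration.

```python
def run_job(records, checkpoint_every, fail_at=None):
    """Simulate a job that commits per checkpoint and replays after a crash."""
    visible = []         # rows committed (visible) in the target table
    offset = 0           # source offset stored in the last completed checkpoint
    failed_once = False
    while offset < len(records):
        buffered = []    # rows streamed since the last checkpoint, still invisible
        pos = offset
        crashed = False
        while pos < len(records) and len(buffered) < checkpoint_every:
            if pos == fail_at and not failed_once:
                failed_once = True
                crashed = True
                break
            buffered.append(records[pos])
            pos += 1
        if crashed:
            # Restart: uncommitted rows are dropped, reading resumes at `offset`.
            continue
        # Checkpoint completed: commit the transaction and advance the offset.
        visible.extend(buffered)
        offset = pos
    return visible


records = list(range(10))
result = run_job(records, checkpoint_every=3, fail_at=4)
```

Record 3 is streamed before the crash but never committed, so after the restart it is written again and appears exactly once in the result.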

2. System architecture

Let's walk through a complete example of how to use the latest version of the Doris Flink Connector (which supports two-phase commit) together with Flink CDC to collect sharded MySQL databases and tables in real time and load them into the warehouse.

Here, Flink CDC collects the data of the MySQL databases and tables.

The Doris Flink Connector then loads the data into Doris.

Finally, Doris's highly concurrent, high-performance OLAP analysis and computing capabilities are used to provide data services to external consumers.
