Best Practices of EMR-StarRocks and Flink in Mobvista's Real-Time Write Scenarios

Introduction to EMR-StarRocks

Alibaba Cloud EMR launched the StarRocks service at the beginning of this year. StarRocks is a new generation of blazing-fast, full-scenario MPP (Massively Parallel Processing) data warehouse, dedicated to delivering a fast and unified analytics experience. EMR StarRocks has the following characteristics:

• Compatible with the MySQL protocol; MySQL clients and common BI tools can connect to StarRocks to analyze data

• Distributed architecture:

  • Tables are horizontally partitioned and stored in multiple replicas

  • Cluster size can scale elastically to support analysis of 10 PB-level data

• Supports the MPP framework, accelerating computation in parallel

• Supports multiple replicas for elastic fault tolerance

• Supports a vectorized engine and CBO (cost-based optimizer)

• Supports elastic scale-out and scale-in

• Supports the detail, aggregate, primary key, and update table models

For more details, please refer to https://help.aliyun.com/document_detail/405463.html

Introduction to Flink CDC

CDC stands for Change Data Capture; its typical scenarios include data synchronization, data distribution, and data collection. Flink CDC focuses on database changes and can synchronize both data and schema changes from upstream sources to downstream data lakes and data warehouses. The Flink CDC project received its first commit in July 2020, and the Flink community released CDC 2.0 in August 2021; after two years of polishing, it has become very mature for production use. This article takes MySQL CDC as an example to introduce the pain points users encounter when loading data into StarRocks in real time with Flink CDC, along with the corresponding optimizations and solutions on the Flink and StarRocks sides.

To import data from a MySQL table into a StarRocks table with CDC, you first create a target table in StarRocks to receive the MySQL data, then create the Source table mapping the MySQL table and the Sink table mapping the StarRocks table in Flink, and finally execute an INSERT INTO sink_table SELECT * FROM source_table statement, as sketched below. Executing the INSERT INTO generates a CDC task, which first synchronizes the full data of the source table to the target table and then continuously synchronizes incremental data based on the binlog. Completing full + incremental synchronization in a single task is very user-friendly, but some pain points remain in practice.
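The manual workflow looks roughly like this, using the open-source mysql-cdc and starrocks connectors; all hostnames, credentials, and table names below are placeholders:

```sql
-- 1. Flink source table mapping the MySQL table (binlog-based CDC source).
CREATE TABLE source_table (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'mysql-host',
  'port'          = '3306',
  'username'      = 'user',
  'password'      = '***',
  'database-name' = 'demo_db',
  'table-name'    = 'demo_table'
);

-- 2. Flink sink table mapping the pre-created StarRocks table.
CREATE TABLE sink_table (
  id   BIGINT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector'     = 'starrocks',
  'jdbc-url'      = 'jdbc:mysql://fe-host:9030',
  'load-url'      = 'fe-host:8030',
  'database-name' = 'demo_db',
  'table-name'    = 'demo_table',
  'username'      = 'user',
  'password'      = '***'
);

-- 3. Start the full + incremental CDC task.
INSERT INTO sink_table SELECT * FROM source_table;
```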

User pain points in real-time write scenarios

Heavy SQL development workload

For new businesses that have not yet built out a data warehouse, or for users who are just starting to build an OLAP platform on StarRocks, creating tables in StarRocks to receive the data synchronized from MySQL is the first step. In complex businesses there are often dozens or even hundreds of MySQL tables, each with dozens of fields, and writing the corresponding StarRocks CREATE TABLE statements for all of them is a huge amount of work. So the first pain point is that creating tables in StarRocks requires a lot of work.

Complex and error-prone Flink data type mappings

Creating the tables in StarRocks is only the first step. To start a CDC task, the Source table corresponding to MySQL and the Sink table corresponding to StarRocks must also be created in Flink. When creating these tables, the type of every field must strictly follow the mapping between Flink types and MySQL or StarRocks types. With dozens or hundreds of tables, looking up the Flink type mapping for every single field is especially painful for developers, as the fragment below illustrates. The second pain point, therefore, is that the data type mapping between the upstream and downstream tables and the Flink fields is complex and error-prone.
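For illustration, a hypothetical Flink source-table fragment with the per-field lookups spelled out as comments (the mappings shown follow the mysql-cdc connector's documented type conversions; all column names are made up):

```sql
CREATE TABLE orders_src (
  order_id   BIGINT,         -- MySQL BIGINT     -> Flink BIGINT
  amount     DECIMAL(10, 2), -- MySQL DECIMAL    -> Flink DECIMAL
  created_at TIMESTAMP(3),   -- MySQL DATETIME   -> Flink TIMESTAMP
  is_valid   BOOLEAN,        -- MySQL TINYINT(1) -> Flink BOOLEAN
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc'
  -- connection options omitted; see the example above
);
```

Repeat this for every field of every table, and again for the StarRocks side, and the workload becomes clear.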

Cumbersome schema change operations

The third pain point comes from changes to the business data schema. According to Fivetran, about 60% of companies' data schemas change every month, and 30% change every week. When fields are added, deleted, or modified in a MySQL table, users want the schema change synchronized to the downstream StarRocks table without affecting the CDC task. The common solution today is to change the schemas of MySQL and StarRocks, change the Sink and Source table structures on the Flink side, manually stop the task, and restart it from a specified savepoint. This schema change procedure is cumbersome and cannot be automated, which is the third pain point.

Data synchronization tasks consume too many resources

The fourth pain point is that when there are many tables and a large volume of real-time incremental data, CDC tasks consume a great deal of memory and CPU. To save costs, users want resource utilization optimized as much as possible.

Next, let's look at how EMR-StarRocks is deeply integrated with Flink, and at the solutions this integration provides for each of these pain points.

CTAS & CDAS

The CTAS & CDAS functionality launched by the EMR-StarRocks and Flink teams addresses the first three pain points. With CTAS & CDAS, a single SQL statement completes what previously required multiple complex operations: creating the StarRocks table, creating the Flink CDC task, and synchronizing schema changes in real time. This greatly reduces the development and operations workload.

Introduction to CTAS

CTAS stands for CREATE TABLE AS. Its syntax has roughly the following structure.
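The syntax figure from the original article is not reproduced here; the sketch below shows the general shape of a CTAS statement, assuming two registered catalogs named mysql_catalog and starrocks_catalog (all identifiers are hypothetical):

```sql
CREATE TABLE IF NOT EXISTS `starrocks_catalog`.`demo_db`.`orders`
WITH (
  -- Carries everything after the field definitions in the StarRocks DDL.
  'starrocks.create.table.properties' =
    'PRIMARY KEY(order_id) DISTRIBUTED BY HASH(order_id) BUCKETS 8'
)
AS TABLE `mysql_catalog`.`demo_db`.`orders`;
```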

As the CTAS syntax shows, in addition to the cluster and database information there is a special configuration, 'starrocks.create.table.properties'. MySQL and StarRocks table structures differ in areas such as key type, partitioning, and bucket number, so this configuration carries the part of the StarRocks CREATE TABLE statement that follows the field definitions.

To let users create tables even faster, a Simple Mode is also provided. It is configured as follows.
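A minimal sketch of enabling Simple Mode; the exact property key comes from a figure that is not reproduced here, so 'starrocks.create.table.mode' below is an assumed name:

```sql
CREATE TABLE IF NOT EXISTS `starrocks_catalog`.`demo_db`.`orders`
WITH (
  -- Assumed key: switches table creation to Simple Mode.
  'starrocks.create.table.mode' = 'simple'
)
AS TABLE `mysql_catalog`.`demo_db`.`orders`;
```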

With Simple Mode enabled, the primary key model is used by default, the MySQL primary key becomes the StarRocks primary key, and data is bucketed by hash(primary key). As a result, when users run CTAS statements in Simple Mode, they do not need to know which fields, field names, or primary keys exist in the original MySQL table; knowing the table name is enough to write the SQL efficiently.

Principle of CTAS

As shown in the figure, after the CTAS statement is executed, Flink first automatically creates a target table in StarRocks with the same schema as the MySQL source table, then establishes the Source and Sink mappings for the MySQL and StarRocks tables in Flink, and finally starts a CDC task. The task synchronizes the source table's data to the target table, monitors the schema of the data arriving from the MySQL source table at runtime, and automatically synchronizes schema changes to the StarRocks target table. In effect, CTAS uses one SQL statement to complete many operations that would otherwise have to be written and executed by hand.

Next, the implementation principle of CTAS. The implementation mainly depends on Flink CDC, the Flink Catalog, and Schema Evolution. Flink's CDC function was described earlier. The Catalog function lets Flink discover all databases and all table schemas in StarRocks and perform DDL operations on them. Schema Evolution is implemented by detecting and recording schema changes in the data itself. For example, when MySQL adds a column, the CTAS task does not immediately add a column to the downstream StarRocks table based on MySQL's DDL change. Instead, when the first record using the new schema is processed, the task compares the new and old data schemas, generates the corresponding ALTER TABLE ... ADD COLUMN statement, and adds the column to StarRocks, as sketched below. New data is pushed downstream only after the StarRocks schema change completes.
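For example, suppose a column is added upstream (all names hypothetical):

```sql
-- Upstream MySQL DDL change:
ALTER TABLE orders ADD COLUMN coupon_amount DECIMAL(10, 2);

-- Statement the CTAS task would generate against StarRocks once the first
-- record carrying the new field arrives (a sketch, not the exact output):
ALTER TABLE orders ADD COLUMN coupon_amount DECIMAL(10, 2);
```

Driving the change from the data rather than from the DDL event keeps the target table consistent with the records actually being written.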

Introduction to CDAS

CDAS is syntactic sugar over CTAS. A CDAS statement synchronizes an entire MySQL database: it generates one Flink job whose source is the MySQL database and whose targets are the corresponding tables in StarRocks.

Because a single SQL statement is expected to generate the schemas and CDC tasks of multiple tables, Simple Mode is used uniformly. In practice, some tables in a database may not need to be synchronized, and some tables need customized configuration. The INCLUDING TABLE syntax selects only some of the tables in a database for the CDAS operation, as shown below, while tables that need customized attribute configuration can be handled with individual CTAS statements.
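A sketch of a CDAS statement, using the same hypothetical catalogs as above; the INCLUDING TABLE pattern keeps only the tables to be synchronized:

```sql
CREATE DATABASE IF NOT EXISTS `starrocks_catalog`.`demo_db`
AS DATABASE `mysql_catalog`.`demo_db`
INCLUDING TABLE 'orders|customers|dim_.*';
```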

Important features

CTAS & CDAS have several important features:

• Multiple CDC tasks can be executed in the same job, saving a lot of memory and CPU resources.

• Source merging is supported. When CDAS is used for data synchronization, one job manages the synchronization tasks of all tables, and the sources of all tables are automatically merged into one, reducing the pressure of concurrent reads on the MySQL side.

• The supported schema change types are adding columns, deleting columns, and renaming columns. Note that the currently supported column deletion is implemented by setting the corresponding field to NULL: if a field is deleted from the upstream MySQL table, Flink detects the change in the data schema, but the corresponding column in StarRocks is not dropped; instead, the field is filled with NULL when data is written to StarRocks. Renaming a column is likewise implemented by adding a new column and leaving the original column empty in new data.

Introduction to Connector-V2

Connector-V2 was developed to solve the fourth pain point. It reduces the memory consumed when importing data into StarRocks through Flink and improves task stability.

As shown in the figure, in the V1 version, guaranteeing Exactly Once required holding all the data within one Checkpoint interval in the memory of Flink's Sink operator. Because the Checkpoint interval cannot be set too short and the data volume per unit time cannot be predicted, this not only consumed a lot of memory but also frequently caused stability problems due to OOM.

The V2 version solves this problem with two-phase commit, which splits data submission into two stages. In the first stage, write tasks are submitted: the data being written is invisible and can be sent in multiple batches. In the second stage, a Commit request makes all the batches written so far visible at the same time. The StarRocks side provides interfaces such as Begin, Prepare, and Commit, and supports committing multiple write requests as a single transaction, guaranteeing the consistency of the data within that transaction.

By calling the transaction interface explicitly, the original approach of accumulating a large amount of data on the Flink side and sending it all at once is replaced by continuously submitting data in small batches. While still guaranteeing Exactly Once, this greatly reduces the memory the Flink side spends buffering data and improves the stability of the Flink task.
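On the Flink side this surfaces as connector options. The sketch below configures a StarRocks sink table for exactly-once delivery with the open-source flink-connector-starrocks; the option names 'sink.semantic' and 'sink.version' follow that connector, and all addresses and credentials are placeholders:

```sql
CREATE TABLE orders_sink (
  order_id BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'     = 'starrocks',
  'jdbc-url'      = 'jdbc:mysql://fe-host:9030',
  'load-url'      = 'fe-host:8030',
  'database-name' = 'demo_db',
  'table-name'    = 'orders',
  'username'      = 'user',
  'password'      = '***',
  'sink.semantic' = 'exactly-once', -- two-phase commit via the transaction interface
  'sink.version'  = 'V2'            -- AUTO / V1 / V2; V2 submits small batches per transaction
);
```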

Practice of StarRocks + Flink at Mobvista

In Mobvista's advertising analysis business, the CDAS feature is used to synchronize data changes from MySQL to StarRocks in real time.

Previously, this business relied mainly on a closed-source data warehouse for OLAP analysis. As data volume grew, single-table queries and multi-table joins hit serious bottlenecks, and query times reached an intolerable level of minutes. StarRocks was therefore selected for data analysis, and it has performed very well in these scenarios.

In Mobvista's business scenario, dozens of small tables holding operational metadata are synchronized to StarRocks in real time with CDAS, while several detail tables with large data volumes are updated daily through offline imports. CDAS is mainly used for small tables and dimension tables whose data and schemas change frequently. At query time, these real-time tables are joined with the offline tables. Updating small tables in real time, updating large tables offline, and joining the two achieves a good trade-off among freshness, cost, and import and query performance. Because the business demands high data accuracy, Exactly-Once semantics, backed by Flink's Checkpoint mechanism, ensure that data is neither lost nor duplicated.
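A minimal sketch of the corresponding job settings, assuming Flink SQL and the configuration keys of a recent Flink release (the interval value is illustrative):

```sql
-- Enable periodic checkpoints and exactly-once mode for the synchronization job.
SET 'execution.checkpointing.interval' = '60 s';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
```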
