Alibaba Cloud real-time computing Flink x Hologres

With the rapid development of big data, enterprises are paying more and more attention to the value of their data, which means that data must reach analysts and decision makers as quickly as possible. The most common way to achieve this is to build a real-time data warehouse. As the business grows, the real-time data warehouse has to support a series of requirements, such as real-time writes and updates, fast response to business needs, self-service analysis, easy operation and maintenance, and cloud-native elastic scaling, all of which depend on a powerful real-time data warehouse solution. Alibaba Cloud Real-Time Computing Flink Edition (hereinafter referred to as "Alibaba Cloud Flink") provides unified full and incremental data synchronization, powerful streaming ETL, and other capabilities, and supports real-time ingestion of massive data into data warehouses and data lakes. Alibaba Cloud Hologres, a new-generation real-time data warehouse engine, can serve multiple query scenarios at the same time, such as OLAP multidimensional analysis, online serving, and offline data acceleration. By combining Alibaba Cloud Flink with Hologres, enterprises can make data exploration real-time and data analysis agile across the whole pipeline, quickly build an enterprise-grade one-stop real-time data warehouse, and make more efficient and intelligent business decisions.

This article introduces the core capabilities of Alibaba Cloud Flink and Alibaba Cloud Hologres for building a real-time data warehouse, as well as the best-practice solutions that combine the two. With the Alibaba Cloud Flink + Hologres real-time data warehouse solution, users can significantly lower the barrier to building a data warehouse, get more value from their data, and support digital upgrades across industries.

1、 Flink CDC Core Competencies

Apache Flink is an open source big data stream computing engine. It supports processing real-time data from databases, Binlogs, online logs, and other sources, provides end-to-end sub-second real-time data analysis, and lowers the barrier to real-time business development through standard SQL. As real-time processing has spread and matured, Flink has gradually become the leading engine and de facto standard for stream processing, and has long been one of the most active projects in the Apache community.

Flink CDC is a data integration framework that the Alibaba Cloud Computing Platform Business Unit open-sourced in July 2020. It is deeply integrated with the Flink ecosystem and offers technical advantages such as unified full and incremental synchronization, lock-free reads, parallel reads, and a distributed architecture. It can replace traditional data synchronization tools such as DataX and Canal, it supports real-time ingestion of database data into the data warehouse, and it also has strong data processing capabilities.

Data collection is an essential part of building a real-time data warehouse. In a traditional ETL architecture, users outside China usually choose Debezium for the collection layer, while users in China tend to use DataX and Canal. The collection tool is responsible for capturing the full and incremental data of the database. The captured data is written to message middleware such as Kafka, and the Flink computing engine then consumes it in real time to clean and process the data in the computing layer. After processing, the data is written to the destination (the loading layer), usually a database, data lake, or data warehouse. In a traditional ETL pipeline, the data collection tools and the message queue are heavyweight components that may be maintained by different teams. When the upstream data source changes, or when these components need to be upgraded or maintained, the maintenance cost of the whole pipeline is very high.

By using Flink CDC to replace the data collection components and message queues in the above figure, the collection layer and the transformation layer are merged, which simplifies the entire ETL pipeline. Users can build the data pipeline with fewer components. The overall architecture brings lower operation and maintenance costs, lower hardware costs, better pipeline stability, and lower end-to-end data latency. Besides improved stability, another advantage of Flink CDC is that users only need to write SQL scripts to clean, process, and synchronize CDC data, which greatly lowers the barrier to use.

In addition to unified full and incremental synchronization, Alibaba Cloud Flink CDC provides many enterprise-grade features, such as automatic synchronization of table schema changes, whole database synchronization, and merged synchronization of sharded databases and tables, which help users quickly break down data silos and realize business value.

1.1 Unified Full and Incremental Synchronization

Through its incremental snapshot reading algorithm, Flink CDC was the first in the field of open source data integration to support four important features: lock-free reads, parallel reads, resuming after interruption, and no data loss or duplication. Lock-free reads eliminate the risk of data synchronization causing deadlocks on the upstream business database, and parallel reads meet the needs of synchronizing massive data. Resuming after interruption, together with the no-loss, no-duplication guarantee, improves the stability and reliability of the synchronization pipeline.

The core idea of the incremental snapshot reading algorithm is to split the table into chunks and read them concurrently during the full read phase. Once the job enters the incremental phase, a single task with a parallelism of one reads the Binlog. Consistency during the automatic switch from full to incremental reading is guaranteed by a lock-free algorithm. This design improves read efficiency, saves resources, and achieves unified full and incremental data synchronization. Combined with the automatic resource tuning provided by Alibaba Cloud real-time computing products, the resources of a Flink CDC job can be scaled out and in automatically without manual intervention.
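As a rough illustration of the chunking idea, the following Python sketch splits a numeric primary-key range into fixed-size chunks and distributes them over parallel readers. The function names `split_into_chunks` and `assign_round_robin`, the chunk size, and the half-open range convention are assumptions made for this example; they are not taken from the Flink CDC implementation, which also has to handle non-numeric keys and reconcile Binlog changes when switching to the incremental phase.

```python
from typing import Dict, List, Tuple

Chunk = Tuple[int, int]  # half-open key range [start, end)


def split_into_chunks(min_id: int, max_id: int, chunk_size: int) -> List[Chunk]:
    """Split the inclusive key range [min_id, max_id] into half-open chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    chunks = []
    start = min_id
    while start <= max_id:
        end = min(start + chunk_size, max_id + 1)
        chunks.append((start, end))
        start = end
    return chunks


def assign_round_robin(chunks: List[Chunk], parallelism: int) -> Dict[int, List[Chunk]]:
    """Distribute chunks over `parallelism` hypothetical reader tasks."""
    assignment = {task: [] for task in range(parallelism)}
    for i, chunk in enumerate(chunks):
        assignment[i % parallelism].append(chunk)
    return assignment


chunks = split_into_chunks(min_id=1, max_id=10, chunk_size=4)
print(chunks)  # [(1, 5), (5, 9), (9, 11)]
print(assign_round_robin(chunks, parallelism=2))
```

Because the chunks do not overlap and together cover the whole key range, each reader can scan its chunks independently during the full phase.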

1.2 Automatic Synchronization of Table Structure Changes

As the business iterates and evolves, the table schema of the data source changes frequently. Users have to modify their data synchronization jobs promptly to match the latest schema, which brings high operation and maintenance costs and also affects the stability of the synchronization pipeline and the timeliness of the data. Alibaba Cloud Flink supports automatic discovery and management of metadata through the Catalog. With the CTAS (CREATE TABLE AS) syntax, users can synchronize both the data and any table schema changes with a single line of SQL.

1.3 Whole database synchronization

When building a real-time data warehouse, users often need to synchronize an entire database into the data warehouse for further analysis. Running one job per table not only wastes resources but also puts more pressure on the upstream database. For such users, Alibaba Cloud Flink CDC provides the whole database synchronization feature, which is implemented through the CDAS (CREATE DATABASE AS) syntax together with the Catalog.

For example, with a MySQL Catalog and a Hologres Catalog, a CDAS statement can complete full and incremental data synchronization from MySQL to Hologres. The CDAS statement is parsed into a single Flink job. This job automatically parses the table schemas and corresponding parameters of the source tables and synchronizes one or more specified databases to the downstream Hologres data warehouse. Throughout the process, the user does not need to write DDL statements or create tables in Hologres in advance, so a whole database can be synchronized quickly.

By default, CDAS jobs synchronize table schema changes. All schema changes are synchronized to the downstream Hologres real-time data warehouse in the order in which they occur. The CDAS syntax also supports filtering out tables that do not need to be synchronized.

1.4 Merging and Synchronizing Sharded Databases and Tables

Splitting data across multiple databases and tables (sharding) is a classic database design used by highly concurrent business systems. We usually need to merge the sharded business data into one large table in the data warehouse to make later analysis easier; this is the scenario of merging and synchronizing sharded databases and tables. For this scenario, Alibaba Cloud Flink CDC provides a merge-and-synchronize feature: because the CTAS syntax accepts regular expressions for the source database and table names, the sharded tables of the source databases can be merged and synchronized efficiently into the downstream Hologres data warehouse.

In the above CTAS statement, the source database name is a regular expression that matches the three databases order_db01, order_db02, and order_db03 in the current MySQL instance. The source table name is also a regular expression, which matches all tables in those three databases whose names begin with order_.

For this scenario, users only need to provide regular expressions for the sharded databases and tables in order to merge and synchronize them into the order table of the downstream Hologres data warehouse. As with CDAS statements, automatic synchronization of table schema changes is enabled by default. The schema of the downstream Hologres table is the widest schema obtained by merging all the source tables. During synchronization, the database name and table name of each record are automatically written to the user table as two additional fields. The database name (db column in the figure above), the table name (tbl column in the figure above), and the original primary key (id column in the figure above) together form the composite primary key of the downstream Hologres user table, which guarantees that the primary key is unique in that table.
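The matching and key-composition logic can be sketched in a few lines of Python. This is only an illustration of the idea, not how Alibaba Cloud Flink implements it: the two regular expressions, the helper names `matches` and `merge_shards`, and the sample rows are all hypothetical.

```python
import re

# Hypothetical patterns for illustration; they are not copied from the article's CTAS statement.
DB_PATTERN = re.compile(r"order_db\d+")
TABLE_PATTERN = re.compile(r"order_.*")


def matches(db: str, table: str) -> bool:
    """Return True if both the database and the table name match their patterns."""
    return bool(DB_PATTERN.fullmatch(db)) and bool(TABLE_PATTERN.fullmatch(table))


def merge_shards(rows):
    """Merge rows from matching shards into one table keyed by (db, tbl, id)."""
    merged = {}
    for db, table, record in rows:
        if not matches(db, table):
            continue
        # The shard's database and table names become extra columns and part of the key.
        key = (db, table, record["id"])
        merged[key] = {"db": db, "tbl": table, **record}
    return merged


rows = [
    ("order_db01", "order_01", {"id": 1, "amount": 10}),
    ("order_db02", "order_01", {"id": 1, "amount": 25}),  # same id, different shard
    ("user_db", "order_01", {"id": 1, "amount": 99}),     # database name does not match
]
merged = merge_shards(rows)
print(len(merged))  # 2
```

The two rows with `id` 1 survive side by side because the database and table names are part of the key, which is why the composite primary key keeps rows from different shards from overwriting each other.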

2、 Hologres Core Competencies

Alibaba Cloud Hologres is a one-stop real-time data warehouse engine developed in-house by Alibaba Cloud. It supports real-time writes, real-time updates, and real-time analysis of massive data, supports standard SQL (compatible with the PostgreSQL protocol), and provides multidimensional analysis (OLAP) and ad hoc analysis over petabytes of data, as well as high-concurrency, low-latency online data services (Serving). It is deeply integrated with Alibaba Cloud Flink, MaxCompute, DataWorks, and other products, and provides enterprises with a full-stack data warehouse solution that unifies offline and online processing.

2.1 High performance real-time write and update

The timeliness of data writes is one of the key capabilities of a real-time data warehouse. For latency-insensitive business queries such as BI, a write latency of several seconds or even minutes may be acceptable. For many production systems, however, such as real-time risk control and real-time dashboards, data must be visible as soon as it is written. If writes are delayed, queries cannot see the latest data, which seriously affects online business decisions. Within the data processing pipeline of a real-time data warehouse, Hologres, as a one-stop real-time data warehouse engine, provides high-performance real-time writes for massive data, and written data can be queried without delay.

In addition, data sources in data warehouse scenarios are complex and involve many data update and correction cases. Hologres provides high-performance Upsert capability based on the primary key (PK). The whole write and update process guarantees exactly-once semantics, which meets the requirements for merging and updating data.

The following figure shows the results of a test on a Hologres 128C instance, in which 10 concurrent writers write in real time to a column-oriented table with 20 columns. The vertical axis represents the number of records written per second, and the horizontal axis represents four write scenarios:

Append Only: The target table has no primary key. Write throughput is more than 2.3 million RPS.

INSERT: The target table has a primary key. If the primary key conflicts, the new row is discarded. Write throughput is 2 million RPS.

UPDATE-1: The target table has a primary key and already contains 200 million rows. Rows are upserted by primary key. Write throughput is 800,000 RPS.

UPDATE-2: The target table has a primary key and already contains 2 billion rows. Rows are upserted by primary key. Write throughput is 700,000 RPS.
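The difference between these write scenarios comes down to how a primary-key conflict is handled. The following Python sketch models the three behaviours with in-memory structures; the function names and the `id` key column are assumptions made for this example, and the sketch says nothing about how Hologres implements writes or about its performance.

```python
def append_only(table, row):
    """No primary key: every write becomes a new row."""
    table.append(dict(row))


def insert_ignore(table, row, pk="id"):
    """Primary key present: a conflicting new row is discarded."""
    if row[pk] in table:
        return False
    table[row[pk]] = dict(row)
    return True


def upsert(table, row, pk="id"):
    """Primary key present: a conflicting new row replaces the stored one."""
    table[row[pk]] = dict(row)


log = []
append_only(log, {"id": 1, "v": "a"})
append_only(log, {"id": 1, "v": "b"})

keyed = {}
insert_ignore(keyed, {"id": 1, "v": "a"})
insert_ignore(keyed, {"id": 1, "v": "b"})  # discarded, "a" is kept

latest = {}
upsert(latest, {"id": 1, "v": "a"})
upsert(latest, {"id": 1, "v": "b"})  # "b" wins
upsert(latest, {"id": 1, "v": "b"})  # replaying the same write changes nothing

print(len(log), keyed[1]["v"], latest[1]["v"])  # 2 a b
```

In this model, replaying an upsert is idempotent, which is one reason primary-key upserts combine well with writers that may retry.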

2.2 Real-time OLAP analysis

Hologres uses a scalable, fully parallel MPP computing architecture, supports multiple storage modes (row storage, column storage, and hybrid row-column storage), and supports multiple index types. Through distributed SQL processing and vectorized operators, it makes the most of CPU resources to support sub-second analysis of massive data. It supports multiple real-time OLAP scenarios, such as real-time multidimensional analysis and ad hoc analysis, without precomputation, and it connects directly and seamlessly to upper-layer applications and services, delivering a what-you-see-is-what-you-get analysis experience.

The following figure shows the test results on the TPC-H 100 GB standard data set on a Hologres 128C instance. The horizontal axis represents the query and the vertical axis represents the response time:

2.3 High performance online service

As real-time data warehouses become widely used, more and more enterprises use them as online service systems that answer online queries. As a best practice implementation of HSAP (Hybrid Serving and Analytics Processing), Hologres can not only process analytical queries but also provides very strong online serving capability (high-QPS point queries), such as key-value (KV) point queries and vector search. In the KV point query scenario, Hologres can support millions of QPS with very low latency through its SQL interface. Hologres allows one system and one copy of the data to support both OLAP analysis and online services, which simplifies the data architecture.

The following figure shows point query performance on a Hologres 128C instance at 25% CPU utilization:

2.4 Read Write Separation High Availability

The real-time data warehouse Hologres provides high-QPS, low-latency writes, supports online serving queries, and supports complex multidimensional OLAP queries. When tasks of different types and complexity hit a Hologres instance, Hologres must ensure both that the tasks run normally and that the system remains stable. Hologres currently supports a highly available architecture with one primary instance and multiple read-only secondary instances on top of shared storage, which implements complete read/write separation and guarantees the SLA of different business scenarios.

Read/write separation: Across high-throughput data writes, complex ETL jobs, OLAP queries, ad hoc queries, online services, and other scenarios, the system loads are physically isolated from each other, so query tasks do not jitter because of write tasks.

Multi type load resource isolation: A primary instance can be configured with four read-only instances. Instances can be configured with different specifications according to business conditions. The system loads are physically completely isolated to avoid mutual influence and jitter.

Millisecond-level asynchronous data synchronization between instances: the P99 delay is within 5 ms.

2.5 Binlog subscription

Like the Binlog in MySQL, the Hologres Binlog records changes to table data, such as Insert, Delete, and Update operations. In Hologres, the Binlog of a table is data with a strongly typed schema. The sequence number (BigInt) of Binlog records increases monotonically within a single shard, similar to the Offset concept in Kafka. By consuming the Hologres Binlog with Alibaba Cloud Flink, you can build a fully real-time pipeline between data warehouse layers. While keeping the data layered and governed, this shortens the end-to-end latency of data processing and improves the development efficiency of the real-time data warehouse layers.
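To make the offset analogy concrete, here is a minimal Python sketch of a consumer that tracks the last applied sequence number per shard and skips records it has already seen. The record layout (`shard`, `seq`, `op`, `key`, `value`) and the class name `BinlogConsumer` are hypothetical and chosen for this example only; they do not reflect the actual Hologres Binlog fields or the Flink connector.

```python
from collections import defaultdict


class BinlogConsumer:
    """Applies change records and remembers the last sequence number per shard."""

    def __init__(self):
        self.offsets = defaultdict(lambda: -1)  # shard id -> last applied sequence number
        self.state = {}                          # materialized view of the table

    def apply(self, record: dict) -> bool:
        shard, seq = record["shard"], record["seq"]
        if seq <= self.offsets[shard]:
            return False  # already applied, for example after a redelivery
        if record["op"] == "DELETE":
            self.state.pop(record["key"], None)
        else:  # INSERT or UPDATE
            self.state[record["key"]] = record["value"]
        self.offsets[shard] = seq
        return True


events = [
    {"shard": 0, "seq": 1, "op": "INSERT", "key": "k1", "value": 10},
    {"shard": 1, "seq": 1, "op": "INSERT", "key": "k2", "value": 20},
    {"shard": 0, "seq": 2, "op": "UPDATE", "key": "k1", "value": 11},
    {"shard": 0, "seq": 2, "op": "UPDATE", "key": "k1", "value": 11},  # redelivered
    {"shard": 1, "seq": 2, "op": "DELETE", "key": "k2", "value": None},
]
consumer = BinlogConsumer()
applied = [consumer.apply(e) for e in events]
print(applied)                 # [True, True, True, False, True]
print(consumer.state)          # {'k1': 11}
print(dict(consumer.offsets))  # {0: 2, 1: 2}
```

Because sequence numbers are only ordered within a shard, the sketch keeps one offset per shard rather than a single global position.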

3、 Alibaba Cloud Flink x Hologres one-stop enterprise real-time data warehouse solution

3.1 Real-time data warehouse ETL

ETL (Extract, Transform, Load) is the more traditional way to build a data warehouse. The Binlog data of the business database is processed by Alibaba Cloud Flink's ETL and written to the real-time data warehouse Hologres, where it serves various queries and analyses. The core of the ETL approach is a complete layered data model in the data warehouse, usually organized as ODS (Operational Data Store) > DWD (Data Warehouse Detail) > DWS (Data Warehouse Summary) > ADS (Application Data Service). The resulting data warehouse pipeline is relatively complete.

In this pipeline, Alibaba Cloud Flink CDC first synchronizes the Binlog data of data sources such as MySQL to the message queue Kafka. Alibaba Cloud Flink then filters, cleans, and transforms the ODS data to form the DWD layer for the different business subject models, and sends that data to the Kafka cluster. Next, Alibaba Cloud Flink lightly aggregates the DWD data to form the DWS layer, which is more convenient for business queries, and writes it to the Kafka cluster again. Finally, according to the needs of the specific business applications, Alibaba Cloud Flink processes the DWS layer in real time to form the ADS layer, which is written to the real-time data warehouse Hologres for storage and analysis and supports reports, user profiles, and other business scenarios.

The advantage of real-time data warehouse ETL is that the data warehouse has complete layers with clear responsibilities. The disadvantages are that a Flink plus Kafka cluster is complex to maintain, the processing pipeline is long, correcting historical data is complicated, and the data in the ADS layer is less timely. In addition, the data in each Kafka topic is hard to query, its quality is hard to check, and dynamic schema changes are hard to support.

3.2 Real-time data warehouse ELT

As businesses demand ever fresher data, and given the complexity of ETL processing pipelines, they need data to land in the warehouse faster and in real time, so ELT has become a more popular approach. ELT stands for Extract, Load, Transform, and can be understood as a process of data migration and integration. In this process, Binlog data from relational databases such as MySQL and PostgreSQL, data from business databases such as HBase and Cassandra, and event tracking logs collected through message queues such as DataHub and Kafka are extracted in real time by Alibaba Cloud Flink and then loaded into Hologres for OLAP analysis and online services.

In this pipeline, Alibaba Cloud Flink is responsible for ingesting data into the warehouse in real time and for cleaning and joining the data. The cleaned data is written to Hologres in real time, and Hologres stores the detail data directly. The layers in Hologres can be simplified: the detail layer is the main layer, and summary layers are added only as needed. With its strong data processing capability, Hologres can serve reports, applications, and other upper-layer queries directly. Because the analysis SQL of the upper layer cannot be fixed in advance, the SQL logic is usually encapsulated in the ADS layer as logical views, and the upper layer queries these views directly, which enables ad hoc queries.

The construction of the real-time data warehouse in the ELT mode will bring great benefits to the data and business. Details are as follows:

Flexibility: The original business data is directly entered into the warehouse to form the ODS layer data. The data can be flexibly transformed in the data warehouse through View, which can be adjusted at any time according to the business.

Low cost: the data warehouse has a clear architecture, short links, and low operation and maintenance costs.

Simple indicator correction: the upper layer is encapsulated by View logic, so you only need to update the data in the bottom table, without correcting the data layer by layer.

However, this approach also has disadvantages. When the view logic is complex and the data volume is large, query performance is low. It is therefore better suited to scenarios where the data comes from databases and event tracking systems, QPS requirements are modest, flexibility requirements are high, and computing resources are sufficient.
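The view-based ADS layer can be illustrated with a small, self-contained Python example. It uses the standard library's `sqlite3` module purely as a stand-in for the data warehouse, and the table name `dwd_orders`, the view name `ads_city_sales`, and the sample rows are hypothetical. The point is only to show that a correction made to the detail table is reflected by the view without rebuilding any summary layer.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # in-memory stand-in for the warehouse
conn.executescript("""
CREATE TABLE dwd_orders (order_id INTEGER PRIMARY KEY, city TEXT, amount REAL);
INSERT INTO dwd_orders VALUES (1, 'Hangzhou', 10.0), (2, 'Hangzhou', 5.0), (3, 'Beijing', 7.5);

-- The ADS layer is a logical view over the detail table, not a stored copy.
CREATE VIEW ads_city_sales AS
SELECT city, COUNT(*) AS order_cnt, SUM(amount) AS total_amount
FROM dwd_orders
GROUP BY city;
""")


def city_sales(connection):
    """Query the ADS view the way an upper-layer report would."""
    sql = "SELECT city, order_cnt, total_amount FROM ads_city_sales ORDER BY city"
    return connection.execute(sql).fetchall()


print(city_sales(conn))  # [('Beijing', 1, 7.5), ('Hangzhou', 2, 15.0)]

# Correct one detail row; the view reflects the fix on the next query.
conn.execute("UPDATE dwd_orders SET amount = 12.0 WHERE order_id = 1")
print(city_sales(conn))  # [('Beijing', 1, 7.5), ('Hangzhou', 2, 17.0)]
```

The same example also hints at the trade-off: every query recomputes the aggregation, so complex views over large detail tables cost more at query time.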

3.3 Real time data warehouse layering (Streaming Warehouse scheme)

This approach follows the traditional data warehouse development methodology of ODS > DWD > DWS > ADS, and uses the combination of Alibaba Cloud Flink and the Hologres Binlog to drive stateful, real-time, event-driven processing between the layers across the whole pipeline. Data is written to Hologres in real time through Alibaba Cloud Flink CDC. Alibaba Cloud Flink then subscribes to the Hologres Binlog to process data continuously from one layer to the next, and finally writes the results to Hologres to serve application queries.

With this solution, Hologres provides the same capabilities as message queues such as Kafka and DataHub, and the data becomes more reusable: the data of a table can be consumed by downstream Alibaba Cloud Flink jobs and can also serve upper-layer OLAP and online service queries. This saves costs, simplifies the data warehouse architecture, allows every layer of the data warehouse to be built and queried in real time, and improves the efficiency of data flow.

3.4 Unified stream and batch data warehouse

In a typical real-time data warehouse, stream computing jobs and batch processing jobs are developed in two separate workflows, an arrangement commonly known as the Lambda architecture. This architecture suffers from high labor costs, redundant data pipelines, inconsistent metric definitions, and low development efficiency.

To solve these problems, Alibaba Cloud Flink + Hologres provides unified stream and batch processing. In this scenario, the input layer is unified in Hologres, and a single set of business logic serves both stream and batch processing. A Flink SQL streaming job consumes the Hologres Binlog to provide stream processing, and a Flink SQL batch job reads the original data of the Hologres tables to provide batch processing. After unified processing in Flink, the results are written to and stored in Hologres.

By combining Alibaba Cloud Flink with Hologres for unified stream and batch processing, the data input layer, the real-time and offline computing layer, and the data analysis and storage layer are all unified, which greatly improves data development efficiency and helps ensure data quality.
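The idea of one set of business logic serving both paths can be sketched in Python as follows. This is a conceptual model only, not Flink SQL: the function names, the order fields, and the sample data are assumptions made for this example. A shared function holds the business logic, the batch path applies it to a table snapshot, and the streaming path applies it to change events while keeping the latest result per key.

```python
def order_total(row):
    """Shared business logic used by both the batch and the streaming path."""
    return {
        "order_id": row["order_id"],
        "total_cents": row["unit_price_cents"] * row["quantity"],
    }


def run_batch(table_rows):
    """Batch path: process a full snapshot of the table."""
    return {r["order_id"]: order_total(r) for r in table_rows}


def run_stream(change_events):
    """Streaming path: process change events one at a time, keeping the latest result per key."""
    result = {}
    for event in change_events:
        out = order_total(event)
        result[out["order_id"]] = out
    return result


snapshot = [
    {"order_id": 1, "unit_price_cents": 250, "quantity": 2},
    {"order_id": 2, "unit_price_cents": 100, "quantity": 3},
]
changes = [
    {"order_id": 1, "unit_price_cents": 250, "quantity": 1},
    {"order_id": 2, "unit_price_cents": 100, "quantity": 3},
    {"order_id": 1, "unit_price_cents": 250, "quantity": 2},  # later update to order 1
]
print(run_batch(snapshot) == run_stream(changes))  # True
```

Since both paths call the same function, a change to the business rule is made once and the two results cannot drift apart in definition.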

4、 Typical application scenarios

Alibaba Cloud Flink and Hologres are deeply integrated to help enterprises quickly build a one-stop real-time data warehouse:

Data can be written to Hologres in real time through Alibaba Cloud Flink. High-performance writes and updates make data visible as soon as it is written, which meets the real-time data warehouse's requirements for high-performance, low-latency writes;

Hologres source tables can be read by Alibaba Cloud Flink through full reads, Binlog reads, CDC reads, and unified full and incremental reads. No additional components are needed, computing and storage are unified, and data flows faster;

Hologres dimension tables can be read by Alibaba Cloud Flink to support high-performance dimension table joins, wide table construction, and other application scenarios;

Alibaba Cloud Flink is connected to Hologres metadata. Through the Hologres Catalog, metadata can be discovered automatically, which greatly improves the efficiency and correctness of job development.

The standard real-time data warehouse solution built on Alibaba Cloud Flink and Hologres supports a variety of real-time data warehouse application scenarios, such as real-time recommendation and real-time risk control, and meets the real-time analysis needs of enterprises. The following sections introduce typical application scenarios of Alibaba Cloud Flink + Hologres that help businesses build real-time data warehouses more efficiently.

4.1 Real-time ingestion of massive data

The first step in building a real-time data warehouse is ingesting massive data in real time. With Alibaba Cloud Flink CDC, massive data can be synchronized to the real-time data warehouse simply and efficiently, and incremental data and table schema changes are synchronized to the data warehouse in real time. The whole process only requires defining a single CREATE DATABASE AS DATABASE SQL statement on Alibaba Cloud Flink (for detailed steps, refer to the Quick Start of Real time Warehousing). According to tests, the TPC-DS 1 TB dataset in MySQL can be fully synchronized to Hologres in 5 hours using Alibaba Cloud Flink with a parallelism of 64, at about 300,000 TPS. In the incremental Binlog synchronization phase, Alibaba Cloud Flink runs with a parallelism of one, and synchronization throughput reaches 100,000 records per second.

4.2 Dual-stream Join

After real-time ingestion has formed the ODS layer, it is usually necessary to use Flink's multi-stream join capability to flatten fact data and dimension data into wide tables in real time. Combined with the excellent multidimensional analysis performance of Hologres on wide tables, this speeds up upper-layer business queries. Alibaba Cloud Flink supports reading Hologres tables in unified full and incremental mode, that is, it first reads the full data and then switches smoothly to reading CDC data. The whole process guarantees that data is neither duplicated nor lost. Alibaba Cloud Flink can therefore easily process and widen the ODS layer data in Hologres in real time and build the wide table model of the DWD layer.

4.3 Wide Table Merge

Modeling is a central concern in a data warehouse. Data models generally fall into four types: the wide table model, the star model, the snowflake model, and the constellation model, all of which Hologres supports. Here we focus on building the wide table model. A wide table model usually refers to a table that brings together the indicators, dimensions, and attributes related to a business entity. More generally, it can also refer to a wide table formed by joining multiple fact tables with multiple dimension tables.

The common way to build wide tables is to use Alibaba Cloud Flink's dual-stream join, including Regular Join, Interval Join, and Temporal Join. When the join is on the primary key (that is, the join condition is the primary key of both streams), we can push the join work down to Hologres and use the partial update capability of Hologres to merge the wide table, which saves the cost of maintaining join state in Flink. For example, in an advertising scenario, one Flink job processes the ad exposure stream, counts the exposures of each product, and updates the product indicator wide table using the product ID as the primary key. At the same time, another Flink job processes the ad click stream, counts the clicks of each product, and updates the same wide table using the product ID as the primary key. The whole process requires no dual-stream join, and Hologres eventually assembles the complete row by itself. Based on the resulting product indicator wide table, users can easily analyze advertising performance in Hologres, for example by calculating the CTR of a product as clicks / exposures. The following figure and code example show how to change from double stream join to wide table merge.
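The merge behaviour can be modelled with a short Python sketch, given here as an illustration under stated assumptions rather than as the article's original code example. The in-memory dictionary stands in for the wide table, and the helper names `partial_upsert` and `ctr`, the column names, and the sample numbers are hypothetical. Each "job" updates only its own columns for a given product ID, and the complete row emerges from the merged updates.

```python
def partial_upsert(table, pk, columns):
    """Update only the given columns for primary key `pk`, creating the row if needed."""
    row = table.setdefault(pk, {"product_id": pk})
    row.update(columns)


def ctr(row):
    """Click-through rate = clicks / exposures; None if there are no exposures yet."""
    exposures = row.get("exposures")
    if not exposures:
        return None
    return row.get("clicks", 0) / exposures


wide_table = {}

# Job A (exposure stream) writes only the exposure count.
partial_upsert(wide_table, "p1", {"exposures": 1000})
partial_upsert(wide_table, "p2", {"exposures": 200})

# Job B (click stream) writes only the click count, independently of job A.
partial_upsert(wide_table, "p1", {"clicks": 50})

print(wide_table["p1"])       # {'product_id': 'p1', 'exposures': 1000, 'clicks': 50}
print(ctr(wide_table["p1"]))  # 0.05
print(ctr(wide_table["p2"]))  # 0.0
```

Neither job needs to know about the other or keep join state; the only shared contract is the primary key.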
