Practice and Optimization of OLAP Analysis and the Real-Time Data Warehouse at BIGO Using Flink

1. Business Background

BIGO is a company oriented toward overseas markets, mainly engaged in short-video and live-streaming businesses. At present, the company's main products include BigoLive (a global live-streaming service), Likee (a short-video creation and sharing platform), and IMO (a free communication tool), together serving roughly 400 million users worldwide. As the business grows, the demands on the data platform's processing capability keep rising, and the problems the platform faces become more and more prominent. Below we introduce the BIGO big data platform and the problems it faces. The data flow diagram of the BIGO big data platform is as follows:

User behavior logs from the app and web pages, as well as Binlog data from relational databases, are synchronized to the message queues and offline storage systems of the BIGO big data platform, then processed by real-time and offline data analysis and applied to scenarios such as real-time recommendation, monitoring, and ad hoc query. However, there are the following problems:

* The OLAP analysis platform has no unified entrance: Presto and Spark analysis task entrances coexist, users do not know which engine suits their SQL queries and choose blindly, and the experience is poor; in addition, users submit the same query at both entrances simultaneously to get results faster, which wastes resources;
* Offline tasks have high computation latency and produce results too slowly: a typical business such as ABTest often does not produce its results until the afternoon;
* Each business team independently develops applications for its own scenarios; real-time tasks are developed in silos, with no data layering and no data lineage.

Faced with the above problems, the BIGO big data platform built the OneSQL OLAP analysis platform and a real-time data warehouse:

* The OneSQL OLAP analysis platform unifies the OLAP query entrance, reduces users' blind engine selection, and improves the platform's resource utilization;
* Real-time data warehouse tasks are built with Flink, and data layering is done through Kafka/Pulsar;
* Some slow offline computing tasks are migrated to Flink streaming computing tasks to speed up the output of results;
* In addition, the real-time computing platform Bigoflow was built to manage these real-time computing tasks and establish lineage for real-time tasks.

2. Implementation Practice & Feature Improvements

2.1 OneSQL OLAP analysis platform practice and optimization
The OneSQL OLAP analysis platform is a unified OLAP query and analysis engine integrating Flink, Spark, and Presto. OLAP query requests submitted by users are forwarded through the OneSQL backend to the clients of the different execution engines, which then submit them to the corresponding clusters for execution. The overall architecture diagram is as follows:

The overall architecture of the analysis platform is divided, from top to bottom, into the entry layer, forwarding layer, execution layer, and resource management layer. In order to optimize the user experience, reduce the probability of execution failure, and improve the resource utilization of each cluster, the OneSQL OLAP analysis platform implements the following functions:

* Unified query entrance: at the entry layer, users submit queries through a unified Hue query page, with Hive SQL syntax as the standard;
* Unified query syntax: multiple query engines such as Flink, Spark, and Presto are integrated, and each engine executes users' SQL queries by adapting to Hive SQL syntax;
* Intelligent routing: when selecting the execution engine, the platform considers the history of each SQL query (whether it succeeded on each engine and how long it took), how busy each cluster is, and whether each engine is compatible with the query's SQL syntax, and submits the query to the most suitable engine (a sketch of this routing and retry logic follows the list);
* Retry on failure: the OneSQL backend monitors SQL task execution; if a task fails during execution, another engine is selected and the task is resubmitted.
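As a hedged illustration only, the sketch below shows how such routing and failover retry might be scored; the engine list, `EngineStats` fields, and weights are illustrative assumptions, not BIGO's actual implementation:

```java
import java.util.Comparator;
import java.util.List;

enum Engine { FLINK, SPARK, PRESTO }

// Inputs the router consults: query history, cluster busyness, syntax compatibility.
record EngineStats(Engine engine, double successRate, double avgRuntimeSeconds,
                   double clusterLoad, boolean syntaxCompatible) {}

public class QueryRouter {

    /** Pick the best-scoring engine; engines that cannot parse the SQL are skipped. */
    public static Engine route(List<EngineStats> candidates) {
        return candidates.stream()
                .filter(EngineStats::syntaxCompatible)
                .max(Comparator.comparingDouble(QueryRouter::score))
                .map(EngineStats::engine)
                .orElseThrow(() -> new IllegalStateException("no compatible engine"));
    }

    // Higher historical success is better; long runtimes and busy clusters are penalized.
    private static double score(EngineStats s) {
        return s.successRate()
                - 0.1 * Math.log1p(s.avgRuntimeSeconds())
                - 0.5 * s.clusterLoad();
    }

    /** Retry on failure: resubmit on the best engine other than the one that failed. */
    public static Engine fallback(List<EngineStats> candidates, Engine failed) {
        return route(candidates.stream()
                .filter(s -> s.engine() != failed)
                .toList());
    }
}
```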

In this way, through the OneSQL OLAP analysis platform, the BIGO big data platform unifies the OLAP analysis entrance, reduces users' blind engine selection, and makes full use of each cluster's resources, reducing resource idleness.

2.1.1 Flink OLAP analysis system construction

On the OneSQL analysis platform, Flink also serves as one of the OLAP analysis engines. The Flink OLAP system consists of two components: the Flink SQL Gateway and the Flink Session cluster. The SQL Gateway serves as the entry point for SQL submission: queries are submitted through the Gateway to the Flink Session cluster for execution, while the Gateway tracks the progress of each query and returns the results to the client. The process of executing a SQL query is as follows:

First, the SQL Gateway inspects the submitted SQL to determine whether the result needs to be persisted to a Hive table. If so, a Hive table is created through the HiveCatalog interface to persist the query's calculation results. The Gateway then parses the SQL, sets the job parallelism, generates the pipeline, and submits it to the Session cluster for execution.
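The following is a minimal sketch of this flow using Flink's public Table API and HiveCatalog (not BIGO's internal gateway code); the table names `query_result_42` and `dwd_user_event` and the Hive conf path are illustrative assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class GatewaySubmitSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Register the HiveCatalog so query results can be persisted to Hive tables.
        HiveCatalog hive = new HiveCatalog("hive", "default", "/etc/hive/conf");
        tEnv.registerCatalog("hive", hive);
        tEnv.useCatalog("hive");

        // Set the job parallelism before the pipeline is generated.
        tEnv.getConfig().getConfiguration().setInteger("parallelism.default", 8);

        // Persist the result: create the target Hive table, then submit the query;
        // executeSql() generates the pipeline and hands it to the cluster.
        tEnv.executeSql(
                "CREATE TABLE IF NOT EXISTS query_result_42 (country STRING, pv BIGINT)");
        tEnv.executeSql("INSERT INTO query_result_42 "
                + "SELECT country, COUNT(*) FROM dwd_user_event GROUP BY country");
    }
}
```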

In order to ensure the stability of the entire Flink OLAP system and execute SQL queries efficiently, the following functional enhancements have been made in this system:

Stability:

* ZooKeeper-based HA guarantees the reliability of the Flink Session cluster; the SQL Gateway watches the ZooKeeper nodes to discover the Session cluster;
* The amount of data a query scans from a Hive table, the number of partitions it touches, and the amount of data it returns are all limited, to prevent the JobManager and TaskManager of the Session cluster from running out of memory (a sketch of such a pre-submission guard follows).
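For illustration, a pre-submission guard of this kind could be sketched as follows; the partition threshold and the use of the Catalog API here are assumptions, not BIGO's actual code:

```java
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class ScanGuard {
    private static final int MAX_PARTITIONS = 500; // illustrative threshold

    /** Reject a query that would scan too many partitions of a Hive table. */
    public static void check(HiveCatalog catalog, String db, String table) throws Exception {
        // listPartitions() asks the (already opened) catalog for all partitions.
        int partitions = catalog.listPartitions(new ObjectPath(db, table)).size();
        if (partitions > MAX_PARTITIONS) {
            throw new IllegalArgumentException("Query would scan " + partitions
                    + " partitions of " + db + "." + table
                    + ", exceeding the limit of " + MAX_PARTITIONS);
        }
    }
}
```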
Performance:

* The Flink Session cluster pre-allocates resources, reducing the time spent applying for resources after a job is submitted;
* The Flink JobManager parses splits asynchronously, executing the job while splits are still being parsed, which reduces the time tasks are blocked on split parsing;
* The number of scanned partitions and the maximum number of splits are limited during job submission, reducing the time needed to set up task parallelism.

Hive SQL compatibility:

* Flink's compatibility with Hive SQL syntax has been improved; it currently covers roughly 80% of Hive SQL.

Monitoring and alerting:

* The memory and CPU usage of the Flink Session cluster's JobManager and TaskManager and of the SQL Gateway, as well as task submission status, are monitored; once a problem occurs, an alert is raised and handled promptly.

2.1.2 Achievements of the OneSQL OLAP analysis platform
Based on the above implementation, the OneSQL OLAP analysis platform has achieved the following benefits:

* The unified query entrance reduced users' blind engine selection: the user execution error rate dropped by 85.7%, and the SQL execution success rate increased by 3%;
* SQL execution time was shortened by 10%, making full use of each cluster's resources and reducing the time tasks spend queueing;
* With Flink serving as part of the OLAP engine pool, the resource utilization of the real-time computing cluster increased by 15%.

2.2 Real-time data warehouse construction and optimization
In order to improve the output efficiency of some business indicators on the BIGO big data platform and better manage Flink real-time tasks, BIGO built the real-time computing platform Bigoflow, migrated some slow offline computing tasks to Flink streaming execution, layered the data through the Kafka/Pulsar message queues, and built a real-time data warehouse. On Bigoflow, the real-time data warehouse tasks are managed on the platform through a unified real-time task entrance, and the platform manages the metadata of real-time tasks and builds their lineage.

2.2.1 Construction plan
The BIGO big data platform builds its real-time data warehouse mainly on Flink + ClickHouse. The general scheme is as follows:

Following the data layering approach of traditional data warehouses, the data is divided into four layers: ODS, DWD, DWS, and ADS:

* ODS layer: raw data such as user behavior logs and business logs, stored in message queues such as Kafka/Pulsar;
* DWD layer: Flink tasks aggregate the raw data by UserId into detailed behavior data for each user, saved in Kafka/Pulsar;
* DWS layer: the Kafka flow table of user behavior details and the Hive/MySQL user dimension tables are joined in a stream-dimension table JOIN (illustrated below), and the resulting multi-dimensional detail data is written to a ClickHouse table;
* ADS layer: the multi-dimensional detail data in ClickHouse is summarized along different dimensions and applied to different businesses.

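As a hedged illustration of the DWS-layer stream-dimension table JOIN, the sketch below uses Flink SQL's lookup join syntax (`FOR SYSTEM_TIME AS OF`); the table and column names are assumed for illustration, not BIGO's actual schema:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DwsJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // dwd_user_behavior: Kafka flow table with a PROCTIME() column proc_time;
        // dim_user: Hive/MySQL dimension table; dws_user_behavior_wide: ClickHouse sink.
        tEnv.executeSql(
                "INSERT INTO dws_user_behavior_wide "
              + "SELECT b.user_id, b.event_id, b.ts, d.country, d.platform "
              + "FROM dwd_user_behavior AS b "
              + "JOIN dim_user FOR SYSTEM_TIME AS OF b.proc_time AS d "
              + "ON b.user_id = d.user_id");
    }
}
```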
In the process of building a real-time data warehouse according to the above scheme, some problems were encountered:

* After offline tasks are converted to real-time computing tasks, the computation logic is more complicated (multi-stream JOINs, deduplication), so the job state becomes too large, causing OOM (out of memory) exceptions or severe back pressure on job operators;
* When the detail flow table joins a large dimension table, the dimension table holds too much data to load into memory, causing OOM and job failure;
* When Flink writes the multi-dimensional detail data produced by the stream-dimension table JOIN to ClickHouse, exactly-once semantics cannot be guaranteed; once the job fails over, data is written repeatedly.

2.2.2 Problem Solving & Optimization

Optimizing job execution logic to reduce state

The logic of the offline computing task is complex, involving JOIN and deduplication operations across multiple Hive tables. Its general logic is as follows:

After the offline job was converted into a Flink streaming task, the original JOIN across multiple Hive tables became a JOIN across multiple Kafka topics. Since the joined Kafka topics carry heavy traffic and the JOIN window is long (up to one day), a large amount of state accumulates on the JOIN operator once the job has run for a while (approaching 1 TB after an hour). Facing such a large state, the Flink job uses the RocksDB state backend to store state data, but it still cannot avoid being killed by YARN for excessive RocksDB memory usage, or suffering a throughput drop and severe back pressure because too much state is stored in RocksDB.

To solve this problem, we UNION ALL all of these topics under the same schema into one large data stream, use the event_id of each record in this stream to determine which event stream's topic the record came from, and then perform aggregation to compute the metrics of the corresponding event stream.

In this way, replacing JOIN with UNION ALL avoids the large state brought by the JOIN computation.
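A minimal sketch of this UNION ALL rewrite in Flink SQL, assuming two hypothetical event topics with a common schema (event_id, user_id, post_id):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnionAllSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Instead of joining topic_click with topic_play (which accumulates huge
        // JOIN state), union them into one stream and branch on event_id.
        tEnv.executeSql(
                "INSERT INTO ads_event_metrics "
              + "SELECT post_id, "
              + "       COUNT(DISTINCT CASE WHEN event_id = 'click' THEN user_id END) AS click_uv, "
              + "       COUNT(DISTINCT CASE WHEN event_id = 'play'  THEN user_id END) AS play_uv "
              + "FROM ( "
              + "  SELECT event_id, user_id, post_id FROM topic_click "
              + "  UNION ALL "
              + "  SELECT event_id, user_id, post_id FROM topic_play "
              + ") AS t GROUP BY post_id");
    }
}
```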

In addition, the computing task contains many count distinct calculations, similar to the following:

These count distinct calculations sit in the same GROUP BY and all deduplicate on the same postid, so their distinct states can share one set of keys. The state of all these count distincts can then be stored in a single MapState, as follows:

Because these count distinct functions deduplicate on the same key, they can share the key space of the MapState to save storage. The value of the MapState is a byte array in which every byte has 8 bits and each bit is 0 or 1: the nth bit corresponds to the nth count distinct function on that key, with 1 meaning the function should count this key and 0 meaning it should not. When computing the aggregation result, summing the nth bit over all keys yields the value of the nth count distinct, which further saves state storage space.
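A hedged sketch of this shared-bitmap state, written as a KeyedProcessFunction keyed by postid; the event-to-bit mapping and the Event type are illustrative assumptions:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class SharedDistinctState
        extends KeyedProcessFunction<String, SharedDistinctState.Event, Void> {

    // Key: the deduplication key (e.g. userId); value: one bit per count distinct.
    private transient MapState<String, byte[]> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("distinct-bits", String.class, byte[].class));
    }

    @Override
    public void processElement(Event e, Context ctx, Collector<Void> out) throws Exception {
        byte[] bits = seen.get(e.userId);
        if (bits == null) {
            bits = new byte[1]; // one byte holds up to 8 count distinct functions
        }
        int n = metricIndex(e.eventId);        // which count distinct this event feeds
        bits[n / 8] |= (byte) (1 << (n % 8));  // set the nth bit for this key
        seen.put(e.userId, bits);
        // At aggregation time, summing the nth bit over all keys yields the
        // value of the nth count distinct.
    }

    private static int metricIndex(String eventId) {
        return "click".equals(eventId) ? 0 : 1; // illustrative mapping
    }

    public static class Event {
        public String userId;
        public String eventId;
        public String postId;
    }
}
```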

Through the above optimizations, the ABTest offline task was successfully migrated to a Flink streaming computing task, the job state was kept within 100 GB, and the job runs normally.

Stream-dimension table JOIN optimization

Generating the multi-dimensional detail wide table requires a stream-dimension table JOIN, using Flink's ability to join a Hive dimension table: the data of the Hive dimension table is loaded into an in-memory HashMap inside the task, and records from the flow table are joined against the HashMap by join key. However, for large Hive dimension tables with hundreds of millions or billions of rows, the data loaded into memory is far too large and easily causes OOM (out of memory). To address this, we hash-partition the large Hive dimension table by join key, as shown below:

In this way, the data of the large Hive dimension table is distributed by a hash function across the HashMaps of the Flink job's different parallel subtasks, with each HashMap storing only part of the dimension table. As long as the job parallelism is large enough, the dimension table can be split into sufficiently small shards; for dimension tables that are still too large, RocksDB MapState can be used to store the shards.

When records from the Kafka flow table are sent to the subtasks for the JOIN, the same hash function is applied to the same join key, so each record is routed to the subtask holding the matching dimension shard, and the JOIN result is emitted (see the sketch below).
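A hedged sketch of this sharding scheme: `readDimTable()` is a hypothetical stand-in for a real Hive scan, and the POJO types are illustrative. A custom partitioner is used on the stream side (via `partitionCustom`) so that both sides agree on the hash function:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class ShardedDimJoin
        extends RichFlatMapFunction<ShardedDimJoin.Behavior, ShardedDimJoin.Wide> {

    /** The same partitioner routes stream records and selects dimension shards. */
    public static class JoinKeyPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            return Math.abs(key.hashCode() % numPartitions);
        }
    }

    private transient Map<String, DimRow> shard;

    @Override
    public void open(Configuration parameters) {
        int subtask = getRuntimeContext().getIndexOfThisSubtask();
        int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        JoinKeyPartitioner p = new JoinKeyPartitioner();
        shard = new HashMap<>();
        // Keep only the dimension rows that hash to this subtask's shard.
        for (DimRow row : readDimTable()) {
            if (p.partition(row.userId, parallelism) == subtask) {
                shard.put(row.userId, row);
            }
        }
    }

    @Override
    public void flatMap(Behavior b, Collector<Wide> out) {
        // Upstream: stream.partitionCustom(new JoinKeyPartitioner(), beh -> beh.userId)
        DimRow d = shard.get(b.userId);
        if (d != null) {
            out.collect(new Wide(b, d));
        }
    }

    private static List<DimRow> readDimTable() {
        return List.of(); // placeholder for a real scan of the Hive dimension table
    }

    public static class Behavior { public String userId; }
    public static class DimRow { public String userId; public String country; }
    public static class Wide { public Wide(Behavior b, DimRow d) { } }
}
```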

Through the above optimizations, stream-dimension table JOINs against several large Hive dimension tables now run successfully; the largest dimension table exceeds 1 billion rows.

Exactly-Once Semantics Support for ClickHouse Sink

When the multi-dimensional detail data produced by the stream-dimension table JOIN is written to the ClickHouse table, exactly-once semantics cannot be guaranteed because community ClickHouse does not support transactions. Once the job fails over, data is written to ClickHouse repeatedly.

In response to this problem, BIGO's ClickHouse implements a two-phase commit transaction mechanism: when writing data to ClickHouse, the write mode is first set to temporary, marking the data being written as temporary; once the write finishes, an insert id is returned; executing a commit with that insert id then turns the temporary data into official data.

Combining BIGO ClickHouse's two-phase commit transaction mechanism with Flink's checkpoint mechanism, a ClickHouse Connector was implemented to guarantee exactly-once write semantics for the ClickHouse sink, as follows:

* On the normal write path, the Connector randomly selects a ClickHouse shard and performs the insert with single or double replicas according to the user's configuration, recording the insert id after each write. Between two checkpoints there are multiple such inserts and therefore multiple insert ids; when a checkpoint completes, these insert ids are committed in a batch, turning the temporary data into official data and completing the writes between the two checkpoints;
* If a failover occurs, the Flink job restarts from the latest checkpoint. The Operator State of the ClickHouse sink may still contain insert ids that were not committed in time, and these commits are retried. Data that was already written to ClickHouse but whose insert id was not recorded in the Operator State remains temporary data: it is invisible to queries and is eventually removed by ClickHouse's expiration cleanup mechanism. The state thus effectively rolls back to the last checkpoint, and no data is duplicated.
Through the above mechanism, end-to-end exactly-once semantics are guaranteed along the entire link from Kafka, through the Flink computation, into ClickHouse: data is neither duplicated nor lost.
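The following is a minimal sketch of such a sink under stated assumptions: `BigoClickHouseClient` and its `insertTemporary`/`commit` methods are hypothetical stand-ins for BIGO's internal two-phase-commit API, and batching, shard selection, and replica configuration are omitted:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ClickHouseTwoPhaseSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<Long> pendingInsertIds = new ArrayList<>();
    private transient ListState<Long> checkpointedIds;
    private transient BigoClickHouseClient client;

    @Override
    public void invoke(String row, Context ctx) {
        // Phase 1: write as temporary data and remember the returned insert id.
        pendingInsertIds.add(client.insertTemporary(row));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Persist the uncommitted insert ids so a failover can retry the commit.
        checkpointedIds.update(new ArrayList<>(pendingInsertIds));
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedIds = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-insert-ids", Long.class));
        client = new BigoClickHouseClient();
        if (ctx.isRestored()) {
            // Retry commits restored from the last checkpoint. Temporary writes
            // whose ids were never checkpointed stay invisible and are expired
            // by ClickHouse's cleanup, so nothing is duplicated.
            for (Long id : checkpointedIds.get()) {
                client.commit(id);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Phase 2: the checkpoint is durable, promote temporary data to official.
        pendingInsertIds.forEach(client::commit);
        pendingInsertIds.clear();
    }

    /** Hypothetical stand-in for BIGO ClickHouse's two-phase-commit API. */
    static class BigoClickHouseClient {
        long insertTemporary(String row) { return System.nanoTime(); }
        void commit(long insertId) { /* turn temporary data into official data */ }
    }
}
```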

2.2.3 Platform construction
In order to better manage the real-time computing tasks of the BIGO big data platform, the company built the BIGO real-time computing platform Bigoflow, which provides users with a unified entrance for Flink real-time tasks. The platform is built as follows:

* Supports Flink JAR, SQL, Python, and other job types, as well as different Flink versions, covering most of the company's internal real-time computing businesses;
* One-stop management: job development, submission, operation, history display, monitoring, and alerting are integrated, so the running status of jobs can be checked and problems found at any time;
* Lineage: makes it easy to trace each job's data sources, data destinations, and the ins and outs of its computations.

3. Application scenarios

3.1 Application scenarios of the OneSQL OLAP analysis platform
Within the company, the OneSQL OLAP analysis platform is applied to ad hoc queries, as follows:

SQL submitted by a user through the Hue page is forwarded by the OneSQL backend to the Flink SQL Gateway and then submitted to the Flink Session cluster, which executes the query. The Flink SQL Gateway obtains the execution progress of the query, reports it back to the Hue page, and returns the query results.

3.2 Application scenarios of real-time data warehouse
The real-time data warehouse is currently applied mainly to the ABTest business, as follows:

The user's raw behavior log data is aggregated by Flink tasks into per-user detail data, which is then joined with the dimension table data in a stream-dimension table JOIN and written to ClickHouse as a multi-dimensional detail wide table; the wide table is summarized along different dimensions and applied to different businesses. Through this transformation of the ABTest business, the business's result indicators are now produced 8 hours earlier, and resource usage was cut by more than half.

4. Future planning

In order to better build the OneSQL OLAP analysis platform and BIGO real-time data warehouse, the planning of the real-time computing platform is as follows:

Improve the Flink OLAP analysis platform: enhance Hive SQL syntax support and solve the problem of JOIN data skew during computation;

Improve the construction of the real-time data warehouse by introducing data lake technology, so that task data in the real-time data warehouse can be rerun and backfilled over small ranges;

Build a stream-batch integrated data computing platform based on Flink.
