EMR OLAP Open-Source Real-Time Data Warehouse Solution: The ClickHouse Transaction Implementation

1. Background

Flink and ClickHouse are leaders in real-time stream computing and OLAP, respectively. Many customers in the Internet, advertising, gaming, and other industries have combined the two to build user profiling, real-time BI reporting, application-monitoring metric queries, monitoring, and other businesses, forming a real-time data warehouse solution (see Figure 1). These businesses have strict requirements for data accuracy, so the entire real-time data warehouse link needs to guarantee end-to-end Exactly-Once semantics.

Generally, the upstream of Flink is a pull-based persistent store that can be read (or consumed) repeatedly, such as Kafka. Achieving Exactly-Once on the source side only requires rewinding the read offset on the source. Exactly-Once on the sink side is more complex, because the sink is push-based and must rely on the transaction guarantees of the target output system, and community ClickHouse does not support transactions.

In response, the Alibaba Cloud EMR ClickHouse and Flink teams carried out deep joint development to support Exactly-Once writes from Flink to ClickHouse and ensure the accuracy of the entire real-time data warehouse. This article introduces the existing mechanisms and then the implementation scheme.

2. Mechanism Overview

ClickHouse write mechanism

ClickHouse is a columnar OLAP system with an MPP architecture (as shown in Figure 2) in which all nodes are peers. Coordinating through ZooKeeper, it can ingest data in bulk by writing to the local tables on every node in parallel.

The data part is ClickHouse's smallest unit of data storage. When ClickHouse receives a block of data to write, it splits the block by partition granularity into one or more data parts. After a data part is written to disk, a background merge thread merges small data parts into larger ones to reduce storage and read costs.

When writing to a local table, ClickHouse first writes a temporary data part whose data is invisible to clients, and then renames it directly so that the temporary data part becomes an official, client-visible data part. Almost all temporary data parts are quickly and successfully renamed into official data parts; temporary data parts that are never successfully renamed are eventually deleted from disk by ClickHouse's cleanup policy.

From the above analysis, ClickHouse already has a mechanism for promoting data from a temporary data part to an official data part. This mechanism can be adapted to fit the two-phase commit protocol, an important protocol for achieving commit consistency for transactions in distributed systems.
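The mapping onto two-phase commit can be made concrete with a small simulation: the prepare phase writes a temporary part (invisible to readers) and the commit phase renames it (an atomic, visible promotion). This is only an illustrative sketch using directories as stand-ins for data parts; all names are made up and none of this is ClickHouse's actual code.

```python
import os
import shutil
import tempfile

class PartStore:
    """Toy model of the temporary-part / rename mechanism described above."""

    def __init__(self, root):
        self.root = root
        os.makedirs(root, exist_ok=True)

    def prepare(self, part_name, rows):
        # Phase 1: write a temporary part; it is invisible to readers.
        tmp = os.path.join(self.root, "tmp_" + part_name)
        os.makedirs(tmp)
        with open(os.path.join(tmp, "data.txt"), "w") as f:
            f.write("\n".join(rows))

    def commit(self, part_name):
        # Phase 2: an atomic rename promotes the part and makes it visible.
        os.rename(os.path.join(self.root, "tmp_" + part_name),
                  os.path.join(self.root, part_name))

    def rollback(self, part_name):
        # Abandoned temporary parts are simply deleted (the cleanup policy).
        shutil.rmtree(os.path.join(self.root, "tmp_" + part_name))

    def visible_parts(self):
        return sorted(p for p in os.listdir(self.root)
                      if not p.startswith("tmp_"))

store = PartStore(tempfile.mkdtemp())
store.prepare("part_1", ["row1", "row2"])
print(store.visible_parts())   # -> [] (temporary part not yet visible)
store.commit("part_1")
print(store.visible_parts())   # -> ['part_1'] (visible after rename)
```

The key property, as in ClickHouse, is that the visibility flip is a single rename, so readers never observe a half-written part.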

Note: multiple Flink tasks can write to the same shard or replica.

Flink write mechanism

As a distributed processing engine, Flink provides a transaction-based sink mechanism that can guarantee Exactly-Once writes; the corresponding data receiver needs to provide a JDBC driver that complies with the XA specification. Because the complete XA specification is quite complex, we first walk through Flink's processing mechanism and then determine the scope of interfaces to implement based on the actual situation of ClickHouse.

To achieve a unified transaction commit across a distributed write, Flink uses its checkpoint mechanism, which periodically snapshots the state of each operator and persists the snapshots. The checkpoint mechanism has a coordinator role that coordinates the behavior of all operators. From an operator's point of view, a checkpoint has three stages: initialize --> generate snapshot --> complete/discard the checkpoint. From the coordinator's point of view, checkpoints need to be triggered periodically, and the completion notification is triggered after all operators finish their snapshots. (Refer to Appendix 1.)
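The coordinator/operator interplay can be sketched as a toy simulation. This is an illustration of the three-stage flow described above, not Flink's actual classes; all names here are invented for the example.

```python
class Operator:
    """Toy checkpoint participant: snapshots, then awaits completion."""

    def __init__(self, name):
        self.name = name
        self.pending = None        # checkpoint id currently snapshotted
        self.completed = []        # checkpoint ids confirmed complete

    def snapshot(self, checkpoint_id):
        # Stage 2: persist this operator's state for the checkpoint.
        self.pending = checkpoint_id
        return True                # report success to the coordinator

    def notify_complete(self, checkpoint_id):
        # Stage 3: the coordinator says every operator snapshotted OK.
        if self.pending == checkpoint_id:
            self.completed.append(checkpoint_id)
            self.pending = None

class Coordinator:
    """Toy coordinator: triggers checkpoints and broadcasts completion."""

    def __init__(self, operators):
        self.operators = operators

    def trigger(self, checkpoint_id):
        # Ask every operator for a snapshot; complete only if all succeed.
        if all(op.snapshot(checkpoint_id) for op in self.operators):
            for op in self.operators:
                op.notify_complete(checkpoint_id)
            return "complete"
        return "discard"

ops = [Operator("source"), Operator("sink")]
coord = Coordinator(ops)
print(coord.trigger(1))    # -> complete
print(ops[1].completed)    # -> [1]
```

The point of the structure is that no operator acts on "complete" until the coordinator has heard from every participant, which is what makes a checkpoint a consistent global cut.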

Next, we introduce how a Flink operator guarantees Exactly-Once with the help of the transaction and checkpoint mechanisms. A complete operator execution goes through the initial, writeData, snapshot, commit, and close stages.

Initial stage:

• Extract the persisted xid records from the snapshot taken the last time the task ran. The snapshot stores two kinds of xids: those that had not completed the snapshot phase and those that had.

• Next, roll back the xids that did not complete the snapshot last time, and retry the commit for the xids that completed the snapshot last time but failed to commit.

• If these operations fail, task initialization fails, the task aborts, and execution enters the close stage; if they succeed, continue.

• Create a new unique xid as the transaction ID and record it in the snapshot.

• Use the newly generated xid to call the start() interface provided by JDBC.
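The recovery-then-begin sequence above can be sketched in a few lines. The `FakeXA` stand-in and all names are illustrative; the real sink talks to an XA-style JDBC driver instead.

```python
import uuid

class FakeXA:
    """Stand-in for the XA-style JDBC interface (illustrative only)."""

    def __init__(self):
        self.log = []

    def rollback(self, xid):
        self.log.append(("rollback", xid))

    def commit(self, xid):
        self.log.append(("commit", xid))

    def start(self, xid):
        self.log.append(("start", xid))

def recover_and_begin(snapshot, xa):
    """Sketch of the Initial stage: clean up old xids, then open a new one."""
    for xid in snapshot["unfinished"]:   # did not complete the snapshot phase
        xa.rollback(xid)
    for xid in snapshot["prepared"]:     # snapshotted, but commit still pending
        xa.commit(xid)                   # commit retry
    new_xid = str(uuid.uuid4())          # fresh, unique transaction id
    snapshot["unfinished"] = [new_xid]   # record it in the snapshot first
    snapshot["prepared"] = []
    xa.start(new_xid)                    # start() from the JDBC driver
    return new_xid

xa = FakeXA()
snap = {"unfinished": ["x1"], "prepared": ["x2"]}
recover_and_begin(snap, xa)
print([op for op, _ in xa.log])   # -> ['rollback', 'commit', 'start']
```

Recording the new xid in the snapshot before calling start() is what makes the transaction recoverable if the task dies immediately afterward.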

WriteData stage:

• After the transaction starts, the operator enters the data-writing stage, where it spends most of its time. Interacting with ClickHouse, this stage calls the addBatch() and executeBatch() interfaces of the JDBC preparedStatement. Each write carries the current xid in the message.

• In the data-writing stage, data is first written into the operator's memory. The batched in-memory data is submitted to ClickHouse on any of three triggers: the number of buffered records reaches the batch-size threshold; a background timer thread flushes automatically at a fixed interval; or flush is called to drain the buffer before the end() and prepare() calls in the snapshot stage.
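The three flush triggers can be condensed into a small buffered-writer sketch. This is a simplified illustration (the timer is checked inline rather than run on a background thread, and `sink` stands in for the addBatch()/executeBatch() calls); the names are made up for the example.

```python
import time

class BufferedBatchWriter:
    """Toy buffer with the three flush triggers described above."""

    def __init__(self, sink, batch_size=3, interval_s=1.0):
        self.sink = sink                  # callable receiving one batch
        self.batch_size = batch_size
        self.interval_s = interval_s
        self.buf = []
        self.last_flush = time.monotonic()

    def write(self, row):
        self.buf.append(row)
        if len(self.buf) >= self.batch_size:
            self.flush()                  # trigger 1: batch-size threshold
        elif time.monotonic() - self.last_flush >= self.interval_s:
            self.flush()                  # trigger 2: timed auto-flush

    def flush(self):
        # Trigger 3: called explicitly before end()/prepare() at snapshot time.
        if self.buf:
            self.sink(list(self.buf))     # submit the batch downstream
            self.buf.clear()
        self.last_flush = time.monotonic()

batches = []
w = BufferedBatchWriter(batches.append, batch_size=2, interval_s=60)
w.write("r1"); w.write("r2")   # hits the batch-size threshold
w.write("r3"); w.flush()       # explicit pre-snapshot flush
print(batches)                 # -> [['r1', 'r2'], ['r3']]
```

The explicit pre-snapshot flush matters most for correctness: it guarantees the buffer is empty before the transaction moves to prepare, so no row is left outside the transaction being snapshotted.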

Snapshot phase:

• The current transaction calls the end() and prepare() interfaces, waits for the commit, and updates its status in the snapshot.

• Next, a new transaction is opened as this task's next xid; the new transaction is recorded in the snapshot, and the start() interface provided by JDBC is called to begin it.

• Persist snapshots to storage.
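Put together, the snapshot stage is a "finish-current, rotate-to-next" step. The following sketch shows that shape; `FakeXA` and all names are illustrative stand-ins, not the actual connector API.

```python
class FakeXA:
    """Stand-in for the XA-style JDBC interface (illustrative only)."""

    def __init__(self):
        self.calls = []

    def end(self, xid):
        self.calls.append(("end", xid))

    def prepare(self, xid):
        self.calls.append(("prepare", xid))

    def start(self, xid):
        self.calls.append(("start", xid))

def snapshot_phase(state, xa, next_xid):
    """Sketch of the snapshot stage steps listed above."""
    cur = state["current_xid"]
    xa.end(cur)                      # close the transaction's write phase
    xa.prepare(cur)                  # transaction now waits for commit
    state["prepared"].append(cur)    # status updated in the snapshot
    state["current_xid"] = next_xid  # open the next transaction...
    xa.start(next_xid)               # ...and record it before writing resumes
    return dict(state)               # this dict is what gets persisted

xa = FakeXA()
snap = snapshot_phase({"current_xid": "x1", "prepared": []}, xa, "x2")
print([c for c, _ in xa.calls])              # -> ['end', 'prepare', 'start']
print(snap["current_xid"], snap["prepared"]) # -> x2 ['x1']
```

Because the prepared xid is in the persisted snapshot, a restarted task knows exactly which transaction still needs a commit retry (the Initial-stage recovery described earlier).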

Complete stage:

After the snapshot stage of every operator completes normally, the coordinator notifies all operators that the checkpoint succeeded. Interacting with ClickHouse, this is the stage where operators call the commit() interface provided by JDBC to commit their transactions.

Close phase:

• If the current transaction has not yet reached the snapshot stage, roll back the current transaction.

• Close all resources.

From the above process, Flink uses the checkpoint and transaction mechanisms to divide upstream data into batches by checkpoint period and ensure each batch is written completely, after which the coordinator notifies all operators to complete the commit. When an operator fails to write, execution returns to the state of the last successful checkpoint, and all xids of the current checkpoint batch are rolled back according to the xids recorded in the snapshot. When a commit fails, the commit is retried; repeated failures are handled by manual intervention.

3. Technical Solution

Overall scheme

Based on the write mechanisms of Flink and ClickHouse, we can draw the sequence diagram of transactional writing from Flink to ClickHouse (as shown in Figure 3). Because ClickHouse's local tables are written directly and unified transaction commit is guaranteed by the coordinator, ClickHouse does not need to implement the standard distributed transactions of the XA specification; it only needs to implement a few key interfaces of the two-phase commit protocol, and the other interfaces can be given default implementations on the JDBC side.

Figure 3 Sequence diagram of Flink to ClickHouse transaction writing

ClickHouse-Server

State machine

To implement transactions in ClickHouse, we first define the operations the transaction allows:

• Begin: Start a transaction.

• Write Data: Write data in a transaction.

• Commit: Commit a transaction.

• Rollback: Roll back an uncommitted transaction.

Transaction status:

• Unknown: The transaction is not opened. It is illegal to perform any operation at this time.

• Initialized: The transaction has been opened, and all operations are allowed at this time.

• Committing: The transaction is being committed. Begin/Write Data operations are no longer allowed.

• Committed: The transaction has been committed and no operation is allowed.

• Aborting: The transaction is being rolled back and no operation is allowed.

• Aborted: The transaction has been rolled back and no operation is allowed.

All operations in the figure are idempotent. Among them, the Committing-to-Committed and Aborting-to-Aborted transitions require no extra action: when Commit or Rollback starts executing, the transaction state changes to Committing or Aborting, and after Commit or Rollback finishes, the state is set to Committed or Aborted.
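The state machine above, including the idempotence of repeated Commit/Rollback, can be sketched directly. This is a toy model for illustration, not the server's actual implementation; the only assumption beyond the text is that Begin is the one operation permitted in the Unknown state (since it is what opens the transaction).

```python
from enum import Enum

class TxnState(Enum):
    UNKNOWN = "Unknown"
    INITIALIZED = "Initialized"
    COMMITTING = "Committing"
    COMMITTED = "Committed"
    ABORTING = "Aborting"
    ABORTED = "Aborted"

class Transaction:
    """Toy version of the transaction state machine described above."""

    def __init__(self):
        self.state = TxnState.UNKNOWN

    def begin(self):
        # Idempotent: a repeated Begin on an Initialized txn is a no-op.
        if self.state in (TxnState.UNKNOWN, TxnState.INITIALIZED):
            self.state = TxnState.INITIALIZED
        else:
            raise RuntimeError(f"begin illegal in {self.state}")

    def write(self, _rows):
        # Write Data is only allowed while Initialized.
        if self.state is not TxnState.INITIALIZED:
            raise RuntimeError(f"write illegal in {self.state}")

    def commit(self):
        if self.state in (TxnState.INITIALIZED, TxnState.COMMITTING):
            self.state = TxnState.COMMITTING   # commit in progress
            self.state = TxnState.COMMITTED    # commit finished
        elif self.state is not TxnState.COMMITTED:
            raise RuntimeError(f"commit illegal in {self.state}")
        # else: already Committed -> idempotent no-op

    def rollback(self):
        if self.state in (TxnState.INITIALIZED, TxnState.ABORTING):
            self.state = TxnState.ABORTING     # rollback in progress
            self.state = TxnState.ABORTED      # rollback finished
        elif self.state is not TxnState.ABORTED:
            raise RuntimeError(f"rollback illegal in {self.state}")

t = Transaction()
t.begin(); t.begin()        # repeated Begin: no-op
t.write(["row"])
t.commit(); t.commit()      # repeated Commit: no-op
print(t.state.value)        # -> Committed
```

Idempotence is what lets a client safely retry any request after a timeout without knowing whether the previous attempt took effect.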

Transaction

Clients access ClickHouse Server through the HTTP RESTful API. The interaction of a complete transaction between a client and ClickHouse Server is shown in Figure 5:

Normal process:

• The client sends a Begin Transaction request, carrying a globally unique transaction ID generated by the client, to any ClickHouse server in the cluster. When a ClickHouse server receives the Begin Transaction request, it registers the transaction ID with ZooKeeper (creating the transaction ID znode and its child znodes) and initializes the transaction state to Initialized.

• Once the client receives a successful Begin Transaction response, it can start writing data. When the ClickHouse server receives data from the client, it generates a temporary data part but does not convert it into an official data part; instead, it records the written temporary data part's information as JSON under the transaction's information in ZooKeeper.

• After the client finishes writing data, it sends a Commit Transaction request to the ClickHouse server. On receiving the request, the server converts its local temporary data parts into official data parts according to the data part information recorded for the transaction in ZooKeeper, and updates the transaction state to Committed. The Rollback process is similar to Commit.
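The server-side bookkeeping in this flow can be sketched with a dict standing in for ZooKeeper and strings standing in for data parts. This is purely an illustration of the protocol above; the class and field names are invented.

```python
import json

class ToyServer:
    """Sketch of the server-side transaction flow described above."""

    def __init__(self, zk):
        self.zk = zk            # dict standing in for ZooKeeper
        self.temp_parts = {}    # txn_id -> temporary parts on this node

    def begin(self, txn_id):
        if txn_id in self.zk:
            # Duplicate Begin: if already Initialized, return directly.
            if self.zk[txn_id]["state"] == "Initialized":
                return
            raise RuntimeError("transaction in unexpected state")
        self.zk[txn_id] = {"state": "Initialized", "parts": "[]"}

    def write(self, txn_id, part):
        # A temporary part is created locally; its info is recorded
        # as JSON under the transaction's entry in "ZooKeeper".
        self.temp_parts.setdefault(txn_id, []).append(part)
        parts = json.loads(self.zk[txn_id]["parts"]) + [part]
        self.zk[txn_id]["parts"] = json.dumps(parts)

    def commit(self, txn_id):
        # Convert temporary parts to official ones per the recorded list.
        official = json.loads(self.zk[txn_id]["parts"])
        self.zk[txn_id]["state"] = "Committed"
        return official

zk = {}
srv = ToyServer(zk)
srv.begin("txn-1"); srv.begin("txn-1")   # duplicate Begin is harmless
srv.write("txn-1", "part_0")
print(srv.commit("txn-1"))               # -> ['part_0']
```

Keeping the part list in the coordination store (rather than only in server memory) is what allows commit to proceed correctly even after the server restarts mid-transaction.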

Exception handling:

• If the same transaction ID is found in ZooKeeper while creating a transaction ID, handle it according to the transaction state recorded in ZooKeeper: if the state is Unknown, continue processing; if the state is Initialized, return directly; otherwise, throw an exception.

• The current implementation supports only stand-alone transactions, not distributed transactions, so a client can write data only to the ClickHouse server node that recorded the transaction ID. If a ClickHouse server receives data for a transaction recorded on another node, it returns an error directly.

• Unlike data writing, if the client sends a Commit Transaction request in the commit phase to a ClickHouse server that did not record the transaction ID, that server does not return an error; instead, it returns the address of the ClickHouse server that recorded the transaction ID, so the client can redirect to the correct server. The Rollback process is similar to Commit.
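The asymmetry between writes (error on the wrong node) and commits (redirect to the owner) can be shown with two toy nodes sharing a ZooKeeper-like dict. All names and the response shapes are illustrative, not the real HTTP API.

```python
class Node:
    """Toy ClickHouse server node sharing a 'ZooKeeper' dict with peers."""

    def __init__(self, addr, zk):
        self.addr = addr
        self.zk = zk

    def begin(self, txn_id):
        # The node that handles Begin becomes the transaction's owner.
        self.zk[txn_id] = {"owner": self.addr, "state": "Initialized"}

    def write(self, txn_id, _rows):
        # Writes for a stand-alone transaction must go to the owner.
        if self.zk[txn_id]["owner"] != self.addr:
            raise RuntimeError("write must go to the owner node")

    def commit(self, txn_id):
        owner = self.zk[txn_id]["owner"]
        if owner != self.addr:
            # Not an error: tell the client where to redirect.
            return {"redirect": owner}
        self.zk[txn_id]["state"] = "Committed"
        return {"ok": True}

zk = {}
n1, n2 = Node("10.0.0.1", zk), Node("10.0.0.2", zk)
n1.begin("t1")
print(n2.commit("t1"))   # -> {'redirect': '10.0.0.1'}
print(n1.commit("t1"))   # -> {'ok': True}
```

The redirect makes commit retries robust: after a failover, the Flink task can send the commit anywhere in the cluster and still be steered to the node holding the transaction's temporary parts.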

ClickHouse-JDBC

According to the XA specification, a complete distributed transaction mechanism requires implementing a large number of standard interfaces (see Appendix 2). In this design, only a few key interfaces need to be implemented, so a composition-based adapter pattern is used to provide Flink with an XAResource implementation based on the standard XA interfaces, while the interfaces that ClickHouse Server does not need to support are stubbed out.

The XADataSource implementation uses an inheritance-based adapter pattern and modifies some default configurations for the Exactly-Once feature, such as the retry count for failed sends.

In addition, in production environments, load balancing for data writing is usually done through an SLB rather than through distributed tables. In the Exactly-Once scenario, however, a Flink task must hold a connection to a fixed ClickHouse server node, so SLB cannot be used for load balancing. To solve this, we borrowed the idea of BalancedClickhouseDataSource: multiple IPs are configured in the URL and the write mode is set to Random in the properties, giving the XADataSource load-balancing capability while still guaranteeing Exactly-Once.
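The idea is simply to pick one host at random from a multi-host URL at connection time and then stick with it for the task's lifetime. A minimal sketch, assuming the common `jdbc:clickhouse://host1:port,host2:port/db` URL convention (the function name and signature are made up for illustration):

```python
import random
import re

def pick_host(url, rng=random):
    """Choose one host at random from a multi-host ClickHouse JDBC URL.

    The chosen host is then used for the whole task lifetime, which is
    what the Exactly-Once sink needs (a stable connection to one node).
    """
    m = re.match(r"jdbc:clickhouse://([^/]+)/(\w+)", url)
    if not m:
        raise ValueError("unrecognized URL format")
    hosts = m.group(1).split(",")     # "h1:p,h2:p" -> ["h1:p", "h2:p"]
    db = m.group(2)
    return rng.choice(hosts), db      # random write mode: pick any host

url = "jdbc:clickhouse://10.0.0.1:8123,10.0.0.2:8123/default"
host, db = pick_host(url)
print(host in ("10.0.0.1:8123", "10.0.0.2:8123"), db)   # -> True default
```

Randomizing only at connection setup spreads tasks across nodes (the load-balancing goal) without ever moving an in-flight transaction between servers (the Exactly-Once constraint).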

Flink-Connector-ClickHouse

As a streaming data processing engine, Flink supports writing to many data receivers, each of which requires a specific connector. For Exactly-Once, the ClickHouse connector adds option configuration for the XADataSource and provides the Exactly-Once functionality according to the client configuration.

4. Test Results

ClickHouse transaction performance test

• With the amount of data per batch and the total volume fixed, we vary the number of concurrent client write threads. As Figure 6 shows, whether or not transactions are enabled, ClickHouse throughput grows in proportion to the number of concurrent client write threads. When a transaction is enabled, temporary data parts are not converted into official data parts immediately, so before the transaction completes, the large number of temporary parts does not participate in ClickHouse's merge process; this reduces the impact of disk IO on write performance, so writes with transactions enabled outperform those without. However, as the number of batches contained in a transaction grows, the accumulation of temporary data parts on disk increases CPU pressure during merging, which affects write performance, and write performance with transactions enabled also drops.

• With the total volume written to ClickHouse and the number of concurrent client write threads fixed, we vary the amount of data per batch.

It can be seen from Figure 7 that, whether or not transactions are enabled, ClickHouse throughput grows in proportion to the amount of data per batch. When transactions are enabled, the smaller each batch, the larger the share of transaction processing in the total write time, so enabling transactions has a greater impact on throughput; accordingly, the more batches a transaction contains, the smaller the transaction's impact on write performance. As the number of batches per transaction grows further, the share of transaction-processing time keeps shrinking while the impact of ClickHouse merges grows, affecting write performance, and writes with transactions enabled outperform those without.

• In general, starting a transaction has little impact on write performance, which is in line with our expectations.

Performance comparison of Flink writing to ClickHouse

• For the same amount of data and different checkpoint periods, the total time for Flink to write to ClickHouse is shown in Figure 8. The checkpoint period has no effect on the time taken by tasks without Exactly-Once enabled. For Exactly-Once tasks, the time taken first decreases and then increases over the range of 5s to 60s. The reason is that with a short checkpoint period, an Exactly-Once operator interacts with ClickHouse transactions too frequently, while with a long checkpoint period, an Exactly-Once operator must wait for the period to end before committing the last transaction and making the data visible. In this test, the checkpoint period figures are only a reference; in production they should be tuned to the machine specification and data write rate.

• In general, enabling the Exactly-Once feature when Flink writes to ClickHouse slightly affects performance. This conclusion is in line with our expectations.

5. Future Plans

The transaction implemented in this version of EMR ClickHouse is not yet complete: it supports only stand-alone transactions, not distributed transactions. Distributed systems generally use a meta server for unified metadata management to support a distributed transaction mechanism. We are now planning a ClickHouse MetaServer to support distributed transactions, which would also remove ClickHouse's dependence on ZooKeeper.
