Tablestore+Delta Lake

Background introduction

In recent years, HTAP (hybrid transactional/analytical processing) has become increasingly popular. By combining storage and computing, it supports both traditional analysis of massive structured data and fast transactional updates and writes. It is a proven architecture for designing data-intensive systems.

Tablestore is a NoSQL multi-model database developed in-house by Alibaba Cloud. It provides storage for massive structured data along with fast query and analysis services (PB-level storage, tens of millions of TPS, and millisecond-level latency). Backed by the Tablestore engine, it meets the requirements of OLTP scenarios well. Delta Lake is a data lake that supports deltas. It stores base data in a columnar format and new delta data in a row format, thereby supporting ACID and CRUD operations on data, and it is fully compatible with the Spark big data ecosystem. Combining Delta Lake with the Spark ecosystem meets the requirements of OLAP scenarios well. The figure below shows a brief logical structure diagram of the HTAP scenario combining Tablestore and Delta Lake. For more details on the design of a structured big data analysis platform, please refer to the article Structured Big Data Analysis Platform Design.

Preparation

1: Log in to the Alibaba Cloud E-MapReduce console.
2: Create a Hadoop cluster (skip this step if one already exists).
3: Make sure the Tablestore instance and the E-MapReduce cluster are deployed in the same VPC environment.

Step 1 Create Tablestore source table

For detailed activation steps, please refer to the official documentation. The table created in this demo is named Source, and its schema is shown in the figure below. The table has two primary key columns, PkString and PkInt, of types String and Integer respectively.

Create an incremental tunnel for the table Source. As shown in the figure below, the name, ID, and type of the tunnel are displayed in the tunnel list.

Technical Notes:

Tunnel Service is an integrated service for full and incremental data consumption, built on the Tablestore data interface. It provides three types of tunnels:

Full: consumes and processes the historical (stock) data in the data table.
Incremental: consumes and processes data newly written to the data table.
Full plus incremental: first consumes the historical data in the data table, then consumes newly written data.

For a detailed introduction to Tunnel Service, please refer to the official Tablestore documentation.
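
The difference between the three tunnel types can be illustrated with a small simulation. This is a plain Python sketch, not the Tablestore SDK: the function name consume, the type labels, and the list-based model of a table are all hypothetical and exist only to show which records each tunnel type would deliver.

```python
from typing import Iterator, List

# Hypothetical labels for the three tunnel types described above.
TUNNEL_TYPES = ("full", "incremental", "full_plus_incremental")


def consume(tunnel_type: str, stock: List[str], new_writes: List[str]) -> Iterator[str]:
    """Yield the records a tunnel of the given type would deliver.

    stock      -- rows already in the table when the tunnel is created
    new_writes -- rows written after the tunnel is created
    """
    if tunnel_type not in TUNNEL_TYPES:
        raise ValueError(f"unknown tunnel type: {tunnel_type}")
    # Full and full-plus-incremental tunnels start with the historical data.
    if tunnel_type in ("full", "full_plus_incremental"):
        yield from stock
    # Incremental and full-plus-incremental tunnels deliver newly written data.
    if tunnel_type in ("incremental", "full_plus_incremental"):
        yield from new_writes


stock_rows = ["row1", "row2"]
later_rows = ["row3"]
for kind in TUNNEL_TYPES:
    print(kind, list(consume(kind, stock_rows, later_rows)))
```

This demo uses an incremental tunnel, so only rows written after the tunnel is created reach the downstream job.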

Step 2 Obtain the relevant JAR packages and upload them to the Hadoop cluster

Obtain the JAR packages that the environment depends on.

On the cluster management page, click the cluster ID of the created Hadoop cluster to enter the cluster and service management page.

Select the host list in the navigation tree on the left, and find the IP address of the emr-header-1 host of the Hadoop cluster on the right.

Open a terminal in an SSH client and log in to the emr-header-1 host of the Hadoop cluster.

Upload all JAR packages to a directory on the emr-header-1 node.

Step 3 Run the Spark Streaming job

This example uses code adapted from the emr demo. Compile it into a JAR package and upload that package to the emr-header-1 host of the Hadoop cluster (see Step 2). Because the changes are extensive, the complete code is not included in this article. It will be merged into the official emr demo project in the future.

This example uses a Tablestore table as the data source and demonstrates a complete pipeline from Tablestore to Delta Lake by combining Tablestore CDC technology, the Tablestore Streaming Source, and the Delta Sink.

Run the following command to start the Spark Streaming job, which launches a listener that synchronizes data from the Tablestore Source table to the Delta Lake table in real time.

Each parameter is described as follows:

Step 4 Data CRUD example

First, insert two rows into Tablestore. In this example, eight columns are synchronized: two primary key columns (PkString, PkInt) and six attribute columns (col1, col2, col3, timestamp, col5, and col6). Because Tablestore is schema-free, attribute columns can be inserted arbitrarily, and the Tablestore Spark Source automatically filters the attribute columns down to the ones being synchronized. As shown in the following two figures, once the two rows are inserted, they can be read from the Delta Table right away, and the data is consistent.
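
The column filtering described above can be pictured as a projection onto a fixed column list. The following is a minimal Python sketch rather than the actual Tablestore Spark Source: the function project_row is hypothetical, and filling a missing column with None is an assumption about how absent attributes are represented.

```python
from typing import Any, Dict, List, Optional

# The eight synchronized columns named in this example.
SYNC_COLUMNS: List[str] = [
    "PkString", "PkInt", "col1", "col2", "col3", "timestamp", "col5", "col6",
]


def project_row(row: Dict[str, Any],
                columns: List[str] = SYNC_COLUMNS) -> Dict[str, Optional[Any]]:
    """Keep only the configured columns, in order.

    Columns absent from the row become None (an assumption of this sketch);
    columns not in the list are dropped.
    """
    return {name: row.get(name) for name in columns}


# A schema-free row: it lacks most attribute columns and carries an extra one.
raw_row = {"PkString": "a", "PkInt": 1, "col1": "x", "extra_col": "ignored"}
projected = project_row(raw_row)
print(projected)
```

Whatever extra attribute columns a row carries, the downstream table always sees the same eight columns.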

Next, update some rows and insert new rows in Tablestore. As shown in the following two figures, after a short wait for micro-batch synchronization, the changes made in Tablestore are reflected in the Delta Table in near real time.

Finally, clear all the data in Tablestore. As shown in the following two figures, the Delta Table becomes empty as well.
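
The insert, update, and delete behavior in this step amounts to replaying change records against a table keyed by primary key. The sketch below models that idea in plain Python under stated assumptions: the record layout and the operation labels PUT, UPDATE, and DELETE are hypothetical, and the real job applies changes through the Delta Sink rather than an in-memory dictionary.

```python
from typing import Any, Dict, List, Tuple

Key = Tuple[str, int]              # (PkString, PkInt)
Table = Dict[Key, Dict[str, Any]]  # primary key -> attribute columns


def apply_batch(table: Table, batch: List[Dict[str, Any]]) -> Table:
    """Apply one micro-batch of change records to the table, in order."""
    for change in batch:
        key = (change["PkString"], change["PkInt"])
        op = change["op"]
        if op == "PUT":
            # Insert the row, replacing any existing row with the same key.
            table[key] = dict(change["columns"])
        elif op == "UPDATE":
            # Merge the changed columns into the existing row.
            table.setdefault(key, {}).update(change["columns"])
        elif op == "DELETE":
            # Remove the row if it is present.
            table.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {op}")
    return table


target: Table = {}
apply_batch(target, [
    {"op": "PUT", "PkString": "a", "PkInt": 1, "columns": {"col1": "x"}},
    {"op": "PUT", "PkString": "b", "PkInt": 2, "columns": {"col1": "y"}},
])
apply_batch(target, [
    {"op": "UPDATE", "PkString": "a", "PkInt": 1, "columns": {"col2": "z"}},
])
print(target)
```

Applying a DELETE record for every key leaves the dictionary empty, which mirrors the Delta Table becoming empty after the source is cleared.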

On the cluster, the Delta Table is stored in HDFS by default. As shown in the figure below, the JSON files in the _delta_log directory make up the transaction log, and the Parquet files are the underlying data files.
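
To make the layout concrete, the following Python sketch separates transaction log files from data files in a Delta table directory. It runs against a small fake directory on the local filesystem, not HDFS. The function name classify_delta_files and the data file name are hypothetical, and only top-level Parquet files are listed, so a partitioned table would need a recursive search.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


def classify_delta_files(table_dir: Path) -> Dict[str, List[str]]:
    """Split a Delta table directory into log commits and data files."""
    log_dir = table_dir / "_delta_log"
    # Each commit is a JSON file in _delta_log.
    commits = sorted(p.name for p in log_dir.glob("*.json"))
    # Data files are Parquet files under the table root (top level only here).
    data_files = sorted(p.name for p in table_dir.glob("*.parquet"))
    return {"commits": commits, "data_files": data_files}


# Build a tiny stand-in for a table directory; the file contents are dummies.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "delta_table"
    (root / "_delta_log").mkdir(parents=True)
    (root / "_delta_log" / "00000000000000000000.json").write_text("{}")
    (root / "part-00000.snappy.parquet").write_bytes(b"")
    layout = classify_delta_files(root)
    print(layout)
```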
