This topic describes how to compute Tablestore data by using Realtime Compute for Apache Flink. A Tablestore data table or time series table can serve as the source table or result table of a Realtime Compute for Apache Flink job.
Prerequisites
Tablestore is activated and an instance is created. For more information, see Activate Tablestore and create an instance.
A source table and result table are created. A tunnel is created for the source table. For more information, see Operations on a data table, Operations on time series tables, and Create a tunnel.
A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Important: The Realtime Compute for Apache Flink workspace and the Tablestore instance must reside in the same region. For information about the regions that are supported by Realtime Compute for Apache Flink, see Regions.
An AccessKey pair is obtained.
Important: For security purposes, we recommend that you use Tablestore features as a Resource Access Management (RAM) user. For more information, see Use the AccessKey pair of a RAM user to access Tablestore.
Develop a real-time computing job
Step 1: Create an SQL draft
Go to the page for creating a draft.
Log on to the Realtime Compute for Apache Flink console.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, open the SQL draft development page.
Click New. In the New Draft dialog box, select Blank Stream Draft and click Next.
Note: Realtime Compute for Apache Flink provides various code templates and supports data synchronization. Each code template is suitable for specific scenarios and provides code samples and instructions. You can click a template to learn about the features and the related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.
Configure the parameters for the draft. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | flink-test |
| Location | The folder in which the code file of the draft is saved. You can also click the create-folder icon to the right of an existing folder to create a subfolder. | Draft |
| Engine Version | The version of the Flink engine that you want the current draft to use. For more information about engine versions, see Release notes and Engine version. | vvr-8.0.10-flink-1.17 |
Click Create.
Step 2: Write code for the draft
The example in this step synchronizes data from one data table to another. For more sample SQL statements, see Sample SQL statements.
Create temporary tables for the source table and the result table.
For more information, see Appendix 1: Tablestore connector.
-- Create a temporary table named tablestore_stream for the source table.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots', -- Specify the connector type of the source table. The value is ots and cannot be changed.
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Specify the virtual private cloud (VPC) endpoint of the Tablestore instance.
    'instanceName' = 'xxx', -- Specify the name of the Tablestore instance.
    'tableName' = 'flink_source_table', -- Specify the name of the source table.
    'tunnelName' = 'flink_source_tunnel', -- Specify the name of the tunnel that is created for the source table.
    'accessId' = 'xxxxxxxxxxx', -- Specify the AccessKey ID of the Alibaba Cloud account or RAM user.
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Specify the AccessKey secret of the Alibaba Cloud account or RAM user.
    'ignoreDelete' = 'false' -- Specify whether to ignore the real-time data that is generated by delete operations. In this example, this parameter is set to false.
);

-- Create a temporary table named tablestore_sink for the result table.
CREATE TEMPORARY TABLE tablestore_sink(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`, orderid) NOT ENFORCED -- Specify the primary key.
) WITH (
    'connector' = 'ots', -- Specify the connector type of the result table. The value is ots and cannot be changed.
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Specify the VPC endpoint of the Tablestore instance.
    'instanceName' = 'xxx', -- Specify the name of the Tablestore instance.
    'tableName' = 'flink_sink_table', -- Specify the name of the result table.
    'accessId' = 'xxxxxxxxxxx', -- Specify the AccessKey ID of the Alibaba Cloud account or RAM user.
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Specify the AccessKey secret of the Alibaba Cloud account or RAM user.
    'valueColumns' = 'customerid,customername' -- Specify the names of the columns that you want to insert into the result table.
);
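If your workspace manages AccessKey pairs as variables, you can reference the variables instead of hardcoding plaintext credentials in the draft. The following snippet is a minimal sketch, assuming that secret variables named ak_id and ak_secret (hypothetical names) are already defined in the workspace and that the workspace supports the ${secret_values.xxx} reference syntax:

-- A hedged variant of the source table definition that avoids plaintext AccessKey pairs.
CREATE TEMPORARY TABLE tablestore_stream_secure(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = '${secret_values.ak_id}',      -- Assumption: ak_id is a variable defined in the workspace.
    'accessKey' = '${secret_values.ak_secret}', -- Assumption: ak_secret is a variable defined in the workspace.
    'ignoreDelete' = 'false'
);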
Write the draft logic.
The following sample SQL statement shows how to insert data from the source table into the result table:
-- Insert data from the source table into the result table.
INSERT INTO tablestore_sink
SELECT `order`, orderid, customerid, customername
FROM tablestore_stream;
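The draft logic can also transform or filter the stream before the data is written to the result table. The following statement is an illustrative sketch that reuses the tables above; the transformation and filter conditions are examples, not part of the original synchronization scenario:

-- Normalize the customer name and skip rows without one before writing to the result table.
INSERT INTO tablestore_sink
SELECT `order`, orderid, customerid, UPPER(customername) AS customername
FROM tablestore_stream
WHERE customername IS NOT NULL;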
Step 3: (Optional) View configuration information
On the tabs on the right side of the SQL editor, you can view the configurations of the draft or modify the parameters. The following table describes the tabs.
| Tab name | Description |
| --- | --- |
| Configurations | You can view and modify the basic configurations of the draft, such as the engine version and additional dependencies. |
| Structure | You can view the structure of the draft, such as the relationships between the tables that the draft uses. |
| Versions | You can view the engine version of the draft. For more information about the operations that you can perform in the Actions column in the Draft Versions panel, see Manage deployment versions. |
Step 4: (Optional) Perform a syntax check
The syntax check verifies the SQL semantics of the draft, the network connectivity, and the metadata of the tables that the draft uses. You can also click SQL Advice in the check results to view SQL risks and the related optimization suggestions.
In the upper-right corner of the SQL editor, click Validate.
In the Validate dialog box, click Confirm.
Step 5: (Optional) Debug the draft
You can use the debugging feature to simulate deployment execution, check outputs, and verify the business logic of your SELECT and INSERT statements. This feature improves development efficiency and reduces the risk of poor data quality.
In the upper-right corner of the SQL editor, click Debug.
In the Debug dialog box, select the cluster that you want to debug and click Next.
If no cluster is available, create a session cluster. Make sure that the session cluster uses the same engine version as the SQL draft and is in the Running state. For more information, see Create a session cluster.
Configure debugging data.
If you use online data, skip this operation.
If you use debugging data, click Download mock data template, enter the debugging data in the template, and then click Upload mock data to upload the debugging data. For more information, see Debug a draft.
Click Confirm.
Step 6: Deploy the draft
In the upper-right corner of the SQL editor, click Deploy. In the Deploy draft dialog box, configure the related parameters and click Confirm.
Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of a JobManager and accelerate the deployment startup. However, we recommend that you do not deploy drafts for the production environment in session clusters. Otherwise, stability issues may occur.
Step 7: Start the deployment for the draft and view the computing result
In the left-side navigation pane, go to the Deployments page. Find the deployment that you want to start and click Start in the Actions column.
In the Start Job panel, select Initial Mode and click Start. If the deployment status changes to RUNNING, the deployment runs as expected. For more information about the parameters that you must configure when you start a deployment, see Start a deployment.
Note: We recommend that you configure two CPU cores and 4 GB of memory for each TaskManager in Realtime Compute for Apache Flink to maximize the computing capabilities of each TaskManager. A TaskManager can write 10,000 rows per second.
If the source table contains a large number of partitions, we recommend that you set the concurrency to less than 16 in Realtime Compute for Apache Flink. The write rate increases linearly with the concurrency.
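For example, based on these guidelines and assuming one task slot per TaskManager, a deployment that runs with a concurrency of 4 can be expected to write roughly 40,000 rows per second (4 × 10,000 rows per second for each TaskManager), and the recommended maximum concurrency of 16 corresponds to roughly 160,000 rows per second.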
On the Deployments page, view the computing result.
In the left-side navigation pane, go to the Deployments page. On the Deployments page, click the name of the deployment that you want to manage. On the Logs tab, click the Running Task Managers tab and click the value in the Path, ID column.
Click Logs. On the Logs tab, view the related log information.
(Optional) Cancel a deployment.
If you modify the SQL code of a deployment, add parameters to or remove parameters from the WITH clause, or change the version of a deployment, you must redeploy the draft, cancel the deployment, and then restart the deployment for the changes to take effect. If a deployment fails and cannot reuse state data to recover, or if you want to update parameter settings that do not dynamically take effect, you must also cancel and then restart the deployment. For more information about how to cancel a deployment, see Cancel a deployment.