This topic describes how to use Realtime Compute for Apache Flink to access source tables and result tables in the Wide Column model in Tablestore.
Background information
In Realtime Compute for Apache Flink, the tunnels of Tunnel Service can serve as sources of streaming data. Each data record uses a JSON-like format. The following example shows a data record:
{
    "OtsRecordType": "PUT",
    "OtsRecordTimestamp": 1506416585740836,
    "PrimaryKey": [
        {
            "ColumnName": "pk_1",
            "Value": 1506416585881590900
        },
        {
            "ColumnName": "pk_2",
            "Value": "string_pk_value"
        }
    ],
    "Columns": [
        {
            "OtsColumnType": "Put",
            "ColumnName": "attr_0",
            "Value": "hello_table_store"
        },
        {
            "OtsColumnType": "DELETE_ONE_VERSION",
            "ColumnName": "attr_1"
        }
    ]
}
Field | Description |
OtsRecordType | The data operation type. Valid values: PUT, UPDATE, and DELETE. |
OtsRecordTimestamp | The data operation time. Unit: microseconds. For full data, the value of this field is 0. |
PrimaryKey | The settings of the primary key columns. The value of this field is a JSON array. You can specify one to four primary key columns based on the primary key configuration of your table. Each primary key column contains the following fields: ColumnName (the name of the column) and Value (the value of the column). |
Columns | The settings of the attribute columns. The value of this field is a JSON array. Each attribute column contains the following fields: OtsColumnType (the column operation type; valid values: PUT, DELETE_ONE_VERSION, and DELETE_ALL_VERSION), ColumnName (the name of the column), and Value (the value of the column; this field is not included for delete operations). |
Tablestore source tables
In Realtime Compute for Apache Flink, you can use a source table DDL statement to read the primary key and attribute column values from a table in Tablestore. The values are converted based on the data type mappings between Tablestore and Realtime Compute for Apache Flink. For more information, see Tablestore connector.
DDL syntax
The following sample code provides an example of the DDL syntax of a source table:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' -- Specifies whether to ignore delete operations.
);
In addition to the data records that have not been consumed, Tunnel Service returns the OtsRecordType and OtsRecordTimestamp fields, which Realtime Compute for Apache Flink can read as attribute columns. The following table describes the fields.
Field | Mapped field in Realtime Compute for Apache Flink | Description |
OtsRecordType | type | The data operation type. |
OtsRecordTimestamp | timestamp | The data operation time. Unit: microseconds. For full data, the value of this field is 0. |
To read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword that Realtime Compute for Apache Flink provides to obtain these fields from the Tablestore source table. The following sample code provides an example of the DDL statement:
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);
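For example, you can use the metadata columns to filter change records by operation type. The following statement is a minimal sketch that assumes the type metadata field carries the value DELETE for delete operations; verify the actual values against the records in your tunnel:
SELECT `order`, orderid, customerid, customername, record_type, record_timestamp
FROM tablestore_stream
WHERE record_type <> 'DELETE'; -- Keep only records that are not delete operations.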
Parameters in the WITH clause
Parameter | Required | Description |
connector | Yes | The type of the source table. The value of this parameter is fixed to ots. |
endPoint | Yes | The endpoint of the Tablestore instance. For more information, see Endpoints. |
instanceName | Yes | The name of the Tablestore instance. |
tableName | Yes | The name of the Tablestore data table. |
tunnelName | Yes | The tunnel name of the Tablestore data table. For more information about how to create a tunnel, see Quick start. |
accessId | Yes | The AccessKey ID of an Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
accessKey | Yes | The AccessKey secret of the account. |
ignoreDelete | No | Specifies whether to ignore delete operations. Default value: false. |
Data type mappings
Data type of fields in Tablestore | Data type of fields in Realtime Compute for Apache Flink |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
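The following DDL sketch uses hypothetical column names to show how these mappings apply when you declare a source table; the WITH clause is omitted:
CREATE TABLE type_mapping_example (
    user_id BIGINT,    -- Maps to a Tablestore INTEGER column.
    user_name STRING,  -- Maps to a Tablestore STRING column.
    is_active BOOLEAN, -- Maps to a Tablestore BOOLEAN column.
    score DOUBLE       -- Maps to a Tablestore DOUBLE column.
) WITH (
    ...
);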
Tablestore result tables
Realtime Compute for Apache Flink allows you to store results in a Tablestore table. For more information, see Tablestore connector.
DDL syntax
The following sample code provides an example of the DDL syntax of a result table.
A Tablestore result table must contain at least one attribute column and one or more primary key columns.
CREATE TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
...
);
Parameters in the WITH clause
Parameter | Required | Description |
connector | Yes | The type of the result table. The value of this parameter is fixed to ots. |
endPoint | Yes | The endpoint of the Tablestore instance. For more information, see Endpoints. |
instanceName | Yes | The name of the Tablestore instance. |
tableName | Yes | The name of the Tablestore data table. |
accessId | Yes | The AccessKey ID of an Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair. |
accessKey | Yes | The AccessKey secret of the account. |
valueColumns | Yes | The names of the columns that you want to insert. If you want to insert multiple columns, separate the column names with commas (,). Example: 'customerid,customername'. |
bufferSize | No | The maximum number of data records that can be stored in the buffer before data is written to the result table. Default value: 5000. This value specifies that data is written to the result table when the number of data records in the buffer reaches 5,000. |
batchWriteTimeoutMs | No | The write timeout period. Unit: milliseconds. Default value: 5000. This value specifies that all data in the buffer is written to the result table after 5,000 milliseconds, even if the number of buffered data records does not reach the value of the bufferSize parameter. |
batchSize | No | The number of data records that can be written to the result table at a time. Default value: 100. |
retryIntervalMs | No | The interval between retries. Unit: milliseconds. Default value: 1000. |
maxRetryTimes | No | The maximum number of retries. Default value: 100. |
ignoreDelete | No | Specifies whether to ignore delete operations. Default value: false. This value specifies that delete operations are not ignored. |
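The following DDL sketch shows a result table in which the optional write-tuning parameters are set explicitly to their default values. The endpoint, instance name, and AccessKey values are placeholders:
CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername',
    'bufferSize'='5000',          -- Flush after 5,000 records are buffered.
    'batchWriteTimeoutMs'='5000', -- Or flush after 5,000 milliseconds.
    'batchSize'='100',            -- Write up to 100 records per request.
    'retryIntervalMs'='1000',     -- Wait 1,000 milliseconds between retries.
    'maxRetryTimes'='100',        -- Retry a failed write at most 100 times.
    'ignoreDelete'='false'        -- Do not ignore delete operations.
);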
Sample SQL statements
Read data from a source table
The following sample SQL statement provides an example on how to read data from a source table:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' -- Specifies whether to ignore delete operations.
);
SELECT * FROM tablestore_stream LIMIT 100;
Synchronize data to a result table
The following sample SQL statements provide an example on how to write data to the result table by calling the UpdateRow operation:
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='xxxxxxxxxxx',
'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'ignoreDelete' = 'false' -- Specifies whether to ignore delete operations.
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='xxxxxxxxxxx',
'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
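You can also aggregate the stream before you write it to Tablestore. The following sketch counts orders per customer and writes the counts to a hypothetical flink_sink_agg table whose primary key is customerid; the table name and the ordercount column are assumptions for this example, not values from this topic:
CREATE TEMPORARY TABLE ots_sink_agg (
    customerid VARCHAR,
    ordercount BIGINT,
    PRIMARY KEY (customerid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_agg', -- Hypothetical table for this sketch.
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='ordercount'
);

INSERT INTO ots_sink_agg
SELECT customerid, COUNT(*) AS ordercount
FROM tablestore_stream
GROUP BY customerid;
Because the GROUP BY query continuously updates its results, each update is written to the row that is identified by customerid, so the stored count reflects the latest value.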
Develop a real-time computing draft
Prerequisites
An AccessKey pair is created. For more information, see Create an AccessKey pair.
A role is assigned to the Alibaba Cloud account, or the required permissions are granted to the RAM user. For more information, see Assign a role to an Alibaba Cloud account or Grant permissions to a RAM user.
A tunnel that uses the Tablestore data table as the source table is created. For more information, see Quick start.
Step 1: Create an SQL draft
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click SQL Editor.
In the upper-left corner of the SQL Editor page, click New.
On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft.
Fully managed Flink provides various code templates and supports data synchronization. Each code template is applicable to specific scenarios and provides code samples and instructions. You can click a template to learn about the features and syntax of Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.
Click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
Parameter | Description | Example |
Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | flink-test |
Location | The folder in which the code file of the draft is saved. You can also click the icon to the right of an existing folder to create a subfolder. | Development |
Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.4-flink-1.15 |
Click Create.
Step 2: Write code for the draft
Create a temporary source table and a temporary result table.
Note: When you create an SQL draft, we recommend that you minimize the use of temporary tables and use tables that are registered in catalogs instead.
The following sample code provides an example on how to create a tablestore_stream temporary table and an ots_sink temporary table:
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' -- Specifies whether to ignore delete operations.
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
Write the draft logic.
The following sample code provides an example on how to select data from the source table and insert the data into the result table:
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
Step 3: Configure parameters on the Configurations tab
On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:
Engine Version: the version of the Realtime Compute for Apache Flink engine that you select when you create the draft.
Important: In VVR 3.0.3 and later versions, Ververica Platform (VVP) allows SQL jobs that use different engine versions to run at the same time. VVR 3.0.3 uses Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, update the engine version based on the version that your job uses:
Flink 1.12: Stop and then restart your job. The system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.
Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.
Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.
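For example, if your draft calls a user-defined function that is packaged in a JAR file that you upload as an additional dependency, you can register it in the SQL code. The function name and class name below are placeholders for this sketch:
CREATE TEMPORARY FUNCTION normalize_name AS 'com.example.udf.NormalizeName'; -- Hypothetical UDF class from the uploaded JAR.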
Step 4: Perform a syntax check
In the upper-right corner of the SQL Editor page, click Deep Validate to perform a syntax check.
(Optional) Step 5: Debug the draft
In the upper-right corner of the SQL Editor page, click Debug.
You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.
Step 6: Deploy the draft
In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.
Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug a draft in a session cluster to improve the resource utilization of JobManagers and accelerate deployment startup. However, we recommend that you do not deploy drafts in session clusters in production environments because stability issues may occur. For more information, see Debug a deployment.
Step 7: Start the deployment of the draft and view the computing result
If you modify the SQL code of a deployment, add or remove parameters in the WITH clause, or change the version of a deployment, you must re-publish the draft, cancel the deployment, and then restart the deployment for the changes to take effect. If a deployment fails and cannot reuse state data to recover, you must also cancel and then restart the deployment. For more information, see Cancel a deployment.
In the left-side navigation pane, click Deployments.
Find the deployment that you want to start and click Start in the Actions column.
For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.
On the Deployments page, view the computing result.
In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment.
Click the Exploration tab.
On the Running Logs tab, click Running Task Managers and click the value in the Path, ID column.
Click Logs. On the Logs tab, search for logs related to the sink.