Tablestore: Tutorial (Wide Column model)

Last Updated: Jul 07, 2023

This topic describes how to use Realtime Compute for Apache Flink to access source tables and result tables in the Wide Column model in Tablestore.

Background information

In Realtime Compute for Apache Flink, the tunnels of Tunnel Service can serve as sources of streaming data. Each data record is in a JSON-like format. Example:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

The following list describes the fields in each record.

  • OtsRecordType: the type of the data operation. Valid values:

    • PUT: adds data.

    • UPDATE: updates data.

    • DELETE: deletes data.

  • OtsRecordTimestamp: the time of the data operation. Unit: microseconds. For full data, the value of this field is 0.

  • PrimaryKey: the primary key settings. The value of this field is a JSON array. You can specify one to four primary key columns based on the primary key configuration of your table. You must configure the following fields for each primary key column:

    • ColumnName: the name of the primary key column.

    • Value: the value of the primary key column.

  • Columns: the attribute column settings. The value of this field is a JSON array. You must configure the following fields for each attribute column:

    • OtsColumnType: the type of the column operation. Valid values: PUT, DELETE_ONE_VERSION, and DELETE_ALL_VERSION.

    • ColumnName: the name of the attribute column.

    • Value: the value of the attribute column. If you set the OtsColumnType field to DELETE_ONE_VERSION or DELETE_ALL_VERSION, you do not need to configure this field.

Tablestore source tables

In Realtime Compute for Apache Flink, you can use a source table DDL statement to read the primary key columns and attribute columns of a Tablestore table. The fields are converted based on the data type mappings between Tablestore and Realtime Compute for Apache Flink. For more information, see Tablestore connector.

DDL syntax

The following sample code provides an example of the DDL syntax of a source table:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);

Tunnel Service returns the OtsRecordType and OtsRecordTimestamp fields together with the user data that has not been consumed, and Realtime Compute for Apache Flink can read these fields as attribute columns. The following list describes the fields and the metadata keys that they map to in Realtime Compute for Apache Flink.

  • OtsRecordType: mapped to the metadata key 'type'. The type of the data operation.

  • OtsRecordTimestamp: mapped to the metadata key 'timestamp'. The time of the data operation, in microseconds. For full data, the value of this field is 0.

If you want Realtime Compute for Apache Flink to read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword that is provided by Realtime Compute for Apache Flink to obtain the attribute fields from the Tablestore source table. The following sample code provides an example of the DDL statement:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);
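
The metadata columns declared in this way can be used in queries like ordinary columns. The following query is a minimal sketch that assumes the tablestore_stream table declared above; it uses the record_type metadata column to filter out delete records:

SELECT `order`, orderid, record_type, record_timestamp
FROM tablestore_stream
WHERE record_type <> 'DELETE';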

Parameters in the WITH clause

  • connector (required): the type of the source table. The value of this parameter is fixed to ots.

  • endPoint (required): the endpoint of the Tablestore instance. For more information, see Endpoints.

  • instanceName (required): the name of the Tablestore instance.

  • tableName (required): the name of the Tablestore data table.

  • tunnelName (required): the name of the tunnel of the Tablestore data table. For more information about how to create a tunnel, see Quick start.

  • accessId (required): the AccessKey ID of your Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

  • accessKey (required): the AccessKey secret of your Alibaba Cloud account or a RAM user.

  • ignoreDelete (optional): specifies whether to ignore delete operations. Default value: false.

Data type mappings

Data type of fields in Tablestore    Data type of fields in Realtime Compute for Apache Flink
INTEGER                              BIGINT
STRING                               STRING
BOOLEAN                              BOOLEAN
DOUBLE                               DOUBLE
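
For example, a Tablestore table with INTEGER, STRING, BOOLEAN, and DOUBLE fields could be declared in Realtime Compute for Apache Flink as follows. This is only a sketch: the column names are hypothetical, and the connection options in the WITH clause are omitted.

CREATE TABLE tablestore_typed_source (
    user_id BIGINT,     -- INTEGER in Tablestore
    user_name STRING,   -- STRING in Tablestore
    is_active BOOLEAN,  -- BOOLEAN in Tablestore
    score DOUBLE        -- DOUBLE in Tablestore
) WITH (
    ...
);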

Tablestore result tables

Realtime Compute for Apache Flink allows you to store results in a Tablestore table. For more information, see Tablestore connector.

DDL syntax

The following sample code provides an example of the DDL syntax of a result table.

Note

A Tablestore result table must contain at least one attribute column and one or more primary key columns.

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

Parameters in the WITH clause

  • connector (required): the type of the result table. The value of this parameter is fixed to ots.

  • endPoint (required): the endpoint of the Tablestore instance. For more information, see Endpoints.

  • instanceName (required): the name of the Tablestore instance.

  • tableName (required): the name of the Tablestore data table.

  • accessId (required): the AccessKey ID of your Alibaba Cloud account or a RAM user. For more information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

  • accessKey (required): the AccessKey secret of your Alibaba Cloud account or a RAM user.

  • valueColumns (required): the names of the columns that you want to insert. If you want to insert multiple columns, separate the column names with commas (,). Example: ID,NAME.

  • bufferSize (optional): the maximum number of data records that can be stored in the buffer before data is written to the result table. Default value: 5000, which specifies that data is written to the result table when the buffer holds 5,000 records.

  • batchWriteTimeoutMs (optional): the write timeout period. Unit: milliseconds. Default value: 5000, which specifies that all buffered data is written to the result table if the buffer does not reach the number of records specified by bufferSize within 5,000 milliseconds.

  • batchSize (optional): the number of data records that can be written to the result table at a time. Default value: 100.

  • retryIntervalMs (optional): the interval between retries. Unit: milliseconds. Default value: 1000.

  • maxRetryTimes (optional): the maximum number of retries. Default value: 100.

  • ignoreDelete (optional): specifies whether to ignore delete operations. Default value: false, which specifies that delete operations are not ignored.
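
The batch write parameters can be combined in the WITH clause of the result table DDL. The following sketch extends the ots_sink table from the preceding example; the values shown are the documented defaults and serve only as an illustration, not as tuning recommendations:

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername',
    'bufferSize'='5000',           -- flush after 5,000 buffered records
    'batchWriteTimeoutMs'='5000',  -- or after 5,000 milliseconds, whichever comes first
    'batchSize'='100'              -- records per write request
);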

Sample SQL statements

Read data from a source table

The following sample SQL statement provides an example on how to read data from a source table:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);
SELECT * FROM tablestore_stream LIMIT 100;

Synchronize data to a result table

The following sample SQL statement provides an example on how to write result data to the result table by calling the updateRow operation:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Develop a real-time computing draft

Step 1: Create an SQL draft

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click SQL Editor.

  4. In the upper-left corner of the SQL Editor page, click New.

  5. On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft.

    Fully managed Flink provides various code templates and supports data synchronization. Each code template is applicable to specific scenarios and provides code samples and instructions for you. You can click a template to learn about the features and related syntax of Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.

  6. Click Next.

  7. In the New Draft dialog box, configure the following parameters of the draft:

    • Name: the name of the draft that you want to create. The draft name must be unique in the current project. Example: flink-test.

    • Location: the folder in which the code file of the draft is saved. You can also click the Create Folder icon to the right of an existing folder to create a subfolder. Example: Development.

    • Engine Version: the engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. Example: vvr-6.0.4-flink-1.15.

  8. Click Create.

Step 2: Write code for the draft

  1. Create a temporary source table and a temporary result table.

    Note

    When you create an SQL draft, we recommend that you minimize the number of times that temporary tables are used. We also recommend that you use tables that are registered in catalogs.

    The following sample code provides an example on how to create a tablestore_stream temporary table and an ots_sink temporary table:

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' -- Specify whether to ignore delete operations.
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. Write the draft logic.

    The following sample code provides an example on how to select data from the source table and insert the data into the result table:

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Step 3: Configure parameters on the Configurations tab

On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:

  • Engine Version: the version of the Realtime Compute for Apache Flink engine that you select when you create the draft.

    Important

    In VVR 3.0.3 and later versions, Ververica Platform (VVP) allows you to run SQL jobs that use different engine versions at the same time. VVR 3.0.3 uses Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, perform the following operations based on the engine version that your job uses:

    • Flink 1.12: Stop and then restart your job. Then, the system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.

    • Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.

  • Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.

Step 4: Perform a syntax check

In the upper-right corner of the SQL Editor page, click Deep Validate to perform a syntax check.

(Optional) Step 5: Debug the draft

In the upper-right corner of the SQL Editor page, click Debug.

You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.

Step 6: Deploy the draft

In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.

Note

Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug a draft in a session cluster to improve the resource utilization of the JobManager and accelerate deployment startup. However, we recommend that you do not deploy drafts for production environments in session clusters, because stability issues may occur. For more information, see Debug a deployment.

Step 7: Start the deployment of the draft and view the computing result

Note

If you modify the SQL code of a deployment, add or remove parameters in the WITH clause, or change the engine version of a deployment, you must re-publish the draft, cancel the deployment, and then restart the deployment for the changes to take effect. If a deployment fails and cannot reuse state data to recover, you must cancel and then restart the deployment. For more information, see Cancel a deployment.

  1. In the left-side navigation pane, click Deployments.

  2. Find the deployment that you want to start and click Start in the Actions column.

    For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.

  3. On the Deployments page, view the computing result.

    1. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment.

    2. Click the Exploration tab.

    3. On the Running Logs tab, click Running Task Managers and click the value in the Path, ID column.

    4. Click Logs. On the Logs tab, search for logs related to the sink.