All Products
Search
Document Center

Tablestore:Tutorial for the Wide Column model

更新時間:Apr 11, 2024

Tables in Tablestore can be used as source tables and result tables of Realtime Compute for Apache Flink. You can use Realtime Compute for Apache Flink to process data in a Tablestore table and save the result to another Tablestore table.

Background information

In Realtime Compute for Apache Flink, the tunnels of Tunnel Service can serve as sources of streaming data. Each data record is in a format similar to JSON. Example:

{
  "OtsRecordType": "PUT", 
  "OtsRecordTimestamp": 1506416585740836, 
  "PrimaryKey": [
    {
      "ColumnName": "pk_1", 
      "Value": 1506416585881590900
    },
    {
      "ColumnName": "pk_2", 
      "Value": "string_pk_value"
    }
  ],
  "Columns": [
    {
      "OtsColumnType": "Put", 
      "ColumnName": "attr_0",
      "Value": "hello_table_store",
    },
    {
      "OtsColumnType": "DELETE_ONE_VERSION", 
      "ColumnName": "attr_1"
    }
  ]
}

Field

Description

OtsRecordType

The operation type. Valid values:

  • PUT: adds data.

  • UPDATE: updates data.

  • DELETE: removes data.

OtsRecordTimestamp

The data operation time. Unit: microseconds. If you want Realtime Compute for Apache Flink to read full data, set this field to 0.

PrimaryKey

The settings of the primary key. The value of this field is a JSON array. You can specify one to four primary key columns based on the primary key column settings of your table. You must specify the following fields for each primary key column:

  • ColumnName: the name of the primary key column.

  • Value: the value of the primary key column.

Columns

The settings of the attribute columns. The value of this field is a JSON array. You can specify the following fields for each attribute column:

  • OtsColumnType: the type of the column operation. Valid values: PUT, DELETE_ONE_VERSION, and DELETE_ALL_VERSION.

  • ColumnName: the name of the attribute column.

  • Value: the value of the attribute column.

    If you set the OtsColumnType field to DELETE_ONE_VERSION or DELETE_ALL_VERSION, you do not need to specify this field.

Tablestore source tables

In Realtime Compute for Apache Flink, you can use the DDL statements of a source table to read the primary key and attribute column values of the source table in Tablestore based on the data type mapping of fields between Tablestore and Realtime Compute for Apache Flink. For more information, see Tablestore connector.

DDL syntax

The following sample code provides an example of the DDL syntax of a source table:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // Specify whether to ignore delete operations.
);

Tunnel Service returns user data that has not been consumed and the OtsRecordType and OtsRecordTimestamp fields, and allows Realtime Compute for Apache Flink to read the data and the fields as attribute columns. The following table describes the fields.

Field

Mapped field in Realtime Compute for Apache Flink

Description

OtsRecordType

type

The data operation type.

OtsRecordTimestamp

timestamp

The data operation time. Unit: microseconds. If you want Realtime Compute for Apache Flink to read full data, set this field to 0.

If you want Realtime Compute for Apache Flink to read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword that is provided by Realtime Compute for Apache Flink to obtain the attribute fields from the Tablestore source table. The following example shows the DDL statement:

CREATE TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    record_type STRING METADATA FROM 'type',
    record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
    ...
);

Parameters in the WITH clause

Parameter

Required

Description

connector

Yes

The type of the source table. The value is ots and cannot be changed.

endPoint

Yes

The endpoint of the Tablestore instance. For more information, see Endpoints.

instanceName

Yes

The name of the Tablestore instance.

tableName

Yes

The name of the Tablestore source table.

tunnelName

Yes

The tunnel name of the Tablestore source table. For information about how to create a tunnel, see Create a tunnel.

accessId

Yes

The AccessKey ID and AccessKey secret of the Alibaba Cloud account or the RAM user. For information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

accessKey

Yes

ignoreDelete

No

Specifies whether to ignore delete operations. Default value: false, which specifies that delete operations are not ignored.

skipInvalidData

No

Specifies whether to ignore dirty data. Default value: false, which specifies that dirty data is not ignored.

If dirty data is not ignored, an error is reported when the system processes dirty data. To ignore dirty data, set this parameter to true.

Data type mapping of fields between Tablestore and Realtime Compute for Apache Flink

Field data type in Tablestore

Field data type in Realtime Compute for Apache Flink

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

Tablestore result tables

Realtime Compute for Apache Flink allows you to store results in a Tablestore table. For more information, see Tablestore connector.

DDL syntax

The following sample code provides an example of the DDL syntax of a result table:

Note

You must specify the primary key and one or more attribute columns for a Tablestore result table.

CREATE TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
     ...
);

Parameters in the WITH clause

Parameter

Required

Description

connector

Yes

The type of the result table. The value is ots and cannot be changed.

endPoint

Yes

The endpoint of the Tablestore instance. For more information, see Endpoints.

instanceName

Yes

The name of the Tablestore instance.

tableName

Yes

The name of the Tablestore result table.

tunnelName

Yes

The tunnel name of the Tablestore result table. For information about how to create a tunnel, see Create a tunnel.

accessId

Yes

The AccessKey ID and AccessKey secret of the Alibaba Cloud account or the RAM user. For information about how to obtain an AccessKey pair, see Obtain an AccessKey pair.

accessKey

Yes

valueColumns

Yes

The name of the column that you want to insert. If you want to insert multiple columns, separate the column names with commas (,). Example: ID,NAME.

bufferSize

No

The maximum number of data records that can be stored in the buffer before data is written to the result table. Default value: 5000, which specifies that data is written to the result table if the number of data records in the buffer reaches 5,000.

batchWriteTimeoutMs

No

The write timeout period. Unit: milliseconds. Default value: 5000, which specifies that all data in the buffer is written to the result table if the number of data records in the buffer does not reach the value that is specified by the bufferSize parameter within 5,000 milliseconds.

batchSize

No

The number of data records that can be written to the result table at a time. Default value: 100.

retryIntervalMs

No

The interval between two consecutive retries. Unit: milliseconds. Default value: 1000.

maxRetryTimes

No

The maximum number of retries. Default value: 100.

ignoreDelete

No

Specifies whether to ignore delete operations. Default value: false, which specifies that delete operations are not ignored.

autoIncrementKey

No

The name of the auto-increment primary key column. If the result table contains an auto-increment primary key column, you can configure this parameter to specify the name of the auto-increment primary key column.

defaultTimestampInMillisecond

No

The version number of data that is written to the result table. Unit: milliseconds. If you do not configure this parameter, the system uses the data written time as the version number.

Data type mapping of fields between Tablestore and Realtime Compute for Apache Flink

Field data type in Realtime Compute for Apache Flink

Field data type in Tablestore

BINARY

BINARY

VARBINARY

BINARY

CHAR

STRING

VARCHAR

STRING

TINYINT

INTEGER

SMALLINT

INTEGER

INTEGER

INTEGER

BIGINT

INTEGER

FLOAT

DOUBLE

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

Sample SQL statements

Read data from a source table

The following sample SQL statement provides an example on how to read data from a source table in a batch:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // Specify whether to ignore delete operations. 
);
SELECT * FROM tablestore_stream LIMIT 100;

Synchronize data to a result table

The following sample SQL statement provides an example on how to write result data to a result table by calling the updateRow operation:

CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector'='ots',
    'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
    'instanceName' = 'flink-source',
    'tableName' ='flink_source_table',
    'tunnelName' = 'flinksourcestream',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' // Specify whether to ignore delete operations. 
);

CREATE TEMPORARY TABLE ots_sink (
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector'='ots',
    'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
    'instanceName'='flink-sink',
    'tableName'='flink_sink_table',
    'accessId'='xxxxxxxxxxx',
    'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Develop a real-time computing job

Prerequisites

  • An AccessKey pair is created. For more information, see Create an AccessKey pair.

  • A tunnel is created for a Tablestore source table. For information about how to create a tunnel, see Create a tunnel.

Step 1: Create an SQL draft

  1. Log on to the Realtime Compute for Apache Flink console.

  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

  3. In the left-side navigation pane, click SQL Editor.

  4. In the upper-left corner of the SQL Editor page, click New.

  5. In the New Draft dialog box, click Blank Stream Draft.

    Fully managed Flink provides various code templates and data synchronization templates. Each code template provides specific scenarios, code samples, and instructions. You can click a template to learn about the features and the related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.

  6. Click Next.

  7. Configure parameters for the draft. The following table describes the parameters.

    Parameter

    Description

    Example

    Name

    The name of the draft that you want to create.

    Note

    The draft name must be unique in the current project.

    flink-test

    Location

    The folder in which the code file of the draft is stored.

    You can also click the 新建文件夹 icon to the right of an existing folder to create a subfolder.

    Development

    Engine Version

    The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version.

    vvr-6.0.4-flink-1.15

  8. Click Create.

Step 2: Write code for the draft

  1. Create a temporary source table and a temporary result table.

    Note

    When you create a draft, we recommend that you minimize the number of times that you use temporary tables. We also recommend that you use tables that are registered in catalogs.

    The following sample code provides an example on how to create a temporary source table whose name is tablestore_stream and a temporary result table whose name is ots_sink:

    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector'='ots',
        'endPoint' ='https://flink-source.cn-hangzhou.ots.aliyuncs.com',
        'instanceName' = 'flink-source',
        'tableName' ='flink_source_table',
        'tunnelName' = 'flinksourcestream',
        'accessId' ='xxxxxxxxxxx',
        'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'ignoreDelete' = 'false' // Specify whether to ignore delete operations. 
    );
    
    CREATE TEMPORARY TABLE ots_sink (
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED
    ) WITH (
        'connector'='ots',
        'endPoint'='https://flink-sink.cn-hangzhou.ots.aliyuncs.com',
        'instanceName'='flink-sink',
        'tableName'='flink_sink_table',
        'accessId'='xxxxxxxxxxx',
        'accessKey'='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
        'valueColumns'='customerid,customername'
    );
  2. Write the draft logic.

    The following sample code provides an example on how to insert data from the source table into the result table:

    INSERT INTO ots_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Step 3: Configure parameters on the Configurations tab

On the right side of the SQL Editor page, click the Configurations tab and configure the following parameters:

  • Engine Version: the version of the Flink engine. You can change the version that you select when you create the draft.

    Important

    In VVR 3.0.3 and later versions, Ververica Platform (VVP) allows you to run SQL jobs that use different engine versions at the same time. The version of the Flink engine that uses VVR 3.0.3 is Flink 1.12. If the engine version of your job is Flink 1.12 or earlier, you can perform the following operations to update the engine version based on the engine version that your job uses:

    • Flink 1.12: Stop and then restart your job. Then, the system automatically updates the engine version of your job to vvr-3.0.3-flink-1.12.

    • Flink 1.11 or Flink 1.10: Manually update the engine version of your job to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the job. Otherwise, a timeout error occurs when you start the job.

  • Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions.

Step 4: Perform a syntax check

In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.

(Optional) Step 5: Debug the draft

In the upper-right corner of the SQL Editor page, click Debug.

You can enable the debugging feature to simulate deployment running, check outputs, and verify the business logic of SELECT and INSERT statements. This feature improves the development efficiency and reduces the risks of poor data quality. For more information, see Debug a deployment.

Step 6: Deploy the draft

In the upper-right corner of the SQL Editor page, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.部署

Note

Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of a JobManager and accelerate the deployment startup. However, we recommend that you do not deploy a draft in session clusters. If you deploy drafts in session clusters, stability issues may occur. For more information, see the "Step 1: Create a session cluster" step of the Debug a deployment topic.

Step 7: Start the deployment for the draft and view the computing result

Note

If you modify the SQL code of a deployment, add parameters to or remove parameters from the WITH clause, or change the version of a deployment, you must re-publish and cancel the deployment, and then restart the deployment for the changes to take effect. If the deployment fails and cannot reuse the state data to recover, you must cancel and then restart the deployment. For more information, see Cancel a deployment.

  1. In the left-side navigation pane, click Deployments.

  2. Find the deployment that you want to start and click Start in the Actions column.

    For more information about how to configure deployment startup parameters, see Start a deployment. After you click Start, the deployment status changes to RUNNING. This indicates that the deployment is running as expected.

  3. On the Deployments page, view the computing result.

    1. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the deployment that you want to manage.

    2. Click the Diagnostics tab.

    3. On the Logs tab, click Running Task Managers, and then click the value in the Path, ID column.

    4. Click Logs. On the Logs tab, search for logs related to the sink.