Tablestore:Use Realtime Compute for Apache Flink to process Tablestore data

Last Updated:Feb 11, 2025

This topic describes how to process Tablestore data by using Realtime Compute for Apache Flink. A Tablestore data table or time series table can be used as the source table or the result table of a Realtime Compute for Apache Flink job.

Prerequisites

Develop a real-time computing job

Step 1: Create an SQL draft

  1. Go to the page for creating a draft.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, choose Development > ETL.

  2. Click New. In the New Draft dialog box, select Blank Stream Draft and click Next.

    Note

    Realtime Compute for Apache Flink provides various code templates and supports data synchronization. Each code template is suitable for specific scenarios and provides code samples and instructions for you. You can click a template to learn about the features and the related syntax of Realtime Compute for Apache Flink and implement your business logic. For more information, see Code templates and Data synchronization templates.

  3. Configure the parameters for the draft. The following list describes the parameters.

    • Name: The name of the draft that you want to create. The draft name must be unique in the current project. Example: flink-test.

    • Location: The folder in which the code file of the draft is saved. You can also click the Create Folder icon to the right of an existing folder to create a subfolder. Example: Draft.

    • Engine Version: The version of the Flink engine that you want the current draft to use. For more information about engine versions, see Release notes and Engine version. Example: vvr-8.0.10-flink-1.17.

  4. Click Create.

Step 2: Write code for the draft

Note

In the example in this step, code is written to synchronize data from a data table to another data table. For information about more sample SQL statements, see Sample SQL statements.

  1. Create a temporary table for the source table and the result table.

    For more information, see Appendix 1: Tablestore connector.

    -- Create a temporary table named tablestore_stream for the source table.
    CREATE TEMPORARY TABLE tablestore_stream(
        `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR
    ) WITH (
        'connector' = 'ots', -- Specify the connector type of the source table. The value is ots and cannot be changed. 
        'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Specify the virtual private cloud (VPC) endpoint of the Tablestore instance. 
        'instanceName' = 'xxx', -- Specify the name of the Tablestore instance. 
        'tableName' = 'flink_source_table', -- Specify the name of the source table. 
        'tunnelName' = 'flink_source_tunnel', -- Specify the name of the tunnel that is created for the source table. 
        'accessId' = 'xxxxxxxxxxx', -- Specify the AccessKey ID of the Alibaba Cloud account or RAM user. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Specify the AccessKey secret of the Alibaba Cloud account or RAM user. 
        'ignoreDelete' = 'false' -- Specify whether to ignore the real-time data that is generated by delete operations. In this example, this parameter is set to false. 
    );
    
    -- Create a temporary table named tablestore_sink for the result table.
    CREATE TEMPORARY TABLE tablestore_sink(
       `order` VARCHAR,
        orderid VARCHAR,
        customerid VARCHAR,
        customername VARCHAR,
        PRIMARY KEY (`order`,orderid) NOT ENFORCED -- Specify the primary key. 
    ) WITH (
        'connector' = 'ots', -- Specify the connector type of the result table. The value is ots and cannot be changed. 
        'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com', -- Specify the VPC endpoint of the Tablestore instance. 
        'instanceName' = 'xxx', -- Specify the name of the Tablestore instance. 
        'tableName' = 'flink_sink_table', -- Specify the name of the result table. 
        'accessId' = 'xxxxxxxxxxx',  -- Specify the AccessKey ID of the Alibaba Cloud account or RAM user. 
        'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx', -- Specify the AccessKey secret of the Alibaba Cloud account or RAM user. 
        'valueColumns'='customerid,customername' -- Specify the names of the columns that you want to insert into the result table. 
    );
  2. Write the draft logic.

    The following sample SQL statement provides an example on how to insert data from the source table into the result table:

    -- Insert data from the source table into the result table.
    INSERT INTO tablestore_sink
    SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Step 3: (Optional) View configuration information

On the right-side tab of the SQL editor, you can view the configurations or configure the parameters. The following list describes the tabs.

  • Configurations

    • Engine Version: the version of the Flink engine that is used by the current job.

    • Additional Dependencies: the additional dependencies that are used in the draft, such as temporary functions. You can download Ververica Runtime (VVR) dependencies, upload them on the Upload Artifact page, and then select the uploaded dependencies for Additional Dependencies. For more information, see Appendix 2: Configure VVR dependencies.

  • Structure

    • Flow Diagram: the flow diagram that allows you to view the directions in which data flows.

    • Tree Diagram: the tree diagram that allows you to view the sources from which data is processed.

  • Versions: You can view the engine version of the draft. For more information about the operations that you can perform in the Actions column in the Draft Versions panel, see Manage deployment versions.

Step 4: (Optional) Perform a syntax check

You can check the SQL semantics of the draft, the network connectivity, and the metadata of the tables that the draft uses. You can also click SQL Advice in the validation results to view information about SQL risks and the related optimization suggestions.

  1. In the upper-right corner of the SQL editor, click Validate.

  2. In the Validate dialog box, click Confirm.

Step 5: (Optional) Debug the draft

You can use the debugging feature to simulate the running of a deployment, check the output, and verify the business logic of SELECT and INSERT statements. This feature improves development efficiency and reduces the risk of poor data quality.

  1. In the upper-right corner of the SQL editor, click Debug.

  2. In the Debug dialog box, select the cluster that you want to debug and click Next.

    If no cluster is available, create a session cluster. Make sure that the session cluster uses the same engine version as that of the SQL draft and that the session cluster is running. For more information, see Create a session cluster.

  3. Configure debugging data.

    • If you use online data, skip this operation.

    • If you use debugging data, click Download mock data template, enter the debugging data in the template, and then click Upload mock data to upload the debugging data. For more information, see Debug a draft.

  4. Click Confirm.

Step 6: Deploy the draft

In the upper-right corner of the SQL editor, click Deploy. In the Deploy draft dialog box, configure the related parameters and click Confirm.

Note

Session clusters are suitable for non-production environments, such as development and test environments. You can deploy or debug drafts in a session cluster to improve the resource utilization of a JobManager and accelerate the deployment startup. However, we recommend that you do not deploy drafts for the production environment in session clusters. Otherwise, stability issues may occur.

Step 7: Start the deployment for the draft and view the computing result

  1. In the left-side navigation pane, choose O&M > Deployments.

  2. Find the job that you want to start and click Start in the Actions column.

    In the Start Job panel, select Initial Mode and click Start. If the deployment status changes to RUNNING, the deployment runs as expected. For more information about the parameters that you must configure when you start a deployment, see Start a deployment.

    Note
    • We recommend that you configure two CPU cores and 4 GB of memory for each TaskManager in Realtime Compute for Apache Flink to maximize the computing capability of each TaskManager. With this configuration, each TaskManager can write approximately 10,000 rows per second.

    • If the source table contains a large number of partitions, we recommend that you set the concurrency in Realtime Compute for Apache Flink to a value less than 16. The write rate increases linearly with the concurrency.

  3. On the Deployments page, view the computing result.

    1. In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the deployment that you want to manage.

    2. On the Logs tab, click the Running Task Managers tab and click the value in the Path, ID column.

    3. Click Logs. On the Logs tab, view the related log information.

  4. (Optional) Cancel a deployment.

    If you modify the SQL code of a deployment, add or remove parameters in the WITH clause, or change the deployment version, you must deploy the draft again, cancel the deployment, and then restart the deployment for the changes to take effect. If a deployment fails and cannot reuse state data to recover, or if you want to update parameter settings that do not take effect dynamically, you must also cancel and then restart the deployment. For more information about how to cancel a deployment, see Cancel a deployment.

Appendices

Appendix 1: Tablestore connector

Realtime Compute for Apache Flink provides a built-in Tablestore connector to read, write, and synchronize Tablestore data.

Source table

DDL syntax
Data table

The following sample code provides an example of the DDL statement for creating a temporary table for the source table:

-- Create a temporary table named tablestore_stream for the source table.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false'
);
Time series table

The following sample code provides an example of the DDL statement for creating a temporary table for the source table:

-- Create a temporary table named tablestore_stream for the source table.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

The fields whose data you want to consume and the OtsRecordType and OtsRecordTimestamp fields in the data that Tunnel Service returns can be read and written as attribute columns. The following list describes the fields.

  • OtsRecordType: maps to the type field in Realtime Compute for Apache Flink. This field indicates the operation type.

  • OtsRecordTimestamp: maps to the timestamp field in Realtime Compute for Apache Flink. This field indicates the data operation time. Unit: microseconds.

    Note: If you want Realtime Compute for Apache Flink to read full data, set this field to 0.
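
For example, the following sketch shows how the two fields might be declared as attribute columns in the temporary table for the source table. The column names follow the mapping above; the column types (VARCHAR for type, BIGINT for timestamp) are assumptions made for illustration.

-- Minimal sketch: read OtsRecordType and OtsRecordTimestamp as attribute columns.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    `type` VARCHAR,      -- mapped from OtsRecordType: the operation type
    `timestamp` BIGINT   -- mapped from OtsRecordTimestamp: the operation time in microseconds
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);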

Parameter in the WITH clause

The following list describes the parameters in the WITH clause. The information in parentheses indicates the table type to which a parameter applies (general, data table, or time series table) and whether the parameter is required.

  • connector (general, required): The connector type of the source table. The value is ots and cannot be changed.

  • endPoint (general, required): The endpoint of the Tablestore instance. You must use a VPC endpoint. For more information, see Endpoints.

  • instanceName (general, required): The name of the Tablestore instance.

  • tableName (general, required): The name of the source table in Tablestore.

  • tunnelName (general, required): The name of the tunnel for the source table in Tablestore. For information about how to create a tunnel, see Create a tunnel.

  • accessId (general, required): The AccessKey ID of the Alibaba Cloud account or the RAM user.

  • accessKey (general, required): The AccessKey secret of the Alibaba Cloud account or the RAM user.

    Important: To protect your AccessKey pair, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables.

  • connectTimeout (general, optional): The timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds. Default value: 30000.

  • socketTimeout (general, optional): The socket timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds. Default value: 30000.

  • ioThreadCount (general, optional): The number of I/O threads. Default value: 4.

  • callbackThreadPoolSize (general, optional): The size of the callback thread pool. Default value: 4.

  • ignoreDelete (data table, optional): Specifies whether to ignore the real-time data that is generated by delete operations. Default value: false, which specifies that such data is not ignored.

  • skipInvalidData (general, optional): Specifies whether to ignore dirty data. Default value: false, which specifies that dirty data is not ignored. If dirty data is not ignored, an error is reported when the system processes dirty data.

    Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter.

  • retryStrategy (general, optional): The retry policy. Valid values: TIME (default), which specifies that the system retries until the timeout period specified by the retryTimeoutMs parameter ends, and COUNT, which specifies that the system retries until the maximum number of retries specified by the retryCount parameter is reached.

  • retryCount (general, optional): The maximum number of retries. This parameter takes effect when the retryStrategy parameter is set to COUNT. Default value: 3.

  • retryTimeoutMs (general, optional): The timeout period for retries. Unit: milliseconds. This parameter takes effect when the retryStrategy parameter is set to TIME. Default value: 180000.

  • streamOriginColumnMapping (general, optional): The mapping between the column names in the source table and the column names in the temporary table. Separate a source column name and a temporary table column name with a colon (:). Separate multiple mappings with commas (,). Example: origin_col1:col1,origin_col2:col2.

  • outputSpecificRowType (general, optional): Specifies whether to pass through specific row types. Valid values: false (default), which specifies that all rows are passed through as INSERT rows, and true, which specifies that rows are passed through as INSERT, DELETE, or UPDATE_AFTER rows based on their actual types.
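
The following sketch illustrates the optional column mapping and row type parameters. The temporary table and column names are hypothetical; the parameter values follow the formats described above.

-- Minimal sketch: rename source columns and pass through the original row types.
CREATE TEMPORARY TABLE tablestore_stream(
    col1 VARCHAR,
    col2 VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'streamOriginColumnMapping' = 'origin_col1:col1,origin_col2:col2', -- Map source columns to temporary table columns.
    'outputSpecificRowType' = 'true' -- Emit INSERT, DELETE, and UPDATE_AFTER rows instead of INSERT only.
);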

Data type mappings

The following list describes how field data types in Tablestore map to field data types in Realtime Compute for Apache Flink:

  • INTEGER in Tablestore maps to BIGINT in Realtime Compute for Apache Flink.

  • STRING maps to STRING.

  • BOOLEAN maps to BOOLEAN.

  • DOUBLE maps to DOUBLE.

  • BINARY maps to BINARY.

Result table

DDL syntax
Data table

The following sample code provides an example of the DDL statement for creating a temporary table for the result table:

-- Create a temporary table named tablestore_sink for the result table.
CREATE TEMPORARY TABLE tablestore_sink(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR,
    PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint'='https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='customerid,customername'
);
Note

You must specify the schema of the primary key and at least one attribute column for the Tablestore result table. The output data is appended to the Tablestore result table to update the table data.

Time series table

When you use a time series table as the result table, you must specify the following primary key columns for the result table: _m_name, _data_source, _tags, and _time. Other configurations are the same as those when you use a data table as the result table. You can specify the primary key columns of a time series table in one of the following ways: by using the parameters in the WITH clause, by using the primary key of the SINK table, or by using a primary key in the Map format. If you use more than one of these methods at the same time, the primary key columns that are specified by using the parameters in the WITH clause take precedence.

Use the parameters in the WITH clause

The following sample code provides an example on how to use the parameters in the WITH clause to define the DDL syntax:

-- Create a temporary table named tablestore_sink for the result table.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES',
    'timeseriesSchema' = '{"measurement":"_m_name", "datasource":"_data_source", "tag_a":"_tags", "tag_b":"_tags", "tag_c":"_tags", "tag_d":"_tags", "tag_e":"_tags", "tag_f":"_tags", "time":"_time"}'
);

-- Insert data from the source table into the result table.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Use the primary key in the Map format

The following sample code provides an example on how to use the primary key in the Map format to define the DDL syntax:

Note

Tablestore supports the Flink Map data type to facilitate the generation of the _tags primary key column of a time series table in the TimeSeries model. The Map data type supports mapping operations, such as column renaming and simple functions. When you use the Map data type, make sure that the _tags primary key column is in the third position.

-- Create a temporary table named tablestore_sink for the result table.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Insert data from the source table into the result table.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    MAP[`tag_a`, `tag_b`, `tag_c`, `tag_d`, `tag_e`, `tag_f`] AS tags,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Use the primary key of the SINK table

The following sample code provides an example on how to use the primary key of the SINK table to define the DDL syntax. The first primary key column maps to the _m_name column, which specifies the measurement name. The second primary key column maps to the _data_source column, which specifies the data source. The last primary key column maps to the _time column, which specifies the timestamp. The primary key columns in the middle map to the _tags column, which specifies the tags of the time series.

-- Create a temporary table named tablestore_sink for the result table.
CREATE TEMPORARY TABLE tablestore_sink(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tag_a, tag_b, tag_c, tag_d, tag_e, tag_f, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' ='xxxxxxxxxxx',
    'accessKey' ='xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'storageType' = 'TIMESERIES'
);

-- Insert data from the source table into the result table.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from timeseries_source;
Parameter in the WITH clause

The following list describes the parameters in the WITH clause. The information in parentheses indicates the table type to which a parameter applies (general, data table, or time series table) and whether the parameter is required.

  • connector (general, required): The connector type of the result table. The value is ots and cannot be changed.

  • endPoint (general, required): The endpoint of the Tablestore instance. You must use a VPC endpoint. For more information, see Endpoints.

  • instanceName (general, required): The name of the Tablestore instance.

  • tableName (general, required): The name of the result table in Tablestore.

  • accessId (general, required): The AccessKey ID of the Alibaba Cloud account or the RAM user.

  • accessKey (general, required): The AccessKey secret of the Alibaba Cloud account or the RAM user.

    Important: To protect your AccessKey pair, we recommend that you use variables to specify the AccessKey pair. For more information, see Manage variables.

  • valueColumns (data table, required): The names of the columns that you want to insert into the result table. Separate multiple column names with commas (,). Example: ID,NAME.

  • storageType (general, optional): The type of the table. Valid values: WIDE_COLUMN (default), which specifies a data table, and TIMESERIES, which specifies a time series table.

    Important: If you use a time series table as the result table, set this parameter to TIMESERIES.

  • timeseriesSchema (time series table, optional): The columns that you want to specify as the primary key columns of the time series table. Specify the value of this parameter as key-value pairs in the JSON format. Example: {"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}. The types of the primary key columns that you specify must be the same as the types of the primary key columns in the time series table. The _tags primary key column can consist of multiple columns.

    Important: If you use the parameters in the WITH clause to specify the primary key columns of a time series table that is used as the result table, you must configure this parameter.

  • connectTimeout (general, optional): The timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds. Default value: 30000.

  • socketTimeout (general, optional): The socket timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds. Default value: 30000.

  • ioThreadCount (general, optional): The number of I/O threads. Default value: 4.

  • callbackThreadPoolSize (general, optional): The size of the callback thread pool. Default value: 4.

  • retryIntervalMs (general, optional): The interval between retries. Unit: milliseconds. Default value: 1000.

  • maxRetryTimes (general, optional): The maximum number of retries. Default value: 10.

  • bufferSize (general, optional): The maximum number of data records that can be stored in the buffer before data is written to the result table. Default value: 5000, which specifies that data is written to the result table when the number of data records in the buffer reaches 5,000.

  • batchWriteTimeoutMs (general, optional): The write timeout period. Unit: milliseconds. Default value: 5000, which specifies that all data in the buffer is written to the result table if the number of data records in the buffer does not reach the value of the bufferSize parameter within 5,000 milliseconds.

  • batchSize (general, optional): The number of data records that can be written to the result table at a time. Default value: 100. Maximum value: 200.

  • ignoreDelete (general, optional): Specifies whether to ignore the real-time data that is generated by delete operations. Default value: false, which specifies that such data is not ignored.

    Important: You can configure this parameter based on your business requirements if you use a data table as the source table.

  • autoIncrementKey (data table, optional): The name of the auto-increment primary key column of the result table if the result table contains an auto-increment primary key column. If the result table does not have an auto-increment primary key column, do not configure this parameter.

    Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter.

  • overwriteMode (general, optional): The data overwrite mode. Valid values: PUT (default), which specifies that data is written to the Tablestore table in PUT mode, and UPDATE, which specifies that data is written to the Tablestore table in UPDATE mode.

    Note: Only the UPDATE mode is supported in dynamic column mode.

  • defaultTimestampInMillisecond (general, optional): The default timestamp that is used to write data to the Tablestore table. If you do not configure this parameter, the current system time is used.

  • dynamicColumnSink (general, optional): Specifies whether to enable the dynamic column mode. Default value: false, which specifies that the dynamic column mode is disabled.

    Note: The dynamic column mode is suitable for scenarios in which no columns are specified for a table and data columns are inserted into the table based on the deployment status. In the table creation statement, you must specify the first several columns as the primary key columns. The value of the second-to-last column is used as the column name, the value of the last column is used as the value of that column, and the data type of the second-to-last column must be STRING. If you enable the dynamic column mode, the auto-increment primary key column feature is not supported and you must set the overwriteMode parameter to UPDATE.

  • checkSinkTableMeta (general, optional): Specifies whether to check the metadata of the result table. Default value: true, which specifies that the system checks whether the primary key columns of the Tablestore table are the same as the primary key columns that are specified in the table creation statement.

  • enableRequestCompression (general, optional): Specifies whether to enable data compression during data writing. Default value: false, which specifies that data compression is disabled during data writing.
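
The following sketch shows how the dynamic column mode described above can be combined with the overwriteMode parameter. This is a minimal sketch based on the parameter descriptions: the table and column names are hypothetical, the first column is the primary key, the second-to-last column provides the column name, and the last column provides the value of that column.

-- Minimal sketch of the dynamic column mode, based on the parameter descriptions above.
CREATE TEMPORARY TABLE dynamic_column_sink(
    pk VARCHAR,
    col_name VARCHAR,   -- Second-to-last column: used as the column name. Must be of a string type.
    col_value VARCHAR,  -- Last column: used as the value of that column.
    PRIMARY KEY (pk) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'dynamicColumnSink' = 'true',  -- Enable the dynamic column mode.
    'overwriteMode' = 'UPDATE'     -- The dynamic column mode supports only the UPDATE mode.
);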

Data type mappings

The following list describes how field data types in Realtime Compute for Apache Flink map to field data types in Tablestore:

  • BINARY in Realtime Compute for Apache Flink maps to BINARY in Tablestore.

  • VARBINARY maps to BINARY.

  • CHAR maps to STRING.

  • VARCHAR maps to STRING.

  • TINYINT maps to INTEGER.

  • SMALLINT maps to INTEGER.

  • INTEGER maps to INTEGER.

  • BIGINT maps to INTEGER.

  • FLOAT maps to DOUBLE.

  • DOUBLE maps to DOUBLE.

  • BOOLEAN maps to BOOLEAN.

Sample SQL statements

Synchronize data from the source table to the result table
Synchronize data from a data table to a time series table

Read data from a data table named flink_source_table and write the data to a time series table named flink_sink_table.

Sample SQL statement:

-- Create a temporary table named tablestore_stream for the source table.
CREATE TEMPORARY TABLE tablestore_stream(
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Create a temporary table named tablestore_sink for the result table by using the parameters in the WITH clause.
CREATE TEMPORARY TABLE tablestore_sink(
     measurement STRING,
     datasource STRING,
     tag_a STRING,
     `time` BIGINT,
     binary_value BINARY,
     bool_value BOOLEAN,
     double_value DOUBLE,
     long_value BIGINT,
     string_value STRING,
     tag_b STRING,
     tag_c STRING,
     tag_d STRING,
     tag_e STRING,
     tag_f STRING,
     PRIMARY KEY(measurement, datasource, tag_a, `time`) NOT ENFORCED
 ) WITH (
     'connector' = 'ots',
     'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
     'instanceName' = 'xxx',
     'tableName' = 'flink_sink_table',
     'accessId' = 'xxxxxxxxxxx',
     'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
     'storageType' = 'TIMESERIES',
     'timeseriesSchema' = '{"measurement":"_m_name","datasource":"_data_source","tag_a":"_tags","tag_b":"_tags","tag_c":"_tags","tag_d":"_tags","tag_e":"_tags","tag_f":"_tags","time":"_time"}'
 );
 
-- Insert data from the source table into the result table.
INSERT INTO tablestore_sink
    select 
    measurement,
    datasource,
    tag_a,
    `time`,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value,
    tag_b,
    tag_c,
    tag_d,
    tag_e,
    tag_f
    from tablestore_stream;
Synchronize data from a time series table to a data table

Read data from a time series table named flink_source_table and write the data to a data table named flink_sink_table.

Sample SQL statement:

-- Create a temporary table named tablestore_stream for the source table.
CREATE TEMPORARY TABLE tablestore_stream(
    _m_name STRING,
    _data_source STRING,
    _tags STRING,
    _time BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
);

-- Create a temporary table named tablestore_target for the result table. 
CREATE TEMPORARY TABLE tablestore_target(
    measurement STRING,
    datasource STRING,
    tags STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    PRIMARY KEY (measurement,datasource, tags, `time`) NOT ENFORCED
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_sink_table',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'valueColumns'='binary_value,bool_value,double_value,long_value,string_value'
);

-- Insert data from the source table into the result table.
INSERT INTO tablestore_target
SELECT
    _m_name,
    _data_source,
    _tags,
    _time,
    binary_value,
    bool_value,
    double_value,
    long_value,
    string_value
    from tablestore_stream;
Read data from the source table and view the data in the development console

Read data from the source table named flink_source_table in batches. You can use the deployment debugging feature to simulate the running of a deployment. The debugging result is displayed in the lower part of the SQL editor.

Sample SQL statement:

-- Create a temporary table named tablestore_stream for the source data table.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Read data from the source table.
SELECT * FROM tablestore_stream LIMIT 100;
Read data from the source table and print the data to the TaskManager log

Read data from the source table named flink_source_table and print the results to the TaskManager log by using the Print connector.

Sample SQL statement:

-- Create a temporary table named tablestore_stream for the source data table.
CREATE TEMPORARY TABLE tablestore_stream(
    `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
    'connector' = 'ots',
    'endPoint' = 'https://xxx.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'xxx',
    'tableName' = 'flink_source_table',
    'tunnelName' = 'flink_source_tunnel',
    'accessId' = 'xxxxxxxxxxx',
    'accessKey' = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx',
    'ignoreDelete' = 'false' 
);

-- Create a temporary table named print_table for the result table. 
CREATE TEMPORARY TABLE print_table(
   `order` VARCHAR,
    orderid VARCHAR,
    customerid VARCHAR,
    customername VARCHAR
) WITH (
  'connector' = 'print',   -- Use the Print connector.
  'logger' = 'true'        -- Display the computing result in the development console of Realtime Compute for Apache Flink.
);

-- Print the fields of the source table.
INSERT INTO print_table
SELECT `order`,orderid,customerid,customername from tablestore_stream;

Appendix 2: Configure VVR dependencies

  1. Download VVR dependencies.

  2. Upload the VVR dependencies.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Artifacts.

    4. On the Artifacts page, click Upload Artifact and select the JAR package in which the VVR dependencies are stored.

  3. On the right side of the SQL editor of the job that you want to manage, click Configurations. In the Additional Dependencies field of the Configurations panel, select the JAR package in which the VVR dependencies are stored.