Realtime Compute for Apache Flink:Tablestore connector

Last Updated: Mar 13, 2024

This topic describes how to use the Tablestore connector.

Background information

Tablestore is a table-based, low-cost, serverless storage service that is optimized for storing large amounts of structured data. Tablestore allows you to query and retrieve online data within milliseconds and analyze stored data in multiple dimensions. Tablestore is suitable for scenarios such as billing, instant messaging (IM), Internet of Things (IoT), Internet of Vehicles (IoV), risk management, and intelligent recommendation. Tablestore also provides a deeply optimized, end-to-end storage solution for IoT applications. For more information, see What is Tablestore?

The following list describes the capabilities supported by the Tablestore connector.

  • Running mode: streaming mode.

  • API type: SQL API.

  • Table type: source table, dimension table, and result table.

  • Data format: N/A.

  • Metrics:

    • Metrics for source tables: none.

    • Metrics for dimension tables: none.

    • Metrics for result tables: numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, and currentSendTime.

    Note: For more information about the metrics and how to view the metrics, see Report metrics of fully managed Flink to other platforms.

  • Data update or deletion in a result table: supported.

Prerequisites

A Tablestore instance is purchased and a Tablestore table is created. For more information, see Use Tablestore.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.0 or later supports the Tablestore connector.

Syntax

  • Statement for creating a result table

    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      PRIMARY KEY (name, age) NOT ENFORCED
    ) WITH (
      'connector' = 'ots',
      'instanceName' = '<yourInstanceName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'endPoint' = '<yourEndpoint>',
      'valueColumns' = 'birthday'
    );
    Note

    You must specify a primary key for a Tablestore result table. The latest output data is appended to the result table to update rows that have the same primary key.

  • Statement for creating a dimension table

    CREATE TABLE ots_dim (
      id INT,
      len INT,
      content STRING
    ) WITH (
      'connector' = 'ots',
      'endPoint' = '<yourEndpoint>',
      'instanceName' = '<yourInstanceName>',
      'tableName' = '<yourTableName>',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}'
    );

    For an example of a lookup join against a dimension table, see the sketch at the end of this topic.
  • Statement for creating a source table

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector' = 'ots',
      'endPoint' = '<yourEndpoint>',
      'instanceName' = 'flink-source',
      'tableName' = 'flink_source_table',
      'tunnelName' = 'flinksourcestream',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'ignoreDelete' = 'false'
    );

    In addition to the fields whose data you want to consume, the OtsRecordType and OtsRecordTimestamp fields in the data returned by Tunnel Service can be read as attribute columns. The following table describes these fields.

    Field              | Mapping field in Flink | Description
    -------------------|------------------------|---------------------------------------------
    OtsRecordType      | type                   | The data operation type.
    OtsRecordTimestamp | timestamp              | The data operation time. Unit: microseconds.

    Note

    If full data is read, the value of the OtsRecordTimestamp field is 0.

    If you want to read the OtsRecordType and OtsRecordTimestamp fields, you can use the METADATA keyword provided by Realtime Compute for Apache Flink to obtain the attribute fields from the Tablestore source table. The following example shows the DDL statement.

    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',
      record_timestamp BIGINT METADATA FROM 'timestamp'
    ) WITH (
      ...
    );
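
    The following is a minimal sketch of a query over these metadata columns. The filter value DELETE_ROW and the conversion from microseconds are assumptions about the contents of the type and timestamp fields, not values confirmed by this topic.

    SELECT
      `order`,
      orderid,
      record_type,
      TO_TIMESTAMP_LTZ(record_timestamp / 1000, 3) AS op_time  -- microseconds to milliseconds
    FROM tablestore_stream
    WHERE record_type <> 'DELETE_ROW';  -- assumed value of the operation type field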

Parameters in the WITH clause

  • Common parameters

    • connector (STRING, required, no default value): The type of the table. Set the value to ots.

    • instanceName (STRING, required, no default value): The name of the Tablestore instance.

    • endPoint (STRING, required, no default value): The endpoint of the Tablestore instance. For more information, see Endpoints.

    • tableName (STRING, required, no default value): The name of the Tablestore table.

    • accessId (STRING, required, no default value): The AccessKey ID of your Alibaba Cloud account or a RAM user. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

      Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys.

    • accessKey (STRING, required, no default value): The AccessKey secret of your Alibaba Cloud account or a RAM user. For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account?

      Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys.

    • retryIntervalMs (INTEGER, optional, default value: 1000): The retry interval. Unit: milliseconds.

    • maxRetryTimes (INTEGER, optional, default value: 100): The maximum number of retries for writing data to the table.

    • connectTimeout (INTEGER, optional, default value: 30000): The timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds.

    • socketTimeout (INTEGER, optional, default value: 30000): The socket timeout period for the Tablestore connector to connect to Tablestore. Unit: milliseconds.

  • Parameters only for source tables

    • tunnelName (STRING, required, no default value): The name of the tunnel for the Tablestore source table. You must create a tunnel in the Tablestore console in advance. When you create a tunnel, specify the tunnel name and the tunnel type, which can be Incremental, Full, or Differential. For more information about how to create a tunnel, see Create a tunnel.

    • ignoreDelete (BOOLEAN, optional, default value: false): Specifies whether to ignore delete operations. Valid values:

      • true: Delete operations are ignored.

      • false (default): Delete operations are not ignored.

    • skipInvalidData (BOOLEAN, optional, default value: false): Specifies whether to ignore dirty data. If dirty data is not ignored, an error is reported when the system processes the dirty data. Valid values:

      • true: Dirty data is ignored.

      • false (default): Dirty data is not ignored.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter.

  • Parameters only for result tables

    • valueColumns (STRING, required, no default value): The names of the columns that you want to insert. Separate multiple column names, such as ID and NAME, with commas (,).

    • bufferSize (INTEGER, optional, default value: 5000): The maximum number of data records that can be stored in the buffer before data deduplication is triggered.

    • batchWriteTimeoutMs (INTEGER, optional, default value: 5000): The write timeout period. Unit: milliseconds. If the number of cached data records does not reach the upper limit within the period specified by batchWriteTimeoutMs, all cached data is written to the result table.

    • batchSize (INTEGER, optional, default value: 100): The number of data records that can be written at a time.

    • ignoreDelete (BOOLEAN, optional, default value: false): Specifies whether to ignore delete operations.

    • autoIncrementKey (STRING, optional, no default value): The name of the auto-increment primary key column. If the result table contains an auto-increment primary key column, configure this parameter to specify the name of that column. If the result table does not have an auto-increment primary key column, you do not need to configure this parameter.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.4 or later supports this parameter.

  • Parameters only for dimension tables

    • cache (STRING, optional, default value: ALL): The cache policy. Valid values:

      • None: No data is cached.

      • LRU: Only specific data in the dimension table is cached. Each time the system receives a data record, the system searches the cache. If the record is not found in the cache, the system searches the physical dimension table. If you use this cache policy, you must configure the cacheSize and cacheTTLMs parameters.

      • ALL (default): All data in the dimension table is cached. Before a job runs, the system loads all data in the dimension table into the cache, and all subsequent lookups are served from the cache. If a record is not found in the cache, the join key does not exist. The system reloads all data in the cache after cache entries expire. If the amount of data in the remote table is small and a large number of missing keys exist, we recommend that you set this parameter to ALL. The source table and dimension table cannot be associated based on the ON clause. If you use this cache policy, you must configure the cacheTTLMs and cacheReloadTimeBlackList parameters.

        Note: If you set the cache parameter to ALL, you must increase the memory of the node for joining tables because the system asynchronously loads data from the dimension table. The increased memory size is twice that of the remote table.

    • cacheSize (INTEGER, optional, no default value): The maximum number of rows that can be cached. Configure this parameter only if you set the cache parameter to LRU.

    • cacheTTLMs (INTEGER, optional, no default value): The cache timeout period. Unit: milliseconds. The behavior of cacheTTLMs varies based on the value of the cache parameter:

      • If cache is set to None, cacheTTLMs can be left empty, which indicates that cache entries do not expire.

      • If cache is set to LRU, cacheTTLMs specifies the timeout period of cache entries. By default, cache entries do not expire.

      • If cache is set to ALL, cacheTTLMs specifies the interval at which the cache is reloaded. By default, the cache is not reloaded.

    • cacheEmpty (BOOLEAN, optional, no default value): Specifies whether to cache empty results. Valid values:

      • true: Empty results are cached.

      • false: Empty results are not cached.

    • cacheReloadTimeBlackList (STRING, optional, no default value): The time periods during which the cache is not refreshed. This parameter takes effect only when the cache parameter is set to ALL. This parameter is suitable for large-scale online promotional events such as Double 11, during which you do not want the cache to be refreshed. Example: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Use delimiters based on the following rules:

      • Separate multiple time periods with commas (,).

      • Separate the start time and end time of each time period with an arrow (->), which is a combination of a hyphen (-) and a closing angle bracket (>).

    • async (BOOLEAN, optional, default value: false): Specifies whether to enable asynchronous lookups. Valid values:

      • true: Asynchronous lookups are enabled. By default, the returned data is not sorted in asynchronous mode. You can configure the asyncResultOrder parameter to specify whether the returned data is sorted.

      • false (default): Asynchronous lookups are disabled.

    • asyncResultOrder (STRING, optional, default value: unordered): Specifies whether the data returned by asynchronous lookups is sorted. Valid values:

      • unordered (default): The returned data is not sorted.

      • ordered: The returned data is sorted.

      Note: Only Realtime Compute for Apache Flink that uses VVR 8.0.0 or later supports this parameter.
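
The following is a minimal sketch that combines several of the dimension-table cache parameters described above; the table schema and the endpoint, instance, and table names are placeholders.

CREATE TABLE ots_dim_lru (
  id INT,
  content STRING
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = '<yourInstanceName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'cache' = 'LRU',         -- cache only recently used rows
  'cacheSize' = '10000',   -- keep at most 10,000 rows in the cache
  'cacheTTLMs' = '60000',  -- expire cache entries after 60 seconds
  'async' = 'true'         -- enable asynchronous lookups
);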

Data type mappings

Data type of fields in Tablestore | Data type of fields in Flink
----------------------------------|-----------------------------
INTEGER                           | BIGINT
STRING                            | STRING
BOOLEAN                           | BOOLEAN
DOUBLE                            | DOUBLE
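
For example, a Tablestore column of the INTEGER type must be declared as BIGINT in the Flink DDL. The following minimal sketch uses illustrative column names; the connection values are the same placeholders used elsewhere in this topic.

CREATE TABLE type_mapping_example (
  user_id BIGINT,     -- INTEGER in Tablestore
  name STRING,        -- STRING in Tablestore
  is_active BOOLEAN,  -- BOOLEAN in Tablestore
  score DOUBLE,       -- DOUBLE in Tablestore
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = '<yourInstanceName>',
  'tableName' = '<yourTableName>',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}'
);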

Sample code

CREATE TEMPORARY TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' = 'flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' = 'false'
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`, orderid) NOT ENFORCED
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = 'flink-sink',
  'tableName' = 'flink_sink_table',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'valueColumns' = 'customerid,customername',
  'autoIncrementKey' = '${auto_increment_primary_key_name}'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
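
The following sketch extends the sample above with a dimension-table lookup. The dim_customers table, its schema, and the proc processing-time column are assumptions introduced for illustration; a lookup join requires a processing-time attribute on the source table.

-- Source table redefined with a processing-time attribute for the lookup join.
CREATE TEMPORARY TABLE tablestore_stream_enriched (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  proc AS PROCTIME()  -- processing-time attribute required by FOR SYSTEM_TIME AS OF
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' = 'flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}'
);

-- Hypothetical Tablestore dimension table that stores customer details.
CREATE TEMPORARY TABLE dim_customers (
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector' = 'ots',
  'endPoint' = '<yourEndpoint>',
  'instanceName' = 'flink-dim',
  'tableName' = 'dim_customers_table',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}'
);

-- Enrich each change record with the customer name from the dimension table.
INSERT INTO ots_sink
SELECT s.`order`, s.orderid, s.customerid, d.customername
FROM tablestore_stream_enriched AS s
JOIN dim_customers FOR SYSTEM_TIME AS OF s.proc AS d
  ON s.customerid = d.customerid;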