
Realtime Compute for Apache Flink: Tablestore

Last Updated: Mar 26, 2026

The Tablestore connector lets you use Tablestore tables as source tables, dimension tables, and sink tables in Flink SQL jobs running in streaming mode.

Connector capabilities

Item Description
Running mode Streaming mode
API type SQL API
Table type Source table, dimension table, and sink table
Data format N/A
Sink table metrics numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime
Data update or deletion in a sink table Supported
For details on sink metrics, see Monitoring metrics.

Prerequisites

Before you begin, ensure that you have:

  • Created a Tablestore instance and the tables that your job reads from or writes to.

  • For source tables, created a tunnel for the table in the Tablestore console (required by the tunnelName option).

  • Obtained an AccessKey pair that has permissions to access the instance.

Syntax

All three table types use 'connector'='ots' in the WITH clause, with type-specific options.

Sink table

CREATE TABLE ots_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name, age) NOT ENFORCED
) WITH (
  'connector'='ots',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'endPoint'='<yourEndpoint>',
  'valueColumns'='birthday'
);
A Tablestore sink table requires a primary key. Output records are written by primary key: a record with a new key is inserted, and a record whose key already exists updates the existing row.

Dimension table

CREATE TABLE ots_dim (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}'
);
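A dimension table declared this way is typically used in a temporal lookup join. The sketch below assumes a streaming source table named datagen_source with an id column and a processing-time attribute declared as proctime AS PROCTIME(); the source and sink names are illustrative.

CREATE TEMPORARY TABLE print_sink (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='print'
);

-- Enrich each source row with columns from the Tablestore dimension
-- table, matched on the dimension table's primary key.
INSERT INTO print_sink
SELECT s.id, d.len, d.content
FROM datagen_source AS s
JOIN ots_dim FOR SYSTEM_TIME AS OF s.proctime AS d
ON s.id = d.id;

The FOR SYSTEM_TIME AS OF clause tells Flink to perform a lookup against the dimension table at processing time rather than a regular stream-to-stream join.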

Source table

CREATE TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-source',
  'tableName'='flink_source_table',
  'tunnelName'='flinksourcestream',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='false'
);

Available metadata

The Tablestore source table exposes two metadata fields via the METADATA keyword. Use these fields to track the operation type and timing of each change event.

Metadata key Flink data type Description
type STRING The data operation type (maps to OtsRecordType).
timestamp BIGINT The data operation time in microseconds (maps to OtsRecordTimestamp). Set to 0 for full data reads.

To read metadata fields, declare them with the METADATA FROM syntax:

CREATE TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);
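Once declared, the metadata columns behave like ordinary columns, for example to filter change events by operation type. A minimal sketch (the sink name is illustrative, and the literal 'DELETE' assumes OtsRecordType serializes to that string; verify the actual values in your environment):

-- Forward only non-delete change events, together with when they occurred.
INSERT INTO audit_sink
SELECT `order`, orderid, record_type, record_timestamp
FROM tablestore_stream
WHERE record_type <> 'DELETE';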

Connector options

General options

All table types share the following options.

Option Type Required Default Description
connector String Yes Set to ots.
instanceName String Yes Name of the Tablestore instance.
endPoint String Yes Endpoint of the Tablestore instance. See Endpoints.
tableName String Yes Name of the table.
accessId String Yes AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user. See How do I view the AccessKey ID and AccessKey secret?
accessKey String Yes AccessKey secret of your Alibaba Cloud account or a RAM user.
connectTimeout Integer No 30000 Connection timeout in milliseconds.
socketTimeout Integer No 30000 Socket timeout in milliseconds.
ioThreadCount Integer No 4 Number of I/O threads.
callbackThreadPoolSize Integer No 4 Size of the callback thread pool.
Important

Use variables to store your AccessKey pair instead of hardcoding it.

Source table options

Option Type Required Default Description
tunnelName String Yes Name of the Tablestore tunnel. Create the tunnel in the Tablestore console before using this option. Supported tunnel types: Incremental, Full, and Differential. See the "Create a tunnel" section in Quick start.
ignoreDelete Boolean No false Whether to skip delete operations. true: skip; false: process delete operations.
skipInvalidData Boolean No false Whether to skip dirty data. true: skip dirty data; false: report an error. Requires Ververica Runtime (VVR) 8.0.4 or later.
retryStrategy Enum No TIME Retry policy. TIME: retry until retryTimeoutMs elapses; COUNT: retry until retryCount is reached.
retryCount Integer No 3 Maximum number of retries. Applies when retryStrategy is COUNT.
retryTimeoutMs Integer No 180000 Retry timeout in milliseconds. Applies when retryStrategy is TIME.
streamOriginColumnMapping String No Mapping from original column names to actual column names. Format: origin_col1:col1,origin_col2:col2.
outputSpecificRowType Boolean No false Whether to pass through the specific row type. false: all rows are treated as INSERT; true: rows can be INSERT, DELETE, or UPDATE_AFTER.
dataFetchTimeoutMs Integer No 10000 Maximum time in milliseconds to fetch data from a single partition. Reduce this value to lower overall synchronization latency when syncing many partitions. Requires VVR 8.0.10 or later.
enableRequestCompression Boolean No false Whether to enable request compression. Reduces bandwidth usage at the cost of higher CPU load. Requires VVR 8.0.10 or later.
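As a sketch of how these options combine, the WITH clause below extends the source table from the Syntax section with a count-based retry policy and a column rename; the mapping value and placeholders are illustrative:

CREATE TABLE tablestore_stream_tuned (
  `order` VARCHAR,
  orderid VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'tunnelName'='<yourTunnelName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'retryStrategy'='COUNT',   -- retry failed reads up to retryCount times
  'retryCount'='5',
  'streamOriginColumnMapping'='order_id:orderid'  -- map origin column order_id to orderid
);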

Sink table options

Option Type Required Default Description
valueColumns String Yes Names of the columns to write. Separate multiple column names with commas (,).
retryIntervalMs Integer No 1000 Retry interval in milliseconds.
maxRetryTimes Integer No 10 Maximum number of retries.
bufferSize Integer No 5000 Maximum number of records buffered before a write is triggered.
batchWriteTimeoutMs Integer No 5000 Write timeout in milliseconds. If the number of buffered records does not reach bufferSize within this period, all buffered records are written anyway.
batchSize Integer No 100 Number of records written per batch. Maximum: 200.
ignoreDelete Boolean No false Whether to skip delete operations.
autoIncrementKey String No Name of the auto-increment primary key column. Configure only if the sink table has an auto-increment primary key column. Requires VVR 8.0.4 or later.
overwriteMode Enum No PUT Write mode. PUT: overwrite in PUT mode; UPDATE: overwrite in UPDATE mode. Dynamic column mode requires UPDATE.
defaultTimestampInMillisecond Long No -1 Default timestamp for writes. If not set, the current system time is used.
dynamicColumnSink Boolean No false Whether to enable dynamic column mode. In this mode, no columns are pre-defined; columns are inserted based on runtime values. The first N columns define the primary key. The second-to-last column holds the column name and the last column holds its value — both must be STRING. If enabled, overwriteMode must be UPDATE and auto-increment primary keys are not supported.
checkSinkTableMeta Boolean No true Whether to verify that the Tablestore table's primary key matches the primary key declared in the CREATE TABLE statement.
enableRequestCompression Boolean No false Whether to enable request compression during writes.
maxColumnsCount Integer No 128 Maximum number of columns written to the sink table. If this value exceeds 128, writes fail with the error "The count of attribute columns exceeds the maximum". Requires VVR 8.0.10 or later.
storageType String No WIDE_COLUMN Sink table type. WIDE_COLUMN: wide-column table; TIMESERIES: time series table.
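Dynamic column mode can be sketched as follows: the primary-key columns come first, then a column-name column and a value column, both STRING, and overwriteMode must be UPDATE. All table and column names are illustrative, and whether valueColumns must still list the trailing two columns may depend on your connector version.

CREATE TABLE ots_dynamic_sink (
  id VARCHAR,           -- primary-key columns come first
  gmt_create VARCHAR,
  col_name VARCHAR,     -- second-to-last column: name of the dynamic column
  col_value VARCHAR,    -- last column: value of the dynamic column
  PRIMARY KEY (id, gmt_create) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='col_name,col_value',
  'dynamicColumnSink'='true',
  'overwriteMode'='UPDATE'   -- UPDATE is required when dynamicColumnSink is enabled
);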

Dimension table options

How the cache works

The dimension table cache reduces repeated lookups against Tablestore. Choose a cache policy based on your table size and query patterns:

  • None: No caching. Every lookup hits Tablestore directly. Use when data changes frequently and freshness is critical.

  • LRU: Caches a fixed number of recently accessed records. When a lookup misses the cache, the connector queries Tablestore and updates the cache with the result. Set cacheSize and cacheTTLMs when using this policy.

  • ALL (default): Loads the entire dimension table into the cache before the job starts. All subsequent lookups are served from cache. When the cache expires (cacheTTLMs), the connector reloads all data. Use ALL when the table is small and you expect many missing-key lookups. When using ALL, increase the memory of the join node — the cache requires approximately twice the size of the remote table.

Option Type Required Default Description
retryIntervalMs Integer No 1000 Retry interval in milliseconds.
maxRetryTimes Integer No 10 Maximum number of retries.
cache String No ALL Cache policy: None, LRU, or ALL.
cacheSize Integer No Maximum number of cached records. Applies when cache is LRU.
cacheTTLMs Integer No Cache TTL in milliseconds. For LRU: timeout per entry. For ALL: full-cache refresh interval. Leave unset to disable expiration.
cacheEmpty Boolean No Whether to cache empty (no-match) results. true: cache; false: do not cache.
cacheReloadTimeBlackList String No Time windows during which the ALL cache is not refreshed. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple windows with commas; use -> between start and end times.
async Boolean No false Whether to enable asynchronous lookup. true: async lookups (results are not ordered); false: synchronous lookups.
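For a large or frequently changing dimension table, the ALL default may be unsuitable; an LRU configuration can be sketched like this (the cache sizes and TTL are illustrative):

CREATE TABLE ots_dim_lru (
  id INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'cache'='LRU',
  'cacheSize'='10000',     -- keep at most 10,000 recently used records
  'cacheTTLMs'='600000',   -- expire each cached entry after 10 minutes
  'cacheEmpty'='true'      -- also cache lookups that returned no match
);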

Data type mappings

Source table

Tablestore type Flink SQL type
INTEGER BIGINT
STRING STRING
BOOLEAN BOOLEAN
DOUBLE DOUBLE
BINARY BINARY

Sink table

Flink SQL type Tablestore type
BINARY BINARY
VARBINARY BINARY
CHAR STRING
VARCHAR STRING
TINYINT INTEGER
SMALLINT INTEGER
INTEGER INTEGER
BIGINT INTEGER
FLOAT DOUBLE
DOUBLE DOUBLE
BOOLEAN BOOLEAN

Examples

Read from Tablestore and write to Tablestore

This example reads order data from a Tablestore source table via Tunnel Service and writes it to a Tablestore sink table. The sink table uses an auto-increment primary key column.

CREATE TEMPORARY TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-source',
  'tableName'='flink_source_table',
  'tunnelName'='flinksourcestream',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='false',
  'skipInvalidData'='false'
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`, orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

Synchronize a wide-column table to a time series table

This example reads from a wide-column source table and writes to a time series sink table. The sink table's tags column uses MAP<STRING, STRING> to hold tag key-value pairs, and storageType is set to TIMESERIES.

CREATE TEMPORARY TABLE timeseries_source (
  measurement STRING,
  datasource STRING,
  tag_a STRING,
  `time` BIGINT,
  binary_value BINARY,
  bool_value BOOLEAN,
  double_value DOUBLE,
  long_value BIGINT,
  string_value STRING,
  tag_b STRING,
  tag_c STRING,
  tag_d STRING,
  tag_e STRING,
  tag_f STRING
) WITH (
  'connector'='ots',
  'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  'instanceName'='iotstore-test',
  'tableName'='test_ots_timeseries_2',
  'tunnelName'='timeseries_source_tunnel_2',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='true'
);

CREATE TEMPORARY TABLE timeseries_sink (
  measurement STRING,
  datasource STRING,
  tags MAP<STRING, STRING>,
  `time` BIGINT,
  binary_value BINARY,
  bool_value BOOLEAN,
  double_value DOUBLE,
  long_value BIGINT,
  string_value STRING,
  tag_b STRING,
  tag_c STRING,
  tag_d STRING,
  tag_e STRING,
  tag_f STRING,
  PRIMARY KEY (measurement, datasource, tags, `time`) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  'instanceName'='iotstore-test',
  'tableName'='test_timeseries_sink_table_2',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'storageType'='TIMESERIES'
);

-- Build the tags map from individual tag columns and insert into the time series sink table
INSERT INTO timeseries_sink
SELECT
  measurement,
  datasource,
  MAP['tag_a', tag_a, 'tag_b', tag_b, 'tag_c', tag_c, 'tag_d', tag_d, 'tag_e', tag_e, 'tag_f', tag_f] AS tags,
  `time`,
  binary_value,
  bool_value,
  double_value,
  long_value,
  string_value,
  tag_b,
  tag_c,
  tag_d,
  tag_e,
  tag_f
FROM timeseries_source;

What's next