The Tablestore connector lets you use Tablestore tables as source tables, dimension tables, and sink tables in Flink SQL jobs running in streaming mode.
Connector capabilities
| Item | Description |
|---|---|
| Running mode | Streaming mode |
| API type | SQL API |
| Table type | Source table, dimension table, and sink table |
| Data format | N/A |
| Sink table metrics | numBytesOut, numBytesOutPerSecond, numRecordsOut, numRecordsOutPerSecond, currentSendTime |
| Data update or deletion in a sink table | Supported |
For details on sink metrics, see Monitoring metrics.
Prerequisites
Before you begin, ensure that you have:
- A Tablestore instance and a Tablestore table. See Use Tablestore.
Syntax
All three table types use 'connector'='ots' in the WITH clause, with type-specific options.
Sink table
```sql
CREATE TABLE ots_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  PRIMARY KEY (name, age) NOT ENFORCED
) WITH (
  'connector'='ots',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'endPoint'='<yourEndpoint>',
  'valueColumns'='birthday'
);
```
A Tablestore sink table requires a primary key. Output records are written by primary key: a record whose primary key already exists in the table updates the existing row, and a record with a new primary key inserts a new row.
Dimension table
```sql
CREATE TABLE ots_dim (
  id INT,
  len INT,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}'
);
```
Source table
```sql
CREATE TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-source',
  'tableName'='flink_source_table',
  'tunnelName'='flinksourcestream',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='false'
);
```
Available metadata
The Tablestore source table exposes two metadata fields via the METADATA keyword. Use these fields to track the operation type and timing of each change event.
| Metadata key | Flink data type | Description |
|---|---|---|
| type | STRING | The data operation type (maps to OtsRecordType). |
| timestamp | BIGINT | The data operation time in microseconds (maps to OtsRecordTimestamp). Set to 0 for full data reads. |
To read metadata fields, declare them with the METADATA FROM syntax:
```sql
CREATE TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);
```
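The metadata fields can then be used in queries, for example to drop delete events or to convert the operation time to milliseconds. A minimal sketch based on the table declared above; the operation-type value 'DELETE' is an assumption here, so verify the actual OtsRecordType values emitted in your environment:

```sql
-- Keep only non-delete change events; record_type and record_timestamp
-- are the metadata columns declared in the DDL above.
SELECT
  `order`,
  orderid,
  record_type,
  record_timestamp / 1000 AS ts_millis  -- microseconds -> milliseconds
FROM tablestore_stream
WHERE record_type <> 'DELETE';
```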
Connector options
General options
All table types share the following options.
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | — | Set to ots. |
| instanceName | String | Yes | — | Name of the Tablestore instance. |
| endPoint | String | Yes | — | Endpoint of the Tablestore instance. See Endpoints. |
| tableName | String | Yes | — | Name of the table. |
| accessId | String | Yes | — | AccessKey ID of your Alibaba Cloud account or a Resource Access Management (RAM) user. See How do I view the AccessKey ID and AccessKey secret? |
| accessKey | String | Yes | — | AccessKey secret of your Alibaba Cloud account or a RAM user. |
| connectTimeout | Integer | No | 30000 | Connection timeout in milliseconds. |
| socketTimeout | Integer | No | 30000 | Socket timeout in milliseconds. |
| ioThreadCount | Integer | No | 4 | Number of I/O threads. |
| callbackThreadPoolSize | Integer | No | 4 | Size of the callback thread pool. |
Use variables to store your AccessKey pair instead of hardcoding it.
Source table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| tunnelName | String | Yes | — | Name of the Tablestore tunnel. Create the tunnel in the Tablestore console before using this option. Supported tunnel types: Incremental, Full, and Differential. See the "Create a tunnel" section in Quick start. |
| ignoreDelete | Boolean | No | false | Whether to skip delete operations. true: skip; false: process delete operations. |
| skipInvalidData | Boolean | No | false | Whether to skip dirty data. true: skip dirty data; false: report an error. Requires Ververica Runtime (VVR) 8.0.4 or later. |
| retryStrategy | Enum | No | TIME | Retry policy. TIME: retry until retryTimeoutMs elapses; COUNT: retry until retryCount is reached. |
| retryCount | Integer | No | 3 | Maximum number of retries. Applies when retryStrategy is COUNT. |
| retryTimeoutMs | Integer | No | 180000 | Retry timeout in milliseconds. Applies when retryStrategy is TIME. |
| streamOriginColumnMapping | String | No | — | Mapping from original column names to actual column names. Format: origin_col1:col1,origin_col2:col2. |
| outputSpecificRowType | Boolean | No | false | Whether to pass through the specific row type. false: all rows are treated as INSERT; true: rows can be INSERT, DELETE, or UPDATE_AFTER. |
| dataFetchTimeoutMs | Integer | No | 10000 | Maximum time in milliseconds to fetch data from a single partition. Reduce this value to lower overall synchronization latency when syncing many partitions. Requires VVR 8.0.10 or later. |
| enableRequestCompression | Boolean | No | false | Whether to enable request compression. Reduces bandwidth usage at the cost of higher CPU load. Requires VVR 8.0.10 or later. |
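A source table that combines several of these options might look like the following sketch. The retry values and the column mapping are illustrative placeholders, not recommendations:

```sql
CREATE TABLE tablestore_source_tuned (
  id VARCHAR,
  col1 VARCHAR,
  col2 VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'tunnelName'='<yourTunnelName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  -- Retry a fixed number of times instead of retrying until a timeout.
  'retryStrategy'='COUNT',
  'retryCount'='5',
  -- Map original columns origin_col1/origin_col2 to col1/col2.
  'streamOriginColumnMapping'='origin_col1:col1,origin_col2:col2'
);
```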
Sink table options
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| valueColumns | String | Yes | — | Names of the columns to write. Separate multiple column names with commas (,). |
| retryIntervalMs | Integer | No | 1000 | Retry interval in milliseconds. |
| maxRetryTimes | Integer | No | 10 | Maximum number of retries. |
| bufferSize | Integer | No | 5000 | Maximum number of records buffered before a write is triggered. |
| batchWriteTimeoutMs | Integer | No | 5000 | Write timeout in milliseconds. If buffered records don't reach bufferSize within this period, all buffered records are written. |
| batchSize | Integer | No | 100 | Number of records written per batch. Maximum: 200. |
| ignoreDelete | Boolean | No | false | Whether to skip delete operations. |
| autoIncrementKey | String | No | — | Name of the auto-increment primary key column. Configure only if the sink table has an auto-increment primary key column. Requires VVR 8.0.4 or later. |
| overwriteMode | Enum | No | PUT | Write mode. PUT: each write replaces the entire row; UPDATE: each write updates only the specified columns. Dynamic column mode requires UPDATE. |
| defaultTimestampInMillisecond | Long | No | -1 | Default timestamp for writes. If not set, the current system time is used. |
| dynamicColumnSink | Boolean | No | false | Whether to enable dynamic column mode. In this mode, no columns are pre-defined; columns are inserted based on runtime values. The first N columns define the primary key. The second-to-last column holds the column name and the last column holds its value — both must be STRING. If enabled, overwriteMode must be UPDATE and auto-increment primary keys are not supported. |
| checkSinkTableMeta | Boolean | No | true | Whether to verify that the Tablestore table's primary key matches the primary key declared in the CREATE TABLE statement. |
| enableRequestCompression | Boolean | No | false | Whether to enable request compression during writes. |
| maxColumnsCount | Integer | No | 128 | Maximum number of columns written to the sink table. If the number of columns to write exceeds this value, the error The count of attribute columns exceeds the maximum is reported. Requires VVR 8.0.10 or later. |
| storageType | String | No | WIDE_COLUMN | Sink table type. WIDE_COLUMN: wide-column table; TIMESERIES: time series table. |
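As a sketch of dynamic column mode: the leading columns form the primary key, and the last two STRING columns carry the attribute column's name and value at runtime. All table and column names below are hypothetical, and the exact interaction with valueColumns in this mode should be verified against your runtime:

```sql
CREATE TABLE ots_dynamic_sink (
  pk1 VARCHAR,        -- primary key column
  col_name VARCHAR,   -- second-to-last column: attribute column name
  col_value VARCHAR,  -- last column: attribute column value
  PRIMARY KEY (pk1) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='col_name,col_value',
  'dynamicColumnSink'='true',
  -- Dynamic column mode requires UPDATE mode.
  'overwriteMode'='UPDATE'
);
```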
Dimension table options
How the cache works
The dimension table cache reduces repeated lookups against Tablestore. Choose a cache policy based on your table size and query patterns:
- None: No caching. Every lookup queries Tablestore directly. Use this policy when data changes frequently and freshness is critical.
- LRU: Caches a fixed number of recently accessed records. When a lookup misses the cache, the connector queries Tablestore and updates the cache with the result. Set cacheSize and cacheTTLMs when using this policy.
- ALL (default): Loads the entire dimension table into the cache before the job starts, so all subsequent lookups are served from the cache. When the cache expires (cacheTTLMs), the connector reloads all data. Use ALL when the table is small and many lookup keys are expected to have no match. When using ALL, increase the memory of the join node: the cache requires approximately twice the size of the remote table.
| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| retryIntervalMs | Integer | No | 1000 | Retry interval in milliseconds. |
| maxRetryTimes | Integer | No | 10 | Maximum number of retries. |
| cache | String | No | ALL | Cache policy: None, LRU, or ALL. |
| cacheSize | Integer | No | — | Maximum number of cached records. Applies when cache is LRU. |
| cacheTTLMs | Integer | No | — | Cache TTL in milliseconds. For LRU: timeout per entry. For ALL: full-cache refresh interval. Leave unset to disable expiration. |
| cacheEmpty | Boolean | No | — | Whether to cache empty (no-match) results. true: cache; false: do not cache. |
| cacheReloadTimeBlackList | String | No | — | Time windows during which the ALL cache is not refreshed. Format: 2017-10-24 14:00 -> 2017-10-24 15:00, 2017-11-10 23:30 -> 2017-11-11 08:00. Separate multiple windows with commas; use -> between start and end times. |
| async | Boolean | No | false | Whether to enable asynchronous lookup. true: async lookups (results are not ordered); false: synchronous lookups. |
Data type mappings
Source table
| Tablestore type | Flink SQL type |
|---|---|
| INTEGER | BIGINT |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| DOUBLE | DOUBLE |
| BINARY | BINARY |
Sink table
| Flink SQL type | Tablestore type |
|---|---|
| BINARY | BINARY |
| VARBINARY | BINARY |
| CHAR | STRING |
| VARCHAR | STRING |
| TINYINT | INTEGER |
| SMALLINT | INTEGER |
| INTEGER | INTEGER |
| BIGINT | INTEGER |
| FLOAT | DOUBLE |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
Examples
Read from Tablestore and write to Tablestore
This example reads order data from a Tablestore source table via Tunnel Service and writes it to a Tablestore sink table. The sink table uses an auto-increment primary key column.
```sql
CREATE TEMPORARY TABLE tablestore_stream (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-source',
  'tableName'='flink_source_table',
  'tunnelName'='flinksourcestream',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='false',
  'skipInvalidData'='false'
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`, orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;
```
Synchronize a wide-column table to a time series table
This example reads from a wide-column source table and writes to a time series sink table. The sink table's tags column uses MAP<STRING, STRING> to hold tag key-value pairs, and storageType is set to TIMESERIES.
```sql
CREATE TEMPORARY TABLE timeseries_source (
  measurement STRING,
  datasource STRING,
  tag_a STRING,
  `time` BIGINT,
  binary_value BINARY,
  bool_value BOOLEAN,
  double_value DOUBLE,
  long_value BIGINT,
  string_value STRING,
  tag_b STRING,
  tag_c STRING,
  tag_d STRING,
  tag_e STRING,
  tag_f STRING
) WITH (
  'connector'='ots',
  'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  'instanceName'='iotstore-test',
  'tableName'='test_ots_timeseries_2',
  'tunnelName'='timeseries_source_tunnel_2',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'ignoreDelete'='true'
);

CREATE TEMPORARY TABLE timeseries_sink (
  measurement STRING,
  datasource STRING,
  tags MAP<STRING, STRING>,
  `time` BIGINT,
  binary_value BINARY,
  bool_value BOOLEAN,
  double_value DOUBLE,
  long_value BIGINT,
  string_value STRING,
  tag_b STRING,
  tag_c STRING,
  tag_d STRING,
  tag_e STRING,
  tag_f STRING,
  PRIMARY KEY (measurement, datasource, tags, `time`) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
  'instanceName'='iotstore-test',
  'tableName'='test_timeseries_sink_table_2',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'storageType'='TIMESERIES'
);

-- Build the tags map from individual tag columns and insert into the time series sink table
INSERT INTO timeseries_sink
SELECT
  measurement,
  datasource,
  MAP['tag_a', tag_a, 'tag_b', tag_b, 'tag_c', tag_c, 'tag_d', tag_d, 'tag_e', tag_e, 'tag_f', tag_f] AS tags,
  `time`,
  binary_value,
  bool_value,
  double_value,
  long_value,
  string_value,
  tag_b,
  tag_c,
  tag_d,
  tag_e,
  tag_f
FROM timeseries_source;
```