This topic describes how to create a Tablestore result table in Realtime Compute for Apache Flink. It also describes the mappings between the field data types of Tablestore and Realtime Compute for Apache Flink.

Notice This topic applies only to Blink 1.4.5 and later.

Introduction to Tablestore

Tablestore is a distributed NoSQL database service built on the Apsara distributed operating system of Alibaba Cloud. Tablestore adopts data sharding and load balancing technologies to scale out and handle concurrent transactions. You can use Tablestore to store and query a large amount of structured data in real time.

DDL syntax

In Realtime Compute for Apache Flink, you can use Tablestore to store output data. The following code shows an example:
CREATE TABLE stream_test_hotline_agent (
 name VARCHAR,
 age BIGINT,
 birthday BIGINT,
 PRIMARY KEY (name,age)
) WITH (
 type='ots',
 instanceName='<yourInstanceName>',
 tableName='<yourTableName>',
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 endPoint='<yourEndpoint>',
 valueColumns='birthday'
); 
Note
  • We recommend that you use the storage registration feature. For more information, see Register a Tablestore instance.
  • The value of the valueColumns parameter cannot be a declared primary key.
  • The declared Tablestore result table must contain at least one attribute column and the primary key column.

Parameters in the WITH clause

Parameter Description Remarks
type The type of the result table. Set the value to ots.
instanceName The name of a Tablestore instance. None.
tableName The name of the table in the database None.
endPoint The endpoint of the instance. For more information, see Endpoint.
accessId AccessKey ID None.
accessKey AccessKey Secret None.
valueColumns The name of a column to be inserted. Separate multiple column names with commas (,), for example, 'ID,NAME'.
bufferSize The maximum number of data records that can be stored in the buffer before deduplication is triggered. Optional. Default value: 5000. This value indicates that deduplication is triggered if the number of input data records in the buffer reaches 5,000.
Note Realtime Compute for Apache Flink removes data record duplicates based on the primary key of the Tablestore result table. You can set bufferSize to the number of data record duplicates to be removed. Then, Realtime Compute for Apache Flink writes the data records after duplicates are removed. You can set batchSize to the number of data records to be written at a time.
batchWriteTimeoutMs The write timeout period. Optional. Default value: 5000. Unit: milliseconds. This value indicates that if the number of input data records does not reach the value specified by the batchSize parameter within 5,000 milliseconds, all cached data is written into the result table.
batchSize The number of data records that can be written at a time. Optional. Default value: 100.
retryIntervalMs The retry interval. Optional. Default value: 1000. Unit: milliseconds.
maxRetryTimes The maximum number of retries for writing data to a table. Optional. Default value: 100.
ignoreDelete Specifies whether to ignore DELETE operations. Default value: false.

Field type mapping

Data type of Tablestore Data type of Realtime Compute for Apache Flink
INTEGER BIGINT
STRING VARCHAR
BOOLEAN BOOLEAN
DOUBLE DOUBLE
Note You must define a primary key in a Tablestore result table. Output data is appended to the Tablestore result table to update the result. For more information about update methods, see Update type.