全部產品
Search
文件中心

Realtime Compute for Apache Flink:Table StoreTablestore(OTS)

更新時間:Jun 26, 2025

本文為您介紹如何使用Table StoreTablestore(OTS)連接器。

背景資訊

Table StoreTablestore(又名OTS)面向海量結構化資料提供Serverless表格儲存體服務,同時針對物聯網情境深度最佳化提供一站式的IoTstore解決方案。適用于海量賬單、IM訊息、物聯網、車連網、風控和推薦等情境中的結構化資料存放區,提供海量資料低成本儲存、毫秒級的線上資料查詢和檢索以及靈活的資料分析能力。詳情請參見Table StoreTablestore

Tablestore連接器支援的資訊如下。

類別

詳情

運行模式

流模式

API種類

SQL

支援類型

源表、維表和結果表

資料格式

暫不支援

特有監控指標

  • 源表:無

  • 維表:無

  • 結果表:

    • numBytesOut

    • numBytesOutPerSecond

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

是否支援更新或刪除結果表資料

前提條件

已購買Tablestore執行個體並建立表,詳情請參見使用流程

文法結構

結果表

CREATE TABLE ots_sink (
  name VARCHAR,
  age BIGINT,
  birthday BIGINT,
  primary key(name,age) not enforced
) WITH (
  'connector'='ots',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'endPoint'='<yourEndpoint>',
  'valueColumns'='birthday'
);
說明

Tablestore結果表必須定義有Primary Key,輸出資料以Update方式追加Tablestore表。

維表

CREATE TABLE ots_dim (
  id int,
  len int,
  content STRING
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='<yourInstanceName>',
  'tableName'='<yourTableName>',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}'
);

源表

CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false'
);

屬性列支援讀取待消費欄位和Tunnel Service,以及返回資料中的OtsRecordTypeOtsRecordTimestamp兩個欄位。欄位說明請參見下表。

欄位名

Flink映射名

描述

OtsRecordType

type

資料操作類型。

OtsRecordTimestamp

timestamp

資料操作時間,單位為微秒。

說明

全量讀取資料時,OtsRecordTimestamp取值為0。

當需要讀取OtsRecordTypeOtsRecordTimestamp欄位時,Flink提供了METADATA關鍵字用於擷取源表中的屬性欄位,具體DDL樣本如下。

CREATE TABLE tablestore_stream(
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  record_type STRING METADATA FROM 'type',
  record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
  ...
);

WITH參數

通用

參數

說明

資料類型

是否必填

預設值

備忘

connector

表類型。

String

固定值為ots

instanceName

執行個體名。

String

無。

endPoint

執行個體訪問地址。

String

請參見服務地址

tableName

表名。

String

無。

accessId

阿里雲帳號或者RAM使用者的AccessKey ID。

String

詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?

重要

為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數

accessKey

阿里雲帳號或者RAM使用者的AccessKey Secret。

String

connectTimeout

連接器串連Tablestore的逾時時間。

Integer

30000

單位為毫秒。

socketTimeout

連接器串連Tablestore的Socket逾時時間。

Integer

30000

單位為毫秒。

ioThreadCount

IO線程數量。

Integer

4

無。

callbackThreadPoolSize

回調線程池大小。

Integer

4

無。

源表專屬

參數

說明

資料類型

是否必填

預設值

備忘

tunnelName

Table Store資料表的資料通道名稱。

String

您需要提前在Table Store產品側建立好通道名稱和對應的通道類型(增量、全量和全量加增量)。關於建立通道的具體操作,請參見操作步驟

ignoreDelete

是否忽略DELETE操作類型的即時資料。

Boolean

false

參數取值如下:

  • true:忽略。

  • false(預設值):不忽略。

skipInvalidData

是否忽略髒資料。如果不忽略髒資料,則處理髒資料時會進行報錯。

Boolean

false

參數取值如下:

  • true:忽略髒資料。

  • false(預設值):不忽略髒資料。

說明

僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。

retryStrategy

重試策略。

Enum

TIME

參數取值如下:

  • TIME:在逾時時間retryTimeoutMs內持續進行重試。

  • COUNT:在最大重試次數retryCount內持續進行重試。

retryCount

重試次數。

Integer

3

當retryStrategy設定為COUNT時,可以設定重試次數。

retryTimeoutMs

重試的逾時時間。

Integer

180000

當retryStrategy設定為TIME時,可以設定重試的逾時時間,單位為毫秒。

streamOriginColumnMapping

原始列名到真實列名的映射。

String

原始列名與真實列名之間,請使用半形冒號(:)隔開;多組映射之間,請使用半形逗號(,)隔開。例如origin_col1:col1,origin_col2:col2

outputSpecificRowType

是否透傳具體的RowType。

Boolean

false

參數取值如下:

  • false:不透傳,所有資料RowType均為INSERT。

  • true:透傳,將根據透傳的類型相應設定為INSERT、DELETE或UPDATE_AFTER。

dataFetchTimeoutMs

讀取表的分區的最大時間開銷。

Integer

10000

單位為毫秒。當表的分區數量較多且同步任務對延遲較為敏感時,可以適當調小此參數,以降低同步延遲。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

enableRequestCompression

是否開啟資料壓縮。

Boolean

false

在讀取資料時是否啟用資料壓縮功能。啟用後將節省頻寬,但可能會增加CPU負載。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

結果表專屬

參數

說明

資料類型

是否必填

預設值

備忘

retryIntervalMs

稍候再試時間。

Integer

1000

單位為毫秒。

maxRetryTimes

最大重試次數。

Integer

10

無。

valueColumns

插入欄位的列名。

String

多個欄位以半形逗號(,)分割,例如ID或NAME。

bufferSize

流入多少條資料後開始輸出。

Integer

5000

無。

batchWriteTimeoutMs

寫入逾時的時間。

Integer

5000

單位為毫秒。表示如果緩衝中的資料在等待batchWriteTimeoutMs秒後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。

batchSize

一次批量寫入的條數。

Integer

100

最大值為200。

ignoreDelete

是否忽略DELETE操作。

Boolean

False

無。

autoIncrementKey

當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。

String

當結果表沒有主鍵自增列時,請不要設定此參數。

說明

僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。

overwriteMode

資料覆蓋模式。

Enum

PUT

參數取值如下:

  • PUT:以PUT方式將資料寫入到Tablestore表。

  • UPDATE:以UPDATE方式寫入到Tablestore表。

說明

動態列模式下只支援UPDATE模式。

defaultTimestampInMillisecond

設定寫入Tablestrore資料的預設時間戳記。

Long

-1

如果不指定,則會使用系統當前的毫秒時間戳記。

dynamicColumnSink

是否開啟動態列模式。

Boolean

false

動態列模式適用於在表定義中無需指定列名,根據作業運行情況動態插入資料列的情境。建表語句中主鍵需要定義為前若干列,最後兩列中前一列的值作為列名變數,且類型必須為String,後一列的值作為該列對應的值。

說明

開啟動態列模式時,不支援主鍵自增列,且參數overwriteMode必須設定為UPDATE。

checkSinkTableMeta

是否檢查結果表中繼資料。

Boolean

true

若設定為true,會檢查Tablestore表的主鍵列和此處的建表語句中指定的主鍵是否一致。

enableRequestCompression

資料寫入過程中是否開啟資料壓縮。

Boolean

false

無。

maxColumnsCount

寫入下遊表的最大列數。

Integer

128

如果寫入的列數超過128,則可能會出現錯誤:The count of attribute columns exceeds the maximum,此時需要調整該參數。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

storageType

寫入結果表的類型。

String

WIDE_COLUMN

參數取值如下:

  • WIDE_COLUMN:結果表是寬表。

  • TIMESERIES:結果表是時序表。

說明

僅Realtime Compute引擎VVR 8.0.10及以上版本支援。

維表專屬

參數

說明

資料類型

是否必填

預設值

備忘

retryIntervalMs

稍候再試時間。

Integer

1000

單位為毫秒。

maxRetryTimes

最大重試次數。

Integer

10

無。

cache

緩衝策略。

String

ALL

目前Tablestore維表支援以下三種緩衝策略:

  • None:無緩衝。

  • LRU:緩衝維表裡的部分資料。源表的每條資料都會觸發系統先在Cache中尋找資料,如果沒有找到,則去物理維表中尋找。

    需要配置相關參數:緩衝大小(cacheSize)和緩衝更新時間間隔(cacheTTLMs)。

  • ALL(預設值):緩衝維表裡的所有資料。在Job運行前,系統會將維表中所有資料載入到Cache中,之後所有的維表尋找資料都會通過Cache進行。如果在Cache中無法找到資料,則KEY不存在,並在Cache到期後重新載入一遍全量Cache。

    適用於遠端資料表資料量小且MISS KEY(源表資料和維表JOIN時,ON條件無法關聯)特別多的情境。需要配置相關參數:緩衝更新時間間隔cacheTTLMs,更新時間黑名單cacheReloadTimeBlackList

    說明

    因為系統會非同步載入維表資料,所以在使用CACHE ALL時,需要增加維表JOIN節點的記憶體,增加的記憶體大小為遠端資料表資料量的兩倍。

cacheSize

緩衝大小。

Integer

當緩衝策略選擇LRU時,可以設定緩衝大小。

說明

單位為資料條數。

cacheTTLMs

緩衝失效時間。

Integer

單位為毫秒。cacheTTLMs配置和cache有關:

  • 如果cache配置為None,則cacheTTLMs可以不配置,表示緩衝不逾時。

  • 如果cache配置為LRU,則cacheTTLMs為緩衝逾時時間。預設不到期。

  • 如果cache配置為ALL,則cacheTTLMs為緩衝載入時間。預設不重新載入。

cacheEmpty

是否緩衝空結果。

Boolean

  • true:緩衝

  • false:不緩衝

cacheReloadTimeBlackList

更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。

String

格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示:

  • 用半形逗號(,)來分隔多個黑名單。

  • 用箭頭(->)來分割黑名單的起始結束時間。

async

是否非同步返回資料。

Boolean

false

  • true:表示非同步返回資料。非同步返回資料預設是無序的。

  • false(預設值):表示不進行非同步返回資料。

類型映射

源表

Tablestore欄位類型

Flink欄位類型

INTEGER

BIGINT

STRING

STRING

BOOLEAN

BOOLEAN

DOUBLE

DOUBLE

BINARY

BINARY

結果表

Flink欄位類型

Tablestore欄位類型

BINARY

BINARY

VARBINARY

CHAR

STRING

VARCHAR

TINYINT

INTEGER

SMALLINT

INTEGER

BIGINT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

使用樣本

樣本1

從OTS源表向OTS結果表進行資料寫入。

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH 
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='${ak_id}',
  'accessKey' ='${ak_secret}',
  'ignoreDelete' = 'false',
  'skipInvalidData' ='false' 
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='${ak_id}',
  'accessKey'='${ak_secret}',
  'valueColumns'='customerid,customername',
  'autoIncrementKey'='${auto_increment_primary_key_name}' 
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;

樣本2

寬表模型同步到時序模型。

CREATE TEMPORARY TABLE timeseries_source (
    measurement STRING,
    datasource STRING,
    tag_a STRING,
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_ots_timeseries_2',
    'tunnelName' = 'timeseries_source_tunnel_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'ignoreDelete' = 'true', //是否忽略delete操作的資料
);
CREATE TEMPORARY TABLE timeseries_sink (
    measurement STRING,
    datasource STRING,
    tags Map<String, String>, 
    `time` BIGINT,
    binary_value BINARY,
    bool_value BOOLEAN,
    double_value DOUBLE,
    long_value BIGINT,
    string_value STRING,
    tag_b STRING,
    tag_c STRING,
    tag_d STRING,
    tag_e STRING,
    tag_f STRING,
    PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
) 
WITH (
    'connector' = 'ots',
    'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
    'instanceName' = 'iotstore-test',
    'tableName' = 'test_timeseries_sink_table_2',
    'accessId' = '${ak_id}',
    'accessKey' = '${ak_secret}',
    'storageType' = 'TIMESERIES',
);

--將源表資料插入到結果表。
INSERT INTO timeseries_sink
    select 
        m_name,
        data_source,
        MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
        `time`,
        cpu_sys,
        cpu_user,
        disk_0,
        disk_1,
        disk_2,
        memory_used,
        net_in,
        net_out 
    from
        timeseries_source;