本文為您介紹如何使用Table StoreTablestore(OTS)連接器。
背景資訊
Table StoreTablestore(又名OTS)面向海量結構化資料提供Serverless表格儲存體服務,同時針對物聯網情境深度最佳化提供一站式的IoTstore解決方案。適用于海量賬單、IM訊息、物聯網、車連網、風控和推薦等情境中的結構化資料存放區,提供海量資料低成本儲存、毫秒級的線上資料查詢和檢索以及靈活的資料分析能力。詳情請參見Table StoreTablestore。
Tablestore連接器支援的資訊如下。
類別 | 詳情 |
運行模式 | 流模式 |
API種類 | SQL |
支援類型 | 源表、維表和結果表 |
資料格式 | 暫不支援 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
是否支援更新或刪除結果表資料 | 是 |
前提條件
已購買Tablestore執行個體並建立表,詳情請參見使用流程。
文法結構
結果表
CREATE TABLE ots_sink (
name VARCHAR,
age BIGINT,
birthday BIGINT,
primary key(name,age) not enforced
) WITH (
'connector'='ots',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'endPoint'='<yourEndpoint>',
'valueColumns'='birthday'
);Tablestore結果表必須定義有Primary Key,輸出資料以Update方式追加Tablestore表。
維表
CREATE TABLE ots_dim (
id int,
len int,
content STRING
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='<yourInstanceName>',
'tableName'='<yourTableName>',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}'
);源表
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false'
);屬性列支援讀取待消費欄位和Tunnel Service,以及返回資料中的OtsRecordType和OtsRecordTimestamp兩個欄位。欄位說明請參見下表。
欄位名 | Flink映射名 | 描述 |
OtsRecordType | type | 資料操作類型。 |
OtsRecordTimestamp | timestamp | 資料操作時間,單位為微秒。 說明 全量讀取資料時,OtsRecordTimestamp取值為0。 |
當需要讀取OtsRecordType和OtsRecordTimestamp欄位時,Flink提供了METADATA關鍵字用於擷取源表中的屬性欄位,具體DDL樣本如下。
CREATE TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
record_type STRING METADATA FROM 'type',
record_timestamp BIGINT METADATA FROM 'timestamp'
) WITH (
...
);WITH參數
通用
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
connector | 表類型。 | String | 是 | 無 | 固定值為 |
instanceName | 執行個體名。 | String | 是 | 無 | 無。 |
endPoint | 執行個體訪問地址。 | String | 是 | 無 | 請參見服務地址。 |
tableName | 表名。 | String | 是 | 無 | 無。 |
accessId | 阿里雲帳號或者RAM使用者的AccessKey ID。 | String | 是 | 無 | 詳情請參見如何查看AccessKey ID和AccessKey Secret資訊?。 重要 為了避免您的AK資訊泄露,建議您使用變數的方式填寫AccessKey取值,詳情請參見專案變數。 |
accessKey | 阿里雲帳號或者RAM使用者的AccessKey Secret。 | String | 是 | 無 | |
connectTimeout | 連接器串連Tablestore的逾時時間。 | Integer | 否 | 30000 | 單位為毫秒。 |
socketTimeout | 連接器串連Tablestore的Socket逾時時間。 | Integer | 否 | 30000 | 單位為毫秒。 |
ioThreadCount | IO線程數量。 | Integer | 否 | 4 | 無。 |
callbackThreadPoolSize | 回調線程池大小。 | Integer | 否 | 4 | 無。 |
源表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
tunnelName | Table Store資料表的資料通道名稱。 | String | 是 | 無 | 您需要提前在Table Store產品側建立好通道名稱和對應的通道類型(增量、全量和全量加增量)。關於建立通道的具體操作,請參見操作步驟。 |
ignoreDelete | 是否忽略DELETE操作類型的即時資料。 | Boolean | 否 | false | 參數取值如下:
|
skipInvalidData | 是否忽略髒資料。如果不忽略髒資料,則處理髒資料時會進行報錯。 | Boolean | 否 | false | 參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。 |
retryStrategy | 重試策略。 | Enum | 否 | TIME | 參數取值如下:
|
retryCount | 重試次數。 | Integer | 否 | 3 | 當retryStrategy設定為COUNT時,可以設定重試次數。 |
retryTimeoutMs | 重試的逾時時間。 | Integer | 否 | 180000 | 當retryStrategy設定為TIME時,可以設定重試的逾時時間,單位為毫秒。 |
streamOriginColumnMapping | 原始列名到真實列名的映射。 | String | 否 | 無 | 原始列名與真實列名之間,請使用半形冒號(:)隔開;多組映射之間,請使用半形逗號(,)隔開。例如 |
outputSpecificRowType | 是否透傳具體的RowType。 | Boolean | 否 | false | 參數取值如下:
|
dataFetchTimeoutMs | 讀取表的分區的最大時間開銷。 | Integer | 否 | 10000 | 單位為毫秒。當表的分區數量較多且同步任務對延遲較為敏感時,可以適當調小此參數,以降低同步延遲。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 |
enableRequestCompression | 是否開啟資料壓縮。 | Boolean | 否 | false | 在讀取資料時是否啟用資料壓縮功能。啟用後將節省頻寬,但可能會增加CPU負載。 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 |
結果表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
retryIntervalMs | 稍候再試時間。 | Integer | 否 | 1000 | 單位為毫秒。 |
maxRetryTimes | 最大重試次數。 | Integer | 否 | 10 | 無。 |
valueColumns | 插入欄位的列名。 | String | 是 | 無 | 多個欄位以半形逗號(,)分割,例如ID或NAME。 |
bufferSize | 流入多少條資料後開始輸出。 | Integer | 否 | 5000 | 無。 |
batchWriteTimeoutMs | 寫入逾時的時間。 | Integer | 否 | 5000 | 單位為毫秒。表示如果緩衝中的資料在等待batchWriteTimeoutMs秒後,依然沒有達到輸出條件,系統會自動輸出緩衝中的所有資料。 |
batchSize | 一次批量寫入的條數。 | Integer | 否 | 100 | 最大值為200。 |
ignoreDelete | 是否忽略DELETE操作。 | Boolean | 否 | False | 無。 |
autoIncrementKey | 當結果表中包含主鍵自增列時,通過該參數指定主鍵自增列的列名稱。 | String | 否 | 無 | 當結果表沒有主鍵自增列時,請不要設定此參數。 說明 僅Realtime Compute引擎VVR 8.0.4及以上版本支援該參數。 |
overwriteMode | 資料覆蓋模式。 | Enum | 否 | PUT | 參數取值如下:
說明 動態列模式下只支援UPDATE模式。 |
defaultTimestampInMillisecond | 設定寫入Tablestrore資料的預設時間戳記。 | Long | 否 | -1 | 如果不指定,則會使用系統當前的毫秒時間戳記。 |
dynamicColumnSink | 是否開啟動態列模式。 | Boolean | 否 | false | 動態列模式適用於在表定義中無需指定列名,根據作業運行情況動態插入資料列的情境。建表語句中主鍵需要定義為前若干列,最後兩列中前一列的值作為列名變數,且類型必須為String,後一列的值作為該列對應的值。 說明 開啟動態列模式時,不支援主鍵自增列,且參數overwriteMode必須設定為UPDATE。 |
checkSinkTableMeta | 是否檢查結果表中繼資料。 | Boolean | 否 | true | 若設定為true,會檢查Tablestore表的主鍵列和此處的建表語句中指定的主鍵是否一致。 |
enableRequestCompression | 資料寫入過程中是否開啟資料壓縮。 | Boolean | 否 | false | 無。 |
maxColumnsCount | 寫入下遊表的最大列數。 | Integer | 否 | 128 | 如果寫入的列數超過128,則可能會出現錯誤: 說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 |
storageType | 寫入結果表的類型。 | String | 否 | WIDE_COLUMN | 參數取值如下:
說明 僅Realtime Compute引擎VVR 8.0.10及以上版本支援。 |
維表專屬
參數 | 說明 | 資料類型 | 是否必填 | 預設值 | 備忘 |
retryIntervalMs | 稍候再試時間。 | Integer | 否 | 1000 | 單位為毫秒。 |
maxRetryTimes | 最大重試次數。 | Integer | 否 | 10 | 無。 |
cache | 緩衝策略。 | String | 否 | ALL | 目前Tablestore維表支援以下三種緩衝策略:
|
cacheSize | 緩衝大小。 | Integer | 否 | 無 | 當緩衝策略選擇LRU時,可以設定緩衝大小。 說明 單位為資料條數。 |
cacheTTLMs | 緩衝失效時間。 | Integer | 否 | 無 | 單位為毫秒。cacheTTLMs配置和cache有關:
|
cacheEmpty | 是否緩衝空結果。 | Boolean | 否 | 無 |
|
cacheReloadTimeBlackList | 更新時間黑名單。在緩衝策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11情境)。 | String | 否 | 無 | 格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔字元的使用方式如下所示:
|
async | 是否非同步返回資料。 | Boolean | 否 | false |
|
類型映射
源表
Tablestore欄位類型 | Flink欄位類型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
BINARY | BINARY |
結果表
Flink欄位類型 | Tablestore欄位類型 |
BINARY | BINARY |
VARBINARY | |
CHAR | STRING |
VARCHAR | |
TINYINT | INTEGER |
SMALLINT | |
INTEGER | |
BIGINT | |
FLOAT | DOUBLE |
DOUBLE | |
BOOLEAN | BOOLEAN |
使用樣本
樣本1
從OTS源表向OTS結果表進行資料寫入。
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;樣本2
寬表模型同步到時序模型。
CREATE TEMPORARY TABLE timeseries_source (
measurement STRING,
datasource STRING,
tag_a STRING,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_ots_timeseries_2',
'tunnelName' = 'timeseries_source_tunnel_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'ignoreDelete' = 'true', //是否忽略delete操作的資料
);
CREATE TEMPORARY TABLE timeseries_sink (
measurement STRING,
datasource STRING,
tags Map<String, String>,
`time` BIGINT,
binary_value BINARY,
bool_value BOOLEAN,
double_value DOUBLE,
long_value BIGINT,
string_value STRING,
tag_b STRING,
tag_c STRING,
tag_d STRING,
tag_e STRING,
tag_f STRING,
PRIMARY KEY(measurement, datasource, tags, `time`) NOT ENFORCED
)
WITH (
'connector' = 'ots',
'endPoint' = 'https://iotstore-test.cn-hangzhou.vpc.tablestore.aliyuncs.com',
'instanceName' = 'iotstore-test',
'tableName' = 'test_timeseries_sink_table_2',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'storageType' = 'TIMESERIES',
);
--將源表資料插入到結果表。
INSERT INTO timeseries_sink
select
m_name,
data_source,
MAP["tag_a":tag_a,"tag_b":tag_b,"tag_c":tag_c,"tag_d":tag_d,"tag_e":tag_e,"tag_f":tag_f] AS tags,
`time`,
cpu_sys,
cpu_user,
disk_0,
disk_1,
disk_2,
memory_used,
net_in,
net_out
from
timeseries_source;