本文为您介绍如何使用表格存储Tablestore连接器。
背景信息
表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore。
Tablestore连接器支持的信息如下。
类别 | 详情 |
运行模式 | 流模式 |
API种类 | SQL |
支持类型 | 源表(公测中)、维表和结果表 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标的含义及如何查看监控指标,请参见查看监。 |
是否支持更新或删除结果表数据 | 是 |
前提条件
已购买Tablestore集群并创建表,详情请参见使用流程。
使用限制
仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。
语法结构
结果表
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessSecret>', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );
说明Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
维表
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='<yourAccessId>', 'accessKey'='<yourAccessKey>' );
源表
警告Tablestore源表功能公测中,请谨慎使用。
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='<yourAccessId>', 'accessKey' ='<yourAccessSecret>', 'ignoreDelete' = 'false' );
属性列支持读取待消费字段和Tunnel Service,以及返回数据中的
OtsRecordType
和OtsRecordTimestamp
两个字段。字段说明请参见下表。字段名
Flink映射名
描述
OtsRecordType
type
数据操作类型。
OtsRecordTimestamp
timestamp
数据操作时间,单位为微秒。
说明全量读取数据时,OtsRecordTimestamp取值为0。
当需要读取
OtsRecordType
和OtsRecordTimestamp
字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
WITH参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为
ots
。instanceName
实例名。
String
是
无
无。
endPoint
实例访问地址。
String
是
无
请参见服务地址。
tableName
表名。
String
是
无
无。
accessId
阿里云账号或者RAM用户的AccessKey ID。
String
是
无
获取AccessKey ID的具体操作,请参见获取AccessKey。
accessKey
阿里云账号或者RAM用户的AccessKey Secret。
String
是
无
获取AccessKey Secret的具体操作,请参见获取AccessId。
retryIntervalMs
重试间隔时间。
Integer
否
1000
单位为毫秒。
maxRetryTimes
最大重试次数。
Integer
否
100
无。
connectTimeout
连接器连接Tablestore的超时时间。
Integer
否
30000
单位为毫秒。
socketTimeout
连接器连接Tablestore的Socket超时时间。
Integer
否
30000
单位为毫秒。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
tunnelName
表格存储数据表的数据通道名称。
String
是
无
您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道。
ignoreDelete
是否忽略DELETE操作类型的实时数据。
Boolean
否
false
参数取值如下:
true:忽略。
false(默认值):不忽略。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
valueColumns
插入字段的列名。
String
是
无
多个字段以英文逗号(,)分割,例如ID或NAME。
bufferSize
流入多少条数据后开始输出。
Integer
否
5000
无。
batchWriteTimeoutMs
写入超时的时间。
Integer
否
5000
单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
batchSize
一次批量写入的条数。
Integer
否
100
无。
ignoreDelete
是否忽略DELETE操作。
Boolean
否
False
无。
维表独有
参数
说明
数据类型
是否必填
默认值
备注
cache
缓存策略。
String
否
ALL
目前Tablestore维表支持以下三种缓存策略:
None:无缓存。
LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。
需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。
ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。
适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList。
说明因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
cacheSize
缓存大小。
Integer
否
无
当缓存策略选择LRU时,可以设置缓存大小。
cacheTTLMs
缓存失效时间。
Integer
否
无
单位为毫秒。cacheTTLMs配置和cache有关:
如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
cacheEmpty
是否缓存空结果。
Boolean
否
无
true:缓存
false:不缓存
类型映射
Tablestore字段类型 | Flink字段类型 |
INTEGER | BIGINT |
STRING | STRING |
BOOLEAN | BOOLEAN |
DOUBLE | DOUBLE |
使用示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH (
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='<yourAccessId>',
'accessKey' ='<yourAccessSecret>',
'ignoreDelete' = 'false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='<yourAccessId>',
'accessKey'='<yourAccessSecret>',
'valueColumns'='customerid,customername'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;