本文为您介绍如何使用表格存储OTS连接器。

背景信息

表格存储Tablestore(又名OTS)面向海量结构化数据提供Serverless表存储服务,同时针对物联网场景深度优化提供一站式的IoTstore解决方案。适用于海量账单、IM消息、物联网、车联网、风控和推荐等场景中的结构化数据存储,提供海量数据低成本存储、毫秒级的在线数据查询和检索以及灵活的数据分析能力。详情请参见表格存储Tablestore

OTS连接器支持的信息如下。
类别详情
运行模式流模式
API种类SQL
支持类型源表(实验性功能)、维表和结果表
数据格式暂不支持
特有监控指标
  • 源表:无
  • 维表:无
  • 结果表:
    • numBytesOut
    • numBytesOutPerSecond
    • numRecordsOut
    • numRecordsOutPerSecond
    • currentSendTime
说明 指标的含义及如何查看监控指标,请参见查看监控指标
是否支持更新或删除结果表数据

前提条件

已购买OTS集群并创建表,详情请参见使用流程

使用限制

仅实时计算引擎VVR 3.0.0及以上版本支持表格存储Tablestore连接器。

语法结构

  • 结果表
    CREATE TABLE ots_sink (
      name VARCHAR,
      age BIGINT,
      birthday BIGINT,
      primary key(name,age) not enforced
    ) WITH (
      'connector'='ots',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessSecret>',
      'endPoint'='<yourEndpoint>',
      'valueColumns'='birthday'
    );
    说明 Tablestore结果表必须定义有Primary Key,输出数据以Update方式追加Tablestore表。
  • 维表
    CREATE TABLE ots_dim (
      id int,
      len int,
      content STRING
    ) WITH (
      'connector'='ots',
      'endPoint'='<yourEndpoint>',
      'instanceName'='<yourInstanceName>',
      'tableName'='<yourTableName>',
      'accessId'='<yourAccessId>',
      'accessKey'='<yourAccessKey>'
    );
  • 源表
    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR
    ) WITH (
      'connector'='ots',
      'endPoint' ='<yourEndpoint>',
      'instanceName' = 'flink-source',
      'tableName' ='flink_source_table',
      'tunnelName' = 'flinksourcestream',
      'accessId' ='<yourAccessId>',
      'accessKey' ='<yourAccessSecret>',
      'ignoreDelete' = 'false'
    );
    属性列支持读取待消费字段和Tunnel Service,以及返回数据中的OtsRecordTypeOtsRecordTimestamp两个字段。字段说明请参见下表。
    字段名Flink映射名描述
    OtsRecordTypetype数据操作类型。
    OtsRecordTimestamptimestamp数据操作时间,单位为微秒。
    说明 全量读取数据时,OtsRecordTimestamp取值为0。
    当需要读取OtsRecordTypeOtsRecordTimestamp字段时,Flink提供了METADATA关键字用于获取源表中的属性字段,具体DDL示例如下。
    CREATE TABLE tablestore_stream(
      `order` VARCHAR,
      orderid VARCHAR,
      customerid VARCHAR,
      customername VARCHAR,
      record_type STRING METADATA FROM 'type',
      record_timestamp BIGINT METADATA FROM 'timestamp'
    ) WITH (
      ...
    );
    警告 OTS Source功能为实验性功能,请谨慎使用。

WITH参数

  • 通用
    参数说明数据类型是否必填默认值备注
    connector表类型。String固定值为ots
    instanceName实例名。String无。
    endPoint实例访问地址。String请参见服务地址
    tableName表名。String无。
    accessId阿里云账号或者RAM用户的AccessKey ID。String获取AccessKey ID的具体操作,请参见获取AccessKey
    accessKey阿里云账号或者RAM用户的AccessKey Secret。String获取AccessKey Secret的具体操作,请参见获取AccessId
    retryIntervalMs重试间隔时间。Integer1000单位为毫秒。
    maxRetryTimes最大重试次数。Integer100无。
    connectTimeout连接器连接Tablestore的超时时间。Integer30000单位为毫秒。
    socketTimeout连接器连接Tablestore的Socket超时时间。Integer30000单位为毫秒。
  • 源表独有
    参数说明数据类型 是否必填默认值备注
    tunnelName表格存储数据表的数据通道名称。String您需要提前在表格存储产品侧创建好通道名称和对应的通道类型(增量、全量和全量加增量)。关于创建通道的具体操作,请参见创建数据通道
    ignoreDelete是否忽略DELETE操作类型的实时数据。Booleanfalse参数取值如下:
    • true:忽略。
    • false(默认值):不忽略。
  • 结果表独有
    参数说明数据类型是否必填默认值备注
    valueColumns插入字段的列名。String多个字段以英文逗号(,)分割,例如ID或NAME。
    bufferSize流入多少条数据后开始输出。Integer5000无。
    batchWriteTimeoutMs写入超时的时间。Integer5000单位为毫秒。表示如果缓存中的数据在等待batchWriteTimeoutMs秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
    batchSize一次批量写入的条数。Integer100无。
    ignoreDelete是否忽略DELETE操作。BooleanFalse无。
  • 维表独有
    参数说明数据类型是否必填默认值备注
    cache缓存策略。StringALL目前OTS维表支持以下三种缓存策略:
    • None:无缓存。
    • LRU:缓存维表里的部分数据。源表的每条数据都会触发系统先在Cache中查找数据,如果没有找到,则去物理维表中查找。

      需要配置相关参数:缓存大小(cacheSize)和缓存更新时间间隔(cacheTTLMs)。

    • ALL(默认值):缓存维表里的所有数据。在Job运行前,系统会将维表中所有数据加载到Cache中,之后所有的维表查找数据都会通过Cache进行。如果在Cache中无法找到数据,则KEY不存在,并在Cache过期后重新加载一遍全量Cache。

      适用于远程表数据量小且MISS KEY(源表数据和维表JOIN时,ON条件无法关联)特别多的场景。需要配置相关参数:缓存更新时间间隔cacheTTLMs,更新时间黑名单cacheReloadTimeBlackList

      说明 因为系统会异步加载维表数据,所以在使用CACHE ALL时,需要增加维表JOIN节点的内存,增加的内存大小为远程表数据量的两倍。
    cacheSize缓存大小。Integer当缓存策略选择LRU时,可以设置缓存大小。
    cacheTTLMs缓存失效时间。Integer单位为毫秒。cacheTTLMs配置和cache有关:
    • 如果cache配置为None,则cacheTTLMs可以不配置,表示缓存不超时。
    • 如果cache配置为LRU,则cacheTTLMs为缓存超时时间。默认不过期。
    • 如果cache配置为ALL,则cacheTTLMs为缓存加载时间。默认不重新加载。
    cacheEmpty是否缓存空结果。Boolean
    • true:缓存
    • false:不缓存

类型映射

Tablestore字段类型Flink字段类型
INTEGERBIGINT
STRINGSTRING
BOOLEANBOOLEAN
DOUBLEDOUBLE

使用示例

CREATE TEMPORARY TABLE tablestore_stream(
 `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR
) WITH (
  'connector'='ots',
  'endPoint' ='<yourEndpoint>',
  'instanceName' = 'flink-source',
  'tableName' ='flink_source_table',
  'tunnelName' = 'flinksourcestream',
  'accessId' ='<yourAccessId>',
  'accessKey' ='<yourAccessSecret>',
  'ignoreDelete' = 'false'
);

CREATE TEMPORARY TABLE ots_sink (
  `order` VARCHAR,
  orderid VARCHAR,
  customerid VARCHAR,
  customername VARCHAR,
  PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
  'connector'='ots',
  'endPoint'='<yourEndpoint>',
  'instanceName'='flink-sink',
  'tableName'='flink_sink_table',
  'accessId'='<yourAccessId>',
  'accessKey'='<yourAccessSecret>',
  'valueColumns'='customerid,customername'
);

INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;