本文为您介绍如何创建实时计算Flink版表格存储结果表,以及表格存储和实时计算Flink版字段类型之间的映射关系。

注意 本文仅适用于Blink 1.4.5及以上版本。

什么是表格存储Tablestore

表格存储Tablestore是基于阿里云飞天分布式系统的分布式NoSQL数据存储服务。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问服务。

DDL定义

实时计算Flink版支持使用Tablestore作为结果输出,示例代码如下。
CREATE TABLE stream_test_hotline_agent (
 name VARCHAR,
 age BIGINT,
 birthday BIGINT,
 PRIMARY KEY (name,age)
) WITH (
 type='ots',
 instanceName='<yourInstanceName>',
 tableName='<yourTableName>',
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 endPoint='<yourEndpoint>',
 valueColumns='birthday'
); 
说明
  • 推荐使用数据存储注册功能,详情请参见注册表格存储Tablestore
  • valueColumns值不能是声明的主键,可以是主键之外的任意字段。
  • Tablestore结果表声明中,除主键列外,至少包含一个属性列。

WITH参数

参数 说明 备注
type 结果表类型 固定值为ots。
instanceName 实例名
tableName 表名
endPoint 实例访问地址 参见服务地址
accessId AccessKey ID
accessKey AccessKey Secret
valueColumns 指定插入的字段列名 插入多个字段以英文逗号(,)分割。例如'ID,NAME'
bufferSize 流入多少条数据后开始输出 可选,默认值为5000,表示输入的数据达到5000条就开始输出。
说明 在实时计算Flink版系统,bufferSize根据Tablestore主键对结果数据进行去重后,再在bufferSize的基础上进行batchSize。
batchWriteTimeoutMs 写入超时的时间 可选,单位为毫秒,默认值为5000。表示如果缓存中的数据在等待5秒后,依然没有达到输出条件,系统会自动输出缓存中的所有数据。
batchSize 一次批量写入的条数 可选,默认值为100。
retryIntervalMs 重试间隔时间 可选,单位毫秒,默认值为1000。
maxRetryTimes 最大重试次数 可选,默认值为100。
ignoreDelete 是否忽略DELETE操作 默认值为False。

类型映射

Tablestore字段类型 实时计算Flink版字段类型
INTEGER BIGINT
STRING VARCHAR
BOOLEAN BOOLEAN
DOUBLE DOUBLE
说明 Tablestore结果表必须定义有Primary Key,以Update方式写入结果数据到Tablestore表。Update方式说明请参见Update类型