实时计算的数据源表是流式数据存储。流式数据存储驱动实时计算的运行,因此每个实时计算作业必须提供至少1个流式数据存储。
语法
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
示例
CREATE TABLE metaq_stream(
x VARCHAR,
y VARCHAR,
z VARCHAR
) WITH (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroupName>',
fieldDelimiter='|'
);
获取数据源表属性字段
- 获取数据源表属性字段语法
实时计算在源表的DDL语句中提供
HEADER
关键字,用于获取源表中的属性字段。CREATE TABLE sourcetable ( `timestamp` VARCHAR HEADER, name VARCHAR, MsgID VARCHAR )WITH( type='<yourSourceTableType>' );
上面示例中`timestamp`
字段定义为HEADER
,可从数据的属性字段读取数值,后续当成普通字段使用。说明 不同的源表(DataHub、Log Service或MQ等)存在不同的默认属性字段,部分源表支持设置自定义的属性字段,具体请参见对应的源表文档。 - 获取源表属性字段示例
以日志服务(Log service)为例,为您介绍如何获取源表属性字段。目前日志服务默认支持如下3个属性字段。
字段名 说明 __source__
消息源 __topic__
消息主题 __timestamp__
日志时间 说明 获取属性字段,除了按照正常逻辑声明外,还需要在类型声明后面加上HEADER
。示例如下:- 示例数据
__topic__: ens_altar_flow result: {"MsgID":"ems0a","Version":"0.0.1"}
- 示例语句
CREATE TABLE sls_log ( __topic__ VARCHAR HEADER, result VARCHAR )WITH( type ='sls' ); CREATE TABLE sls_out ( name varchar, MsgID varchar, Version varchar )WITH( type ='RDS' ); INSERT INTO sls_out SELECT __topic__, JSON_VALUE(result,'$.MsgID'), JSON_VALUE(result,'$.Version') FROM sls_log
- 测试结果
name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT) ens_altar_flow ems0a 0.0.1
- 示例数据
包含窗口函数的数据源表
实时计算可以基于Event Time和Processing Time这2种时间属性对数据进行窗口聚合。包含窗口函数的作业中,数据源表的声明中需要使用到Watermark和计算列方法。实时计算基于时间属性的聚合详情,请参见时间属性。
支持创建的数据源表类型
实时计算支持创建多种类型的数据源表: