实时计算的数据源表是流式数据存储。流式数据存储驱动实时计算的运行,因此每个实时计算作业必须提供至少1个流式数据存储。

语法

  CREATE TABLE tableName
      (columnName dataType [, columnName dataType ]*)
      [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

示例

CREATE TABLE metaq_stream(
 x VARCHAR,
 y VARCHAR,
 z VARCHAR
) WITH (
 type='mq',
 topic='<yourTopicName>',
 endpoint='<yourEndpoint>',
 pullIntervalMs='1000', 
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 startMessageOffset='1000',
 consumerGroup='<yourConsumerGroupName>',
 fieldDelimiter='|'
);

获取数据源表属性字段

  • 获取数据源表属性字段语法

    实时计算在源表的DDL语句中提供 HEADER 关键字,用于获取源表中的属性字段。

    CREATE TABLE sourcetable
    (
     `timestamp`  VARCHAR HEADER,
      name        VARCHAR,
      MsgID       VARCHAR
    )WITH(
         type='<yourSourceTableType>'
    );                 
    上面示例中`timestamp`字段定义为HEADER,可从数据的属性字段读取数值,后续当成普通字段使用。
    说明 不同的源表(DataHub、Log Service或MQ等)存在不同的默认属性字段,部分源表支持设置自定义的属性字段,具体请参见对应的源表文档。
  • 获取源表属性字段示例

    以日志服务(Log service)为例,为您介绍如何获取源表属性字段。目前日志服务默认支持如下3个属性字段。

    字段名 说明
    __source__ 消息源
    __topic__ 消息主题
    __timestamp__ 日志时间
    说明 获取属性字段,除了按照正常逻辑声明外,还需要在类型声明后面加上HEADER
    示例如下:
    • 示例数据
      __topic__:  ens_altar_flow  
              result:  {"MsgID":"ems0a","Version":"0.0.1"}
    • 示例语句
      CREATE TABLE sls_log (
        __topic__  VARCHAR HEADER,
        result     VARCHAR  
      )WITH(
        type ='sls'
      );
      CREATE TABLE sls_out (
        name     varchar,
        MsgID    varchar,
        Version  varchar 
      )WITH(
        type ='RDS'
      );
      INSERT INTO sls_out
      SELECT 
      __topic__,
      JSON_VALUE(result,'$.MsgID'),
      JSON_VALUE(result,'$.Version')
      FROM
      sls_log
    • 测试结果
      name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
      ens_altar_flow ems0a 0.0.1

包含窗口函数的数据源表

实时计算可以基于Event TimeProcessing Time这2种时间属性对数据进行窗口聚合。包含窗口函数的作业中,数据源表的声明中需要使用到Watermark计算列方法。实时计算基于时间属性的聚合详情,请参见时间属性

支持创建的数据源表类型