In Realtime Compute, source tables store streaming data. Streaming data storage triggers stream processing jobs in Realtime Compute. To perform stream processing, you must create at least one source table that provides streaming data for each Realtime Compute job.

Syntax

  CREATE TABLE tableName
      (columnName dataType [, columnName dataType ]*)
      [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

Example

CREATE TABLE metaq_stream(
 x VARCHAR,
 y VARCHAR,
 z VARCHAR
) WITH (
 type='mq',
 topic='<yourTopicName>',
 endpoint='<yourEndpoint>',
 pullIntervalMs='1000', 
 accessId='<yourAccessId>',
 accessKey='<yourAccessSecret>',
 startMessageOffset='1000',
 consumerGroup='<yourConsumerGroupName>',
 fieldDelimiter='|'
);

Obtain attribute fields of a source table

  • Syntax

    Realtime Compute provides the HEADER keyword in the DDL statement of a source table for you to obtain the attribute fields from the source table.

    CREATE TABLE sourcetable
    (
     `timestamp`  VARCHAR HEADER,
      name        VARCHAR,
      MsgID       VARCHAR
    )WITH(
         type='<yourSourceTableType>'
    );                 
    In this example, the 'timestamp' field is defined as HEADER. Realtime Compute reads the values of attribute fields from the source table. Then, the 'timestamp' field is used as a common field.
    Note The default attribute fields vary depending on the source table type, such as DataHub, Log Service, and Message Queue (MQ). You can customize attribute fields for certain types of source tables. For more information, see the topics about source tables of the related type.
  • Example

    The following table uses a source table of Log Service to describe how to obtain attribute fields of the source table. Currently, a source table of Log Service has three attribute fields listed in the following table.

    Field name Description
    __source__ The message source.
    __topic__ The message topic.
    __timestamp__ The time when a log was generated.
    Note To obtain attribute fields of a source table, you must add the HEADER keyword to the end of a field declaration.
    The example is as follows:
    • Test data
      __topic__:  ens_altar_flow  
              result:  {"MsgID":"ems0a","Version":"0.0.1"}
    • Test statements
      CREATE TABLE sls_log (
        __topic__  VARCHAR HEADER,
        result     VARCHAR  
      )WITH(
        type ='sls'
      );
      CREATE TABLE sls_out (
        name     varchar,
        MsgID    varchar,
        Version  varchar 
      )WITH(
        type ='RDS'
      );
      INSERT INTO sls_out
      SELECT 
      __topic__,
      JSON_VALUE(result,'$.MsgID'),
      JSON_VALUE(result,'$.Version')
      FROM
      sls_log
    • Test results
      name (VARCHAT) MsgID (VARCHAT) Version (VARCHAT)
      ens_altar_flow ems0a 0.0.1

Source tables with window functions

Realtime Compute aggregates data in windows based on two time attributes: event time and processing time. For more information, see Event Time and Processing Time. If window functions are used in a Realtime Compute job, you must define a watermark and computed column in the DDL statement of a source table. For more information, see Watermark and Computed column. For more information about data aggregation based on time attributes in Realtime Compute, see Time attributes.

Supported source table types

Realtime Compute allows you to create multiple types of source tables. For more information, see the following topics: