In Realtime Compute, a source table holds the streaming input data for a job. Incoming streaming data drives stream processing, so every Realtime Compute job must declare at least one source table.
Syntax
CREATE TABLE tableName
(columnName dataType [, columnName dataType ]*)
[ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];
Example
CREATE TABLE metaq_stream(
x VARCHAR,
y VARCHAR,
z VARCHAR
) WITH (
type='mq',
topic='<yourTopicName>',
endpoint='<yourEndpoint>',
pullIntervalMs='1000',
accessId='<yourAccessId>',
accessKey='<yourAccessSecret>',
startMessageOffset='1000',
consumerGroup='<yourConsumerGroupName>',
fieldDelimiter='|'
);
Obtain attribute fields of a source table
- Syntax
Realtime Compute provides the HEADER keyword in the DDL statement of a source table so that you can obtain the attribute fields of the source table.
CREATE TABLE sourcetable (
`timestamp` VARCHAR HEADER,
name VARCHAR,
MsgID VARCHAR
) WITH (
type='<yourSourceTableType>'
);
In this example, the `timestamp` field is declared as HEADER. Realtime Compute reads its value from the attribute fields of the source table, and the `timestamp` field can then be used like a regular field.
Note: The default attribute fields vary depending on the source table type, such as DataHub, Log Service, and Message Queue (MQ). You can customize attribute fields for certain types of source tables. For more information, see the topics about source tables of the related type.
- Example
The following example uses a Log Service source table to show how to obtain attribute fields. Currently, a Log Service source table has the three attribute fields listed in the following table.
Field name | Description
--- | ---
__source__ | The message source.
__topic__ | The message topic.
__timestamp__ | The time when a log was generated.
Note: To obtain attribute fields of a source table, you must add the HEADER keyword to the end of a field declaration.
The example is as follows:
- Test data
__topic__: ens_altar_flow
result: {"MsgID":"ems0a","Version":"0.0.1"}
- Test statements
CREATE TABLE sls_log (
__topic__ VARCHAR HEADER,
result VARCHAR
) WITH (
type='sls'
);
CREATE TABLE sls_out (
name varchar,
MsgID varchar,
Version varchar
) WITH (
type='RDS'
);
INSERT INTO sls_out
SELECT
__topic__,
JSON_VALUE(result,'$.MsgID'),
JSON_VALUE(result,'$.Version')
FROM sls_log;
- Test results
name (VARCHAR) | MsgID (VARCHAR) | Version (VARCHAR)
--- | --- | ---
ens_altar_flow | ems0a | 0.0.1
Source tables with window functions
Realtime Compute aggregates data in windows based on two time attributes: event time and processing time. For more information, see Event Time and Processing Time. If window functions are used in a Realtime Compute job, you must define a watermark and computed column in the DDL statement of a source table. For more information, see Watermark and Computed column. For more information about data aggregation based on time attributes in Realtime Compute, see Time attributes.
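As a rough illustration of the requirement above, the following sketch declares a source table that could feed an event-time window. It assumes the Realtime Compute DDL forms `WATERMARK [name] FOR <field> AS withOffset(<field>, <offsetMs>)` for watermarks and `<column> AS <expression>` for computed columns, as covered in the Watermark and Computed column topics; confirm both against those topics before use. The table name, column names, and the 2000 ms offset are hypothetical, and the WITH clause is left as a placeholder.

```sql
-- Illustrative sketch only: table name, column names, and offset are hypothetical.
CREATE TABLE window_source (
  user_id VARCHAR,
  -- Assumed input: event time carried as epoch milliseconds.
  event_time_ms BIGINT,
  -- Computed column: derives a TIMESTAMP value from the raw field.
  ts AS TO_TIMESTAMP(event_time_ms),
  -- Watermark on the computed column, assuming withOffset takes milliseconds;
  -- 2000 here means up to 2 seconds of out-of-order data is tolerated.
  WATERMARK wk FOR ts AS withOffset(ts, 2000)
) WITH (
  type='<yourSourceTableType>'
);
```

The computed column is needed here because the watermark must be defined on a TIMESTAMP field, while the assumed raw input carries the event time as a BIGINT.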