This topic describes how to create a Log Service source table in Realtime Compute. It also describes the attribute fields, parameters in the WITH clause, and field type mapping involved in the creation process.

Log Service

Log Service is an all-in-one service for log data. The data format of Log Service is similar to JSON. An example is as follows:
{
    "a": 1000,
    "b": 1234,
    "c": "li"
}

Log Service stores streaming data. Therefore, Realtime Compute can use the streaming data stored in Log Service as input data.

DDL syntax

The following sample code creates a Log Service source table in a Realtime Compute job. In the code, sls refers to Log Service.
create table sls_stream(
  a INT,
  b INT,
  c VARCHAR
) with (
  type ='sls',  
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  startTime = '2017-07-05 00:00:00',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>',
  consumerGroup ='<yourConsumerGroupName>'
);

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
| --- | --- | --- | --- |
| endPoint | The endpoint of Log Service. | Yes | For more information, see Service endpoint. |
| accessId | The AccessKey ID used to access Log Service. | Yes | None. |
| accessKey | The AccessKey secret used to access Log Service. | Yes | None. |
| project | The name of the Log Service project. | Yes | None. |
| logStore | The name of a Logstore in the Log Service project. | Yes | None. |
| startTime | The time at which log consumption starts. | No | None. |
| consumerGroup | The name of a consumer group. | No | You can specify a custom name. The name format is not fixed. |
| heartBeatIntervalMills | The heartbeat interval of the consumer client. | No | Default value: 10000. Unit: milliseconds. |
| maxRetryTimes | The maximum number of retries for reading data. | No | Default value: 5. |
| batchGetSize | The number of log groups that are read at a time. | No | Default value: 100. |
| columnErrorDebug | Specifies whether debugging is enabled. | No | Default value: false. If debugging is enabled, logs that contain parsing exceptions are printed. |
Note
  • For Realtime Compute V1.6.0 and earlier, the read performance may be affected when the number of shards in a consumer group is specified. This issue is being rectified.
  • Log Service does not support data of the MAP type.
  • If a field does not exist in a log, Log Service sets its value to null.
  • We recommend that you define the fields in the same order as those in the source table. Out-of-order fields are also supported.
  • If the input data is in the JSON format, define a separator and use the built-in function JSON_VALUE to parse the data. Otherwise, the following parsing error is returned:
    2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]                
  • The value of the batchGetSize parameter must be less than or equal to 1000. Otherwise, an error occurs.
  • The batchGetSize parameter specifies the number of log groups that are read at a time. If both the size of a single log entry and the batchGetSize value are too large, garbage collection may occur frequently. To avoid this issue, we recommend that you set this parameter to a small value.
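As a sketch of the JSON_VALUE approach described above (the table name `sls_json_stream` and column name `message` below are hypothetical), the whole JSON document can be read into a single VARCHAR column and then parsed:

```sql
-- Hypothetical source table: each log entry is assumed to carry one JSON
-- string such as {"lg_order_code":"LP00000005","occur_time":"2017-12-10 00:00:01"}.
create table sls_json_stream (
  message VARCHAR
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>'
);

-- Extract individual fields from the JSON string with JSON_VALUE.
SELECT
  JSON_VALUE(message, '$.lg_order_code') AS lg_order_code,
  JSON_VALUE(message, '$.occur_time')    AS occur_time
FROM sls_json_stream;
```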

Field type mapping

The following table describes the mapping between Log Service data types and Realtime Compute data types. We recommend that you declare the mapping in a data definition language (DDL) statement.

| Log Service data type | Realtime Compute data type |
| --- | --- |
| STRING | VARCHAR |

Note You can declare fields of other types in the DDL, and the values are converted accordingly. For example, the value 1000 can be declared as the BIGINT type, and 2018-01-12 12:00:00 can be declared as the TIMESTAMP type.
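The type conversion above can be expressed directly in the DDL. The following sketch uses hypothetical column names (`order_id`, `gmt_create`, `buyer_name`) for illustration:

```sql
-- Hypothetical columns: order_id arrives as the string "1000",
-- gmt_create as the string "2018-01-12 12:00:00".
create table sls_typed_stream (
  order_id   BIGINT,     -- parsed from "1000"
  gmt_create TIMESTAMP,  -- parsed from "2018-01-12 12:00:00"
  buyer_name VARCHAR
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>'
);
```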

Supported attribute fields

Currently, Flink SQL supports three Log Service attribute fields by default. Custom attribute fields are also supported. For more information about how to use the attribute fields, see Obtain attribute fields of a source table.

| Field | Description |
| --- | --- |
| `__source__` | The message source. |
| `__topic__` | The message topic. |
| `__timestamp__` | The time when the log was generated. |
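In Blink SQL, attribute fields are typically declared with the HEADER keyword in the DDL. The following is a sketch; verify the exact syntax for your Realtime Compute version in Obtain attribute fields of a source table:

```sql
create table sls_stream_with_attrs (
  `__source__`    VARCHAR HEADER,  -- message source
  `__topic__`     VARCHAR HEADER,  -- message topic
  `__timestamp__` VARCHAR HEADER,  -- time when the log was generated
  a INT,
  b INT,
  c VARCHAR
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  project ='<yourProjectName>',
  logStore ='<yourLogStoreName>'
);
```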

Sample code

create table sls_input(
  a int, 
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='http://cn-hangzhou-share.log.aliyuncs.com',
  accessId ='<yourAccessId>',
  accessKey ='<yourAccessKey>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);

create table print_output(
 a int,
 b int,
 c varchar 
) with (
  type='print'
);

INSERT INTO print_output
SELECT a, b, c
FROM sls_input;

FAQ

  • Q: Why does the overall latency of a job increase, or why is no output generated for the job that has window aggregation?

    A: This issue occurs because no new data is written to some partitions. To resolve the issue, change the parallelism of the job so that it equals the number of partitions from which data is read and written.

  • Q: How do I set the parallelism?

    A: We recommend that you set the parallelism to the number of partitions. Otherwise, if two partitions are read at significantly different speeds, setting the start offset of a job to a time earlier than the current time may cause data to be filtered out or delayed.

  • Q: How do I troubleshoot the issue that the latency of a Flink job increases?

    A: The shards of the Log Service source table may have been split. Shard indexes may not be contiguous after splitting, which increases the latency of a Flink job. If the latency of a Flink job increases, check whether the shards of the Log Service source table have been split.

  • Q: How do I obtain attribute fields?
    A: For more information about how to obtain attribute fields, see Obtain attribute fields of a source table.
    Note Attribute field data cannot be extracted during local debugging. We recommend that you use online debugging to view the attribute fields in logs. For more information, see Online debugging.
