Alibaba Cloud Realtime Compute (Flink) can directly consume log data in Log Service by creating a Log Service source table.

Log Service stores streaming data, which can be used as input for Realtime Compute. Each log in Log Service consists of a set of key-value fields. Assume that a log has the following content:
__source__:  11.85.123.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
a:  1234
b:  0
c:  hello
You can use the following data definition language (DDL) statement to create a table in Realtime Compute:
create table sls_stream(
  a int,
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);
In addition to the log fields themselves, Realtime Compute can extract three attribute fields, as well as custom fields in tags, such as __receive_time__. The following table describes the three attribute fields.
Table 1. Attribute fields
Field          Description
__source__     The source of the log.
__topic__      The topic of the log.
__timestamp__  The time when the log was generated.
To extract an attribute field, you must declare the field with the HEADER keyword. For example:
create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER,
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1'
);
WITH parameters
Parameter               Required  Description
endPoint                Yes       The endpoint of Log Service.
accessId                Yes       The AccessKey ID used to access Log Service.
accessKey               Yes       The AccessKey secret used to access Log Service.
project                 Yes       The name of the project in Log Service.
logStore                Yes       The name of the Logstore in Log Service.
consumerGroup           No        The name of the consumer group.
startTime               No        The time from which Realtime Compute starts to consume logs.
heartBeatIntervalMills  No        The heartbeat interval of the consumption client. Unit: seconds. Default value: 10.
maxRetryTimes           No        The maximum number of attempts to read data. Default value: 5.
batchGetSize            No        The number of log groups that are read at a time. Default value: 10. In Blink 1.4.2 and later, the default value is 100 and the maximum value is 1000.
columnErrorDebug        No        Specifies whether to enable debugging. If debugging is enabled, logs that fail to be parsed are displayed. Default value: false.
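The optional parameters go in the same WITH clause as the required ones. The following sketch shows them together; the endpoint, AccessKey pair, project, and Logstore are the same placeholders used earlier, and the values chosen for the tuning parameters are illustrative, not recommendations:
create table sls_stream_tuned(
  a int,
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  startTime = '2017-07-05 00:00:00',
  project ='ali-cloud-streamtest',
  logStore ='stream-test',
  consumerGroup ='consumerGroupTest1',
  heartBeatIntervalMills ='10',  -- heartbeat interval of the consumption client, in seconds
  maxRetryTimes ='5',            -- maximum number of attempts to read data
  batchGetSize ='100',           -- number of log groups read at a time
  columnErrorDebug ='true'       -- display logs that fail to be parsed
);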

Type mapping

All log fields in Log Service are of the STRING type. The following table lists the mapping between the field types of Log Service and those of Realtime Compute. We strongly recommend that you declare fields according to this mapping in your DDL statements.
Field type of Log Service  Field type of Realtime Compute
STRING                     VARCHAR
If you declare other types, Realtime Compute tries to convert the values automatically. For example, a field whose value is 1000 can be declared as bigint, and a field whose value is 2018-01-12 12:00:00 can be declared as timestamp.
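The following sketch applies this to the sample log at the beginning of this topic: field a, whose value is the string 1234, is declared as bigint instead of int. The ts field is hypothetical and stands in for a field whose value is a string such as 2018-01-12 12:00:00:
create table sls_stream_typed(
  a bigint,      -- the string value "1234" is converted to bigint automatically
  ts timestamp,  -- hypothetical field whose string value is like "2018-01-12 12:00:00"
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  project ='ali-cloud-streamtest',
  logStore ='stream-test'
);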
Note
  • Blink versions earlier than 2.2.0 do not support shard scaling. If you split or merge shards while a job is reading data from the Logstore, the job may fail repeatedly and cannot recover on its own. In this case, you must restart the job.
  • No Blink version allows you to delete or re-create a Logstore whose logs are being consumed.
  • In Blink 1.6.0 and earlier, if you use a consumer group to consume logs from a Logstore that contains a large number of shards, read performance may be degraded.
  • Currently, Log Service does not support data of the map type.
  • Fields that are declared in the DDL statement but do not exist in a log are set to null.
  • We recommend that you declare the fields in the same order as they appear in the log. Out-of-order fields are also supported.
  • The batchGetSize parameter specifies the number of log groups that are read at a time. If both the size of individual logs and the value of batchGetSize are large, frequent garbage collection (GC) may occur.
FAQ
  • If no new data is written to a shard, the overall job latency may increase, or some window aggregation jobs may produce no output. In this case, set the job parallelism to the number of shards that are being read and written.
  • We recommend that you set the job parallelism to the number of shards. Otherwise, if two shards are read at significantly different speeds while historical data is being consumed, data may be filtered out.
  • To extract fields in tags, such as __tag__:__hostname__ and __tag__:__path__, remove the __tag__: prefix and declare the remaining field name with HEADER, in the same way as the attribute fields (see the sketch after this list).
    Note These fields cannot be extracted during debugging. We recommend that you debug locally and use the print method to display the data in logs.
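The following sketch applies this method to __tag__:__hostname__ and __tag__:__path__; it assumes that the consumed logs actually carry these two tags:
create table sls_stream_tags(
  __hostname__ varchar HEADER,  -- extracted from __tag__:__hostname__
  __path__ varchar HEADER,      -- extracted from __tag__:__path__
  b int,
  c varchar
) with (
  type ='sls',
  endPoint ='<your endpoint>',
  accessId ='<your access key id>',
  accessKey ='<your access key>',
  project ='ali-cloud-streamtest',
  logStore ='stream-test'
);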