Alibaba Cloud Realtime Compute (Flink) can directly consume log data in Log Service by creating a Log Service source table.
Log Service stores streaming data, which can be used as input data for Realtime Compute. In Log Service, each log contains a set of fields that are key-value pairs. Assume that a log has the following content:

```
__source__: 18.104.22.168
__tag__:__receive_time__: 1562125591
__topic__: test-topic
a: 1234
b: 0
c: hello
```
You can use the following data definition language (DDL) statement to create a table in Realtime Compute:
```sql
create table sls_stream(
  a int,
  b int,
  c varchar
) with (
  type = 'sls',
  endPoint = '<your endpoint>',
  accessId = '<your access key id>',
  accessKey = '<your access key>',
  startTime = '2017-07-05 00:00:00',
  project = 'ali-cloud-streamtest',
  logStore = 'stream-test',
  consumerGroup = 'consumerGroupTest1'
);
```
In addition to log fields, Realtime Compute can also extract three attribute fields, as well as custom fields in tags such as `__receive_time__`. The following table describes the three attribute fields.
|Field|Description|
|---|---|
|`__source__`|The source of the log.|
|`__topic__`|The topic of the log.|
|`__timestamp__`|The time when the log was generated.|
To extract attribute fields, you must add the HEADER keyword to their declarations. For example:

```sql
create table sls_stream(
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER,
  b int,
  c varchar
) with (
  type = 'sls',
  endPoint = '<your endpoint>',
  accessId = '<your access key id>',
  accessKey = '<your access key>',
  startTime = '2017-07-05 00:00:00',
  project = 'ali-cloud-streamtest',
  logStore = 'stream-test',
  consumerGroup = 'consumerGroupTest1'
);
```
|Parameter|Required|Description|
|---|---|---|
|endPoint|Yes|The endpoint of Log Service.|
|accessId|Yes|The AccessKey ID used to access Log Service.|
|accessKey|Yes|The AccessKey secret used to access Log Service.|
|project|Yes|The name of the project in Log Service.|
|logStore|Yes|The name of the Logstore in Log Service.|
|consumerGroup|No|The name of the consumer group.|
|startTime|No|The time when Realtime Compute starts to consume logs.|
|heartBeatIntervalMills|No|The heartbeat interval of the consumption client. Unit: seconds. Default value: 10.|
|maxRetryTimes|No|The maximum number of attempts to read data. Default value: 5.|
|batchGetSize|No|The number of log groups that are read at a time. Default value: 10. If the Flink version is 1.4.2 or later, the default value is 100 and the maximum value is 1000.|
|columnErrorDebug|No|Specifies whether to enable debugging. If debugging is enabled, logs that fail to be parsed are displayed. Default value: false.|
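As a sketch of how the optional parameters above fit together, the following DDL combines them in one WITH clause. The endpoint, credentials, and table name are placeholders carried over from the earlier example, not values you can use directly:

```sql
-- Sketch: a source table that sets every optional consumption parameter.
create table sls_stream_tuned(
  a int,
  b int,
  c varchar
) with (
  type = 'sls',
  endPoint = '<your endpoint>',
  accessId = '<your access key id>',
  accessKey = '<your access key>',
  project = 'ali-cloud-streamtest',
  logStore = 'stream-test',
  consumerGroup = 'consumerGroupTest1',  -- optional consumer group name
  startTime = '2017-07-05 00:00:00',     -- start consuming from this time
  heartBeatIntervalMills = '10',         -- heartbeat interval, default 10
  maxRetryTimes = '5',                   -- read retries, default 5
  batchGetSize = '100',                  -- log groups per read (Flink 1.4.2 or later)
  columnErrorDebug = 'true'              -- print logs that fail to be parsed
);
```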
All log fields in Log Service are of the string type. The following table lists the mapping between the field types of Log Service and the field types of Realtime Compute. We strongly recommend that you declare this mapping in DDL statements. If you declare other types, Realtime Compute attempts to convert the values automatically. For example, fields whose values are 1000 and 2018-01-12 12:00:00 can also be declared as bigint and timestamp fields, respectively.
|Field type of Log Service|Field type of Realtime Compute|
|---|---|
|STRING|VARCHAR|
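The automatic conversion described above can be sketched in a DDL statement. The field names `a` and `t` and their sample values are assumptions taken from the example values 1000 and 2018-01-12 12:00:00, not fields of a real Logstore:

```sql
-- Sketch: Log Service stores every field as a string, but fields whose
-- values are convertible can be declared with richer types in the DDL.
create table sls_typed_stream(
  a bigint,     -- a string value such as '1000' converts to bigint
  t timestamp,  -- a string value such as '2018-01-12 12:00:00' converts to timestamp
  c varchar     -- a plain string field stays varchar
) with (
  type = 'sls',
  endPoint = '<your endpoint>',
  accessId = '<your access key id>',
  accessKey = '<your access key>',
  project = 'ali-cloud-streamtest',
  logStore = 'stream-test'
);
```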
- Blink versions earlier than 2.2.0 do not support shard scaling. If you split or merge shards while a job is reading data from the Logstore, the job may fail repeatedly and cannot recover. In this case, you must restart the job to restore it to normal.
- No Blink version allows you to delete or re-create a Logstore whose logs are being consumed.
- For Blink versions 1.6.0 and earlier, if you specify a consumer group to consume logs from a Logstore that contains a large number of shards, read performance may be affected.
- Currently, Log Service does not support data of the map type.
- Nonexistent fields are set to null.
- We recommend that you define the fields in the same order as the fields in the source table. Unordered fields are also supported.
- The batchGetSize parameter specifies the number of log groups that are read at a time. If the size of each log and the value of the batchGetSize parameter are both large, garbage collection (GC) may frequently occur.
- If no new data is written to a shard, the overall latency of jobs increases, and some window aggregation jobs produce no output. In this case, adjust the job parallelism to match the number of shards from which data is read and written.
- We recommend that you set the job parallelism to the same value as the number of shards. If the two values differ and jobs read historical data from two shards at significantly different speeds, data may be filtered out.
- To extract fields in tags such as `__tag__:__path__`, remove the `__tag__:` prefix and follow the method for extracting attribute fields. Note: This type of data cannot be extracted during debugging. We recommend that you use the local debugging method and the print method to display the data in logs.
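The tag extraction described in the last note can be sketched as follows. The tag name `__tag__:__path__` comes from the note above; treating the prefix-stripped name as a HEADER field mirrors the attribute-field method, and the connection values are placeholders:

```sql
-- Sketch: extract the tag field __tag__:__path__ by dropping the
-- __tag__: prefix and declaring the remainder as a HEADER field.
create table sls_tag_stream(
  __path__ varchar HEADER,  -- corresponds to __tag__:__path__ in the raw log
  a int,
  b int,
  c varchar
) with (
  type = 'sls',
  endPoint = '<your endpoint>',
  accessId = '<your access key id>',
  accessKey = '<your access key>',
  project = 'ali-cloud-streamtest',
  logStore = 'stream-test'
);
```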