This topic provides the DDL syntax that is used to create a Log Service source table, describes the parameters in the WITH clause, and provides data type mappings, attribute fields, and sample code.

What is Log Service?

Log Service is an end-to-end logging service that lets you quickly collect, consume, ship, query, and analyze log data. It improves operations and maintenance (O&M) efficiency and can process large amounts of log data.

Prerequisites

A project and a Logstore of Log Service are created. For more information, see Step 2: Create a project and a Logstore.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Log Service connectors.

DDL syntax

create table sls_source(
  a int,
  b int,
  c varchar
) with (
  'connector' = 'sls',  
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '<yourStartTime>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'consumerGroup' = '<yourConsumerGroupName>'
);
Note
  • Log Service does not support the MAP data type.
  • Fields that are declared in the DDL statement but do not exist in the log data are set to null.
  • Fields in the DDL statement can be declared in any order. We recommend that you declare them in the same order as the fields in the physical table.

Parameters in the WITH clause

Parameter | Description | Required | Remarks
connector | The type of the source table. | Yes | Set the value to sls.
endPoint | The consumer endpoint. | Yes | For more information, see Endpoints.
accessId | The AccessKey ID that is used to access the Log Service project. | Yes | N/A
accessKey | The AccessKey secret that is used to access the Log Service project. | Yes | N/A
project | The name of the Log Service project. | Yes | N/A
logStore | The name of the Logstore. | Yes | N/A
startTime | The time at which logs start to be consumed. | Yes | N/A
consumerGroup | The name of the consumer group. | No | You can specify this parameter based on your business requirements. The format of the name is not fixed.
batchGetSize | The number of log entries that can be read from a log group at a time. | No | Default value: 100.
Note
  • The value of the batchGetSize parameter cannot exceed 1000. Otherwise, an error is returned.
  • This parameter specifies the number of log entries that are read from a log group at a time. If individual log entries are large and the value of the batchGetSize parameter is also large, frequent garbage collection (GC) may occur. To prevent this issue, set the batchGetSize parameter to a smaller value.
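
The batchGetSize note above can be illustrated with a minimal sketch that reuses the DDL from the DDL syntax section and adds the optional parameter. The table name sls_source_small_batch and the value 50 are assumptions chosen only for illustration; they are not recommended settings.

```sql
-- Sketch only: the table name and the value 50 are hypothetical.
-- batchGetSize must not exceed 1000; a smaller value may reduce
-- garbage collection pressure when individual log entries are large.
create table sls_source_small_batch(
  a int,
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '<yourStartTime>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'batchGetSize' = '50'
);
```

If garbage collection is not a concern, the parameter can be omitted and the default value of 100 applies.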

Data type mapping

The following table describes the data type mappings between Log Service and Flink fields. We recommend that you declare the mappings in a DDL statement.
Data type of Log Service | Data type of Flink
STRING | VARCHAR

Attribute fields

Field | Data type | Description
__source__ | STRING METADATA VIRTUAL | The message source.
__topic__ | STRING METADATA VIRTUAL | The message topic.
__timestamp__ | BIGINT METADATA VIRTUAL | The timestamp at which logs are generated.
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. For example, for the attribute "__tag__:__receive_time__":"1616742274", the key '__receive_time__' and the value '1616742274' are stored as a key-value pair in a map. In SQL, you can access the value as __tag__['__receive_time__'].
Note
  • The Log Service attribute fields in the preceding table are available only in VVR 3.0.1 and later.
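
As a rough sketch of how the __tag__ attribute field might be read, the following statements declare the field as a metadata column and then look up one key in the map. The table name sls_tagged, the content field, and the receive_time alias are hypothetical names introduced for this example, and the sketch assumes VVR 3.0.1 or later.

```sql
-- Sketch only: sls_tagged, content, and receive_time are hypothetical names.
create table sls_tagged(
  content varchar,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) with (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '<yourStartTime>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>'
);

-- Look up the '__receive_time__' key in the tag map.
-- The result is a VARCHAR value, for example '1616742274'.
select
  content,
  `__tag__`['__receive_time__'] as receive_time
from sls_tagged;
```

Because the map values are of the VARCHAR type, a value such as the receive time needs an explicit CAST if you want to process it as a number.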

Sample code

create table sls_input(
  `time` bigint,
  `url` string,
  dt string,
  float_field float,
  double_field double,
  boolean_field boolean,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  proctime as PROCTIME()
) with (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '2001-08-01 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);
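
To check that the source table above returns data, one option is to write a few of its fields to a sink. The following sketch assumes that the print connector of Apache Flink is available in your environment. The sink name print_sink and its column names are hypothetical.

```sql
-- Sketch only: print_sink and its columns are hypothetical names.
-- The print connector writes each row to the task logs.
create table print_sink(
  url string,
  log_topic string,
  log_time bigint
) with (
  'connector' = 'print'
);

-- Read regular fields and attribute fields from the source table.
insert into print_sink
select
  `url`,
  `__topic__`,
  `__timestamp__`
from sls_input;
```

The attribute fields are selected in the same way as regular fields because the source table declares them as METADATA VIRTUAL columns.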