This topic describes the DDL syntax for creating a Log Service source table, the parameters in the WITH clause, data type mappings, and attribute fields, and provides sample code.

What is Log Service?

Log Service is an end-to-end logging service developed by Alibaba Cloud. It allows you to efficiently collect, consume, ship, query, and analyze log data. Log Service improves operations and maintenance (O&M) efficiency and can process large amounts of log data.

Prerequisites

A Log Service project and Logstore have been created. For more information, see Step 2: Create a project and a Logstore.

Limits

Only Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports Log Service connectors.

DDL syntax

create table sls_source(
  a int,
  b int,
  c varchar
) with (
  'connector' = 'sls',  
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '<yourStartTime>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'consumerGroup' = '<yourConsumerGroupName>'
);
Note
  • Log Service does not support fields of the MAP data type.
  • If a field declared in the DDL statement does not exist in a log, the value of the field is set to null.
  • Fields in the DDL statement can be declared in any order. We recommend that you declare them in the same order as in the physical table.
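Because a declared field that is missing from a log is read as null, a view can substitute explicit defaults before downstream processing. The following is a minimal sketch that assumes the sls_source table from the preceding DDL statement; the view name sls_source_filled and the default values are hypothetical.

```sql
-- Sketch: replace nulls (fields that were missing from a log) with explicit defaults.
-- The view name and the default values are hypothetical.
CREATE TEMPORARY VIEW sls_source_filled AS
SELECT
  COALESCE(a, 0) AS a,
  COALESCE(b, 0) AS b,
  COALESCE(c, 'unknown') AS c
FROM sls_source;
```

Downstream queries can then read from sls_source_filled instead of sls_source and never see null values in these three columns.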

Parameters in the WITH clause

| Parameter | Description | Required | Remarks |
|---|---|---|---|
| connector | The type of the source table. | Yes | Set the value to sls. |
| endPoint | The consumer endpoint. | Yes | For more information, see Endpoints. |
| accessId | The AccessKey ID that is used to access the Log Service project. | Yes | N/A |
| accessKey | The AccessKey secret that is used to access the Log Service project. | Yes | N/A |
| project | The name of the Log Service project. | Yes | N/A |
| logStore | The name of the Logstore. | Yes | N/A |
| startTime | The time from which logs start to be consumed. | No | Format: yyyy-MM-dd hh:mm:ss. By default, logs are consumed starting from the current time. |
| stopTime | The time at which log consumption stops. | No | Format: yyyy-MM-dd hh:mm:ss. |
| consumerGroup | The name of the consumer group. | No | Specify this parameter based on your business requirements. The name has no fixed format. |
| batchGetSize | The number of log entries that are read from a log group at a time. | No | Default value: 100. The value cannot exceed 1000; otherwise, an error is returned. If both the log entries and the value of batchGetSize are large, frequent garbage collection may occur. To prevent this issue, set batchGetSize to a smaller value. |
| exitAfterFinish | Specifies whether fully managed Flink exits after data consumption is complete. | No | Valid values: true: fully managed Flink exits after data consumption is complete. false (default): fully managed Flink does not exit after data consumption is complete. Note: Only Flink that uses VVR 4.0.13 supports this parameter. |
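The following DDL statement is a sketch of a bounded read that combines the optional parameters described above. It assumes a deployment on VVR 4.0.13 so that exitAfterFinish is available; the table name sls_bounded_source, the consumer group name, the time range, and the batchGetSize value are hypothetical placeholders chosen for illustration.

```sql
-- Sketch: consume logs between startTime and stopTime, then exit.
-- Table name, consumer group name, and time range are hypothetical placeholders.
CREATE TEMPORARY TABLE sls_bounded_source (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'consumerGroup' = 'my-backfill-group',  -- hypothetical; the name has no fixed format
  'startTime' = '2021-03-01 00:00:00',    -- start consuming logs from this time
  'stopTime' = '2021-03-02 00:00:00',     -- stop consuming logs at this time
  'batchGetSize' = '100',                 -- must not exceed 1000; lower it if log entries are large
  'exitAfterFinish' = 'true'              -- VVR 4.0.13: exit after consumption is complete
);
```

In this sketch, stopTime bounds the range that is read, and exitAfterFinish controls whether the job exits once that range has been consumed. Without stopTime, the source keeps reading newly written logs, so exitAfterFinish is likely relevant mainly for bounded reads such as backfills.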

Data type mappings

The following table describes the data type mappings between Log Service and Flink. We recommend that you declare fields in the DDL statement based on these mappings.
| Data type of Log Service | Data type of Flink |
|---|---|
| STRING | VARCHAR |
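Because the only mapping listed is STRING to VARCHAR, one conservative pattern is to declare source fields as VARCHAR and convert them explicitly in a view. This is a sketch; the table name sls_raw, the view name sls_typed, and the fields status_code and latency_ms are hypothetical.

```sql
-- Sketch: declare log fields as VARCHAR (the documented mapping for STRING)
-- and convert them to other types explicitly. All names are hypothetical.
CREATE TEMPORARY TABLE sls_raw (
  status_code VARCHAR,
  latency_ms VARCHAR,
  url VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>'
);

-- Explicit casts keep the expected type of each field in one place.
CREATE TEMPORARY VIEW sls_typed AS
SELECT
  CAST(status_code AS INT) AS status_code,
  CAST(latency_ms AS DOUBLE) AS latency_ms,
  url
FROM sls_raw;
```

The sample code in this topic declares types such as BIGINT and DOUBLE directly in the DDL statement, so the explicit cast is a stylistic choice rather than a requirement. How a failed cast behaves (an error or a null value) depends on the Flink version.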

Attribute fields

| Field | Data type | Description |
|---|---|---|
| __source__ | STRING METADATA VIRTUAL | The message source. |
| __topic__ | STRING METADATA VIRTUAL | The message topic. |
| __timestamp__ | BIGINT METADATA VIRTUAL | The time at which the log was generated. |
| __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tags, stored as key-value pairs in a map. For example, the tag "__tag__:__receive_time__":"1616742274" is stored with the key '__receive_time__' and the value '1616742274', and can be read in SQL as __tag__['__receive_time__']. |
Note Only Flink that uses VVR 3.0.1 or later can read the Log Service attribute fields listed in the preceding table.
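As a sketch of how the attribute fields can be used in a query, the statements below declare them as metadata columns, keep only the logs of one topic, and read tag values by key. The table name sls_with_meta, the topic value 'my-topic', and the tag key '__hostname__' are hypothetical examples.

```sql
-- Sketch: declare attribute fields as read-only metadata columns (VVR 3.0.1 or later).
CREATE TEMPORARY TABLE sls_with_meta (
  url STRING,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>'
);

-- Keep the logs of one topic and read tag values by key.
-- 'my-topic' and the '__hostname__' key are hypothetical.
SELECT
  url,
  `__source__`,
  CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time,
  `__tag__`['__hostname__'] AS hostname  -- NULL if the key is absent from the map
FROM sls_with_meta
WHERE `__topic__` = 'my-topic';
```

Metadata columns declared as VIRTUAL are read-only, so they can be selected and filtered on but are not part of the physical schema of the Logstore.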

Sample code

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou.log.aliyuncs.com',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'startTime' = '2001-08-01 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);
CREATE TEMPORARY TABLE print_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

INSERT INTO print_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;