This topic provides the DDL syntax that is used to create a Log Service source table, describes the parameters in the WITH clause, and provides data type mappings, attribute fields, and sample code.
What is Log Service?
Log Service is an end-to-end data logging service that is developed by Alibaba Cloud. Log Service allows you to collect, consume, ship, query, and analyze log data in an efficient manner. Log Service improves operations and maintenance (O&M) efficiency and provides the capability to process large amounts of log data.
Prerequisites
A project and a Logstore of Log Service are created. For more information, see Step 2: Create a project and a Logstore.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Log Service connector.
DDL syntax
create table sls_source(
a int,
b int,
c varchar
) with (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessKey>',
'startTime' = '<yourStartTime>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'consumerGroup' = '<yourConsumerGroupName>'
);
- Log Service does not support fields of the MAP data type.
- If a field that is declared in the DDL statement does not exist in a log entry, the value of the field is set to null.
- Fields in the DDL statement can be declared in any order. We recommend that you declare the fields in the same order as the fields in the physical table.
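The behavior described above can be sketched as follows. This is a hypothetical example: the table, field names, and the assumption that the underlying log entries contain fields `a` and `c` but not `b` are for illustration only.

```sql
-- Sketch: only declared fields are read, and the declared order need not
-- match the physical order. A declared field that is absent from a log
-- entry (here, the hypothetical field `b`) resolves to NULL at read time.
CREATE TABLE sls_source_partial (
  c VARCHAR,  -- declared before `a`, even if `a` comes first physically
  a INT,
  b INT       -- assumed absent from the log entries; read as NULL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>'
);
```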
Parameters in the WITH clause
Parameter | Description | Required | Remarks |
---|---|---|---|
connector | The type of the source table. | Yes | Set the value to sls. |
endPoint | The consumer endpoint. | Yes | For more information, see Endpoints. |
accessId | The AccessKey ID that is used to access the Log Service project. | Yes | N/A. |
accessKey | The AccessKey secret that is used to access the Log Service project. | Yes | N/A. |
project | The name of the Log Service project. | Yes | N/A. |
logStore | The name of the Logstore. | Yes | N/A. |
startTime | The time at which log consumption starts. | No | The format is yyyy-MM-dd hh:mm:ss. By default, log consumption starts from the current time. |
stopTime | The time at which log consumption stops. | No | The format is yyyy-MM-dd hh:mm:ss. |
consumerGroup | The name of the consumer group. | No | You can specify this parameter based on your business requirements. The format of the name is not fixed. |
batchGetSize | The number of log entries that can be read from a log group at a time. | No | Default value: 100. |
exitAfterFinish | Specifies whether fully managed Flink exits after data consumption is complete. | No | Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 supports this parameter. |
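The optional parameters above can be combined to consume a bounded time range. The following is a sketch only; the table name, field names, time range, and consumer group name are hypothetical:

```sql
-- Sketch: consume a fixed time window and read larger batches per request.
CREATE TABLE sls_bounded_source (
  request_uri VARCHAR,
  status VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'startTime' = '2023-01-01 00:00:00',  -- consume from this point in time
  'stopTime' = '2023-01-02 00:00:00',   -- stop consumption at this point
  'batchGetSize' = '500',               -- entries per read (default: 100)
  'consumerGroup' = 'my-flink-consumer' -- optional; the name format is not fixed
);
```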
Data type mappings
Data type of Log Service | Data type of Flink |
---|---|
STRING | VARCHAR |
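Because Log Service stores field values as STRING, a field that you want to use as a number can be declared as VARCHAR and converted with an explicit CAST in the query. A minimal sketch, assuming a source table `sls_source` with a hypothetical string field `latency_ms`:

```sql
-- Sketch: convert a STRING log field to a numeric type at query time.
SELECT CAST(latency_ms AS BIGINT) AS latency_ms
FROM sls_source;
```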
Attribute fields
Field | Data type | Description |
---|---|---|
`__source__` | STRING METADATA VIRTUAL | The message source. |
`__topic__` | STRING METADATA VIRTUAL | The message topic. |
`__timestamp__` | BIGINT METADATA VIRTUAL | The timestamp at which the log is generated. |
`__tag__` | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tags. Tag attributes are logged as key-value pairs in a map. For example, the attribute "__tag__:__receive_time__":"1616742274" is stored with the key '__receive_time__' and the value '1616742274', and is accessed in SQL as __tag__['__receive_time__']. |
Sample code
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
`__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = 'xx',
'accessKey' = 'xxx',
'startTime' = '2001-08-01 00:00:00',
'project' = 'sls-test',
'logStore' = 'sls-input'
);
CREATE TEMPORARY TABLE print_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT,
receive_time BIGINT
) WITH (
'connector' = 'print',
'logger'='true'
);
INSERT INTO print_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__`,
`__source__`,
`__timestamp__`,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;