This topic describes the Log Service connector.
Background information
Log Service is an end-to-end data logging service developed by Alibaba Cloud. Log Service allows you to quickly collect, consume, ship, query, and analyze log data. It improves operations and maintenance (O&M) efficiency and provides the capability to process large amounts of log data.
The following table describes the capabilities supported by the Log Service connector.
Item | Description |
Table type | Source table and result table |
Running mode | Streaming mode |
Metric | N/A |
Data format | N/A |
API type | SQL API |
Data update or deletion in a result table | Data in a result table cannot be updated or deleted. Data can only be inserted into a result table. |
Features
The Log Service source connector can be used to read the attribute fields of messages. The following table describes the attribute fields supported by the Log Service source connector.
Field | Field type | Description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The time when the log is generated. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tag. |
Prerequisites
A project and a Logstore are created. For more information, see Create a project and a Logstore.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Log Service connector.
The Log Service connector cannot be used for dimension tables.
The Log Service connector supports only the at-least-once semantics.
Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports the exitAfterFinish parameter and automatic failovers that are triggered based on the changes in the number of shards.
Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports the consumeFromCheckpoint and buckets parameters.
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '<yourAccessId>',
'accessKey' = '<yourAccessKey>'
);
Parameters in the WITH clause
Common parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | Yes | No default value | Set the value to sls. |
endPoint | The endpoint of Log Service. | STRING | Yes | No default value | For more information, see Endpoints. |
project | The name of the Log Service project. | STRING | Yes | No default value | N/A. |
logStore | The name of the Logstore or Metricstore in Log Service. | STRING | Yes | No default value | Data in a Logstore is consumed in the same way as data in a Metricstore. |
accessId | The AccessKey ID of your Alibaba Cloud account. | STRING | Yes | No default value | N/A. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | STRING | Yes | No default value | N/A. |
Parameters only for source tables
Parameter | Description | Data type | Required | Default value | Remarks |
startTime | The time at which log consumption starts. | STRING | No | Current time | Specify the value in the yyyy-MM-dd hh:mm:ss format. |
stopTime | The time at which log consumption stops. | STRING | No | No default value | Specify the value in the yyyy-MM-dd hh:mm:ss format. |
consumerGroup | The name of the consumer group. | STRING | No | No default value | You can specify a custom consumer group name. No fixed format is required. |
consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint stored in the specified consumer group. | STRING | No | false | Valid values: true (a consumer group must be specified; fully managed Flink consumes logs from the checkpoint stored in that consumer group, or from the time specified by startTime if no checkpoint exists) and false (default; fully managed Flink does not consume logs from the stored checkpoint). Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
directMode | Specifies whether to enable the direct connection mode of Log Service. | STRING | No | false | Valid values: true (enabled) and false (default; disabled). |
maxRetries | The maximum number of retries when data fails to be read from Log Service. | STRING | No | 3 | N/A. |
batchGetSize | The number of log groups that are read in a single request. | STRING | No | 100 | The value cannot exceed 1000. Otherwise, an error is returned. |
exitAfterFinish | Specifies whether fully managed Flink exits after data consumption is complete. | STRING | No | false | Valid values: true (exit after consumption is complete) and false (default; do not exit). Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13, VVR 6.0.0, or later supports this parameter. |
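The startTime and stopTime values are plain strings in the yyyy-MM-dd hh:mm:ss format. The following sketch, which is not connector code, shows how such a string can be converted to the epoch seconds that a consumer position is typically expressed in; the UTC zone and the 24-hour HH pattern letter are assumptions for illustration:

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class StartTimeDemo {
    // Converts a "yyyy-MM-dd HH:mm:ss" string to epoch seconds in the given
    // time zone. This only illustrates the startTime/stopTime format; the
    // connector performs its own parsing internally.
    static long toEpochSeconds(String startTime, ZoneId zone) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return LocalDateTime.parse(startTime, fmt).atZone(zone).toEpochSecond();
    }

    public static void main(String[] args) {
        // The startTime value used in the sample code later in this topic.
        System.out.println(toEpochSeconds("2001-08-01 00:00:00", ZoneId.of("UTC")));
    }
}
```

Note that the time zone matters: the same startTime string maps to different consumer start positions in different zones, so make sure the value you configure matches the zone your logs are timestamped in.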
Parameters only for result tables
Parameter | Description | Data type | Required | Default value | Remarks |
topicField | Specifies a field name. The value of this field overwrites the __topic__ attribute field to indicate the log topic. | STRING | No | No default value | The value must be an existing field in the table. |
timeField | Specifies a field name. The value of this field overwrites the __timestamp__ attribute field to indicate the log write time. | STRING | No | Current time | The value must be an existing field of the INT type in the table. If no field is specified, the current time is used. |
sourceField | Specifies a field name. The value of this field overwrites the __source__ attribute field to indicate the origin of the log, such as the IP address of the machine that generates the log. | STRING | No | No default value | The value must be an existing field in the table. |
partitionField | Specifies a field name. A hash value is calculated from this field when data is written to Log Service, and data entries with the same hash value are written to the same shard. | STRING | No | No default value | If you do not specify this parameter, each data entry is randomly written to an available shard. |
buckets | The number of buckets into which data is regrouped based on the hash value when the partitionField parameter is specified. | STRING | No | 64 | Valid values: [1,256]. The value must be an integer power of 2 and must be greater than or equal to the number of shards. Otherwise, no data is written to specific shards. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
flushIntervalMs | The interval at which data writing is triggered. | STRING | No | 2000 | Unit: milliseconds. |
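The partitionField and buckets parameters work together: a hash is computed from the field value, mapped into one of the configured buckets, and the buckets are then distributed over the shards. The sketch below is a hypothetical illustration of this scheme, not the connector's actual hash function; it shows why buckets must be a power of 2 and must be at least the shard count, because with fewer buckets than shards some shards can never be selected:

```java
public class BucketRoutingDemo {
    // Maps a partition key into one of `buckets` buckets. The bitmask works
    // only because `buckets` is a power of 2. Hypothetical sketch; the
    // connector's real hashing may differ.
    static int bucketFor(String partitionKey, int buckets) {
        return partitionKey.hashCode() & (buckets - 1);
    }

    // Assigns a bucket to a shard. If buckets < shards, bucket indexes never
    // reach the higher shard numbers, so those shards receive no data --
    // hence the rule that buckets must be >= the number of shards.
    static int shardFor(int bucket, int shards) {
        return bucket % shards;
    }

    public static void main(String[] args) {
        int buckets = 4; // deliberately fewer buckets than shards
        int shards = 8;
        for (String key : new String[] {"user-1", "user-2", "user-3"}) {
            int b = bucketFor(key, buckets);
            System.out.printf("key=%s bucket=%d shard=%d%n", key, b, shardFor(b, shards));
        }
        // With buckets=4 and shards=8, every computed shard index is below 4,
        // so shards 4 through 7 can never be chosen.
    }
}
```
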
Data type mappings
Data type of Flink | Data type of Log Service |
VARCHAR | STRING |
Sample code
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` BIGINT METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endPoint' = 'cn-hangzhou.log.aliyuncs.com',
'accessId' = 'xx',
'accessKey' = 'xxx',
'startTime' = '2001-08-01 00:00:00',
'project' = 'sls-test',
'logStore' = 'sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endPoint' = 'cn-hangzhou.log.aliyuncs.com',
'accessId' = 'xx',
'accessKey' = 'xxx',
'project' = 'sls-test',
'logStore' = 'sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__`,
`__source__`,
`__timestamp__`,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors. The Log Service DataStream connectors of different versions are stored in the Maven central repository.
Read data from Log Service
VVR of Realtime Compute for Apache Flink provides SlsSourceFunction, an implementation class of SourceFunction, to read data from Log Service. Sample code:
public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds the SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be specified.
        accessInfo.setBatchGetSize(10);

        // Optional parameters.
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // The time to start consuming, set to the current time.
        int startInSec = (int) (new Date().getTime() / 1000);
        // The time to stop consuming; -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
Write data to Log Service
VVR of Realtime Compute for Apache Flink provides SLSOutputFormat, an implementation class of OutputFormat, to write data to Log Service. Sample code:
public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}