This topic describes how to use the Simple Log Service (SLS) connector.
Background information
Simple Log Service is an end-to-end log data service developed by Alibaba Cloud. It allows you to collect, consume, ship, query, and analyze log data in an efficient manner, improving O&M efficiency and providing the capability to process large amounts of log data.
The following table describes the capabilities supported by the SLS connector.
Category | Description |
Supported types | Source table and sink table |
Running mode | Streaming mode |
Metric | N/A |
Data format | N/A |
API type | SQL, DataStream API, and data ingestion YAML API |
Data update or deletion in the sink table | Data in a sink table cannot be updated or deleted. Data can only be inserted into a sink table. |
Features
The SLS source connector can be used to read the attribute fields of messages. The following table describes the attribute fields supported by the SLS source connector.
Field | Type | Description |
__source__ | STRING METADATA VIRTUAL | The message source. |
__topic__ | STRING METADATA VIRTUAL | The message topic. |
__timestamp__ | BIGINT METADATA VIRTUAL | The time when the log is generated. |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tags. For example, __tag__['__receive_time__'] is the time when the message was received by SLS. |
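For example, the following sketch declares these attribute fields as METADATA VIRTUAL columns in a source table DDL; the table name, regular column, and connection values are placeholders:

CREATE TABLE sls_source_with_metadata (
  message_body STRING,
  -- Attribute fields are declared as METADATA VIRTUAL columns.
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);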
Prerequisites
A project and a Logstore are created. For more information, see Create a project and a logstore.
Limits
Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.
The SLS connector supports only the at-least-once semantics.
To improve resource efficiency, set the source operator's parallelism to a value less than or equal to the number of shards. In VVR 8.0.5 or earlier, if the source parallelism exceeds the shard count and the number of shards then changes, automatic job failover may stop working correctly, potentially leaving some shards unconsumed.
SQL
Syntax
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
Connector options in the WITH clause
General

Option | Description | Data type | Required? | Default value | Remarks |
connector | The connector to use. | String | Yes | No default value | Set it to sls. |
endPoint | The endpoint of SLS. | String | Yes | No default value | Enter the VPC access address of SLS. For more information, see Endpoints. Note: By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How does Realtime Compute for Apache Flink access the Internet? We recommend that you do not access SLS over the Internet. If you must access SLS over the Internet, use HTTPS and enable transfer acceleration for SLS. |
project | The name of the SLS project. | String | Yes | No default value | |
logStore | The name of an SLS Logstore or Metricstore. | String | Yes | No default value | Data in a Logstore is consumed in the same way as in a Metricstore. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | No default value | For more information, see How do I view the AccessKey pair of an account? Important: To protect your AccessKey pair, configure it by using variables. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | No default value | |
Source-specific

Option | Description | Data type | Required? | Default value | Remarks |
enableNewSource | Specifies whether to use the refactored source interface introduced in FLIP-27. | Boolean | No | false | If this option is enabled, the source automatically adapts to shard changes and distributes shards across source subtasks as evenly as possible. Note: Only VVR 8.0.9 or later supports this option. Important: Starting from VVR 11.1, this option is set to true by default. If the option's value changes, your job cannot resume from a previous state. To work around this, configure the consumerGroup option so that the current consumer offset is recorded and start your job; then set consumeFromCheckpoint to true and start the job without states. When source subtasks finish reading read-only shards, they continue to request other shards, which can cause uneven shard consumption among source subtasks and affect overall job performance. To alleviate this, consider adjusting the source parallelism, optimizing your scheduling strategy, or merging small shards to simplify shard assignment. |
shardDiscoveryIntervalMs | The interval at which shard changes are dynamically detected. | Long | No | 60000 | Unit: milliseconds. To disable dynamic detection, set this option to a negative value. Note: The value cannot be less than 1 minute (60,000 milliseconds). This option takes effect only if enableNewSource is set to true. Only VVR 8.0.9 or later supports this option. |
startupMode | The startup mode of the source table. | String | No | timestamp | Valid values: timestamp: logs are consumed from the specified start time. latest: logs are consumed from the latest offset. earliest: logs are consumed from the earliest offset. consumer_group: logs are consumed from the offset recorded in the consumer group; if the consumer group records no offset for a shard, that shard is consumed from the earliest offset. Important: In VVR versions earlier than 11.1, consumer_group is not supported. To consume data from a recorded offset in a specified consumer group in those versions, set consumeFromCheckpoint to true instead. |
startTime | The time at which log consumption starts. | String | No | Current time | The value is in the yyyy-MM-dd hh:mm:ss format. This option takes effect only if startupMode is set to timestamp. Note: startTime and stopTime are evaluated against the __receive_time__ field of an SLS source table rather than the __timestamp__ field. |
stopTime | The time at which log consumption stops. | String | No | No default value | The value is in the yyyy-MM-dd hh:mm:ss format. Note: To consume only historical logs, set this option to a historical point in time. Using a future point in time may cause consumption to stop unexpectedly if new log ingestion is temporarily disrupted; the observable symptom is that the data stream stops without any error messages or exceptions. If you want Realtime Compute for Apache Flink programs to exit after log consumption is complete, also set exitAfterFinish to true. |
consumerGroup | The name of the consumer group. | String | No | No default value | A consumer group records the consumption progress. You can specify a custom consumer group name; the format is not fixed. Note: A consumer group cannot be shared by multiple jobs for collaborative consumption. When Realtime Compute for Apache Flink consumes data from SLS, data is not partitioned among jobs within a consumer group, so if multiple jobs share the same consumer group, each job consumes all messages. We recommend that you specify a different consumer group for each job. |
consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint stored in the specified consumer group. | String | No | false | true: You must also specify a consumer group. Flink consumes logs from the checkpoint stored in that consumer group; if no checkpoint exists, consumption starts from the time specified by startTime. false: Flink does not consume logs from the checkpoint stored in the specified consumer group. Important: Starting from VVR 11.1, this option is no longer supported; set startupMode to consumer_group instead. |
maxRetries | The number of retries allowed when data fails to be read from SLS. | String | No | 3 | |
batchGetSize | The number of log groups read in a request. | String | No | 100 | To prevent errors, set batchGetSize to a value less than 1000. |
exitAfterFinish | Specifies whether Realtime Compute for Apache Flink programs exit after data consumption is complete. | String | No | false | Valid values: true: the program exits after data consumption is complete. false (default): the program keeps running. |
query | The query statement used to preprocess data before consumption. | String | No | No default value | Important: This option is deprecated as of VVR 11.3, but later versions remain backward compatible. Configure this option to filter data on the SLS side before consumption, reducing costs and improving data processing efficiency. For example, 'query' = '*| where request_method = ''GET''' makes Realtime Compute for Apache Flink consume only data whose request_method field equals GET. Note: Use SPL syntax when configuring this option. Important: Only VVR 8.0.1 or later supports this option. This feature incurs fees from SLS. For details, see Billing. |
processor | The SLS processor. If both this option and query are set, query takes precedence. | String | No | No default value | This option is functionally equivalent to query, and we recommend using it instead. For example, 'processor' = 'test-filter-processor' indicates that data is filtered by the specified SLS processor before being consumed by Flink. Note: Use SPL syntax when configuring this option. Important: Only VVR 8.0.1 or later supports this option. This feature incurs fees from SLS. For details, see Billing. |
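The following sketch combines several of these source options; the endpoint, project, Logstore, and consumer group names are placeholders, and the SPL filter mirrors the query example above:

CREATE TEMPORARY TABLE sls_filtered_source (
  request_method STRING,
  request_uri STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Consume from the offset recorded in the consumer group (VVR 11.1 or later).
  'startupMode' = 'consumer_group',
  'consumerGroup' = 'flink-job-1',
  -- Filter logs on the SLS side before consumption to reduce cost.
  'query' = '*| where request_method = ''GET'''
);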
Sink-specific

Option | Description | Data type | Required? | Default value | Remarks |
topicField | Specifies a field name whose value overwrites the __topic__ attribute field to indicate the topic of the log. | String | No | No default value | The value must be an existing field in the table. |
timeField | Specifies a field name whose value overwrites the __timestamp__ attribute field to indicate the log write time. | String | No | Current time | The value must be an existing INT field. If no field is specified, the current time is used. |
sourceField | Specifies a field name whose value overwrites the __source__ attribute field to indicate the origin of the log, such as the IP address of the machine that generated it. | String | No | No default value | The value must be an existing field in the table. |
partitionField | Specifies a field name. A hash value is calculated from this field's value when data is written to SLS, and data with the same hash value is written to the same shard. | String | No | No default value | If you do not specify this option, each data entry is randomly written to an available shard. |
buckets | The number of buckets into which data is regrouped based on the hash value when partitionField is specified. | String | No | 64 | Valid values: [1,256]. The value must be an integer power of 2 and must be greater than or equal to the number of shards; otherwise, some shards receive no data. |
flushIntervalMs | The interval at which data writing is triggered. | String | No | 2000 | Unit: milliseconds. |
writeNullProperties | Specifies whether to write null values to SLS as empty strings. | Boolean | No | true | true (default): null values are written as empty strings. false: fields with null values are not written. Note: Only VVR 8.0.6 or later supports this option. |
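The following sketch shows how these sink options fit together; the field names and connection values are placeholders:

CREATE TEMPORARY TABLE sls_partitioned_sink (
  log_topic STRING,
  client_ip STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Write the log_topic column as the __topic__ attribute of each log.
  'topicField' = 'log_topic',
  -- Write the client_ip column as the __source__ attribute of each log.
  'sourceField' = 'client_ip',
  -- Hash by client_ip so that rows with the same value land in the same shard.
  'partitionField' = 'client_ip'
);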
Data type mappings
Data type of Realtime Compute for Apache Flink | Data type of SLS |
BOOLEAN | STRING |
VARBINARY | STRING |
VARCHAR | STRING |
TINYINT | STRING |
INTEGER | STRING |
BIGINT | STRING |
FLOAT | STRING |
DOUBLE | STRING |
DECIMAL | STRING |
Data ingestion
Limits
Only VVR 11.1 or later supports data ingestion from SLS.
Syntax
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>

Configuration options
Option | Description | Data type | Required? | Default value | Remarks |
type | The type of the data source. | String | Yes | No default value | Set it to sls. |
endpoint | The endpoint. | String | Yes | No default value | Enter the VPC access address of SLS. For more information, see Endpoints. Note: We recommend that you do not access SLS over the Internet. If you must, use HTTPS and enable transfer acceleration for SLS. |
accessId | The AccessKey ID of your Alibaba Cloud account. | String | Yes | No default value | See How do I view the AccessKey pair of an account? Important: To protect your AccessKey pair, use variables to configure the AccessKey ID and secret. |
accessKey | The AccessKey secret of your Alibaba Cloud account. | String | Yes | No default value | |
project | The name of the SLS project. | String | Yes | No default value | |
logstore | The name of a Logstore or Metricstore. | String | Yes | No default value | Data in a Logstore is consumed in the same way as in a Metricstore. |
schema.inference.strategy | The strategy for schema inference. | String | No | | Valid values: static (the schema is fixed after initialization) and continuous (the schema evolves as new columns are inferred). For more information, see Schema inference and evolution. |
maxPreFetchLogGroups | The maximum number of log groups read and parsed for each shard during initial schema inference. | Integer | No | | Before data is loaded and processed, the connector attempts to consume the specified number of log groups from each shard in advance to initialize the schema. |
shardDiscoveryIntervalMs | The interval at which shard changes are dynamically detected. | Long | No | 60000 | Unit: milliseconds. Set this option to a negative value to disable dynamic detection. Note: The value cannot be less than 1 minute (60,000 milliseconds). |
startupMode | The startup mode. | String | No | timestamp | Valid values: timestamp: logs are consumed from the specified start time. latest: logs are consumed from the latest offset. earliest: logs are consumed from the earliest offset. consumer_group: logs are consumed from the offset recorded in the consumer group. |
startTime | The time at which log consumption starts. | String | No | Current time | The value is in the yyyy-MM-dd hh:mm:ss format. It takes effect only if startupMode is set to timestamp. Note: startTime and stopTime are evaluated against the __receive_time__ field rather than the __timestamp__ field. |
stopTime | The time at which log consumption stops. | String | No | No default value | The value is in the yyyy-MM-dd hh:mm:ss format. Note: To cancel a Flink job upon the completion of log consumption, also set exitAfterFinish to true. |
consumerGroup | The name of the consumer group. | String | No | No default value | A consumer group records the consumption progress. You can specify a custom consumer group name; the format is not fixed. |
batchGetSize | The number of log groups read in a request. | Integer | No | 100 | To prevent errors, set batchGetSize to a value less than 1000. |
maxRetries | The number of retries after reading from SLS fails. | Integer | No | 3 | |
exitAfterFinish | Specifies whether the Flink program exits after data consumption is complete. | Boolean | No | false | Valid values: true and false. |
query | The query statement used to preprocess data before Flink consumes it from SLS. | String | No | No default value | Configuring this option filters data before consumption, reducing costs and improving data processing efficiency. For example, *| where request_method = 'GET' consumes only data whose request_method field equals GET. Note: Use SPL syntax to write the query. Important: This feature incurs fees from SLS. For details, see Billing. |
compressType | The compression type used for reading data. | String | No | No default value | |
timeZone | The time zone used for time-related options such as startTime and stopTime. | String | No | No default value | By default, no offset is added. |
regionId | The region where SLS resides. | String | No | No default value | See Supported regions. |
signVersion | The SLS request signature version. | String | No | No default value | See Request signatures. |
shardModDivisor | The divisor used when reading from SLS Logstore shards. | Integer | No | | For more information, see Shard. |
shardModRemainder | The remainder used when reading from SLS Logstore shards. | Integer | No | | For more information, see Shard. |
metadata.list | The metadata columns to pass to downstream operators. | String | No | No default value | Available metadata fields include __source__, __topic__, __timestamp__, and __tag__. |
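For example, the following sketch (with placeholder values) resumes consumption from a consumer group and passes attribute fields downstream as metadata columns; the comma-separated metadata.list format is an assumption, so verify it against your VVR version:

source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
  # Resume from the offset recorded in the consumer group.
  startupMode: consumer_group
  consumerGroup: flink-pipeline-1
  # Pass SLS attribute fields downstream as metadata columns (comma-separated; assumed format).
  metadata.list: __timestamp__,__source__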
Data type mappings
The data type mappings for data ingestion are as follows:
SLS data type | Flink CDC data type |
STRING | STRING |
Schema inference and evolution
Data pre-consumption and schema initialization
The SLS connector maintains the schema of the current Logstore. Before reading data from the Logstore, the connector attempts to pre-consume up to maxPreFetchLogGroups log groups from each shard and initializes the schema by parsing each log and merging the results. Before data consumption begins, a table creation event is generated based on the initialized schema.
Note: For each shard, the connector starts this pre-consumption from one hour before the current time to parse the schema.
Primary key
SLS logs do not contain primary keys. If primary keys are required, manually add them to the table in the transform module:

transform:
  - source-table: <project>.<logstore>
    projection: \*
    primary-keys: key1, key2

Schema inference and evolution
After schema initialization, if schema.inference.strategy is set to static, the connector parses each log entry based on the initialized schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers the physical columns, and compares them with the current schema. If the inferred schema is inconsistent with the current schema, the two are merged according to the following rules:
If the inferred schema contains physical columns that are not in the current schema, the missing columns are added to the current schema and nullable column addition events are generated.
If the inferred schema lacks specific columns of the current schema, these columns are retained and their values are set to NULL.
The SLS connector infers all fields as STRING. Currently, only column addition is supported; new columns are appended to the current schema as nullable columns. For example, if the current schema is (a STRING, b STRING) and a log entry contains fields b and c, column c is appended so that the schema becomes (a STRING, b STRING, c STRING), and column a is set to NULL for that entry.
Sample code
Source table and sink table:
CREATE TEMPORARY TABLE sls_input (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(__tag__['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;

Data ingestion source:
source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: ${project}
  logstore: ${logstore}
  accessId: ${accessId}
  accessKey: ${accessKey}

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
DataStream API
If you want to call the DataStream API to read or write data, you must use the DataStream connector of the corresponding type to connect to Realtime Compute for Apache Flink. For more information, see Usage of DataStream connectors.
In VVR versions earlier than 8.0.10, jobs may fail to start because dependency JAR packages are missing. To resolve this, include the corresponding uber JAR package as an additional dependency.
Read data from SLS
VVR of Realtime Compute for Apache Flink provides the implementation class SlsSourceFunction of SourceFunction to read data from SLS. Sample code:
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}

Write data to SLS
VVR of Realtime Compute for Apache Flink provides the implementation class SLSOutputFormat of OutputFormat to write data to SLS. Sample code:
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}

XML
The Simple Log Service DataStream connectors of different versions are stored in the Maven central repository.
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>