
Realtime Compute for Apache Flink: Simple Log Service (SLS) connector

Last Updated: Nov 05, 2025

This topic describes how to use the Simple Log Service (SLS) connector.

Background information

Simple Log Service (SLS) is an end-to-end data logging service developed by Alibaba Cloud. It allows you to collect, consume, ship, query, and analyze log data efficiently. It improves O&M efficiency and can process large amounts of log data.

The SLS connector provides the following capabilities:

  • Supported types: source table and sink table.

  • Running mode: streaming mode.

  • Metric: N/A.

  • Data format: N/A.

  • API types: SQL, DataStream API, and data ingestion YAML API.

  • Data update or deletion in the sink table: not supported. Data can only be inserted into a sink table.

Features

The SLS source connector can read the attribute fields of messages. The following attribute fields are supported:

  • __source__ (STRING METADATA VIRTUAL): The message source.

  • __topic__ (STRING METADATA VIRTUAL): The message topic.

  • __timestamp__ (BIGINT METADATA VIRTUAL): The time when the log is generated.

  • __tag__ (MAP<VARCHAR, VARCHAR> METADATA VIRTUAL): The message tags. An attribute such as "__tag__:__receive_time__":"1616742274" is stored in the map as the key-value pair '__receive_time__':'1616742274' and is accessed as __tag__['__receive_time__'] in SQL.

Prerequisites

A project and a Logstore are created. For more information, see Create a project and a logstore.

Limits

  • Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.

  • The SLS connector supports only at-least-once semantics.

  • To improve resource utilization, set the parallelism of the source operator to a value less than or equal to the number of shards. In VVR 8.0.5 or earlier, if the source parallelism exceeds the shard count and the number of shards changes, automatic job failover may become ineffective, potentially leaving some shards unconsumed.

SQL

Syntax

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

Connector options in the WITH clause

  • General

    • connector (String, required, no default value): The connector to use. Set it to sls.

    • endPoint (String, required, no default value): The endpoint of SLS. Enter the VPC access address of SLS. For more information, see Endpoints.

      Note
      • By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How does Realtime Compute for Apache Flink access the Internet?.
      • We recommend that you do not access SLS over the Internet. If you must access SLS over the Internet, use HTTPS and enable transfer acceleration for SLS.

    • project (String, required, no default value): The name of the SLS project.

    • logStore (String, required, no default value): The name of an SLS Logstore or Metricstore. Data in a Logstore is consumed in the same way as in a Metricstore.

    • accessId (String, required, no default value): The AccessKey ID of your Alibaba Cloud account. For more information, see How do I view the AccessKey pair of an account?

      Important
      To protect your AccessKey pair, configure it by using variables.

    • accessKey (String, required, no default value): The AccessKey secret of your Alibaba Cloud account.

  • Source-specific

    • enableNewSource (Boolean, optional, default value: false): Specifies whether to use the refactored source interface introduced in FLIP-27. If this option is enabled, the source automatically adapts to shard changes and distributes shards across source subtasks as evenly as possible.

      Note
      Only VVR 8.0.9 or later supports this option.

      Important
      • Starting from VVR 11.1, this option is set to true by default.
      • If the value of this option changes, the job cannot resume from an existing state. To work around this, configure the consumerGroup option and start the job so that the current consumer offsets are recorded in the consumer group. Then, set consumeFromCheckpoint to true and start the job without states.
      • When source subtasks finish reading read-only shards, they continue to request other shards to consume. This can cause uneven shard consumption among source subtasks and affect overall job performance. To alleviate this issue, consider adjusting the source parallelism, optimizing your scheduling strategy, or merging small shards to simplify shard assignment.

    • shardDiscoveryIntervalMs (Long, optional, default value: 60000): The interval, in milliseconds, at which shard changes are dynamically detected. To disable dynamic detection, set this option to a negative value.

      Note
      • The value of this option cannot be less than 1 minute (60,000 milliseconds).
      • This option takes effect only if enableNewSource is set to true.
      • Only VVR 8.0.9 or later supports this option.

    • startupMode (String, optional, default value: timestamp): The startup mode of the source table. Valid values:
      • timestamp: Logs are consumed from the specified start time.
      • latest: Logs are consumed from the latest offset.
      • earliest: Logs are consumed from the earliest offset.
      • consumer_group: Logs are consumed from the offset recorded in the consumer group. If the consumer group does not record the consumption offset of a shard, logs are consumed from the earliest offset.

      Important
      In VVR versions earlier than 11.1, consumer_group is not supported. To consume data from an offset recorded in a specified consumer group, set consumeFromCheckpoint to true instead.

    • startTime (String, optional, default value: current time): The time at which log consumption starts, in the yyyy-MM-dd hh:mm:ss format. This option takes effect only if startupMode is set to timestamp.

      Note
      The startTime and stopTime options are evaluated against the __receive_time__ field of an SLS source table, not the __timestamp__ field.

    • stopTime (String, optional, no default value): The time at which log consumption stops, in the yyyy-MM-dd hh:mm:ss format.

      Note
      • To consume only historical logs, set this option to a time in the past. If you set it to a future time and new log ingestion is temporarily disrupted, consumption may stop unexpectedly; the observable symptom is a stalled data stream without any error messages or exceptions.
      • If you want the Flink job to exit after log consumption is complete, also set the exitAfterFinish option to true (see the sketch after this list).

    • consumerGroup (String, optional, no default value): The name of the consumer group. A consumer group records the consumption progress. You can specify a custom consumer group name; the format of the name is not fixed.

      Note
      A consumer group cannot be shared by multiple jobs for collaborative consumption. When Realtime Compute for Apache Flink consumes data from SLS, data is not sharded within a consumer group. As a result, if multiple jobs share the same consumer group, each job consumes all messages. We recommend that you specify a different consumer group for each job.

    • consumeFromCheckpoint (String, optional, default value: false): Specifies whether to consume logs from the checkpoint that is stored in the specified consumer group. Valid values:
      • true: You must also specify a consumer group. Flink consumes logs from the checkpoint stored in the consumer group. If no checkpoint exists in the consumer group, Flink consumes logs from the time specified by the startTime option.
      • false: Flink does not consume logs from the checkpoint stored in the specified consumer group.

      Important
      Starting from VVR 11.1, this option is no longer supported. Set startupMode to consumer_group instead.

    • maxRetries (String, optional, default value: 3): The number of retries that are allowed when data fails to be read from SLS.

    • batchGetSize (String, optional, default value: 100): The number of log groups to read in a request. To prevent errors, set batchGetSize to a value less than 1000.

    • exitAfterFinish (String, optional, default value: false): Specifies whether the Flink job exits after data consumption is complete. Valid values: true and false.

    • query (String, optional, no default value): The query statement that is used to preprocess data before consumption. Configuring this option filters data on the SLS side before consumption begins, which reduces costs and improves data processing efficiency. For example, 'query' = '*| where request_method = ''GET''' filters for data whose request_method field equals GET before consumption (see the sketch after this list).

      Note
      Use SPL syntax when configuring this option.

      Important
      • Only VVR 8.0.1 or later supports this option.
      • This option was deprecated in VVR 11.3, but later versions remain compatible with it.
      • This feature incurs fees from SLS. For details, see Billing.

    • processor (String, optional, no default value): The SLS processor. This option is functionally equivalent to query, and we recommend using processor instead; if both options are set, query takes precedence. For example, 'processor' = 'test-filter-processor' indicates that data is filtered by the specified SLS processor before Flink consumes it.

      Note
      Use SPL syntax when configuring this option.

      Important
      • Only VVR 8.0.1 or later supports this option.
      • This feature incurs fees from SLS. For details, see Billing.
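    Putting several of these options together, the following source table is a minimal sketch (the project, Logstore, time range, and filter field are placeholders) that consumes a fixed historical window, pushes a filter down to SLS, and exits once the window has been fully consumed:

    CREATE TABLE sls_bounded_source (
      request_method STRING,
      url STRING
    ) WITH (
      'connector' = 'sls',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'logStore' = '<yourLogStoreName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'startupMode' = 'timestamp',
      'startTime' = '2024-01-01 00:00:00',
      'stopTime' = '2024-01-02 00:00:00',
      -- Exit the job after the time range has been fully consumed.
      'exitAfterFinish' = 'true',
      -- Filter on the SLS side before consumption (VVR 8.0.1 or later).
      'query' = '*| where request_method = ''GET'''
    );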

  • Sink-specific

    • topicField (String, optional, no default value): Specifies a field whose value overwrites the __topic__ field to indicate the topic of the log. The value must be an existing field in the table.

    • timeField (String, optional, default value: current time): Specifies a field whose value overwrites the __timestamp__ field to indicate the log write time. The value must be an existing field of the INT type. If no field is specified, the current time is used.

    • sourceField (String, optional, no default value): Specifies a field whose value overwrites the __source__ field to indicate the origin of the log, such as the IP address of the machine that generated the log. The value must be an existing field in the table.

    • partitionField (String, optional, no default value): Specifies a field. When data is written to SLS, a hash value is calculated from the value of this field, and data with the same hash value is written to the same shard (see the sketch after this list). If you do not specify this option, each data entry is randomly written to an available shard.

    • buckets (String, optional, default value: 64): The number of buckets into which data is grouped by hash value when partitionField is specified. Valid values: [1, 256]. The value must be an integer power of 2 and greater than or equal to the number of shards; otherwise, some shards receive no data.

    • flushIntervalMs (String, optional, default value: 2000): The interval, in milliseconds, at which data writing is triggered.

    • writeNullProperties (Boolean, optional, default value: true): Specifies whether to write null values to SLS as empty strings. Valid values: true and false.

      Note
      Only VVR 8.0.6 or later supports this option.
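    For example, the following sink is a minimal sketch (the log_topic, user_id, and message columns are hypothetical) that takes the topic and the target shard from the row itself:

    CREATE TABLE sls_routed_sink (
      log_topic STRING,
      user_id STRING,
      message STRING
    ) WITH (
      'connector' = 'sls',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'logStore' = '<yourLogStoreName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      -- The value of log_topic overwrites the __topic__ field.
      'topicField' = 'log_topic',
      -- Rows with the same user_id hash to the same shard.
      'partitionField' = 'user_id',
      -- A power of 2 that is greater than or equal to the shard count.
      'buckets' = '128'
    );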

Data type mappings

All of the following Realtime Compute for Apache Flink data types map to the SLS STRING type:

  • BOOLEAN

  • VARBINARY

  • VARCHAR

  • TINYINT

  • INTEGER

  • BIGINT

  • FLOAT

  • DOUBLE

  • DECIMAL

Data ingestion

Limits

Only VVR 11.1 or later supports data ingestion from SLS.

Syntax

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

Configuration options

  • type (String, required, no default value): The type of the data source. Set it to sls.

  • endpoint (String, required, no default value): The endpoint of SLS. Enter the VPC access address of SLS. For more information, see Endpoints.

  • accessId (String, required, no default value): The AccessKey ID of your Alibaba Cloud account. For more information, see How do I view the AccessKey pair of an account?

    Important
    To protect your AccessKey pair, use variables to configure the AccessKey ID and secret.

  • accessKey (String, required, no default value): The AccessKey secret of your Alibaba Cloud account.

  • project (String, required, no default value): The name of the SLS project.

  • logStore (String, required, no default value): The name of a Logstore or Metricstore. Data in a Logstore is consumed in the same way as in a Metricstore.

  • schema.inference.strategy (String, optional, default value: continuous): The strategy for schema inference. Valid values:
    • continuous: Schema inference is performed for each data entry. If the schemas are incompatible, a wider schema is inferred and schema change events are generated.
    • static: Schema inference is performed only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema change events are generated.

  • maxPreFetchLogGroups (Integer, optional, default value: 50): The maximum number of log groups read and parsed for each shard during initial schema inference. Before data is loaded and processed, the connector attempts to consume the specified number of log groups from each shard in advance to initialize the schema.

  • shardDiscoveryIntervalMs (Long, optional, default value: 60000): The interval, in milliseconds, at which shard changes are dynamically detected. Set this option to a negative value to disable dynamic detection.

    Note
    The value of this option cannot be less than 1 minute (60,000 milliseconds).

  • startupMode (String, optional, default value: timestamp): The startup mode. Valid values:
    • timestamp: Logs are consumed from the specified start time.
    • latest: Logs are consumed from the latest offset.
    • earliest: Logs are consumed from the earliest offset.
    • consumer_group: Logs are consumed from the offset recorded in the consumer group. If the consumer group does not record the consumption offset of a shard, logs are consumed from the earliest offset.

  • startTime (String, optional, default value: current time): The time at which log consumption starts, in the yyyy-MM-dd hh:mm:ss format. It takes effect only if startupMode is set to timestamp.

    Note
    The startTime and stopTime options are evaluated against the __receive_time__ field in SLS, not the __timestamp__ field.

  • stopTime (String, optional, no default value): The time at which log consumption stops, in the yyyy-MM-dd hh:mm:ss format.

    Note
    To make the Flink job exit upon completion of log consumption, set exitAfterFinish to true.

  • consumerGroup (String, optional, no default value): The name of the consumer group. A consumer group records the consumption progress. You can specify a custom consumer group name; the format of the name is not fixed.

  • batchGetSize (Integer, optional, default value: 100): The number of log groups read in a request. To prevent errors, set batchGetSize to a value less than 1000.

  • maxRetries (Integer, optional, default value: 3): The number of retries after a read from SLS fails.

  • exitAfterFinish (Boolean, optional, default value: false): Specifies whether the Flink job exits after data consumption is complete. Valid values: true and false.

  • query (String, optional, no default value): The query statement used to preprocess data before Flink consumes it from SLS. Configuring this option filters data before consumption, which reduces costs and improves data processing efficiency. For example, 'query' = '*| where request_method = ''GET''' filters for data whose request_method field equals GET.

    Note
    Use SPL syntax to write the query.

    Important
    • For regions that support this feature, see Consume logs based on rules.
    • This feature is in public preview and currently free of charge. You may be billed for it in the future. For more information, see Billing.

  • compressType (String, optional, no default value): The compression type. Valid values: lz4, deflate, and zstd.

  • timeZone (String, optional, no default value): The time zone used for startTime and stopTime. By default, no offset is applied.

  • regionId (String, optional, no default value): The region where SLS resides. See Supported regions.

  • signVersion (String, optional, no default value): The version of the SLS request signature. See Request signatures.

  • shardModDivisor (Integer, optional, default value: -1): The shard modulo divisor used when reading from SLS Logstore shards. See Shard for how to configure this option.

  • shardModRemainder (Integer, optional, default value: -1): The shard modulo remainder used when reading from SLS Logstore shards. See Shard for how to configure this option.

  • metadata.list (String, optional, no default value): The metadata columns to pass to downstream operators. Available metadata fields are __source__, __topic__, __timestamp__, and __tag__. Separate multiple fields with commas. A combined example is shown in the sketch after this list.
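For example, the following source block is a minimal sketch (all values are placeholders) that combines the startup, schema inference, and metadata options described above:

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>
   startupMode: timestamp
   startTime: '2024-01-01 00:00:00'
   # Infer the schema once at startup and keep it fixed afterwards.
   schema.inference.strategy: static
   # Forward the topic and tags to downstream operators.
   metadata.list: __topic__,__tag__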

Data type mappings

For data ingestion, the SLS STRING type maps to the Flink CDC STRING type.

Schema inference and evolution

  • Data pre-consumption and schema initialization

    The SLS connector maintains the schema of the current Logstore. Before reading data from the Logstore, the connector attempts to pre-consume up to maxPreFetchLogGroups log groups from each shard and initializes the schema by parsing and merging the schema of each log. Subsequently, before data consumption begins, a table creation event is generated based on the initialized schema.

    Note

    For each shard, the connector attempts to consume data starting from one hour before the current time to parse the schema.

  • Primary key

    SLS logs do not contain primary keys. Manually add primary keys to the table in the transform module:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Schema inference and evolution

    After schema initialization, if schema.inference.strategy is set to static, the connector parses each log entry based on the initial schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the connector parses each log entry, infers the physical columns, and compares them with the current schema. If the inferred schema is inconsistent with the current schema, the two schemas are merged according to the following rules:

    • If the inferred schema contains physical columns not in the current schema, the missing columns are added to the current schema, and nullable column addition events are generated.

    • If the inferred schema does not contain specific columns in the current schema, these columns are retained and their values are set to NULL.

    The SLS connector infers all fields as STRING columns. Currently, only column addition is supported: new columns are appended to the current schema as nullable columns.

Sample code

  • Source table and sink table:

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` BIGINT METADATA VIRTUAL,
      `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime AS PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'startTime' = '2023-08-30 00:00:00',
      'project' = 'sls-test',
      'logStore' = 'sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' = 'sls-test',
      'logStore' = 'sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT
      `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__`,
      `__source__`,
      `__timestamp__`,
      CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
    FROM sls_input;
  • Data ingestion source:

    source:
       type: sls
       name: SLS Source
       endpoint: ${endpoint}
       project: ${project}
       logstore: ${logstore}
       accessId: ${accessId}
       accessKey: ${accessKey}
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

DataStream API

Important

If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information, see Usage of DataStream connectors.

If you use a VVR version earlier than 8.0.10, dependency JAR packages may be missing at job startup. To resolve this issue, include the corresponding uber JAR package as an additional dependency.

Read data from SLS

VVR of Realtime Compute for Apache Flink provides SlsSourceFunction, an implementation of SourceFunction, to read data from SLS. Sample code:

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be specified.
        accessInfo.setBatchGetSize(10);

        // Optional parameters.
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // The time to start consuming. Here it is set to the current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // The time to stop consuming. -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Write data to SLS

VVR of Realtime Compute for Apache Flink provides SLSOutputFormat, an implementation of OutputFormat, to write data to SLS. Sample code:

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

The Simple Log Service DataStream connectors of different versions are stored in the Maven central repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

FAQ

What do I do if an OOM error occurs in TaskManagers and the error message "java.lang.OutOfMemoryError: Java heap space" appears for the source table when I restore a failed Flink program?