
Realtime Compute for Apache Flink:Simple Log Service

Last Updated:Mar 26, 2026

Use the Simple Log Service (SLS) connector to read log data into Flink jobs or write processed results back to SLS Logstores in streaming mode.

Supported table types: Source and sink  |  Execution mode: Streaming only  |  API types: SQL · DataStream · Data ingestion YAML

Prerequisites

Before you begin, make sure you have:

Limitations

  • Data ingestion YAML requires Ververica Runtime (VVR) 11.1 or later.

  • The SLS connector provides at-least-once delivery guarantees — records are never lost, but may be delivered more than once (for example, after a job restart from a checkpoint). Design your downstream systems to handle duplicates.

  • Keep source concurrency at or below the number of shards. Higher concurrency wastes resources without improving throughput.

  • In VVR 8.0.5 and earlier, automatic failover may stop working if the shard count changes. After a shard change, verify that all shards are still being consumed.

  • The sink table supports insert operations only. Updating or deleting existing data is not supported.
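Because delivery is at-least-once, downstream consumers may observe the same record more than once. If your logs carry a unique key, one common way to deduplicate in Flink SQL is the ROW_NUMBER pattern. The sketch below assumes a hypothetical source table `sls_input` with a unique `request_id` column and a `proctime` attribute; adapt the names to your schema:

```sql
-- Sketch: keep only the first occurrence of each request_id.
-- sls_input, request_id, and proctime are assumptions for illustration.
SELECT `time`, url
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY request_id
           ORDER BY proctime ASC
         ) AS rn
  FROM sls_input
)
WHERE rn = 1;
```

Note that deduplication keeps state per key; configure a state TTL appropriate for your duplicate window.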

Metadata fields

The source table can expose the following SLS message metadata as virtual columns. Declare them with the METADATA VIRTUAL modifier; they are read-only and are excluded automatically from INSERT INTO operations.

Field name Field type Description
__source__ STRING METADATA VIRTUAL Message source
__topic__ STRING METADATA VIRTUAL Message topic
__timestamp__ BIGINT METADATA VIRTUAL Log time
__tag__ MAP&lt;VARCHAR, VARCHAR&gt; METADATA VIRTUAL Message tags as key-value pairs

Example — accessing a tag value:

__tag__['__receive_time__'] returns the value from the tag entry "__tag__:__receive_time__":"1616742274".

SQL

Syntax

CREATE TABLE sls_table (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector'  = 'sls',
  'endPoint'   = '<yourEndPoint>',
  'project'    = '<yourProjectName>',
  'logStore'   = '<yourLogStoreName>',
  'accessId'   = '${secret_values.ak_id}',
  'accessKey'  = '${secret_values.ak_secret}'
);

Required parameters: connector, endPoint, project, logStore, accessId, accessKey.

WITH parameters

Common parameters

Parameter Data type Required Default Description
connector String Yes Fixed value: sls
endPoint String Yes Private network endpoint for SLS. See Service endpoint.
Note

Realtime Compute for Apache Flink does not support public network access by default. Use Alibaba Cloud NAT Gateway for VPC-to-internet communication. If public access is required, use HTTPS and enable Global Accelerator (GA) for SLS.

project String Yes SLS project name
logStore String Yes Logstore or Metricstore name. Logstores and Metricstores use the same consumption method.
accessId String Yes AccessKey ID. To avoid exposing credentials, use project variables. See How do I view my AccessKey ID and AccessKey secret? and Project variables.
accessKey String Yes AccessKey secret

Source parameters

Start reading position

Use startupMode to control where consumption begins. The available modes are:

Mode Behavior
timestamp (default) Start from the time specified by startTime. If startTime is not set, consumption starts from the current time.
latest Start from the latest available offset
earliest Start from the earliest available offset
consumer_group Resume from the offset saved in the consumer group. If no offset is recorded for a shard, falls back to earliest.

Related parameters:

  • startTime (String, default: current time) — Start time in yyyy-MM-dd hh:mm:ss format. Takes effect only when startupMode is timestamp. Times are based on the SLS __receive_time__ attribute, not __timestamp__.

  • stopTime (String, default: none) — End time in yyyy-MM-dd hh:mm:ss format. Set this only for historical log consumption. If set to a future time, the stream may end prematurely when no new logs arrive — without an error message. To exit Flink after consumption finishes, also set exitAfterFinish=true.

  • consumerGroup (String, default: none) — Records consumption progress. Each Flink job must use a unique consumer group; sharing one across jobs does not coordinate offsets because Flink does not use the SLS consumer group for partition assignment — each consumer reads all data independently.

  • consumeFromCheckpoint (String, default: false) — Not supported in VVR 11.1 and later. In earlier versions, set to true to resume from the offset saved in the specified consumer group; if no checkpoint exists, falls back to startTime. For VVR 11.1 and later, use startupMode=consumer_group instead.

VVR versions earlier than 11.1 do not support startupMode=consumer_group. Use consumeFromCheckpoint=true with a consumerGroup to resume from a saved offset.
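For example, a bounded replay of historical logs can combine these options as follows. The columns and option values are placeholders; the key point is pairing stopTime with exitAfterFinish so the job terminates once the range is consumed:

```sql
CREATE TEMPORARY TABLE sls_history (
  `time` BIGINT,
  url    STRING
) WITH (
  'connector'       = 'sls',
  'endPoint'        = '<yourEndPoint>',
  'project'         = '<yourProjectName>',
  'logStore'        = '<yourLogStoreName>',
  'accessId'        = '${secret_values.ak_id}',
  'accessKey'       = '${secret_values.ak_secret}',
  'startupMode'     = 'timestamp',
  'startTime'       = '2023-08-01 00:00:00',
  'stopTime'        = '2023-08-02 00:00:00',
  'exitAfterFinish' = 'true'
);
```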

Other source parameters

Parameter Data type Required Default Description
enableNewSource Boolean No false (VVR 11.1+: true) Enable the FLIP-27 source interface. The new source adapts automatically to shard changes and distributes shards evenly. Requires VVR 8.0.9 or later.
Important

Changing this parameter prevents the job from resuming its previous state. To resume from a historical offset, first run the job with consumerGroup set, then restart without state and use startupMode=consumer_group. If read-only shards exist, some tasks may request additional unprocessed shards after completing read-only shard consumption, causing uneven distribution. Adjust concurrency, optimize task scheduling, or merge small shards to reduce the effect.

shardDiscoveryIntervalMs Long No 60000 How often (in milliseconds) to detect shard changes. Minimum: 60,000 ms. Set to a negative value to disable. Takes effect only when enableNewSource=true. Requires VVR 8.0.9 or later.
maxRetries String No 3 Number of retries after a failed SLS read
batchGetSize String No 100 Log groups to fetch per request. Maximum: 1,000. Exceeding this limit causes an error.
exitAfterFinish String No false Whether to exit the Flink job after consumption completes. Set to true when consuming historical logs with stopTime.
query String No Deprecated in VVR 11.3 (still compatible). SPL filter applied before Flink reads the data, reducing data volume and cost. Example: 'query' = '* | where request_method = ''GET'''. Use SPL syntax. See SPL syntax. Requires VVR 8.0.1 or later. This feature incurs SLS charges. For more information, see Pricing.
processor String No SLS consumer processor. Takes precedence over query when both are set. Filters data before Flink reads it. Example: 'processor' = 'test-filter-processor'. Use SPL syntax. See SPL syntax and Manage consumer processors. Requires VVR 11.3 or later. This feature incurs SLS charges. For more information, see Pricing.

Sink parameters

Parameter Data type Required Default Description
topicField String No Name of a table field whose value overrides the __topic__ metadata. The field must exist in the table.
timeField String No Current time Name of a table field whose value overrides the __timestamp__ metadata. The field must exist in the table and have the INT type.
sourceField String No Name of a table field whose value overrides the __source__ metadata (for example, the machine IP). The field must exist in the table.
partitionField String No Name of a field used to route records to shards by hash value. Records with the same hash value go to the same shard. If unset, records are written to shards randomly.
buckets String No 64 Number of buckets for hash-based routing when partitionField is set. Valid values: integers from 1 to 256 that are powers of two. Must be greater than or equal to the shard count — otherwise some shards receive no data.
flushIntervalMs String No 2000 How often (in milliseconds) data is written to SLS
writeNullProperties Boolean No true Whether to write null values as empty strings (true) or skip null fields (false). Requires VVR 8.0.6 or later.
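The buckets constraint exists because a power-of-two bucket count lets a writer map a field's hash to a bucket with a cheap bitmask, and buckets are then spread over shards. The connector's actual hash and mapping are not documented here; the following standalone sketch uses an assumed hash and an assumed even spread purely to illustrate why a bucket count smaller than the shard count leaves some shards without data:

```java
import java.util.HashSet;
import java.util.Set;

class BucketRoutingSketch {
    // Illustrative only: map a partition key to one of `buckets` buckets.
    // A power-of-two bucket count allows masking instead of modulo.
    static int bucketFor(String key, int buckets) {
        return (key.hashCode() & 0x7fffffff) & (buckets - 1);
    }

    // Hypothetical even spread of buckets over shards.
    static int shardFor(int bucket, int shardCount) {
        return bucket % shardCount;
    }

    public static void main(String[] args) {
        int buckets = 4; // fewer buckets than shards...
        int shards = 8;  // ...so some shards can never receive data
        Set<Integer> used = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            used.add(shardFor(bucketFor("key-" + i, buckets), shards));
        }
        // At most `buckets` distinct shards are ever hit.
        System.out.println("shards used: " + used.size() + " of " + shards);
    }
}
```

With 4 buckets and 8 shards, at most 4 shards ever receive data, which is why buckets must be greater than or equal to the shard count.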

Type mapping

All Flink field types map to SLS STRING.

Flink field type SLS field type
BOOLEAN STRING
VARBINARY STRING
VARCHAR STRING
TINYINT STRING
INTEGER STRING
BIGINT STRING
FLOAT STRING
DOUBLE STRING
DECIMAL STRING

Data ingestion YAML (public preview)

Requires VVR 11.1 or later.

Syntax

source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>

Configuration parameters

Common parameters

Parameter Data type Required Default Description
type String Yes Fixed value: sls
endpoint String Yes Private network endpoint for SLS. See Service endpoint. For public access considerations, see the note under the SQL endPoint parameter above.
project String Yes SLS project name
logstore String Yes Logstore or Metricstore name
accessId String Yes AccessKey ID. Use project variables to avoid exposing credentials.
accessKey String Yes AccessKey secret
startupMode String No timestamp Start reading position. Same modes as the SQL connector: timestamp, latest, earliest, consumer_group.
startTime String No Current time Start time in yyyy-MM-dd hh:mm:ss format. Takes effect when startupMode=timestamp. Based on __receive_time__, not __timestamp__.
stopTime String No End time in yyyy-MM-dd hh:mm:ss format. To exit after consumption finishes, also set exitAfterFinish=true.
consumerGroup String No Records consumption progress
batchGetSize Integer No 100 Log groups to fetch per request. Maximum: 1,000.
maxRetries Integer No 3 Retries after a failed SLS read
exitAfterFinish Boolean No false Whether to exit after consumption completes
shardDiscoveryIntervalMs Long No 60000 Shard change detection interval in milliseconds. Minimum: 60,000 ms. Set to a negative value to disable.
query String No SPL filter applied before Flink reads the data. See SPL syntax. During public preview, see Consume logs based on rules for supported regions. This feature is free during public preview; charges may apply later. For more information, see Pricing.
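Putting the common parameters together, a source that starts from a fixed time and pushes an SPL filter down to SLS might look like this. All values are placeholders, and the filter expression is only an example, not required syntax for your logs:

```yaml
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
  startupMode: timestamp
  startTime: 2024-01-01 00:00:00
  query: '* | where level = ''ERROR'''
```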

Schema and parsing parameters

Parameter Data type Required Default Description
schema.inference.strategy String No continuous Schema inference strategy: continuous (infer for every record; generate a schema change event when schemas are incompatible) or static (infer once at job startup; no schema change events).
maxPreFetchLogGroups Integer No 50 Maximum log groups to pre-consume per shard during initial schema inference
fixed-types String No Explicit field types for parsing. Example: id BIGINT, name VARCHAR(10). Requires VVR 11.6 or later.
timestamp-format.standard String No SQL Timestamp format: SQL (yyyy-MM-dd HH:mm:ss.s{precision}) or ISO-8601 (yyyy-MM-ddTHH:mm:ss.s{precision}). Requires VVR 11.6 or later.
ingestion.ignore-errors Boolean No false Whether to ignore parsing errors instead of failing the job. Requires VVR 11.6 or later.
ingestion.error-tolerance.max-count Integer No -1 Maximum parsing errors allowed before the job fails, when ingestion.ignore-errors is enabled. -1 means errors never trigger job failure. Requires VVR 11.6 or later.
decode.table-id.fields String No Fields used to generate the table ID when parsing log data. Comma-separated. For example, given a log record {"col0":"a", "col1":"b", "col2":"c"}: setting col0 produces table ID a; setting col0,col1 produces a.b; setting col0,col1,col2 produces a.b.c. Without this parameter, all messages are stored in Project.LogStore. Requires VVR 11.6 or later.
compressType String No SLS compression type: lz4, deflate, or zstd
timeZone String No Time zone for startTime and stopTime. No offset is applied by default.
regionId String No SLS region. See Available regions.
signVersion String No SLS request signature version. See Request signing.
shardModDivisor Integer No -1 Divisor for shard-based filtering when reading. See Shard.
shardModRemainder Integer No -1 Remainder for shard-based filtering when reading. See Shard.
metadata.list String No Metadata columns to pass downstream. Supported fields: __source__, __topic__, __timestamp__, __tag__. Comma-separated.

Type mapping

Without fixed-types, all SLS STRING fields map to CDC STRING. With fixed-types, the connector parses fields using the specified types.

SLS field type CDC field type
STRING STRING

Schema inference and schema changes

Shard pre-consumption and schema initialization

Before reading data, the SLS connector pre-consumes up to maxPreFetchLogGroups log groups per shard — starting from one hour before the current time — to infer and merge schemas. After initialization, it generates table creation events based on the merged schema.

Primary keys

SLS logs do not include primary key information. Add primary keys using transform rules:

transform:
  - source-table: <project>.<logstore>
    projection: '*'
    primary-keys: key1, key2

Schema changes

After schema initialization, behavior depends on schema.inference.strategy:

  • static: Each record is parsed using the initial schema. No schema change events are generated.

  • continuous: Each record is parsed and its inferred columns are compared with the current schema. Merging rules:

    • New fields: Added to the schema as nullable columns; a column-addition event is generated.

    • Missing fields: Existing columns are retained and filled with NULL. No deletion event is generated.

All field types are inferred as STRING. Only column addition is supported; new columns are appended to the end of the schema.
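As a concrete illustration of the continuous strategy, consider three successive log records with hypothetical field names:

```text
record 1: {"a": "1"}            -> initial schema (a STRING)
record 2: {"a": "2", "b": "x"}  -> column-addition event; schema becomes (a STRING, b STRING)
record 3: {"b": "y"}            -> no schema change; a is filled with NULL for this row
```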

Code examples

SQL source and sink

CREATE TEMPORARY TABLE sls_input (
  `time`         BIGINT,
  url            STRING,
  dt             STRING,
  float_field    FLOAT,
  double_field   DOUBLE,
  boolean_field  BOOLEAN,
  `__topic__`    STRING METADATA VIRTUAL,
  `__source__`   STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  __tag__        MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime       AS PROCTIME()
) WITH (
  'connector'  = 'sls',
  'endPoint'   = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId'   = '${secret_values.ak_id}',
  'accessKey'  = '${secret_values.ak_secret}',
  'startTime'  = '2023-08-30 00:00:00',
  'project'    = 'sls-test',
  'logStore'   = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink (
  `time`         BIGINT,
  url            STRING,
  dt             STRING,
  float_field    FLOAT,
  double_field   DOUBLE,
  boolean_field  BOOLEAN,
  `__topic__`    STRING,
  `__source__`   STRING,
  `__timestamp__` BIGINT,
  receive_time   BIGINT
) WITH (
  'connector'  = 'sls',
  'endPoint'   = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId'   = '${secret_values.ak_id}',
  'accessKey'  = '${secret_values.ak_secret}',
  'project'    = 'sls-test',
  'logStore'   = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(__tag__['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;

Data ingestion: SLS to Paimon

SLS can serve as a data source for data ingestion jobs, writing log data in real time to downstream systems. The following example writes data from a Logstore to a DLF data lake in Paimon format. The job infers field types and table structure automatically and supports dynamic schema evolution during runtime.

source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}

# Add a primary key to all source tables
transform:
  - source-table: '.*\..*'
    projection: '*'
    primary-keys: id

# Write test_log data to the downstream inventory table
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance
  table.properties.deletion-vectors.enabled: true

DataStream API

Important

To use DataStream, you need the DataStream connector for Flink. See Use DataStream connectors. If you use VVR earlier than 8.0.10, add the corresponding -uber JAR as an additional dependency to prevent missing dependency errors at job startup.

Read from SLS

VVR provides the SlsSourceFunction class for reading from SLS.

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // Required: batch size must be set
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Start consuming from the current time
        int startInSec = (int) (new Date().getTime() / 1000);

        // -1 means consume indefinitely
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

Write to SLS

VVR provides the SLSOutputFormat class for writing to SLS.

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}

Maven dependency

The SLS DataStream connector is available in the Maven Central Repository.

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>

FAQ

When recovering a failed Flink job, the TaskManager runs out of memory, and the source table reports java.lang.OutOfMemoryError: Java heap space