Use the Simple Log Service (SLS) connector to read log data into Flink jobs or write processed results back to SLS Logstores in streaming mode.
Supported table types: Source and sink | Execution mode: Streaming only | API types: SQL · DataStream · Data ingestion YAML
Prerequisites
Before you begin, make sure you have:
- An SLS project and Logstore. See Create a project and a Logstore.
Limitations
- Data ingestion YAML requires Ververica Runtime (VVR) 11.1 or later.
- The SLS connector provides at-least-once delivery guarantees — records are never lost, but may be delivered more than once (for example, after a job restart from a checkpoint). Design your downstream systems to handle duplicates.
- Keep source concurrency at or below the number of shards. Higher concurrency wastes resources without improving throughput.
- In VVR 8.0.5 and earlier, automatic failover may stop working if the shard count changes. After a shard change, verify that all shards are still being consumed.
- The sink table supports insert operations only. Updating or deleting existing data is not supported.
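Because delivery is at least once, duplicates can reach downstream systems after a failover. If your logs carry a unique key, one way to drop duplicates inside Flink is the standard Flink SQL deduplication pattern. This is a sketch, assuming a source table sls_input with a hypothetical unique log_id column and a proctime attribute:

```sql
-- Keep only the first row seen for each log_id; later duplicates are discarded.
SELECT log_id, url, request_time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY log_id ORDER BY proctime ASC) AS rn
  FROM sls_input
)
WHERE rn = 1;
```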
Metadata fields
The source table can expose the following SLS message metadata as virtual columns. Declare them with the METADATA VIRTUAL modifier; they are read-only and are excluded automatically from INSERT INTO operations.
| Field name | Field type | Description |
|---|---|---|
| `__source__` | STRING METADATA VIRTUAL | Message source |
| `__topic__` | STRING METADATA VIRTUAL | Message topic |
| `__timestamp__` | BIGINT METADATA VIRTUAL | Log time |
| `__tag__` | MAP\<VARCHAR, VARCHAR\> METADATA VIRTUAL | Message tags as key-value pairs |
Example — accessing a tag value: `__tag__['__receive_time__']` returns the value from the tag entry `"__tag__:__receive_time__":"1616742274"`.
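For example, a source table can declare the tag map as a virtual metadata column and read a single tag in a query. This is a sketch; the endpoint, project, and Logstore names are placeholders:

```sql
CREATE TEMPORARY TABLE sls_source_with_meta (
  url STRING,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

-- Extract the receive time from the tag map.
SELECT url, CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_source_with_meta;
```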
SQL
Syntax
CREATE TABLE sls_table (
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
Required parameters: connector, endPoint, project, logStore, accessId, accessKey.
WITH parameters
Common parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| connector | String | Yes | — | Fixed value: sls |
| endPoint | String | Yes | — | Private network endpoint for SLS. See Service endpoint. Note: Realtime Compute for Apache Flink does not support public network access by default. Use Alibaba Cloud NAT Gateway for VPC-to-internet communication. If public access is required, use HTTPS and enable Global Accelerator (GA) for SLS. |
| project | String | Yes | — | SLS project name |
| logStore | String | Yes | — | Logstore or Metricstore name. Logstores and Metricstores use the same consumption method. |
| accessId | String | Yes | — | AccessKey ID. To avoid exposing credentials, use project variables. See How do I view my AccessKey ID and AccessKey secret? and Project variables. |
| accessKey | String | Yes | — | AccessKey secret |
Source parameters
Start reading position
Use startupMode to control where consumption begins. The available modes are:
| Mode | Behavior |
|---|---|
| timestamp (default) | Start from the time specified by startTime. If startTime is not set, consumption starts from the current time. |
| latest | Start from the latest available offset. |
| earliest | Start from the earliest available offset. |
| consumer_group | Resume from the offset saved in the consumer group. If no offset is recorded for a shard, falls back to earliest. |
Related parameters:
- startTime (String, default: current time) — Start time in yyyy-MM-dd hh:mm:ss format. Takes effect only when startupMode is timestamp. Times are based on the SLS __receive_time__ attribute, not __timestamp__.
- stopTime (String, default: none) — End time in yyyy-MM-dd hh:mm:ss format. Set this only for historical log consumption. If set to a future time, the stream may end prematurely when no new logs arrive — without an error message. To exit Flink after consumption finishes, also set exitAfterFinish=true.
- consumerGroup (String, default: none) — Records consumption progress. Each Flink job must use a unique consumer group; sharing one across jobs does not coordinate offsets because Flink does not use the SLS consumer group for partition assignment — each consumer reads all data independently.
- consumeFromCheckpoint (String, default: false) — Not supported in VVR 11.1 and later. In earlier versions, set to true to resume from the offset saved in the specified consumer group; if no checkpoint exists, falls back to startTime. For VVR 11.1 and later, use startupMode=consumer_group instead.

Note: VVR versions earlier than 11.1 do not support startupMode=consumer_group. Use consumeFromCheckpoint=true with a consumerGroup to resume from a saved offset.
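For example, the settings above can be combined to replay a fixed historical window and stop the job when it is done. This is a sketch; the endpoint, project, Logstore, and time range are placeholders:

```sql
CREATE TEMPORARY TABLE sls_replay (
  url STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Read only logs received in this window, then let the job exit.
  'startupMode' = 'timestamp',
  'startTime' = '2024-01-01 00:00:00',
  'stopTime' = '2024-01-02 00:00:00',
  'exitAfterFinish' = 'true'
);
```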
Other source parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| enableNewSource | Boolean | No | false (VVR 11.1+: true) | Enable the FLIP-27 source interface. The new source adapts automatically to shard changes and distributes shards evenly. Requires VVR 8.0.9 or later. Important: Changing this parameter prevents the job from resuming its previous state. To resume from a historical offset, first run the job with a consumerGroup configured so progress is saved, then restart the job without state and resume from the consumer group offset. |
| shardDiscoveryIntervalMs | Long | No | 60000 | How often (in milliseconds) to detect shard changes. Minimum: 60,000 ms. Set to a negative value to disable. Takes effect only when enableNewSource=true. Requires VVR 8.0.9 or later. |
| maxRetries | String | No | 3 | Number of retries after a failed SLS read |
| batchGetSize | String | No | 100 | Log groups to fetch per request. Maximum: 1,000. Exceeding this limit causes an error. |
| exitAfterFinish | String | No | false | Whether to exit the Flink job after consumption completes. Set to true when consuming historical logs with stopTime. |
| query | String | No | — | Deprecated in VVR 11.3 (still compatible). SPL filter applied before Flink reads the data, reducing data volume and cost. Example: 'query' = '* \| where request_method = ''GET'''. See SPL syntax. Requires VVR 8.0.1 or later. This feature incurs SLS charges. For more information, see Pricing. |
| processor | String | No | — | SLS consumer processor. Takes precedence over query when both are set. Filters data before Flink reads it. Example: 'processor' = 'test-filter-processor'. See SPL syntax and Manage consumer processors. Requires VVR 11.3 or later. This feature incurs SLS charges. For more information, see Pricing. |
Sink parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| topicField | String | No | — | Name of a table field whose value overrides the __topic__ metadata. The field must exist in the table. |
| timeField | String | No | Current time | Name of a table field whose value overrides the __timestamp__ metadata. The field must exist in the table and be of type INT. |
| sourceField | String | No | — | Name of a table field whose value overrides the __source__ metadata (for example, the machine IP). The field must exist in the table. |
| partitionField | String | No | — | Name of a field used to route records to shards by hash value. Records with the same hash value go to the same shard. If unset, records are written to shards randomly. |
| buckets | String | No | 64 | Number of buckets for hash-based routing when partitionField is set. Valid values: integers from 1 to 256 that are powers of two. Must be greater than or equal to the shard count; otherwise some shards receive no data. |
| flushIntervalMs | String | No | 2000 | How often (in milliseconds) data is written to SLS |
| writeNullProperties | Boolean | No | true | Whether to write null values as empty strings (true) or skip null fields (false). Requires VVR 8.0.6 or later. |
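For example, a sink can route all records with the same user ID to the same shard by combining partitionField and buckets. This is a sketch; the user_id field is hypothetical, and 128 buckets assumes the Logstore has at most 128 shards:

```sql
CREATE TEMPORARY TABLE sls_partitioned_sink (
  user_id STRING,
  url STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Records with the same user_id hash land in the same shard.
  'partitionField' = 'user_id',
  'buckets' = '128'
);
```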
Type mapping
All Flink field types map to SLS STRING.
| Flink field type | SLS field type |
|---|---|
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |
Data ingestion YAML (public preview)
Requires VVR 11.1 or later.
Syntax
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
Configuration parameters
Common parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| type | String | Yes | — | Fixed value: sls |
| endpoint | String | Yes | — | Private network endpoint for SLS. See Service endpoint. For public access considerations, see the note under the SQL endPoint parameter above. |
| project | String | Yes | — | SLS project name |
| logstore | String | Yes | — | Logstore or Metricstore name |
| accessId | String | Yes | — | AccessKey ID. Use project variables to avoid exposing credentials. |
| accessKey | String | Yes | — | AccessKey secret |
| startupMode | String | No | timestamp | Start reading position. Same modes as the SQL connector: timestamp, latest, earliest, consumer_group. |
| startTime | String | No | Current time | Start time in yyyy-MM-dd hh:mm:ss format. Takes effect when startupMode=timestamp. Based on __receive_time__, not __timestamp__. |
| stopTime | String | No | — | End time in yyyy-MM-dd hh:mm:ss format. To exit after consumption finishes, also set exitAfterFinish=true. |
| consumerGroup | String | No | — | Records consumption progress |
| batchGetSize | Integer | No | 100 | Log groups to fetch per request. Maximum: 1,000. |
| maxRetries | Integer | No | 3 | Retries after a failed SLS read |
| exitAfterFinish | Boolean | No | false | Whether to exit after consumption completes |
| shardDiscoveryIntervalMs | Long | No | 60000 | Shard change detection interval in milliseconds. Minimum: 60,000 ms. Set to a negative value to disable. |
| query | String | No | — | SPL filter applied before Flink reads the data. See SPL syntax. During public preview, see Consume logs based on rules for supported regions. This feature is free during public preview; charges may apply later. For more information, see Pricing. |
Schema and parsing parameters
| Parameter | Data type | Required | Default | Description |
|---|---|---|---|---|
| schema.inference.strategy | String | No | continuous | Schema inference strategy: continuous (infer for every record; generate a schema change event when schemas are incompatible) or static (infer once at job startup; no schema change events). |
| maxPreFetchLogGroups | Integer | No | 50 | Maximum log groups to pre-consume per shard during initial schema inference |
| fixed-types | String | No | — | Explicit field types for parsing. Example: id BIGINT, name VARCHAR(10). Requires VVR 11.6 or later. |
| timestamp-format.standard | String | No | SQL | Timestamp format: SQL (yyyy-MM-dd HH:mm:ss.s{precision}) or ISO-8601 (yyyy-MM-ddTHH:mm:ss.s{precision}). Requires VVR 11.6 or later. |
| ingestion.ignore-errors | Boolean | No | false | Whether to ignore parsing errors instead of failing the job. Requires VVR 11.6 or later. |
| ingestion.error-tolerance.max-count | Integer | No | -1 | Maximum parsing errors allowed before the job fails, when ingestion.ignore-errors is enabled. -1 means errors never trigger job failure. Requires VVR 11.6 or later. |
| decode.table-id.fields | String | No | — | Fields used to generate the table ID when parsing log data. Comma-separated. For example, given a log record {"col0":"a", "col1":"b", "col2":"c"}: setting col0 produces table ID a; setting col0,col1 produces a.b; setting col0,col1,col2 produces a.b.c. Without this parameter, all messages are stored in Project.LogStore. Requires VVR 11.6 or later. |
| compressType | String | No | — | SLS compression type: lz4, deflate, or zstd |
| timeZone | String | No | — | Time zone for startTime and stopTime. No offset is applied by default. |
| regionId | String | No | — | SLS region. See Available regions. |
| signVersion | String | No | — | SLS request signature version. See Request signing. |
| shardModDivisor | Int | No | -1 | Divisor for shard-based filtering when reading. See Shard. |
| shardModRemainder | Int | No | -1 | Remainder for shard-based filtering when reading. See Shard. |
| metadata.list | String | No | — | Metadata columns to pass downstream. Supported fields: __source__, __topic__, __timestamp__, __tag__. Comma-separated. |
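For example, a source can combine several of these parameters to fix the types of known fields, tolerate occasional parsing errors, and pass metadata downstream. This is a sketch assuming VVR 11.6 or later; the id and name fields are hypothetical:

```yaml
source:
  type: sls
  name: SLS Source
  endpoint: <endpoint>
  project: <project>
  logstore: <logstore>
  accessId: <accessId>
  accessKey: <accessKey>
  # Parse these two fields with explicit types instead of inferring STRING
  fixed-types: id BIGINT, name VARCHAR(10)
  # Skip malformed records, but fail the job after 100 parsing errors
  ingestion.ignore-errors: true
  ingestion.error-tolerance.max-count: 100
  # Pass the topic and tags downstream as metadata columns
  metadata.list: __topic__,__tag__
```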
Type mapping
Without fixed-types, all SLS STRING fields map to CDC STRING. With fixed-types, the connector parses fields using the specified types.
| SLS field type | CDC field type |
|---|---|
| STRING | STRING |
Schema inference and schema changes
Shard pre-consumption and schema initialization
Before reading data, the SLS connector pre-consumes up to maxPreFetchLogGroups log groups per shard — starting from one hour before the current time — to infer and merge schemas. After initialization, it generates table creation events based on the merged schema.
Primary keys
SLS logs do not include primary key information. Add primary keys using transform rules:
transform:
  - source-table: <project>.<logstore>
    projection: '*'
    primary-keys: key1, key2
Schema changes
After schema initialization, behavior depends on schema.inference.strategy:
- static: Each record is parsed using the initial schema. No schema change events are generated.
- continuous: Each record is parsed and its inferred columns are compared with the current schema. Merging rules:
  - New fields: Added to the schema as nullable columns; a column-addition event is generated.
  - Missing fields: Existing columns are retained and filled with NULL. No deletion event is generated.
  - All field types are inferred as STRING. Only column addition is supported; new columns are appended to the end of the schema.
Code examples
SQL source and sink
CREATE TEMPORARY TABLE sls_input (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);
CREATE TEMPORARY TABLE sls_sink (
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-output'
);
INSERT INTO sls_sink
SELECT
  `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;
Data ingestion: SLS to Paimon
SLS can serve as a data source for data ingestion jobs, writing log data in real time to downstream systems. The following example writes data from a Logstore to a DLF data lake in Paimon format. The job infers field types and table structure automatically and supports dynamic schema evolution during runtime.
source:
  type: sls
  name: SLS Source
  endpoint: ${endpoint}
  project: test_project
  logstore: test_log
  accessId: ${accessId}
  accessKey: ${accessKey}

# Add a primary key to all source tables
transform:
  - source-table: '.*\..*'
    projection: '*'
    primary-keys: id

# Write test_log data to the downstream inventory table
route:
  - source-table: test_project.test_log
    sink-table: test_database.inventory

sink:
  type: paimon
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance
  table.properties.deletion-vectors.enabled: true
DataStream API
To use DataStream, you need the DataStream connector for Flink. See Use DataStream connectors. If you use VVR earlier than 8.0.10, add the corresponding -uber JAR as an additional dependency to prevent missing dependency errors at job startup.
Read from SLS
VVR provides the SlsSourceFunction class for reading from SLS.
public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // Required: batch size must be set
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // Start consuming from the current time
        int startInSec = (int) (new Date().getTime() / 1000);
        // -1 means consume indefinitely
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}
Write to SLS
VVR provides the SLSOutputFormat class for writing to SLS.
public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }
}
Maven dependency
The SLS DataStream connector is available in the Maven Central Repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-format-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>