全部产品
Search
文档中心

实时计算Flink版:日志服务SLS

更新时间:Jun 30, 2025

本文为您介绍如何使用日志服务SLS连接器。

背景信息

日志服务是针对日志类数据的一站式服务。日志服务可以帮助您快捷地完成数据采集、消费、投递以及查询分析,提升运维和运营效率,建立海量日志处理能力。

SLS连接器支持的信息如下。

类别

详情

支持类型

源表和结果表

运行模式

仅支持流模式

特有监控指标

暂不适用

数据格式

暂无

API种类

SQL,Datastream和数据摄入YAML

是否支持更新或删除结果表数据

不支持更新和删除结果表数据,只支持插入数据。

特色功能

SLS连接器源表支持直接读取消息的属性字段,支持的属性字段如下。

字段名

字段类型

字段说明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主题。

__timestamp__

BIGINT METADATA VIRTUAL

日志时间。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

对于属性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'会被作为KV对,记录在Map中,在SQL中通过__tag__['__receive_time__']的方式访问。

前提条件

已创建日志服务Project和Logstore,详情请参见创建Project和Logstore

使用限制

  • 仅实时计算引擎VVR 11.1及以上版本支持日志服务SLS作为数据摄入YAML的同步数据源。

  • SLS连接器仅保证At-Least-Once语义。

  • 强烈建议不要设置Source并发度大于Shard个数,不仅会造成资源浪费,且在8.0.5及更低版本中,如果后续Shard数目发生变化,自动Failover功能可能会失效,造成部分Shard不被消费。

SQL

语法结构

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH参数

  • 通用

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    connector

    表类型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    请填写SLS的私网服务地址,详情请参见服务接入点

    说明
    • 实时计算Flink版默认不具备访问公网的能力,但阿里云提供的NAT网关可以实现VPC网络与公网网络互通,详情请参见如何访问公网?

    • 不建议跨公网访问SLS。如确有需要,请使用HTTPS网络传输协议并且开启SLS全球加速服务,详情请参见管理传输加速

    project

    SLS项目名称。

    String

    无。

    logStore

    SLS LogStore或metricstore名称。

    String

    logStore和metricstore是相同的消费方式。

    accessId

    阿里云账号的AccessKey ID。

    String

    详情请参见如何查看AccessKey ID和AccessKey Secret信息?

    重要

    为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量

    accessKey

    阿里云账号的AccessKey Secret。

    String

  • 源表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    enableNewSource

    是否启用实现了FLIP-27接口的新数据源。

    Boolean

    false

    新数据源可以自动适应shard变化,同时尽可能保证shard在所有的source并发上分布均匀。

    说明

    仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    重要
    • 从实时计算引擎VVR 11.1版本开始该参数默认为true。

    • 作业在该配置项发生变化后无法从状态恢复。可通过先设置配置项consumerGroup启动作业,将消费进度记录到SLS消费组中,再将配置项consumeFromCheckpoint设为true后无状态启动作业,从而实现从历史进度继续消费。

    shardDiscoveryIntervalMs

    动态检测shard变化时间间隔,单位为毫秒。

    Long

    60000

    设置为负值时可以关闭动态检测。

    说明
    • 该参数值不能少于1分钟(60000毫秒)。

    • 仅当配置项enableNewSource为true时生效。

    • 仅实时计算引擎VVR 8.0.9及以上版本支持该参数。

    startupMode

    源表启动模式。

    String

    timestamp

    • timestamp(默认):从指定的起始时间开始消费日志。

    • latest:从最新位点开始消费日志。

    • earliest:从最早位点开始消费日志。

    • consumer-group:从消费组记录位点开始消费日志。若消费组未记录某shard消费位点,则会从最早位点开始消费日志。

    重要
    • 实时计算引擎VVR 11.1以下版本,不支持取值为consumer-group,需要将consumeFromCheckpoint设为true,此时会从指定消费组记录的位点开始消费日志,此处的启动模式将不会生效。

    startTime

    消费日志的开始时间。

    String

    当前时间

    格式为yyyy-MM-dd hh:mm:ss

    仅当startupMode设为timestamp时生效。

    说明

    startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。

    stopTime

    消费日志的结束时间。

    String

    格式为yyyy-MM-dd hh:mm:ss

    说明

    如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true.

    consumerGroup

    消费组名称。

    String

    消费组用于记录消费进度。您可以自定义消费组名,无固定格式。

    说明

    不支持通过相同的消费组进行多作业的协同消费。不同的Flink作业应该设置不同的消费组。如果不同的Flink作业使用相同的消费组,它们将会消费全部数据。这是因为在Flink消费SLS的数据时,并不会经过SLS消费组进行分区分配,因此导致各个消费者独立消费各自的消息,即使消费组是相同的。

    consumeFromCheckpoint

    是否从指定的消费组中保存的Checkpoint开始消费日志。

    String

    false

    • true:必须同时指定消费组,Flink程序会从消费组中保存的Checkpoint开始消费日志,如果该消费组没有对应的Checkpoint,则从startTime配置值开始消费。

    • false(默认值):不从指定的消费组中保存的Checkpoint开始消费日志。

    重要

    实时计算引擎VVR 11.1版本开始不再支持配置该参数。对于VVR 11.1及更高版本,需要将startupMode配置为consumer-group

    maxRetries

    读取SLS失败后重试次数。

    String

    3

    无。

    batchGetSize

    单次请求读取logGroup的个数。

    String

    100

    batchGetSize设置不能超过1000,否则会报错。

    exitAfterFinish

    在数据消费完成后,Flink程序是否退出。

    String

    false

    • true:数据消费完后,Flink程序退出。

    • false(默认):数据消费完后,Flink程序不退出。

    query

    SLS消费预处理语句。

    String

    通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。

    说明

    query需使用日志服务SPL语言,请参见SPL语法

    重要
    • 仅实时计算引擎VVR 8.0.1及以上版本支持该参数。

    • 日志服务SLS支持该功能的地域请参见基于规则消费日志

    • 公测阶段免费,后续可能会在产生日志服务SLS费用,详情请参见费用说明

  • 结果表独有

    参数

    说明

    数据类型

    是否必填

    默认值

    备注

    topicField

    指定字段名,该字段的值会覆盖__topic__属性字段的值,表示日志的主题。

    String

    该参数值是表中已存在的字段之一。

    timeField

    指定字段名,该字段的值会覆盖__timestamp__属性字段的值,表示日志写入时间。

    String

    当前时间

    该参数值是表中已存在的字段之一,且字段类型必须为INT。如果未指定,则默认填充当前时间。

    sourceField

    指定字段名,该字段的值会覆盖__source__属性字段的值,表示日志的来源地,例如产生该日志机器的IP地址。

    String

    该参数值是表中已存在的字段之一。

    partitionField

    指定字段名,数据写入时会根据该列值计算Hash值,Hash值相同的数据会写入同一个shard。

    String

    如果未指定,则每条数据会随机写入当前可用的Shard中。

    buckets

    当指定partitionField时,根据Hash值重新分组的个数。

    String

    64

    该参数的取值范围是[1, 256],且必须是2的整数次幂。同时,buckets个数应当大于等于Shard个数,否则会出现部分Shard没有数据写入的情况。

    flushIntervalMs

    触发数据写入的时间周期。

    String

    2000

    单位为毫秒。

    writeNullProperties

    是否将null值作为空字符串写入SLS。

    Boolean

    true

    • true(默认值):将null值作为空字符串写入日志。

    • false:计算结果为null的字段不会写入到日志中。

    说明

    仅实时计算引擎VVR 8.0.6及以上版本支持该参数。

类型映射

Flink字段类型

SLS字段类型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

数据摄入

使用限制

仅实时计算引擎VVR 11.1及以上版本支持。

语法结构

source:
   type: sls
   name: SLS Source
   endpoint: <endpoint>
   project: <project>
   logstore: <logstore>
   accessId: <accessId>
   accessKey: <accessKey>

配置项

参数

说明

数据类型

是否必填

默认值

备注

type

数据源类型。

String

固定值sls。

endpoint

EndPoint地址。

String

请填写SLS的私网服务地址,详情请参见服务接入点

说明
  • 实时计算Flink版默认不具备访问公网的能力,但阿里云提供的NAT网关可以实现VPC网络与公网网络互通,详情请参见如何访问公网?

  • 不建议跨公网访问SLS。如确有需要,请使用HTTPS网络传输协议并且开启SLS全球加速服务,详情请参见管理传输加速

accessId

阿里云账号的AccessKey ID。

String

详情请参见如何查看AccessKey ID和AccessKey Secret信息?

重要

为了避免您的AK信息泄露,建议您使用变量的方式填写AccessKey取值,详情请参见项目变量

accessKey

阿里云账号的AccessKey Secret。

String

project

SLS项目名称。

String

无。

logStore

SLS LogStore或metricstore名称。

String

logStore和metricstore是相同的消费方式。

schema.inference.strategy

Schema推导策略。

String

continuous

  • continuous:对每条数据均进行 Schema 推导。在前后 Schema 不兼容时,推导出更宽的 Schema 并产生 Schema 变更事件。

  • static:仅在作业启动时进行一次Schema推导,后续根据初始Schema解析数据,不会产生Schema变更事件。

maxPreFetchLogGroups

Schema初始推导时,对每个shard最多尝试读取解析的logGroup个数。

Integer

50

在作业实际读取并处理数据前,对每个shard尝试提前消费指定数量的logGroup,用于初始化schema信息。

shardDiscoveryIntervalMs

动态检测shard变化时间间隔,单位为毫秒。

Long

60000

设为负值时可以关闭动态检测。

说明

该参数值不能少于1分钟(60000毫秒)。

startupMode

启动模式。

String

  • timestamp(默认):从指定的起始时间开始消费日志。

  • latest:从最新位点开始消费日志。

  • earliest:从最早位点开始消费日志。

  • consumer-group:从消费组记录位点开始消费日志。若消费组未记录某shard消费位点,则会从最早位点开始消费日志。

startTime

消费日志的开始时间。

String

当前时间

格式为yyyy-MM-dd hh:mm:ss。

仅当startupMode设为timestamp时生效。

说明

startTime和stopTime基于SLS中的__receive_time__属性,而非__timestamp__属性。

stopTime

消费日志的结束时间。

String

格式为yyyy-MM-dd hh:mm:ss。

说明

如期望日志消费到结尾时退出Flink程序,需要同时设置exitAfterFinish=true。

consumerGroup

消费组名称。

String

消费组用于记录消费进度。您可以自定义消费组名,无固定格式。

batchGetSize

单次请求读取logGroup的个数。

Integer

100

batchGetSize设置不能超过1000,否则会报错。

maxRetries

读取SLS失败后重试次数。

Integer

3

无。

exitAfterFinish

在数据消费完成后,Flink程序是否退出。

Boolean

false

  • true:数据消费完后,Flink程序退出。

  • false(默认):数据消费完后,Flink程序不退出。

query

SLS消费预处理语句。

String

通过使用query参数,您可以在消费SLS数据之前对其进行过滤,以避免将所有数据都消费到Flink中,从而实现节约成本和提高处理速度的目的。

例如'query' = '*| where request_method = ''GET'''表示在Flink读取SLS数据前,先匹配出request_method字段值等于get的数据。

说明

query需使用日志服务SPL语言,请参见SPL语法

重要

compressType

SLS压缩类型。

String

支持的压缩类型包括:

  • lz4

  • deflate

  • zstd

timeZone

startTime 和 stopTime 对应的时区。

String

默认情况下不添加偏移量。

regionId

SLS开服地域。

String

参阅开服地域文档配置。

signVersion

SLS请求签名版本。

String

参阅请求签名文档配置。

shardModDivisor

在读取SLS LogStore分区时的除数。

Int

-1

参阅分区(Shard)文档配置。

shardModRemainder

在读取SLS LogStore分区时的余数。

Int

-1

参阅分区(Shard)文档配置。

metadata.list

需要传递给下游的元数据列。

String

可用的元数据字段包括__source____topic____timestamp____tag__,您可以用英文逗号分隔。

类型映射

数据摄入类型映射如下表所示:

SLS字段类型

CDC字段类型

STRING

STRING

表结构推导和变更同步

  • Shard数据预消费和表结构初始化

    SLS连接器会维护当前读取logstore的Schema。在读取logstore中的数据前,SLS连接器会预先在每个shard中尝试消费最多maxPreFetchLogGroups个logGroup的数据,解析其中每条日志的Schema,再将这些Schema合并,用于初始化表结构信息。后续在实际消费数据前会根据初始化的Schema产生对应的建表事件。

    说明

    对于每个shard,SLS连接器会尝试从当前时间一小时之前的时间开始消费数据并解析日志Schema。

  • 主键信息

    SLS日志中不包含主键信息,可以通过transform规则手动为表添加主键:

    transform:
      - source-table: <project>.<logstore>
        projection: \*
        primary-keys: key1, key2
  • Schema推导和Schema变更

    在表结构初始化完成后,若schema.inference.strategy配置为static,SLS连接器会根据初始的表结构解析每条日志数据,不会产生Schema变更事件。若schema.inference.strategy配置为continuous,SLS连接器会解析每条日志的数据,推导出物理列,并与当前记录的Schema比对,若推导出的Schema与当前Schema不一致时,会将Schema合并,合并规则如下:

    • 如果推导出的物理列中包含当前Schema中没有的字段,则会将这些字段加入到Schema中,同时产生新增可空列事件。

    • 如果推导出的物理列中不包含当前Schema中已有的字段,该字段仍会保留,该列的数据会填充为NULL,不产生删除列事件。

    SLS连接器会将每条日志中的字段类型都推导为String类型,目前仅支持新增列,即会在当前Schema末尾添加新列,新增的列会设置为可空列。

代码示例

  • SQL源表和结果表

    CREATE TEMPORARY TABLE sls_input(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING METADATA VIRTUAL,
      `__source__` STRING METADATA VIRTUAL,
      `__timestamp__` STRING METADATA VIRTUAL,
       __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'starttime' = '2023-08-30 00:00:00',
      'project' ='sls-test',
      'logstore' ='sls-input'
    );
    
    CREATE TEMPORARY TABLE sls_sink(
      `time` BIGINT,
      url STRING,
      dt STRING,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      `__topic__` STRING,
      `__source__` STRING,
      `__timestamp__` BIGINT ,
      receive_time BIGINT
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
      'accessId' = '${ak_id}',
      'accessKey' = '${ak_secret}',
      'project' ='sls-test',
      'logstore' ='sls-output'
    );
    
    INSERT INTO sls_sink
    SELECT 
     `time`,
      url,
      dt,
      float_field,
      double_field,
      boolean_field,
      `__topic__` ,
      `__source__` ,
      `__timestamp__` ,
      cast(__tag__['__receive_time__'] as bigint) as receive_time
    FROM sls_input; 
  • 数据摄入数据源

    source:
       type: sls
       name: SLS Source
       endpoint: ${endpoint}
       project: ${project}
       logstore: ${logstore}
       accessId: ${accessId}
       accessKey: ${accessKey}
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

DataStream API

重要

通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink,DataStream连接器设置方法请参见DataStream连接器使用方法

如果您使用的实时计算引擎VVR版本低于8.0.10,启动作业可能会存在缺少依赖JAR包的问题,可以在附加依赖文件额外引入对应的-uber包解决。

读取SLS

实时计算引擎VVR提供SourceFunction的实现类SlsSourceFunction,用于读取SLS,读取SLS的示例如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

写入SLS

提供OutputFormat的实现类SLSOutputFormat,用于写入SLS。写入SLS的示例如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央库中已经放置了SLS DataStream连接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常见问题

恢复失败的Flink程序时,TaskManager发生OOM,源表报错java.lang.OutOfMemoryError: Java heap space