Simple Log Service: Use Realtime Compute for Apache Flink to consume log data

Last Updated:Mar 31, 2026

You can use Realtime Compute for Apache Flink to create a Simple Log Service source table to consume log data in Simple Log Service. This topic describes how to use Realtime Compute for Apache Flink to create a Simple Log Service source table and how to extract the attribute fields involved in the creation process.

Background information

The following table describes the settings that you must configure for Realtime Compute for Apache Flink to consume log data.

  • Supported types: Source tables and result tables.

  • Running mode: Only streaming mode is supported.

  • Metrics: Not supported.

  • Data format: None.

  • API type: SQL statements are supported.

  • Updates and deletions in a result table: Not supported. You can only insert log data into a result table.

For more information about how to use Realtime Compute for Apache Flink to consume log data, see Getting started with a Flink SQL deployment.

Prerequisites

  • If you want to use a Resource Access Management (RAM) user or a RAM role to consume log data, make sure that the RAM user or RAM role has the required permissions in the Realtime Compute for Apache Flink console. For more information, see Permissions.

  • A Realtime Compute for Apache Flink workspace is created. For more information, see Create a workspace.

  • A project and a Logstore are created. For more information, see Create a project and a Logstore.

Limits

  • Only Ververica Runtime (VVR) 11.1 or later supports using SLS as a data ingestion source.

  • The SLS connector supports only at-least-once semantics.

  • Avoid setting the source parallelism higher than the number of shards. Doing so wastes resources. In VVR 8.0.5 or earlier, if the shard count changes after you set a high parallelism, automatic failover may fail. This can leave some shards unconsumed.

Create a Simple Log Service source table and a result table

Important

You must develop a complete SQL draft before you use Realtime Compute for Apache Flink to consume log data in Simple Log Service. A complete SQL draft contains a source table and a result table. After log data in the source table is processed, the results are inserted into the result table by using the INSERT INTO statement.

For more information about how to develop an SQL draft in Realtime Compute for Apache Flink, see Job development overview.

Simple Log Service stores log data in real time. Realtime Compute for Apache Flink can read the data in streaming mode as input data. The following code provides an example of a log:

__source__:  11.85.*.199
__tag__:__receive_time__:  1562125591
__topic__:  test-topic
request_method:  GET
status:  200

Sample code

The following code provides an example of an SQL draft that you can develop in Realtime Compute for Apache Flink to consume log data in Simple Log Service.

Important

If the names of tables, columns, and reserved fields in an SQL draft conflict with each other, you must enclose the names in backticks (`).

CREATE TEMPORARY TABLE sls_input (
  request_method STRING,
  status BIGINT,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logstore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink (
  request_method STRING,
  status BIGINT,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logstore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT
  request_method,
  status,
  `__topic__`,
  `__source__`,
  `__timestamp__`,
  CAST(`__tag__`['__receive_time__'] AS BIGINT) AS receive_time
FROM sls_input;

WITH parameters

  • General

    connector (String, required, no default value): The table type. Set it to sls.

    endPoint (String, required, no default value): The endpoint address. Enter the VPC endpoint of SLS. For more information, see Endpoints.

    Note
    • By default, Realtime Compute for Apache Flink cannot access the Internet. However, Alibaba Cloud provides NAT gateways to enable communication between VPCs and the Internet. For more information, see How do I access the Internet?.
    • Avoid accessing SLS over the Internet. If you must do so, use HTTPS and enable transfer acceleration for SLS.

    project (String, required, no default value): The name of the SLS project.

    logStore (String, required, no default value): The name of an SLS Logstore or Metricstore. Data in a Logstore is consumed in the same way as in a Metricstore.

    accessId (String, required, no default value): The AccessKey ID of your Alibaba Cloud account. For more information, see How do I view the AccessKey ID and AccessKey secret?.

    Important
    To protect your AccessKey pair, use variables to configure your AccessKey.

    accessKey (String, required, no default value): The AccessKey secret of your Alibaba Cloud account.
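Put together, a sketch of a table definition that sets only the general options above might look like the following. The endpoint, project, and Logstore names are placeholders, and the AccessKey pair is referenced through variables as recommended:

```sql
-- Sketch: a source table that sets only the general connector options.
-- The endpoint, project, and Logstore names below are placeholders.
CREATE TEMPORARY TABLE sls_general_example (
  request_method STRING,
  status BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',      -- variable, not a plaintext AccessKey
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logstore' = 'sls-input'
);
```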

  • Source-specific

    enableNewSource (Boolean, optional, default value: false): Specifies whether to use the new source interface that implements FLIP-27. The new source adapts automatically to shard changes and distributes shards evenly across all source subtasks.

    Important
    • This option is supported only in VVR 8.0.9 or later. Starting from VVR 11.1, this option defaults to true.
    • If you change this option, your job cannot resume from a saved state. To work around this, first start your job with the consumerGroup option to record the current consumer offset. Then, set consumeFromCheckpoint to true and restart your job without states.
    • If SLS contains read-only shards, some Flink subtasks may finish reading from those shards and then request other unread shards. This can cause uneven shard distribution across subtasks, reducing overall consumption efficiency and system performance. To reduce this imbalance, adjust the source parallelism, optimize task scheduling, or merge small shards.

    shardDiscoveryIntervalMs (Long, optional, default value: 60000): The interval, in milliseconds, at which shard changes are detected dynamically. Set this option to a negative value to disable dynamic detection.

    Note
    • This option must be at least 1 minute (60,000 milliseconds).
    • This option takes effect only if enableNewSource is set to true.
    • This option is supported only in VVR 8.0.9 or later.

    startupMode (String, optional, default value: timestamp): The startup mode of the source table. Valid values:
    • timestamp (default): Consume logs starting from the specified time.
    • latest: Consume logs starting from the latest offset.
    • earliest: Consume logs starting from the earliest offset.
    • consumer_group: Consume logs starting from the offset recorded in the consumer group. If no offset is recorded for a shard, consume logs starting from the earliest offset.

    Important
    In VVR versions earlier than 11.1, the consumer_group value is not supported and this startup mode does not take effect. To consume logs from the offset recorded by a specified consumer group in those versions, set consumeFromCheckpoint to true instead.

    startTime (String, optional, default value: the current time): The time at which log consumption starts. Format: yyyy-MM-dd hh:mm:ss. This option takes effect only if startupMode is set to timestamp.

    Note
    The startTime and stopTime options are based on the __receive_time__ field in SLS, not the __timestamp__ field.

    stopTime (String, optional, no default value): The time at which log consumption stops. Format: yyyy-MM-dd hh:mm:ss.

    Note
    • Use this option only to consume historical logs. Set it to a past time point. If you set it to a future time, consumption may stop unexpectedly if no new logs are written. This appears as a broken data stream with no error messages.
    • To exit the Flink program after log consumption finishes, also set exitAfterFinish to true.

    consumerGroup (String, optional, no default value): The name of the consumer group, which records consumption progress. You can specify any custom name.

    Note
    You cannot share a consumer group across multiple jobs for collaborative consumption. When Flink consumes data from SLS, it does not assign shards through the SLS consumer group, so each job independently consumes all messages even if jobs share the same consumer group. Use different consumer groups for different jobs.

    consumeFromCheckpoint (String, optional, default value: false): Specifies whether to consume logs from the checkpoint saved in the specified consumer group.
    • true: You must also specify a consumer group. Flink consumes logs from the checkpoint that is stored in the consumer group. If no checkpoint exists in the consumer group, Flink consumes logs from the time specified by the startTime parameter.
    • false (default): Flink does not consume logs from the checkpoint saved in the specified consumer group.

    Important
    This option is not supported in VVR 11.1 or later. In VVR 11.1 or later, set startupMode to consumer_group instead.

    maxRetries (String, optional, default value: 3): The number of retries after a read from SLS fails.

    batchGetSize (String, optional, default value: 100): The number of log groups to read per request. Set batchGetSize to a value less than 1000. Otherwise, an error occurs.

    exitAfterFinish (String, optional, default value: false): Specifies whether the Flink program exits after data consumption finishes.
    • true: The Flink program exits after data consumption finishes.
    • false (default): The Flink program does not exit after data consumption finishes.

    query (String, optional, no default value): The query statement used to preprocess data before Flink consumes SLS data. Use this option to filter SLS data before consumption. This avoids loading all data into Flink, which reduces costs and improves processing speed. For example, 'query' = '*| where request_method = ''GET''' filters logs whose request_method field equals GET before Flink reads them.

    Note
    Write queries using SPL syntax.

    Important
    • This option was deprecated in VVR 11.3 but remains compatible in later versions.
    • This option is supported only in VVR 8.0.1 or later.
    • This feature incurs SLS fees. For details, see Billing.

    processor (String, optional, no default value): The SLS consumer processor. If both query and processor are set, query takes precedence. Use this option to filter SLS data before consumption. This avoids loading all data into Flink, which reduces costs and improves processing speed. We recommend using processor instead of query. For example, 'processor' = 'test-filter-processor' applies the SLS consumer processor named test-filter-processor to filter data before Flink reads it.

    Note
    Write processors using SPL syntax. For details about creating and updating SLS consumer processors, see Manage consumer processors.

    Important
    • This option is supported only in VVR 11.3 or later.
    • This feature incurs SLS fees. For details, see Billing.
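To show how several source-specific options combine, the following sketch assumes VVR 11.1 or later and a hypothetical consumer group named flink-cg. It resumes consumption from the offset recorded in the consumer group and pushes an SPL filter down to SLS:

```sql
-- Sketch (VVR 11.1 or later assumed); project, Logstore, and the
-- consumer group name flink-cg are placeholders.
CREATE TEMPORARY TABLE sls_input_filtered (
  request_method STRING,
  status BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logstore' = 'sls-input',
  -- Resume from the offset recorded in the consumer group.
  'startupMode' = 'consumer_group',
  'consumerGroup' = 'flink-cg',
  -- Filter on the SLS side with SPL so only GET requests reach Flink.
  'query' = '*| where request_method = ''GET'''
);
```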

  • Sink-specific

    topicField (String, optional, no default value): The name of a field whose value overrides the __topic__ field, which indicates the log topic. The field must exist in the table.

    timeField (String, optional, default value: the current time): The name of a field whose value overrides the __timestamp__ field, which indicates the log write time. The field must exist in the table, and its type must be INT. If not specified, the current time is used.

    sourceField (String, optional, no default value): The name of a field whose value overrides the __source__ field, which indicates the log source, such as the IP address of the machine that generated the log. The field must exist in the table.

    partitionField (String, optional, no default value): The name of a field whose value is hashed when data is written. Data with the same hash value is written to the same shard. If not specified, each data entry is written randomly to an available shard.

    buckets (String, optional, default value: 64): The number of buckets used to regroup data by hash value when partitionField is specified. Valid values: [1, 256]. The value must be a power of 2 and must be greater than or equal to the number of shards. Otherwise, some shards receive no data.

    flushIntervalMs (String, optional, default value: 2000): The interval, in milliseconds, at which data writes are triggered.

    writeNullProperties (Boolean, optional, default value: true): Specifies whether to write null values as empty strings to SLS.
    • true (default): Write null values as empty strings.
    • false: Do not write fields whose value is null.

    Note
    This option is supported only in VVR 8.0.6 or later.
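As an illustration of the sink-specific options, the following sketch uses hypothetical columns log_topic and client_ip to override the log topic and source, and routes rows with the same client IP to the same shard:

```sql
-- Sketch: the column names log_topic and client_ip are hypothetical.
CREATE TEMPORARY TABLE sls_sink_partitioned (
  request_method STRING,
  status BIGINT,
  log_topic STRING,
  client_ip STRING
) WITH (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logstore' = 'sls-output',
  'topicField' = 'log_topic',      -- write __topic__ from the log_topic column
  'sourceField' = 'client_ip',     -- write __source__ from the client_ip column
  'partitionField' = 'client_ip',  -- same client_ip hashes to the same shard
  'buckets' = '64'
);
```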

Extract attribute fields

Realtime Compute for Apache Flink can extract log fields, custom fields, and the following attribute fields.

  • __source__ (STRING METADATA VIRTUAL): The message source.

  • __topic__ (STRING METADATA VIRTUAL): The message topic.

  • __timestamp__ (BIGINT METADATA VIRTUAL): The log time.

  • __tag__ (MAP<VARCHAR, VARCHAR> METADATA VIRTUAL): The message tag. For the "__tag__:__receive_time__":"1616742274" attribute, __receive_time__ and 1616742274 are recorded as a key-value pair in the map. You can reference __tag__['__receive_time__'] in an SQL statement to query the tag.

To extract attribute fields, you must define headers in an SQL statement. Example:

create table sls_stream (
  __timestamp__ bigint HEADER,
  __receive_time__ bigint HEADER,
  b int,
  c varchar
) with (
  'connector' = 'sls',
  'endpoint' = 'cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logstore' = 'sls-input'
);
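Once declared, the header fields behave like ordinary columns. For example, a hypothetical query that computes the ingestion delay of each log (receive time minus log time, in seconds) could read:

```sql
-- Hypothetical: how long after generation (__timestamp__) SLS
-- received each log (__receive_time__), in seconds.
SELECT
  b,
  c,
  __receive_time__ - __timestamp__ AS ingest_delay_seconds
FROM sls_stream;
```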

References

For more information about how to use the DataStream API of Realtime Compute for Apache Flink to consume log data, see DataStream API.