
Realtime Compute for Apache Flink:Simple Log Service connector

Last Updated: Mar 15, 2024

This topic describes how to use the Simple Log Service connector.

Background information

Simple Log Service is an end-to-end log data service developed by Alibaba Cloud. Simple Log Service allows you to quickly collect, consume, ship, query, and analyze log data. It improves operations and maintenance (O&M) efficiency and can process large amounts of log data.

The following table describes the capabilities supported by the Simple Log Service connector.

    | Item | Description |
    | --- | --- |
    | Table type | Source table and result table |
    | Running mode | Streaming mode |
    | Metric | N/A |
    | Data format | N/A |
    | API type | SQL API |
    | Data update or deletion in a result table | Data in a result table cannot be updated or deleted. Data can only be inserted into a result table. |

Features

The Simple Log Service source connector can be used to read the attribute fields of messages. The following table describes the attribute fields supported by the Simple Log Service source connector.

    | Field | Field type | Description |
    | --- | --- | --- |
    | __source__ | STRING METADATA VIRTUAL | The message source. |
    | __topic__ | STRING METADATA VIRTUAL | The message topic. |
    | __timestamp__ | BIGINT METADATA VIRTUAL | The time when the log is generated. |
    | __tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | The message tags. For example, the attribute "__tag__:__receive_time__":"1616742274" is stored in the map as the key '__receive_time__' with the value '1616742274', and is accessed as __tag__['__receive_time__'] in SQL. |
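The metadata fields above are declared as METADATA VIRTUAL columns in the source table DDL. A minimal sketch (the endpoint, project, and Logstore placeholders are illustrative):

```sql
CREATE TEMPORARY TABLE sls_source_with_meta (
  request_uri STRING,
  -- Metadata columns must be declared with METADATA VIRTUAL.
  `__topic__` STRING METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

-- Access a tag value by key, for example the receive time:
-- SELECT `__topic__`, `__tag__`['__receive_time__'] FROM sls_source_with_meta;
```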

Prerequisites

A project and a Logstore are created. For more information, see Create a project and a Logstore.

Limits

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 2.0.0 or later supports the Simple Log Service connector.

  • The Simple Log Service connector supports only the at-least-once semantics.

  • Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports automatic failover of deployments when the number of shards changes.

  • We recommend that you do not set the deployment parallelism of a source node to a value greater than the number of shards. Otherwise, resources may be wasted. In addition, in Realtime Compute for Apache Flink that uses VVR 8.0.5 or earlier, if the number of shards changes, automatic failover of deployments may become invalid and specific shards may not be consumed.

Syntax

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

Parameters in the WITH clause

  • Common parameters

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | connector | The type of the table. | STRING | Yes | No default value | Set the value to sls. |
    | endPoint | The endpoint of Simple Log Service. | STRING | Yes | No default value | Enter the internal endpoint of Simple Log Service. For more information, see Internal Simple Log Service endpoints. Note: By default, Realtime Compute for Apache Flink cannot access the Internet. Alibaba Cloud provides NAT gateways to enable communication between virtual private clouds (VPCs) and the Internet. For more information, see How does the fully managed Flink service access the Internet? |
    | project | The name of the Simple Log Service project. | STRING | Yes | No default value | N/A |
    | logStore | The name of a Logstore or Metricstore of Simple Log Service. | STRING | Yes | No default value | Data in a Logstore is consumed in the same way as in a Metricstore. |
    | accessId | The AccessKey ID of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? Important: To protect your AccessKey pair, we recommend that you specify the AccessKey ID by using the key management method. For more information, see Manage keys. |
    | accessKey | The AccessKey secret of your Alibaba Cloud account. | STRING | Yes | No default value | For more information, see How do I view information about the AccessKey ID and AccessKey secret of the account? Important: To protect your AccessKey pair, we recommend that you specify the AccessKey secret by using the key management method. For more information, see Manage keys. |

  • Parameters only for source tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | startTime | The time at which log consumption starts. | STRING | No | Current time | The value is in the yyyy-MM-dd hh:mm:ss format. Note: The startTime and stopTime parameters are based on the __receive_time__ attribute field of a Simple Log Service source table, not the __timestamp__ attribute field. |
    | stopTime | The time at which log consumption stops. | STRING | No | No default value | The value is in the yyyy-MM-dd hh:mm:ss format. Note: If you want fully managed Flink to exit after log consumption is complete, also set the exitAfterFinish parameter to true. |
    | consumerGroup | The name of the consumer group. | STRING | No | No default value | A consumer group records the consumption progress. You can specify a custom consumer group name; the format of the name is not fixed. Note: A consumer group cannot be shared by multiple deployments for collaborative consumption. Simple Log Service does not shard data among Flink consumers in a consumer group, so if different Flink deployments share the same consumer group, each deployment consumes all messages. We recommend that you specify a different consumer group for each Flink deployment. |
    | consumeFromCheckpoint | Specifies whether to consume logs from the checkpoint that is stored in the specified consumer group. | STRING | No | false | Valid values: true (you must specify a consumer group; fully managed Flink consumes logs from the checkpoint stored in that consumer group, or from the time specified by startTime if no checkpoint exists) and false (default; the checkpoint in the consumer group is ignored). Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
    | directMode | Specifies whether to enable the direct connection mode of Simple Log Service. | STRING | No | false | Valid values: true (enabled) and false (default; disabled). |
    | maxRetries | The number of retries allowed when data fails to be read from Simple Log Service. | STRING | No | 3 | N/A |
    | batchGetSize | The number of log groups from which data is read in a single request. | STRING | No | 100 | The value cannot exceed 1000. Otherwise, an error is returned. |
    | exitAfterFinish | Specifies whether fully managed Flink exits after data consumption is complete. | STRING | No | false | Valid values: true (exit after consumption completes) and false (default; do not exit). Note: Only Realtime Compute for Apache Flink that uses VVR 4.0.13 or later supports this parameter. |
    | query | The query statement used to preprocess data before consumption. | STRING | No | No default value | If you configure this parameter, fully managed Flink filters data on the Simple Log Service side before consumption instead of consuming all data, which reduces costs and improves processing efficiency. For example, 'query' = '*| where request_method = ''GET''' reads only the data whose request_method field equals GET. Important: Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports this parameter. For the regions in which Simple Log Service supports this feature, see Rule-based consumption. This feature is free of charge during the public preview phase; you may be charged for Simple Log Service in the future. For more information, see Billing rules. |
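The source-only parameters can be combined in a single DDL. The following is a hedged sketch of a bounded, server-side-filtered read (the endpoint, project, Logstore, consumer group, and time values are illustrative; the query option requires VVR 8.0.1 or later):

```sql
CREATE TEMPORARY TABLE sls_bounded_source (
  request_method STRING,
  request_uri STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Consume a bounded time range, then let the deployment exit.
  'startTime' = '2024-01-01 00:00:00',
  'stopTime' = '2024-01-02 00:00:00',
  'exitAfterFinish' = 'true',
  -- Record progress in a consumer group dedicated to this deployment.
  'consumerGroup' = 'flink-deployment-a',
  -- Filter on the Simple Log Service side before consumption (VVR 8.0.1+).
  'query' = '*| where request_method = ''GET'''
);
```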

  • Parameters only for result tables

    | Parameter | Description | Data type | Required | Default value | Remarks |
    | --- | --- | --- | --- | --- | --- |
    | topicField | The name of a field whose value overwrites the __topic__ attribute field to indicate the log topic. | STRING | No | No default value | The value must be an existing field in the table. |
    | timeField | The name of a field whose value overwrites the __timestamp__ attribute field to indicate the log write time. | STRING | No | Current time | The value must be an existing field of the INT type in the table. If no field is specified, the current time is used. |
    | sourceField | The name of a field whose value overwrites the __source__ attribute field to indicate the log origin, such as the IP address of the machine that generated the log. | STRING | No | No default value | The value must be an existing field in the table. |
    | partitionField | The name of a field from whose value a hash is calculated when data is written to Simple Log Service. Data with the same hash value is written to the same shard. | STRING | No | No default value | If you do not specify this parameter, each data entry is written to a random available shard. |
    | buckets | The number of buckets into which hash values are grouped when the partitionField parameter is specified. | STRING | No | 64 | Valid values: [1,256]. The value must be an integer power of 2 and must be greater than or equal to the number of shards. Otherwise, no data is written to specific shards. Note: Only Realtime Compute for Apache Flink that uses VVR 6.0.5 or later supports this parameter. |
    | flushIntervalMs | The interval at which data writing is triggered. | STRING | No | 2000 | Unit: milliseconds. |
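The result-only parameters can be combined in a sink DDL. A hedged sketch (the field names and values are illustrative; the buckets option requires VVR 6.0.5 or later):

```sql
CREATE TEMPORARY TABLE sls_partitioned_sink (
  topic_name STRING,
  user_id STRING,
  message STRING
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  -- Write the topic_name column into the __topic__ attribute field.
  'topicField' = 'topic_name',
  -- Route rows with the same user_id to the same shard (VVR 6.0.5+ for buckets).
  'partitionField' = 'user_id',
  'buckets' = '64',
  -- Flush every second instead of the default 2000 ms.
  'flushIntervalMs' = '1000'
);
```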

Data type mappings

| Data type of Flink | Data type of Simple Log Service |
| --- | --- |
| BOOLEAN | STRING |
| VARBINARY | STRING |
| VARCHAR | STRING |
| TINYINT | STRING |
| INTEGER | STRING |
| BIGINT | STRING |
| FLOAT | STRING |
| DOUBLE | STRING |
| DECIMAL | STRING |
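Simple Log Service stores all field values as strings, and the connector converts each value to the Flink type declared in the DDL. A sketch (the field names are illustrative):

```sql
CREATE TEMPORARY TABLE sls_typed_source (
  -- Values arrive from Simple Log Service as strings and are
  -- converted to the declared Flink types.
  status_code INTEGER,
  latency_ms BIGINT,
  sample_rate DOUBLE,
  is_success BOOLEAN
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);
```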

Sample code

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` BIGINT METADATA VIRTUAL,
  `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'startTime' = '2023-08-30 00:00:00',
  'project' = 'sls-test',
  'logStore' = 'sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endPoint' = 'cn-hangzhou.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'project' = 'sls-test',
  'logStore' = 'sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

Important

If you want to use the DataStream API to read or write data, you must use the DataStream connector of the corresponding type to connect to fully managed Flink. For more information about how to configure a DataStream connector, see Settings of DataStream connectors. The Simple Log Service DataStream connectors of different versions are stored in the Maven central repository.

  • Read data from Simple Log Service

    VVR of Realtime Compute for Apache Flink provides SlsSourceFunction, an implementation of SourceFunction, to read data from Simple Log Service. Sample code:

    public class SlsDataStreamSource {
    
        public static void main(String[] args) throws Exception {
            // Sets up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Creates and adds SLS source and sink.
            env.addSource(createSlsSource())
                    .map(SlsDataStreamSource::convertMessages)
                    .print();
            env.execute("SLS Stream Source");
        }
    
        private static SlsSourceFunction createSlsSource() {
            SLSAccessInfo accessInfo = new SLSAccessInfo();
            accessInfo.setEndpoint("yourEndpoint");
            accessInfo.setProjectName("yourProject");
            accessInfo.setLogstore("yourLogStore");
            accessInfo.setAccessId("yourAccessId");
            accessInfo.setAccessKey("yourAccessKey");
    
            // The batch get size must be given.
            accessInfo.setBatchGetSize(10);
    
            // Optional parameters
            accessInfo.setConsumerGroup("yourConsumerGroup");
            accessInfo.setMaxRetries(3);
    
            // time to start consuming, set to current time.
            int startInSec = (int) (new Date().getTime() / 1000);
    
            // time to stop consuming, -1 means never stop.
            int stopInSec = -1;
    
            return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
        }
    
        private static List<String> convertMessages(SourceRecord input) {
            List<String> res = new ArrayList<>();
            for (FastLogGroup logGroup : input.getLogGroups()) {
                int logsCount = logGroup.getLogsCount();
                for (int i = 0; i < logsCount; i++) {
                    FastLog log = logGroup.getLogs(i);
                    int fieldCount = log.getContentsCount();
                    for (int idx = 0; idx < fieldCount; idx++) {
                        FastLogContent f = log.getContents(idx);
                        res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                    }
                }
            }
            return res;
        }
    }
  • Write data to Simple Log Service

    VVR of Realtime Compute for Apache Flink provides SLSOutputFormat, an implementation of OutputFormat, to write data to Simple Log Service. Sample code:

    public class SlsDataStreamSink {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromSequence(0, 100)
                    .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                    .addSink(createSlsSink())
                    .name(SlsDataStreamSink.class.getSimpleName());
            env.execute("SLS Stream Sink");
        }
    
        private static OutputFormatSinkFunction createSlsSink() {
            Configuration conf = new Configuration();
            conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
            conf.setString(SLSOptions.PROJECT, "yourProject");
            conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
            conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
            conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
            SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
            return new OutputFormatSinkFunction<>(outputFormat);
        }
    
        private static SinkRecord getSinkRecord(Long seed) {
            SinkRecord record = new SinkRecord();
            LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
            logItem.PushBack("level", "info");
            logItem.PushBack("name", String.valueOf(seed));
            logItem.PushBack("message", "it's a test message for " + seed.toString());
            record.setContent(logItem);
            return record;
        }
    
    }

FAQ

What do I do if an OOM error occurs in TaskManagers and the error message "java.lang.OutOfMemoryError: Java heap space" appears for the source table when I restore a failed Flink program?