Log Service provides the flink-log-connector agent to connect with Flink. This topic describes how to connect Log Service with Flink to consume logs.


Background information

The flink-log-connector agent consists of the flink-log-consumer and flink-log-producer agents.
  • The flink-log-consumer agent reads data from Log Service. This agent supports the exactly-once semantics and load balancing among shards.
  • The flink-log-producer agent writes data to Log Service.
You must add Maven dependencies before you use the flink-log-producer agent to write data to Log Service. The following example shows sample Maven dependencies:

For more information about the flink-log-producer agent, visit GitHub.

Flink Log Consumer

The flink-log-consumer agent consumes log data from a Logstore of Log Service with exactly-once semantics. The agent also detects changes in the number of shards in a Logstore and adjusts consumption accordingly, which improves efficiency.

Each Flink subtask consumes data of some shards in a Logstore. If shards in a Logstore are split or merged, the shards consumed by the subtask also change.
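
To make the relationship between shards and subtasks concrete, the following sketch distributes shard IDs over subtasks with a simple modulo rule. It is only an illustration: the class and method names are hypothetical, and the connector's actual assignment strategy may differ. It does show that each shard ends up with exactly one subtask and that a subtask can be left without shards when there are fewer shards than subtasks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; this is not the connector's actual assignment code.
final class ShardAssignmentSketch {

    // Returns the shard IDs that the subtask with the given index would consume,
    // assuming shard i is assigned to subtask (i mod parallelism).
    static List<Integer> shardsForSubtask(List<Integer> shardIds, int subtaskIndex, int parallelism) {
        List<Integer> assigned = new ArrayList<>();
        for (int shardId : shardIds) {
            if (Math.floorMod(shardId, parallelism) == subtaskIndex) {
                assigned.add(shardId);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<Integer> shards = Arrays.asList(0, 1, 2, 3, 4);
        int parallelism = 2;
        // Print the shards that each subtask would own under the modulo rule.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> "
                    + shardsForSubtask(shards, subtask, parallelism));
        }
    }
}
```

If the shard list changes because of a split or merge, calling shardsForSubtask again with the new list yields the updated assignment for each subtask.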

When you use the flink-log-consumer agent to consume data from Log Service, you can call the following API operations:
  • GetCursorOrData

    You can call this operation to pull log data from a shard. If you call this operation too frequently, the data transfer may exceed the capacity of the shard. You can use the ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS and ConfigConstants.LOG_MAX_NUMBER_PER_FETCH parameters to control the interval between API calls and the number of log entries pulled by each call. For more information about the shard quota, see Shards.

    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards
    You can call this operation to view all shards in a Logstore and the status of each shard. If the shards are frequently split and merged, you can adjust the call interval to detect the changes in the number of shards in a timely manner. Example:
    // Call the ListShards operation once every 30 seconds.
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup

    You can call this operation to create a consumer group to synchronize checkpoints.

  • ConsumerGroupUpdateCheckPoint

    You can call this operation to synchronize snapshots of Flink to a consumer group.
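
As a rough way to reason about the two GetCursorOrData settings above, the following sketch computes an upper bound on the pull rate for one shard from the fetch interval and the per-fetch limit. The class and method names are hypothetical and not part of the connector, and the bound ignores network latency and processing time, so the real rate would be lower.

```java
// Hypothetical helper for back-of-the-envelope estimates; not part of the connector.
final class FetchRateSketch {

    // Upper bound on the number of fetch calls per second for one shard.
    static double maxCallsPerSecond(long fetchIntervalMillis) {
        if (fetchIntervalMillis <= 0) {
            throw new IllegalArgumentException("fetchIntervalMillis must be positive");
        }
        return 1000.0 / fetchIntervalMillis;
    }

    // Upper bound on the number of log entries pulled per second for one shard.
    static double maxEntriesPerSecond(long fetchIntervalMillis, int maxNumberPerFetch) {
        return maxCallsPerSecond(fetchIntervalMillis) * maxNumberPerFetch;
    }

    public static void main(String[] args) {
        // Same values as in the configuration example: interval 100 ms, 100 entries per call.
        System.out.println("calls/s   <= " + maxCallsPerSecond(100));
        System.out.println("entries/s <= " + maxEntriesPerSecond(100, 100));
    }
}
```

Comparing this bound with the shard quota gives a first indication of whether the interval should be increased or the per-fetch limit reduced.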

  1. Configure startup parameters.
    The following example shows how to consume log data. The java.util.Properties class is used as the configuration tool. You can find all constants to be configured in the ConfigConstants class.
    Properties configProps = new Properties();
    // Specify the endpoint of Log Service.
    configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
    // Specify the AccessKey ID and AccessKey secret.
    configProps.put(ConfigConstants.LOG_ACCESSKEYID, "");
    configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
    // Specify the project.
    configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
    // Specify the Logstore.
    configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
    // Specify the start position to consume logs.
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // Specify the data deserialization method.
    RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RawLogGroupList> logTestStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
    Note The number of Flink subtasks is independent of the number of shards in a Logstore. If the number of shards is greater than that of subtasks, each subtask consumes one or more shards. If the number of shards is less than that of subtasks, some subtasks are idle until new shards are generated. Each shard is consumed by only one subtask.
  2. Specify the start position of log consumption.
    When you use the flink-log-consumer agent to consume data from a Logstore, you can use the ConfigConstants.LOG_CONSUMER_BEGIN_POSITION parameter to specify the start position of log consumption. You can start consumption from the earliest data, from the latest data, or from a specific time, or resume consumption from a checkpoint stored in a specific consumer group. You can set the parameter to one of the following values:
    • Consts.LOG_BEGIN_CURSOR: starts consumption from the earliest data.
    • Consts.LOG_END_CURSOR: starts consumption from the latest data.
    • Consts.LOG_FROM_CHECKPOINT: starts consumption from a checkpoint that is stored in a specific consumer group. You can use the ConfigConstants.LOG_CONSUMERGROUP parameter to specify the consumer group.
    • UnixTimestamp: a UNIX timestamp written as an integer string. The timestamp is the number of seconds that have elapsed since 00:00:00 UTC on January 1, 1970. Data in each shard is consumed starting from this point in time.
    // Set only one of the following values.
    // Start from the earliest data.
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
    // Start from the latest data.
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // Start from a specific point in time (UNIX timestamp in seconds).
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
    // Start from the checkpoint stored in the specified consumer group.
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
    Note If you have configured consumption resumption from a state backend of Flink when you start a Flink job, the flink-log-connector agent uses checkpoints stored in the state backend.
  3. Optional: Configure consumption progress monitoring.
    The flink-log-connector agent allows you to monitor consumption progress. You can use the monitoring feature to obtain the real-time consumption position of each shard. The consumption position is indicated by a timestamp. For more information, see View consumer group status.
    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
    Note This setting is optional. If you configure consumption progress monitoring and no consumer group exists, the flink-log-connector agent creates a consumer group. If a consumer group is available, the agent synchronizes snapshots to the consumer group. You can view the consumption progress of the agent in the Log Service console.
  4. Configure consumption resumption and the exactly-once semantics.
    If the checkpointing feature of Flink is enabled, the flink-log-consumer agent periodically stores the consumption progress of each shard. When a subtask fails, Flink automatically restores the subtask and starts to consume data from the latest checkpoint.
    The checkpointing period determines the maximum amount of data that is re-consumed when a failure occurs. You can use the following code to configure the checkpointing period:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Configure the exactly-once semantics.
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Save checkpoints every five seconds.
    env.enableCheckpointing(5000);
    For more information about the Flink checkpoint, see Checkpoints.
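
The link between the checkpointing period and the amount of re-consumed data can be illustrated with a small simulation. This is a simplified sketch under stated assumptions: it models a single shard, takes a checkpoint after a fixed number of records instead of after a fixed time as Flink does, and uses hypothetical class and method names. It is not how the connector or Flink implements recovery.

```java
// Simplified, hypothetical model of checkpoint-based recovery for a single shard.
final class CheckpointReplaySketch {

    // Simulates processing records 0..failAt-1, storing the next offset as a
    // checkpoint after every `checkpointEvery` records, and returns the offset
    // from which a restored subtask would resume after the failure.
    static int resumeOffset(int failAt, int checkpointEvery) {
        int lastCheckpoint = 0;
        for (int offset = 0; offset < failAt; offset++) {
            // Record `offset` has been processed at this point.
            if ((offset + 1) % checkpointEvery == 0) {
                lastCheckpoint = offset + 1;
            }
        }
        return lastCheckpoint;
    }

    // Number of records that are read again after recovery.
    static int replayedRecords(int failAt, int checkpointEvery) {
        return failAt - resumeOffset(failAt, checkpointEvery);
    }

    public static void main(String[] args) {
        int failAt = 12;
        int checkpointEvery = 5;
        System.out.println("resume from offset " + resumeOffset(failAt, checkpointEvery));
        System.out.println("records replayed:  " + replayedRecords(failAt, checkpointEvery));
    }
}
```

In this model the number of replayed records is always smaller than the checkpoint spacing, which is why a shorter checkpointing period reduces the amount of data that is consumed again after a failure.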

Flink Log Producer

The flink-log-producer agent writes data to Log Service.
Note The flink-log-producer agent supports only the at-least-once semantics of Flink. If a task fails, some data may be written to Log Service more than once. However, no data is lost.
When you use the flink-log-producer agent to write data to Log Service, you can call the following API operations:
  • PostLogStoreLogs
  • ListShards
  1. Initialize the flink-log-producer agent.
    1. Configure initialization parameters of the flink-log-producer agent.
      The initialization of the flink-log-producer agent is similar to that of the flink-log-consumer agent. The following example shows how to configure the initialization parameters of the agent. In most cases, you can use the default values of the parameters. Example:
      // The number of I/O threads used to send data. Default value: 8.
      // The maximum time that log data is cached before it is sent. Default value: 3000.
      // The maximum number of log entries in a cached package. Default value: 4096.
      // The size of the cached package. Default value: 3 MB.
      // The total memory size that the job can use. Default value: 100 MB.
    2. Implement LogSerializationSchema to define how data is serialized into raw log groups.
      A raw log group is a collection of log entries. For information about log fields, see Log entry.

      If you need to write data to a specific shard, you can use the LogPartitioner parameter to generate hash keys for log data. LogPartitioner is an optional parameter. If you do not specify this parameter, data is written to a random shard.

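      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // Generate a 32-character hexadecimal hash key from the MD5 digest of the element.
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    // Feed the element into the digest; otherwise every element gets the same key.
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    // Left-pad with zeros to 32 characters.
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                    // MD5 is unavailable; fall back to the constant key below.
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
      });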
  2. Write simulated data to Log Service, as shown in the following example:
    // Serialize data into the format of raw log groups.
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            // Add the log entry to the group; otherwise an empty group is returned.
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        public static String sAccessKeyId = "";
        public static String sAccessKey = "";
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ProducerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            // Specify the endpoint of Log Service.
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            // Specify the AccessKey ID and AccessKey secret.
            configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            // Specify the project to which logs are written.
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            // Specify the Logstore to which logs are written.
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(),configProps);
            // Attach the producer as the sink of the simulated stream.
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        // Simulate log generation.
        public static class EventsGenerator implements SourceFunction<String> {
            private volatile boolean running = true;
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            public void cancel() {
                running = false;
            }
        }
    }
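
The hash-key generation used with LogPartitioner can be tried in isolation, without Flink or Log Service. The following sketch uses only the Java standard library; the class and method names are hypothetical, and it throws an exception instead of returning a constant key when MD5 is unavailable, which is a design choice of this sketch rather than of the connector.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical standalone helper; mirrors the MD5-based hash key idea shown for LogPartitioner.
final class HashKeySketch {

    // Returns the MD5 digest of the element as a 32-character lowercase hexadecimal string.
    static String md5HashKey(String element) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(element.getBytes(StandardCharsets.UTF_8));
            String hex = new BigInteger(1, digest).toString(16);
            // BigInteger drops leading zeros, so left-pad to the full 32 characters.
            StringBuilder padded = new StringBuilder();
            for (int i = hex.length(); i < 32; i++) {
                padded.append('0');
            }
            return padded.append(hex).toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5HashKey("hello"));
    }
}
```

Elements with the same content always produce the same key, so a partitioner built this way would route identical elements to the same hash range.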