Log Service provides the Flink log connector for integration with Flink. This topic describes how to use the Flink log connector to consume and write log data in Flink.

Prerequisites

  • An AccessKey pair, a Log Service project, and a Logstore are created. For more information, see Preparations.
  • If you use a RAM user to manage Log Service, make sure that the policy for accessing Logstores is correctly set. For more information, see Authorize a RAM user to connect to Log Service.

Background information

The Flink log connector consists of the Flink log consumer and the Flink log producer.
  • The Flink log consumer reads data from Log Service. It supports the exactly-once semantics and shard-based load balancing.
  • The Flink log producer writes data to Log Service. When you use the Flink log connector, you must add the following Maven dependencies to your project:
    <dependency>
       <groupId>com.aliyun.openservices</groupId>
       <artifactId>flink-log-connector</artifactId>
       <version>0.1.13</version>
    </dependency>
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>2.5.0</version>
    </dependency>
For more information, visit GitHub.

Flink log consumer

The Flink log consumer subscribes to a specific Logstore in Log Service and consumes its data with exactly-once semantics. The Flink log consumer automatically detects changes in the number of shards in the Logstore, so you do not need to track shard splits or merges yourself.

Each Flink subtask consumes a subset of the shards in a Logstore. If shards are split or merged, the shards assigned to each subtask change accordingly.

The Flink log consumer uses the following Log Service API operations:
  • GetCursorOrData

    You can call this operation to pull log data from a shard. If this operation is called too frequently, the shard-level quota of Log Service may be exceeded. You can use ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS and ConfigConstants.LOG_MAX_NUMBER_PER_FETCH to control the interval between API calls and the number of logs pulled by each call. For more information about the shard quota, see Shard.

    // Pull data every 100 milliseconds and pull a maximum of 100 log entries per call.
    configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
    configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
  • ListShards

    You can call this operation to list all shards in a Logstore and the status of each shard. If shards are frequently split or merged, you can shorten the call interval to detect shard changes sooner.

    // Call the ListShards operation once every 30 seconds
    configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
  • CreateConsumerGroup

    You can call this operation to create a consumer group that is used to synchronize checkpoints. The Flink log consumer calls this operation only when consumption progress monitoring is enabled (see the configuration example after this list).

  • ConsumerGroupUpdateCheckPoint

    You can call this operation to synchronize snapshots of Flink to a consumer group.
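    Both consumer group operations are used only after a consumer group is specified. The following line is a minimal sketch that repeats the configuration described in step 3; the consumer group name is a placeholder:

    // CreateConsumerGroup and ConsumerGroupUpdateCheckPoint are called only if a consumer group is configured.
    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group-name");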

The following table lists the Alibaba Cloud resources required for RAM users to use the preceding API operations.
Operation Alibaba Resource Name (ARN)
GetCursorOrData acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
CreateConsumerGroup acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
ConsumerGroupUpdateCheckPoint acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
For more information about RAM users and how to authorize RAM users, see Authorization - Overview.
  1. Configure startup parameters.
    The following example shows how to consume log data. java.util.Properties is used as the configuration tool. You can find all constants to be configured in the ConfigConstants class.
    Properties configProps = new Properties();
    // Specify the endpoint to access Log Service.
    configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
    // Specify the AccessKey pair.
    configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
    configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
    // Specify the project.
    configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
    // Specify the Logstore.
    configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
    // Specify the start position to consume logs.
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    // Specify the message deserialization method.
    RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<RawLogGroupList> logTestStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
    Note The number of subtasks in a Flink streaming job is independent of the number of shards in a Logstore. If the number of shards is greater than the number of subtasks, each subtask consumes one or more shards, and each shard is consumed by exactly one subtask. If the number of shards is less than the number of subtasks, some subtasks remain idle until new shards are generated.
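    To make the relationship in the preceding note explicit, the following sketch sets the parallelism of the source. The value 3 is only an assumption; choose a value based on the number of shards in your Logstore:

    DataStream<RawLogGroupList> logTestStream = env.addSource(
            new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps))
            .setParallelism(3); // Assumed value. If the Logstore has more than 3 shards, each subtask consumes several shards.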
  2. Specify the consumption start position.
    The Flink log consumer allows you to specify the position from which each shard is consumed. By setting ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, you can start consumption from the beginning or end of a shard, from a specific point in time, or from a checkpoint that is stored in a consumer group. Valid values:
    • Consts.LOG_BEGIN_CURSOR: starts consumption from the beginning of a shard, which is the earliest data in the shard.
    • Consts.LOG_END_CURSOR: starts consumption from the end of a shard, which is the latest data in the shard.
    • Consts.LOG_FROM_CHECKPOINT: starts consumption from a checkpoint that is stored in a specific consumer group. You can use ConfigConstants.LOG_CONSUMERGROUP to specify the consumer group.
    • UnixTimestamp: a string representation of an integer, which is the number of seconds that have elapsed since 00:00:00 UTC, January 1, 1970. Consumption starts from the data received at this point in time.
    The following examples show how to specify the start position:
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
    configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
    Note If you start a Flink job by restoring it from a Flink state backend, the connector uses the checkpoints stored in the state backend instead of the configured start position.
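    For reference, the following sketch shows one way to configure a Flink state backend so that job state, including the consumption progress, can be restored. The backend type and path are assumptions; use whatever state backend your Flink deployment provides:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Assumed example: store checkpoint data in a file system path. Replace the URI with your own location.
    env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));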
  3. Optional: Configure consumption progress monitoring.
    The Flink log consumer enables you to configure consumption progress monitoring. Consumption progress indicates the real-time consumption position of each shard. These positions are indicated by timestamps. For more information, see View consumer group status and Consumer group - Monitoring alarm.
    configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
    Note This configuration is optional. If you specify a consumer group, the Flink log consumer first creates the consumer group. If the consumer group already exists, no further operation is performed. Snapshots in the Flink log consumer are automatically synchronized to the consumer group of Log Service, and you can view the consumption progress of the Flink log consumer in the Log Service console.
  4. Configure parameters for disaster recovery and exactly-once semantics.
    If the checkpointing feature of Flink is enabled, the Flink log consumer periodically stores the consumption progress of each shard. If a job fails, Flink restores the consumption progress from the most recent checkpoint, and the Flink log consumer resumes consumption from that point.
    The checkpointing interval defines the maximum amount of data that is rolled back, or re-consumed, if a failure occurs. You can use the following code to specify the checkpointing interval:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Enable the exactly-once semantics.
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // Store checkpoints every five seconds.
    env.enableCheckpointing(5000);
    For more information about the Flink checkpoints, see Checkpoints in the Flink documentation.
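    Checkpoint-based recovery also depends on the restart strategy of the Flink job. The following line is an assumed example of a fixed-delay restart strategy; it is not part of the connector, and the values are placeholders:

    // Assumed example: restart the job up to 3 times, with a 10-second delay between attempts.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));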

Flink log producer

The Flink log producer writes data to Log Service.
Note The Flink log producer supports only the at-least-once semantics of Flink. If a job fails, data that is written to Log Service may be duplicated, but data is never lost.
The Flink log producer uses the following Log Service API operations:
  • PostLogStoreLogs
  • ListShards
The following table lists the Alibaba Cloud resources required for RAM users to use the preceding API operations.
Operation ARN
PostLogStoreLogs acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
ListShards acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
For more information about RAM users and how to authorize RAM users, see Authorization - Overview.
  1. Initialize the Flink log producer.
    1. Initialize properties for the Flink log producer.
      The initialization of the Flink log producer is similar to that of the Flink log consumer. The producer supports the following parameters. In most cases you can keep their default values, or you can adjust them as needed (see the example after the parameter list).
      // The number of I/O threads used to send data. Default value: 8.
      ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
      // The maximum time, in milliseconds, that log data is cached before it is sent. Default value: 3000.
      ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
      // The maximum number of logs in a cached package. Default value: 4096.
      ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
      // The maximum size of a cached package. Default value: 3 MB.
      ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
      // The total amount of memory that the job can use to cache data. Default value: 100 MB.
      ConfigConstants.LOG_MEM_POOL_BYTES
      Note These parameters are optional. You can use their default values.
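      The following sketch shows how these constants can be set on the same Properties object that is passed to the producer. The values shown simply repeat the defaults and are for illustration only:

      Properties configProps = new Properties();
      // The values below repeat the documented defaults; override them only if needed.
      configProps.put(ConfigConstants.LOG_SENDER_IO_THREAD_COUNT, "8");
      configProps.put(ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS, "3000");
      configProps.put(ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE, "4096");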
    2. Implement LogSerializationSchema to define how data is serialized into a RawLogGroup.
      A RawLogGroup is a collection of logs. For more information about the meaning of each field, see Data model.

      To use the shard hash key feature of Log Service and control the shard to which data is written, implement a LogPartitioner to generate a hash key for each record.

      Example:
      FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
      logProducer.setCustomPartitioner(new LogPartitioner<String>() {
            // Generate a 32-character hexadecimal MD5 hash key for each record.
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    // Pad the hash with leading zeros to 32 characters.
                    while (hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                    // MD5 is available in all standard JVMs; fall back to a fixed 32-character key if it is not.
                    return "00000000000000000000000000000000";
                }
            }
        });
      Note LogPartitioner is optional. If this parameter is not specified, data is randomly written into a shard.
  2. Write data to Log Service. The following example serializes simulated strings and writes them to Log Service:
    // Serialize data based on the data format of Log Service.
    class SimpleLogSerializer implements LogSerializationSchema<String> {
        public RawLogGroup serialize(String element) {
            RawLogGroup rlg = new RawLogGroup();
            RawLog rl = new RawLog();
            rl.setTime((int)(System.currentTimeMillis() / 1000));
            rl.addContent("message", element);
            rlg.addLog(rl);
            return rlg;
        }
    }
    public class ProducerSample {
        public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        public static String sAccessKeyId = "";
        public static String sAccessKey = "";
        public static String sProject = "ali-cn-hangzhou-sls-admin";
        public static String sLogstore = "test-flink-producer";
        private static final Logger LOG = LoggerFactory.getLogger(ProducerSample.class);
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(3);
            DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());
            Properties configProps = new Properties();
            // Specify the endpoint of Log Service.
            configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
            // Specify the AccessKey pair to access Log Service.
            configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
            configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
            // Specify the project to which logs are written.
            configProps.put(ConfigConstants.LOG_PROJECT, sProject);
            // Specify the Logstore to which logs are written.
            configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);
            FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
            simpleStringStream.addSink(logProducer);
            env.execute("flink log producer");
        }
        // Simulate log generation.
        public static class EventsGenerator implements SourceFunction<String> {
            private boolean running = true;
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                long seq = 0;
                while (running) {
                    Thread.sleep(10);
                    ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
                }
            }
            @Override
            public void cancel() {
                running = false;
            }
        }
    }