Log Service provides Flink Log Connector to connect Flink to Log Service. Both open source Flink and Realtime Compute for Apache Flink are supported. This topic describes how to connect Flink to Log Service to consume and write logs.
Prerequisites
- An AccessKey pair is obtained. For more information, see AccessKey pair.
- A project and a Logstore are created. For more information, see Create a project and Create a Logstore.
- A RAM user is granted the required permissions. For more information, see Configure the permissions to consume data from a specified Logstore.
Background information
Flink Log Connector consists of Flink Log Consumer and Flink Log Producer. The following list describes the differences between the two components:
- Flink Log Consumer reads data from Log Service. It supports the exactly-once semantics and load balancing among shards.
- Flink Log Producer writes data to Log Service.
To use Flink Log Connector, add the following dependencies to the pom.xml file of your Maven project:
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>flink-log-connector</artifactId>
<version>0.1.31</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
If the connector does not meet your requirements, you can refer to the source code on GitHub and write your own consumption or write logic. For more information, visit aliyun-log-flink-connector.
Flink Log Consumer
Flink Log Consumer can consume log data from a Logstore. The exactly-once semantics is applied during log consumption. Flink Log Consumer detects changes in the number of shards in a Logstore.
Each Flink subtask consumes data from some of the shards in a Logstore. If shards in a Logstore are split or merged, the shards consumed by each subtask change accordingly.
Flink Log Consumer calls the following Log Service API operations:
- GetCursorOrData
You can call this operation to pull log data from a shard. If you frequently call this operation, data traffic may exceed the capabilities of shards. You can use the ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS parameter to control the interval of API calls. You can use the ConfigConstants.LOG_MAX_NUMBER_PER_FETCH parameter to control the number of logs that are pulled by each API call. For more information about the shard capabilities, see Shard.
Example:
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");
- ListShards
You can call this operation to view all shards in a Logstore and the status of each shard. If the shards are frequently split and merged, you can adjust the call interval to detect the changes in the number of shards in a timely manner. Example:
// Call the ListShards operation once every 30 seconds.
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");
- CreateConsumerGroup
You can call this operation to create a consumer group to synchronize checkpoints.
- UpdateCheckPoint
You can call this operation to synchronize Flink snapshots to a consumer group. Flink Log Consumer calls CreateConsumerGroup and UpdateCheckPoint automatically when a consumer group is configured; you only control how checkpoints are committed, as shown in the sketch after this list.
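The following minimal sketch shows this configuration. The ON_CHECKPOINTS mode and the commit-interval setting also appear in the full consumption example later in this topic; the PERIODIC alternative is an assumption that you should verify against your connector version.
Properties configProps = new Properties();
// Commit checkpoints to the consumer group each time a Flink checkpoint completes.
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
// Alternative (assumption; verify that PERIODIC exists in your connector version):
// commit on a fixed schedule instead, for example every 10 seconds.
// configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.PERIODIC.name());
// configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");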
- Configure startup parameters.
The following code provides an example of how to consume data. The java.util.Properties class is used as a configuration tool, and the configurations of Flink Log Consumer are included in the ConfigConstants class.
Properties configProps = new Properties();
// Specify the Log Service endpoint.
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// Specify the AccessKey ID and AccessKey secret of your account.
configProps.put(ConfigConstants.LOG_ACCESSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// Specify the project.
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// Specify the Logstore.
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// Specify the start position of log consumption.
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// Specify the method that you want to use for data deserialization.
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RawLogGroupList> logTestStream = env.addSource(
        new FlinkLogConsumer<RawLogGroupList>(deserializer, configProps));
Note The number of Flink subtasks is independent of the number of shards in a Logstore. If the number of shards is greater than the number of subtasks, each subtask consumes data from one or more shards. If the number of shards is less than the number of subtasks, some subtasks remain idle until new shards are generated. Data of each shard can be consumed by only one subtask.
- Specify the start position of log consumption.
When you use Flink Log Consumer to consume data from a Logstore, you can use the ConfigConstants.LOG_CONSUMER_BEGIN_POSITION parameter to specify the start position of log consumption. You can start to consume data from the earliest log, the latest log, or a specific point in time. In addition, Flink Log Consumer allows you to resume consumption from a specific consumer group. You can set the parameter to one of the following values:
- Consts.LOG_BEGIN_CURSOR: starts to consume data from the earliest log.
- Consts.LOG_END_CURSOR: starts to consume data from the latest log.
- Consts.LOG_FROM_CHECKPOINT: starts to consume data from a checkpoint that is stored in a specified consumer group. You can use the ConfigConstants.LOG_CONSUMERGROUP parameter to specify the consumer group.
- UnixTimestamp: starts to consume data from a specific point in time. The value is a UNIX timestamp that represents the number of seconds that have elapsed since 00:00:00 UTC on January 1, 1970. The value must be an integer that is passed as a string.
Example (each line shows one alternative setting):
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
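If you want to compute the timestamp instead of hard-coding it, the following minimal sketch derives the start position from the current time by using java.time. Starting one hour in the past is an illustrative choice, not a connector requirement; the connector only requires an integer number of seconds passed as a string.
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Start consuming from one hour before now (illustrative value).
long oneHourAgoSeconds = Instant.now().minus(1, ChronoUnit.HOURS).getEpochSecond();
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, String.valueOf(oneHourAgoSeconds));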
Note If you have configured consumption resumption from a state backend of Flink when you start a Flink task, Flink Log Connector uses the checkpoints that are stored in the state backend.
- Optional: Configure consumption progress monitoring.
Flink Log Consumer allows you to monitor consumption progress. You can obtain the real-time consumption position of each shard. The consumption position is indicated by a timestamp. For more information, see Step 2: View the status of a consumer group. Example:
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name");
Note This step is optional. If no consumer group exists after you configure consumption progress monitoring, Flink Log Consumer creates one. If a consumer group already exists, no further action is required, and snapshots in Flink Log Consumer are automatically synchronized to the consumer group. You can view the consumption progress of Flink Log Consumer in the Log Service console.
- Configure consumption resumption and the exactly-once semantics.
If the checkpointing feature of Flink is enabled, Flink Log Consumer periodically stores the consumption progress of each shard. If a subtask fails, Flink restores the subtask and starts to consume data from the latest checkpoint. The checkpoint period defines the maximum amount of data that is re-consumed if a failure occurs. You can use the following code to configure the checkpoint period:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure the exactly-once semantics.
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Save checkpoints every 5 seconds.
env.enableCheckpointing(5000);
For more information about Flink checkpoints, see Checkpoints.
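Recovery behavior also depends on the restart strategy of the Flink job. The following sketch pairs checkpointing with a fixed-delay restart strategy from the standard Flink API; the attempt count and delay are illustrative assumptions, not connector requirements.
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

// Restart a failed job up to 3 times and wait 10 seconds between attempts.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));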
Flink Log Producer
Flink Log Producer writes data to Log Service. The write operations of Flink Log Producer support only the at-least-once semantics. Flink Log Producer calls the following Log Service API operations:
- PutLogs
- ListShards
To write data to Log Service, perform the following steps:
- Initialize Flink Log Producer.
Configure the initialization parameters by using the java.util.Properties class, in the same way as for Flink Log Consumer, and implement the LogSerializationSchema interface to serialize your data into the RawLogGroup format.
- Write simulation results to Log Service in the string format. Example:
// Serialize data into raw log groups.
class SimpleLogSerializer implements LogSerializationSchema<String> {
    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int) (System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}

public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ProducerSample.class);

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream<String> simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // Specify the Log Service endpoint.
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // Specify the AccessKey ID and AccessKey secret of your account.
        configProps.put(ConfigConstants.LOG_ACCESSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // Specify the project to which you want to write logs.
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // Specify the Logstore to which you want to write logs.
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer<String> logProducer = new FlinkLogProducer<String>(new SimpleLogSerializer(), configProps);
        simpleStringStream.addSink(logProducer);
        env.execute("flink log producer");
    }

    // Simulate log generation.
    public static class EventsGenerator implements SourceFunction<String> {
        private boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}
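The serializer controls how each record maps to log fields. The following sketch extends the pattern above to a record with multiple fields. The Metric type is hypothetical and exists only for this example; only the RawLog and RawLogGroup methods already used above are assumed.
// Hypothetical event type, used only for this sketch.
class Metric {
    String name;
    double value;
}

// Serialize each Metric into one raw log with one key-value pair per field.
class MetricSerializer implements LogSerializationSchema<Metric> {
    public RawLogGroup serialize(Metric metric) {
        RawLog log = new RawLog();
        log.setTime((int) (System.currentTimeMillis() / 1000));
        log.addContent("name", metric.name);
        log.addContent("value", String.valueOf(metric.value));
        RawLogGroup group = new RawLogGroup();
        group.addLog(log);
        return group;
    }
}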
Consumption example
In this example, Flink Log Consumer reads data from a Logstore into a data stream in the FastLogGroupList format. The example then uses the flatMap function to convert the data from the FastLogGroupList format into JSON strings, and displays the output in the CLI or writes the output to a text file.
package com.aliyun.openservices.log.flink.sample;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.log.common.FastLog;
import com.aliyun.openservices.log.common.FastLogGroup;
import com.aliyun.openservices.log.flink.ConfigConstants;
import com.aliyun.openservices.log.flink.FlinkLogConsumer;
import com.aliyun.openservices.log.flink.data.FastLogGroupDeserializer;
import com.aliyun.openservices.log.flink.data.FastLogGroupList;
import com.aliyun.openservices.log.flink.model.CheckpointMode;
import com.aliyun.openservices.log.flink.util.Consts;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Properties;
public class FlinkConsumerSample {
private static final String SLS_ENDPOINT = "your-endpoint";
private static final String ACCESS_KEY_ID = "your-accesskey-id";
private static final String ACCESS_KEY_SECRET = "your-accesskey-secret";
private static final String SLS_PROJECT = "your-project";
private static final String SLS_LOGSTORE = "your-logstore";
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Configuration conf = new Configuration();
// Checkpoint dir like "file:///tmp/flink"
conf.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "your-checkpoint-dir");
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStateBackend(new FsStateBackend("file:///tmp/flinkstate"));
Properties configProps = new Properties();
configProps.put(ConfigConstants.LOG_ENDPOINT, SLS_ENDPOINT);
configProps.put(ConfigConstants.LOG_ACCESSKEYID, ACCESS_KEY_ID);
configProps.put(ConfigConstants.LOG_ACCESSKEY, ACCESS_KEY_SECRET);
// Limit the number of logs that are pulled by each request.
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "10");
// Resume consumption from the checkpoint that is stored in the consumer group.
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_FROM_CHECKPOINT);
configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your-consumer-group");
// Commit checkpoints to the consumer group when Flink checkpoints complete.
configProps.put(ConfigConstants.LOG_CHECKPOINT_MODE, CheckpointMode.ON_CHECKPOINTS.name());
configProps.put(ConfigConstants.LOG_COMMIT_INTERVAL_MILLIS, "10000");
FastLogGroupDeserializer deserializer = new FastLogGroupDeserializer();
DataStream<FastLogGroupList> stream = env.addSource(
new FlinkLogConsumer<>(SLS_PROJECT, SLS_LOGSTORE, deserializer, configProps));
// Convert each FastLogGroupList into JSON strings, one string per log.
DataStream<String> jsonStream = stream.flatMap((FlatMapFunction<FastLogGroupList, String>) (value, out) -> {
    for (FastLogGroup logGroup : value.getLogGroups()) {
        int logCount = logGroup.getLogsCount();
        for (int i = 0; i < logCount; i++) {
            FastLog log = logGroup.getLogs(i);
            JSONObject jsonObject = new JSONObject();
            for (int j = 0; j < log.getContentsCount(); j++) {
                jsonObject.put(log.getContents(j).getKey(), log.getContents(j).getValue());
            }
            out.collect(jsonObject.toJSONString());
        }
    }
}).returns(String.class);
// Write the converted JSON strings to a text file.
jsonStream.writeAsText("log-" + System.nanoTime());
env.execute("Flink consumer");
}
}