如果您通过DataStream的方式以精确一次(Exactly-once)的语义在Kafka Topic中读写数据,则需要使用Kafka DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用Kafka DataStream Connector来读写Kafka数据。
背景信息
(推荐)直接将Connector作为项目依赖打进作业JAR包
- 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
<repositories> <repository> <id>oss.sonatype.org-snapshot</id> <name>OSS Sonatype Snapshot Repository</name> <url>http://oss.sonatype.org/content/repositories/snapshots</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> <repository> <id>apache.snapshots</id> <name>Apache Development Snapshot Repository</name> <url>https://repository.apache.org/content/repositories/snapshots/</url> <releases> <enabled>false</enabled> </releases> <snapshots> <enabled>true</enabled> </snapshots> </repository> </repositories>
- 检查您的settings.xml配置文件中是否存在
<mirrorOf>*</mirrorOf>
配置。如果存在
<mirrorOf>*</mirrorOf>
配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>
。修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。
- 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>ververica-connector-kafka</artifactId> <version>${connector.version}</version> </dependency>
每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。完整的依赖信息请参见Kafka示例代码中的pom.xml文件。注意- 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
- 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
<transformers> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> <projectName>Apache Flink</projectName> <encoding>UTF-8</encoding> </transformer> </transformers>
- 构建Kafka Source或者Kafka Producer。
- Kafka Source
Kafka Source提供了构建类来创建Kafka Source的实例。我们将通过以下示例代码为您介绍如何构建Kafka Source来消费input-topic最早位点的数据,消费组名称为my-group,并将Kafka消息体反序列化为字符串。
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
在构建KafkaSource时,必须指定以下参数。参数 说明 BootstrapServers Kafka Broker地址,通过setBootstrapServers(String)方法配置。 GroupId 消费者组ID,通过setGroupId(String)方法配置。 Topics或Partition 订阅的Topic或Partition名称。Kafka Source提供了以下三种Topic或Partition的订阅方式: - Topic列表,订阅Topic列表中所有Partition。
KafkaSource.builder().setTopics("topic-a","topic-b")
- 正则表达式匹配,订阅与正则表达式所匹配的Topic下的所有Partition。
KafkaSource.builder().setTopicPattern("topic.*")
- Partition列表,订阅指定的Partition。
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList( new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a" new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b" KafkaSource.builder().setPartitions(partitionSet)
Deserializer 解析Kafka消息的反序列化器。 反序列化器通过setDeserializer(KafkaRecordDeserializationSchema)来指定,其中KafkaRecordDeserializationSchema定义了如何解析Kafka的ConsumerRecord。如果只解析Kafka消息中的消息体(Value)的数据,则您可以通过以下任何一种方式实现:- 使用Flink提供的KafkaSource构建类中的setValueOnlyDeserializer(DeserializationSchema)方法,其中DeserializationSchema定义了如何解析Kafka消息体中的二进制数据。
- 使用Kafka提供的解析器,包括多种实现类。例如,您可以使用StringDeserializer来将Kafka消息体解析成字符串。
import org.apache.kafka.common.serialization.StringDeserializer; KafkaSource.<String>builder() .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
说明 如果需要完整地解析ConsumerRecord,则需要您自己实现KafkaRecordDeserializationSchema接口。在使用Kafka DataStream Connector时,您还需要了解以下Kafka属性:- 起始消费位点
Kafka Source能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费。内置的位点初始化器包括以下内容。
位点初始化器 代码设置 从最早位点开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
从最末尾位点开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
从时间戳大于等于指定时间的数据开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
从消费组提交的位点开始消费,不指定位点重置策略。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
说明- 如果以上内置的初始化器不能满足需求,您也可以自己实现自定义的位点初始化器。
- 如果您未指定位点初始化器,则默认使用OffsetsInitializer.earliest(),即最早位点。
- 流模式和批模式
Kafka Source支持流式和批式两种运行模式。默认情况下,Kafka Source设置为以流模式运行,因此作业永远不会停止,直到Flink作业失败或被取消。如果要配置Kafka Source在批模式下运行,您可以使用setBounded(OffsetsInitializer)指定停止偏移量,当所有分区都达到其停止偏移量时,Kafka Source会退出运行。说明 通常,流模式下Kafka Source没有停止偏移量。为了方便对代码进行调试,流模式下您也可以使用 setUnbounded(OffsetsInitializer) 指定停止偏移量。请留意指定流模式和批模式停止偏移量的方法名(setUnbounded 和 setBounded)是不同的。
- 动态分区检查
为了在不重启Flink作业的情况下,处理Topic扩容或新建Topic等场景,您可以在提供的Topic或Partition订阅模式下,启用动态分区检查功能。说明 默认不开启动态分区检查功能。如果您需要开启此功能,则需要将partition.discovery.interval.ms设置为非负值,即可启用动态分区检查功能。代码示例如下。
KafkaSource.builder() .setProperty("partition.discovery.interval.ms", "10000") // 每10秒检查一次新分区。
- 事件时间和水印
Kafka Source默认使用Kafka消息中的时间戳作为事件时间。您可以自定义水印策略(Watermark Strategy)以从消息中提取事件时间,并向下游发送水印。
如果您需要了解自定义水印策略(Watermark Strategy),请参见Generating Watermarks。env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
- 消费位点提交
Kafka Source在Checkpoint完成时,提交当前的消费位点,以保证Flink的Checkpoint状态和Kafka Broker上的提交位点一致。如果未开启Checkpoint,Kafka Source依赖于Kafka Consumer内部的位点定时自动提交逻辑,自动提交功能由enable.auto.commit和 auto.commit.interval.ms两个Kafka Consumer配置项进行配置。说明 Kafka Source不依赖于Broker上提交的位点来恢复失败的作业。提交位点只是为了上报Kafka Consumer和消费组的消费进度,以在Broker端进行监控。
- 其他属性
除了上述属性之外,您还可以使用setProperties(Properties) 和setProperty(String, String) 为Kafka Source和Kafka Consumer设置任意属性。KafkaSource通常有以下配置项。
配置项 说明 client.id.prefix 指定用于Kafka Consumer的客户端ID前缀。 partition.discovery.interval.ms 定义Kafka Source检查新分区的时间间隔。 说明 partition.discovery.interval.ms会在批模式下被覆盖为-1。register.consumer.metrics 指定是否在Flink中注册Kafka Consumer的指标。 其他Kafka Consumer配置 Kafka Consumer的配置详情,请参见Apache Kafka。注意 Kafka Connector会强制覆盖部分您手动配置的参数项,覆盖详情如下:- key.deserializer始终被覆盖为ByteArrayDeserializer。
- value.deserializer始终被覆盖为ByteArrayDeserializer。
- auto.offset.reset.strategy被覆盖为OffsetsInitializer#getAutoOffsetResetStrategy()。
以下示例为您展示如何配置Kafka Consumer,以使用PLAIN作为SASL机制并提供JAAS配置。KafkaSource.builder() .setProperty("sasl.mechanism", "PLAIN") .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
- 监控
Kafka Source在Flink中注册指标,用于监控和诊断。
- 指标范围
Kafka source reader的所有指标都注册在KafkaSourceReader指标组下,KafkaSourceReader是operator指标组的子组。与特定主题分区相关的指标注册在KafkaSourceReader.topic.<topic_name>.partition.<partition_id>指标组中。
例如Topic "my-topic"、分区1的当前消费位点(currentOffset)注册在<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset。成功提交位点的次数(commitsSucceeded)注册在<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded。
- 指标列表
指标名称 描述 范围 currentOffset 当前消费位点 TopicPartition committedOffset 当前提交位点 TopicPartition commitsSucceeded 成功提交的次数 KafkaSourceReader commitsFailed 失败的提交次数 KafkaSourceReader - Kafka Consumer指标
Kafka Consumer的指标注册在指标组KafkaSourceReader.KafkaConsumer。例如Kafka Consumer指标records-consumed-total注册在<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。
您可以使用配置项register.consumer.metrics配置是否注册Kafka消费者的指标。默认此选项设置为true。对于Kafka Consumer的指标,您可以参见Apache Kafka。
- 指标范围
- Topic列表,订阅Topic列表中所有Partition。
- Kafka Producer
Flink Kafka Producer可以实现将流数据写入一个或多个Kafka Topic。
DataStream<String> stream = ... Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>( "my-topic", // target topic new SimpleStringSchema(), // serialization schema properties, // producer config FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance stream.addSink(myProducer);
您需要配置以下参数。参数 说明 Topic 数据写入的默认Topic名称。 数据序列化 用于将数据序列化的SerializationSchema或KafkaSerializationSchema。 Flink Kafka Producer需要知道如何将Java或Scala对象转换为二进制数据,KafkaSerializationSchema允许用户指定如何进行数据的序列化。ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp)方法会在每条数据流入的时候调用,来生成ProducerRecord写入Kafka。
用户可以对每条数据如何写入Kafka进行细粒度地控制。通过ProducerRecord可以进行以下操作:- 设置写入的Topic名称。
- 定义消息键(Key)。
- 指定数据写入的Partition。
Kafka客户端属性 bootstrap.servers必填,以逗号分隔的Kafka Broker列表。 容错语义 启用Flink的Checkpoint后,Flink Kafka Producer可以保证精确一次的语义。除了启用Flink的Checkpoint外,您还可以通过Semantic参数来指定不同的容错语义,Semantic参数详情如下:- Semantic.NONE:Flink不做任何保证,数据可能会丢失或重复。
- Semantic.AT_LEAST_ONCE(默认设置):保证不会丢失任何数据,但可能会重复。
- Semantic.EXACTLY_ONCE:使用Kafka事务提供精确一次的语义保证。
说明 使用EXACTLY_ONCE语义时,需要注意的事项请参见EXACTLY_ONCE语义注意事项。
- Kafka Source
上传Connector JAR包到Flink全托管开发控制台后,填写配置信息
- 登录实时计算管理控制台。
- 在Flink全托管页签,单击目标工作空间操作列下的控制台。
- 在左侧导航栏,单击资源上传。
- 单击上传资源,选择您要上传的目标Connector的JAR包。
您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表。
- 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。
EXACTLY_ONCE语义注意事项
- 当使用事务写入Kafka时,请为所有消费Kafka数据的应用配置isolation.level参数。该参数取值如下:
- read_committed:只读取已提交的数据。
- read_uncommitted(默认值):可以读取未提交的数据。
- Semantic.EXACTLY_ONCE模式依赖于在从某个Checkpoint恢复后,且在该Checkpoint开始之前所提交的事务。如果Flink作业崩溃与完成重启之间的时间大于Kafka的事务超时时间,则会有数据丢失,因为Kafka会自动中止超过超时时间的事务。因此,请根据您的预期停机时间适当地配置您的事务超时。
Kafka Broker默认的transaction.max.timeout.ms设置为15分钟,Producer设置的事务超时不能超过Broker指定的时间。Flink Kafka Producer默认会将Kafka Producer配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semantic.EXACTLY_ONCE模式前,需要增加Broker端的transaction.max.timeout.ms值。
- Semantic.EXACTLY_ONCE模式为在Flink Kafka Producer实例中使用一个固定大小的Kafka Producer池。每个Checkpoint会使用池中的一个Kafka Producer。如果并发的Checkpoint数量超过Producer池的大小,Flink Kafka Producer会抛出异常并使整个作业失败。请相应地配置Producer池大小和最大并发的Checkpoint数量。
- Semantic.EXACTLY_ONCE会尽可能清除阻止Consumer从Kafka topic中读取数据的残留事务。但如果Flink作业在第一个 Checkpoint之前就出现故障,则在重启该作业后并不会保留重启前Producer池的信息。因此,在第一个Checkpoint完成之前缩减Flink作业的并行度是不安全的,即使要缩减并行度,也不能小于FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR。
- 对于在read_committed模式下运行的Kafka Consumer,任何未结束(既未中止也未完成)的事务都将阻塞对该Kafka Topic的所有读取。如果您按照以下步骤进行了操作:
- 用户开启事务1并通过该事务写入了一些数据。
- 用户开启事务2并通过该事务写入了更多的数据。
- 用户提交事务2。
即使来自事务2的数据已经提交,在事务1提交或中止之前,事务2的数据对消费者是不可见的。因此:- 在Flink作业正常工作期间,您可以预期写入Kafka topic的数据会有延迟,约为Checkpoint的平均间隔。
- 在Flink作业失败的情况下,该作业正在写入的Topic将会阻塞Consumer的读取,直到作业重新启动或事务超时。