如果您通过DataStream的方式以精确一次(Exactly-once)的语义在Kafka Topic中读写数据,则需要使用Kafka DataStream Connector连接Flink全托管。本文为您介绍如何在Flink全托管控制台上使用Kafka DataStream Connector来读写Kafka数据。

背景信息

Maven中央库中已经放置了VVR Connector,以供您在作业开发时直接使用。您可以通过以下任何一种方式来使用DataStream Connector:

(推荐)直接将Connector作为项目依赖打进作业JAR包

  1. 在Maven项目的pom.xml文件中添加以下配置以引用SNAPSHOT仓库。
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. 检查您的settings.xml配置文件中是否存在<mirrorOf>*</mirrorOf>配置。

    如果存在<mirrorOf>*</mirrorOf>配置,则需要将此配置改为<mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>

    修改的目的是为了避免SNAPSHOT仓库被覆盖,因为mirrorOf中只使用星号(*)会导致第一步中配置的两个repository被覆盖。

  3. 在作业的Maven POM文件中添加您需要的Connector作为项目依赖。
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${connector.version}</version>
    </dependency>
    每个Connector版本对应的Connector类型可能不同,建议您使用最新版本。Connector版本、VVR/Flink版本和Connector类型的对应关系请参见Connector列表。完整的依赖信息请参见Kafka示例代码中的pom.xml文件。
    注意
    • 您需要在SNAPSHOT仓库(oss.sonatype.org)查找带SNAPSHOT的Connector版本,在Maven中央库(search.maven.org)上会查找不到。
    • 在使用多个Connector时,请注意META-INF目录需要Merge,即在pom.xml文件中添加如下代码。
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
  4. 构建Kafka Source或者Kafka Producer。
    • Kafka Source
      Kafka Source提供了构建类来创建Kafka Source的实例。我们将通过以下示例代码为您介绍如何构建Kafka Source来消费input-topic最早位点的数据,消费组名称为my-group,并将Kafka消息体反序列化为字符串。
      KafkaSource<String> source = KafkaSource.<String>builder()
          .setBootstrapServers(brokers)
          .setTopics("input-topic")
          .setGroupId("my-group")
          .setStartingOffsets(OffsetsInitializer.earliest())
          .setValueOnlyDeserializer(new SimpleStringSchema())
          .build();
      
      env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
      在构建KafkaSource时,必须指定以下参数。
      参数 说明
      BootstrapServers Kafka Broker地址,通过setBootstrapServers(String)方法配置。
      GroupId 消费者组ID,通过setGroupId(String)方法配置。
      Topics或Partition 订阅的Topic或Partition名称。Kafka Source提供了以下三种Topic或Partition的订阅方式:
      • Topic列表,订阅Topic列表中所有Partition。
        KafkaSource.builder().setTopics("topic-a","topic-b")
      • 正则表达式匹配,订阅与正则表达式所匹配的Topic下的所有Partition。
        KafkaSource.builder().setTopicPattern("topic.*")
      • Partition列表,订阅指定的Partition。
        final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
                new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
                new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
        KafkaSource.builder().setPartitions(partitionSet)
      Deserializer 解析Kafka消息的反序列化器。
      反序列化器通过setDeserializer(KafkaRecordDeserializationSchema)来指定,其中KafkaRecordDeserializationSchema定义了如何解析Kafka的ConsumerRecord。如果只解析Kafka消息中的消息体(Value)的数据,则您可以通过以下任何一种方式实现:
      • 使用Flink提供的KafkaSource构建类中的setValueOnlyDeserializer(DeserializationSchema)方法,其中DeserializationSchema定义了如何解析Kafka消息体中的二进制数据。
      • 使用Kafka提供的解析器,包括多种实现类。例如,您可以使用StringDeserializer来将Kafka消息体解析成字符串。
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        KafkaSource.<String>builder()
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
      说明 如果需要完整地解析ConsumerRecord,则需要您自己实现KafkaRecordDeserializationSchema接口。
      在使用Kafka DataStream Connector时,您还需要了解以下Kafka属性:
      • 起始消费位点
        Kafka Source能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费。内置的位点初始化器包括以下内容。
        位点初始化器 代码设置
        从最早位点开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
        从最末尾位点开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
        从时间戳大于等于指定时间的数据开始消费。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
        从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        从消费组提交的位点开始消费,不指定位点重置策略。 KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
        说明
        • 如果以上内置的初始化器不能满足需求,您也可以自己实现自定义的位点初始化器。
        • 如果您未指定位点初始化器,则默认使用OffsetsInitializer.earliest(),即最早位点。
      • 流模式和批模式
        Kafka Source支持流式和批式两种运行模式。默认情况下,Kafka Source设置为以流模式运行,因此作业永远不会停止,直到Flink作业失败或被取消。如果要配置Kafka Source在批模式下运行,您可以使用setBounded(OffsetsInitializer)指定停止偏移量,当所有分区都达到其停止偏移量时,Kafka Source会退出运行。
        说明 通常,流模式下Kafka Source没有停止偏移量。为了方便对代码进行调试,流模式下您也可以使用 setUnbounded(OffsetsInitializer) 指定停止偏移量。请留意指定流模式和批模式停止偏移量的方法名(setUnbounded 和 setBounded)是不同的。
      • 动态分区检查
        为了在不重启Flink作业的情况下,处理Topic扩容或新建Topic等场景,您可以在提供的Topic或Partition订阅模式下,启用动态分区检查功能。
        说明 默认不开启动态分区检查功能。如果您需要开启此功能,则需要将partition.discovery.interval.ms设置为非负值,即可启用动态分区检查功能。代码示例如下。
        KafkaSource.builder()
            .setProperty("partition.discovery.interval.ms", "10000") // 每10秒检查一次新分区。
      • 事件时间和水印
        Kafka Source默认使用Kafka消息中的时间戳作为事件时间。您可以自定义水印策略(Watermark Strategy)以从消息中提取事件时间,并向下游发送水印。
        env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
        如果您需要了解自定义水印策略(Watermark Strategy),请参见Generating Watermarks
      • 消费位点提交
        Kafka Source在Checkpoint完成时,提交当前的消费位点,以保证Flink的Checkpoint状态和Kafka Broker上的提交位点一致。如果未开启Checkpoint,Kafka Source依赖于Kafka Consumer内部的位点定时自动提交逻辑,自动提交功能由enable.auto.commit和 auto.commit.interval.ms两个Kafka Consumer配置项进行配置。
        说明 Kafka Source不依赖于Broker上提交的位点来恢复失败的作业。提交位点只是为了上报Kafka Consumer和消费组的消费进度,以在Broker端进行监控。
      • 其他属性
        除了上述属性之外,您还可以使用setProperties(Properties) 和setProperty(String, String) 为Kafka Source和Kafka Consumer设置任意属性。KafkaSource通常有以下配置项。
        配置项 说明
        client.id.prefix 指定用于Kafka Consumer的客户端ID前缀。
        partition.discovery.interval.ms 定义Kafka Source检查新分区的时间间隔。
        说明 partition.discovery.interval.ms会在批模式下被覆盖为-1。
        register.consumer.metrics 指定是否在Flink中注册Kafka Consumer的指标。
        其他Kafka Consumer配置
        Kafka Consumer的配置详情,请参见Apache Kafka
        注意 Kafka Connector会强制覆盖部分您手动配置的参数项,覆盖详情如下:
        • key.deserializer始终被覆盖为ByteArrayDeserializer。
        • value.deserializer始终被覆盖为ByteArrayDeserializer。
        • auto.offset.reset.strategy被覆盖为OffsetsInitializer#getAutoOffsetResetStrategy()。
        以下示例为您展示如何配置Kafka Consumer,以使用PLAIN作为SASL机制并提供JAAS配置。
        KafkaSource.builder()
            .setProperty("sasl.mechanism", "PLAIN")
            .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
      • 监控
        Kafka Source在Flink中注册指标,用于监控和诊断。
        • 指标范围

          Kafka source reader的所有指标都注册在KafkaSourceReader指标组下,KafkaSourceReader是operator指标组的子组。与特定主题分区相关的指标注册在KafkaSourceReader.topic.<topic_name>.partition.<partition_id>指标组中。

          例如Topic "my-topic"、分区1的当前消费位点(currentOffset)注册在<some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset。成功提交位点的次数(commitsSucceeded)注册在<some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded。

        • 指标列表
          指标名称 描述 范围
          currentOffset 当前消费位点 TopicPartition
          committedOffset 当前提交位点 TopicPartition
          commitsSucceeded 成功提交的次数 KafkaSourceReader
          commitsFailed 失败的提交次数 KafkaSourceReader
        • Kafka Consumer指标

          Kafka Consumer的指标注册在指标组KafkaSourceReader.KafkaConsumer。例如Kafka Consumer指标records-consumed-total注册在<some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total。

          您可以使用配置项register.consumer.metrics配置是否注册Kafka消费者的指标。默认此选项设置为true。对于Kafka Consumer的指标,您可以参见Apache Kafka

    • Kafka Producer
      Flink Kafka Producer可以实现将流数据写入一个或多个Kafka Topic。
      DataStream<String> stream = ...
      
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "localhost:9092");
      
      FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
              "my-topic",                  // target topic
              new SimpleStringSchema(),    // serialization schema
              properties,                  // producer config
              FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
      
      stream.addSink(myProducer);
      您需要配置以下参数。
      参数 说明
      Topic 数据写入的默认Topic名称。
      数据序列化 用于将数据序列化的SerializationSchema或KafkaSerializationSchema。

      Flink Kafka Producer需要知道如何将Java或Scala对象转换为二进制数据,KafkaSerializationSchema允许用户指定如何进行数据的序列化。ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp)方法会在每条数据流入的时候调用,来生成ProducerRecord写入Kafka。

      用户可以对每条数据如何写入Kafka进行细粒度地控制。通过ProducerRecord可以进行以下操作:
      • 设置写入的Topic名称。
      • 定义消息键(Key)。
      • 指定数据写入的Partition。
      Kafka客户端属性 bootstrap.servers必填,以逗号分隔的Kafka Broker列表。
      容错语义
      启用Flink的Checkpoint后,Flink Kafka Producer可以保证精确一次的语义。除了启用Flink的Checkpoint外,您还可以通过Semantic参数来指定不同的容错语义,Semantic参数详情如下:
      • Semantic.NONE:Flink不做任何保证,数据可能会丢失或重复。
      • Semantic.AT_LEAST_ONCE(默认设置):保证不会丢失任何数据,但可能会重复。
      • Semantic.EXACTLY_ONCE:使用Kafka事务提供精确一次的语义保证。
        说明 使用EXACTLY_ONCE语义时,需要注意的事项请参见EXACTLY_ONCE语义注意事项

上传Connector JAR包到Flink全托管开发控制台后,填写配置信息

  1. 登录实时计算管理控制台
  2. Flink全托管页签,单击目标工作空间操作列下的控制台
  3. 在左侧导航栏,单击资源上传
  4. 单击上传资源,选择您要上传的目标Connector的JAR包。

    您可以上传您自己开发的Connector,也可以上传Flink全托管产品提供的Connector。Flink全托管产品提供的Connector官方JAR包的下载地址,请参见Connector列表

  5. 在目标作业开发页面附加依赖文件项,选择目标Connector的JAR包。

EXACTLY_ONCE语义注意事项

  • 当使用事务写入Kafka时,请为所有消费Kafka数据的应用配置isolation.level参数。该参数取值如下:
    • read_committed:只读取已提交的数据。
    • read_uncommitted(默认值):可以读取未提交的数据。
  • Semantic.EXACTLY_ONCE模式依赖于在从某个Checkpoint恢复后,且在该Checkpoint开始之前所提交的事务。如果Flink作业崩溃与完成重启之间的时间大于Kafka的事务超时时间,则会有数据丢失,因为Kafka会自动中止超过超时时间的事务。因此,请根据您的预期停机时间适当地配置您的事务超时。

    Kafka Broker默认的transaction.max.timeout.ms设置为15分钟,Producer设置的事务超时不能超过Broker指定的时间。Flink Kafka Producer默认会将Kafka Producer配置中的transaction.timeout.ms属性设置为1小时,因此在使用Semantic.EXACTLY_ONCE模式前,需要增加Broker端的transaction.max.timeout.ms值。

  • Semantic.EXACTLY_ONCE模式为在Flink Kafka Producer实例中使用一个固定大小的Kafka Producer池。每个Checkpoint会使用池中的一个Kafka Producer。如果并发的Checkpoint数量超过Producer池的大小,Flink Kafka Producer会抛出异常并使整个作业失败。请相应地配置Producer池大小和最大并发的Checkpoint数量。
  • Semantic.EXACTLY_ONCE会尽可能清除阻止Consumer从Kafka topic中读取数据的残留事务。但如果Flink作业在第一个 Checkpoint之前就出现故障,则在重启该作业后并不会保留重启前Producer池的信息。因此,在第一个Checkpoint完成之前缩减Flink作业的并行度是不安全的,即使要缩减并行度,也不能小于FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR。
  • 对于在read_committed模式下运行的Kafka Consumer,任何未结束(既未中止也未完成)的事务都将阻塞对该Kafka Topic的所有读取。如果您按照以下步骤进行了操作:
    1. 用户开启事务1并通过该事务写入了一些数据。
    2. 用户开启事务2并通过该事务写入了更多的数据。
    3. 用户提交事务2。
    即使来自事务2的数据已经提交,在事务1提交或中止之前,事务2的数据对消费者是不可见的。因此:
    • 在Flink作业正常工作期间,您可以预期写入Kafka topic的数据会有延迟,约为Checkpoint的平均间隔。
    • 在Flink作业失败的情况下,该作业正在写入的Topic将会阻塞Consumer的读取,直到作业重新启动或事务超时。