You can use Kafka DataStream connectors to allow the DataStream API operations to read data from or write data to Kafka topics by using the exactly-once semantics. This topic describes how to use Kafka DataStream connectors in fully managed Flink.

Background information

The Ververica Runtime (VVR) connectors are placed in the Maven central repository for you to use when you develop a job. You can use Kafka DataStream connectors in one of the following ways:

(Recommended) Package the connector as a project dependency into the JAR file of your job

  1. Add the following configurations to the POM file of the Maven project to reference SNAPSHOT repositories:
    <repositories>
      <repository>
        <id>oss.sonatype.org-snapshot</id>
        <name>OSS Sonatype Snapshot Repository</name>
        <url>http://oss.sonatype.org/content/repositories/snapshots</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
      <repository>
        <id>apache.snapshots</id>
        <name>Apache Development Snapshot Repository</name>
        <url>https://repository.apache.org/content/repositories/snapshots/</url>
        <releases>
          <enabled>false</enabled>
        </releases>
        <snapshots>
          <enabled>true</enabled>
        </snapshots>
      </repository>
    </repositories>
  2. Check whether the <mirrorOf>*</mirrorOf> configuration is contained in your settings.xml configuration file.

    If the <mirrorOf>*</mirrorOf> configuration is contained in the configuration file, change the configuration to <mirrorOf>*,!oss.sonatype.org-snapshot,!apache.snapshots</mirrorOf>.

    This change prevents the two SNAPSHOT repositories that you configured in Step 1 from being overwritten. If only an asterisk (*) is enclosed in the mirrorOf element, the SNAPSHOT repositories will be overwritten.

  3. Add the connector that you want to use to the Maven POM file as a project dependency.
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${connector.version}</version>
    </dependency>
    Different connector versions may correspond to different connector types. We recommend that you use the latest version for the type of the connector that you use. For more information about the mappings among connector versions, VVR or Flink versions, and connector types, see DataStream connectors. For more information about the dependencies, see the POM file in the Kafka sample code.
    Notice
    • You must search for the connector versions that contain the SNAPSHOT keyword in the SNAPSHOT repository oss.sonatype.org. You cannot find the versions in the Maven central repository search.maven.org.
    • If you use multiple connectors, you must merge the files in the META-INF directory. To merge the files, add the following code to the POM file:
      <transformers>
          <!-- The service transformer is needed to merge META-INF/services files -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
              <projectName>Apache Flink</projectName>
              <encoding>UTF-8</encoding>
          </transformer>
      </transformers>
  4. Create a Kafka source or a Kafka producer.
    • Kafka Source
      You can use the KafkaSource class to create a Kafka source. The following sample code shows how to create a Kafka source to consume messages from the earliest offset of a topic. In the sample code, input-topic is the name of the topic and my-group is the name of the Kafka consumer group. The value of each message is deserialized as a string.
      KafkaSource<String> source = KafkaSource.<String>builder()
          .setBootstrapServers(brokers)
          .setTopics("input-topic")
          .setGroupId("my-group")
          .setStartingOffsets(OffsetsInitializer.earliest())
          .setValueOnlyDeserializer(new SimpleStringSchema())
          .build();
      
      env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
      The following table describes the parameters that you must configure when you create a Kafka source.
      Parameter Description
      BootstrapServers The addresses of Kafka brokers. You can call the setBootstrapServers(String) operation to configure the addresses.
      GroupId The ID of the consumer group. You can call the setGroupId(String) method to configure the ID.
      Topics or Partition The topics or names of the partitions to which you subscribe. You can configure a Kafka source to subscribe to topics or partitions by using one of the following subscription patterns:
      • Topic list. After you configure a topic list, the Kafka source subscribes to all partitions of the specified topics.
        KafkaSource.builder().setTopics("topic-a","topic-b")
      • Topic pattern. After you specify a regular expression, the Kafka source subscribes to all partitions of the topics that match the specified regular expression.
        KafkaSource.builder().setTopicPattern("topic.*")
      • Partition list. After you configure a partition list, the Kafka source subscribes to the specified partitions.
        final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
                new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
                new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
        KafkaSource.builder().setPartitions(partitionSet)
      Deserializer A deserializer that deserializes Kafka messages.
      You can call the setDeserializer(KafkaRecordDeserializationSchema) method to specify a deserializer. The KafkaRecordDeserializationSchema interface defines how a ConsumerRecord object is deserialized. You can use one of the following methods to deserialize only the Value fields in the Kafka messages of the ConsumerRecord object:
      • A Kafka source provides the setValueOnlyDeserializer(DeserializationSchema) method. The DeserializationSchema class defines how a Kafka message that is stored as a binary value is deserialized.
      • Use the classes that implement the Deserializer interface of Kafka. For example, you can use the StringDeserializer class to deserialize a message into a string.
        import org.apache.kafka.common.serialization.StringDeserializer;
        
        KafkaSource.<String>builder()
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
      Note If you want to deserialize a ConsumerRecord object, you must create a class that implements the KafkaRecordDeserializationSchema interface.
      When you use the Kafka DataStream connector, you must configure the following Kafka properties:
      • Starting offset
        You can use an offset initializer to specify an offset for a Kafka source when the Kafka source starts to read data. An offset initializer is an object that is based on the OffsetsInitializer interface. The KafkaSource class provides the following built-in offset initializers.
        Offset initializer Code
        Specifies that the Kafka source starts to consume messages from the earliest record of each partition. KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
        Specifies that the Kafka source starts to consume messages from the latest record of each partition. KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
        Specifies that the Kafka source starts to consume messages from the first record of each partition. The first record has a timestamp that is greater than or equal to the specified timestamp. KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200L))
        Specifies that the Kafka source starts to consume messages from the committed offset of each partition and a reset strategy is specified. If a partition does not have a committed offset, the reset strategy resets the offset and the Kafka source starts from the earliest record of the partition. KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        Specifies that the Kafka source starts to consume messages from the committed offset of each partition and no reset strategy is specified. KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
        Note
        • If the built-in offset initializers do not meet your business requirements, you can create custom offset initializers.
        • If you do not specify an offset initializer, the OffsetsInitializer.earliest() offset initializer is used by default.
      • Streaming execution mode and batch execution mode
        A Kafka source can operate in streaming execution mode or batch execution mode. By default, a Kafka source operates in streaming execution mode. In this mode, the job continues to run until the job fails or is canceled. If you want a Kafka source to operate in batch execution mode, you can call the setBounded(OffsetsInitializer) method to specify a stop offset. When all partitions reach their stop offsets, the Kafka source exits.
        Note In most cases, a Kafka source that operates in streaming execution mode does not have a stop offset. If you want to debug a Kafka source that operates in streaming execution mode, you can call the setUnbounded(OffsetsInitializer) method to specify a stop offset. The methods that you can use to specify a stop offset vary based on whether you use the streaming execution mode or batch execution mode.
      • Dynamic partition discovery
        If you want a running job to process data from new topics and from new partitions that match your subscription pattern without the need to restart the job, you can enable the dynamic partition discovery feature on the Kafka source.
        Note By default, the dynamic partition discovery feature is disabled. To enable the dynamic partition discovery feature on a Kafka source, set the partition.discovery.interval.ms parameter to a non-negative value. The following sample code shows how to configure the partition.discovery.interval.ms parameter.
        KafkaSource.builder()
            .setProperty("partition.discovery.interval.ms", "10000") // The Kafka source checks for new partitions every 10 seconds. 
      • Event time and watermarks
        By default, a Kafka source uses the timestamp that is attached to a record as the event time for the record. You can define a watermark strategy based on the event time of each record and send the watermarks to downstream services.
        env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
        For more information about how to define a watermark strategy, see Generating Watermarks.
      • Consumer offsets
        When a checkpoint is generated, a Kafka source commits the Kafka consumer offset of each partition to Kafka brokers. This way, the Kafka consumer offsets that are recorded on Kafka brokers are consistent with the state of the checkpoint. The Kafka consumer can automatically commit the offsets on each partition to Kafka brokers on a regular basis. You can configure the automatic offset commission feature by using the enable.auto.commit and auto.commit.interval.ms parameters. If you disable the checkpointing feature, a Kafka source relies on the Kafka consumer to commit the offsets to Kafka brokers.
        Note Kafka sources do not use the committed offsets that are recorded on Kafka brokers for fault tolerance. When you commit offsets, Kafka brokers can monitor the progress of record consumption on each partition.
      • Additional properties
        You can call the setProperties(Properties) and setProperty(String, String) methods to configure additional properties for the Kafka source and Kafka consumer. The following table describes the properties of a Kafka source.
        Parameter Description
        client.id.prefix Specifies the prefix for the client ID of the Kafka consumer.
        partition.discovery.interval.ms Specifies the time interval at which the Kafka source checks for new partitions.
        Note If the Kafka source operates in batch execution mode, the property is automatically set to -1.
        register.consumer.metrics Specifies whether to register metrics for the Kafka consumer in fully managed Flink.
        Additional properties for the Kafka consumer
        For more information about the properties of a Kafka consumer, see Apache Kafka.
        Notice The Kafka DataStream connector overwrites the values of the following properties:
        • key.deserializer: The value of this property is set to ByteArrayDeserializer.
        • value.deserializer: The value of this property is set to ByteArrayDeserializer.
        • auto.offset.reset.strategy: The value of this property is set to OffsetsInitializer#getAutoOffsetResetStrategy().
        The following sample code shows how the Kafka consumer connects to the Kafka cluster by using a JAAS configuration and the SASL/PLAIN authentication mechanism.
        KafkaSource.builder()
            .setProperty("sasl.mechanism", "PLAIN")
            .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
      • Monitoring
        Kafka sources register metrics in fully managed Flink for monitoring and diagnosis.
        • Metric scope

          All metrics of a Kafka source are registered under the KafkaSourceReader metric group. KafkaSourceReader is a subgroup of the operator metric group. The metrics for a specific partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.

          For example, a topic is named my-topic and the partition of the topic is named 1. The consumer offset of the partition is reported by the <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset metric. The number of successful commits of consumer offsets is measured by the <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded metric.

        • Metrics
          Metric Description Scope
          currentOffset Reports the Kafka consumer offset of a partition. TopicPartition
          committedOffset Reports the committed consumer offset of a partition. TopicPartition
          commitsSucceeded Reports the number of successful commits of consumer offsets. KafkaSourceReader
          commitsFailed Reports the number of failed commits of consumer offsets. KafkaSourceReader
        • Metrics for the Kafka consumer

          The metrics for the Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the records-consumed-total metric is registered at <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

          You can configure the register.consumer.metrics parameter to specify whether to register metrics for the Kafka consumer. By default, the register.consumer.metrics parameter is set to true. For more information about the metrics for a Kafka consumer, see Apache Kafka.

    • Kafka Producer
      A Kafka producer can write data from multiple streams to one or more Kafka topics.
      DataStream<String> stream = ...
      
      Properties properties = new Properties();
      properties.setProperty("bootstrap.servers", "localhost:9092");
      
      FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
              "my-topic",                  // target topic
              new SimpleStringSchema(),    // serialization schema
              properties,                  // producer config
              FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
      
      stream.addSink(myProducer);
      The following table describes the parameters.
      Parameter Description
      Topic The name of the topic to which data is written.
      Serialization schema The serialization schema. The schema is specified by the SerializationSchema or KafkaSerializationSchema class.

      A producer converts Java or Scala objects into binary data and then writes the data to the required topic. You can use the KafkaSerializationSchema or SerializationSchema class to specify the schema that the Kafka producer uses during data serialization. A Kafka producer calls the ProducerRecord<byte[], byte[]> serialize(T element, Long timestamp) method for each incoming record to generate a ProducerRecord object that represents the serialized record. Then, the Kafka producer writes the ProducerRecord object to the required topic.

      The ProducerRecord class provides properties that you can configure to manage behavior when the Kafka producer writes a record to the required Kafka topic. You can configure the following properties of the ProducerRecord class:
      • The name of the topic to which the record is written.
      • The key for the record.
      • The name of the partition to which the record is written.
      Properties of the Kafka client The bootstrap.servers property is required. This property specifies the addresses of the Kafka brokers. Separate multiple addresses with a comma (,).
      Fault tolerance semantics
      After you enable the checkpointing feature, a Kafka producer can ensure exactly-once delivery. You can also configure the semantic parameter of a Kafka producer to specify one of the following fault tolerance semantics:
      • Semantic.NONE: The Kafka producer does not ensure the delivery of data. Data may be lost or duplicated.
      • Semantic.AT_LEAST_ONCE: By default, the Kafka producer ensures that data is not lost. However, data may be duplicated.
      • Semantic.EXACTLY_ONCE: The Kafka producer ensures that data is not lost or duplicated. The Kafka transaction mechanism is used to ensure exactly-once delivery.
        Note For more information about the exactly-once semantics, see Precautions for Semantic.EXACTLY_ONCE.

Upload the JAR package of the Kafka DataStream connector to the console of fully managed Flink

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. Click Upload Artifact and select the JAR package that you want to upload.

    You can upload the JAR package of your self-managed connector or the JAR package of a connector provided by fully managed Flink. For the download links of the official JAR packages provided by fully managed Flink, see Connectors.

  5. In the Additional Dependencies section of the Draft Editor page, select the JAR package that you want to use.

Precautions for Semantic.EXACTLY_ONCE

  • When you use the Kafka transaction mechanism to write data to the required Kafka topic, you must check the setting of the isolation.level parameter for all Kafka consumers. The isolation.level parameter has the following valid values:
    • read_committed: The job can read only the committed data.
    • read_uncommitted: The job can read data that is not committed. This is the default value.
  • A Kafka producer that operates in Semantic.EXACTLY_ONCE mode relies on the transactions that are committed after a job is recovered from a checkpoint and before the checkpoint is taken. If the duration between the time when a job fails and the time when the job is restarted exceeds the specified transaction timeout period, data may be lost because Kafka automatically terminates the transactions that time out. Therefore, we recommend that you configure a transaction timeout period that is longer than the estimated job downtime.

    By default, the transaction.max.timeout.ms parameter of Kafka brokers is set to 15 minutes. The transaction timeout that you configure for a Kafka producer cannot exceed the transaction timeout setting for Kafka brokers. The default value of the transaction.timeout.ms parameter for a Kafka producer is 1 hour. Therefore, you must adjust the transaction timeout period for the Kafka producer and Kafka brokers.

  • The Semantic.EXACTLY_ONCE mode uses a fixed-size pool that specifies the number of Kafka producers that can run at the same time. If checkpointing is triggered on a Kafka producer, each checkpoint uses a Kafka producer that counts towards the pool size. If the number of parallel checkpoints exceeds the pool size, the Kafka producer throws an exception that causes the job to fail. You must configure a pool size that matches the maximum number of parallel checkpoints.
  • When a job starts, the Kafka producer that operates in Semantic.EXACTLY_ONCE execution mode tries to terminate transactions that are not committed during checkpointing. These uncommitted transactions may block consumers from reading data. If a Flink job fails before the first checkpoint is taken, the information about the transactions that are used by the Kafka producer is lost when the job is restarted. If you want to reduce the job parallelism before the first checkpoint is taken, make sure that the value after you divide the original parallelism by the new parallelism is greater than or equal to the value of FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR. Otherwise, memory leaks may occur because of untracked transactions.
  • If some transactions of a Kafka topic are not committed, the Kafka consumers for which the isolation.level property is set to read_committed do not read data from the Kafka topic. Example:
    1. A user creates a transaction to write data to a topic.
    2. The user creates another transaction to write data to the topic.
    3. The user commits the second transaction.
    In this case, consumers cannot read the data from the second transaction until the first transaction is committed or canceled.
    • You can estimate the delay at which data is written to a Kafka topic. The checkpoint interval is approximately the value of the delay.
    • If a Flink job fails, the topics that are written by the job block consumers from reading data until the job is restarted or the transactions time out.