Realtime Compute for Apache Flink: Message Queue for Kafka

Last Updated: Feb 06, 2026

This topic describes how to use the Kafka connector.

Background information

Apache Kafka is an open source distributed message queue service widely used in big data applications, such as high-performance data processing, streaming analytics, and data integration. The Kafka connector supports high-throughput data ingestion and egress, read and write operations on data in multiple formats, and exactly-once semantics for Realtime Compute for Apache Flink by leveraging the Apache Kafka client.

Category

Description

Supported type

Source table, sink table, data ingestion sink

Running mode

Streaming mode

Data format

Supported data formats

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Note
  • Protobuf is supported only for Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.9 or later.

  • Each of the supported data formats includes corresponding configuration options that you can use directly in the WITH clause. For more information, see Flink documentation.

Unique Monitoring Metrics

Specific monitoring metrics

  • Source table

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Sink table

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics, see Monitoring metrics.

API type

SQL API, DataStream API, and data ingestion YAML API

Can you update or delete data in a sink table?

You cannot update or delete data in a sink table. You can only insert data into a sink table.

Note

For more information about features related to updating or deleting data, see Upsert Kafka.

Prerequisites

You can connect to the cluster using one of the following methods, depending on your requirements:

  • Connect to an ApsaraMQ for Kafka cluster

    • The ApsaraMQ for Kafka cluster version is 0.11 or later.

    • An ApsaraMQ for Kafka cluster has been created. For more information, see Create resources.

    • The Realtime Compute for Apache Flink workspace resides in the same virtual private cloud (VPC) as the ApsaraMQ for Kafka cluster, and the CIDR blocks of the Realtime Compute for Apache Flink VPC are added to the whitelist of the ApsaraMQ for Kafka cluster. For more information about how to configure an ApsaraMQ for Kafka whitelist, see Configure whitelists.

    Important

    When writing data to ApsaraMQ for Kafka, note the following:

    • ApsaraMQ for Kafka does not support the Zstandard compression algorithm for data writes.

    • ApsaraMQ for Kafka does not support idempotent or transactional write operations. Therefore, you cannot use the exactly-once semantics supported by Kafka sink tables. If your Realtime Compute for Apache Flink job uses Ververica Runtime (VVR) 8.0.0 or later, disable the idempotent write feature by setting properties.enable.idempotence=false for the sink table (see the example after this list). For more information about storage engine comparisons and limits for ApsaraMQ for Kafka, see Comparison between storage engines.

  • Connect to a self-managed Apache Kafka cluster

    • The self-managed Apache Kafka cluster version is 0.11 or later.

    • A network connection exists between Flink and the self-managed Apache Kafka cluster. For more information about connecting a self-managed Apache Kafka cluster over the public network, see Network Connectivity Options.

    • Only Apache Kafka 2.8 client options are supported. For more information, see the Apache Kafka Consumer Configs and Producer Configs documentation.
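For the ApsaraMQ for Kafka case described in the note above, the following minimal sketch shows a sink table that disables idempotent writes. The topic, broker address, and columns are placeholders; adjust them to your environment.

CREATE TEMPORARY TABLE apsaramq_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourSinkTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- ApsaraMQ for Kafka does not support idempotent writes; disable them in VVR 8.0.0 or later.
  'properties.enable.idempotence' = 'false'
);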

Usage notes

Currently, transactional writes are not recommended due to design limitations in the Flink and Kafka transaction integration. When you set sink.delivery-guarantee = exactly-once, the Kafka connector enables transactional writes, which have three known issues:

  • Each checkpoint generates a new transaction ID. If the checkpoint interval is too short, the number of transaction IDs becomes excessive. As a result, the Kafka coordinator may run out of memory, compromising Kafka cluster stability.

  • Each transaction creates a producer instance. If too many transactions are committed simultaneously, the TaskManager may run out of memory, destabilizing the Flink job.

  • If multiple Flink jobs use the same sink.transactional-id-prefix, their generated transaction IDs may conflict. If a job fails to complete a write, the pending transaction blocks the Last Stable Offset (LSO) of the Kafka partition from advancing, affecting all consumers reading from that partition.

If you require exactly-once semantics, use Upsert Kafka to write to a primary key table and rely on the primary key to ensure idempotence. To use transactional writes, see Notes on Exactly-Once Semantics.
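The following minimal sketch shows what such an Upsert Kafka sink might look like; the topic, broker address, and schema are placeholders, and the full option list is described in the Upsert Kafka topic.

CREATE TEMPORARY TABLE upsert_kafka_sink (
  user_id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourSinkTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- The upsert-kafka connector requires explicit key and value formats.
  'key.format' = 'json',
  'value.format' = 'json'
);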

Troubleshoot network connectivity

If a Flink job reports the error Timed out waiting for a node assignment during startup, this typically indicates a network connectivity issue between Flink and Kafka.

The Kafka client connects to the server as shown below:

  1. The client connects to Kafka by using the addresses specified in bootstrap.servers.

  2. Kafka returns metadata for each broker in the cluster, such as their endpoints.

  3. The client then connects to each broker using the returned endpoints to produce or consume data.

Even if the bootstrap.servers address is reachable, the client cannot read from or write to the brokers if Kafka returns incorrect broker addresses. This issue commonly occurs in network architectures that use proxies, port forwarding, or leased lines.

Troubleshooting steps

ApsaraMQ for Kafka

  1. Confirm the access point type

    • Default access point (internal network)

    • SASL access point (internal network with authentication)

    • Public network access point (requires separate application)

    Use the Flink development console to run a network probe to rule out connectivity issues with the bootstrap.servers address.

  2. Check security groups and whitelists

    The Flink VPC must be added to the whitelist of the Kafka instance. For more information, see View a VPC CIDR block and Configure a whitelist.

  3. Check SASL configurations (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure JAAS, SSL, and the SASL mechanism in your Flink job. Missing authentication settings cause the connection to fail during the handshake phase and may manifest as a timeout. For more information, see Security and authentication.

Self-managed Kafka (ECS)

  1. Use the Flink development console to perform network probing.

    Rule out connectivity issues with the bootstrap.servers address and verify that the public and internal endpoints are correct.

  2. Check security groups and whitelists

    • The ECS security group must allow traffic on the Kafka access point port (typically 9092 or 9093).

    • The VPC where Flink is located must be added to the whitelist of the ECS instance. For more information, see View VPC CIDR block.

  3. Configuration troubleshooting

    1. Log on to the ZooKeeper cluster used by Kafka and use the zkCli.sh or zookeeper-shell.sh tool.

    2. Run the command to retrieve broker metadata. For example: get /brokers/ids/0. In the endpoints field of the response, find the address that Kafka advertises to clients.

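      The output is similar to the following illustrative snippet. The exact fields vary with the Kafka version, and the IP address shown is a placeholder; the address that Kafka advertises to clients appears in the endpoints field.

      {"endpoints":["PLAINTEXT://192.168.XX.XX:9092"],"host":"192.168.XX.XX","port":9092, ...}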

    3. Perform a network probe in the Flink development console to test whether the address is reachable.

      Note
      • If the address is unreachable, contact the Kafka O&M engineer to check and correct the listeners and advertised.listeners configuration so that the returned address is accessible to Flink.

      • For more information about Kafka client and server connectivity, see Troubleshoot Connectivity.

  4. Check SASL configurations (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure JAAS, SSL, and the SASL mechanism in your Flink job. Missing authentication settings cause the connection to fail during the handshake phase or manifest as a timeout. For more information, see Security and authentication.

SQL

You can use the Kafka connector in SQL jobs as a source table or a sink table.

Syntax

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Metadata columns

You can define metadata columns in a Kafka source table or a Kafka sink table to retrieve or write Kafka message metadata. For example, if multiple topics are specified in the WITH clause of a Kafka source table and a topic metadata column is defined, the column indicates the topic from which each record was read. The following sample code shows how to use metadata columns:

CREATE TABLE kafka_source (
  -- Read the topic to which the message belongs as the value of the record_topic field.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Read the timestamp in ConsumerRecord as the value of the ts field.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Read the offset of the message as the value of the record_offset field.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Write the timestamp in the ts field as the timestamp of ProducerRecord to Kafka.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

The following table describes the metadata columns supported by Kafka source tables and sink tables.

Key

Data type

Description

Source table or sink table

topic

STRING NOT NULL METADATA VIRTUAL

The name of the topic to which the Kafka message belongs.

Source table

partition

INT NOT NULL METADATA VIRTUAL

The ID of the partition to which the Kafka message belongs.

Source table

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Headers of the Kafka message.

Source table and sink table

leader-epoch

INT NOT NULL METADATA VIRTUAL

The leader epoch of the Kafka message.

Source table

offset

BIGINT NOT NULL METADATA VIRTUAL

The offset of the Kafka message.

Source table

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

The timestamp of the Kafka message.

Source table and sink table

timestamp-type

STRING NOT NULL METADATA VIRTUAL

The timestamp type of the Kafka message:

  • NoTimestampType: indicates that no timestamp is defined in the message.

  • CreateTime: indicates the time when the message was generated.

  • LogAppendTime: indicates the time when the message was added to Kafka brokers.

Source table

__raw_key__

STRING NOT NULL METADATA VIRTUAL

The raw key field of the Kafka message.

Source table and sink table

Note

This parameter is supported only for Realtime Compute for Apache Flink that uses VVR 11.4 or later.

__raw_value__

STRING NOT NULL METADATA VIRTUAL

The raw value field of the Kafka message.

Source table and sink table

Note

This parameter is supported only for Realtime Compute for Apache Flink that uses VVR 11.4 or later.

WITH parameters

  • General

    parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The type of the table.

    String

    Yes

    None

    Set the value to kafka.

    properties.bootstrap.servers

    The IP address and port number of a Kafka broker.

    String

    Yes

    None

    Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    properties.*

    The options that are configured for the Kafka client.

    String

    No

    None

    The suffix must comply with the rules that are defined in Producer Configs and Consumer Configs.

    Flink removes the "properties." prefix and passes the remaining configuration to the Kafka client. For example, you can disable automatic topic creation using 'properties.allow.auto.create.topics'='false'.

    The following configurations cannot be modified in this way because the Kafka connector overwrites them:

    • key.deserializer

    • value.deserializer

    format

    The format used to read or write the value field of Kafka messages.

    String

    No

    No default value.

    Supported formats

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    For more information about format options, see Format Options.

    key.format

    The format used to read or write the key field of Kafka messages.

    String

    No

    No default value.

    Supported formats

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    If you use this configuration, the key.fields configuration is also required.

    key.fields

    The key fields in the source table or sink table that correspond to the key fields of Kafka messages.

    String

    No

    None

    Separate multiple field names with semicolons (;), such as field1;field2.

    key.fields-prefix

    The custom prefix for all key fields in Kafka messages. You can configure this option to prevent name conflicts with the value fields.

    String

    No

    None

    This option is used only to distinguish the column names of source tables and sink tables. The prefix is removed from the column names when the key fields of Kafka messages are parsed or generated.

    Note

    When you use this configuration, you must set the value.fields-include option to EXCEPT_KEY.

    value.format

    The format used to read or write the value field of Kafka messages.

    String

    No

    None

    This configuration is equivalent to format, and you can configure only one of format or value.format. If you configure both, value.format overwrites format.

    value.fields-include

    Specifies whether to include corresponding message keys when parsing or generating Kafka message values.

    String

    No

    ALL

    Valid values:

    • ALL (default): All fields are processed as the value of a Kafka message.

    • EXCEPT_KEY: All fields except for the fields specified by the key.fields option are processed as the Kafka message value.

  • Source table

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic from which you want to read data.

    String

    No

    None

    Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.

    Note

    You cannot use the topic option together with the topic-pattern option.

    topic-pattern

    The regular expression that is used to match topics. Data of all topics whose names match the specified regular expression is read when a deployment is running.

    String

    No

    None

    Note

    You cannot use the topic option together with the topic-pattern option.

    properties.group.id

    The consumer group ID.

    String

    No

    KafkaSource-{Name of the source table}

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for Kafka to read data.

    String

    No

    group-offsets

    Valid values:

    • earliest-offset: reads data from the earliest offset of each partition.

    • latest-offset: reads data from the latest offset of each partition.

    • group-offsets (default): reads data from the offset that is committed by the consumer group whose ID is specified by the properties.group.id option.

    • timestamp: reads data from the timestamp specified by the scan.startup.timestamp-millis option.

    • specific-offsets: reads data from the offsets specified by the scan.startup.specific-offsets option.

    Note

    This option takes effect when the deployment is started without states. When the deployment is restarted from a checkpoint or resumes from the specified state, the deployment preferentially starts to read data at the progress that is saved in the state data.

    scan.startup.specific-offsets

    The start offset of each partition when the scan.startup.mode option is set to specific-offsets.

    String

    No

    None

    Example: partition:0,offset:42;partition:1,offset:300.

    scan.startup.timestamp-millis

    The timestamp of the start offset when the scan.startup.mode option is set to timestamp.

    Long

    No

    None

    Unit: milliseconds.

    scan.topic-partition-discovery.interval

    The time interval for dynamically detecting Kafka topics and partitions.

    Duration

    No

    5 minutes

    The default partition discovery interval is 5 minutes. To disable this feature, you must explicitly set this option to a non-positive value. After the dynamic partition discovery feature is enabled, the Kafka source can automatically discover new partitions and read data from the partitions. In topic-pattern mode, the Kafka source reads data from new partitions of existing topics and data from all partitions of new topics that match the regular expression.

    Note

    In Realtime Compute for Apache Flink that uses VVR 6.0.X, the dynamic partition discovery feature is disabled by default. In Realtime Compute for Apache Flink that uses VVR 8.0 or later, this feature is enabled by default. The default partition discovery interval is 5 minutes.

    scan.header-filter

    Filters Kafka records based on whether they contain a specific message header.

    String

    No

    None

    Separate a header key and the value with a colon (:). Separate multiple headers with logical operators such as AND (&) or OR (|). The logical operator NOT (!) is supported. For example, depart:toy|depart:book&!env:test indicates that the Kafka data whose header contains depart=toy or depart=book and does not contain env=test is retained.

    Note
    • This option is supported only for Realtime Compute for Apache Flink that uses VVR 8.0.6 or later.

    • Parenthesis operations are not supported.

    • Logical operations are performed from left to right in sequence.

    • The header value in the UTF-8 format is converted into a string and compared with the header value specified by the scan.header-filter option.

    scan.check.duplicated.group.id

    Specifies whether to check for duplicate consumer groups specified by the properties.group.id parameter.

    Boolean

    No

    false

    Valid values:

    • true: Checks duplicate consumer groups before a job starts. If duplicate consumer groups exist, reports an error and suspends the job to prevent conflicts.

    • false: Does not check duplicate consumer groups before a job starts.

    Note

    This option is supported only for Realtime Compute for Apache Flink that uses VVR 6.0.4 or later.

  • Sink-specific

    Parameters

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to which data is written.

    String

    Yes

    None

    N/A

    sink.partitioner

    The mapping pattern between Flink parallel subtasks and Kafka partitions.

    String

    No

    default

    Valid values:

    • default (default): Uses the default Kafka partitioner to partition data.

    • fixed: Each Flink partition corresponds to a fixed Kafka partition.

    • round-robin: Data in a Flink partition is distributed to the Kafka partitions in round-robin sequence.

    • Custom partition mapping pattern: You can create a subclass of FlinkKafkaPartitioner to configure a custom partition mapping pattern, such as org.mycompany.MyPartitioner.

    sink.delivery-guarantee

    Semantic pattern for the Kafka sink table.

    String

    No

    at-least-once

    Valid values:

    • none: Delivery is not guaranteed. Data may be lost or duplicated.

    • at-least-once (default): Ensures that data is not lost. However, data may be duplicated.

    • exactly-once: Kafka transactions are used to ensure the exactly-once semantics. This ensures that data is not lost or duplicated.

    Note

    You must configure the sink.transactional-id-prefix option if you set this option to exactly-once (see the example after this table).

    sink.transactional-id-prefix

    The prefix of the Kafka transaction ID that is used in the exactly-once semantics.

    String

    No

    None

    This option takes effect only when the sink.delivery-guarantee option is set to exactly-once.

    sink.parallelism

    The parallelism of operators for the Kafka sink table.

    Integer

    No

    None

    If this option is not specified, the framework uses the parallelism of the upstream operator.
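The following hedged sketch shows how the sink-specific options can be combined in a sink DDL statement that uses exactly-once delivery. The topic, broker address, and transaction ID prefix are placeholders, and the caveats in the Usage notes section still apply.

CREATE TEMPORARY TABLE kafka_exactly_once_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourSinkTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  -- Required when sink.delivery-guarantee is set to exactly-once.
  -- Keep the prefix unique across jobs to avoid transaction ID conflicts.
  'sink.transactional-id-prefix' = '<yourUniquePrefix>'
);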

Security and authentication

If your Kafka cluster requires a secure connection or authentication, add the properties. prefix to the names of the security and authentication options and configure them in the WITH clause. The following sample code shows how to configure a Kafka table to use PLAIN as the Simple Authentication and Security Layer (SASL) mechanism and provide Java Authentication and Authorization Service (JAAS) configurations.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

The following sample code shows how to configure a Kafka table to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*Configure Secure Sockets Layer (SSL).*/
  /*Specify the path of the CA certificate truststore provided by the server.*/
  /*Uploaded artifacts are stored in /flink/usrlib/.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*Specify the path of the private key file keystore if client authentication is required.*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*The algorithm used by the client to verify the server address. A null value indicates that server address verification is disabled.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*Configure SASL.*/
  /*Configure SCRAM-SHA-256 as the SASL mechanism.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Configure JAAS.*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

You can use the Artifacts feature in the development console to upload the CA certificate and private key files used in the preceding example. After the upload, the files are stored in the /flink/usrlib directory. If the CA certificate file that you want to use is named my-truststore.jks, you can set the 'properties.ssl.truststore.location' parameter in the WITH clause in one of the following two ways to use this certificate:

  • If you set 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks', Flink does not need to dynamically download OSS files during runtime, but debug mode is not supported.

  • In Realtime Compute for Apache Flink that uses VVR 11.5 or later, you can set properties.ssl.truststore.location and properties.ssl.keystore.location to an OSS absolute path in the format oss://flink-fullymanaged-<workspace ID>/artifacts/namespaces/<project name>/<file name>. This method dynamically downloads the files from OSS during Flink runtime and supports debug mode.

Note
  • Configuration confirmation: The preceding code snippets apply to most configuration scenarios. Before you configure the Kafka connector, contact the Kafka server O&M personnel to obtain the correct security and authentication configuration information.

  • Escape note: Unlike Apache Flink, the SQL editor of Realtime Compute for Apache Flink escapes double quotation marks (") by default. Therefore, you do not need to add backslashes (\) as escape characters to the double quotation marks (") that are used to enclose the username and password when you configure the properties.sasl.jaas.config option.

Start offset for a Kafka source table

Startup mode

You can configure the scan.startup.mode parameter to specify the initial read offset for a Kafka source table:

  • earliest-offset: Reads data from the earliest offset of the current partition.

  • latest-offset: Reads data from the latest offset of the current partition.

  • group-offsets: Reads data from the offset that is committed by the consumer group that has the ID specified by the properties.group.id option.

  • timestamp: Reads data from the first message whose timestamp is greater than or equal to the timestamp specified by scan.startup.timestamp-millis.

  • specific-offsets: Reads data from the partition offset that is specified by the scan.startup.specific-offsets option.

Note
  • If you do not specify the start offset, group-offsets is used by default.

  • scan.startup.mode takes effect only for stateless jobs. For stateful jobs, consumption starts from the offset stored in the state.

Sample code:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Consume data from the earliest offset.
  'scan.startup.mode' = 'earliest-offset',
  -- Consume data from the latest offset.
  'scan.startup.mode' = 'latest-offset',
  -- Consume data from the offset that is committed by the consumer group my-group.
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- If my-group is used for the first time, the consumption starts from the earliest offset.
  'properties.auto.offset.reset' = 'latest', -- If my-group is used for the first time, the consumption starts from the latest offset.
  -- Consume data from the timestamp 1655395200000, in milliseconds.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Consume data from the specified offset.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Priority of start offsets

The priority order for the starting offset of the source table is as follows:

Listed from highest to lowest priority:

  1. The offset stored in a checkpoint or savepoint.

  2. The start time specified in the console.

  3. The start offset specified by the scan.startup.mode parameter in the WITH clause.

If scan.startup.mode is not specified, the system uses group-offsets and consumes data from the offset of the corresponding consumer group.

If an offset obtained in any of the preceding steps becomes invalid because it expired or due to an issue in the Kafka cluster, the reset policy specified by properties.auto.offset.reset is used. If this option is not set, an exception is thrown and user intervention is required.

A common case is that a Kafka source table reads data by using a new consumer group ID. When the source table queries the Kafka cluster for the offset committed by that consumer group, no valid offset is returned because the group ID is being used for the first time. In this case, the reset strategy configured by the properties.auto.offset.reset parameter determines the start offset. Therefore, you must configure the properties.auto.offset.reset parameter to specify an offset reset strategy when you use a new group ID.

Source Table Offset Submission

The Kafka source table commits a consumer offset to the Kafka cluster only after the checkpointing operation succeeds. If the checkpoint interval that you specify is excessively large, the consumer offset is committed with a delay to the Kafka cluster. During the checkpointing operation, the Kafka source table stores the current data reading progress in the state backend. The offset that is committed to the Kafka cluster is not used for fault recovery. The committed offset is used only to monitor the data reading progress in Kafka. Data accuracy is not affected even if the offset fails to be committed.

Custom partitioner for a sink table

If the built-in Kafka producer partitioners do not meet your requirements, you can implement a custom partitioner to write data to specific partitions. A custom partitioner must extend FlinkKafkaPartitioner. After development, package the class into a JAR file and upload it in the Realtime Compute for Apache Flink development console. After the JAR package is uploaded and referenced, set the sink.partitioner parameter in the WITH clause to the fully qualified class name of the partitioner, such as org.mycompany.MyPartitioner.
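The following hedged Java sketch illustrates such a subclass. The class name, package, and routing logic are placeholders, and the import path shown is the one used by the open source Flink Kafka connector; the VVR connector bundles an equivalent class.

package org.mycompany;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

/** Illustrative partitioner that routes records by the hash of the message key. */
public class MyPartitioner extends FlinkKafkaPartitioner<String> {

    @Override
    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // The partitions array contains the partition IDs of the target topic that are currently available.
        if (key == null) {
            // Records without a key go to the first available partition.
            return partitions[0];
        }
        return partitions[Math.abs(java.util.Arrays.hashCode(key) % partitions.length)];
    }
}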

Comparison among Kafka, Upsert Kafka, and Kafka JSON catalogs

Kafka is a message queue system that supports only data insertion and does not support updates or deletions. Therefore, Kafka cannot process Change Data Capture (CDC) data from upstream systems or retraction logic from operators such as aggregate and join during streaming SQL computation. If you want to write data containing change data or retraction data to Kafka, use an Upsert Kafka sink table, which performs special processing on change data.

If you want to synchronize change data from one or more data tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If data stored in Kafka is in JSON format, you can use a Kafka JSON catalog. This eliminates the need to configure the schema and options in the WITH clause. For more information, see Manage Kafka JSON catalogs.

Examples

Example 1: Read data from a Kafka topic and write the data into another Kafka topic

The following code sample reads data from the source Kafka topic and writes it to the sink Kafka topic. The data is in CSV format.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Example 2: Synchronize table schema and data

Use the Kafka connector to synchronize messages from a Kafka topic to Hologres in real time. By configuring the offset and partition ID of Kafka messages as primary keys, you avoid duplicate messages in Hologres in case of a failover.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Optional. Expand all nested columns. 
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Example 3: Synchronize the table schema and data in the key and value columns of Kafka messages

In this example, the key fields of Kafka messages also store business data. You can synchronize data in both the key and value columns of Kafka messages at the same time.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Note

Kafka message keys do not support schema evolution and type parsing. Manual declaration is required.

Example 4: Synchronize table schema and data and perform computation

This example synchronizes data from Kafka to Hologres and performs lightweight computation during the synchronization.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.

Example 5: Parse nested JSON data

Sample JSON message

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

To avoid having to use functions such as JSON_VALUE(payload, '$.properties.owner') to parse fields, you can directly define the structure in the Source DDL:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

As a result, Flink parses the JSON into structured fields during the read phase, and subsequent SQL statements can access properties.owner directly without additional function calls, which improves overall performance.
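For example, the following query reads the nested fields from the table defined above:

SELECT id, `name`, properties.`owner` AS prop_owner, properties.engine AS prop_engine
FROM kafka_source;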

DataStream API

Important

If you want to use the DataStream API to read or write data, you must use a DataStream connector of the related type to connect to Realtime Compute for Apache Flink. For more information about how to configure a DataStream connector, see How to use DataStream connectors.

  • Create a Kafka source

    The Kafka source provides a builder class for creating an instance of KafkaSource. The following sample code shows how to construct a Kafka source to consume messages from the earliest offset of the "input-topic" topic, with the consumer group named my-group, and deserialize the Kafka message body as a string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    When you create a KafkaSource, you must specify the following parameters.

    Parameter

    Description

    BootstrapServers

    The addresses of Kafka brokers. You can call the setBootstrapServers(String) operation to configure the addresses.

    GroupId

    The ID of the consumer group. You can call the setGroupId(String) method to configure the ID.

    Topics or Partition

    The topics or names of the partitions to which you subscribe. You can configure a Kafka source to subscribe to topics or partitions using one of the following subscription patterns:

    • Topic list. After you configure a topic list, the Kafka source subscribes to all partitions of the specified topics.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Topic pattern. After you specify a regular expression, the Kafka source subscribes to all partitions of the topics that match the specified regular expression.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Partition list. After you configure a partition list, the Kafka source subscribes to the specified partitions.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    A deserializer that deserializes Kafka messages.

    You can call the setDeserializer(KafkaRecordDeserializationSchema) method to specify a deserializer. The KafkaRecordDeserializationSchema interface defines how a ConsumerRecord object is deserialized. You can use one of the following methods to deserialize only the Value fields in the Kafka messages of the ConsumerRecord object:

    • A Kafka source provides the setValueOnlyDeserializer(DeserializationSchema) method. The DeserializationSchema class defines how a Kafka message that is stored as a binary value is deserialized.

    • Use the classes that implement the Deserializer interface of Kafka. For example, you can use the StringDeserializer class to deserialize a message into a string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Note

    If you want to deserialize a ConsumerRecord object, you must create a class that implements the KafkaRecordDeserializationSchema interface.
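    The following hedged Java sketch shows a minimal implementation that keeps both the key and the value of each record as strings. The class name is a placeholder, and the import paths are those of the open source flink-connector-kafka; the VVR connector provides equivalent classes.

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    /** Illustrative schema that exposes both the key and the value of each ConsumerRecord. */
    public class KeyValueDeserializationSchema
            implements KafkaRecordDeserializationSchema<Tuple2<String, String>> {

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Tuple2<String, String>> out)
                throws IOException {
            // Convert the raw key and value bytes into strings; keep null when a field is absent.
            String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
            String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
            out.collect(Tuple2.of(key, value));
        }

        @Override
        public TypeInformation<Tuple2<String, String>> getProducedType() {
            return Types.TUPLE(Types.STRING, Types.STRING);
        }
    }

    You can then pass an instance to the builder, for example KafkaSource.<Tuple2<String, String>>builder().setDeserializer(new KeyValueDeserializationSchema()).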

    XML

    The Kafka DataStream connectors are stored in the Maven central repository.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    When you use a Kafka DataStream connector, you must familiarize yourself with the following Kafka properties:

    • Start offset

      You can use an offset initializer to specify an offset for a Kafka source when the Kafka source starts to read data. An offset initializer is an object that implements the OffsetsInitializer interface. The KafkaSource class provides the following built-in offset initializers.

      Offset initializer

      Code Settings

      Reads data from the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Reads data from the latest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Start consuming data with a timestamp greater than or equal to the specified time (in milliseconds).

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Consume from the offset committed by the consumer group. If no such offset exists, use the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Reads data from the committed offset of each partition and no reset strategy is specified.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Note
      • If the built-in offset initializers do not meet your business requirements, you can create custom offset initializers.

      • If you do not specify an offset initializer, the OffsetsInitializer.earliest() offset initializer is used by default.

    • Streaming execution mode and batch execution mode

      A Kafka source can operate in streaming mode or batch mode. By default, a Kafka source operates in streaming mode. In this mode, the deployment continues to run until the deployment fails or is canceled. If you want a Kafka source to operate in batch mode, you can call the setBounded(OffsetsInitializer) method to specify a stop offset. When all partitions reach their stop offsets, the Kafka source exits.

      Note

      In most cases, a Kafka source that operates in streaming mode does not have a stop offset. If you want to debug a Kafka source that operates in streaming mode, you can call the setUnbounded(OffsetsInitializer) method to specify a stop offset. The methods that you can use to specify a stop offset vary based on whether you use the streaming mode or batch mode.
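      For example, assuming the builder is configured as shown earlier, the stop offsets can be specified as follows; the timestamp is a placeholder.

      // Batch mode: stop when every partition reaches the latest offset observed at startup.
      KafkaSource.<String>builder().setBounded(OffsetsInitializer.latest())

      // Streaming mode used for debugging: keep streaming semantics but stop at a fixed timestamp.
      KafkaSource.<String>builder().setUnbounded(OffsetsInitializer.timestamp(1655395200000L))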

    • Dynamic partition discovery

      If you want a running deployment to process data from new topics and new partitions that match your subscription pattern without restarting the deployment, you can enable the dynamic partition discovery feature on the Kafka source. In the DataStream connector, this feature is disabled by default and must be manually enabled:

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Discover new partitions every 10 seconds.
      Important

      The dynamic partition discovery feature depends on the topic metadata returned by the Kafka cluster. If the Kafka cluster does not promptly update partition information, new partitions might not be discovered in time. Make sure that the partition.discovery.interval.ms value configured for the source matches your requirements.

    • Event time and watermarks

      By default, a Kafka source uses the timestamp attached to a record as the event time for the record. You can define a watermark strategy based on the event time of each record and send the watermarks to downstream services.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      For more information about how to define a custom watermark strategy, see Generating Watermarks.

      Note

      If some source subtasks remain idle for a prolonged period of time, such as when a Kafka partition receives no new messages or the source parallelism exceeds the number of Kafka partitions, watermarks cannot advance. In this case, window computations cannot be triggered and data processing stalls.

      The solutions are as follows:

      • Configure an idle timeout: Set the table.exec.source.idle-timeout parameter so that idle source subtasks are excluded from watermark calculation after the specified timeout, which allows window computation to make progress.

      • Optimize the data source: Set the source parallelism to be equal to or less than the number of Kafka partitions.
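      As a reference for DataStream jobs, the following hedged sketch attaches an idleness timeout to the watermark strategy, which is one way to keep watermarks advancing when some subtasks are idle. The MyEvent type, its getEventTime() accessor, and the durations are placeholders, and the java.time.Duration and WatermarkStrategy imports are assumed.

      WatermarkStrategy<MyEvent> watermarkStrategy =
              WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                      // Use the event time carried in each record.
                      .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime())
                      // Exclude a subtask from watermark calculation if it emits no data for 1 minute.
                      .withIdleness(Duration.ofMinutes(1));

      env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source With Idleness Handling");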

    • Consumer Offset Commit

      When a checkpoint is generated, a Kafka source commits the Kafka consumer offset of each partition to Kafka brokers. This ensures that the Kafka consumer offsets recorded on Kafka brokers are consistent with the state of the checkpoint. The Kafka consumer can automatically commit the offsets on each partition to Kafka brokers on a regular basis. You can configure the automatic offset commit feature using the enable.auto.commit and auto.commit.interval.ms options. If you disable the checkpointing feature, a Kafka source relies on the Kafka consumer to commit the offsets to Kafka brokers.

      Note

      Kafka sources do not use the committed offsets recorded on Kafka brokers for fault tolerance. When you commit offsets, Kafka brokers can monitor the progress of record consumption on each partition.

    • Additional properties

      You can call the setProperties(Properties) and setProperty(String, String) methods to configure additional properties for the Kafka source and Kafka consumer. The following table describes the properties of a Kafka source.

      Configuration item

      Description

      client.id.prefix

      Specifies the prefix for the client ID of the Kafka consumer.

      partition.discovery.interval.ms

      Specifies the time interval at which the Kafka source checks for new partitions.

      Note

      The partition.discovery.interval.ms property is overridden to -1 in batch mode.

      register.consumer.metrics

      Specifies whether to register metrics for the Kafka consumer in Realtime Compute for Apache Flink.

      Other Kafka Consumer configurations

      For more information about the properties of a Kafka consumer, see Apache Kafka.

      Important

      The Kafka DataStream connector overwrites the values of the following properties:

      • key.deserializer: The value of this property is set to ByteArrayDeserializer.

      • value.deserializer: The value of this property is set to ByteArrayDeserializer.

      • auto.offset.reset.strategy: The value of this property is set to OffsetsInitializer#getAutoOffsetResetStrategy().

      The following sample code shows how the Kafka consumer connects to the Kafka cluster using a JAAS configuration and the SASL/PLAIN authentication mechanism.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Monitoring

      Kafka sources register metrics in Realtime Compute for Apache Flink for monitoring and diagnosis.

      • Metric scope

        All metrics of a Kafka source are registered under the KafkaSourceReader metric group. KafkaSourceReader is a subgroup of the operator metric group. The metrics for a specific partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.

        For example, a topic is named my-topic and the partition of the topic is named 1. The consumer offset of the partition is reported by the <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset metric. The number of successful commits of consumer offsets is measured by the <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded metric.

      • Metrics

        Metric

        Description

        Scope

        currentOffset

        Current Consumer Offset

        TopicPartition

        committedOffset

        Current commit offset

        TopicPartition

        commitsSucceeded

        Number of successful submissions

        KafkaSourceReader

        commitsFailed

        Failed Submission Count

        KafkaSourceReader

      • Kafka Consumer metrics

        The metrics for a Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the records-consumed-total metric is registered at <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        You can configure the register.consumer.metrics option to specify whether to register metrics for the Kafka consumer. By default, the register.consumer.metrics option is set to true. For more information about the metrics for a Kafka consumer, see Apache Kafka.

  • Create a Kafka sink

    A Kafka sink can write data from multiple streams to one or more Kafka topics.

    DataStream<String> stream = ...

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", brokers);

    KafkaSink<String> kafkaSink =
            KafkaSink.<String>builder()
                    .setKafkaProducerConfig(kafkaProperties) // Kafka producer configuration.
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("my-topic") // The target topic.
                                    .setKafkaValueSerializer(StringSerializer.class) // The value serializer.
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // Fault tolerance semantics.
                    .setTransactionalIdPrefix("my-tx-prefix") // Required when the delivery guarantee is EXACTLY_ONCE.
                    .build();

    stream.sinkTo(kafkaSink);

    You must configure the following parameters.

    Parameter

    Description

    Topic

    The name of the topic to which data is written.

    Data serialization

    When you build a Kafka sink, you must provide a KafkaRecordSerializationSchema to convert input data into a Kafka ProducerRecord object. Flink provides a schema builder that offers common components, such as message key and value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more advanced control. A Kafka sink calls the ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method for each incoming record to generate a ProducerRecord object that represents the serialized record. Then, the Kafka sink writes the ProducerRecord object to the required topic.

    You can control how each record is written to Kafka in detail. Using ProducerRecord, you can perform the following actions:

    • Set the name of the target topic.

    • Define the message key.

    • Specify the target partition.

    Kafka client properties

    The bootstrap.servers property is required. Specify a comma-separated list of Kafka broker addresses.

    Fault tolerance semantics

    After you enable the checkpointing feature, a Kafka sink can ensure exactly-once delivery. You can also configure the DeliveryGuarantee parameter to specify different fault tolerance semantics. Details about the DeliveryGuarantee parameter are as follows:

    • DeliveryGuarantee.NONE: No delivery guarantees are provided by Flink. Data may be lost or duplicated.

    • DeliveryGuarantee.AT_LEAST_ONCE: The Kafka sink ensures that data is not lost. However, data may be duplicated.

    • DeliveryGuarantee.EXACTLY_ONCE: The Kafka sink ensures that data is not lost or duplicated. The Kafka transaction mechanism is used to ensure exactly-once delivery.

      Note

      For more information about exactly-once semantics, see Usage notes of Semantic.EXACTLY_ONCE.

Data ingestion

You can use the Kafka connector in YAML-based data ingestion jobs as a source or a sink.

Limits

  • We recommend that you use Kafka as a data synchronization source for Flink CDC data ingestion only in Realtime Compute for Apache Flink that uses VVR 11.1 or later.

  • Only the JSON, Debezium JSON, and Canal JSON formats are supported. Other data formats are not supported.

  • For a source, data in the same table can be distributed across multiple partitions only in Realtime Compute for Apache Flink that uses VVR 8.0.11 or later.

Syntax

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092
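The following hedged sketch expands the source configuration with a few commonly used options; the topic, group ID, and metadata list are placeholders, and each option is described in the tables below.

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: orders_topic
  properties.group.id: ingestion-group
  scan.startup.mode: earliest-offset
  value.format: canal-json
  metadata.list: topic,partition,offset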

Configuration options

  • General

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    type

    The type of the source or sink.

    Yes

    String

    None

    Set the value to kafka.

    name

    The name of the source or sink.

    No

    String

    No default value.

    N/A

    properties.bootstrap.servers

    The IP address and port number of a Kafka broker.

    Yes

    String

    None

    Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).

    properties.*

    The options that are configured for the Kafka client.

    No

    String

    None

    The suffix must be a producer or consumer configuration defined in the official Kafka documentation.

    Flink removes the properties. prefix and passes the transformed key and values to the Kafka client. For example, you can set properties.allow.auto.create.topics to false to disable automatic topic creation.

    key.format

    The format used to read or write the key field of Kafka messages.

    No

    String

    None

    • For the source, only json is supported.

    • For the sink, valid values are:

      • csv

      • json

    Note

    This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.0.0 or later.

    value.format

    The format used to read or write the value field of Kafka messages.

    No

    String

    debezium-json

    Valid values:

    • debezium-json

    • canal-json

    • json

    Note
    • The debezium-json and canal-json formats are supported only for Realtime Compute for Apache Flink that uses VVR 8.0.10 or later.

    • The json format is supported only for Realtime Compute for Apache Flink that uses VVR 11.0.0 or later.

  • Source

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    topic

    The name of the topic from which you want to read data.

    No

    String

    No default value.

    Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.

    Note

    You cannot use the topic option together with the topic-pattern option.

    topic-pattern

    The regular expression that is used to match topics. Data of all topics whose names match the specified regular expression is read when a job is running.

    No

    String

    No default value.

    Note

    You cannot use the topic option together with the topic-pattern option.

    properties.group.id

    The consumer group ID.

    No

    String

    None

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for Kafka to read data.

    No

    String

    group-offsets

    Valid values:

    • earliest-offset: Reads data from the earliest offset of each partition.

    • latest-offset: Reads data from the latest offset.

    • group-offsets (default): Starts reading from the committed offsets for the group specified in properties.group.id.

    • timestamp: Reads from the timestamp specified by scan.startup.timestamp-millis.

    • specific-offsets: Read from the offset specified by scan.startup.specific-offsets.

    Note

    This option takes effect when the job is started without states. When the job is restarted from a checkpoint or resumes from the specified state, the job preferentially starts to read data at the progress that is saved in the state data.

    scan.startup.specific-offsets

    The start offset of each partition when the scan.startup.mode option is set to specific-offsets.

    No

    String

    None

    For example partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    The timestamp of the start offset when the scan.startup.mode option is set to timestamp.

    No

    Long

    None

    Unit: milliseconds.

    scan.topic-partition-discovery.interval

    The time interval for dynamically detecting Kafka topics and partitions.

    No

    Duration

    5 minutes

    The default partition discovery interval is 5 minutes. To disable this feature, you must explicitly set this option to a non-positive value. After the dynamic partition discovery feature is enabled, the Kafka source can automatically discover new partitions and read data from the partitions. In topic-pattern mode, the Kafka source reads data from new partitions of existing topics and data from all partitions of new topics that match the regular expression.

    scan.check.duplicated.group.id

    Whether to check for duplicate consumer groups specified by properties.group.id.

    No

    Boolean

    false

    Valid values:

    • true: Checks duplicate consumer groups before a job starts. If duplicate consumer groups exist, reports an error and suspends the job to prevent conflicts.

    • false: Does not check duplicate consumer groups before a job starts.

    schema.inference.strategy

    The schema inference strategy.

    No

    String

    continuous

    Valid values:

    • continuous: Infers the schema for each record. If schemas are incompatible, infers a wider schema and generates schema change events.

    • static: Performs schema inference only once when the job starts. Subsequent records are parsed based on the initial schema. Schema change events are not generated.


    scan.max.pre.fetch.records

    The maximum number of messages that the system attempts to consume and parse in a partition during initial schema inference.

    No

    Int

    50

    Before a job reads and processes data, the system attempts to consume a specific number of the latest messages in advance in a partition to initialize the schema information.

    key.fields-prefix

    The prefix added to fields parsed from the key field in a Kafka message. Configure this option to prevent naming conflicts after the key field in the Kafka message is parsed.

    No

    String

    None

    For example, if this option is set to key_, and the key field contains a field named a, then the field name after parsing is key_a.

    Note

    The value of the key.fields-prefix option cannot be a prefix of the value.fields-prefix option.

    value.fields-prefix

    The prefix added to fields parsed from the value field in a Kafka message. You can configure this option to prevent naming conflicts after value fields in the Kafka message are parsed.

    No

    String

    None

    For example, if this option is set to value_, and the value field contains a field named b, then the field name after parsing is value_b.

    Note

    The value of the value.fields-prefix option cannot be a prefix of the key.fields-prefix option.

    metadata.list

    The metadata columns to pass to downstream storage.

    No

    String

    None

    Available metadata columns include topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, and __raw_value__, separated by commas.

    scan.value.initial-schemas.ddls

    Specifies the initial schemas of specific tables by using DDL statements.

    No

    String

    None

    Separate multiple DDL statements with semicolons (;). For example, use CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); to specify the initial schemas for tables db1.t1 and db1.t2.

    The schema in each DDL statement must match the target table and comply with Flink SQL syntax.

    Note

    This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.5 or later.

    ingestion.ignore-errors

    Specifies whether to ignore errors during data parsing.

    No

    Boolean

    false

    Note

    This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.5 or later.

    ingestion.error-tolerance.max-count

    The maximum number of parsing errors that are tolerated before the job fails when parsing errors are ignored.

    No

    Integer

    -1

    This option takes effect only when ingestion.ignore-errors is enabled. The default value -1 means that parsing exceptions do not trigger job failure.

    Note

    This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.5 or later.

    • Source table in Debezium JSON format

      Parameters

      Required

      Data type

      Default value

      Description

      debezium-json.distributed-tables

      No

      Boolean

      false

      If the data of a single table in Debezium JSON appears in multiple partitions, you must enable this option.

      Note

      This option is supported only for Realtime Compute for Apache Flink that uses VVR 8.0.11 or later.

      Important

      After you configure this option, you must start the deployment without states.

      debezium-json.schema-include

      No

      Boolean

      false

      When configuring Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include schema information in messages. This option specifies whether the Debezium JSON message includes schema information.

      Valid values:

      • true: The Debezium JSON message includes schema information.

      • false: The Debezium JSON message does not include schema information.

      debezium-json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row if a parsing exception occurs.

      • false (default): Returns an error and the deployment fails to start.

      debezium-json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to interpret all data types as STRING when parsing the table schema.

      Valid values:

      • true: Interprets all basic types as STRING.

      • false (default): Parses data types based on the basic rules.

    • Source table in Canal JSON format

      Parameters

      Required

      Data type

      Default value

      Description

      canal-json.distributed-tables

      No

      Boolean

      false

      If the data of a single table in Canal JSON appears in multiple partitions, you must enable this option.

      Note

      This option is supported only for Realtime Compute for Apache Flink that uses VVR 8.0.11 or later.

      Important

      After you configure this option, you must start the deployment without states.

      canal-json.database.include

      No

      String

      None

      An optional regular expression that matches the database metadata field in Canal records. Only the changelogs of the specified database are read. The regular expression string is compatible with Java's Pattern.

      canal-json.table.include

      No

      String

      None

      An optional regular expression that matches the table metadata field in Canal records. Only changelog records of the specified table are read. Regular expressions are compatible with Java's Pattern.

      canal-json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row if a parsing exception occurs.

      • false (default): Returns an error and the deployment fails to start.

      canal-json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to interpret all data types as STRING when parsing the table schema.

      Valid values:

      • true: Interprets all basic types as STRING.

      • false (default): The parser follows basic rules.

      canal-json.infer-schema.strategy

      No

      String

      AUTO

      The schema inference strategy.

      Valid values:

      • AUTO (default): Automatically infers the schema by parsing the JSON data. If your data does not contain sqlType fields, use AUTO to avoid parsing failures.

      • SQL_TYPE: Infers the schema using the sqlType array in Canal JSON data. If your data contains sqlType fields, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE for more accurate type inference.

      • MYSQL_TYPE: Infers the schema using the mysqlType array in Canal JSON data.

      If your Canal JSON data in Kafka contains sqlType fields and you need more accurate type mapping, set canal-json.infer-schema.strategy to SQL_TYPE.

      For more information about sqlType mapping rules, see Canal JSON schema parsing.

      Note
      • This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.1 or later.

      • MYSQL_TYPE is supported only for Realtime Compute for Apache Flink that uses VVR 11.3 or later.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      No

      Boolean

      true

      Specifies whether to map MySQL TIMESTAMP to CDC TIMESTAMP:

      • true (default): Maps MySQL TIMESTAMP to CDC TIMESTAMP.

      • false: Maps MySQL TIMESTAMP to CDC TIMESTAMP_LTZ.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      No

      Boolean

      true

      When MYSQL_TYPE is used for parsing, specifies whether to map MySQL TINYINT(1) to CDC BOOLEAN:

      • true (default): Maps MySQL TINYINT(1) to CDC BOOLEAN.

      • false: Maps MySQL TINYINT(1) to CDC TINYINT(1).

      This option takes effect only when canal-json.infer-schema.strategy is set to MYSQL_TYPE.

    • Source table in JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      json.timestamp-format.standard

      No

      String

      SQL

      Specifies the input and output timestamp format. Valid values:

      • SQL: Parses input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123.

      • ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123.

      json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row if a parsing exception occurs.

      • false (default): Returns an error and the deployment fails to start.

      json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to interpret all data types as STRING when parsing the table schema.

      Valid values:

      • true: Interprets all basic types as STRING.

      • false (default): Parses according to the basic rules.

      json.infer-schema.flatten-nested-columns.enable

      No

      Boolean

      false

      Specifies whether to recursively expand nested columns when parsing JSON-formatted data. Valid values:

      • true: Recursively expands nested columns.

      • false (default): Treats nested types as STRING.

      json.decode.parser-table-id.fields

      No

      String

      None

      Specifies the JSON fields whose values are used to generate the tableId. Separate multiple fields with commas (,). For example, if the JSON data is {"col0":"a", "col1":"b", "col2":"c"}, the generated results are as follows:

      • col0: The tableId is a.

      • col0,col1: The tableId is a.b.

      • col0,col1,col2: The tableId is a.b.c.

      json.infer-schema.fixed-types

      No

      String

      No default value.

      Specifies the exact data types of specific fields when parsing JSON data. Separate multiple field definitions with commas (,). For example, id BIGINT, name VARCHAR(10) specifies that the id field in the JSON data is parsed as BIGINT and the name field as VARCHAR(10).

      When you use this option, also set scan.max.pre.fetch.records to 0.

      Note

      This option is supported only for Realtime Compute for Apache Flink that uses VVR 11.5 or later.
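
    The following sketch is a minimal illustration that combines several of the source options described above into one data ingestion source. The broker address host:9092, the topic name test-topic, the consumer group my-consumer-group, and the timestamp value are placeholders; use only the options that apply to your scenario.

    source:
      type: kafka
      name: Kafka Source
      properties.bootstrap.servers: host:9092
      topic: test-topic
      properties.group.id: my-consumer-group
      value.format: json
      # Start reading from a point in time, specified in milliseconds since the epoch.
      scan.startup.mode: timestamp
      scan.startup.timestamp-millis: 1700000000000
      # Pass Kafka metadata columns to the downstream storage.
      metadata.list: topic,partition,offset,timestamp
      # Treat all primitive JSON values as STRING during schema inference.
      json.infer-schema.primitive-as-string: true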

  • Sink-specific

    Parameters

    Description

    Required

    Data type

    Default value

    Remarks

    type

    The type of the sink.

    Yes

    String

    None

    Set the value to kafka.

    name

    The name of the sink.

    No

    String

    None

    N/A

    topic

    The name of the Kafka topic.

    No

    String

    None

    If this option is specified, all data is written to this topic.

    Note

    If this option is not specified, each data record is written to the topic whose name is the record's table ID string, which concatenates the database name and the table name with a period (.), such as databaseName.tableName.

    partition.strategy

    The Kafka partitioning strategy.

    No

    String

    all-to-zero

    Valid values:

    • all-to-zero (default): Writes all data to partition 0.

    • hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data with the same primary key lands in the same partition and stays ordered. See the configuration sketch after this parameter list.

    sink.tableId-to-topic.mapping

    The mapping between upstream table names and downstream Kafka topic names.

    No

    String

    None

    Separate mappings with semicolons (;). Within each mapping, separate the upstream table name and the downstream Kafka topic name with a colon (:). Table names can be regular expressions, and multiple tables that map to the same topic can be concatenated with commas (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Note

    Configuring this parameter lets you modify the mapped topic while retaining the original table name information.

    • Sink table in Debezium JSON format

      Parameters

      Required

      Data type

      Default value

      Description

      debezium-json.include-schema.enabled

      No

      Boolean

      false

      Specifies whether to include schema information in Debezium JSON data.
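
    As referenced in the partition.strategy remarks above, the following sketch is a minimal illustration of a data ingestion Kafka sink that combines the sink options in this list. The topic name my_topic is a placeholder, and hash-by-key assumes that the upstream tables have primary keys.

    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      # Distribute records by primary key hash instead of writing everything to partition 0.
      partition.strategy: hash-by-key
      # Write both tables to one topic while keeping the original table names in the messages.
      sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:my_topic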

Examples

  • Ingest data from Kafka:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Ingest data into Kafka:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    In the route section, specify the name of the destination Kafka topic.

Note

By default, the automatic topic creation feature is disabled for Alibaba Cloud Kafka. For more information, see FAQ about automatic topic creation. When you write data to Alibaba Cloud Kafka, you must create the corresponding topic in advance. For more information, see Step 3: Create resources.

Policies for schema parsing and evolution

The Kafka connector maintains the schemas of all known tables.

Schema initialization

Schema information includes field and data type information, database and table information, and primary key information. The following describes how these three types of information are initialized:

  • Field and data type information

Data ingestion jobs can infer field and data type information from data automatically. However, in some scenarios, you may want to specify field and type information for certain tables. Based on the granularity of user-specified field types, schema initialization supports the following three strategies:

  1. Schema inferred entirely by the system

Before Kafka messages are read, the Kafka connector attempts to consume messages in each partition, parses the schema of each data record, and then merges the schemas to initialize the table schema information. The number of messages that can be consumed is no more than the value of the scan.max.pre.fetch.records option. Before data is consumed, a table creation event is generated based on the initialized schema.

Note

For the Debezium JSON and Canal JSON formats, table information is included in specific messages. The number of messages to pre-consume is specified by the scan.max.pre.fetch.records parameter. These pre-consumed messages may contain data from multiple tables. Therefore, the number of pre-consumed data records for each table cannot be determined. Partition message pre-consumption and table schema initialization are performed only once before the actual consumption and processing of messages for each partition. If subsequent table data exists, the table schema parsed from the first data record of the table is used as the initial table schema. In this case, partition message pre-consumption and table schema initialization will not be performed again.

Important

Tables whose data is distributed across multiple Kafka partitions are supported only in Ververica Runtime (VVR) 8.0.11 or later. In this scenario, you must set the debezium-json.distributed-tables or canal-json.distributed-tables option to true.
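
For example, a minimal source sketch for fully automatic schema inference might look like the following. The broker address and topic name are placeholders, and the last two options are needed only if you want to tune pre-consumption or if the data of one table spans multiple partitions.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: canal-json
  scan.startup.mode: earliest-offset
  # Pre-consume up to 200 messages per partition to initialize the table schemas.
  scan.max.pre.fetch.records: 200
  # Required in VVR 8.0.11 or later when data of one table is spread across multiple partitions.
  canal-json.distributed-tables: true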

  2. Specify the initial schema

In some scenarios, you may want to specify an initial table schema yourself, for example, when you write data from Kafka to a pre-created downstream table. To do this, add the scan.value.initial-schemas.ddls parameter. The following code shows a configuration example:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set the initial schema.
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);

The CREATE TABLE statement must match the target table's schema. Here, the initial type of the id field in table db1.t1 is set to BIGINT, and the initial type of the name field is set to VARCHAR(10). Similarly, the initial type of the id field in table db1.t2 is set to BIGINT.

The CREATE TABLE statement uses Flink SQL syntax.

  3. Fix field types

In some scenarios, you may want to fix the data types of specific fields. For example, fields that might otherwise be inferred as TIMESTAMP can be delivered as strings instead. In this case, specify the initial table schema by adding the json.infer-schema.fixed-types parameter (valid only when the message format is JSON). Sample configuration:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Fix specific fields to static types.
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  scan.max.pre.fetch.records: 0

This fixes the type of all id fields to BIGINT and all name fields to VARCHAR(10).

The types here match Flink SQL data types.

  • Database and table information

    • For Canal JSON and Debezium JSON formats, database and table names are parsed from individual messages.

    • By default, for messages in the JSON format, table information contains only the table name, which is the name of the topic that contains the data. If your data includes database and table information, you can use the json.decode.parser-table-id.fields parameter to specify the fields that contain this information. The values of these fields are mapped to the database name and the table name. The following code shows a configuration example:

      source:
        type: kafka
        name: Kafka Source
        properties.bootstrap.servers: host:9092
        topic: test-topic
        value.format: json
        scan.startup.mode: earliest-offset
        # Use the value of the col1 field as the database name and the value of the col2 field as the table name.
        json.decode.parser-table-id.fields: col1,col2

      This writes each record to a table whose database name is the value of the col1 field and whose table name is the value of the col2 field.

  • Primary keys

    • For Canal JSON format, the primary key of the table is defined based on the pkNames field in JSON.

    • For the Debezium JSON and JSON formats, the messages do not contain primary key information. You can manually add a primary key to a table by using a transform rule:

      transform:
        - source-table: \.*\.\.*
          projection: \*
          primary-keys: key1, key2

Schema parsing and schema evolution

After initial schema synchronization is complete, if the schema.inference.strategy is set to static, the Kafka connector parses the value of each message based on the initial table schema and does not generate a schema change event. If the schema.inference.strategy is set to continuous, the Kafka connector parses the value part of each Kafka message into physical columns and compares the columns with the currently maintained schema. If the parsed schema is inconsistent with the current schema, the Kafka connector attempts to merge the schemas and generates corresponding table schema change events. The merging rules are as follows:

  • If the parsed physical columns contain fields that are not in the current schema, the Kafka connector adds those fields to the schema and generates nullable column addition events.

  • If the parsed physical columns do not contain fields that already exist in the current schema, those fields are retained and their values are filled with NULL. No column deletion event is generated.

  • If the parsed physical columns and the current schema contain columns with the same name, handle them as follows:

    • If the data types are the same but the precision differs, use the higher precision type and generate a column type change event.

    • If the data types differ, use the closest common parent type in the type-widening tree as the type of the column and generate a column type change event. For example, a column parsed as INT in one record and as BIGINT in another is widened to BIGINT.

  • Supported schema evolution options:

    • Adding a column: Adds the new column to the end of the current schema and synchronizes the new column's data. The new column is set to nullable.

    • Dropping a column: Does not generate a column deletion event. Instead, subsequent data for that column is automatically filled with NULL values.

    • Renaming a column: Treated as adding a column and dropping a column. Adds the renamed column to the end of the schema and fills the original column's data with NULL values.

    • Changing the data type of a column:

      • If the downstream sink supports column type changes, the data ingestion job can change the types of ordinary columns, for example from INT to BIGINT. Such changes depend on the column type change rules supported by the downstream sink, and these rules differ between sink tables. See the documentation of the related sink table for its supported column type change rules.

      • For downstream systems that do not support column type changes, such as Hologres, you can use wide type mapping. This method creates a table with more general data types in the downstream system when the job starts. When a column type change occurs, the system determines whether the downstream sink can accept the change, which enables tolerant support for column type changes.

  • Unsupported schema changes:

    • Changes to constraints, such as primary keys or indexes.

    • Changes from NOT NULL to NULLABLE.

  • Schema parsing for Canal JSON

    Canal JSON-formatted data may contain an optional sqlType field, which carries precise type information for data columns. To obtain a more accurate schema, set the canal-json.infer-schema.strategy option to SQL_TYPE to use the types from sqlType. A configuration sketch follows the mapping list below. The type mapping relationships are as follows:

    JDBC data type (type code) → CDC data type:

    BIT (-7) → BOOLEAN
    BOOLEAN (16) → BOOLEAN
    TINYINT (-6) → TINYINT
    SMALLINT (5) → SMALLINT
    INTEGER (4) → INT
    BIGINT (-5) → BIGINT
    DECIMAL (3) → DECIMAL(38,18)
    NUMERIC (2) → DECIMAL(38,18)
    REAL (7) → FLOAT
    FLOAT (6) → FLOAT
    DOUBLE (8) → DOUBLE
    BINARY (-2) → BYTES
    VARBINARY (-3) → BYTES
    LONGVARBINARY (-4) → BYTES
    BLOB (2004) → BYTES
    DATE (91) → DATE
    TIME (92) → TIME
    TIMESTAMP (93) → TIMESTAMP
    CHAR (1) → STRING
    VARCHAR (12) → STRING
    LONGVARCHAR (-1) → STRING
    Other data types → STRING
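
    As mentioned above, the following sketch is a minimal illustration of enabling the sqlType-based mapping; the broker address and topic name are placeholders.

    source:
      type: kafka
      name: Kafka Source
      properties.bootstrap.servers: host:9092
      topic: test-topic
      value.format: canal-json
      scan.startup.mode: earliest-offset
      # Use the sqlType array in Canal JSON records for more accurate type inference.
      canal-json.infer-schema.strategy: SQL_TYPE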

Dirty data tolerance and collection

In some cases, your Kafka data source may contain malformed data (dirty data). To prevent frequent job restarts due to such dirty data, you can configure the job to ignore these exceptions. Sample configuration:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate up to 1000 dirty data records.
  ingestion.error-tolerance.max-count: 1000

This configuration tolerates up to 1000 dirty data records, allowing your job to run normally when small amounts of dirty data exist. When the number of dirty data records exceeds this threshold, the job fails, prompting you to validate your data.

To ensure your job never fails due to dirty data, use the following configuration:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate all dirty data.
  ingestion.error-tolerance.max-count: -1

The dirty data tolerance policy prevents jobs from failing frequently due to malformed data. You may also want to inspect the dirty data itself, for example to fix the upstream Kafka producers. By configuring dirty data collection, you can write the dirty data of your job to the TaskManager logs and view it there. The following code shows a configuration example:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate all dirty data.
  ingestion.error-tolerance.max-count: -1

pipeline:
  dirty-data.collector:
    # Write dirty data to TaskManager log files.
    type: logger

Table name and topic mapping strategy

When using Kafka as a data ingestion sink, the message format (debezium-json or canal-json) often includes table name information. Consumers typically use this table name—not the topic name—as the actual table name. Therefore, carefully configure the table name and topic mapping strategy.

Assume you need to synchronize two tables—mydb.mytable1 and mydb.mytable2—from MySQL. Possible mapping strategies include the following:

1. Do not configure any mapping strategy

Without any mapping strategy, each table is written to a topic named after the database and table (e.g., mydb.mytable1). Thus, mydb.mytable1 data goes to the mydb.mytable1 topic, and mydb.mytable2 data goes to the mydb.mytable2 topic. Sample configuration:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Configure route rules for mapping (not recommended)

In many scenarios, users do not want topics named after the database and table. Instead, they configure route rules to map data to specific topics. Sample configuration:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable1 topic.

However, modifying the topic name via route rules also modifies the table name in Kafka messages (in debezium-json or canal-json format). As a result, all messages in this topic have the table name mytable1. Other systems consuming this topic may behave unexpectedly.

3. Configure the sink.tableId-to-topic.mapping parameter for mapping (recommended)

To retain the original table name information while mapping to a custom topic, use the sink.tableId-to-topic.mapping parameter. Sample configurations:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

Or:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

In this configuration, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable topic, but the original table name (mydb.mytable1 or mydb.mytable2) is preserved in the Kafka message format (debezium-json or canal-json). This allows other systems that consume messages from this topic to correctly retrieve the source table name information.

EXACTLY_ONCE semantic considerations

  • Configure consumer isolation level

    All applications that consume the data that Flink writes to Kafka must set the isolation.level property:

    • read_committed: Reads only committed data.

    • read_uncommitted (default): Reads all data, including uncommitted data.

    EXACTLY_ONCE semantics take effect for downstream consumers only when they use read_committed. Otherwise, consumers may read uncommitted or aborted data, which breaks consistency. A configuration sketch is provided at the end of this section.

  • Transaction timeout and data loss

    When recovering from a checkpoint, Flink must commit the transactions that were started before the checkpoint was taken. If the time between the job failure and the restart exceeds the Kafka transaction timeout, Kafka automatically aborts these transactions and their data is lost.

    • The Kafka broker default transaction.max.timeout.ms = 15 minutes.

    • The Flink Kafka sink sets transaction.timeout.ms to 1 hour by default.

    • You must increase transaction.max.timeout.ms on the broker to be greater than or equal to the Flink setting.

  • Kafka producer pool and concurrent checkpoints

    The EXACTLY_ONCE mode uses a fixed-size Kafka producer pool. Each checkpoint consumes one producer from the pool. If concurrent checkpoints exceed the pool size, the job fails.

    Adjust the producer pool size according to your maximum concurrent checkpoint count.

  • Scale-in restrictions

    If a job fails before the first checkpoint, the producer pool information is not retained after the restart. Therefore, do not scale down the job's parallelism before the first checkpoint completes. If you must scale down, do not reduce the parallelism by a factor greater than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Transactional blocking of reads

    In read_committed mode, any open transaction (neither committed nor aborted) blocks all reads from the topic past the position of that transaction.

    For example:

    • Transaction 1 writes data.

    • Transaction 2 writes and commits data.

    • Until Transaction 1 completes, Transaction 2's data is invisible to consumers.

    Therefore:

    • During normal operation, data visibility is delayed by approximately the average checkpoint interval.

    • If the job fails, the topic that is being written to blocks read_committed consumers until the job restarts or the transaction times out. In extreme cases, this blocking can last for the full transaction timeout.
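
The following sketch ties these points together for a data ingestion job. It is a minimal illustration under assumptions that are not spelled out in this topic: the Kafka cluster supports transactional writes, and the isolation.level and transaction.timeout.ms client properties are passed through the properties.* prefix in the same way as the other options in this topic. The broker address, topic name, and timeout value are placeholders.

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  # Read only committed data from a topic that is written transactionally.
  properties.isolation.level: read_committed

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: host:9092
  # Keep the producer transaction timeout within the broker's transaction.max.timeout.ms.
  properties.transaction.timeout.ms: 900000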

FAQ