
Realtime Compute for Apache Flink:ApsaraMQ for Kafka

Last Updated: Dec 03, 2025

This topic describes how to use the ApsaraMQ for Kafka connector.

Background information

Apache Kafka is an open source, distributed message queue system. It is widely used in big data fields such as high-performance data processing, stream analytics, and data integration. The Kafka connector is based on the open source Apache Kafka client. It provides high data throughput, supports reading and writing multiple data formats, and offers exactly-once semantics for Realtime Compute for Apache Flink.

Category

Details

Supported type

Source table, sink table, and data integration target

Runtime mode

Streaming mode

Data format

Supported data formats

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Note
  • Only Ververica Runtime (VVR) 8.0.9 and later supports the built-in Protobuf data format.

  • Each supported data format has corresponding configuration items that you can use directly in the WITH clause. For more information, see the Flink community documentation.

Specific monitoring metrics

Specific monitoring metrics

  • Source table

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Sink table

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Note

For more information about the metrics, see Metric descriptions.

API type

SQL, DataStream, and data integration YAML

Update or delete data in sink tables

The connector does not support updating or deleting data in sink tables. It only supports inserting data.

Note

For features related to updating and deleting data, see Upsert Kafka.

Prerequisites

You can connect to a cluster in one of the following ways:

  • Connect to an Alibaba Cloud ApsaraMQ for Kafka cluster

    • The Kafka cluster is version 0.11 or later.

    • You have created an ApsaraMQ for Kafka cluster. For more information, see Create resources.

    • The Flink workspace and the Kafka cluster are in the same VPC, and the ApsaraMQ for Kafka cluster has Flink added to its whitelist. For more information, see Configure a whitelist.

    Important

    Limitations for writing data to ApsaraMQ for Kafka:

    • ApsaraMQ for Kafka does not support writing data in the zstd compression format.

    • ApsaraMQ for Kafka does not support idempotent or transactional writes. Therefore, you cannot use the exactly-once semantics feature of the Kafka sink table. If you use Ververica Runtime (VVR) 8.0.0 or later, you must add the properties.enable.idempotence=false configuration item to the sink table to disable idempotent writes. A configuration sketch is provided after this list. For a comparison of storage engines and feature limitations of ApsaraMQ for Kafka, see Comparison of storage engines.

  • Connect to a self-managed Apache Kafka cluster

    • The self-managed Apache Kafka cluster is version 0.11 or later.

    • You have established network connectivity between Flink and the self-managed Apache Kafka cluster. For information about how to connect to a self-managed cluster over the public network, see Select a network connection type.

    • Only client configuration items for Apache Kafka 2.8 are supported. For more information, see the Apache Kafka documentation for consumer and producer configurations.
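
The following sketch shows how the idempotence setting described above can be added to a sink table definition. This is a minimal sketch: the table name, schema, topic, and broker address are placeholders; only the properties.enable.idempotence entry comes from the limitation above.

CREATE TEMPORARY TABLE apsaramq_kafka_sink (
  id BIGINT,
  content STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourApsaraMQKafkaBrokers>',
  'format' = 'json',
  -- Disable idempotent writes because ApsaraMQ for Kafka does not support them (required in VVR 8.0.0 and later).
  'properties.enable.idempotence' = 'false'
);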

Precautions

Transactional writes are currently not recommended because of known design issues in Flink and Kafka. When you set sink.delivery-guarantee = exactly-once, the Kafka connector enables transactional writes, which has three known issues:

  • Each checkpoint generates a transaction ID. If the checkpoint interval is too short, too many transaction IDs are generated. The coordinator of the Kafka cluster may run out of memory, which can compromise the stability of the Kafka cluster.

  • Each transaction creates a producer instance. If too many transactions are committed at the same time, the TaskManager may run out of memory, which can compromise the stability of the Flink job.

  • If multiple Flink jobs use the same sink.transactional-id-prefix, the transaction IDs they generate may conflict. If one job fails to write, its pending transaction blocks the Last Stable Offset (LSO) of the Kafka partition from advancing. This affects all consumers that read data from that partition.

If you require exactly-once semantics, use Upsert Kafka to write to a primary key table and ensure idempotence with the primary key. If you must use transactional writes, see Considerations for EXACTLY_ONCE semantics.
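
If you must use transactional writes despite these issues, the following sketch shows the two settings that the preceding precautions refer to. The table definition is a placeholder, and the transaction ID prefix must be unique among your Flink jobs.

CREATE TEMPORARY TABLE kafka_txn_sink (
  id BIGINT,
  content STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Enables Kafka transactional writes.
  'sink.delivery-guarantee' = 'exactly-once',
  -- Required for exactly-once. Use a prefix that is unique per job to avoid transaction ID conflicts.
  'sink.transactional-id-prefix' = 'my-unique-prefix'
);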

Troubleshoot network connectivity

If a Flink job reports the error Timed out waiting for a node assignment during startup, the cause is usually a network connectivity issue between Flink and Kafka.

A Kafka client connects to a server as follows:

  1. The client uses an address in bootstrap.servers to connect to Kafka.

  2. Kafka returns the metadata of each broker in the cluster, including their connection addresses.

  3. The client then uses these returned addresses to connect to each broker for read and write operations.

Even if the bootstrap.servers address is accessible, the client cannot read or write data if Kafka returns incorrect broker addresses. This issue often occurs in network architectures that use proxies, port forwarding, or leased lines.

Troubleshooting steps

ApsaraMQ for Kafka

  1. Confirm the endpoint type

    • Default endpoint (internal network)

    • SASL endpoint (internal network + authentication)

    • Public network endpoint (requires a separate request)

    You can use the Flink development console to perform network diagnostics and rule out connectivity issues with the bootstrap.servers address.

  2. Check security groups and whitelists

    The Kafka instance must add the CIDR block of the VPC where Flink resides to its whitelist. For more information, see View VPC CIDR blocks and Configure a whitelist.

  3. Check the SASL configuration (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in your Flink job. Missing authentication can cause the connection to fail during the handshake phase, which can also manifest as a timeout. For more information, see Security and authentication.

Self-managed Kafka on ECS

  1. Use the Flink development console to perform network diagnostics.

    Rule out connectivity issues with the bootstrap.servers address and confirm the correctness of internal and public network endpoints.

  2. Check security groups and whitelists

    • The ECS security group must allow traffic on the Kafka endpoint port (usually 9092 or 9093).

    • The ECS instance must add the CIDR block of the VPC where Flink resides to its whitelist. For more information, see View VPC CIDR blocks.

  3. Check the configuration

    1. Log on to the ZooKeeper cluster used by Kafka. You can use the zkCli.sh or zookeeper-shell.sh tool.

    2. Run a command to retrieve broker metadata. For example: get /brokers/ids/0. In the returned result, find the address that Kafka advertises to clients in the endpoints field.

    3. Use the Flink development console to perform network diagnostics to test if the address is reachable.

      Note
      • If the address is not reachable, contact the Kafka O&M engineer to check and correct the listeners and advertised.listeners configurations. Ensure the returned address is accessible to Flink.

      • For more information about how Kafka clients connect to servers, see Troubleshoot Connectivity.

  4. Check the SASL configuration (if enabled)

    If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in your Flink job. Missing authentication can cause the connection to fail during the handshake phase, which can also manifest as a timeout. For more information, see Security and authentication.

SQL

The Kafka connector can be used in SQL jobs as a source table or a sink table.

Syntax

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Metadata columns

You can define metadata columns in source and sink tables to access or write the metadata of Kafka messages. For example, if you define multiple topics in the WITH parameters and define a metadata column in the Kafka source table, the data that Flink reads is marked with its source topic. The following example shows how to use metadata columns.

CREATE TABLE kafka_source (
  -- Read the topic of the message as the `record_topic` field.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Read the timestamp from the ConsumerRecord as the `ts` field.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Read the offset of the message as the `record_offset` field.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Write the timestamp from the `ts` field as the timestamp of the ProducerRecord to Kafka.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

The following table lists the supported metadata columns for Kafka source and sink tables.

Key

Data type

Description

Source or sink table

topic

STRING NOT NULL METADATA VIRTUAL

The name of the topic that contains the Kafka message.

Source table

partition

INT NOT NULL METADATA VIRTUAL

The ID of the partition that contains the Kafka message.

Source table

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

The headers of the Kafka message.

Source and sink tables

leader-epoch

INT NOT NULL METADATA VIRTUAL

The leader epoch of the Kafka message.

Source table

offset

BIGINT NOT NULL METADATA VIRTUAL

The offset of the Kafka message.

Source table

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

The timestamp of the Kafka message.

Source and sink tables

timestamp-type

STRING NOT NULL METADATA VIRTUAL

The timestamp type of the Kafka message:

  • NoTimestampType: No timestamp is defined in the message.

  • CreateTime: The time when the message was created.

  • LogAppendTime: The time when the message was appended to the Kafka broker.

Source table

__raw_key__

STRING NOT NULL METADATA VIRTUAL

The key field of the raw Kafka message.

Source and sink tables

__raw_value__

STRING NOT NULL METADATA VIRTUAL

The value field of the raw Kafka message.

Source and sink tables

WITH parameters

  • General

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    connector

    The table type.

    String

    Yes

    None

    The value is fixed to kafka.

    properties.bootstrap.servers

    The Kafka broker addresses.

    String

    Yes

    None

    The format is host:port,host:port,host:port. Separate addresses with commas (,).

    properties.*

    Direct configurations for the Kafka client.

    String

    No

    None

    The suffix must be a configuration defined in the official Kafka documentation for producers and consumers.

    Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics'='false' to disable automatic topic creation.

    Do not use this method to modify the following configurations, because they are overwritten by the Kafka connector:

    • key.deserializer

    • value.deserializer

    format

    The format used to read or write the value part of a Kafka message.

    String

    No

    None

    Supported formats:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    For more information about format parameter settings, see Format Parameters.

    key.format

    The format used to read or write the key part of a Kafka message.

    String

    No

    None

    Supported formats:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Note

    If you use this configuration, the key.fields configuration is required.

    key.fields

    The fields in the source or sink table that correspond to the key part of the Kafka message.

    String

    No

    None

    Separate multiple field names with semicolons (;). For example: field1;field2

    key.fields-prefix

    Specifies a custom prefix for all key fields of Kafka messages to avoid name conflicts with the fields in the value part of the message.

    String

    No

    None

    This configuration item is only used to distinguish column names in source and sink tables. The prefix is removed when parsing and generating the key part of Kafka messages.

    Note

    If you use this configuration, you must set value.fields-include to EXCEPT_KEY.

    value.format

    The format used to read or write the value part of a Kafka message.

    String

    No

    None

    This configuration is equivalent to format. You can set only one of format or value.format. If both are configured, value.format overwrites format.

    value.fields-include

    Specifies whether to include the fields corresponding to the key part of the message when parsing or generating the value part of the Kafka message.

    String

    No

    ALL

    Valid values:

    • ALL (default): All columns are processed as the value part of the Kafka message.

    • EXCEPT_KEY: The remaining fields, excluding those defined in key.fields, are processed as the value part of the Kafka message.

  • Source table

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to read from.

    String

    No

    None

    Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.

    Note

    You can specify only one of the topic and topic-pattern options.

    topic-pattern

    A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job is running.

    String

    No

    None

    Note

    You can specify only one of the topic and topic-pattern options.

    properties.group.id

    The consumer group ID.

    String

    No

    KafkaSource-{source_table_name}

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for reading data from Kafka.

    String

    No

    group-offsets

    Valid values:

    • earliest-offset: Starts reading from the earliest offset in Kafka.

    • latest-offset: Starts reading from the latest offset in Kafka.

    • group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.

    • timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.

    • specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.

    Note

    This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.

    scan.startup.specific-offsets

    In specific-offsets startup mode, specifies the start offset for each partition.

    String

    No

    None

    Example: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    In timestamp startup mode, specifies the start offset timestamp.

    Long

    No

    None

    The unit is milliseconds.

    scan.topic-partition-discovery.interval

    The interval for dynamically discovering Kafka topics and partitions.

    Duration

    No

    5 minutes

    The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.

    Note

    In Ververica Runtime (VVR) 6.0.x, dynamic partition discovery is disabled by default. Starting from VVR 8.0, this feature is enabled by default, with the discovery interval set to 5 minutes.

    scan.header-filter

    Filters data based on whether the Kafka data contains a specified header.

    String

    No

    None

    Separate the header key and value with a colon (:). Connect multiple header conditions with logical operators (&, |). The NOT logical operator (!) is also supported. For example, depart:toy|depart:book&!env:test retains Kafka data whose header contains depart=toy or depart=book, and does not contain env=test.

    Note
    • This parameter is supported only in Ververica Runtime (VVR) 8.0.6 and later.

    • Parentheses are not supported in logical operations.

    • Logical operations are performed from left to right.

    • The header value is converted to a string in UTF-8 format for comparison with the specified header value.

    scan.check.duplicated.group.id

    Specifies whether to check for duplicate consumer groups specified by properties.group.id.

    Boolean

    No

    false

    Valid values:

    • true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error and stops, preventing conflicts with existing consumer groups.

    • false: The job starts directly without checking for consumer group conflicts.

    Note

    This parameter is supported only in VVR 6.0.4 and later.

  • Sink table

    Parameter

    Description

    Data type

    Required

    Default value

    Remarks

    topic

    The name of the topic to write to.

    String

    Yes

    None

    None

    sink.partitioner

    The mapping from Flink parallel subtasks to Kafka partitions.

    String

    No

    default

    Valid values:

    • default: Uses the default Kafka partitioner.

    • fixed: Each Flink parallel subtask writes to a fixed Kafka partition.

    • round-robin: Data from each Flink parallel subtask is distributed across Kafka partitions in a round-robin manner.

    • Custom partitioner: If fixed and round-robin do not meet your needs, you can create a child class of FlinkKafkaPartitioner to define a custom partitioner. For example: org.mycompany.MyPartitioner

    sink.delivery-guarantee

    The delivery semantics for the Kafka sink table.

    String

    No

    at-least-once

    Valid values:

    • none: No guarantee. Data may be lost or duplicated.

    • at-least-once (default): Guarantees that no data is lost, but data may be duplicated.

    • exactly-once: Uses Kafka transactions to guarantee that data is not lost or duplicated.

    Note

    When using exactly-once semantics, the sink.transactional-id-prefix parameter is required.

    sink.transactional-id-prefix

    The prefix for Kafka transaction IDs used in exactly-once semantics.

    String

    No

    None

    This configuration takes effect only when sink.delivery-guarantee is set to exactly-once.

    sink.parallelism

    The degree of parallelism for the Kafka sink table operator.

    Integer

    No

    None

    By default, the framework determines the parallelism of the sink operator and uses the same parallelism as the upstream operator.
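
    The sink parameters above can be combined as follows. This is a minimal sketch: the schema, topic, and broker address are placeholders, and the partitioner and delivery guarantee values are only example choices.

    CREATE TEMPORARY TABLE kafka_sink_example (
      id BIGINT,
      content STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'format' = 'json',
      -- Distribute data across Kafka partitions in a round-robin manner.
      'sink.partitioner' = 'round-robin',
      -- Guarantee that no data is lost; duplicates are possible after a failure.
      'sink.delivery-guarantee' = 'at-least-once'
    );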

Security and authentication

If the Kafka cluster requires a secure connection or authentication, you can add the relevant security and authentication configurations to the WITH parameters with the properties. prefix. The following example shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

The following example shows how to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*SSL configuration*/
  /*Configure the path to the truststore (CA certificate) provided by the server.*/
  /*Files uploaded through File Management are stored in the /flink/usrlib/ path.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*If client authentication is required, configure the path to the keystore (private key).*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*The algorithm for client to verify the server address. An empty value disables server address verification.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*SASL configuration*/
  /*Set the SASL mechanism to SCRAM-SHA-256.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Configure JAAS*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

You can upload the CA certificate and private key mentioned in the example to the platform using the File Management feature in the Realtime Compute console. After uploading, the files are stored in the /flink/usrlib directory. If the CA certificate file to be used is named my-truststore.jks, specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH parameters to use this certificate.

Note
  • The preceding examples apply to most configuration scenarios. Before configuring the Kafka connector, contact the Kafka server O&M engineer to obtain the correct security and authentication configurations.

  • Unlike open source Flink, the SQL editor of Realtime Compute for Apache Flink automatically escapes double quotation marks ("). Therefore, you do not need to add extra escape characters (\) for double quotation marks in the username and password when configuring properties.sasl.jaas.config.

Source table start offset

Startup mode

You can specify the initial read offset for a Kafka source table by configuring scan.startup.mode:

  • earliest-offset: Starts reading from the earliest offset of the current partition.

  • latest-offset: Starts reading from the latest offset of the current partition.

  • group-offsets: Starts reading from the committed offset of the specified group ID. The group ID is specified by properties.group.id.

  • timestamp: Starts reading from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by scan.startup.timestamp-millis.

  • specific-offsets: Starts consuming from the specified partition offset. The offset is specified by scan.startup.specific-offsets.

Note
  • If you do not specify a start offset, consumption starts from the committed offset (group-offsets) by default.

  • The scan.startup.mode parameter only takes effect for jobs that start without a state. When a stateful job starts, it begins consuming from the offset stored in its state.

The following code provides an example:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- The following startup settings are mutually exclusive alternatives. In an actual job, keep only one scan.startup.mode configuration.
  -- Start consuming from the earliest offset.
  'scan.startup.mode' = 'earliest-offset',
  -- Start consuming from the latest offset.
  'scan.startup.mode' = 'latest-offset',
  -- Start consuming from the committed offset of the consumer group "my-group".
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, start consuming from the earliest offset.
  'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, start consuming from the latest offset.
  -- Start consuming from the specified millisecond timestamp 1655395200000.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Start consuming from the specified offset.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Start offset priority

The priority of source table start offsets, from highest to lowest, is as follows:

  1. The offset stored in a checkpoint or savepoint.

  2. The start time selected in the Realtime Compute console when the job is started.

  3. The start offset specified by scan.startup.mode in the WITH parameters.

  4. If scan.startup.mode is not specified, group-offsets is used and consumption starts from the committed offset of the corresponding consumer group.

If an offset becomes invalid at any step (for example, due to expiration or a Kafka cluster issue), the system uses the policy set in properties.auto.offset.reset to reset the offset. If this parameter is not configured, an exception occurs and requires manual intervention.

A common scenario is starting consumption with a new group ID. First, the source table queries the Kafka cluster for the committed offset of that group. Because this is the first time the group ID is used, no valid offset is found. Therefore, the offset is reset according to the policy configured in the properties.auto.offset.reset parameter. When consuming with a new group ID, you must configure properties.auto.offset.reset to specify the offset reset policy.

Source table offset commit

The Kafka source table commits the current consumer offset to the Kafka cluster only after a checkpoint is successful. If the checkpoint interval is long, the observed consumer offset in the Kafka cluster will lag. During a checkpoint, the Kafka source table stores the current read progress in its state and does not rely on the offset committed to the cluster for fault recovery. Committing the offset is only for monitoring the read progress on the Kafka side. A failed offset commit does not affect data correctness.

Custom partitioner for sink tables

If the built-in Kafka producer partitioner does not meet your needs, you can implement a custom partitioner to write data to the corresponding partitions. A custom partitioner must inherit from FlinkKafkaPartitioner. After development, compile the JAR package and upload it to the Realtime Compute console using the File Management feature. After uploading and referencing the file, set the sink.partitioner parameter in the WITH clause to the full class path of the partitioner, such as org.mycompany.MyPartitioner.
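
The following is a minimal sketch of such a partitioner, assuming a key-hash routing strategy. The package and class names match the org.mycompany.MyPartitioner example above; everything else is illustrative.

package org.mycompany;

import java.util.Arrays;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

// A hypothetical partitioner that routes records with the same key to the same Kafka partition.
public class MyPartitioner<T> extends FlinkKafkaPartitioner<T> {

    @Override
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (key == null) {
            // Records without a key go to the first available partition.
            return partitions[0];
        }
        // Hash the serialized key onto one of the available partitions.
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}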

Choosing between Kafka, Upsert Kafka, and Kafka JSON catalog

Kafka is an append-only message queue system that does not support data updates or deletions. Therefore, it cannot handle upstream Change Data Capture (CDC) data or retraction logic from operators such as aggregations and joins in streaming SQL. To write data with changes or retractions to Kafka, use the Upsert Kafka sink table, which is specially designed to handle change data.

To conveniently synchronize change data from one or more tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If the data stored in Kafka is in JSON format, using a Kafka JSON catalog eliminates the need to define schemas and WITH parameters. For more information, see Manage a Kafka JSON catalog.

Examples

Example 1: Read data from Kafka and write it to Kafka

Read Kafka data from a topic named `source` and write it to a topic named `sink`. The data is in CSV format.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Example 2: Synchronize table schema and data

Synchronize messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition ID of the Kafka message as the primary key to ensure that there are no duplicate messages in Hologres during failover.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` BIGINT NOT NULL METADATA,
  `part` INT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Optional. Flattens all nested columns.
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Example 3: Synchronize table schema and Kafka message key and value data

If the key part of a Kafka message already stores relevant information, you can synchronize both the key and value from Kafka.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Note

The key part of a Kafka message does not support table schema changes or type parsing. You must declare them manually.

Example 4: Synchronize table schema and data and perform computations

When synchronizing Kafka data to Hologres, you often need to perform lightweight computations.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ(3) METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.

Example 5: Parse nested JSON

JSON message example

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

To avoid using functions such as JSON_VALUE(payload, '$.properties.owner') to parse fields later, you can define the structure directly in the Source DDL:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

This way, Flink parses the JSON into structured fields at the read stage. Subsequent SQL queries can directly use properties.owner without additional function calls, resulting in better overall performance.
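
For example, a downstream query can then reference the nested fields directly. This is a sketch that follows the DDL above:

SELECT
  id,
  `name`,
  properties.`owner` AS prop_owner,  -- Nested field access without JSON_VALUE.
  properties.engine AS prop_engine
FROM kafka_source;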

Datastream API

Important

To read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For information about how to set up a DataStream connector, see Use a DataStream connector.

  • Build a Kafka source

    The Kafka source provides a builder class to create a KafkaSource instance. The following code example shows how to build a Kafka source to consume data from the earliest offset of `input-topic`. The consumer group is named `my-group`, and the Kafka message body is deserialized into a string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    When building a KafkaSource, you must specify the following parameters.

    Parameter

    Description

    BootstrapServers

    The Kafka broker addresses. Configure this using the setBootstrapServers(String) method.

    GroupId

    The consumer group ID. Configure this using the setGroupId(String) method.

    Topics or Partition

    The names of the subscribed topics or partitions. The Kafka source provides the following three ways to subscribe to topics or partitions:

    • Topic list: Subscribes to all partitions in the topic list.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Regular expression matching: Subscribes to all partitions of topics that match the regular expression.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Partition list: Subscribes to the specified partitions.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
              new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    The deserializer for parsing Kafka messages.

    Specify the deserializer using setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to parse a Kafka ConsumerRecord. If you only need to parse the data in the message body (value) of a Kafka message, you can do so in one of the following ways:

    • Use the setValueOnlyDeserializer(DeserializationSchema) method in the KafkaSource builder class provided by Flink. DeserializationSchema defines how to parse the binary data in the Kafka message body.

    • Use a parser provided by Kafka, which includes multiple implementation classes. For example, you can use StringDeserializer to parse the Kafka message body into a string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Note

    To fully parse the ConsumerRecord, you must implement the KafkaRecordDeserializationSchema interface yourself.

    XML

    The Kafka DataStream connector is available in the Maven Central Repository.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    When using the Kafka DataStream connector, you need to understand the following Kafka properties:

    • Start consumer offset

      The Kafka source can specify the starting offset for consumption through an offset initializer (OffsetsInitializer). The built-in offset initializers include the following.

      Offset initializer

      Code setting

      Start consuming from the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Start consuming from the latest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Start consuming from data with a timestamp greater than or equal to the specified time, in milliseconds.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Start consuming from the offset committed by the consumer group. If the committed offset does not exist, use the earliest offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Start consuming from the offset committed by the consumer group, without specifying an offset reset strategy.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Note
      • If the built-in initializers do not meet your needs, you can implement a custom offset initializer.

      • If no offset initializer is specified, OffsetsInitializer.earliest() (the earliest offset) is used by default.

    • Streaming and batch modes

      The Kafka source supports both streaming and batch runtime modes. By default, the Kafka source is set to run in streaming mode, so the job never stops until the Flink job fails or is canceled. To configure the Kafka source to run in batch mode, you can use setBounded(OffsetsInitializer) to specify a stop offset. When all partitions reach their stop offset, the Kafka source exits.

      Note

      Typically, there is no stop offset in streaming mode. To facilitate code debugging, you can use setUnbounded(OffsetsInitializer) to specify a stop offset in streaming mode. Note that the method names for specifying stop offsets in streaming and batch modes (setUnbounded and setBounded) are different.
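
      The following is a minimal sketch of the two variants, reusing the builder from the earlier source example:

      // Batch mode: stop after consuming the offsets that are the latest at job startup.
      KafkaSource.<String>builder()
          .setBounded(OffsetsInitializer.latest());

      // Streaming mode with a stop offset, for example for debugging.
      KafkaSource.<String>builder()
          .setUnbounded(OffsetsInitializer.latest());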

    • Dynamic partition discovery

      To handle scenarios such as topic scaling or new topic creation without restarting the Flink job, you can enable the dynamic partition discovery feature in the provided topic or partition subscription mode.

      Note

      The dynamic partition discovery feature is enabled by default, and the partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. The following code provides an example.

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Check for new partitions every 10 seconds.
      Important

      The dynamic partition discovery feature relies on the metadata update mechanism of the Kafka cluster. If the Kafka cluster does not update partition information promptly, new partitions may not be discovered. Ensure that the partition.discovery.interval.ms configuration of the Kafka cluster matches the actual situation.

    • Event time and watermarks

      By default, the Kafka source uses the timestamp in the Kafka message as the event time. You can define a custom watermark strategy to extract the event time from the message and send watermarks downstream.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      For more information about custom watermark strategies, see Generating Watermarks.

      Note

      If some tasks of a parallel source are idle for a long time (for example, if a Kafka partition has no data input for a long time, or if the source parallelism exceeds the number of Kafka partitions), the overall watermark may stop advancing. In this case, window calculations cannot be triggered normally, which stalls the data processing flow.

      To resolve this issue, you can make the following adjustments:

      • Configure a watermark timeout mechanism: Enable the table.exec.source.idle-timeout parameter to force the system to generate a watermark after a specified timeout period. This ensures that window calculation cycles progress.

      • Optimize the data source: We recommend maintaining a reasonable ratio of Kafka partitions to source concurrency (recommended: number of partitions ≥ source degree of parallelism).
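
      For DataStream jobs, a similar effect to the watermark timeout can be achieved by attaching an idleness timeout directly to the watermark strategy. This is a sketch; the five-second out-of-orderness bound and one-minute idleness are illustrative values, not recommendations from this topic.

      // Requires java.time.Duration and org.apache.flink.api.common.eventtime.WatermarkStrategy.
      WatermarkStrategy<String> watermarkStrategy =
          WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
              // Mark a subtask as idle after one minute without data so that the overall watermark can still advance.
              .withIdleness(Duration.ofMinutes(1));

      env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source With Idleness Handling");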

    • Consumer offset commit

      The Kafka source commits the current consumer offset when a checkpoint is completed. This ensures that the Flink checkpoint state is consistent with the committed offset on the Kafka broker. If checkpointing is not enabled, the Kafka source relies on the internal automatic offset commit logic of the Kafka consumer. The automatic commit feature is configured by the enable.auto.commit and auto.commit.interval.ms Kafka consumer configuration items.

      Note

      The Kafka source does not rely on the offset committed on the broker to recover a failed job. Committing the offset is only for reporting the consumption progress of the Kafka consumer and the consumer group for monitoring on the broker side.

    • Other properties

      In addition to the properties mentioned above, you can use setProperties(Properties) and setProperty(String, String) to set any properties for the Kafka source and Kafka consumer. The KafkaSource typically has the following configuration items.

      Configuration item

      Description

      client.id.prefix

      Specifies the client ID prefix for the Kafka consumer.

      partition.discovery.interval.ms

      Defines the interval at which the Kafka source checks for new partitions.

      Note

      partition.discovery.interval.ms is overwritten to -1 in batch mode.

      register.consumer.metrics

      Specifies whether to register the Kafka consumer's metrics in Flink.

      Other Kafka consumer configurations

      For more information about Kafka consumer configurations, see Apache Kafka.

      Important

      The Kafka connector forcibly overwrites some manually configured parameters as follows:

      • key.deserializer is always overwritten to ByteArrayDeserializer.

      • value.deserializer is always overwritten to ByteArrayDeserializer.

      • auto.offset.reset.strategy is overwritten to OffsetsInitializer#getAutoOffsetResetStrategy().

      The following example shows how to configure a Kafka consumer to use PLAIN as the SASL mechanism and provide a JAAS configuration.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Monitoring

      The Kafka source registers metrics in Flink for monitoring and diagnostics.

      • Metric scope

        All metrics of the Kafka source reader are registered under the KafkaSourceReader metric group, which is a subgroup of the operator metric group. Metrics related to a specific topic partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.

        For example, the current consumer offset (currentOffset) for partition 1 of the topic "my-topic" is registered under <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. The number of successful offset commits (commitsSucceeded) is registered under <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.

      • Metric list

        Metric name

        Description

        Scope

        currentOffset

        The current consumer offset.

        TopicPartition

        committedOffset

        The current committed offset.

        TopicPartition

        commitsSucceeded

        The number of successful commits.

        KafkaSourceReader

        commitsFailed

        The number of failed commits.

        KafkaSourceReader

      • Kafka consumer metrics

        The metrics of the Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the Kafka consumer metric records-consumed-total is registered under <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        You can use the register.consumer.metrics configuration item to specify whether to register the metrics of the Kafka consumer. By default, this option is set to true. For more information about Kafka consumer metrics, see Apache Kafka.

  • Build a Kafka sink

    The Flink Kafka sink can write stream data to one or more Kafka topics.

    DataStream<String> stream = ...

    Properties properties = new Properties();
    // Replace with your Kafka broker addresses.
    properties.setProperty("bootstrap.servers", "<yourKafkaBrokers>");

    KafkaSink<String> kafkaSink =
            KafkaSink.<String>builder()
                    .setKafkaProducerConfig(properties) // Kafka producer configuration
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("my-topic") // Target topic
                                    .setKafkaValueSerializer(StringSerializer.class) // Value serializer
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // Fault-tolerance semantics
                    .build();

    stream.sinkTo(kafkaSink);

    You need to configure the following parameters.

    Parameter

    Description

    Topic

    The default topic name to which data is written.

    Data serialization

    When building, you need to provide a KafkaRecordSerializationSchema to convert the input data into a Kafka ProducerRecord. Flink provides a schema builder to offer some common components, such as message key/value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more control. The ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method is called for each data record that flows in to generate a ProducerRecord to be written to Kafka.

    You can have fine-grained control over how each data record is written to Kafka. With ProducerRecord, you can perform the following operations:

    • Set the name of the topic to write to.

    • Define the message key.

    • Specify the partition to which data is written.

    Kafka client properties

    bootstrap.servers is required. It is a comma-separated list of Kafka brokers.

    Fault tolerance semantics

    When Flink's checkpointing is enabled, the Flink Kafka sink can guarantee exactly-once semantics. In addition to enabling Flink's checkpointing, you can also specify different fault tolerance semantics through the DeliveryGuarantee parameter. The DeliveryGuarantee parameter details are as follows:

    • DeliveryGuarantee.NONE: (Default setting) Flink makes no guarantees. Data may be lost or duplicated.

    • DeliveryGuarantee.AT_LEAST_ONCE: Guarantees that no data is lost, but data may be duplicated.

    • DeliveryGuarantee.EXACTLY_ONCE: Uses Kafka transactions to provide exactly-once semantics.

      Note

      For considerations when using EXACTLY_ONCE semantics, see Considerations for EXACTLY_ONCE semantics.

Data integration

The Kafka connector can be used in data integration YAML job development for reading from a source or writing to a target.

Limits

  • We recommend using Kafka as a synchronous data source for Flink Change Data Capture (CDC) data integration in Ververica Runtime (VVR) 11.1 and later.

  • Only JSON, Debezium JSON, and Canal JSON formats are supported. Other data formats are not supported at this time.

  • For data sources, only Ververica Runtime (VVR) 8.0.11 and later support distributing data from a single table across multiple partitions.

Syntax

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092
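
The following sketch expands the syntax above with several of the configuration items described below. The broker addresses, topic, and group ID are placeholders, and value.format: canal-json assumes that the upstream change data is encoded as Canal JSON.

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: <yourKafkaBrokers>
  topic: <yourTopic>
  properties.group.id: <yourConsumerGroupId>
  scan.startup.mode: earliest-offset
  value.format: canal-json
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: <yourKafkaBrokers>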

Configuration items

  • General

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    type

    The source or target type.

    Yes

    String

    None

    Set this parameter to kafka.

    name

    The source or target name.

    No

    String

    None

    None

    properties.bootstrap.servers

    The Kafka broker addresses.

    Yes

    String

    None

    The format is host:port,host:port,host:port. Separate addresses with commas (,).

    properties.*

    Direct configurations for the Kafka client.

    No

    String

    None

    The suffix must be a configuration defined in the official Kafka documentation for producers and consumers.

    Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation.

    key.format

    The format used to read or write the key part of a Kafka message.

    No

    String

    None

    • For a source, only json is supported.

    • For a sink, valid values are:

      • csv

      • json

    Note

    This parameter is supported only in Ververica Runtime (VVR) 11.0.0 and later.

    value.format

    The format used to read or write the value part of a Kafka message.

    No

    String

    debezium-json

    Valid values:

    • debezium-json 

    • canal-json

    • json

    Note
    • The debezium-json and canal-json formats are supported only in Ververica Runtime (VVR) 8.0.10 and later.

    • The json format is supported only in Ververica Runtime (VVR) 11.0.0 and later.

  • Source table

    Parameter

    Description

    Required

    Data type

    Default value

    Remarks

    topic

    The name of the topic to read from.

    No

    String

    None

    Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.

    Note

    You can specify only one of the topic and topic-pattern options.

    topic-pattern

    A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job is running.

    No

    String

    None

    Note

    You can specify only one of the topic and topic-pattern options.

    properties.group.id

    The consumer group ID.

    No

    String

    None

    If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.

    scan.startup.mode

    The start offset for reading data from Kafka.

    No

    String

    group-offsets

    Valid values:

    • earliest-offset: Starts reading from the earliest Kafka offset.

    • latest-offset: Starts reading from the latest Kafka offset.

    • group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.

    • timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.

    • specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.

    Note

    This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.

    scan.startup.specific-offsets

    In specific-offsets startup mode, specifies the start offset for each partition.

    No

    String

    None

    Example: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    In timestamp startup mode, specifies the start offset timestamp.

    No

    Long

    None

    The unit is milliseconds.

    scan.topic-partition-discovery.interval

    The interval for dynamically discovering Kafka topics and partitions.

    No

    Duration

    5 minutes

    The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.

    scan.check.duplicated.group.id

    Specifies whether to check for duplicate consumer groups specified by properties.group.id.

    No

    Boolean

    false

    Valid values:

    • true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error, preventing conflicts with existing consumer groups.

    • false: The job starts directly without checking for consumer group conflicts.

    schema.inference.strategy

    The schema parsing strategy.

    No

    String

    continuous

    Valid values:

    • continuous: Parses the schema for each data record. If the schemas of consecutive records are incompatible, they are merged into a wider schema and a schema change event is generated.

    • static: Parses the schema only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema change events are generated.

    scan.max.pre.fetch.records

    The maximum number of messages to attempt to consume and parse for each partition during initial schema parsing.

    No

    Int

    50

    Before the job actually reads and processes data, it attempts to pre-consume a specified number of the latest messages for each partition to initialize the schema information.

    key.fields-prefix

    A custom prefix added to the field names parsed from the message key to avoid naming conflicts after parsing the Kafka message key.

    No

    String

    None

    Assume this configuration item is set to key_. If the key contains a field named `a`, the field name after parsing the key will be `key_a`.

    Note

    The value of key.fields-prefix cannot be a prefix of value.fields-prefix.

    value.fields-prefix

    A custom prefix added to the field names parsed from the message value to avoid naming conflicts after parsing the Kafka message body.

    No

    String

    None

    Assume this configuration item is set to value_. If the value contains a field named `b`, the field name after parsing the value will be `value_b`.

    Note

    The value of value.fields-prefix cannot be a prefix of key.fields-prefix.

    metadata.list

    The metadata columns to be passed downstream.

    No

    String

    None

    Available metadata columns include topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, and __raw_value__. Separate them with commas.

    • Source table Debezium JSON format

      Parameter

      Required

      Data type

      Default value

      Description

      debezium-json.distributed-tables

      No

      Boolean

      false

      If data for a single table in Debezium JSON appears in multiple partitions, you need to enable this option.

      Note

      This configuration item is supported only in VVR 8.0.11 and later.

      Important

      After modifying this configuration item, you need to start the job without a state.

      debezium-json.schema-include

      No

      Boolean

      false

      When setting up the Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include the schema in the message. This option indicates whether the Debezium JSON message includes the schema.

      Valid values:

      • true: The Debezium JSON message includes the schema.

      • false: The Debezium JSON message does not include the schema.

      debezium-json.ignore-parse-errors

      No

      Boolean

      false

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error and the job fails when a parsing error occurs.

      debezium-json.infer-schema.primitive-as-string

      No

      Boolean

      false

      Specifies whether to parse all types as String when parsing the table schema.

      Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to basic rules.

    • Source table Canal JSON format

      canal-json.distributed-tables (Required: No | Data type: Boolean | Default value: false)

      Enable this option if Canal JSON data for a single table appears in multiple partitions.

      Note: This configuration item is supported only in VVR 8.0.11 and later.

      Important: After modifying this configuration item, you need to start the job without a state.

      canal-json.database.include (Required: No | Data type: String | Default value: None)

      An optional regular expression that is matched against the `database` metadata field in Canal records. Only changelog records of the matching databases are read. The regular expression string is compatible with Java's Pattern.

      canal-json.table.include (Required: No | Data type: String | Default value: None)

      An optional regular expression that is matched against the `table` metadata field in Canal records. Only changelog records of the matching tables are read. The regular expression string is compatible with Java's Pattern.

      canal-json.ignore-parse-errors (Required: No | Data type: Boolean | Default value: false)

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error, and the job fails to start.

      canal-json.infer-schema.primitive-as-string (Required: No | Data type: Boolean | Default value: false)

      Specifies whether to parse all types as String when parsing the table schema. Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to basic rules.

      canal-json.infer-schema.strategy (Required: No | Data type: String | Default value: AUTO)

      The parsing strategy for the table schema. Valid values:

      • AUTO (default): Automatically parses the schema by analyzing the JSON data. If the data does not contain a `sqlType` field, we recommend using AUTO to avoid parsing failures.

      • SQL_TYPE: Parses types from the `sqlType` array in the Canal JSON data. If the data contains a `sqlType` field and a more precise type mapping is needed, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE. For the `sqlType` mapping rules, see Schema parsing for Canal JSON.

      • MYSQL_TYPE: Parses types from the `mysqlType` array in the Canal JSON data.

      Note: This configuration item is supported in VVR 11.1 and later. MYSQL_TYPE is supported in VVR 11.3 and later.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled (Required: No | Data type: Boolean | Default value: true)

      Specifies whether to map the MySQL `timestamp` type to the CDC `timestamp` type:

      • true (default): The MySQL `timestamp` type is mapped to the CDC `timestamp` type.

      • false: The MySQL `timestamp` type is mapped to the CDC `timestamp_ltz` type.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled (Required: No | Data type: Boolean | Default value: true)

      Specifies whether to map the MySQL `tinyint(1)` type to the CDC `boolean` type when parsing with MYSQL_TYPE:

      • true (default): The MySQL `tinyint(1)` type is mapped to the CDC `boolean` type.

      • false: The MySQL `tinyint(1)` type is mapped to the CDC `tinyint(1)` type.

      This configuration item takes effect only when canal-json.infer-schema.strategy is set to MYSQL_TYPE.

    • Source table JSON format

      json.timestamp-format.standard (Required: No | Data type: String | Default value: SQL)

      Specifies the input and output timestamp format. Valid values:

      • SQL: Parses input timestamps in the format yyyy-MM-dd HH:mm:ss.s{precision}, for example, 2020-12-30 12:13:14.123.

      • ISO-8601: Parses input timestamps in the format yyyy-MM-ddTHH:mm:ss.s{precision}, for example, 2020-12-30T12:13:14.123.

      json.ignore-parse-errors (Required: No | Data type: Boolean | Default value: false)

      Valid values:

      • true: Skips the current row when a parsing error occurs.

      • false (default): Reports an error, and the job fails to start.

      json.infer-schema.primitive-as-string (Required: No | Data type: Boolean | Default value: false)

      Specifies whether to parse all types as String when parsing the table schema. Valid values:

      • true: Parses all primitive data types as String.

      • false (default): Parses according to basic rules.

      json.infer-schema.flatten-nested-columns.enable (Required: No | Data type: Boolean | Default value: false)

      Specifies whether to recursively expand nested columns when parsing JSON data. Valid values:

      • true: Recursively expands nested columns.

      • false (default): Treats nested columns as String.

      json.decode.parser-table-id.fields (Required: No | Data type: String | Default value: None)

      Specifies the JSON fields whose values are used to generate the tableId when parsing JSON data. Connect multiple fields with a comma (,). For example, if the JSON data is {"col0":"a", "col1":"b", "col2":"c"}, the resulting tableId is as follows:

      • col0 → a

      • col0,col1 → a.b

      • col0,col1,col2 → a.b.c

  • Sink table

    type (Required: Yes | Data type: String | Default value: None)

    The type of the target. Set this parameter to kafka.

    name (Required: No | Data type: String | Default value: None)

    The name of the target.

    topic (Required: No | Data type: String | Default value: None)

    The name of the Kafka topic. If this parameter is specified, all data is written to this topic.

    Note: If this parameter is not specified, each data record is written to the topic that corresponds to its TableId string. The TableId is generated by joining the database name and table name with a period (.), for example, databaseName.tableName.

    partition.strategy (Required: No | Data type: String | Default value: all-to-zero)

    The strategy for writing data to Kafka partitions. Valid values:

    • all-to-zero (default): Writes all data to partition 0.

    • hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data records with the same primary key are written to the same partition and that their order is preserved.

    sink.tableId-to-topic.mapping (Required: No | Data type: String | Default value: None)

    The mapping from upstream table names to downstream Kafka topic names. Separate multiple mappings with semicolons (;). In each mapping, use a colon (:) to separate the upstream table name from the downstream topic name. Table names can be regular expressions. To map multiple tables to the same topic, separate the table names with commas (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Note: This parameter lets you change the target topic while retaining the original table name information in the written messages.

    • Sink table Debezium JSON format

      debezium-json.include-schema.enabled (Required: No | Data type: Boolean | Default value: false)

      Specifies whether the written Debezium JSON data includes schema information.

Examples

  • Use Kafka as a data integration source:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Use Kafka as a data integration target:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    Here, the `route` module specifies the Kafka topic to which data from the source table is written.

Note

ApsaraMQ for Kafka does not enable automatic topic creation by default. For more information, see Issues related to automatic topic creation. When writing to ApsaraMQ for Kafka, you need to create the corresponding topic in advance. For more information, see Step 3: Create resources.

Table schema parsing and change synchronization strategy

  • Partition message pre-consumption and table schema initialization

    The Kafka connector maintains the schema of all currently known tables. Before reading Kafka data, the Kafka connector attempts to pre-consume up to scan.max.pre.fetch.records messages in each partition. It parses the schema of each data record and then merges these schemas to initialize the table schema information. Subsequently, before consuming the data, the connector generates corresponding table creation events based on the initialized schema.

    Note

    For Debezium JSON and Canal JSON formats, the table information is obtained from specific messages. The scan.max.pre.fetch.records messages that are pre-consumed may contain data from several tables. Therefore, the number of pre-consumed data records for each table cannot be determined. Pre-consumption and table schema initialization are performed only once before consuming and processing messages from each partition. If new table data appears later, the table schema parsed from the first data record of that table is used as the initial schema. The schema for that table will not be re-initialized through pre-consumption.

    Important

    Only VVR 8.0.11 and later support distributing data from a single table across multiple partitions. For this scenario, you need to set the configuration item debezium-json.distributed-tables or canal-json.distributed-tables to true.

  • Table information

    • For Canal JSON and Debezium JSON formats, table information, including the database and table name, is parsed from the specific message.

    • For JSON format, the table information only includes the table name, which is the name of the topic where the data resides.

  • Primary key information

    • For Canal JSON format, the table's primary key is defined based on the `pkNames` field in the JSON.

    • For Debezium JSON and JSON formats, the JSON does not contain primary key information. You can manually add a primary key to the table using a transform rule:

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2
  • Schema parsing and schema change

    After the table schema is initialized, the behavior depends on schema.inference.strategy. If it is set to `static`, the Kafka connector parses the value of each message based on the initial table schema and does not generate schema change events. If it is set to `continuous`, the Kafka connector parses the body of each Kafka message to extract the physical columns and compares them with the currently maintained schema. If the parsed schema is inconsistent with the current schema, the connector attempts to merge the schemas and generates a corresponding table schema change event (a configuration sketch that combines these options appears at the end of this section). The merge rules are as follows:

    • If the parsed physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event for adding a nullable column is generated.

    • If the parsed physical columns are missing fields that exist in the current schema, those fields are retained and their values are filled with NULL. No column deletion event is generated.

    • If there is a column with the same name in both, it is handled according to the following scenarios:

      • If the types are the same but the precision is different, the type with the larger precision is used, and a column type change event is generated.

      • If the types are different, the system finds the lowest common parent node in the tree structure shown in the following figure to use as the type for the column with the same name, and a column type change event is generated.

        image

  • The currently supported schema change strategies are as follows:

    • Add column: Adds the corresponding column to the end of the current schema and synchronizes the data of the new column. The new column is set to be nullable.

    • Delete column: Does not generate a delete column event. Instead, the data for that column is automatically filled with NULL values.

    • Rename column: This is treated as adding and deleting a column. The renamed column is added to the end of the current schema, and the data of the column before renaming is filled with NULL values.

    • Column type change:

      • For downstream systems that support column type changes: if the downstream sink can handle a column type change, the data integration job supports changing the type of regular columns, for example, from INT to BIGINT. Such changes depend on the column type change rules supported by the downstream sink, and different sink tables support different rules. For more information, see the documentation for the specific sink table.

      • For downstream systems that do not support column type changes, such as Hologres, you can use broad type mapping: when the job starts, tables are created in the downstream system with broader types. When a column type change occurs, the system checks whether the downstream sink can still accept the changed type, which provides tolerant support for column type changes.

  • Schema changes that are not currently supported:

    • Changes to constraints such as primary keys or indexes.

    • Changing from NOT NULL to NULLABLE.

  • Schema parsing for Canal JSON

    Canal JSON data may contain an optional `sqlType` field, which records precise type information for data columns. To retrieve a more accurate schema, you can use the types in `sqlType` by setting canal-json.infer-schema.strategy to SQL_TYPE. The type mapping relationships are as follows:

    JDBC type     | Type code | CDC type
    --------------|-----------|---------------
    BIT           | -7        | BOOLEAN
    BOOLEAN       | 16        | BOOLEAN
    TINYINT       | -6        | TINYINT
    SMALLINT      | 5         | SMALLINT
    INTEGER       | 4         | INT
    BIGINT        | -5        | BIGINT
    DECIMAL       | 3         | DECIMAL(38,18)
    NUMERIC       | 2         | DECIMAL(38,18)
    REAL          | 7         | FLOAT
    FLOAT         | 6         | FLOAT
    DOUBLE        | 8         | DOUBLE
    BINARY        | -2        | BYTES
    VARBINARY     | -3        | BYTES
    LONGVARBINARY | -4        | BYTES
    BLOB          | 2004      | BYTES
    DATE          | 91        | DATE
    TIME          | 92        | TIME
    TIMESTAMP     | 93        | TIMESTAMP
    CHAR          | 1         | STRING
    VARCHAR       | 12        | STRING
    LONGVARCHAR   | -1        | STRING
    Other types   | -         | STRING

Table name to topic mapping strategy

When using Kafka as the target for a data integration job, you need to configure the table name to topic mapping strategy carefully. This is because the written Kafka message format (Debezium JSON or Canal JSON) also contains table name information, and subsequent consumption of Kafka messages often uses the table name information in the data as the actual table name (rather than the topic name).

Suppose you need to synchronize two tables, `mydb.mytable1` and `mydb.mytable2`, from MySQL. The possible configuration strategies are as follows:

1. Do not configure any mapping strategy

Without any mapping strategy, each table is written to a topic named in the format "database_name.table_name". Therefore, data from `mydb.mytable1` is written to a topic named `mydb.mytable1`, and data from `mydb.mytable2` is written to a topic named `mydb.mytable2`. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Configure route rules for mapping (not recommended)

In many scenarios, users do not want the written topic to be in the "database_name.table_name" format. They want to write data to a specified topic. Therefore, they configure route rules for mapping. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

In this case, all data from `mydb.mytable1` and `mydb.mytable2` is written to the `mytable1` topic.

However, when you use route rules to change the written topic name, it also modifies the table name information in the Kafka message (Debezium JSON or Canal JSON format). In this case, all table names in the Kafka messages become `mytable1`. This may lead to unexpected behavior when other systems consume Kafka messages from this topic.

3. Configure the sink.tableId-to-topic.mapping parameter for mapping (recommended)

To configure the table name to topic mapping rule while preserving the source table name information, use the `sink.tableId-to-topic.mapping` parameter. The following code provides a configuration example:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

or

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

In this case, all data from `mydb.mytable1` and `mydb.mytable2` is written to the `mytable` topic. The table name information in the Kafka message (Debezium JSON or Canal JSON format) remains `mydb.mytable1` or `mydb.mytable2`. When other systems consume Kafka messages from this topic, they can correctly obtain the source table name information.

Considerations for EXACTLY_ONCE semantics

  • Configure the consumer isolation level

    All applications that consume Kafka data must set `isolation.level`:

    • read_committed: Reads only committed data.

    • read_uncommitted (default): Can read uncommitted data.

    End-to-end EXACTLY_ONCE semantics depend on consumers using read_committed. Otherwise, consumers might read uncommitted data, which breaks consistency. A configuration sketch that covers this setting appears at the end of this section.

  • Transaction timeout and data loss

    When Flink recovers from a checkpoint, it only relies on transactions that were committed before that checkpoint began. If the time between a job crash and its restart exceeds the Kafka transaction timeout, Kafka automatically aborts the transaction, leading to data loss.

    • The Kafka broker's default transaction.max.timeout.ms is 15 minutes.

    • The Flink Kafka sink's default transaction.timeout.ms is 1 hour.

    • You must increase transaction.max.timeout.ms on the broker side to be no less than Flink's setting.

  • Producer pool and concurrent checkpoints

    EXACTLY_ONCE mode uses a fixed-size pool of Kafka producers. Each checkpoint occupies one producer from the pool. If the number of concurrent checkpoints exceeds the pool size, the job will fail.

    Adjust the producer pool size based on the maximum number of concurrent checkpoints.

  • Limitations on scaling in the degree of parallelism

    If a job fails before the first checkpoint is completed, the original producer pool information is not retained after the restart. Therefore, do not reduce the job's degree of parallelism before the first checkpoint is completed. If you must scale in, do not reduce the degree of parallelism by a factor greater than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Transaction blocking reads

    In read_committed mode, any open transaction (neither committed nor aborted) blocks consumers from reading past its position in the partitions being written to.

    For example:

    • Transaction 1 writes data.

    • Transaction 2 writes and commits data.

    • As long as Transaction 1 is not finished, the data from Transaction 2 is not visible to consumers.

    Therefore:

    • During normal operation, the data visibility latency is approximately equal to the checkpoint interval.

    • When a job fails, the topic being written to blocks consumers until the job restarts or the open transactions time out. A large transaction timeout therefore also prolongs how long reads are blocked after a failure.

FAQ