This topic describes how to use the ApsaraMQ for Kafka connector.
Background information
Apache Kafka is an open source, distributed message queue system. It is widely used in big data fields such as high-performance data processing, stream analytics, and data integration. The Kafka connector is based on the open source Apache Kafka client. It provides high data throughput, supports reading and writing multiple data formats, and offers exactly-once semantics for Realtime Compute for Apache Flink.
Category | Details |
Supported type | Source table, sink table, and data integration target |
Runtime mode | Streaming mode |
Data format | |
Specific monitoring metrics | |
API type | SQL, DataStream, and data integration YAML |
Update or delete data in sink tables | The connector does not support updating or deleting data in sink tables; it only supports inserting data. Note: For features related to updating and deleting data, see Upsert Kafka. |
Prerequisites
You can connect to a cluster in one of the following ways:
Connect to an Alibaba Cloud ApsaraMQ for Kafka cluster
The Kafka cluster is version 0.11 or later.
You have created an ApsaraMQ for Kafka cluster. For more information, see Create resources.
The Flink workspace and the Kafka cluster are in the same VPC, and the ApsaraMQ for Kafka cluster has Flink added to its whitelist. For more information, see Configure a whitelist.
Important: Limitations for writing data to ApsaraMQ for Kafka:
ApsaraMQ for Kafka does not support writing data in the zstd compression format.
ApsaraMQ for Kafka does not support idempotent or transactional writes. Therefore, you cannot use the exactly-once semantics feature of the Kafka sink table. If you use Ververica Runtime (VVR) 8.0.0 or later, you must add the properties.enable.idempotence=false configuration item to the sink table to disable idempotent writes. For a comparison of storage engines and feature limitations of ApsaraMQ for Kafka, see Comparison of storage engines.
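The following DDL is a minimal sketch of a sink table that disables idempotent writes when writing to ApsaraMQ for Kafka on VVR 8.0.0 or later; the schema, topic, and broker addresses are placeholders.
CREATE TEMPORARY TABLE mq_kafka_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- ApsaraMQ for Kafka does not support idempotent writes, so disable them explicitly.
  'properties.enable.idempotence' = 'false'
);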
Connect to a self-managed Apache Kafka cluster
The self-managed Apache Kafka cluster is version 0.11 or later.
You have established network connectivity between Flink and the self-managed Apache Kafka cluster. For information about how to connect to a self-managed cluster over the public network, see Select a network connection type.
Only client configuration items for Apache Kafka 2.8 are supported. For more information, see the Apache Kafka documentation for consumer and producer configurations.
Precautions
Transactional writes are currently not recommended because of known design limitations in Flink and Kafka. When you set sink.delivery-guarantee = exactly-once, the Kafka connector enables transactional writes, which has three known issues:
Each checkpoint generates a transaction ID. If the checkpoint interval is too short, too many transaction IDs are generated. The coordinator of the Kafka cluster may run out of memory, which can compromise the stability of the Kafka cluster.
Each transaction creates a producer instance. If too many transactions are committed at the same time, the TaskManager may run out of memory, which can compromise the stability of the Flink job.
If multiple Flink jobs use the same sink.transactional-id-prefix, the transaction IDs they generate may conflict. If one job fails to write, a hanging transaction blocks the Last Stable Offset (LSO) of the Kafka partition from advancing, which affects all consumers that read data from that partition.
If you require exactly-once semantics, use Upsert Kafka to write to a primary key table and ensure idempotence with the primary key. If you must use transactional writes, see Considerations for EXACTLY_ONCE semantics.
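As a reference, the following is a minimal sketch of the recommended Upsert Kafka approach; the table schema, topic, and broker addresses are assumptions for illustration.
CREATE TEMPORARY TABLE upsert_kafka_sink (
  user_id BIGINT,
  cnt BIGINT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- Upsert Kafka requires both a key format and a value format.
  'key.format' = 'json',
  'value.format' = 'json'
);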
Troubleshoot network connectivity
If a Flink job reports the error Timed out waiting for a node assignment during startup, the cause is usually a network connectivity issue between Flink and Kafka.
A Kafka client connects to a server as follows:
The client uses an address in bootstrap.servers to connect to Kafka.
Kafka returns the metadata of each broker in the cluster, including their connection addresses.
The client then uses these returned addresses to connect to each broker for read and write operations.
Even if the bootstrap.servers address is accessible, the client cannot read or write data if Kafka returns incorrect broker addresses. This issue often occurs in network architectures that use proxies, port forwarding, or leased lines.
Troubleshooting steps
ApsaraMQ for Kafka
Confirm the endpoint type
Default endpoint (internal network)
SASL endpoint (internal network + authentication)
Public network endpoint (requires a separate request)
You can use the Flink development console to perform network diagnostics and rule out connectivity issues with the bootstrap.servers address.
Check security groups and whitelists
The Kafka instance must add the CIDR block of the VPC where Flink resides to its whitelist. For more information, see View VPC CIDR blocks and Configure a whitelist.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in your Flink job. Missing authentication can cause the connection to fail during the handshake phase, which can also manifest as a timeout. For more information, see Security and authentication.
Self-managed Kafka on ECS
Use the Flink development console to perform network diagnostics.
Rule out connectivity issues with the bootstrap.servers address and confirm the correctness of internal and public network endpoints.
Check security groups and whitelists
The ECS security group must allow traffic on the Kafka endpoint port (usually 9092 or 9093).
The ECS instance must add the CIDR block of the VPC where Flink resides to its whitelist. For more information, see View VPC CIDR blocks.
Check the configuration
Log on to the ZooKeeper cluster used by Kafka. You can use the zkCli.sh or zookeeper-shell.sh tool.
Run a command to retrieve broker metadata, for example, get /brokers/ids/0. In the returned result, find the address that Kafka advertises to clients in the endpoints field.
Use the Flink development console to perform network diagnostics to test if the address is reachable.
Note: If the address is not reachable, contact the Kafka O&M engineer to check and correct the listeners and advertised.listeners configurations. Ensure the returned address is accessible to Flink.
For more information about how Kafka clients connect to servers, see Troubleshoot Connectivity.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, you must correctly configure the JAAS, SSL, and SASL mechanisms in your Flink job. Missing authentication can cause the connection to fail during the handshake phase, which can also manifest as a timeout. For more information, see Security and authentication.
SQL
The Kafka connector can be used in SQL jobs as a source table or a sink table.
Syntax
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Metadata columns
You can define metadata columns in source and sink tables to access or write the metadata of Kafka messages. For example, if you define multiple topics in the WITH parameters and define a metadata column in the Kafka source table, the data that Flink reads is marked with its source topic. The following example shows how to use metadata columns.
CREATE TABLE kafka_source (
-- Read the topic of the message as the `record_topic` field.
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Read the timestamp from the ConsumerRecord as the `ts` field.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Read the offset of the message as the `record_offset` field.
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Write the timestamp from the `ts` field as the timestamp of the ProducerRecord to Kafka.
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
The following table lists the supported metadata columns for Kafka source and sink tables.
Key | Data type | Description | Source or sink table |
topic | STRING NOT NULL METADATA VIRTUAL | The name of the topic that contains the Kafka message. | Source table |
partition | INT NOT NULL METADATA VIRTUAL | The ID of the partition that contains the Kafka message. | Source table |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | The headers of the Kafka message. | Source and sink tables |
leader-epoch | INT NOT NULL METADATA VIRTUAL | The leader epoch of the Kafka message. | Source table |
offset | BIGINT NOT NULL METADATA VIRTUAL | The offset of the Kafka message. | Source table |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | The timestamp of the Kafka message. | Source and sink tables |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | The timestamp type of the Kafka message. Valid values: NoTimestampType, CreateTime, and LogAppendTime. | Source table |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | The key field of the raw Kafka message. | Source and sink tables |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | The value field of the raw Kafka message. | Source and sink tables |
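The following DDL is a minimal sketch of a source table that reads the message headers and the raw key and value listed above alongside the payload; the field names, topic, and broker addresses are illustrative.
CREATE TEMPORARY TABLE kafka_source_with_raw (
  `record_headers` MAP<STRING, BYTES> NOT NULL METADATA FROM 'headers' VIRTUAL,
  `raw_key` STRING METADATA FROM '__raw_key__' VIRTUAL,
  `raw_value` STRING METADATA FROM '__raw_value__' VIRTUAL,
  `log` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  -- The raw format maps the whole message value to the single physical column.
  'format' = 'raw'
);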
WITH parameters
General
Parameter
Description
Data type
Required
Default value
Remarks
connector
The table type.
String
Yes
None
The value is fixed to kafka.
properties.bootstrap.servers
The Kafka broker addresses.
String
Yes
None
The format is host:port,host:port,host:port. Separate addresses with commas (,).
properties.*
Direct configurations for the Kafka client.
String
No
None
The suffix must be a configuration defined in the official Kafka documentation for producers and consumers.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation (see the sketch after this parameter list).
Do not use this method to modify the following configurations, because they are overwritten by the Kafka connector:
key.deserializer
value.deserializer
format
The format used to read or write the value part of a Kafka message.
String
No
None
Supported formats:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
Note: For more information about format parameter settings, see Format Parameters.
key.format
The format used to read or write the key part of a Kafka message.
String
No
None
Supported formats:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
Note: If you use this configuration, you must also configure key.fields.
key.fields
The fields in the source or sink table that correspond to the key part of the Kafka message.
String
No
None
Separate multiple field names with semicolons (;). For example: field1;field2
key.fields-prefix
Specifies a custom prefix for all key fields of Kafka messages to avoid name conflicts with the fields in the value part of the message.
String
No
None
This configuration item is only used to distinguish column names in source and sink tables. The prefix is removed when parsing and generating the key part of Kafka messages.
Note: If you use this configuration, you must set value.fields-include to EXCEPT_KEY.
value.format
The format used to read or write the value part of a Kafka message.
String
No
None
This configuration is equivalent to format. You can set only one of format or value.format. If both are configured, value.format overwrites format.
value.fields-include
Specifies whether to include the fields corresponding to the key part of the message when parsing or generating the value part of the Kafka message.
String
No
ALL
Valid values:
ALL (default): All columns are processed as the value part of the Kafka message.
EXCEPT_KEY: The remaining fields, excluding those defined in key.fields, are processed as the value part of the Kafka message.
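As a minimal sketch of the properties.* pass-through described in this list (the topic, broker addresses, and tuning values are placeholders), the following source table disables automatic topic creation and raises the consumer fetch size:
CREATE TEMPORARY TABLE kafka_tuned_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  -- Passed to the Kafka consumer after the properties. prefix is removed.
  'properties.allow.auto.create.topics' = 'false',
  'properties.fetch.max.bytes' = '104857600'
);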
Source table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic to read from.
String
No
None
Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.
Note: You can specify only one of the topic and topic-pattern options.
topic-pattern
A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job is running.
String
No
None
Note: You can specify only one of the topic and topic-pattern options.
properties.group.id
The consumer group ID.
String
No
KafkaSource-{source_table_name}
If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.
scan.startup.mode
The start offset for reading data from Kafka.
String
No
group-offsets
Valid values:
earliest-offset: Starts reading from the earliest offset in Kafka.
latest-offset: Starts reading from the latest offset in Kafka.
group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.
timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.
specific-offsets: Starts reading from the offsets specified by scan.startup.specific-offsets.
Note: This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.
scan.startup.specific-offsets
In specific-offsets startup mode, specifies the start offset for each partition.
String
No
None
Example: partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
In timestamp startup mode, specifies the start offset timestamp.
Long
No
None
The unit is milliseconds.
scan.topic-partition-discovery.interval
The interval for dynamically discovering Kafka topics and partitions.
Duration
No
5 minutes
The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.
Note: In Ververica Runtime (VVR) 6.0.x, dynamic partition discovery is disabled by default. Starting from VVR 8.0, this feature is enabled by default, with the discovery interval set to 5 minutes.
scan.header-filter
Filters data based on whether the Kafka data contains a specified header.
String
No
None
Separate the header key and value with a colon (:). Connect multiple header conditions with the logical operators & and |. The NOT operator (!) is also supported. For example, depart:toy|depart:book&!env:test retains Kafka data whose headers contain depart=toy or depart=book and do not contain env=test. A usage sketch follows this parameter list.
Note: This parameter is supported only in Ververica Runtime (VVR) 8.0.6 and later.
Parentheses are not supported in logical operations.
Logical operations are performed from left to right.
The header value is converted to a string in UTF-8 format for comparison with the specified header value.
scan.check.duplicated.group.id
Specifies whether to check for duplicate consumer groups specified by properties.group.id.
Boolean
No
false
Valid values:
true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error and stops, preventing conflicts with existing consumer groups.
false: The job starts directly without checking for consumer group conflicts.
Note: This parameter is supported only in VVR 6.0.4 and later.
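The following DDL is a minimal sketch that combines topic-pattern with scan.header-filter (VVR 8.0.6 or later); the topic pattern, header conditions, and broker addresses are illustrative.
CREATE TEMPORARY TABLE kafka_filtered_source (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  -- Read all topics whose names match the pattern.
  'topic-pattern' = 'orders-.*',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'scan.startup.mode' = 'earliest-offset',
  -- Keep messages whose headers contain depart=toy or depart=book and do not contain env=test.
  'scan.header-filter' = 'depart:toy|depart:book&!env:test',
  'format' = 'json'
);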
Sink table
Parameter
Description
Data type
Required
Default value
Remarks
topic
The name of the topic to write to.
String
Yes
None
None
sink.partitioner
The mapping mode from Flink concurrency to Kafka partitions.
String
No
default
Valid values:
default: Uses the default Kafka partitioner.
fixed: Each Flink concurrency corresponds to a fixed Kafka partition.
round-robin: Data from Flink concurrencies is allocated to Kafka partitions in a round-robin manner.
Custom partitioner: If fixed and round-robin do not meet your needs, you can create a child class of FlinkKafkaPartitioner to define a custom partitioner. For example: org.mycompany.MyPartitioner
sink.delivery-guarantee
The delivery semantics for the Kafka sink table.
String
No
at-least-once
Valid values:
none: No guarantee. Data may be lost or duplicated.
at-least-once (default): Guarantees that no data is lost, but data may be duplicated.
exactly-once: Uses Kafka transactions to guarantee that data is not lost or duplicated.
Note: When using exactly-once semantics, the sink.transactional-id-prefix parameter is required (see the sketch after this parameter list).
sink.transactional-id-prefix
The prefix for Kafka transaction IDs used in exactly-once semantics.
String
No
None
This configuration takes effect only when sink.delivery-guarantee is set to exactly-once.
sink.parallelism
The degree of parallelism for the Kafka sink table operator.
Integer
No
None
By default, the Kafka sink uses the parallelism of the upstream operator, which is determined by the framework.
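If you still need transactional writes despite the precautions described earlier, the following sketch shows the two options that must be set together; the schema, topic, broker addresses, and prefix are placeholders.
CREATE TEMPORARY TABLE kafka_exactly_once_sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'format' = 'json',
  'sink.delivery-guarantee' = 'exactly-once',
  -- Required with exactly-once; keep the prefix unique across Flink jobs.
  'sink.transactional-id-prefix' = '<yourUniquePrefix>'
);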
Security and authentication
If the Kafka cluster requires a secure connection or authentication, you can add the relevant security and authentication configurations to the WITH parameters with the properties. prefix. The following example shows how to configure a Kafka table to use PLAIN as the SASL mechanism and provide a JAAS configuration.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
The following example shows how to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/*SSL configuration*/
/*Configure the path to the truststore (CA certificate) provided by the server.*/
/*Files uploaded through File Management are stored in the /flink/usrlib/ path.*/
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/*If client authentication is required, configure the path to the keystore (private key).*/
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/*The algorithm for client to verify the server address. An empty value disables server address verification.*/
'properties.ssl.endpoint.identification.algorithm' = '',
/*SASL configuration*/
/*Set the SASL mechanism to SCRAM-SHA-256.*/
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/*Configure JAAS*/
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
You can upload the CA certificate and private key mentioned in the example to the platform using the File Management feature in the Realtime Compute console. After uploading, the files are stored in the /flink/usrlib directory. If the CA certificate file to be used is named my-truststore.jks, specify 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' in the WITH parameters to use this certificate.
The preceding examples apply to most configuration scenarios. Before configuring the Kafka connector, contact the Kafka server O&M engineer to obtain the correct security and authentication configurations.
Unlike open source Flink, the SQL editor of Realtime Compute for Apache Flink automatically escapes double quotation marks ("). Therefore, you do not need to add extra escape characters (\) for double quotation marks in the username and password when configuring properties.sasl.jaas.config.
Source table start offset
Startup mode
You can specify the initial read offset for a Kafka source table by configuring scan.startup.mode:
earliest-offset: Starts reading from the earliest offset of the current partition.
latest-offset: Starts reading from the latest offset of the current partition.
group-offsets: Starts reading from the committed offset of the specified group ID. The group ID is specified by properties.group.id.
timestamp: Starts reading from the first message whose timestamp is greater than or equal to the specified time. The timestamp is specified by scan.startup.timestamp-millis.
specific-offsets: Starts consuming from the specified partition offset. The offset is specified by scan.startup.specific-offsets.
If you do not specify a start offset, consumption starts from the committed offset (group-offsets) by default.
The scan.startup.mode parameter only takes effect for jobs that start without a state. When a stateful job starts, it begins consuming from the offset stored in its state.
The following code provides an example:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Start consuming from the earliest offset.
'scan.startup.mode' = 'earliest-offset',
-- Start consuming from the latest offset.
'scan.startup.mode' = 'latest-offset',
-- Start consuming from the committed offset of the consumer group "my-group".
'properties.group.id' = 'my-group',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest', -- If "my-group" is used for the first time, start consuming from the earliest offset.
'properties.auto.offset.reset' = 'latest', -- If "my-group" is used for the first time, start consuming from the latest offset.
-- Start consuming from the specified millisecond timestamp 1655395200000.
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '1655395200000',
-- Start consuming from the specified offset.
'scan.startup.mode' = 'specific-offsets',
'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
Start offset priority
The priority of source table start offsets is as follows:
Priority from high to low | Description |
1 | Offset stored in a checkpoint or savepoint |
2 | Start time selected in the Realtime Compute console when starting the job |
3 | Start offset specified by scan.startup.mode in the WITH parameters |
4 | If scan.startup.mode is not specified, group-offsets is used, and the committed offset of the corresponding consumer group is used |
If an offset becomes invalid at any step (for example, due to expiration or a Kafka cluster issue), the system uses the policy set in properties.auto.offset.reset to reset the offset. If this parameter is not configured, an exception occurs and requires manual intervention.
A common scenario is starting consumption with a new group ID. First, the source table queries the Kafka cluster for the committed offset of that group. Because this is the first time the group ID is used, no valid offset is found. Therefore, the offset is reset according to the policy configured in the properties.auto.offset.reset parameter. When consuming with a new group ID, you must configure properties.auto.offset.reset to specify the offset reset policy.
Source table offset commit
The Kafka source table commits the current consumer offset to the Kafka cluster only after a checkpoint is successful. If the checkpoint interval is long, the observed consumer offset in the Kafka cluster will lag. During a checkpoint, the Kafka source table stores the current read progress in its state and does not rely on the offset committed to the cluster for fault recovery. Committing the offset is only for monitoring the read progress on the Kafka side. A failed offset commit does not affect data correctness.
Custom partitioner for sink tables
If the built-in Kafka producer partitioner does not meet your needs, you can implement a custom partitioner to write data to the corresponding partitions. A custom partitioner must inherit from FlinkKafkaPartitioner. After development, compile the JAR package and upload it to the Realtime Compute console using the File Management feature. After uploading and referencing the file, set the sink.partitioner parameter in the WITH clause to the full class path of the partitioner, such as org.mycompany.MyPartitioner.
Choosing between Kafka, Upsert Kafka, and Kafka JSON catalog
Kafka is an append-only message queue system that does not support data updates or deletions. Therefore, it cannot handle upstream Change Data Capture (CDC) data or retraction logic from operators such as aggregations and joins in streaming SQL. To write data with changes or retractions to Kafka, use the Upsert Kafka sink table, which is specially designed to handle change data.
To conveniently synchronize change data from one or more tables in an upstream database to Kafka in batches, you can use a Kafka JSON catalog. If the data stored in Kafka is in JSON format, using a Kafka JSON catalog eliminates the need to define schemas and WITH parameters. For more information, see Manage a Kafka JSON catalog.
Examples
Example 1: Read data from Kafka and write it to Kafka
Read Kafka data from a topic named `source` and write it to a topic named `sink`. The data is in CSV format.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
Example 2: Synchronize table schema and data
Synchronize messages from a Kafka topic to Hologres in real time. In this case, you can use the offset and partition ID of the Kafka message as the primary key to ensure that there are no duplicate messages in Hologres during failover.
CREATE TEMPORARY TABLE kafkaTable (
`offset` INT NOT NULL METADATA,
`part` BIGINT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.infer-schema.flatten-nested-columns.enable' = 'true'
-- Optional. Flattens all nested columns.
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Example 3: Synchronize table schema and Kafka message key and value data
If the key part of a Kafka message already stores relevant information, you can synchronize both the key and value from Kafka.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
The key part of a Kafka message does not support table schema changes or automatic type parsing. You must declare the key fields manually.
Example 4: Synchronize table schema and data and perform computations
When synchronizing Kafka data to Hologres, you often need to perform lightweight computations.
CREATE TEMPORARY TABLE kafkaTable (
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Use COALESCE to handle null values.
Example 5: Parse nested JSON
JSON message example
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}
To avoid using functions such as JSON_VALUE(payload, '$.properties.owner') to parse fields later, you can define the structure directly in the source DDL:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
This way, Flink parses the JSON into structured fields at the read stage. Subsequent SQL queries can directly access properties.owner without additional function calls, resulting in better overall performance.
DataStream API
To read and write data using the DataStream API, you must use the corresponding DataStream connector to connect to Realtime Compute for Apache Flink. For information about how to set up a DataStream connector, see Use a DataStream connector.
Build a Kafka source
The Kafka source provides a builder class to create a KafkaSource instance. The following code example shows how to build a Kafka source to consume data from the earliest offset of `input-topic`. The consumer group is named `my-group`, and the Kafka message body is deserialized into a string.
Java
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
When building a KafkaSource, you must specify the following parameters.
Parameter
Description
BootstrapServers
The Kafka broker addresses. Configure this using the setBootstrapServers(String) method.
GroupId
The consumer group ID. Configure this using the setGroupId(String) method.
Topics or Partition
The names of the subscribed topics or partitions. The Kafka source provides the following three ways to subscribe to topics or partitions:
Topic list: Subscribes to all partitions in the topic list.
KafkaSource.builder().setTopics("topic-a", "topic-b")
Regular expression matching: Subscribes to all partitions of topics that match the regular expression.
KafkaSource.builder().setTopicPattern("topic.*")
Partition list: Subscribes to the specified partitions.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
    new TopicPartition("topic-a", 0),   // Partition 0 of topic "topic-a"
    new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet)
Deserializer
The deserializer for parsing Kafka messages.
Specify the deserializer using setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to parse a Kafka ConsumerRecord. If you only need to parse the data in the message body (value) of a Kafka message, you can do so in one of the following ways:
Use the setValueOnlyDeserializer(DeserializationSchema) method in the KafkaSource builder class provided by Flink. DeserializationSchema defines how to parse the binary data in the Kafka message body.
Use a parser provided by Kafka, which includes multiple implementation classes. For example, you can use StringDeserializer to parse the Kafka message body into a string.
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
Note: To fully parse the ConsumerRecord, you must implement the KafkaRecordDeserializationSchema interface yourself.
XML
The Kafka DataStream connector is available in the Maven Central Repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr-version}</version>
</dependency>
When using the Kafka DataStream connector, you need to understand the following Kafka properties:
Start consumer offset
The Kafka source can specify the starting offset for consumption through an offset initializer (OffsetsInitializer). The built-in offset initializers include the following.
Offset initializer
Code setting
Start consuming from the earliest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())
Start consuming from the latest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())
Start consuming from data with a timestamp greater than or equal to the specified time, in milliseconds.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))
Start consuming from the offset committed by the consumer group. If the committed offset does not exist, use the earliest offset.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
Start consuming from the offset committed by the consumer group, without specifying an offset reset strategy.
KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())
Note: If the built-in initializers do not meet your needs, you can implement a custom offset initializer.
If no offset initializer is specified, OffsetsInitializer.earliest() (the earliest offset) is used by default.
Streaming and batch modes
The Kafka source supports both streaming and batch runtime modes. By default, the Kafka source is set to run in streaming mode, so the job never stops until the Flink job fails or is canceled. To configure the Kafka source to run in batch mode, you can use setBounded(OffsetsInitializer) to specify a stop offset. When all partitions reach their stop offset, the Kafka source exits.
Note: Typically, there is no stop offset in streaming mode. To facilitate code debugging, you can use setUnbounded(OffsetsInitializer) to specify a stop offset in streaming mode. Note that the method names for specifying stop offsets in streaming and batch modes (setUnbounded and setBounded) are different.
Dynamic partition discovery
To handle scenarios such as topic scaling or new topic creation without restarting the Flink job, you can enable the dynamic partition discovery feature in the provided topic or partition subscription mode.
Note: The dynamic partition discovery feature is enabled by default, and the partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. The following code provides an example.
KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000") // Check for new partitions every 10 seconds.
Important: The dynamic partition discovery feature relies on the metadata update mechanism of the Kafka cluster. If the Kafka cluster does not update partition information promptly, new partitions may not be discovered. Ensure that the partition.discovery.interval.ms configuration matches the actual situation of the Kafka cluster.
Event time and watermarks
By default, the Kafka source uses the timestamp in the Kafka message as the event time. You can define a custom watermark strategy to extract the event time from the message and send watermarks downstream.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
For more information about custom watermark strategies, see Generating Watermarks.
Note: If some tasks of a parallel source are idle for a long time (for example, if a Kafka partition has no data input for a long time, or if the source parallelism exceeds the number of Kafka partitions), the watermark generation mechanism may fail. In this case, the system cannot trigger window calculations normally, which causes the data processing flow to stall.
To resolve this issue, you can make the following adjustments:
Configure a watermark timeout mechanism: Set the table.exec.source.idle-timeout parameter to force the system to generate a watermark after the specified idle timeout. This ensures that window calculation cycles progress.
Optimize the data source: We recommend maintaining a reasonable ratio of Kafka partitions to source concurrency (recommended: number of partitions ≥ source degree of parallelism).
Consumer offset commit
The Kafka source commits the current consumer offset when a checkpoint is completed. This ensures that the Flink checkpoint state is consistent with the committed offset on the Kafka broker. If checkpointing is not enabled, the Kafka source relies on the internal automatic offset commit logic of the Kafka consumer. The automatic commit feature is configured by the enable.auto.commit and auto.commit.interval.ms Kafka consumer configuration items.
Note: The Kafka source does not rely on the offset committed on the broker to recover a failed job. Committing the offset is only for reporting the consumption progress of the Kafka consumer and the consumer group for monitoring on the broker side.
Other properties
In addition to the properties mentioned above, you can use setProperties(Properties) and setProperty(String, String) to set any properties for the Kafka source and Kafka consumer. The KafkaSource typically has the following configuration items.
Configuration item
Description
client.id.prefix
Specifies the client ID prefix for the Kafka consumer.
partition.discovery.interval.ms
Defines the interval at which the Kafka source checks for new partitions.
Note: partition.discovery.interval.ms is overwritten to -1 in batch mode.
register.consumer.metrics
Specifies whether to register the Kafka consumer's metrics in Flink.
Other Kafka consumer configurations
For more information about Kafka consumer configurations, see Apache Kafka.
Important: The Kafka connector forcibly overwrites some manually configured parameters as follows:
key.deserializer is always overwritten to ByteArrayDeserializer.
value.deserializer is always overwritten to ByteArrayDeserializer.
auto.offset.reset.strategy is overwritten to OffsetsInitializer#getAutoOffsetResetStrategy().
The following example shows how to configure a Kafka consumer to use PLAIN as the SASL mechanism and provide a JAAS configuration.
KafkaSource.builder()
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
Monitoring
The Kafka source registers metrics in Flink for monitoring and diagnostics.
Metric scope
All metrics of the Kafka source reader are registered under the KafkaSourceReader metric group, which is a subgroup of the operator metric group. Metrics related to a specific topic partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> metric group.
For example, the current consumer offset (currentOffset) for partition 1 of the topic "my-topic" is registered under <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. The number of successful offset commits (commitsSucceeded) is registered under <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.
Metric list
Metric name
Description
Scope
currentOffset
The current consumer offset.
TopicPartition
committedOffset
The current committed offset.
TopicPartition
commitsSucceeded
The number of successful commits.
KafkaSourceReader
commitsFailed
The number of failed commits.
KafkaSourceReader
Kafka consumer metrics
The metrics of the Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the Kafka consumer metric records-consumed-total is registered under <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
You can use the register.consumer.metrics configuration item to specify whether to register the metrics of the Kafka consumer. By default, this option is set to true. For more information about Kafka consumer metrics, see Apache Kafka.
Build a Kafka sink
The Flink Kafka sink can write stream data to one or more Kafka topics.
DataStream<String> stream = ...;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<yourKafkaBrokers>");

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setKafkaProducerConfig(properties)                      // Producer configuration.
    .setRecordSerializer(
        KafkaRecordSerializationSchema.builder()
            .setTopic("my-topic")                            // Target topic.
            .setKafkaValueSerializer(StringSerializer.class) // Serialization schema.
            .build())
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)    // Fault-tolerance semantics.
    .build();

stream.sinkTo(kafkaSink);
You need to configure the following parameters.
Parameter
Description
Topic
The default topic name to which data is written.
Data serialization
When building the sink, you need to provide a KafkaRecordSerializationSchema to convert the input data into a Kafka ProducerRecord. Flink provides a schema builder that offers common components, such as message key/value serialization, topic selection, and message partitioning. You can also implement the corresponding interfaces for more control. The ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) method is called for each incoming data record to generate a ProducerRecord to be written to Kafka.
This gives you fine-grained control over how each data record is written to Kafka. With ProducerRecord, you can perform the following operations:
Set the name of the topic to write to.
Define the message key.
Specify the partition to which data is written.
Kafka client properties
bootstrap.servers is required. It is a comma-separated list of Kafka brokers.
Fault tolerance semantics
When Flink's checkpointing is enabled, the Flink Kafka sink can guarantee exactly-once semantics. In addition to enabling Flink's checkpointing, you can also specify different fault tolerance semantics through the DeliveryGuarantee parameter. The DeliveryGuarantee parameter details are as follows:
DeliveryGuarantee.NONE: (Default setting) Flink makes no guarantees. Data may be lost or duplicated.
DeliveryGuarantee.AT_LEAST_ONCE: Guarantees that no data is lost, but data may be duplicated.
DeliveryGuarantee.EXACTLY_ONCE: Uses Kafka transactions to provide exactly-once semantics.
Note: For considerations when using EXACTLY_ONCE semantics, see Considerations for EXACTLY_ONCE semantics.
Data integration
The Kafka connector can be used in data integration YAML job development for reading from a source or writing to a target.
Limits
We recommend using Kafka as a synchronous data source for Flink Change Data Capture (CDC) data integration in Ververica Runtime (VVR) 11.1 and later.
Only JSON, Debezium JSON, and Canal JSON formats are supported. Other data formats are not supported at this time.
For data sources, only Ververica Runtime (VVR) 8.0.11 and later support distributing data from a single table across multiple partitions.
Syntax
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092
Configuration items
General
Parameter
Description
Required
Data type
Default value
Remarks
type
The source or target type.
Yes
String
None
Set this parameter to kafka.
name
The source or target name.
No
String
None
None
properties.bootstrap.servers
The Kafka broker addresses.
Yes
String
None
The format is host:port,host:port,host:port. Separate addresses with commas (,).
properties.*
Direct configurations for the Kafka client.
No
String
None
The suffix must be a configuration defined in the official Kafka documentation for producers and consumers.
Flink removes the properties. prefix and passes the remaining configuration to the Kafka client. For example, you can use 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation.
key.format
The format used to read or write the key part of a Kafka message.
No
String
None
For a source, only json is supported.
For a sink, valid values are:
csv
json
Note: This parameter is supported only in Ververica Runtime (VVR) 11.0.0 and later.
value.format
The format used to read or write the value part of a Kafka message.
No
String
debezium-json
Valid values:
debezium-json
canal-json
json
Note: The debezium-json and canal-json formats are supported only in Ververica Runtime (VVR) 8.0.10 and later.
The json format is supported only in Ververica Runtime (VVR) 11.0.0 and later.
Source table
Parameter
Description
Required
Data type
Default value
Remarks
topic
The name of the topic to read from.
No
String
None
Separate multiple topic names with semicolons (;), for example, topic-1;topic-2.
Note: You can specify only one of the topic and topic-pattern options.
topic-pattern
A regular expression that matches the names of topics to read from. All topics that match this regular expression are read when the job is running.
No
String
None
Note: You can specify only one of the topic and topic-pattern options.
properties.group.id
The consumer group ID.
No
String
None
If the specified group ID is used for the first time, you must set properties.auto.offset.reset to earliest or latest to specify the initial start offset.
scan.startup.mode
The start offset for reading data from Kafka.
No
String
group-offsets
Valid values:
earliest-offset: Starts reading from the earliest offset in Kafka.
latest-offset: Starts reading from the latest offset in Kafka.
group-offsets (default): Starts reading from the committed offset of the specified properties.group.id.
timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis.
specific-offsets: Starts reading from the offset specified by scan.startup.specific-offsets.
Note: This parameter takes effect when the job starts without a state. When a job restarts from a checkpoint or recovers from a state, it preferentially uses the progress saved in the state to resume reading.
scan.startup.specific-offsets
In specific-offsets startup mode, specifies the start offset for each partition.
No
String
None
Example: partition:0,offset:42;partition:1,offset:300
scan.startup.timestamp-millis
In timestamp startup mode, specifies the start offset timestamp.
No
Long
None
The unit is milliseconds.
scan.topic-partition-discovery.interval
The interval for dynamically discovering Kafka topics and partitions.
No
Duration
5 minutes
The default partition check interval is 5 minutes. To disable this feature, you must explicitly set the partition check interval to a non-positive value. When dynamic partition discovery is enabled, the Kafka source can automatically discover new partitions and read data from them. In topic-pattern mode, it not only reads data from new partitions of existing topics but also reads data from all partitions of new topics that match the regular expression.
scan.check.duplicated.group.id
Specifies whether to check for duplicate consumer groups specified by properties.group.id.
No
Boolean
false
Valid values:
true: Before starting the job, the system checks for duplicate consumer groups. If a duplicate is found, the job reports an error, preventing conflicts with existing consumer groups.
false: The job starts directly without checking for consumer group conflicts.
schema.inference.strategy
The schema parsing strategy.
No
String
continuous
Valid values:
continuous: Parses the schema for each data record. If the preceding and succeeding schemas are incompatible, a wider schema is parsed, and a schema change event is generated.
static: Parses the schema only once when the job starts. Subsequent data is parsed based on the initial schema, and no schema change events are generated.
Note: For more information about schema parsing, see Table schema parsing and change synchronization strategy.
This configuration item is supported only in VVR 8.0.11 and later.
scan.max.pre.fetch.records
The maximum number of messages to attempt to consume and parse for each partition during initial schema parsing.
No
Int
50
Before the job actually reads and processes data, it attempts to pre-consume a specified number of the latest messages for each partition to initialize the schema information.
key.fields-prefix
A custom prefix added to the field names parsed from the message key to avoid naming conflicts after parsing the Kafka message key.
No
String
None
Assume this configuration item is set to key_. If the key contains a field named `a`, the field name after parsing the key will be `key_a`.
Note: The value of key.fields-prefix cannot be a prefix of value.fields-prefix.
value.fields-prefix
A custom prefix added to the field names parsed from the message value to avoid naming conflicts after parsing the Kafka message body.
No
String
None
Assume this configuration item is set to value_. If the value contains a field named `b`, the field name after parsing the value will be `value_b`.
Note: The value of value.fields-prefix cannot be a prefix of key.fields-prefix.
metadata.list
The metadata columns to be passed downstream.
No
String
None
Available metadata columns include topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, and __raw_value__. Separate them with commas (,).
Source table Debezium JSON format
Parameter
Required
Data type
Default value
Description
debezium-json.distributed-tables
No
Boolean
false
If data for a single table in Debezium JSON appears in multiple partitions, you need to enable this option.
Note: This configuration item is supported only in VVR 8.0.11 and later.
Important: After modifying this configuration item, you need to start the job without a state.
debezium-json.schema-include
No
Boolean
false
When setting up the Debezium Kafka Connect, you can enable the Kafka configuration value.converter.schemas.enable to include the schema in the message. This option indicates whether the Debezium JSON message includes the schema.
Valid values:
true: The Debezium JSON message includes the schema.
false: The Debezium JSON message does not include the schema.
debezium-json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
debezium-json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to basic rules.
Source table Canal JSON format
Parameter
Required
Data type
Default value
Description
canal-json.distributed-tables
No
Boolean
false
If data for a single table in Canal JSON appears in multiple partitions, you need to enable this option.
Note: This configuration item is supported only in VVR 8.0.11 and later.
Important: After modifying this configuration item, you need to start the job without a state.
canal-json.database.include
No
String
None
An optional regular expression that matches the `database` metadata field in Canal records. It reads only the changelog records of the specified database. The regular expression string is compatible with Java's Pattern.
canal-json.table.include
No
String
None
An optional regular expression that matches the `table` metadata field in Canal records. It reads only the changelog records of the specified table. The regular expression string is compatible with Java's Pattern.
canal-json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
canal-json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to basic rules.
canal-json.infer-schema.strategy
No
String
AUTO
The parsing strategy for the table schema.
Valid values:
AUTO (default): Automatically parses by analyzing the JSON data. If the data does not contain a `sqlType` field, we recommend using AUTO to avoid parsing failures.
SQL_TYPE: Parses using the `sqlType` array in the Canal JSON data. If the data contains a `sqlType` field, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE to obtain more precise types.
MYSQL_TYPE: Parses using the `mysqlType` array in the Canal JSON data.
When the Canal JSON data in Kafka contains a `sqlType` field and a more precise type mapping is needed, we recommend setting canal-json.infer-schema.strategy to SQL_TYPE.
For `sqlType` mapping rules, see Schema parsing for Canal JSON.
Note: This configuration is supported in VVR 11.1 and later.
MYSQL_TYPE is supported in VVR 11.3 and later.
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled
No
Boolean
true
Specifies whether to map the MySQL `timestamp` type to the CDC `timestamp` type:
true (default): The MySQL `timestamp` type is mapped to the CDC `timestamp` type.
false: The MySQL `timestamp` type is mapped to the CDC `timestamp_ltz` type.
canal-json.mysql.treat-tinyint1-as-boolean.enabled
No
Boolean
true
When parsing with MYSQL_TYPE, specifies whether to map the MySQL `tinyint(1)` type to the CDC `boolean` type:
true (default): The MySQL `tinyint(1)` type is mapped to the CDC `boolean` type.
false: The MySQL `tinyint(1)` type is mapped to the CDC `tinyint(1)` type.
This configuration is effective only when canal-json.infer-schema.strategy is set to MYSQL_TYPE.
Source table JSON format
Parameter
Required
Data type
Default value
Description
json.timestamp-format.standard
No
String
SQL
Specifies the input and output timestamp format. Valid values:
SQL: Parses input timestamps in the format yyyy-MM-dd HH:mm:ss.s{precision}, for example, 2020-12-30 12:13:14.123.
ISO-8601: Parses input timestamps in the format yyyy-MM-ddTHH:mm:ss.s{precision}, for example, 2020-12-30T12:13:14.123.
json.ignore-parse-errors
No
Boolean
false
Valid values:
true: Skips the current row when a parsing error occurs.
false (default): Reports an error, and the job fails to start.
json.infer-schema.primitive-as-string
No
Boolean
false
Specifies whether to parse all types as String when parsing the table schema.
Valid values:
true: Parses all primitive data types as String.
false (default): Parses according to basic rules.
json.infer-schema.flatten-nested-columns.enable
No
Boolean
false
When parsing JSON data, specifies whether to recursively expand nested columns in the JSON. Valid values:
true: Recursively expands.
false (default): Treats nested columns as String.
json.decode.parser-table-id.fields
No
String
None
When parsing JSON data, specifies whether to use specific JSON field values to generate the tableId. Connect multiple fields with a comma (,). For example, if the JSON data is {"col0":"a", "col1":"b", "col2":"c"}, the results are as follows:
Configuration | tableId |
col0 | a |
col0,col1 | a.b |
col0,col1,col2 | a.b.c |
Sink table
Parameter
Description
Required
Data type
Default value
Remarks
type
The type of the target.
Yes
String
None
Set this parameter to kafka.
name
The name of the target.
No
String
None
None
topic
The name of the Kafka topic.
No
String
None
If this parameter is specified, all data is written to this topic.
Note: If this parameter is not specified, each data record is written to a topic that corresponds to its TableID string. The TableID is generated by joining the database name and table name with a period (.), for example, databaseName.tableName.
partition.strategy
The strategy for writing data to Kafka partitions.
No
String
all-to-zero
Valid values:
all-to-zero (default): Writes all data to partition 0.
hash-by-key: Writes data to partitions based on the hash value of the primary key. This ensures that data records with the same primary key are written to the same partition and that their order is preserved.
sink.tableId-to-topic.mapping
The mapping from upstream table names to downstream Kafka topic names.
No
String
None
Use a semicolon (;) to separate multiple mappings. In each mapping, use a colon (:) to separate the upstream table name from the downstream topic name. You can use regular expressions for table names. To map multiple tables to the same topic, separate the table names with a comma (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2.
Note: This parameter lets you change the mapped topic while retaining the original table name information.
Sink table Debezium JSON format
Parameter
Required
Data type
Default value
Description
debezium-json.include-schema.enabled
No
Boolean
false
Specifies whether the Debezium JSON data includes schema information.
Examples
Use Kafka as a data integration source:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: ${value.format}
  scan.startup.mode: ${scan.startup.mode}
sink:
  type: hologres
  name: Hologres sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
Use Kafka as a data integration target:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: ${mysql.source.table}
    sink-table: ${kafka.topic}
Here, the `route` module is used to set the topic name for writing from the source table to Kafka.
ApsaraMQ for Kafka does not enable automatic topic creation by default. For more information, see Issues related to automatic topic creation. When writing to ApsaraMQ for Kafka, you need to create the corresponding topic in advance. For more information, see Step 3: Create resources.
Table schema parsing and change synchronization strategy
Partition message pre-consumption and table schema initialization
The Kafka connector maintains the schema of all currently known tables. Before reading Kafka data, the Kafka connector attempts to pre-consume up to scan.max.pre.fetch.records messages in each partition. It parses the schema of each data record and then merges these schemas to initialize the table schema information. Subsequently, before consuming the data, the connector generates corresponding table creation events based on the initialized schema.
Note: For Debezium JSON and Canal JSON formats, the table information is obtained from specific messages. The scan.max.pre.fetch.records messages that are pre-consumed may contain data from several tables, so the number of pre-consumed records for each table cannot be determined. Pre-consumption and table schema initialization are performed only once, before messages from each partition are consumed and processed. If data from a new table appears later, the schema parsed from the first record of that table is used as its initial schema, and the schema for that table is not re-initialized through pre-consumption.
Important: Only VVR 8.0.11 and later support distributing data from a single table across multiple partitions. In this scenario, you need to set the configuration item debezium-json.distributed-tables or canal-json.distributed-tables to true.
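The following sketch (the record count is chosen arbitrarily, and the placeholders are assumptions) shows how the pre-fetch size and the distributed-tables switch described above might be set together on a Canal JSON source whose tables span multiple partitions:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstraps.server}  # placeholder broker address
  topic: ${kafka.topic}                                      # placeholder topic
  value.format: canal-json
  # Pre-consume up to this many messages per partition to initialize table schemas.
  scan.max.pre.fetch.records: 50
  # Data of a single table may be spread across multiple partitions (VVR 8.0.11 or later).
  canal-json.distributed-tables: true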
Table information
For Canal JSON and Debezium JSON formats, table information, including the database and table name, is parsed from the specific message.
For JSON format, the table information only includes the table name, which is the name of the topic where the data resides.
Primary key information
For Canal JSON format, the table's primary key is defined based on the `pkNames` field in the JSON.
For Debezium JSON and JSON formats, the JSON does not contain primary key information. You can manually add a primary key to the table using a transform rule:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: key1, key2
Schema parsing and schema change
After the table schema is initialized, if schema.inference.strategy is set to `static`, the Kafka connector parses the value of each message based on the initial table schema and does not generate schema change events. If schema.inference.strategy is set to `continuous`, the Kafka connector parses the body of each Kafka message to extract the physical columns and compares them with the currently maintained schema. If the parsed schema is inconsistent with the current schema, it attempts to merge the schemas and generates a corresponding table schema change event. The merge rules are as follows:
If the parsed physical columns contain fields that are not in the current schema, these fields are added to the schema, and an event for adding a nullable column is generated.
If the parsed physical columns do not contain fields that exist in the current schema, those fields are retained, and their data is filled with NULL. No column deletion event is generated.
If there is a column with the same name in both, it is handled according to the following scenarios:
If the types are the same but the precision is different, the type with the larger precision is used, and a column type change event is generated.
If the types are different, the system finds the lowest common parent node in the type tree shown in the following figure and uses it as the type of the column, and a column type change event is generated.
(Figure: type widening tree used to determine the lowest common parent type when merging column types.)
The currently supported schema change strategies are as follows:
Add column: Adds the corresponding column to the end of the current schema and synchronizes the data of the new column. The new column is set to be nullable.
Delete column: Does not generate a delete column event. Instead, the data for that column is automatically filled with NULL values.
Rename column: This is treated as adding and deleting a column. The renamed column is added to the end of the current schema, and the data of the column before renaming is filled with NULL values.
Column type change:
If the downstream sink supports column type changes, the data integration job can change the type of regular columns, for example, from INT to BIGINT. The supported changes depend on the column type change rules of the downstream sink; different sink tables support different rules. For more information, see the documentation for the specific sink table.
For downstream systems that do not support column type changes, such as Hologres, you can use broad type mapping: when the job starts, the table is created in the downstream system with broader types. When a column type change occurs, the system checks whether the downstream sink can accept the change, which provides tolerant support for column type changes.
Schema changes that are not currently supported:
Changes to constraints such as primary keys or indexes.
Changing from NOT NULL to NULLABLE.
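As a reference sketch (placeholders are assumptions), the inference strategy discussed above is set on the source; continuous merges schemas and emits change events, while static keeps the initial schema:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstraps.server}  # placeholder broker address
  topic: ${kafka.topic}                                      # placeholder topic
  value.format: canal-json
  # continuous: compare each record with the current schema and emit schema change events;
  # static: always parse records against the initial schema.
  schema.inference.strategy: continuous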
Schema parsing for Canal JSON
Canal JSON data may contain an optional `sqlType` field, which records precise type information for data columns. To retrieve a more accurate schema, you can use the types in `sqlType` by setting canal-json.infer-schema.strategy to SQL_TYPE. The type mapping relationships are as follows:
JDBC type | Type Code | CDC type |
BIT | -7 | BOOLEAN |
BOOLEAN | 16 | BOOLEAN |
TINYINT | -6 | TINYINT |
SMALLINT | 5 | SMALLINT |
INTEGER | 4 | INT |
BIGINT | -5 | BIGINT |
DECIMAL | 3 | DECIMAL(38,18) |
NUMERIC | 2 | DECIMAL(38,18) |
REAL | 7 | FLOAT |
FLOAT | 6 | FLOAT |
DOUBLE | 8 | DOUBLE |
BINARY | -2 | BYTES |
VARBINARY | -3 | BYTES |
LONGVARBINARY | -4 | BYTES |
BLOB | 2004 | BYTES |
DATE | 91 | DATE |
TIME | 92 | TIME |
TIMESTAMP | 93 | TIMESTAMP |
CHAR | 1 | STRING |
VARCHAR | 12 | STRING |
LONGVARCHAR | -1 | STRING |
Other types | | STRING |
Table name to topic mapping strategy
When using Kafka as the target for a data integration job, you need to configure the table name to topic mapping strategy carefully. This is because the written Kafka message format (Debezium JSON or Canal JSON) also contains table name information, and subsequent consumption of Kafka messages often uses the table name information in the data as the actual table name (rather than the topic name).
Suppose you need to synchronize two tables, `mydb.mytable1` and `mydb.mytable2`, from MySQL. The possible configuration strategies are as follows:
1. Do not configure any mapping strategy
Without any mapping strategy, each table is written to a topic named in the format "database_name.table_name". Therefore, data from `mydb.mytable1` is written to a topic named `mydb.mytable1`, and data from `mydb.mytable2` is written to a topic named `mydb.mytable2`. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
2. Configure route rules for mapping (not recommended)
In many scenarios, users do not want the written topic to be in the "database_name.table_name" format. They want to write data to a specified topic. Therefore, they configure route rules for mapping. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1
In this case, all data from `mydb.mytable1` and `mydb.mytable2` is written to the `mytable1` topic.
However, when you use route rules to change the written topic name, it also modifies the table name information in the Kafka message (Debezium JSON or Canal JSON format). In this case, all table names in the Kafka messages become `mytable1`. This may lead to unexpected behavior when other systems consume Kafka messages from this topic.
3. Configure the sink.tableId-to-topic.mapping parameter for mapping (recommended)
To configure the table name to topic mapping rule while preserving the source table name information, use the `sink.tableId-to-topic.mapping` parameter. The following code provides a configuration example:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
or
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
In this case, all data from `mydb.mytable1` and `mydb.mytable2` is written to the `mytable` topic. The table name information in the Kafka messages (Debezium JSON or Canal JSON format) remains `mydb.mytable1` or `mydb.mytable2`. When other systems consume Kafka messages from this topic, they can correctly obtain the source table name information.
Considerations for EXACTLY_ONCE semantics
Configure the consumer isolation level
All applications that consume Kafka data must set `isolation.level`:
read_committed: Reads only committed data.
read_uncommitted (default): Can read uncommitted data.
EXACTLY_ONCE semantics depend on read_committed. Otherwise, consumers might see uncommitted data, which breaks consistency.
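As a sketch, assuming the connector forwards properties.* keys to the underlying Kafka consumer (as it does for properties.bootstrap.servers), a Flink ingestion source can request the committed-read isolation level like this:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: ${kafka.bootstraps.server}  # placeholder broker address
  topic: ${kafka.topic}                                      # placeholder topic
  # Only read data from committed Kafka transactions.
  properties.isolation.level: read_committed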
Transaction timeout and data loss
When Flink recovers from a checkpoint, it only relies on transactions that were committed before that checkpoint began. If the time between a job crash and its restart exceeds the Kafka transaction timeout, Kafka automatically aborts the transaction, leading to data loss.
The Kafka broker's default transaction.max.timeout.ms is 15 minutes.
The Flink Kafka sink's default transaction.timeout.ms is 1 hour.
You must increase transaction.max.timeout.ms on the broker side so that it is not less than Flink's transaction.timeout.ms.
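For illustration, assuming properties.* keys are forwarded to the Kafka producer (as with properties.enable.idempotence), the Flink-side transaction timeout could instead be lowered so that it does not exceed the broker's transaction.max.timeout.ms:
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}  # placeholder broker address
  # 900000 ms = 15 minutes, matching the broker's default transaction.max.timeout.ms.
  properties.transaction.timeout.ms: 900000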
Producer pool and concurrent checkpoints
EXACTLY_ONCE mode uses a fixed-size pool of Kafka producers. Each checkpoint occupies one producer from the pool. If the number of concurrent checkpoints exceeds the pool size, the job will fail.
Adjust the producer pool size based on the maximum number of concurrent checkpoints.
Limitations on scaling in the degree of parallelism
If a job fails before the first checkpoint, the original producer pool information is not retained upon restart. Therefore, do not reduce the job's degree of parallelism before the first checkpoint is completed. If you must scale in, do not reduce the parallelism by a factor greater than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.
Transaction blocking reads
In read_committed mode, any open transaction (neither committed nor aborted) blocks reading from the entire topic. For example:
Transaction 1 writes data.
Transaction 2 writes and commits data.
As long as Transaction 1 is not finished, the data from Transaction 2 is not visible to consumers.
Therefore:
During normal operation, the data visibility latency is approximately equal to the checkpoint interval.
When a job fails, the topic being written to blocks consumers until the job restarts or the transaction times out. In extreme cases, a long transaction timeout can keep consumers blocked for that entire period.