Use the Kafka connector as a source, sink, or Flink CDC destination in Realtime Compute for Apache Flink.
Overview
Apache Kafka is an open-source, distributed event streaming platform widely used for high-performance data processing, streaming analytics, and data integration. The Kafka connector for Realtime Compute for Apache Flink uses the open-source Apache Kafka client to provide high-performance data throughput, support reading and writing multiple data formats, and offer exactly-once semantics.
Category | Description |
Supported types | SQL: source and sink. Flink CDC: source and sink. DataStream: source and sink. |
Execution mode | Streaming |
Data formats | |
Metrics | |
API types | SQL, DataStream, Flink CDC |
Sink update/delete | Kafka sink tables support only appending data. Updates and deletions are not supported. Note: To update or delete data in a sink table, see Upsert Kafka. |
Prerequisites
Before you begin, verify that you meet the prerequisites for your Kafka cluster type:
Connect to an ApsaraMQ for Kafka cluster
The Kafka cluster is version 0.11 or later.
You have created an ApsaraMQ for Kafka cluster. For more information, see Step 3: Create resources.
The Flink workspace and the Kafka cluster are in the same Virtual Private Cloud (VPC), and you have added the CIDR block of the Flink workspace to the ApsaraMQ for Kafka whitelist. For more information, see Configure whitelists.
Important: Limitations on writing data to ApsaraMQ for Kafka:
ApsaraMQ for Kafka does not support the Zstandard (zstd) compression format for writes.
ApsaraMQ for Kafka does not support idempotent or transactional writes, so you cannot use the exactly-once semantics provided by Kafka sink tables. Starting with Ververica Runtime (VVR) 8.0.0, the Kafka connector uses Kafka client 3.x, in which the enable.idempotence client property defaults to true. To prevent write failures when you use VVR 8.0.0 or later to write to ApsaraMQ for Kafka, add 'properties.enable.idempotence' = 'false' to your sink table definition. For a comparison of storage engines and feature limitations for ApsaraMQ for Kafka, see Comparison between storage engines.
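The two ApsaraMQ for Kafka write limitations above can be checked before you submit a job. The following Java sketch is hypothetical: the ApsaraMqSinkCheck class is not part of the connector, the WITH clause is modeled as a plain Map, and VVR 8.0.0 or later (Kafka client 3.x defaults) is assumed. It inspects option values only and does not contact the cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-submit check; not part of the Kafka connector or VVR.
final class ApsaraMqSinkCheck {
    private ApsaraMqSinkCheck() {}

    /** Returns problems found in a sink table's WITH options (VVR 8.0.0 or later assumed). */
    static List<String> problems(Map<String, String> withOptions) {
        List<String> problems = new ArrayList<>();
        // Kafka client 3.x enables idempotence by default, so a missing key counts as "true".
        String idempotence = withOptions.getOrDefault("properties.enable.idempotence", "true");
        if (!"false".equalsIgnoreCase(idempotence.trim())) {
            problems.add("Add 'properties.enable.idempotence' = 'false'");
        }
        // ApsaraMQ for Kafka does not accept zstd-compressed writes.
        String compression = withOptions.getOrDefault("properties.compression.type", "none");
        if ("zstd".equalsIgnoreCase(compression.trim())) {
            problems.add("Use a compression type other than zstd");
        }
        // exactly-once relies on Kafka transactions, which ApsaraMQ for Kafka does not support.
        String guarantee = withOptions.getOrDefault("sink.delivery-guarantee", "at-least-once");
        if ("exactly-once".equalsIgnoreCase(guarantee.trim())) {
            problems.add("Use a delivery guarantee other than exactly-once");
        }
        return problems;
    }
}
```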
Connect to a self-managed Apache Kafka cluster
The self-managed Apache Kafka cluster is version 0.11 or later.
The Flink workspace has network connectivity to the self-managed Apache Kafka cluster. For details about how to connect to a cluster over the public internet, see FAQ about network connectivity.
Only client configuration options for Apache Kafka version 2.8 are supported. For more information, see the Apache Kafka Consumer Configs and Producer Configs documentation.
Notes
Transactional writes are not recommended due to known design limitations in Apache Flink and Apache Kafka. When you set sink.delivery-guarantee = 'exactly-once', the Kafka connector enables transactional writes, with the following known issues:
Each checkpoint generates a new Transaction ID. If the checkpoint interval is too short, the resulting flood of Transaction IDs can cause the Kafka cluster coordinator to run out of memory, compromising cluster stability.
Each transaction creates a new Producer instance. If too many transactions commit concurrently, the TaskManager can run out of memory, destabilizing the Apache Flink job.
If multiple Apache Flink jobs use the same sink.transactional-id-prefix, their generated Transaction IDs can conflict. When a write operation fails in one job, it can prevent the Last Stable Offset (LSO) of an Apache Kafka partition from advancing. This blocks all consumers that read that partition with the read_committed isolation level.
If you require exactly-once semantics, use the Upsert Kafka connector to write to a primary key table, ensuring idempotence. If you must use transactional writes, see Exactly-once semantics usage notes.
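To see why a shared sink.transactional-id-prefix causes conflicts, and why short checkpoint intervals create many Transaction IDs, consider the hypothetical model below. It assumes that each sink subtask builds one ID per checkpoint in the form prefix-subtaskId-checkpointId, which mirrors the pattern used by the open-source Flink KafkaSink; the exact format in VVR may differ. The TransactionalIds class is illustrative only.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of per-checkpoint transactional IDs; the ID format is an assumption.
final class TransactionalIds {
    private TransactionalIds() {}

    static String build(String prefix, int subtaskId, long checkpointId) {
        return prefix + "-" + subtaskId + "-" + checkpointId;
    }

    /** Every ID a job with the given parallelism would use for checkpoints 1..checkpoints. */
    static Set<String> idsFor(String prefix, int parallelism, long checkpoints) {
        Set<String> ids = new HashSet<>();
        for (long cp = 1; cp <= checkpoints; cp++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                ids.add(build(prefix, subtask, cp));
            }
        }
        return ids;
    }

    /** True if two jobs with these prefixes would ever use the same transactional ID. */
    static boolean collide(String prefixA, String prefixB, int parallelism, long checkpoints) {
        Set<String> shared = idsFor(prefixA, parallelism, checkpoints);
        shared.retainAll(idsFor(prefixB, parallelism, checkpoints));
        return !shared.isEmpty();
    }
}
```

In this model, a job with parallelism 2 uses 6 IDs over 3 checkpoints, so the number of IDs grows with both parallelism and checkpoint count, and two jobs that share the prefix job reuse exactly the same IDs.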
Troubleshoot network connectivity
If a Realtime Compute for Apache Flink job fails to start with the error Timed out waiting for a node assignment, the cause is typically a network connectivity issue between Realtime Compute for Apache Flink and the Kafka cluster.
A Kafka client connects to brokers as follows:
The client uses the addresses specified in bootstrap.servers to establish an initial connection to the Kafka cluster.
The Kafka cluster returns metadata for each broker, including its endpoints.
The client then uses these endpoints to connect to the brokers to read or write data.
Even if the bootstrap.servers addresses are reachable, the client cannot read or write data if Kafka returns incorrect broker endpoints. This issue often occurs in network architectures that use a proxy, port forwarding, or a leased line.
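The Network Probe feature performs this check for you. As an illustration of the first step only, the hypothetical Java sketch below parses a bootstrap.servers value (host:port,host:port,...) and attempts a plain TCP connection to each address with a timeout. It does not speak the Kafka protocol, so a successful connection rules out only network-level problems; the broker endpoints that Kafka returns in its metadata must still be reachable. The class name and the timeout value are assumptions.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical TCP-level probe; it performs no Kafka handshake and no SASL/SSL negotiation.
final class BrokerReachability {
    private BrokerReachability() {}

    /** Parses "host:port,host:port" (host names or IPv4 literals; bracketed IPv6 is not handled). */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // This constructor tries to resolve the host; failures show up later as connect errors.
            addresses.add(new InetSocketAddress(host, port));
        }
        return addresses;
    }

    /** Returns true if a TCP connection succeeds within timeoutMillis. */
    static boolean reachable(InetSocketAddress address, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(address, timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused connections, timeouts, and unresolved host names all end up here.
            return false;
        }
    }

    /** Probes every bootstrap address and reports host:port -> reachable. */
    static Map<String, Boolean> probe(String bootstrapServers, int timeoutMillis) {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (InetSocketAddress address : parse(bootstrapServers)) {
            results.put(address.getHostString() + ":" + address.getPort(), reachable(address, timeoutMillis));
        }
        return results;
    }
}
```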
Troubleshooting steps
ApsaraMQ for Kafka
Confirm the endpoint type
Default endpoint (internal network)
SASL endpoint (internal network with authentication)
Public endpoint (requires a separate application)
Use the Network Probe feature in the Realtime Compute for Apache Flink development console to rule out connectivity issues with the bootstrap.servers address.
Check security groups and whitelists
Add the CIDR block of the Realtime Compute for Apache Flink workspace to the whitelist of your Kafka instance. For more information, see View VPC CIDR Block and Configure a whitelist.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, ensure that the JAAS, SSL, and SASL mechanisms are configured correctly in your Realtime Compute for Apache Flink job. Missing authentication can cause the connection to fail during the handshake phase, which might also appear as a timeout. For more information, see Security and authentication.
Self-managed Kafka
Use the Network Probe feature
This feature helps you rule out connectivity issues with the bootstrap.servers address and verify that the correct internal or public endpoint is used.
Check security groups and whitelists
The security group for the Elastic Compute Service (ECS) instance must allow inbound traffic on the Kafka endpoint port, which is typically 9092 or 9093.
Ensure that any firewall on the ECS instance allows traffic from the VPC of your Realtime Compute for Apache Flink workspace. For more information, see View VPC CIDR Block.
Check the configuration
Use the zkCli.sh or zookeeper-shell.sh tool to log on to the ZooKeeper cluster that Kafka uses.
Run a command to obtain broker metadata. For example, run get /brokers/ids/0. In the endpoints field of the response, find the address that Kafka advertises to clients.
Use the Network Probe feature in the Realtime Compute for Apache Flink development console to test whether this address is accessible.
Note: If the address is not accessible, contact your Kafka administrator to check and correct the listeners and advertised.listeners configurations so that the advertised address is accessible from Realtime Compute for Apache Flink. For more information about how Kafka clients connect, see Troubleshoot Connectivity.
Check the SASL configuration (if enabled)
If you use a SASL_SSL endpoint, ensure that the JAAS, SSL, and SASL mechanisms are configured correctly in your Realtime Compute for Apache Flink job. Missing authentication can cause the connection to fail during the handshake phase, which might also appear as a timeout. For more information, see Security and authentication.
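The endpoints field returned by get /brokers/ids/<id> holds one LISTENER://host:port entry per listener, for example INTERNAL://10.0.0.5:9092. The hypothetical Java sketch below extracts the listener name, host, and port from such a payload so that you can list the addresses that must be reachable from Realtime Compute for Apache Flink. It assumes the endpoints field is a JSON array of such strings and uses regular expressions, so it is not a general JSON parser; for production tooling, use a JSON library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; regex-based, not a general JSON parser.
final class AdvertisedEndpoints {
    private AdvertisedEndpoints() {}

    /** One advertised listener, for example INTERNAL://10.0.0.5:9092. */
    static final class Endpoint {
        final String listener;
        final String host;
        final int port;

        Endpoint(String listener, String host, int port) {
            this.listener = listener;
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return listener + "://" + host + ":" + port;
        }
    }

    // Captures the contents of the endpoints array, for example
    // "endpoints":["INTERNAL://10.0.0.5:9092","EXTERNAL://kafka.example.com:9093"]
    private static final Pattern ENDPOINTS = Pattern.compile("\"endpoints\"\\s*:\\s*\\[([^\\]]*)\\]");
    // Captures LISTENER, host, and port from one quoted array element.
    private static final Pattern ENTRY = Pattern.compile("\"([A-Za-z0-9_]+)://([^\"]*):(\\d+)\"");

    static List<Endpoint> parse(String brokerJson) {
        List<Endpoint> endpoints = new ArrayList<>();
        Matcher array = ENDPOINTS.matcher(brokerJson);
        if (!array.find()) {
            return endpoints; // no endpoints field in the payload
        }
        Matcher entry = ENTRY.matcher(array.group(1));
        while (entry.find()) {
            endpoints.add(new Endpoint(entry.group(1), entry.group(2), Integer.parseInt(entry.group(3))));
        }
        return endpoints;
    }
}
```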
SQL
Use the Kafka connector as a source table or a sink table in SQL jobs.
Syntax
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
Metadata columns
Define metadata columns in a source or sink table to access Kafka message metadata. For example, when you subscribe to multiple topics, a metadata column can identify which topic each record comes from.
CREATE TABLE kafka_source (
-- Read the message topic as the `record_topic` column
`record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
-- Read the timestamp from the ConsumerRecord as the `ts` column
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
-- Read the message offset as the `record_offset` column
`record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE kafka_sink (
-- Write the timestamp from the `ts` column as the ProducerRecord's timestamp to Kafka
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
...
) WITH (
'connector' = 'kafka',
...
);
The following table lists the metadata columns that Kafka source and sink tables support.
Key | Type | Description | Scope |
topic | STRING NOT NULL METADATA VIRTUAL | Message topic. | Source table |
partition | INT NOT NULL METADATA VIRTUAL | Message partition ID. | Source table |
headers | MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL | Message headers. | Source table and sink table |
leader-epoch | INT NOT NULL METADATA VIRTUAL | Message leader-epoch. | Source table |
offset | BIGINT NOT NULL METADATA VIRTUAL | Message offset. | Source table |
timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL | Message timestamp. | Source table and sink table |
timestamp-type | STRING NOT NULL METADATA VIRTUAL | Message timestamp type. Valid values: NoTimestampType, CreateTime, and LogAppendTime. | Source table |
__raw_key__ | STRING NOT NULL METADATA VIRTUAL | Raw message key. | Source table and sink table. Note: Supported only in Ververica Runtime (VVR) 11.4 and later. |
__raw_value__ | STRING NOT NULL METADATA VIRTUAL | Raw message value. | Source table and sink table. Note: Supported only in Ververica Runtime (VVR) 11.4 and later. |
Connector options
General
Option | Description | Type | Required | Default | Remarks |
connector | The connector type. | String | Yes | – | Set the value to kafka. |
properties.bootstrap.servers | A list of Kafka broker addresses. | String | Yes | – | Format: host:port,host:port,... Separate addresses with commas (,). |
properties.* | Additional properties for the Kafka client. | String | No | – | The property keys must be valid options defined in the Apache Kafka documentation for Producer Configs and Consumer Configs. Realtime Compute for Apache Flink removes the properties. prefix and passes the remaining key-value pairs to the Kafka client. For example, set 'properties.allow.auto.create.topics' = 'false' to disable automatic topic creation. The Kafka connector overwrites key.deserializer and value.deserializer, so you cannot configure these two options this way. |
format | The format for serializing and deserializing the value of a Kafka message. | String | No | – | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. Note: For more information, see Format options. |
key.format | The format for serializing and deserializing the key of a Kafka message. | String | No | – | Supported formats: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, and raw. Note: If you set this option, you must also set key.fields. |
key.fields | The table columns to use as the Kafka message key. | String | No | – | Separate multiple field names with semicolons (;), for example, 'field1;field2'. |
key.fields-prefix | A custom prefix for all key fields that prevents name conflicts with value fields. | String | No | – | The prefix distinguishes key fields from value fields and is removed from the field names before the key is serialized and after it is deserialized. Note: If you use this option, value.fields-include must be set to EXCEPT_KEY. |
value.format | The format for serializing and deserializing the value of a Kafka message. | String | No | – | Equivalent to format. Configure only one of format and value.format. If both are configured, value.format overrides format. |
value.fields-include | Whether key fields are also included in the message value. | String | No | ALL | Valid values: ALL: the message value includes all table columns. EXCEPT_KEY: the message value includes all table columns except those defined in key.fields. |
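The following Java sketch models the properties.* rule described above: keys with the properties. prefix lose the prefix and become Kafka client settings, other keys are connector options, and the two deserializer keys are skipped because the connector overwrites them. The KafkaClientProperties class is hypothetical and does not reproduce the connector's actual code.

```java
import java.util.Map;
import java.util.Properties;
import java.util.Set;

// Hypothetical illustration of the properties.* mapping; not the connector's implementation.
final class KafkaClientProperties {
    private KafkaClientProperties() {}

    private static final String PREFIX = "properties.";
    // The connector sets these itself, so values supplied in the WITH clause never take effect.
    private static final Set<String> CONNECTOR_OWNED = Set.of("key.deserializer", "value.deserializer");

    static Properties fromWithOptions(Map<String, String> withOptions) {
        Properties clientProps = new Properties();
        for (Map.Entry<String, String> option : withOptions.entrySet()) {
            String key = option.getKey();
            if (!key.startsWith(PREFIX)) {
                continue; // connector-level options such as topic or format
            }
            String clientKey = key.substring(PREFIX.length());
            if (CONNECTOR_OWNED.contains(clientKey)) {
                continue;
            }
            clientProps.setProperty(clientKey, option.getValue());
        }
        return clientProps;
    }
}
```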
Source table
Option | Description | Type | Required | Default | Remarks |
topic | The topic or topics to read from. | String | No | – | To subscribe to multiple topics, separate their names with semicolons (;), for example, 'topic-1;topic-2'. Note: Specify either topic or topic-pattern, but not both. |
topic-pattern | A regular expression that matches the topics to subscribe to. The consumer subscribes to all topics whose names match the pattern. | String | No | – | Note: Specify either topic-pattern or topic, but not both. |
properties.group.id | The consumer group ID of the Kafka source. | String | No | KafkaSource-{Source-Table-Name} | If the consumer group ID is used for the first time, you must also set properties.auto.offset.reset to earliest or latest to define the initial start offset. |
scan.startup.mode | The start offset of the Kafka consumer. | String | No | group-offsets | Valid values: earliest-offset: starts reading from the earliest available offset. latest-offset: starts reading from the latest offset. group-offsets: starts reading from the committed offsets of the consumer group specified by properties.group.id. timestamp: starts reading from the time specified by scan.startup.timestamp-millis. specific-offsets: starts reading from the offsets specified by scan.startup.specific-offsets. Note: This option applies only when a job starts without state. A job that resumes from a checkpoint reads from the offsets stored in the checkpoint state. |
scan.startup.specific-offsets | The start offset of each partition when scan.startup.mode is set to specific-offsets. | String | No | – | Example: partition:0,offset:42;partition:1,offset:300. |
scan.startup.timestamp-millis | The start timestamp when scan.startup.mode is set to timestamp. | Long | No | – | Unit: milliseconds. |
scan.topic-partition-discovery.interval | The interval at which the connector discovers new partitions. | Duration | No | 5 minutes | The connector periodically discovers and reads from new partitions. When you use topic-pattern, it also discovers new topics that match the pattern. Set the interval to a non-positive value to disable discovery. Note: In VVR 6.0.x, dynamic partition discovery is disabled by default. Starting from VVR 8.0, it is enabled by default with an interval of 5 minutes. |
scan.header-filter | Filters messages based on Kafka message headers. | String | No | – | For the expression syntax, see the scan.header-filter syntax notes after this table. Note: Supported only in VVR 8.0.6 and later. |
scan.check.duplicated.group.id | Whether to check if another active consumer is already using the properties.group.id. | Boolean | No | false | Valid values: true: before the job starts, the system checks for a duplicate consumer group and, if one is found, fails the job to prevent conflicts. false: the job starts without checking for conflicts. Note: Supported only in VVR 6.0.4 and later. |
scan.header-filter syntax:
A header key and its value are separated by a colon (:). Multiple header conditions are combined with the logical operators & (AND) and | (OR), and the NOT operator (!) is also supported. For example, depart:toy|depart:book&!env:test retains a message if its headers contain depart=toy or depart=book and do not contain env=test.
Parentheses in expressions are not supported.
Logical operations are evaluated from left to right.
Header values are converted to UTF-8 strings for comparison.
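The following hypothetical Java evaluator follows the scan.header-filter rules listed above: conditions are key:value pairs, ! negates a single condition, & and | are applied strictly from left to right without precedence, and header values are compared as UTF-8 strings. It is a sketch for reasoning about filter expressions, not the connector's implementation; details such as whitespace handling may differ.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

// Hypothetical evaluator for scan.header-filter expressions; not the connector's code.
final class HeaderFilter {
    private HeaderFilter() {}

    /** Evaluates the expression against a message's headers (key -> raw value bytes). */
    static boolean matches(String expression, List<Map.Entry<String, byte[]>> headers) {
        boolean result = false;
        char pendingOp = 0; // operator joining the running result with the next condition
        int start = 0;
        for (int i = 0; i <= expression.length(); i++) {
            boolean atEnd = i == expression.length();
            char c = atEnd ? 0 : expression.charAt(i);
            if (atEnd || c == '&' || c == '|') {
                boolean term = evaluate(expression.substring(start, i).trim(), headers);
                // No operator precedence: fold each condition into the result from left to right.
                if (pendingOp == 0) {
                    result = term;
                } else if (pendingOp == '&') {
                    result = result && term;
                } else {
                    result = result || term;
                }
                pendingOp = c;
                start = i + 1;
            }
        }
        return result;
    }

    /** Evaluates one "key:value" or "!key:value" condition. */
    private static boolean evaluate(String condition, List<Map.Entry<String, byte[]>> headers) {
        boolean negate = condition.startsWith("!");
        String body = negate ? condition.substring(1).trim() : condition;
        int colon = body.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("Expected key:value but got: " + condition);
        }
        String key = body.substring(0, colon);
        String value = body.substring(colon + 1);
        boolean present = false;
        for (Map.Entry<String, byte[]> header : headers) {
            // Header values are compared as UTF-8 strings; null values never match.
            if (header.getKey().equals(key) && header.getValue() != null
                    && value.equals(new String(header.getValue(), StandardCharsets.UTF_8))) {
                present = true;
                break;
            }
        }
        return negate != present;
    }
}
```

Because evaluation is left to right, a:1|b:2&c:3 is read as (a:1|b:2)&c:3, which differs from the common convention in which & binds more tightly than |.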
Sink table
Option | Description | Type | Required | Default | Remarks |
topic | The target topic. | String | Yes | – | – |
sink.partitioner | The mapping from parallel sink instances to Kafka partitions. | String | No | default | Valid values: default: uses the default Kafka partitioner. fixed: each parallel sink instance writes to a fixed Kafka partition. round-robin: records are distributed to partitions in a round-robin fashion. Custom partitioner: the fully qualified class name of a FlinkKafkaPartitioner subclass, for example, org.mycompany.MyPartitioner. |
sink.delivery-guarantee | The delivery guarantee of the sink. | String | No | at-least-once | Valid values: none: no guarantees; records may be lost or duplicated. at-least-once: no records are lost, but records may be duplicated. exactly-once: Kafka transactions ensure that records are neither lost nor duplicated. Note: If you use exactly-once, you must also specify sink.transactional-id-prefix. |
sink.transactional-id-prefix | The prefix of Kafka transactional IDs. | String | Yes, if sink.delivery-guarantee is exactly-once | – | Required only when sink.delivery-guarantee is set to exactly-once. |
sink.parallelism | The parallelism of the sink operator. | Integer | No | – | By default, the framework determines the parallelism based on the upstream operators. |
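With sink.partitioner set to fixed, the sink parallelism decides which partitions receive data. The Java sketch below assumes the assignment rule of open-source Flink's FlinkFixedPartitioner, where sink subtask i writes to partitions[i % partitions.length]; VVR may differ, and the FixedPartitioning class is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of 'fixed' partitioning, modeled on open-source Flink's FlinkFixedPartitioner.
final class FixedPartitioning {
    private FixedPartitioning() {}

    /** Partition that sink subtask subtaskIndex writes every record to. */
    static int partitionFor(int subtaskIndex, int[] partitions) {
        return partitions[subtaskIndex % partitions.length];
    }

    /** Counts sink subtasks per partition; partitions missing from the map receive no data. */
    static Map<Integer, Integer> subtasksPerPartition(int sinkParallelism, int[] partitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int subtask = 0; subtask < sinkParallelism; subtask++) {
            counts.merge(partitionFor(subtask, partitions), 1, Integer::sum);
        }
        return counts;
    }
}
```

Under this assumption, with four partitions and a sink parallelism of 2, only partitions 0 and 1 receive data; with a parallelism of 6, partitions 0 and 1 each receive data from two subtasks.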
Security and authentication
If the Kafka cluster requires a secure connection or authentication, add the relevant security and authentication configurations to the WITH clause with the properties. prefix. The following example configures a Kafka table to use PLAIN as the SASL mechanism with a JAAS configuration.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'PLAIN',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)
The following example shows how to use SASL_SSL as the security protocol and SCRAM-SHA-256 as the SASL mechanism.
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
...
'properties.security.protocol' = 'SASL_SSL',
/* SSL configuration */
/* Path to the truststore for the server's CA certificate. */
/* Files uploaded using Artifacts are stored in the /flink/usrlib/ directory. */
'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
'properties.ssl.truststore.password' = 'test1234',
/* If client authentication is required, you must also configure the path to the keystore (private key). */
'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
'properties.ssl.keystore.password' = 'test1234',
/* The algorithm used to verify the server hostname. An empty string disables hostname verification. */
'properties.ssl.endpoint.identification.algorithm' = '',
/* SASL configuration */
/* Set the SASL mechanism to SCRAM-SHA-256. */
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
/* Configure JAAS. */
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)
You can upload the CA certificate and private key referenced in the example by using the Artifacts feature of the Realtime Compute for Apache Flink console. Uploaded files are stored in the /flink/usrlib directory. For a CA certificate file named my-truststore.jks, you can set 'properties.ssl.truststore.location' in the WITH clause in either of the following ways:
Set 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'. This method avoids downloading files from Object Storage Service (OSS) at runtime, but it does not support Debug Mode.
If the engine version is VVR 11.5 or later, set properties.ssl.truststore.location and properties.ssl.keystore.location to absolute OSS paths in the format oss://flink-fullymanaged-<Workspace ID>/artifacts/namespaces/<Namespace name>/<file name>. With this method, the files are downloaded from OSS at runtime, and Debug Mode is supported.
Verify your configuration: The examples in this topic show common configurations. Before you configure the Kafka connector, contact your Kafka O&M team to obtain the correct security and authentication settings.
Escaping: Unlike open-source Apache Flink, the Realtime Compute for Apache Flink SQL editor escapes double quotation marks (") by default. Therefore, you do not need to add backslashes (\) to escape the double quotation marks around the username and password in the properties.sasl.jaas.config option.
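DataStream jobs use the same security settings without the properties. prefix, for example by passing a java.util.Properties object to KafkaSource.builder().setProperties(...). The hypothetical helper below assembles the SASL_SSL and SCRAM-SHA-256 settings from the second SQL example. The login module class is a parameter because its name depends on whether your connector artifact shades the Kafka client, as the org.apache.flink.kafka.shaded prefix in the SQL examples suggests; confirm the correct name for your VVR version.

```java
import java.util.Properties;

// Hypothetical helper that assembles SASL_SSL + SCRAM-SHA-256 Kafka client settings.
final class ScramSslProperties {
    private ScramSslProperties() {}

    static Properties build(String loginModuleClass, String username, String password,
                            String truststorePath, String truststorePassword) {
        for (String value : new String[] {username, password}) {
            // This sketch refuses characters that would break the quoted JAAS values.
            if (value.indexOf('"') >= 0 || value.indexOf('\\') >= 0) {
                throw new IllegalArgumentException("Quotes and backslashes are not supported in this sketch");
            }
        }
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        props.setProperty("sasl.mechanism", "SCRAM-SHA-256");
        props.setProperty("sasl.jaas.config", loginModuleClass
                + " required username=\"" + username + "\" password=\"" + password + "\";");
        return props;
    }
}
```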
Source table start offset
Startup mode
You can configure the scan.startup.mode option to specify the offset from which a Kafka source table starts reading data. Valid values include:
earliest-offset: Starts reading from the earliest offset.
latest-offset: Starts reading from the latest offset.
group-offsets: Starts reading from the committed offsets for the consumer group specified in properties.group.id.
timestamp: Starts reading from the first message with a timestamp greater than or equal to the value specified in scan.startup.timestamp-millis.
specific-offsets: Starts reading from the specific partition offsets specified in scan.startup.specific-offsets.
If you do not specify a startup mode, the default is 'group-offsets'.
The scan.startup.mode option applies only to stateless jobs. When a stateful job starts, it always consumes from the offsets stored in its state.
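The following Java sketch parses the scan.startup.specific-offsets format documented in this topic (partition:<id>,offset:<offset> entries separated by semicolons) into a partition-to-offset map, which can help you validate a value before you put it in the WITH clause. The SpecificOffsets class is hypothetical, and rejecting duplicate partitions is a choice of this sketch, not documented connector behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for the documented scan.startup.specific-offsets format.
final class SpecificOffsets {
    private SpecificOffsets() {}

    /** Parses "partition:0,offset:42;partition:1,offset:300" into {0=42, 1=300}. */
    static Map<Integer, Long> parse(String value) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String entry : value.split(";")) {
            if (entry.trim().isEmpty()) {
                continue;
            }
            String[] fields = entry.trim().split(",");
            if (fields.length != 2
                    || !fields[0].trim().startsWith("partition:")
                    || !fields[1].trim().startsWith("offset:")) {
                throw new IllegalArgumentException("Expected partition:<id>,offset:<offset> but got: " + entry);
            }
            int partition = Integer.parseInt(fields[0].trim().substring("partition:".length()));
            long offset = Long.parseLong(fields[1].trim().substring("offset:".length()));
            // Sketch choice: a partition listed twice is treated as a configuration error.
            if (offsets.put(partition, offset) != null) {
                throw new IllegalArgumentException("Duplicate partition " + partition);
            }
        }
        return offsets;
    }
}
```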
Example:
CREATE TEMPORARY TABLE kafka_source (
...
) WITH (
'connector' = 'kafka',
...
-- Use only ONE of the following startup configurations.
-- Consume from the earliest offset.
'scan.startup.mode' = 'earliest-offset'
-- Consume from the latest offset.
-- 'scan.startup.mode' = 'latest-offset'
-- Consume from the committed offsets of the consumer group "my-group".
-- 'properties.group.id' = 'my-group',
-- 'scan.startup.mode' = 'group-offsets',
-- If "my-group" is used for the first time, set the reset policy to 'earliest' or 'latest'.
-- 'properties.auto.offset.reset' = 'earliest'
-- Consume from the timestamp 1655395200000 (milliseconds).
-- 'scan.startup.mode' = 'timestamp',
-- 'scan.startup.timestamp-millis' = '1655395200000'
-- Consume from specific offsets.
-- 'scan.startup.mode' = 'specific-offsets',
-- 'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);
Start offset priority
The source table's start offset is determined by the following rules, in order of priority:
1. The offset stored in a checkpoint or savepoint (highest priority).
2. The start time selected in the Realtime Compute for Apache Flink console when the job starts.
3. The start offset specified by scan.startup.mode in the WITH clause.
4. If scan.startup.mode is not specified, group-offsets is used, and consumption starts from the committed offsets of the corresponding consumer group (lowest priority).
If the offset determined by any of these steps is invalid, for example, because it has expired or an issue occurred in the Kafka cluster, the system resets the offset according to the policy specified in properties.auto.offset.reset. If this option is not configured, the system throws an exception that requires user intervention.
A common scenario involves starting consumption with a new consumer group ID. The source table first queries the Kafka cluster for the committed offsets of that group. Because the group ID is new, no valid offsets are found. As a result, the system resets the offset according to the policy specified in properties.auto.offset.reset. Therefore, when consuming with a new group ID, you must configure the properties.auto.offset.reset option.
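The priority rules and the reset fallback above can be summarized for a single partition with the hypothetical Java sketch below. It assumes each candidate has already been turned into an offset (for example, the console start time into the first offset at or after that time) and treats an offset outside the partition's current earliest-to-latest range as invalid. The StartOffsetResolver class is illustrative and is not the connector's code.

```java
import java.util.Optional;

// Hypothetical model of the documented start-offset rules for one partition.
final class StartOffsetResolver {
    private StartOffsetResolver() {}

    /** Mirrors properties.auto.offset.reset; NONE stands for "option not configured". */
    enum ResetPolicy { EARLIEST, LATEST, NONE }

    static long resolve(Optional<Long> checkpointOffset,     // 1. checkpoint or savepoint state
                        Optional<Long> consoleStartOffset,   // 2. start time chosen in the console
                        Optional<Long> startupModeOffset,    // 3. scan.startup.mode in the WITH clause
                        Optional<Long> committedGroupOffset, // 4. group-offsets fallback
                        long earliest, long latest, ResetPolicy resetPolicy) {
        // The first candidate that is present wins, in priority order.
        Optional<Long> chosen = checkpointOffset
                .or(() -> consoleStartOffset)
                .or(() -> startupModeOffset)
                .or(() -> committedGroupOffset);
        // An offset outside the retained range (for example, expired) is treated as invalid.
        if (chosen.isPresent() && chosen.get() >= earliest && chosen.get() <= latest) {
            return chosen.get();
        }
        switch (resetPolicy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                throw new IllegalStateException(
                        "No valid start offset and properties.auto.offset.reset is not configured");
        }
    }
}
```

In this model, a new consumer group has no committed offset, so the result depends entirely on the reset policy, which is why the reset option is required in that case.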
Committing source offsets
The Kafka source table commits its consumer offset to the Kafka cluster only after a successful checkpoint, so a long checkpoint interval causes the committed offset to lag. The source table stores its actual reading progress in checkpoint state, which the system uses for fault recovery. Committed offsets serve only as a progress monitor and are not used for recovery, so commit failures do not affect data accuracy.
Custom sink partitioner
If Kafka's built-in partitioning strategy does not meet your requirements, you can implement a custom partitioner by extending the FlinkKafkaPartitioner class. After development is complete, compile your code into a JAR package and upload it using the Artifacts feature in the Realtime Compute console. After the JAR package is uploaded and referenced, set the sink.partitioner parameter in the WITH clause to your partitioner's fully qualified class name, for example, org.mycompany.MyPartitioner.
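A custom partitioner overrides the partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) method of FlinkKafkaPartitioner and returns one of the IDs in partitions. To keep the example runnable without Flink on the classpath, the sketch below contains only the selection logic that such an override might call; the class name, the fallback for records without a key, and hashing the serialized key with Arrays.hashCode are all assumptions.

```java
import java.util.Arrays;

/**
 * Hypothetical partition-selection logic for a custom sink partitioner.
 * A FlinkKafkaPartitioner subclass could call choose(...) from its partition(...) override,
 * taking subtaskIndex from open(int parallelInstanceId, int parallelInstances).
 */
final class KeyHashPartitionLogic {
    private KeyHashPartitionLogic() {}

    static int choose(byte[] key, int[] partitions, int subtaskIndex) {
        if (partitions.length == 0) {
            throw new IllegalArgumentException("Topic has no partitions");
        }
        if (key == null) {
            // Records without a key fall back to a per-subtask partition.
            return partitions[subtaskIndex % partitions.length];
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return partitions[Math.floorMod(Arrays.hashCode(key), partitions.length)];
    }
}
```

Because the sketch uses Arrays.hashCode rather than the murmur2 hash of Kafka's default partitioner, the same key may land on a different partition than it would with sink.partitioner set to default.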
Kafka, Upsert Kafka, and Kafka JSON catalog
Kafka is an append-only event streaming platform that does not support data updates or deletions. In streaming SQL, a standard Kafka sink table cannot handle upstream Change Data Capture (CDC) data or the retractions produced by operators such as aggregations and joins. To write data that contains changes or retractions, use an Upsert Kafka sink table.
To simplify the batch synchronization of Change Data Capture (CDC) data from one or more upstream database tables to Kafka, you can use a Kafka JSON catalog. If the data stored in Kafka is in JSON format, a Kafka JSON catalog lets you skip the step of defining a schema and WITH parameters. For details, see Manage Kafka JSON catalogs.
Examples
Example 1: Read from and write to Kafka
This example reads data from a source Kafka topic and writes it to a sink topic. The data is in CSV format.
CREATE TEMPORARY TABLE kafka_source (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'source',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
CREATE TEMPORARY TABLE kafka_sink (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
'topic' = 'sink',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'properties.group.id' = '<yourKafkaConsumerGroupId>',
'format' = 'csv'
);
INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;
Example 2: Synchronize table schema and data
You can use the Kafka connector to synchronize messages from a Kafka topic to Hologres in real time. To prevent duplicate messages in Hologres during a failover, you can use the offset and partition ID of Kafka messages as a composite primary key.
CREATE TEMPORARY TABLE kafkaTable (
`offset` BIGINT NOT NULL METADATA,
`part` INT NOT NULL METADATA FROM 'partition',
PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
-- Optional. Flattens all nested columns.
'json.infer-schema.flatten-nested-columns.enable' = 'true'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Example 3: Synchronize Kafka keys and values
If a Kafka message key contains relevant information, you can synchronize both the key and value.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`val_name` VARCHAR(200)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-prefix' = 'val_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Kafka message keys do not support schema evolution or automatic type parsing. You must declare the key schema manually.
Example 4: Synchronize data and perform computation
When you synchronize data from Kafka to Hologres, you may need lightweight transformations.
CREATE TEMPORARY TABLE kafkaTable (
`key_id` INT NOT NULL,
`distinct_id` INT NOT NULL,
`properties` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA,
`date` AS CAST(`timestamp` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '<yourKafkaBrokers>',
'topic' = 'kafka_evolution_demo',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'json',
'value.format' = 'json',
'key.fields' = 'key_id',
'key.fields-prefix' = 'key_',
'value.fields-include' = 'EXCEPT_KEY'
);
CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
-- Use COALESCE to handle null values.
`order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
Example 5: Parse nested JSON
The following is a sample JSON message:
{
"id": 101,
"name": "VVP",
"properties": {
"owner": "Alibaba Cloud",
"engine": "Flink"
}
}
To avoid parsing fields with functions such as JSON_VALUE(payload, '$.properties.owner'), you can define the nested structure directly in the source DDL:
CREATE TEMPORARY TABLE kafka_source (
id VARCHAR,
`name` VARCHAR,
properties ROW<`owner` STRING, engine STRING>
) WITH (
'connector' = 'kafka',
'topic' = 'xxx',
'properties.bootstrap.servers' = 'xxx',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
With this approach, Flink parses the JSON into structured fields when it reads the data. Subsequent SQL queries can reference properties.owner directly without additional function calls, which improves overall performance.
DataStream API
To read or write data with the DataStream API, use the corresponding DataStream Connector to connect to Realtime Compute for Apache Flink. For more information about how to set up a DataStream Connector, see Integrate DataStream connectors.
Build a Kafka source
The Kafka source provides a builder class for creating a KafkaSource instance. The following sample code builds a Kafka source that consumes data from the earliest offset of the input-topic topic. The consumer group is my-group, and the Kafka message value is deserialized as a string.
// brokers (the bootstrap servers string) and env (a StreamExecutionEnvironment) are assumed to be defined.
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
To build a Kafka source, you must specify the following properties.
Parameter | Description |
BootstrapServers | A list of Kafka broker addresses. Set this property by calling setBootstrapServers(String). |
GroupId | The consumer group ID. Set this property by calling setGroupId(String). |
Topics or partitions | The topics or partitions to subscribe to. The Kafka source supports three subscription methods, described after this table. |
Deserializer | The deserializer used to parse Kafka messages. See the deserializer description after this table. |
Subscribing to topics or partitions:
Topic list: subscribes to all partitions of the listed topics.
KafkaSource.builder().setTopics("topic-a", "topic-b")
Topic pattern: subscribes to all partitions of the topics whose names match the specified regular expression.
KafkaSource.builder().setTopicPattern("topic.*")
Partition list: subscribes to the specified partitions.
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
    new TopicPartition("topic-a", 0),   // Partition 0 of topic "topic-a"
    new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet)
Deserializer:
Specify the deserializer by calling setDeserializer(KafkaRecordDeserializationSchema). KafkaRecordDeserializationSchema defines how to parse a Kafka ConsumerRecord. If you only need to parse the value of a Kafka message, use either of the following methods:
Call setValueOnlyDeserializer(DeserializationSchema) on the builder. The DeserializationSchema defines how to parse the binary data of the message value.
Use a class that implements Kafka's Deserializer interface. For example, use StringDeserializer to parse the message value into a string:
import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
Note: To parse a complete ConsumerRecord, implement the KafkaRecordDeserializationSchema interface.
POM
The Kafka DataStream Connector is available in the Maven central repository.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-kafka</artifactId>
    <version>${vvr-version}</version>
</dependency>
When you use the Kafka DataStream connector, note the following properties.
Starting offset
A Kafka Source specifies its starting offset with an offset initializer (OffsetsInitializer). The built-in initializers include:
Behavior | Code |
Starts consuming from the earliest offset. | KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest()) |
Starts consuming from the latest offset. | KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest()) |
Starts consuming from the first record whose timestamp is greater than or equal to the specified time, in milliseconds. | KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L)) |
Starts consuming from the offset committed by the consumer group. If no committed offset exists, the specified reset strategy (here, the earliest offset) is used. | KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) |
Starts consuming from the offset committed by the consumer group, with no reset strategy. If a partition has no committed offset, the consumer cannot determine a start position and reports an error. | KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets()) |
Note: If the built-in initializers do not meet your requirements, you can implement a custom offset initializer.
If you do not specify an offset initializer, the default is OffsetsInitializer.earliest().
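OffsetsInitializer.timestamp(long) (and the CDC option scan.startup.timestamp-millis) expects epoch milliseconds. The following stdlib-only sketch shows one way to derive such a value from a wall-clock time. The class and method names are hypothetical, and the time zone is an assumption to replace with your own. For reference, the value 1592323200000L used above corresponds to 2020-06-17 00:00:00 in UTC+8.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

// Hypothetical helper (not part of the connector): converts a wall-clock time in a
// given time zone into the epoch-millisecond value that OffsetsInitializer.timestamp(long)
// and scan.startup.timestamp-millis expect.
final class StartTimestamp {
    private StartTimestamp() {}

    static long toEpochMillis(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // 2020-06-17 00:00:00 in UTC+8 (Asia/Shanghai)
        long ts = toEpochMillis(LocalDateTime.of(2020, 6, 17, 0, 0), ZoneId.of("Asia/Shanghai"));
        System.out.println(ts); // 1592323200000
    }
}
```

Computing the value from an explicit zone avoids off-by-hours mistakes when the machine that builds the job runs in a different time zone from the data.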
Streaming mode and batch mode
The Kafka Source supports both streaming mode and batch mode. By default, it runs in streaming mode, where the job runs until it fails or is canceled. To run the Kafka Source in batch mode, call setBounded(OffsetsInitializer) to specify stop offsets. The Kafka Source exits when all partitions reach their stop offsets.
Note: A Kafka Source in streaming mode does not normally have stop offsets. For testing, you can call setUnbounded(OffsetsInitializer) to specify stop offsets while the source remains in streaming mode. Note the different method names: setUnbounded for streaming mode and setBounded for batch mode.
Dynamic partition discovery
To handle topic scaling (new partitions) or newly created topics that match a subscription pattern without restarting the Flink job, enable dynamic partition discovery. This feature is disabled by default and must be explicitly enabled:
KafkaSource.builder()
        .setProperty("partition.discovery.interval.ms", "10000") // Discover new partitions every 10 seconds
Important: Dynamic partition discovery depends on the metadata update mechanism of the Kafka cluster. If the Kafka cluster does not update partition information promptly, new partitions might not be discovered. Make sure that the partition.discovery.interval.ms value suits your scenario.
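To check in advance which topic names a pattern subscription (setTopicPattern) will pick up, including topics created later and found by discovery, you can test the regular expression with java.util.regex. This is a hypothetical stdlib-only helper; it assumes whole-name matching (Matcher.matches()), and the topic names are made up for illustration.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: previews which topics a pattern subscription would match,
// assuming the whole topic name must match the regular expression.
final class TopicPatternPreview {
    private TopicPatternPreview() {}

    static List<String> matchingTopics(String regex, List<String> topics) {
        Pattern pattern = Pattern.compile(regex);
        return topics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of("topic-a", "topic-b", "my-topic", "orders");
        System.out.println(matchingTopics("topic.*", topics)); // [topic-a, topic-b]
    }
}
```

Under whole-name matching, "topic.*" does not match "my-topic", because the name must start with "topic".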
Event time and watermark
By default, the Kafka Source uses the timestamp of the Kafka message as the event time. You can define a custom watermark strategy to extract the event time from the message body and emit watermarks downstream.
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")
To learn more about custom watermark strategies, see Generating Watermarks.
Note: If a source subtask is idle (for example, when a Kafka partition receives no new data, or when the source parallelism is higher than the number of Kafka partitions), the watermark of that subtask does not advance. This can block downstream window computations.
To resolve this issue, consider the following solutions:
Configure a source idle timeout: In SQL jobs, set the table.exec.source.idle-timeout option to mark an idle source as temporarily idle so that the downstream watermark can advance. In DataStream jobs, the equivalent is WatermarkStrategy#withIdleness(Duration).
Set an appropriate parallelism: Make sure that the source parallelism is not greater than the number of Kafka partitions.
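The effect of an idle subtask can be reasoned about with a small model: a downstream operator's combined watermark is the minimum of the watermarks of its active inputs, and inputs marked idle are left out of that minimum. The sketch below is a stdlib-only simplification of that rule for illustration, not Flink's implementation; the watermark values are made up.

```java
import java.util.OptionalLong;

// Simplified model of watermark combination: the result is the minimum watermark
// over inputs that are not marked idle. Not Flink's implementation.
final class CombinedWatermark {
    private CombinedWatermark() {}

    static OptionalLong combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        // If every input is idle, this model reports no watermark at all.
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Subtask 2 reads an empty partition, so its watermark stays at Long.MIN_VALUE.
        long[] watermarks = {1_000L, 2_000L, Long.MIN_VALUE};
        System.out.println(combine(watermarks, new boolean[] {false, false, false})); // OptionalLong[-9223372036854775808]
        System.out.println(combine(watermarks, new boolean[] {false, false, true}));  // OptionalLong[1000]
    }
}
```

Without the idle flag, the empty subtask pins the combined watermark at its minimum, which is why window results never fire; marking it idle lets the watermark follow the active subtasks.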
Offset commit
When checkpointing is enabled, the Kafka Source commits the current consumer offsets to Kafka when a checkpoint completes. This keeps the offsets committed on the Kafka broker consistent with the Flink checkpoint state. If checkpointing is disabled, the Kafka Source relies on the Kafka consumer's internal periodic automatic offset commit, which is controlled by the enable.auto.commit and auto.commit.interval.ms consumer properties.
Note: The Kafka Source does not rely on committed offsets for fault tolerance and recovery. Committed offsets are used only to monitor the progress of the Kafka consumer and consumer group.
Other properties
In addition to the properties described above, you can call setProperties(Properties) and setProperty(String, String) to set any property of the Kafka Source and its underlying Kafka consumer. The Kafka Source provides the following specific properties.
Parameter | Description |
client.id.prefix | The client ID prefix for the Kafka consumer. |
partition.discovery.interval.ms | The partition discovery interval, in milliseconds. A value of -1 disables dynamic partition discovery. Note: In batch mode, this property is automatically set to -1. |
register.consumer.metrics | Specifies whether to register Kafka consumer metrics in Flink. |
Other Kafka consumer configurations | For a complete list of Kafka consumer configurations, see the official Apache Kafka documentation. |
Important: To ensure correct operation, the Kafka DataStream connector overwrites the following manually configured properties:
key.deserializer is always overwritten to org.apache.kafka.common.serialization.ByteArrayDeserializer.
value.deserializer is always overwritten to org.apache.kafka.common.serialization.ByteArrayDeserializer.
auto.offset.reset is overwritten by the strategy provided by the OffsetsInitializer.
The following example configures the Kafka consumer to use the PLAIN SASL mechanism with a JAAS configuration. SASL settings take effect only when security.protocol is set to SASL_PLAINTEXT or SASL_SSL.
KafkaSource.builder()
        .setProperty("security.protocol", "SASL_PLAINTEXT") // or SASL_SSL, depending on your cluster
        .setProperty("sasl.mechanism", "PLAIN")
        .setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
Monitoring
The Kafka Source exposes metrics through Flink's metrics system for monitoring and diagnostics.
Metric scope
All metrics of the Kafka source reader are registered under the KafkaSourceReader metric group, which is a subgroup of the operator's metric group. Metrics for a specific topic partition are registered in the KafkaSourceReader.topic.<topic_name>.partition.<partition_id> subgroup.
For example, the current consumer offset (currentOffset) of partition 1 of topic my-topic is reported as <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset, and the number of successful commits (commitsSucceeded) is reported as <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.
List of metrics
Metric | Description | Scope |
currentOffset | The current consumer offset of a partition. | TopicPartition |
committedOffset | The last committed offset of a partition. | TopicPartition |
commitsSucceeded | The total number of successful offset commits. | KafkaSourceReader |
commitsFailed | The total number of failed offset commits. | KafkaSourceReader |
Kafka consumer metrics
The metrics of the underlying Kafka consumer are registered in the KafkaSourceReader.KafkaConsumer metric group. For example, the records-consumed-total metric is registered at <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.
You can use the register.consumer.metrics property to specify whether to register Kafka consumer metrics. This option is enabled (true) by default. For more information about Kafka consumer metrics, see the Apache Kafka documentation.
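Following the scope layout described in this section, metric identifiers can be assembled programmatically, for example when configuring dashboards or alert rules. This is a hypothetical stdlib-only helper. The operatorScope argument stands for the parent metric groups up to and including the operator group; its exact form depends on your Flink metrics scope configuration, so the value used in main is only a placeholder.

```java
// Hypothetical helper that assembles Kafka Source metric identifiers following the
// scope layout described above. operatorScope is everything up to and including the
// operator metric group; it depends on your Flink metrics scope configuration.
final class KafkaSourceMetricNames {
    private KafkaSourceMetricNames() {}

    // Per-partition metrics such as currentOffset and committedOffset.
    static String partitionMetric(String operatorScope, String topic, int partition, String metric) {
        return String.join(".", operatorScope, "KafkaSourceReader",
                "topic", topic, "partition", Integer.toString(partition), metric);
    }

    // Reader-level metrics such as commitsSucceeded and commitsFailed.
    static String readerMetric(String operatorScope, String metric) {
        return String.join(".", operatorScope, "KafkaSourceReader", metric);
    }

    // Metrics of the underlying Kafka consumer, such as records-consumed-total.
    static String consumerMetric(String operatorScope, String metric) {
        return String.join(".", operatorScope, "KafkaSourceReader", "KafkaConsumer", metric);
    }

    public static void main(String[] args) {
        String scope = "<some_parent_groups>.operator"; // placeholder prefix
        System.out.println(partitionMetric(scope, "my-topic", 1, "currentOffset"));
        System.out.println(readerMetric(scope, "commitsSucceeded"));
        System.out.println(consumerMetric(scope, "records-consumed-total"));
    }
}
```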
Build a Kafka sink
The Flink Kafka Sink writes a data stream to one or more Kafka topics.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<String> stream = ...;

Properties kafkaProperties = new Properties();
kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setKafkaProducerConfig(kafkaProperties)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("my-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

stream.sinkTo(sink);
To build a Kafka Sink, you must configure the following properties.
Parameter | Description |
Kafka client properties | The bootstrap.servers property is required. It specifies a comma-separated list of Kafka brokers. |
Record serializer | You must provide a KafkaRecordSerializationSchema to convert input data into a Kafka ProducerRecord. Flink provides a schema builder with common components, such as message key and value serialization, topic selection, and message partitioning. You can also implement the interface yourself for finer-grained control. The method ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) is called for each incoming record to generate the ProducerRecord that is written to Kafka. Through the ProducerRecord, you can set the destination topic, set the message key, and specify the destination partition. |
Delivery guarantee | The delivery guarantee of the sink. For details, see the following section. |
Delivery guarantee
When Flink checkpointing is enabled, the Flink Kafka Sink can provide exactly-once semantics. In addition to enabling checkpointing, you specify the delivery guarantee by calling setDeliveryGuarantee(DeliveryGuarantee). The following options are available:
DeliveryGuarantee.NONE (default): Flink provides no guarantees. Data may be lost or duplicated.
DeliveryGuarantee.AT_LEAST_ONCE: Guarantees that no data is lost, but data may be duplicated.
DeliveryGuarantee.EXACTLY_ONCE: Uses Kafka transactions to provide exactly-once semantics. This mode also requires a transactional ID prefix, which you set by calling setTransactionalIdPrefix(String).
Note: When you use EXACTLY_ONCE semantics, see Considerations for exactly-once semantics.
Flink CDC
Use the Kafka connector as a source or sink to create YAML jobs for Flink CDC.
Limitations
Use Realtime Compute for Apache Flink (VVR) 11.1 or later to ingest Flink CDC data from a Kafka data source.
Only JSON, Debezium JSON, and Canal JSON are supported.
Only Realtime Compute for Apache Flink (VVR) 8.0.11 and later supports reading data from a single table distributed across multiple Partitions.
Syntax
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092
Parameter
General
Parameter | Description | Required | Type | Default | Remarks |
type | The source or sink type. | Yes | String | – | The value must be kafka. |
name | The source or sink name. | No | String | – | – |
properties.bootstrap.servers | The Kafka broker addresses. | Yes | String | – | The format is host:port,host:port,host:port. Separate addresses with commas (,). |
properties.* | Configuration properties for the Kafka client. | No | String | – | The property keys must be valid options as defined in the Producer Configs and Consumer Configs sections of the official Apache Kafka documentation. Realtime Compute for Apache Flink (VVR) removes the properties. prefix and passes the remaining key-value pairs to the underlying Kafka client. For example, set properties.allow.auto.create.topics: false to disable automatic topic creation. |
key.format | The serialization and deserialization format of the Kafka message key. | No | String | – | For the source, only json is supported. For the sink, valid values are csv and json. Note: This option is supported only in Realtime Compute for Apache Flink (VVR) 11.0.0 and later. |
value.format | The serialization and deserialization format of the Kafka message value. | No | String | debezium-json | For the source, valid values are debezium-json, canal-json, and json. For the sink, valid values are debezium-json, canal-json, and canal-protobuf. Note: The debezium-json and canal-json formats require Realtime Compute for Apache Flink (VVR) 8.0.10 or later. The json format requires Realtime Compute for Apache Flink (VVR) 11.0.0 or later. |
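The prefix handling described for properties.* can be pictured as follows: every option whose key starts with properties. is forwarded to the Kafka client with that prefix removed, and all other options stay with the connector. This is a hypothetical stdlib-only sketch of the described behavior, not the connector's code.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch: forwards connector options that start with "properties."
// to a Kafka client configuration, stripping the prefix. Other options are ignored here.
final class KafkaClientOptions {
    static final String PREFIX = "properties.";

    private KafkaClientOptions() {}

    static Properties toKafkaProperties(Map<String, String> connectorOptions) {
        Properties kafka = new Properties();
        for (Map.Entry<String, String> option : connectorOptions.entrySet()) {
            if (option.getKey().startsWith(PREFIX)) {
                kafka.setProperty(option.getKey().substring(PREFIX.length()), option.getValue());
            }
        }
        return kafka;
    }

    public static void main(String[] args) {
        Properties kafka = toKafkaProperties(Map.of(
                "type", "kafka",
                "properties.bootstrap.servers", "host1:9092,host2:9092",
                "properties.allow.auto.create.topics", "false"));
        System.out.println(kafka.getProperty("bootstrap.servers")); // host1:9092,host2:9092
    }
}
```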
Source parameters
Parameter | Description | Required | Type | Default | Remarks |
topic | The topic or topics to read from. | No | String | – | To subscribe to multiple topics, separate the names with semicolons (;), for example, topic-1;topic-2. Note: Specify either this parameter or topic-pattern, but not both. |
topic-pattern | A regular expression that matches the names of the topics to subscribe to. | No | String | – | Note: Specify either this parameter or topic, but not both. |
properties.group.id | The consumer group ID. | No | String | – | When you specify a new consumer group ID, you must set properties.auto.offset.reset to earliest or latest to define the initial start offset. |
scan.startup.mode | The start offset of the Kafka consumer. | No | String | group-offsets | Valid values: earliest-offset: Starts reading from the earliest available offset. latest-offset: Starts reading from the latest offset. group-offsets (default): Starts reading from the offsets committed for the specified properties.group.id. timestamp: Starts reading from the timestamp specified by scan.startup.timestamp-millis. specific-offsets: Starts reading from the offsets specified by scan.startup.specific-offsets. Note: This parameter applies only when a job starts without state (stateless startup). A job started from state always resumes from the offsets stored in its state. |
scan.startup.specific-offsets | The start offset of each partition when scan.startup.mode is set to specific-offsets. | No | String | – | For example, partition:0,offset:42;partition:1,offset:300. |
scan.startup.timestamp-millis | The start timestamp when scan.startup.mode is set to timestamp. | No | Long | – | The unit is milliseconds. |
scan.topic-partition-discovery.interval | The interval at which new partitions of the subscribed topics are discovered. | No | Duration | 5 minutes | The connector periodically discovers and reads from new partitions. When topic-pattern is used, the connector also discovers new topics that match the pattern. To disable discovery, set this value to 0 or a negative value. |
scan.check.duplicated.group.id | Specifies whether to check if the consumer group specified by properties.group.id is already in use. | No | Boolean | false | Valid values: true: Checks for a duplicate consumer group before the job starts. If a duplicate is found, the job fails. false: Starts the job without checking for conflicts. |
schema.inference.strategy | The schema parsing strategy. | No | String | continuous | Valid values: continuous: Parses the schema of each record. If schemas are incompatible, the connector infers a wider schema and generates a schema change event. static: Parses the schema only once when the job starts. All data is then parsed with this initial schema, and no schema change events are generated. Note: For more information, see Policies for schema parsing and evolution. This option is supported only in Ververica Runtime (VVR) 8.0.11 and later. |
scan.max.pre.fetch.records | The maximum number of messages consumed from each partition for initial schema inference. | No | Int | 50 | Before data processing begins, the connector pre-fetches and consumes up to this number of messages from each partition to initialize the schema. |
key.fields-prefix | A prefix added to message key field names to avoid name conflicts. | No | String | – | For example, if this parameter is set to key_ and the message key contains a field named a, the parsed field name is key_a. Note: The value of key.fields-prefix cannot be a prefix of the value.fields-prefix value. |
value.fields-prefix | A prefix added to message value field names to avoid name conflicts. | No | String | – | For example, if this parameter is set to value_ and the message value contains a field named b, the parsed field name is value_b. Note: The value of value.fields-prefix cannot be a prefix of the key.fields-prefix value. |
metadata.list | The metadata columns passed to the downstream sink. | No | String | – | The available metadata columns are topic, partition, offset, timestamp, timestamp-type, headers, and leader-epoch. Separate column names with commas. |
scan.value.initial-schemas.ddls | DDL statements that define the initial schema of specific tables. | No | String | – | Separate multiple DDL statements with semicolons (;). For example, CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); specifies the initial schemas of tables db1.t1 and db1.t2. The table schema defined in the DDL must be consistent with the target sink table and comply with Flink SQL syntax. Note: This option is supported only in Ververica Runtime (VVR) 11.5 and later. |
ingestion.ignore-errors | Specifies whether to ignore data parsing errors. | No | Boolean | false | Note: This option is supported only in Ververica Runtime (VVR) 11.5 and later. |
ingestion.error-tolerance.max-count | The maximum number of parsing errors tolerated before the job fails. | No | Integer | -1 | Takes effect only when ingestion.ignore-errors is set to true. A value of -1 means unlimited tolerance: parsing errors never cause the job to fail. Note: This option is supported only in Ververica Runtime (VVR) 11.5 and later. |
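The scan.startup.specific-offsets value is a semicolon-separated list of partition:<n>,offset:<m> pairs. The sketch below parses that format into a partition-to-offset map, for example to validate a value before you submit a job. It is a hypothetical helper written against the format shown in the table above, not the connector's own parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for values such as "partition:0,offset:42;partition:1,offset:300".
final class SpecificOffsets {
    private SpecificOffsets() {}

    static Map<Integer, Long> parse(String value) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String entry : value.split(";")) {
            if (entry.isBlank()) {
                continue; // tolerate a trailing semicolon
            }
            Integer partition = null;
            Long offset = null;
            for (String keyValue : entry.split(",")) {
                String[] parts = keyValue.trim().split(":", 2);
                if (parts.length != 2) {
                    throw new IllegalArgumentException("Malformed entry: " + entry);
                }
                switch (parts[0].trim()) {
                    case "partition":
                        partition = Integer.valueOf(parts[1].trim());
                        break;
                    case "offset":
                        offset = Long.valueOf(parts[1].trim());
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown key: " + parts[0]);
                }
            }
            if (partition == null || offset == null) {
                throw new IllegalArgumentException("Missing partition or offset: " + entry);
            }
            offsets.put(partition, offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(parse("partition:0,offset:42;partition:1,offset:300")); // {0=42, 1=300}
    }
}
```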
Debezium JSON format parameters
Parameter | Required | Type | Default | Description |
debezium-json.distributed-tables | No | Boolean | false | Set to true if the data of a single Debezium JSON table is distributed across multiple partitions. Note: This option is supported only in Ververica Runtime (VVR) 8.0.11 and later. Important: Changing this parameter requires a stateless startup. |
debezium-json.schema-include | No | Boolean | false | Specifies whether the Debezium JSON messages include a schema. This corresponds to the value.converter.schemas.enable property in the Debezium Kafka Connect configuration. Valid values: true: The Debezium JSON messages contain a schema. false: The Debezium JSON messages do not contain a schema. |
debezium-json.ignore-parse-errors | No | Boolean | false | Valid values: true: Skips rows that cause a parsing exception. false: Throws an error, and the job fails. |
debezium-json.infer-schema.primitive-as-string | No | Boolean | false | Specifies whether to parse all primitive types as String when inferring the table schema. Valid values: true: Parses all primitive types as String. false: Parses types based on the default rules. |
Canal JSON format parameters
Parameter | Required | Type | Default | Description |
canal-json.distributed-tables | No | Boolean | false | Enable this option if the data of a single Canal JSON table is distributed across multiple partitions. Note: This option is supported only in Ververica Runtime (VVR) 8.0.11 and later. Important: Changing this parameter requires a stateless startup. |
canal-json.database.include | No | String | – | An optional regular expression that filters changelogs by the database metadata field of Canal records. Only records from matching databases are processed. The regular expression is compatible with Java's Pattern class. |
canal-json.table.include | No | String | – | An optional regular expression that filters changelogs by the table metadata field of Canal records. Only records from matching tables are processed. The regular expression is compatible with Java's Pattern class. |
canal-json.ignore-parse-errors | No | Boolean | false | Valid values: true: Skips the current row if a parsing exception occurs. false: Throws an error, and the job fails. |
canal-json.infer-schema.primitive-as-string | No | Boolean | false | Specifies whether to parse all primitive types as String when inferring the table schema. Valid values: true: Parses all primitive types as String. false: Parses types based on the default rules. |
canal-json.infer-schema.strategy | No | String | AUTO | The table schema parsing strategy. Valid values: AUTO: Infers the schema from the JSON data. Recommended if the data does not contain a sqlType field, to prevent parsing failures. SQL_TYPE: Parses the schema from the sqlType array in the Canal JSON data. If the data contains a sqlType field, we recommend SQL_TYPE for more precise types. MYSQL_TYPE: Parses the schema from the mysqlType array in the Canal JSON data. For the sqlType type mapping rules, see Canal JSON schema parsing. Note: This option is supported only in Ververica Runtime (VVR) 11.1 and later. The MYSQL_TYPE value is supported only in Ververica Runtime (VVR) 11.3 and later. |
canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled | No | Boolean | true | Specifies how the MySQL TIMESTAMP type is mapped. Valid values: true: MySQL TIMESTAMP is mapped to the CDC TIMESTAMP type. false: MySQL TIMESTAMP is mapped to the CDC TIMESTAMP_LTZ type. |
canal-json.mysql.treat-tinyint1-as-boolean.enabled | No | Boolean | true | Specifies whether the MySQL TINYINT(1) type is mapped to the CDC BOOLEAN type when the MYSQL_TYPE parsing strategy is used. Valid values: true: TINYINT(1) is mapped to BOOLEAN. false: TINYINT(1) is mapped to TINYINT(1). This option applies only when canal-json.infer-schema.strategy is set to MYSQL_TYPE. |
JSON format parameters
Parameter | Required | Type | Default | Description |
json.timestamp-format.standard | No | String | SQL | The timestamp format of input and output data. Valid values: SQL: Parses input timestamps in the yyyy-MM-dd HH:mm:ss.s{precision} format, such as 2020-12-30 12:13:14.123. ISO-8601: Parses input timestamps in the yyyy-MM-ddTHH:mm:ss.s{precision} format, such as 2020-12-30T12:13:14.123. |
json.ignore-parse-errors | No | Boolean | false | Valid values: true: Skips the current row if a parsing exception occurs. false: Throws an error, and the job fails. |
json.infer-schema.primitive-as-string | No | Boolean | false | Specifies whether to parse all primitive types as String when inferring the table schema. Valid values: true: Parses all primitive types as String. false: Parses types based on the default rules. |
json.infer-schema.flatten-nested-columns.enable | No | Boolean | false | Specifies whether to recursively flatten nested columns in JSON data. Valid values: true: Recursively flattens nested columns. false: Treats nested columns as String. |
json.decode.parser-table-id.fields | No | String | – | Generates the tableId from the values of the specified JSON fields. Separate multiple field names with commas (,); their values are joined with periods (.). For example, for the JSON data {"col0":"a", "col1":"b", "col2":"c"}: col0 generates the tableId a; col0,col1 generates a.b; col0,col1,col2 generates a.b.c. |
json.infer-schema.fixed-types | No | String | – | Specifies fixed data types for specific fields when JSON data is parsed. Separate multiple fields with commas (,). For example, id BIGINT, name VARCHAR(10) specifies that the id field is of the BIGINT type and the name field is of the VARCHAR(10) type. Note: This option is supported only in Ververica Runtime (VVR) 11.5 and later. With Ververica Runtime (VVR) 11.5, you must also set scan.max.pre.fetch.records: 0. |
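The two json.timestamp-format.standard values differ in the separator between the date and the time: a space for SQL and the letter T for ISO-8601. The sketch below uses java.time to parse the two example values from the table. The formatter definitions and class name are assumptions made for illustration; they accept 0 to 9 fractional digits, and the connector's own parser is not shown here.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Hypothetical sketch of the two timestamp layouts: "SQL" uses a space between
// date and time, "ISO-8601" uses the letter T. Both accept 0-9 fractional digits.
final class JsonTimestampFormats {
    private JsonTimestampFormats() {}

    static final DateTimeFormatter SQL = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true) // optional ".SSS..."
            .toFormatter();

    static LocalDateTime parse(String text, String standard) {
        switch (standard) {
            case "SQL":
                return LocalDateTime.parse(text, SQL);
            case "ISO-8601":
                return LocalDateTime.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME);
            default:
                throw new IllegalArgumentException("Unknown standard: " + standard);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("2020-12-30 12:13:14.123", "SQL"));      // 2020-12-30T12:13:14.123
        System.out.println(parse("2020-12-30T12:13:14.123", "ISO-8601")); // 2020-12-30T12:13:14.123
    }
}
```

A value written in one layout does not parse under the other, which is a common cause of parse errors when this option does not match the producer's format.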
Sink table parameters
Parameter | Description | Required | Type | Default | Remarks |
type | The sink type. | Yes | String | – | The value must be kafka. |
name | The sink name. | No | String | – | – |
topic | The Kafka topic name. | No | String | – | If this parameter is specified, all data is written to this topic. Note: If this parameter is not specified, each record is written to a topic named after its TableID. The TableID joins the database name and table name with a period (.), for example, databaseName.tableName. |
partition.strategy | The strategy for writing data to Kafka partitions. | No | String | all-to-zero | Valid values: all-to-zero (default): Writes all data to partition 0. hash-by-key: Writes data to partitions based on the hash value of the primary key, so records with the same primary key are written to the same partition and keep their order. |
sink.tableId-to-topic.mapping | The mapping from upstream table names to downstream Kafka topic names. | No | String | – | Separate mappings with semicolons (;). Within each mapping, separate the upstream table name and the downstream Kafka topic name with a colon (:). Table names can be regular expressions. To map multiple tables to the same topic, separate the table names with commas (,). For example: mydb.mytable1:topic1;mydb.mytable2:topic2. Note: This parameter changes the destination topic while preserving the original table name information. |
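The sink options in the table above decide where each record lands: sink.tableId-to-topic.mapping and the TableID fallback choose the topic, and partition.strategy chooses the partition. The sketch below models those rules to make the mapping syntax concrete. It is hypothetical: whole-name regex matching, first-match-wins order, and the use of List.hashCode() for hash-by-key are assumptions rather than the connector's implementation, and the topic option (which per the table sends all data to one topic) is not modeled.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch of how a CDC record's destination could be chosen from the
// sink options. Matching rules, precedence, and the hash function are assumptions.
final class KafkaSinkRouting {
    private KafkaSinkRouting() {}

    // Parses "tableRegex[,tableRegex...]:topic;..." into an ordered regex-to-topic map.
    static Map<Pattern, String> parseMapping(String spec) {
        Map<Pattern, String> rules = new LinkedHashMap<>();
        for (String rule : spec.split(";")) {
            int colon = rule.lastIndexOf(':'); // Kafka topic names cannot contain ':'
            if (colon <= 0) {
                continue; // skip empty or malformed rules
            }
            String topic = rule.substring(colon + 1).trim();
            for (String table : rule.substring(0, colon).split(",")) {
                rules.put(Pattern.compile(table.trim()), topic);
            }
        }
        return rules;
    }

    // First rule whose pattern matches the whole tableId wins; otherwise the tableId is the topic.
    static String resolveTopic(String tableId, Map<Pattern, String> rules) {
        for (Map.Entry<Pattern, String> rule : rules.entrySet()) {
            if (rule.getKey().matcher(tableId).matches()) {
                return rule.getValue();
            }
        }
        return tableId; // default: topic named after the TableID, e.g. databaseName.tableName
    }

    static int choosePartition(String strategy, List<?> primaryKey, int numPartitions) {
        switch (strategy) {
            case "all-to-zero":
                return 0;
            case "hash-by-key":
                // Same primary key -> same hash -> same partition, which preserves per-key order.
                return Math.floorMod(primaryKey.hashCode(), numPartitions);
            default:
                throw new IllegalArgumentException("Unknown partition.strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        Map<Pattern, String> rules = parseMapping("mydb.mytable1:topic1;mydb.mytable2:topic2");
        System.out.println(resolveTopic("mydb.mytable2", rules)); // topic2
        System.out.println(resolveTopic("mydb.orders", rules));   // mydb.orders
    }
}
```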
Debezium JSON format parameters
Parameter | Required | Type | Default | Description |
debezium-json.include-schema.enabled | No | Boolean | false | Specifies whether to include schema information in the Debezium JSON data. |
debezium-json.emit.full-table-id.enabled | No | Boolean | false | Specifies whether to write the full three-part table ID to the Debezium JSON metadata fields. If enabled, the CDC table ID parts map to Debezium JSON keys as follows: Namespace to db, Schema to schema, Table to table. If disabled: Namespace is not mapped, Schema to db, Table to table. Note: This parameter is supported only in Ververica Runtime (VVR) 11.6 and later. |
Examples
Use Kafka as a Flink CDC source:
source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  topic: ${kafka.topic}
  value.format: ${value.format}
  scan.startup.mode: ${scan.startup.mode}
sink:
  type: hologres
  name: Hologres sink
  endpoint: <yourEndpoint>
  dbname: <yourDbname>
  username: ${secret_values.ak_id}
  password: ${secret_values.ak_secret}
  sink.type-normalize-strategy: BROADEN
Use Kafka as a Flink CDC sink:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: ${mysql.source.table}
    sink-table: ${kafka.topic}
The route module specifies the destination Kafka topic for the source table.
By default, the automatic topic creation feature is disabled for ApsaraMQ for Kafka. For more information, see FAQ about automatic topic creation. You must create the topic before writing data to ApsaraMQ for Kafka. For more information, see Step 3: Create resources.
Policies for schema parsing and evolution
The Kafka connector maintains the schemas of all currently known tables.
Table schema initialization
A table schema includes columns and data types, database and table names, and primary keys. The following sections describe how to initialize each of these.
Column and data type information
A Flink CDC job can automatically infer columns and data types from the data, but you may want to define them explicitly for certain tables. There are three schema initialization strategies, depending on how much control you need over the types:
Fully automatic schema inference
Before reading data from Kafka, the Kafka connector attempts to consume up to scan.max.pre.fetch.records messages from each partition, parses the schema of each message, and merges these schemas to initialize the table schema. A table creation event is then generated based on this initialized schema before the data is actually consumed.
For the Debezium JSON and Canal JSON formats, table information is contained within each message. The messages pre-fetched based on the scan.max.pre.fetch.records parameter may contain data from multiple tables. Therefore, the number of pre-fetched records for any single table cannot be determined. Pre-fetching and schema initialization are performed only once for each partition before its messages are consumed and processed. If data for a new table appears later, the schema parsed from the first record of that table is used as its initial schema, and the schema is not pre-fetched or initialized again.
The distribution of data from a single table across multiple partitions is supported only in Ververica Runtime (VVR) 8.0.11 and later, and requires you to set the debezium-json.distributed-tables or canal-json.distributed-tables configuration option to true.
Specifying an initial table schema
In some cases, you may need to define the initial table schema explicitly, for example, when writing data from Kafka to a pre-existing downstream table. In this case, you can do so by adding the scan.value.initial-schemas.ddls parameter. The following is a configuration example:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set the initial table schema
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);
The DDL statements must match the schema of the target table. This configuration sets the initial type of the id column to BIGINT and of the name column to VARCHAR(10) for table db1.t1, and the initial type of the id column to BIGINT for table db1.t2.
The DDL statements use Flink SQL syntax.
Setting fixed types for specific fields
You may want to lock certain fields to a fixed data type. For example, fields that would normally be inferred as TIMESTAMP might need to be output as strings instead. In this case, you can add the json.infer-schema.fixed-types parameter to specify the initial table schema. This parameter is valid only when the message format is JSON. The following is a configuration example:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set specific fields to a fixed type
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  # Required together with json.infer-schema.fixed-types in VVR 11.5
  scan.max.pre.fetch.records: 0
This configuration specifies that all id fields are of the BIGINT type and all name fields are of the VARCHAR(10) type.
The data types are consistent with Flink SQL types.
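The json.infer-schema.fixed-types value is a comma-separated list of "name TYPE" pairs. Because a parameterized type can itself contain a comma (for example DECIMAL(10, 2)), a parser has to ignore commas inside parentheses. The sketch below shows one way to do that for checking a value before deployment. It is a hypothetical helper, and whether the connector accepts types that contain commas is not confirmed here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for values such as "id BIGINT, name VARCHAR(10)".
// Commas inside parentheses are treated as part of the type.
final class FixedTypes {
    private FixedTypes() {}

    static Map<String, String> parse(String value) {
        Map<String, String> types = new LinkedHashMap<>();
        int depth = 0;
        int start = 0;
        for (int i = 0; i <= value.length(); i++) {
            char c = i < value.length() ? value.charAt(i) : ','; // virtual trailing comma
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
            } else if (c == ',' && depth == 0) {
                addField(types, value.substring(start, i));
                start = i + 1;
            }
        }
        return types;
    }

    private static void addField(Map<String, String> types, String field) {
        String trimmed = field.trim();
        if (trimmed.isEmpty()) {
            return;
        }
        int space = trimmed.indexOf(' ');
        if (space < 0) {
            throw new IllegalArgumentException("Expected '<name> <TYPE>': " + trimmed);
        }
        types.put(trimmed.substring(0, space), trimmed.substring(space + 1).trim());
    }

    public static void main(String[] args) {
        System.out.println(parse("id BIGINT, name VARCHAR(10)")); // {id=BIGINT, name=VARCHAR(10)}
    }
}
```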
Database and table information
For Canal JSON and Debezium JSON formats, the connector parses table information, including the database and table name, from each message.
For the JSON format, table information by default contains only the table name, which is the name of the topic that contains the data. If your data contains database and table information, you can use the json.decode.parser-table-id.fields parameter to specify the fields that contain this information. These fields are then mapped to the database and table names. The following is a configuration example:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Use the value of the col1 field as the database name and the value of the col2 field as the table name
  json.decode.parser-table-id.fields: col1,col2
With this configuration, the connector sends each record to a table whose database name is the value of the col1 field and whose table name is the value of the col2 field.
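The tableId derivation behind json.decode.parser-table-id.fields amounts to joining the values of the listed fields with periods, as the example in the JSON format parameters table shows. This hypothetical sketch reproduces that example. It assumes the JSON message has already been parsed into a Map (JSON parsing is out of scope), and treating a missing field as an error is an assumption of the sketch, not documented connector behavior.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch of json.decode.parser-table-id.fields: the values of the
// listed fields are joined with '.' to form the tableId. The message is assumed
// to be already parsed into a Map.
final class TableIdFromFields {
    private TableIdFromFields() {}

    static String tableId(Map<String, String> record, List<String> fields) {
        StringJoiner id = new StringJoiner(".");
        for (String field : fields) {
            String value = record.get(field);
            if (value == null) {
                // Assumption: a missing field is treated as an error in this sketch.
                throw new IllegalArgumentException("Missing field: " + field);
            }
            id.add(value);
        }
        return id.toString();
    }

    public static void main(String[] args) {
        Map<String, String> rec = Map.of("col0", "a", "col1", "b", "col2", "c");
        System.out.println(tableId(rec, List.of("col0")));                 // a
        System.out.println(tableId(rec, List.of("col0", "col1")));         // a.b
        System.out.println(tableId(rec, List.of("col0", "col1", "col2"))); // a.b.c
    }
}
```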
Primary key information
For the Canal JSON format, the pkNames field in the JSON data defines the table's primary key.
For the Debezium JSON and JSON formats, the data does not contain primary key information. You can add primary keys to tables manually with transform rules:
transform:
  - source-table: \.*.\.*
    projection: \*
    primary-keys: key1, key2
Schema parsing and schema evolution
After the table schema is initialized, if schema.inference.strategy is set to static, the Kafka connector parses the message value of each message based on the initial table schema and does not generate schema change events. If schema.inference.strategy is set to continuous, the Kafka connector parses the message value of each Kafka message, identifies its physical columns, and compares the resulting schema with the currently maintained schema. If the schemas are inconsistent, the connector attempts to merge them and generates a corresponding table schema change event. The merge rules are as follows:
If the parsed physical columns contain fields that are not present in the current schema, these fields are added to the schema, and an event is generated to add them as nullable columns.
If the parsed physical columns do not contain fields that exist in the current schema, those fields are retained and their values are populated with NULL. No column deletion event is generated.
Columns with the same name are handled as follows:
If the columns have the same data type but different precision, the type with the larger precision is used, and a column type change event is generated.
If the columns have different data types, the system finds the smallest common parent type in the type hierarchy tree below. The system then uses this common parent type for the column and generates a column type change event.

Supported schema evolution policies:
Adding a column: The connector adds the new column to the end of the schema and synchronizes its data. The new column is set as nullable.
Dropping a column: A column deletion event is not generated. Instead, subsequent data for that column is populated with NULL.
Renaming a column: The connector treats this as dropping the old column and adding a new one. The new column is added to the end of the schema, and the values for the original column are populated with NULL.
Changing a column type:
For downstream sinks that support column type changes, a Flink CDC job can handle type changes (for example, from INT to BIGINT) if the downstream sink is configured to process them. This capability depends on the column type change rules supported by the specific sink. Refer to your sink's documentation for its supported rules.
For downstream sinks that do not support column type changes, such as Hologres, you can use the wide type mapping of the Hologres connector (YAML): when the job starts, the connector creates the downstream table with wider data types. When a column type changes, the system can tolerate the change as long as the new type fits within the wider type defined in the downstream sink.
Unsupported schema changes:
Changes to constraints, such as primary keys or indexes.
Changing a column from NOT NULL to NULLABLE.
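The merge rules for schema.inference.strategy set to continuous can be sketched in Python as follows. This is an illustrative model only: merge_schema, common_parent, and the event names are hypothetical, the HYPOTHETICAL_PARENT chains stand in for the connector's real type hierarchy tree (which is not reproduced here), and the same-type, different-precision rule is omitted for brevity.

```python
from typing import Dict, List, Tuple

# Hypothetical, simplified widening chains used only for illustration. The
# connector's real type hierarchy tree is not reproduced here.
HYPOTHETICAL_PARENT = {
    "TINYINT": "SMALLINT",
    "SMALLINT": "INT",
    "INT": "BIGINT",
    "BIGINT": "STRING",
    "FLOAT": "DOUBLE",
    "DOUBLE": "STRING",
    "BOOLEAN": "STRING",
}


def ancestors(col_type: str) -> List[str]:
    """Return col_type followed by its parents up to the root."""
    chain = [col_type]
    while col_type in HYPOTHETICAL_PARENT:
        col_type = HYPOTHETICAL_PARENT[col_type]
        chain.append(col_type)
    return chain


def common_parent(a: str, b: str) -> str:
    """Return the closest type that both a and b can widen to."""
    b_ancestors = set(ancestors(b))
    for candidate in ancestors(a):
        if candidate in b_ancestors:
            return candidate
    return "STRING"


def merge_schema(
    current: List[Tuple[str, str]], parsed: Dict[str, str]
) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str, str]]]:
    """Merge the columns parsed from one message into the current schema.

    Returns the merged schema and the schema change events it implies.
    """
    merged: List[Tuple[str, str]] = []
    events: List[Tuple[str, str, str]] = []
    for name, col_type in current:
        if name in parsed and parsed[name] != col_type:
            wider = common_parent(col_type, parsed[name])
            if wider != col_type:
                events.append(("ALTER_COLUMN_TYPE", name, wider))
            merged.append((name, wider))
        else:
            # Columns missing from the message are kept (values become NULL);
            # no drop-column event is produced.
            merged.append((name, col_type))
    known = {name for name, _ in current}
    for name, col_type in parsed.items():
        if name not in known:
            # New columns are appended at the end as nullable columns.
            merged.append((name, col_type))
            events.append(("ADD_COLUMN", name, col_type))
    return merged, events


# A message where "name" was renamed to "full_name" and "id" grew to BIGINT.
current = [("id", "INT"), ("name", "STRING"), ("score", "FLOAT")]
parsed = {"id": "BIGINT", "full_name": "STRING", "score": "FLOAT"}
schema, events = merge_schema(current, parsed)
print(schema)
# [('id', 'BIGINT'), ('name', 'STRING'), ('score', 'FLOAT'), ('full_name', 'STRING')]
print(events)
# [('ALTER_COLUMN_TYPE', 'id', 'BIGINT'), ('ADD_COLUMN', 'full_name', 'STRING')]
```

The rename in the usage example behaves as described in the policies above: the old column stays (its values become NULL) and the new column is appended at the end.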
Canal JSON schema parsing
Canal JSON data may contain an optional sqlType field, which records precise type information for data columns. To obtain a more accurate schema, you can set canal-json.infer-schema.strategy to SQL_TYPE to use the types from the sqlType field. The type mappings are as follows:
JDBC type | Type code | CDC type |
BIT | -7 | BOOLEAN |
BOOLEAN | 16 | BOOLEAN |
TINYINT | -6 | TINYINT |
SMALLINT | 5 | SMALLINT |
INTEGER | 4 | INT |
BIGINT | -5 | BIGINT |
DECIMAL | 3 | DECIMAL(38,18) |
NUMERIC | 2 | DECIMAL(38,18) |
REAL | 7 | FLOAT |
FLOAT | 6 | FLOAT |
DOUBLE | 8 | DOUBLE |
BINARY | -2 | BYTES |
VARBINARY | -3 | BYTES |
LONGVARBINARY | -4 | BYTES |
BLOB | 2004 | BYTES |
DATE | 91 | DATE |
TIME | 92 | TIME |
TIMESTAMP | 93 | TIMESTAMP |
CHAR | 1 | STRING |
VARCHAR | 12 | STRING |
LONGVARCHAR | -1 | STRING |
Other data types | | STRING |
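To show how the sqlType field is used with the SQL_TYPE strategy, the following Python sketch maps each column's JDBC type code in a Canal JSON message to a CDC type by using the table above. It is a hypothetical illustration (infer_schema_from_sql_type is not a connector API); it assumes sqlType is an object that maps column names to integer JDBC type codes, and that codes not listed fall back to STRING as in the "Other data types" row.

```python
import json
from typing import Dict

# JDBC type code -> CDC type, transcribed from the table above.
SQL_TYPE_TO_CDC = {
    -7: "BOOLEAN", 16: "BOOLEAN",
    -6: "TINYINT", 5: "SMALLINT", 4: "INT", -5: "BIGINT",
    3: "DECIMAL(38,18)", 2: "DECIMAL(38,18)",
    7: "FLOAT", 6: "FLOAT", 8: "DOUBLE",
    -2: "BYTES", -3: "BYTES", -4: "BYTES", 2004: "BYTES",
    91: "DATE", 92: "TIME", 93: "TIMESTAMP",
    1: "STRING", 12: "STRING", -1: "STRING",
}


def infer_schema_from_sql_type(canal_json: str) -> Dict[str, str]:
    """Map each column's sqlType code in a Canal JSON message to a CDC type."""
    message = json.loads(canal_json)
    sql_types = message.get("sqlType") or {}
    # Codes not listed in the table ("Other data types") fall back to STRING.
    return {column: SQL_TYPE_TO_CDC.get(code, "STRING") for column, code in sql_types.items()}


# Hypothetical Canal JSON message; 2005 (CLOB) is not listed in the table.
message = json.dumps({
    "database": "mydb",
    "table": "orders",
    "type": "INSERT",
    "pkNames": ["id"],
    "data": [{"id": "1", "price": "9.90", "note": "first"}],
    "sqlType": {"id": 4, "price": 3, "note": 2005},
})
print(infer_schema_from_sql_type(message))
# {'id': 'INT', 'price': 'DECIMAL(38,18)', 'note': 'STRING'}
```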
Dirty data tolerance and collection
Your Kafka data source may contain malformed records, commonly called dirty data. To keep your job from repeatedly failing and restarting, you can configure it to skip these invalid records. For example:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance
  ingestion.ignore-errors: true
  # Tolerate up to 1000 dirty data records
  ingestion.error-tolerance.max-count: 1000
With this configuration, the job continues running as long as it encounters no more than 1,000 dirty records. Once the count exceeds that threshold, the job fails so you can investigate your data.
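The threshold behavior of ingestion.ignore-errors and ingestion.error-tolerance.max-count can be modeled with a small counter. The following Python sketch is a hypothetical illustration (the DirtyDataTolerance class is not part of the connector); it assumes that with tolerance disabled the first dirty record fails the job, and that -1 disables the limit, as in the configuration that follows.

```python
class DirtyDataTolerance:
    """Simplified model of ingestion.ignore-errors and
    ingestion.error-tolerance.max-count (-1 means unlimited)."""

    def __init__(self, ignore_errors: bool, max_count: int) -> None:
        self.ignore_errors = ignore_errors
        self.max_count = max_count
        self.dirty_count = 0

    def on_dirty_record(self) -> None:
        """Call once per record that fails to parse."""
        if not self.ignore_errors:
            # Assumption: with tolerance disabled, the first bad record fails the job.
            raise RuntimeError("dirty record encountered; job fails")
        self.dirty_count += 1
        if self.max_count != -1 and self.dirty_count > self.max_count:
            raise RuntimeError(
                f"{self.dirty_count} dirty records exceed max-count {self.max_count}; job fails"
            )


bounded = DirtyDataTolerance(ignore_errors=True, max_count=1000)
for _ in range(1000):
    bounded.on_dirty_record()      # records 1..1000 are skipped
try:
    bounded.on_dirty_record()      # record 1001 exceeds the threshold
except RuntimeError as err:
    print(err)                     # 1001 dirty records exceed max-count 1000; job fails

unbounded = DirtyDataTolerance(ignore_errors=True, max_count=-1)
for _ in range(100_000):
    unbounded.on_dirty_record()    # never raises
```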
To ensure your job never fails due to dirty data, use the following configuration:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance
  ingestion.ignore-errors: true
  # Tolerate all dirty data records
  ingestion.error-tolerance.max-count: -1
While dirty data tolerance keeps your job running, you may also want to inspect the skipped records, for example to analyze them and improve your Kafka producers. As described in Dirty Data Collection, you can write the job's dirty data to the TaskManager logs by configuring a dirty data collector. For example:
source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance
  ingestion.ignore-errors: true
  # Tolerate all dirty data records
  ingestion.error-tolerance.max-count: -1
pipeline:
  dirty-data.collector:
    # Write dirty data to the TaskManager log file
    type: logger
Table name and topic mapping
When Kafka serves as a Flink CDC sink, the message format (such as Debezium JSON or Canal JSON) embeds the original table name. Downstream consumers typically use this embedded name as the table identifier rather than the topic name, so it is important to configure the mapping between table names and topics correctly.
Suppose you need to synchronize two tables from a MySQL database: mydb.mytable1 and mydb.mytable2. The following mapping strategies are available:
1. No mapping strategy
Without any mapping strategy, data for each table is written to a topic named in the <Database Name>.<Table Name> format. Therefore, data from mydb.mytable1 is written to a topic named mydb.mytable1, and data from mydb.mytable2 is written to a topic named mydb.mytable2. The following is an example configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
2. Route rule mapping (Not recommended)
You might want to write data to a specific topic instead of using the default <Database Name>.<Table Name> format. To do this, you can configure a route rule. The following is an example configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable
In this case, all data from mydb.mytable1 and mydb.mytable2 is written to a single topic named mytable.
However, a route rule that changes the destination topic also changes the table name in the Kafka message (in Debezium JSON or Canal JSON format). The table name in all Kafka messages becomes mytable. This can cause unexpected behavior in systems that consume messages from this topic.
3. Mapping with sink.tableId-to-topic.mapping (Recommended)
To map table names to topics while preserving the original source table name, use the sink.tableId-to-topic.mapping parameter. The following is an example configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable
Alternatively, you can use the following configuration:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable
In this case, all data from mydb.mytable1 and mydb.mytable2 is written to the mytable topic, and the table name within the Kafka messages (in Debezium JSON or Canal JSON format) is preserved as mydb.mytable1 or mydb.mytable2. This way, downstream systems can still identify each record's original source table.
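The difference between the three strategies can be summarized as a pair of values per record: the destination topic and the table name embedded in the message. The following Python sketch is a hypothetical model, not the connector's parser. It assumes the mapping format implied by the two examples above (entries separated by ";", each entry "tables:topic", with multiple tables separated by ","), matches table IDs exactly, and does not model any pattern matching the real parameter may support.

```python
from typing import Dict, Tuple


def parse_tableid_to_topic_mapping(value: str) -> Dict[str, str]:
    """Parse e.g. "db.t1,db.t2:topic" or "db.t1:topic;db.t2:topic"."""
    mapping: Dict[str, str] = {}
    for entry in value.split(";"):
        entry = entry.strip()
        if not entry:
            continue
        tables, topic = entry.rsplit(":", 1)
        for table_id in tables.split(","):
            mapping[table_id.strip()] = topic.strip()
    return mapping


def with_mapping(table_id: str, mapping: Dict[str, str]) -> Tuple[str, str]:
    """Return (topic, table ID embedded in the message)."""
    # Unmapped tables fall back to a topic named <Database Name>.<Table Name>.
    return mapping.get(table_id, table_id), table_id


def with_route(table_id: str, sink_table: str) -> Tuple[str, str]:
    """A route rule replaces the table ID itself, so the message changes too."""
    return sink_table, sink_table


one = parse_tableid_to_topic_mapping("mydb.mytable1,mydb.mytable2:mytable")
two = parse_tableid_to_topic_mapping("mydb.mytable1:mytable;mydb.mytable2:mytable")
print(one == two)                              # True
print(with_mapping("mydb.mytable2", one))      # ('mytable', 'mydb.mytable2')
print(with_route("mydb.mytable2", "mytable"))  # ('mytable', 'mytable')
```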
Exactly-once semantics
Configure the consumer isolation level
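All applications that consume topics written by a sink in EXACTLY_ONCE mode must set the isolation.level consumer property to read_committed. The property accepts the following values:
read_committed: Reads only committed data.
read_uncommitted (default): Can also read uncommitted data.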
EXACTLY_ONCE delivery depends on consumers using read_committed. Otherwise, consumers may read uncommitted data, which breaks consistency.
Transaction timeout and data loss
When recovering from a checkpoint, Realtime Compute for Apache Flink commits the Kafka transactions that were started before that checkpoint was taken. If the time between a job failure and its restart exceeds the Kafka transaction timeout, Kafka automatically aborts these open transactions, which results in data loss.
The default transaction.max.timeout.ms for a Kafka broker is 15 minutes.
By default, the Flink Kafka sink sets the transaction.timeout.ms parameter to 1 hour.
You must increase transaction.max.timeout.ms on the broker so that it is greater than or equal to the transaction.timeout.ms setting in Flink.
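The two conditions above (the Flink timeout must fit within the broker limit, and recovery must finish before the transaction timeout) reduce to simple comparisons. The following Python sketch is a hypothetical pre-deployment check, not a Flink or Kafka tool; the function name check_transaction_timeouts and the worst_case_recovery estimate are assumptions you would supply yourself.

```python
from datetime import timedelta
from typing import List

# Defaults stated above.
BROKER_MAX_TIMEOUT_DEFAULT = timedelta(minutes=15)  # broker transaction.max.timeout.ms
FLINK_TXN_TIMEOUT_DEFAULT = timedelta(hours=1)      # sink transaction.timeout.ms


def to_ms(duration: timedelta) -> int:
    return int(duration / timedelta(milliseconds=1))


def check_transaction_timeouts(
    flink_timeout: timedelta, broker_max_timeout: timedelta, worst_case_recovery: timedelta
) -> List[str]:
    """Return a list of configuration problems (empty if none are found)."""
    problems = []
    if flink_timeout > broker_max_timeout:
        problems.append(
            f"transaction.timeout.ms={to_ms(flink_timeout)} exceeds the broker's "
            f"transaction.max.timeout.ms={to_ms(broker_max_timeout)}"
        )
    if worst_case_recovery > flink_timeout:
        problems.append("recovery can outlast the transaction timeout; aborted transactions lose data")
    return problems


# With both defaults, the first check fails: 3600000 ms > 900000 ms.
print(check_transaction_timeouts(FLINK_TXN_TIMEOUT_DEFAULT, BROKER_MAX_TIMEOUT_DEFAULT, timedelta(minutes=10)))
# After raising the broker limit to at least 1 hour, no problems are reported.
print(check_transaction_timeouts(FLINK_TXN_TIMEOUT_DEFAULT, timedelta(hours=1), timedelta(minutes=10)))  # []
```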
Producer pool and concurrent checkpoints
The EXACTLY_ONCE mode uses a fixed-size Kafka producer pool. Each checkpoint uses one producer from this pool. If the number of concurrent checkpoints exceeds the pool size, the job fails.
Configure the producer pool size based on the maximum number of concurrent checkpoints.
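The failure mode described above can be reproduced with a toy pool in which each in-flight checkpoint holds one producer until its checkpoint completes. The following Python sketch is a simplified, hypothetical model (ProducerPool and its methods are not Flink APIs); it only shows why the pool size must be at least the maximum number of concurrent checkpoints, which Flink limits with execution.checkpointing.max-concurrent-checkpoints.

```python
from typing import Dict, List


class ProducerPool:
    """Simplified fixed-size pool: each in-flight checkpoint holds one producer
    until its checkpoint completes and its transaction is committed."""

    def __init__(self, size: int) -> None:
        self.free: List[int] = list(range(size))
        self.in_use: Dict[int, int] = {}  # checkpoint ID -> producer index

    def begin_checkpoint(self, checkpoint_id: int) -> int:
        if not self.free:
            raise RuntimeError(
                f"checkpoint {checkpoint_id}: all {len(self.in_use)} producers busy; job fails"
            )
        producer = self.free.pop()
        self.in_use[checkpoint_id] = producer
        return producer

    def complete_checkpoint(self, checkpoint_id: int) -> None:
        # The transaction is committed and its producer returns to the pool.
        self.free.append(self.in_use.pop(checkpoint_id))


pool = ProducerPool(size=2)
pool.begin_checkpoint(1)
pool.begin_checkpoint(2)
try:
    pool.begin_checkpoint(3)   # a third concurrent checkpoint exceeds the pool
except RuntimeError as err:
    print(err)                 # checkpoint 3: all 2 producers busy; job fails
pool.complete_checkpoint(1)
pool.begin_checkpoint(3)       # succeeds once checkpoint 1 has released its producer
```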
Parallelism scale-down constraints
If a job fails before its first checkpoint completes, the information about the original producer pool is lost on restart. Therefore, do not scale down the job's parallelism before the first checkpoint completes. If a scale-down is necessary, do not reduce the parallelism by a factor larger than FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.
Transactions block reads
In read_committed mode, any transaction that has been neither committed nor aborted blocks consumers from reading data that was written to the topic after that transaction started. For example:
Transaction 1 writes data.
Transaction 2 writes more data and is committed.
As long as Transaction 1 remains open, the data from the committed Transaction 2 is invisible to consumers.
This has the following implications:
During normal operation, data visibility latency is approximately equal to the checkpoint interval.
If a job fails, any topic it was writing to is blocked for consumers until the job restarts or the transaction times out. In extreme cases, the transaction timeout process itself can also affect read operations.
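The blocking behavior in the example above follows from how Kafka serves read_committed consumers: within a partition, they can read only up to the first record of the earliest still-open transaction (the last stable offset), and records from aborted transactions are skipped. The following Python sketch is a simplified single-partition model with hypothetical names (Record, readable); it does not use the Kafka client.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Record:
    offset: int
    txn: Optional[str]  # transaction ID, or None for a non-transactional write
    value: str


def readable(log: List[Record], txn_state: Dict[str, str], isolation_level: str) -> List[str]:
    """Return the values a consumer can read from one partition.

    txn_state maps a transaction ID to "open", "committed", or "aborted".
    """
    if isolation_level == "read_uncommitted":
        return [r.value for r in log]
    visible = []
    for r in log:
        state = txn_state[r.txn] if r.txn is not None else "committed"
        if state == "open":
            # Last stable offset reached: later records stay hidden even if committed.
            break
        if state == "committed":
            visible.append(r.value)
    return visible


# Transaction 1 writes first, then transaction 2 writes and commits.
log = [Record(0, "txn-1", "a"), Record(1, "txn-2", "b"), Record(2, "txn-2", "c")]
state = {"txn-1": "open", "txn-2": "committed"}
print(readable(log, state, "read_committed"))    # []
print(readable(log, state, "read_uncommitted"))  # ['a', 'b', 'c']
state["txn-1"] = "aborted"
print(readable(log, state, "read_committed"))    # ['b', 'c']
```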