The Upsert Kafka connector reads from and writes to Kafka topics using upsert semantics. As a source, it converts Kafka records into a changelog stream. As a sink, it writes changelog data back to Kafka, using null-valued (tombstone) messages to represent deletes.
| Category | Details |
|---|---|
| Supported types | Source table, sink table, and data ingestion sink |
| Running mode | Streaming mode |
| Data formats | avro, avro-confluent, csv, json, and raw |
| API types | SQL and data ingestion YAML job |
| Update or delete data in a sink table | Yes |
Prerequisites
Before you begin, ensure that you have:
- A Kafka cluster. Create a DataFlow Kafka cluster or create resources in ApsaraMQ for Kafka.
- Network connectivity between your Flink cluster and Kafka cluster. For Kafka on EMR, configure a VPC and security group. For ApsaraMQ for Kafka, configure a whitelist.
How it works
The upsert changelog stream produced by the source contains only +I (insert), +U (update after), and -D (delete) entries. Unlike a retract stream, it never contains -U (update before) messages.
| Changelog entry type | Long name | Semantics |
|---|---|---|
| +I | Insertion | A new record whose key did not previously exist |
| +U | Update after | A record whose key already exists; it overwrites the previous value |
| -D | Delete | A record with a null value; it deletes the entry for that key |
As a source table, the connector reads Kafka records and converts them into a changelog stream:
- A record whose key already exists is interpreted as UPDATE (+U).
- A record whose key does not exist is interpreted as INSERT (+I).
- A record with a null value is interpreted as DELETE (-D).
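The three rules can be modeled in a few lines of Python. This is a conceptual sketch, not the connector's implementation: `to_changelog` is a hypothetical helper, records are assumed to be (key, value) pairs with `None` standing for a null value, and a key is assumed to "exist" from its first non-null record until a tombstone deletes it.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# A Kafka record as (key, value); a value of None models a tombstone.
Record = Tuple[str, Optional[str]]
# A changelog entry as (kind, key, value), where kind is "+I", "+U", or "-D".
Entry = Tuple[str, str, Optional[str]]


def to_changelog(records: Iterable[Record]) -> List[Entry]:
    """Interpret Kafka records with the three source-side rules."""
    live: Dict[str, str] = {}  # latest value of every key that currently exists
    changelog: List[Entry] = []
    for key, value in records:
        if value is None:
            # Null value: DELETE the entry for this key.
            live.pop(key, None)
            changelog.append(("-D", key, None))
        elif key in live:
            # Key already exists: UPDATE (update after).
            live[key] = value
            changelog.append(("+U", key, value))
        else:
            # Key does not exist (never seen, or deleted earlier): INSERT.
            live[key] = value
            changelog.append(("+I", key, value))
    return changelog


if __name__ == "__main__":
    records = [("a", "1"), ("b", "2"), ("a", "3"), ("a", None), ("a", "4")]
    for entry in to_changelog(records):
        print(entry)
    # ('+I', 'a', '1'), ('+I', 'b', '2'), ('+U', 'a', '3'), ('-D', 'a', None), ('+I', 'a', '4')
```

Note that under this model a key that reappears after a tombstone is emitted as +I again, because the delete removed it.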
As a sink table or data ingestion sink, the connector consumes an upstream changelog stream:
- INSERT and UPDATE_AFTER records are written as normal Kafka messages.
- DELETE records are written as tombstone messages (a null value for the corresponding key).
Flink partitions sink output by primary key, so all messages for the same key land in the same partition in the correct order.
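A minimal sketch of the sink-side mapping, using the same hypothetical (kind, key, value) entry shape. `to_kafka_messages` is illustrative only, and CRC32 merely stands in for the partitioner's hash (Kafka's default partitioner hashes keyed records with murmur2); the point is that the partition is a deterministic function of the key.

```python
import zlib
from typing import Iterable, List, Optional, Tuple

# A changelog entry as (kind, key, value), where kind is "+I", "+U", or "-D".
Entry = Tuple[str, str, Optional[str]]
# A Kafka message as (partition, key, value); a value of None is a tombstone.
Message = Tuple[int, str, Optional[str]]


def to_kafka_messages(entries: Iterable[Entry], num_partitions: int) -> List[Message]:
    """Map an upsert changelog to keyed Kafka messages."""
    messages: List[Message] = []
    for kind, key, value in entries:
        if kind in ("+I", "+U"):
            payload = value  # INSERT / UPDATE_AFTER: a normal message
        elif kind == "-D":
            payload = None  # DELETE: tombstone (null value for this key)
        else:
            raise ValueError(f"unexpected changelog kind: {kind!r}")
        # Deterministic hash of the key, so every message for a key goes to
        # the same partition. CRC32 is a stand-in for illustration only.
        partition = zlib.crc32(key.encode("utf-8")) % num_partitions
        messages.append((partition, key, payload))
    return messages


if __name__ == "__main__":
    entries = [("+I", "east", "pv=1"), ("+U", "east", "pv=2"), ("-D", "east", None)]
    for msg in to_kafka_messages(entries, num_partitions=4):
        print(msg)
```

Because all three messages for `east` share a partition, a consumer reads them in the order they were written.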
Example
The following end-to-end example reads website pageview events from a Kafka source table, aggregates page views (PV) and unique visitors (UV) per region, and writes the results to an Upsert Kafka sink.
Source table: reads raw pageview events.

```sql
CREATE TABLE pageviews (
  user_id BIGINT,
  page_id BIGINT,
  -- A watermark requires a time attribute with precision 0 to 3.
  viewtime TIMESTAMP(3),
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<host:port>',
  'format' = 'json'
);
```
Sink table: stores aggregated results with upsert semantics.

```sql
CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '<host:port>',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
```
Every Upsert Kafka table must define a primary key. The primary key columns determine how Flink partitions output and how the connector identifies upsert operations.
Query: aggregates and writes the results.

```sql
INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
```
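To see what the sink topic ends up containing, the following Python sketch replays a few hypothetical events through the same aggregation and then reads the output back with last-value-wins semantics. It simplifies by assuming one upsert message per input event; the actual job may emit fewer, for example when sink buffering is enabled. `aggregate_upserts` and `read_back` are illustrative names, not connector APIs.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Upsert = Tuple[str, Optional[Tuple[int, int]]]  # (region, (pv, uv)); None = tombstone


def aggregate_upserts(events: Iterable[Tuple[int, str]]) -> List[Upsert]:
    """Emit one upsert per (user_id, user_region) event, keyed by region."""
    pv: Dict[str, int] = {}
    users: Dict[str, set] = {}
    out: List[Upsert] = []
    for user_id, region in events:
        pv[region] = pv.get(region, 0) + 1            # COUNT(*)
        users.setdefault(region, set()).add(user_id)  # COUNT(DISTINCT user_id)
        # The whole row for the region key is re-sent on every change.
        out.append((region, (pv[region], len(users[region]))))
    return out


def read_back(messages: Iterable[Upsert]) -> Dict[str, Tuple[int, int]]:
    """Read the topic with upsert semantics: the last message per key wins."""
    table: Dict[str, Tuple[int, int]] = {}
    for key, value in messages:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


if __name__ == "__main__":
    # Hypothetical events: (user_id, user_region).
    events = [(1, "east"), (2, "east"), (1, "east"), (3, "west")]
    messages = aggregate_upserts(events)
    print(messages)
    # [('east', (1, 1)), ('east', (2, 2)), ('east', (3, 2)), ('west', (1, 1))]
    print(read_back(messages))
    # {'east': (3, 2), 'west': (1, 1)}
```

The topic holds several messages for `east`, but a reader that applies upsert semantics sees only the latest row per region.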
SQL
Syntax
```sql
CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv BIGINT,
  uv BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'avro',
  'value.format' = 'avro'
);
```
WITH parameters
General parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Set to upsert-kafka. |
| properties.bootstrap.servers | Yes | — | Comma-separated list of Kafka broker addresses in host:port format. |
| topic | Yes | — | The Kafka topic to read from or write to. |
| key.format | Yes | — | Serialization format for the message key. Valid values: csv, json, avro, debezium-json, canal-json, maxwell-json, avro-confluent, raw. |
| value.format | Yes | — | Serialization format for the message value. Equivalent to the format option; configure only one of the two. |
| value.fields-include | Yes | ALL | Controls which columns are included in the message value. ALL includes all columns; EXCEPT_KEY excludes the primary key columns, which are already written to the message key. |
| properties.* | No | — | Additional Kafka client parameters. The properties. prefix is stripped before the configuration is passed to the client, and the remaining suffix must be a valid Kafka producer or consumer configuration key. Do not use this mechanism to set key.deserializer or value.deserializer; the connector manages them internally. |
| key.fields-prefix | No | — | A custom prefix applied to all key field names in the table schema, used to avoid name collisions with value fields. The prefix is stripped when the key is parsed or generated. If set, value.fields-include must be set to EXCEPT_KEY. |
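The interaction between the primary key, value.fields-include, and key.fields-prefix can be sketched as a row-splitting function. `split_row` is hypothetical and models only the rules in the table above; it is not the connector's serializer, and rows are assumed to be plain dictionaries of column name to value.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def split_row(row: Row, primary_key: List[str], fields_include: str = "ALL",
              key_prefix: str = "") -> Tuple[Row, Row]:
    """Split one table row into (Kafka key fields, Kafka value fields)."""
    if key_prefix and fields_include != "EXCEPT_KEY":
        raise ValueError("key.fields-prefix requires value.fields-include = EXCEPT_KEY")
    key: Row = {}
    for col in primary_key:
        if not col.startswith(key_prefix):
            raise ValueError(f"key column {col!r} must start with prefix {key_prefix!r}")
        # The prefix exists only in the table schema; strip it for the key.
        key[col[len(key_prefix):]] = row[col]
    if fields_include == "ALL":
        value = dict(row)  # every column, key columns included
    elif fields_include == "EXCEPT_KEY":
        value = {c: v for c, v in row.items() if c not in primary_key}
    else:
        raise ValueError(f"unsupported value.fields-include: {fields_include!r}")
    return key, value


if __name__ == "__main__":
    row = {"k_user_region": "east", "pv": 3, "uv": 2}
    print(split_row(row, ["k_user_region"], "EXCEPT_KEY", key_prefix="k_"))
    # ({'user_region': 'east'}, {'pv': 3, 'uv': 2})
```

With the prefix, the schema column k_user_region cannot clash with a value field of the same base name, yet the serialized key still uses user_region.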
Sink-specific parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| sink.parallelism | No | Inherited from the upstream operator | Parallelism of the Kafka sink operator. |
| sink.buffer-flush.max-rows | No | 0 (disabled) | Maximum number of records to buffer before flushing. When multiple updates arrive for the same key, only the latest record is retained in the buffer, which reduces write volume and avoids unnecessary tombstone messages. Both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be greater than zero to enable buffering. |
| sink.buffer-flush.interval | No | 0 (disabled) | How often the buffer is flushed. Supported units: ms, s, min, h (for example, '1 s'). The same per-key deduplication as sink.buffer-flush.max-rows applies. Both parameters must be greater than zero to enable buffering. |
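The per-key deduplication that sink.buffer-flush.* enables can be modeled with an insertion-ordered dictionary. This is a simplified sketch under stated assumptions: `ReducingBuffer` is hypothetical, it flushes when the number of buffered keys reaches `max_rows`, and the interval-based flush is represented only by an explicit `flush()` call.

```python
from typing import Dict, List, Optional, Tuple


class ReducingBuffer:
    """Hypothetical model of the sink.buffer-flush.* key deduplication."""

    def __init__(self, max_rows: int) -> None:
        if max_rows <= 0:
            raise ValueError("max_rows must be greater than zero to enable buffering")
        self.max_rows = max_rows
        self.buffer: Dict[str, Optional[str]] = {}  # key -> latest value (None = delete)
        self.sent: List[Tuple[str, Optional[str]]] = []  # messages that reached Kafka

    def write(self, key: str, value: Optional[str]) -> None:
        # A newer record for a buffered key replaces the older one.
        self.buffer[key] = value
        if len(self.buffer) >= self.max_rows:
            self.flush()

    def flush(self) -> None:
        # Assumed: an elapsed sink.buffer-flush.interval would also trigger this.
        self.sent.extend(self.buffer.items())
        self.buffer.clear()


if __name__ == "__main__":
    buf = ReducingBuffer(max_rows=2)
    for key, value in [("a", "1"), ("a", None), ("a", "3"), ("b", "5")]:
        buf.write(key, value)
    print(buf.sent)  # [('a', '3'), ('b', '5')]
```

Four upstream records become two Kafka messages, and the intermediate delete of key `a` never produces a tombstone, which is the write reduction the table describes.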
Data ingestion
The Upsert Kafka connector can serve as the sink in a YAML data ingestion job. Data is written in JSON format, and primary key fields are included in the message body.
Syntax
```yaml
sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # Required only when writing to ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instance-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}
```
Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | — | Set to upsert-kafka. |
| name | No | — | A display name for the sink. |
| properties.bootstrap.servers | Yes | — | Comma-separated list of Kafka broker addresses in host:port format. |
| properties.* | No | — | Additional Kafka producer parameters. The properties. prefix is stripped before the configuration is passed to the client, and the remaining suffix must be a valid Kafka producer configuration key. |
| sink.delivery-guarantee | No | at-least-once | Delivery semantics. Valid values: none (no guarantee; data may be lost or duplicated), at-least-once (default; no data loss, but duplicates are possible), exactly-once (uses Kafka transactions to eliminate duplicates). |
| sink.add-tableId-to-header-enabled | No | false | When enabled, writes namespace, schemaName, and tableName to the Kafka message header. |
| aliyun.kafka.accessKeyId | No | — | AccessKey ID of your Alibaba Cloud account. Required when writing to ApsaraMQ for Kafka. See Create an AccessKey pair. |
| aliyun.kafka.accessKeySecret | No | — | AccessKey secret of your Alibaba Cloud account. Required when writing to ApsaraMQ for Kafka. See Create an AccessKey pair. |
| aliyun.kafka.instanceId | No | — | ID of the ApsaraMQ for Kafka instance, shown on the instance details page of the ApsaraMQ for Kafka console. Required when writing to ApsaraMQ for Kafka. |
| aliyun.kafka.endpoint | No | — | API endpoint of ApsaraMQ for Kafka. Required when writing to ApsaraMQ for Kafka. See Endpoints. |
| aliyun.kafka.regionId | No | — | Region ID of the ApsaraMQ for Kafka instance. Required when writing to ApsaraMQ for Kafka. See Endpoints. |
Supported type changes
The Upsert Kafka data ingestion connector supports all changelog operation types. To consume the written data downstream, use the Flink Upsert Kafka SQL connector with a fixed schema.
Data ingestion example
The following example synchronizes a MySQL table to ApsaraMQ for Kafka using a YAML data ingestion job:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: ${mysql.source.table}
    sink-table: ${upsert.kafka.topic}
```
Delivery semantics
By default, the Upsert Kafka sink writes with at-least-once guarantees, so Flink may write duplicate records with the same key to the topic. Because the connector operates in upsert mode, the last record for each key takes effect when the topic is read back as a source. Duplicate writes are therefore idempotent: they do not change the final state for any key.
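A small Python model of this argument, assuming a replay after recovery resends the same records in the same order. `compact` is a hypothetical helper that reads a topic back with upsert semantics; it is not part of the connector.

```python
from typing import Dict, List, Optional, Tuple

Message = Tuple[str, Optional[str]]  # (key, value); None is a tombstone


def compact(messages: List[Message]) -> Dict[str, str]:
    """Read a topic back with upsert semantics: the last record per key wins."""
    state: Dict[str, str] = {}
    for key, value in messages:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


written = [("east", "pv=1"), ("east", "pv=2"), ("west", "pv=1")]
# Hypothetical failure after all three writes: the sink restarts from an
# earlier checkpoint and sends the same records again, in the same order.
with_duplicates = written + written

assert compact(written) == compact(with_duplicates) == {"east": "pv=2", "west": "pv=1"}
```

In this model a reader consuming the topic continuously may briefly see `east` at `pv=1` again during the replay; only the converged state is identical.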
To enable exactly-once semantics, the Kafka cluster must support Kafka transactions (Apache Kafka 0.11 or later) and the transaction feature must be enabled. Set sink.delivery-guarantee to exactly-once in the data ingestion YAML configuration.
Limits
- Requires Flink running on Ververica Runtime (VVR) 2.0.0 or later.
- Supports Apache Kafka 0.10 or later for both reads and writes.
- Supports only the client parameters documented for Apache Kafka 2.8. See the consumer and producer configuration references.
- Upsert Kafka source tables always start from earliest-offset. This is fixed and not configurable: the connector must read all historical change data to reconstruct a complete changelog. Any other startup mode (such as latest-offset or a timestamp) would produce an incomplete changelog and cause data correctness issues in downstream computations.
- For exactly-once sink semantics, the destination Kafka cluster must run Apache Kafka 0.11 or later with transactions enabled.
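The reason for the fixed startup mode can be illustrated with a hypothetical single-partition log, where list positions stand in for offsets. `reconstruct` is an illustrative helper: rebuilding per-key state from a later offset silently drops every key whose only records precede that offset.

```python
from typing import Dict, List, Optional, Tuple

Message = Tuple[str, Optional[str]]  # (key, value); None is a tombstone


def reconstruct(log: List[Message], start_offset: int) -> Dict[str, str]:
    """Rebuild per-key state from a single-partition log, starting at start_offset."""
    state: Dict[str, str] = {}
    for key, value in log[start_offset:]:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


log = [("a", "1"), ("b", "2"), ("a", "3")]  # offsets 0, 1, 2
print(reconstruct(log, 0))  # from the earliest offset: {'a': '3', 'b': '2'}
print(reconstruct(log, 2))  # from offset 2: {'a': '3'}; key "b" is missing
```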
Monitoring metrics
Source table metrics
| Metric | Description |
|---|---|
| numRecordsIn | Total number of records read |
| numRecordsInPerSecond | Records read per second |
| numBytesIn | Total bytes read |
| numBytesInPerSecond | Bytes read per second |
| currentEmitEventTimeLag | Lag between event time and emit time |
| currentFetchEventTimeLag | Lag between event time and fetch time |
| sourceIdleTime | Time the source has been idle |
| pendingRecords | Number of records waiting to be processed |
Sink table metrics
| Metric | Description |
|---|---|
| numRecordsOut | Total number of records written |
| numRecordsOutPerSecond | Records written per second |
| numBytesOut | Total bytes written |
| numBytesOutPerSecond | Bytes written per second |
| currentSendTime | Time taken to send the latest batch |