
Realtime Compute for Apache Flink: Upsert Kafka

Last Updated: Mar 26, 2026

The Upsert Kafka connector reads from and writes to Kafka topics using upsert semantics. As a source, it converts Kafka records into a changelog stream. As a sink, it writes changelog data back to Kafka, using null-valued (tombstone) messages to represent deletes.

Category | Details
Supported types | Source table, sink table, and data ingestion sink
Running mode | Streaming mode
Data formats | avro, avro-confluent, csv, json, and raw
API types | SQL and data ingestion YAML jobs
Update or delete data in a sink table | Yes

Prerequisites

Before you begin, ensure that you have:

How it works

The upsert changelog stream produced by the source contains only +I (insert), +U (update after), and -D (delete) entries. Unlike a retract stream, it never contains -U (update before) messages.

Changelog entry type | Long name | Semantics
+I | Insertion | A new record whose key did not previously exist
+U | Update after | A record whose key already exists; it overwrites the previous value
-D | Delete | A record with a null value; it deletes the entry for that key

As a source table, the connector reads Kafka records and converts them into a changelog stream:

  • A record whose key already exists is interpreted as UPDATE (+U)

  • A record whose key does not exist is interpreted as INSERT (+I)

  • A record with a null value is interpreted as DELETE (-D)
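The source-side rules above can be modeled in a few lines. The following Python sketch is a simplified illustration, not the connector's implementation: it assumes each Kafka record is a (key, value) pair of strings with None standing for a tombstone, and the function name to_changelog is hypothetical. It never produces -U entries, matching the upsert changelog described earlier.

```python
from typing import Dict, Iterable, List, Optional, Tuple

# A Kafka record as (key, value); a value of None models a tombstone.
Record = Tuple[str, Optional[str]]
# A changelog entry as (row_kind, key, value).
Change = Tuple[str, str, Optional[str]]


def to_changelog(records: Iterable[Record]) -> List[Change]:
    """Interpret records with upsert semantics (simplified model)."""
    latest: Dict[str, str] = {}  # last known value for each key
    changes: List[Change] = []
    for key, value in records:
        if value is None:
            # Tombstone: delete the entry for this key.
            latest.pop(key, None)
            changes.append(("-D", key, None))
        elif key in latest:
            # Existing key: the new value overwrites the previous one.
            latest[key] = value
            changes.append(("+U", key, value))
        else:
            # Unseen key: this is an insertion.
            latest[key] = value
            changes.append(("+I", key, value))
    return changes


records = [("us", "1"), ("eu", "5"), ("us", "2"), ("us", None), ("us", "3")]
for change in to_changelog(records):
    print(change)
# ('+I', 'us', '1')
# ('+I', 'eu', '5')
# ('+U', 'us', '2')
# ('-D', 'us', None)
# ('+I', 'us', '3')
```

In this model, a key that reappears after a delete is treated as new again, so it produces +I rather than +U.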

As a sink table or data ingestion sink, the connector consumes an upstream changelog stream:

  • INSERT and UPDATE_AFTER records are written as normal Kafka messages

  • DELETE records are written as tombstone messages (null value for the corresponding key)

Flink partitions sink output by primary key, so all messages for the same key land in the same partition in the correct order.
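The sink-side mapping and key-based partitioning can be sketched the same way. This Python model is an illustration under stated assumptions: to_kafka_messages and partition_for are hypothetical names, and zlib.crc32 stands in for the key hash. Kafka's default partitioner actually hashes keys with murmur2, so real partition numbers differ; the point is only that every message for one key maps to the same partition.

```python
import zlib
from typing import List, Optional, Tuple

# Upstream changelog entry: (row_kind, key, value).
Change = Tuple[str, str, Optional[str]]
# Kafka message: (partition, key, value); a value of None is a tombstone.
Message = Tuple[int, str, Optional[str]]


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash for illustration only; Kafka's default partitioner uses murmur2.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def to_kafka_messages(changes: List[Change], num_partitions: int) -> List[Message]:
    messages: List[Message] = []
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            payload = value  # written as a normal message
        elif kind == "-D":
            payload = None  # written as a tombstone
        else:
            raise ValueError(f"row kind not modeled in this sketch: {kind}")
        messages.append((partition_for(key, num_partitions), key, payload))
    return messages


msgs = to_kafka_messages(
    [("+I", "us", "1"), ("+U", "us", "2"), ("-D", "us", None)], num_partitions=4
)
# All three messages share a key, so they share a partition and keep their order.
assert len({partition for partition, _, _ in msgs}) == 1
print([value for _, _, value in msgs])  # ['1', '2', None]
```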

Example

The following end-to-end example reads website pageview events from a Kafka source table, aggregates page views (PV) and unique visitors (UV) per region, and writes the results to an Upsert Kafka sink.

Source table — reads raw pageview events:

CREATE TABLE pageviews (
  user_id    BIGINT,
  page_id    BIGINT,
  viewtime   TIMESTAMP(3),  -- a watermark column needs a precision of 3 or less
  user_region STRING,
  WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
) WITH (
  'connector'                  = 'kafka',
  'topic'                      = '<yourTopicName>',
  'properties.bootstrap.servers' = '<host:port>',
  'format'                     = 'json'
);

Sink table — stores aggregated results with upsert semantics:

CREATE TABLE pageviews_per_region (
  user_region STRING,
  pv          BIGINT,
  uv          BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector'                  = 'upsert-kafka',
  'topic'                      = '<yourTopicName>',
  'properties.bootstrap.servers' = '<host:port>',
  'key.format'                 = 'avro',
  'value.format'               = 'avro'
);

Every Upsert Kafka table must define a primary key. The primary key columns determine how Flink partitions output and how the connector identifies upsert operations.

Query — aggregates and writes results:

INSERT INTO pageviews_per_region
SELECT
  user_region,
  COUNT(*),
  COUNT(DISTINCT user_id)
FROM pageviews
GROUP BY user_region;
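To see what the sink topic receives, the following Python sketch replays the aggregation by hand. It is a simplified model rather than Flink's aggregate operator: aggregate_pv_uv is a hypothetical name, events are assumed to be (user_id, page_id, user_region) tuples, and viewtime is ignored because the query is a non-windowed GROUP BY that does not reference it. Each event yields one upsert for its region, and the latest row per user_region is the current result.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical input event: (user_id, page_id, user_region).
Event = Tuple[int, int, str]
# Upsert written to the sink: (row_kind, user_region, pv, uv).
Upsert = Tuple[str, str, int, int]


def aggregate_pv_uv(events: List[Event]) -> List[Upsert]:
    pv: Dict[str, int] = {}
    visitors: Dict[str, Set[int]] = {}
    out: List[Upsert] = []
    for user_id, _page_id, region in events:
        kind = "+U" if region in pv else "+I"  # first row for a region is an insert
        pv[region] = pv.get(region, 0) + 1  # COUNT(*)
        visitors.setdefault(region, set()).add(user_id)  # COUNT(DISTINCT user_id)
        out.append((kind, region, pv[region], len(visitors[region])))
    return out


events = [(1, 10, "us"), (2, 11, "us"), (1, 12, "us"), (3, 10, "eu")]
upserts = aggregate_pv_uv(events)
for row in upserts:
    print(row)
# ('+I', 'us', 1, 1)
# ('+U', 'us', 2, 2)
# ('+U', 'us', 3, 2)
# ('+I', 'eu', 1, 1)

# The latest row per primary key is the current result.
latest = {region: (p, u) for _, region, p, u in upserts}
print(latest)  # {'us': (3, 2), 'eu': (1, 1)}
```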

SQL

Syntax

CREATE TABLE upsert_kafka_sink (
  user_region STRING,
  pv          BIGINT,
  uv          BIGINT,
  PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
  'connector'                  = 'upsert-kafka',
  'topic'                      = '<yourTopicName>',
  'properties.bootstrap.servers' = '...',
  'key.format'                 = 'avro',
  'value.format'               = 'avro'
);

WITH parameters

General parameters

Parameter | Required | Default | Description
connector | Yes | None | Set to upsert-kafka.
properties.bootstrap.servers | Yes | None | Comma-separated list of Kafka broker addresses in host:port format.
topic | Yes | None | The Kafka topic to read from or write to.
key.format | Yes | None | Serialization format for the message key. Valid values: csv, json, avro, avro-confluent, and raw.
value.format | Yes | None | Serialization format for the message value. Equivalent to format; configure only one of the two.
value.fields-include | No | ALL | Controls which columns are included in the message value. ALL includes all columns; EXCEPT_KEY excludes the primary key columns.
properties.* | No | None | Additional Kafka client parameters. The properties. prefix is stripped before the configuration is passed to the client, and the remaining suffix must be a valid Kafka producer or consumer configuration key. Do not use this mechanism to set key.deserializer or value.deserializer; the connector manages those internally.
key.fields-prefix | No | None | A custom prefix applied to all key field names in the table schema, used to avoid name collisions with value fields. The prefix is stripped when the key is parsed or generated. If set, value.fields-include must be set to EXCEPT_KEY.
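The interaction between key.fields-prefix and value.fields-include can be illustrated with a small Python model. split_row is a hypothetical helper that works on plain dicts instead of serialized formats; it assumes the key fields are the table's primary key columns and that, when a prefix is set, every key column name starts with it.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def split_row(row: Row, key_fields: List[str], prefix: str = "",
              fields_include: str = "ALL") -> Tuple[Row, Row]:
    """Split a row into its key payload and value payload (hypothetical helper)."""
    if prefix and fields_include != "EXCEPT_KEY":
        raise ValueError("key.fields-prefix requires value.fields-include = EXCEPT_KEY")
    for name in key_fields:
        if not name.startswith(prefix):
            raise ValueError(f"key field {name!r} does not start with {prefix!r}")
    # The prefix is stripped from key field names in the key payload.
    key = {name[len(prefix):]: row[name] for name in key_fields}
    if fields_include == "ALL":
        value = dict(row)  # every column, including key columns
    elif fields_include == "EXCEPT_KEY":
        value = {k: v for k, v in row.items() if k not in key_fields}
    else:
        raise ValueError(f"unknown value.fields-include: {fields_include!r}")
    return key, value


# Key column declared as k_user_region, with key.fields-prefix = 'k_'.
print(split_row({"k_user_region": "us", "pv": 3, "uv": 2},
                ["k_user_region"], prefix="k_", fields_include="EXCEPT_KEY"))
# ({'user_region': 'us'}, {'pv': 3, 'uv': 2})

# No prefix, default ALL: key columns also appear in the value.
print(split_row({"user_region": "us", "pv": 3, "uv": 2}, ["user_region"]))
# ({'user_region': 'us'}, {'user_region': 'us', 'pv': 3, 'uv': 2})
```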

Sink-specific parameters

Parameter | Required | Default | Description
sink.parallelism | No | Parallelism of the upstream operator | Parallelism of the Kafka sink operator.
sink.buffer-flush.max-rows | No | 0 (disabled) | Maximum number of records to buffer before flushing. When multiple updates arrive for the same key, only the latest record is retained in the buffer, which reduces write volume and avoids unnecessary tombstone messages. Both sink.buffer-flush.max-rows and sink.buffer-flush.interval must be greater than zero to enable buffering.
sink.buffer-flush.interval | No | 0 (disabled) | How often the buffer is flushed. Supported units: ms, s, min, and h; for example, '1 s'. The same key-deduplication behavior as sink.buffer-flush.max-rows applies. Both parameters must be greater than zero to enable buffering.
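The key-deduplicating buffer can be modeled as a map from key to the latest change. In this Python sketch, ReducingBuffer is a hypothetical class, time is passed in explicitly as now_ms, and the interval is only checked when a record arrives, unlike a timer-driven flush. It shows how several changes for one key collapse into a single write.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[str, str, Optional[str]]  # (row_kind, key, value)


class ReducingBuffer:
    """Keeps only the latest change per key until a flush (simplified model)."""

    def __init__(self, max_rows: int, interval_ms: int) -> None:
        # Mirrors the rule that both settings must be > 0 to enable buffering.
        self.enabled = max_rows > 0 and interval_ms > 0
        self.max_rows = max_rows
        self.interval_ms = interval_ms
        self.pending: Dict[str, Change] = {}
        self.last_flush_ms = 0

    def add(self, change: Change, now_ms: int) -> List[Change]:
        """Buffer a change and return whatever this call flushes."""
        if not self.enabled:
            return [change]  # buffering disabled: write through immediately
        self.pending[change[1]] = change  # a newer change replaces the older one
        if (len(self.pending) >= self.max_rows
                or now_ms - self.last_flush_ms >= self.interval_ms):
            return self.flush(now_ms)
        return []

    def flush(self, now_ms: int) -> List[Change]:
        out = list(self.pending.values())
        self.pending.clear()
        self.last_flush_ms = now_ms
        return out


buf = ReducingBuffer(max_rows=100, interval_ms=1000)
written: List[Change] = []
written += buf.add(("+I", "us", "1"), now_ms=10)
written += buf.add(("+U", "us", "2"), now_ms=20)
written += buf.add(("-D", "eu", None), now_ms=30)
written += buf.add(("+U", "us", "3"), now_ms=1000)  # interval elapsed: flush
print(written)  # [('+U', 'us', '3'), ('-D', 'eu', None)]
```

Four upstream changes become two Kafka messages; with either parameter set to 0, every change is written through unchanged.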

Data ingestion

The Upsert Kafka connector can serve as the sink in a YAML data ingestion job. Data is written in JSON format, and primary key fields are included in the message body.

Syntax

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # Required only when writing to ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instance-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

Parameters

Parameter | Required | Default | Description
type | Yes | None | Set to upsert-kafka.
name | No | None | A display name for the sink.
properties.bootstrap.servers | Yes | None | Comma-separated list of Kafka broker addresses in host:port format.
properties.* | No | None | Additional Kafka producer parameters. The properties. prefix is stripped before the configuration is passed to the client, and the remaining suffix must be a valid Kafka producer configuration key.
sink.delivery-guarantee | No | at-least-once | Delivery semantics. Valid values: none (no guarantee; data may be lost or duplicated), at-least-once (no data loss, but duplicates are possible), and exactly-once (uses Kafka transactions to eliminate duplicates).
sink.add-tableId-to-header-enabled | No | false | When enabled, writes namespace, schemaName, and tableName to the Kafka message header.
aliyun.kafka.accessKeyId | No | None | AccessKey ID of your Alibaba Cloud account. Required when writing to ApsaraMQ for Kafka. See Create an AccessKey pair.
aliyun.kafka.accessKeySecret | No | None | AccessKey secret of your Alibaba Cloud account. Required when writing to ApsaraMQ for Kafka. See Create an AccessKey pair.
aliyun.kafka.instanceId | No | None | ID of the ApsaraMQ for Kafka instance, shown on the instance details page in the ApsaraMQ for Kafka console. Required when writing to ApsaraMQ for Kafka.
aliyun.kafka.endpoint | No | None | API endpoint of ApsaraMQ for Kafka. Required when writing to ApsaraMQ for Kafka. See Endpoints.
aliyun.kafka.regionId | No | None | Region ID of the ApsaraMQ for Kafka instance. Required when writing to ApsaraMQ for Kafka. See Endpoints.

Supported type changes

The Upsert Kafka data ingestion connector supports all changelog operation types. To consume the written data downstream, use the Flink Upsert Kafka SQL connector with a fixed schema.

Data ingestion example

The following example synchronizes a MySQL table to ApsaraMQ for Kafka using a YAML data ingestion job:

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: upsert-kafka
  name: Upsert Kafka Sink
  properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
  aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
  aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
  aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
  aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
  aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}

route:
  - source-table: ${mysql.source.table}
    sink-table: ${upsert.kafka.topic}

Delivery semantics

By default, the Upsert Kafka sink writes with at-least-once guarantees, so Flink may write duplicate records with the same key to the topic, for example after recovering from a failure. Because the connector operates in upsert mode, the last record for each key takes effect when the topic is read back as a source. Duplicate writes are therefore idempotent: they do not corrupt the final state.
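This idempotency argument can be checked with a replay model. The Python sketch below assumes a single partition whose messages are (key, value) pairs with None as a tombstone; final_state is a hypothetical name. It simulates an at-least-once retry by re-appending a suffix of the already-written messages in their original order, which is how a replay from the last checkpoint would typically look.

```python
from typing import Dict, List, Optional, Tuple

# Message in one partition: (key, value); a value of None is a tombstone.
Message = Tuple[str, Optional[str]]


def final_state(messages: List[Message]) -> Dict[str, str]:
    """Replay messages with upsert semantics: the last message per key wins."""
    state: Dict[str, str] = {}
    for key, value in messages:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


written = [("us", "1"), ("eu", "5"), ("us", "2")]
# Simulated at-least-once retry: the last two messages are written again in order.
with_duplicates = written + written[1:]
print(final_state(written))          # {'us': '2', 'eu': '5'}
print(final_state(with_duplicates))  # {'us': '2', 'eu': '5'}
```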

To enable exactly-once semantics, the Kafka cluster must support Kafka transactions (Apache Kafka 0.11 or later) and the transaction feature must be enabled. Set sink.delivery-guarantee to exactly-once in the data ingestion YAML configuration.

Limits

  • Requires Flink running on Ververica Runtime (VVR) 2.0.0 or later.

  • Supports Apache Kafka 0.10 or later for both reads and writes.

  • Supports only the client parameters documented for Apache Kafka 2.8. See the consumer and producer configuration references.

  • Upsert Kafka source tables always start reading from earliest-offset. This startup mode is fixed and cannot be configured, because the connector must read all historical change data to reconstruct a complete changelog. Any other startup mode, such as latest-offset or a timestamp, would produce an incomplete changelog and cause data correctness issues in downstream computations.

  • For exactly-once sink semantics, the destination Kafka cluster must run Apache Kafka 0.11 or later with transactions enabled.
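The earliest-offset requirement follows from the same upsert replay logic. This Python sketch (state_from is a hypothetical name; it assumes one partition of (key, value) messages with None as a tombstone) rebuilds per-key state from a chosen start offset and shows that skipping earlier offsets silently drops keys that were written only before that point.

```python
from typing import Dict, List, Optional, Tuple

# Message in one partition: (key, value); a value of None is a tombstone.
Message = Tuple[str, Optional[str]]


def state_from(messages: List[Message], start_offset: int) -> Dict[str, str]:
    """Rebuild per-key state by reading one partition from start_offset."""
    state: Dict[str, str] = {}
    for key, value in messages[start_offset:]:
        if value is None:
            state.pop(key, None)
        else:
            state[key] = value
    return state


topic = [("us", "1"), ("eu", "5"), ("us", "2")]  # offsets 0, 1, 2
print(state_from(topic, 0))           # earliest-offset: {'us': '2', 'eu': '5'}
print(state_from(topic, 2))           # later start: {'us': '2'}, key 'eu' is lost
print(state_from(topic, len(topic)))  # latest-offset: {} until new data arrives
```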

Monitoring metrics

Source table metrics

Metric | Description
numRecordsIn | Total number of records read
numRecordsInPerSecond | Records read per second
numBytesIn | Total bytes read
numBytesInPerSecond | Bytes read per second
currentEmitEventTimeLag | Lag between event time and emit time
currentFetchEventTimeLag | Lag between event time and fetch time
sourceIdleTime | Time the source has been idle
pendingRecords | Number of records waiting to be processed

Sink table metrics

Metric | Description
numRecordsOut | Total number of records written
numRecordsOutPerSecond | Records written per second
numBytesOut | Total bytes written
numBytesOutPerSecond | Bytes written per second
currentSendTime | Time taken to send the latest batch
