
Realtime Compute for Apache Flink:Primary key and append-only tables

Last Updated:Mar 26, 2026

Apache Paimon supports two table types: primary key tables and append-only tables. Each type has distinct semantics for writes, updates, and streaming consumption. Choose the type that matches your data pipeline requirements.

Choose a table type

| Scenario | Recommended configuration |
| --- | --- |
| CDC sync, standard upserts | Primary key table, deduplicate engine |
| Deduplication where only the first record matters | Primary key table, first-row engine |
| Real-time aggregation (running totals, max/min tracking) | Primary key table, aggregation engine |
| Wide table assembly from multiple streaming sources | Primary key table, partial-update engine |
| Log ingestion, event streaming | Append-only table (scalable) |
| Order-preserving message queue replacement | Append queue table |

Primary key tables

A primary key table requires one or more primary keys, specified at table creation. Records with the same primary keys are merged according to the configured merge engine.

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

In this example, the partition key is dt, the primary keys are dt, shop_id, and user_id, and the table uses 4 fixed buckets.

Bucket modes

A bucket is the smallest unit for reads and writes in a Paimon table. Partitions (or the entire table for unpartitioned tables) are subdivided into buckets, enabling parallel reads and writes.

| Mode | Configuration | Notes |
| --- | --- | --- |
| Dynamic bucket mode (default) | Omit bucket or set 'bucket' = '-1' | Does not support concurrent writes from multiple Flink deployments; supports cross-partition updates |
| Fixed bucket mode | Set 'bucket' = '<num>' (integer > 0) | <num> is the number of buckets for the whole table (unpartitioned) or per partition (partitioned); the bucket count can be changed later |

Sizing buckets: We recommend sizing buckets so that each holds about 2 GB of data, and no more than 5 GB. Too few buckets limit write parallelism; too many produce excessive small files.

Dynamic bucket mode: how data is updated

Cross-partition updates occur when the primary keys do not include all partition keys. Paimon cannot determine the target partition from the primary key alone, so it uses RocksDB to maintain a primary-key-to-partition/bucket mapping. On large tables, this can significantly reduce performance compared to fixed bucket mode, and deployment initialization takes longer while the mapping loads into RocksDB.

The update behavior depends on the merge engine:

  • deduplicate: deletes the existing record and inserts a new record in the target partition.

  • aggregation or partial-update: updates the existing record in place within the current partition.

  • first-row: retains the existing record and discards the incoming record.

Intra-partition updates occur when the primary keys include all partition keys. Paimon can determine the partition from the primary key but not the bucket, so it maintains an in-memory index that maps primary keys to buckets. Each 100 million index entries consume approximately 1 GB of heap memory, and only partitions that are actively being written incur this cost.

Dynamic bucket mode: bucket assignment

New records are written into existing buckets first. If all buckets are at capacity, a new bucket is created automatically.

Configure this behavior with the following parameters in the WITH clause:

| Parameter | Description | Default |
| --- | --- | --- |
| dynamic-bucket.target-row-num | Maximum records per bucket | 2000000 |
| dynamic-bucket.initial-buckets | Initial bucket count | Writer operator parallelism |
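
The defaults above can be overridden at table creation. The following sketch (table and column names are illustrative, not from this document) creates a dynamic-bucket table with a larger per-bucket capacity and a fixed initial bucket count:

```sql
CREATE TABLE user_latest_event (
  user_id BIGINT,
  last_event STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'bucket' = '-1',                               -- dynamic bucket mode (also the default when omitted)
  'dynamic-bucket.target-row-num' = '5000000',   -- allow up to 5 million records per bucket
  'dynamic-bucket.initial-buckets' = '8'         -- start with 8 buckets instead of the writer parallelism
);
```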

Fixed bucket mode: bucket assignment

By default, Paimon assigns records to buckets using a hash of the primary key values. To use a different set of columns, set bucket-key in the WITH clause. Specify multiple columns separated by commas. The columns in bucket-key must be a subset of the primary keys.

For example, 'bucket-key' = 'c1,c2' routes records by the hash of c1 and c2.

In fixed bucket mode, include all partition keys in the primary keys to prevent cross-partition updates.
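
Putting these rules together, the following sketch (names are illustrative) defines a fixed-bucket table whose primary keys include the partition key and that routes records by a single column:

```sql
CREATE TABLE orders (
  dt STRING,
  order_id BIGINT,
  amount INT,
  PRIMARY KEY (dt, order_id) NOT ENFORCED   -- includes the partition key dt, so all updates stay intra-partition
) PARTITIONED BY (dt) WITH (
  'bucket' = '8',                           -- fixed bucket mode: 8 buckets per partition
  'bucket-key' = 'order_id'                 -- route by the hash of order_id; must be a subset of the primary keys
);
```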

Change the number of buckets in fixed bucket mode

The bucket count determines read and write parallelism. To change it for an existing table:

  1. Pause all deployments that read from or write to the table.

  2. Create a script and execute the following SQL statement to configure the bucket parameter:

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. Reorganize the data by running a batch deployment.

    • Unpartitioned table: Create a Blank Batch Draft, paste the following SQL, then click Deploy and Start:

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;

    • Partitioned table: Run one batch per partition you want to reorganize:

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      Example — reorganize the partition where dt = '20240312' and hh = '08':

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. After the batch deployment completes successfully, resume the paused deployments.

Merge engines

When multiple records share the same primary keys, Paimon merges them according to the merge-engine setting.

If your input stream contains out-of-order records, configure the sequence.field parameter to control merge order. See Out-of-order data handling.

deduplicate (default)

Paimon keeps only the most recent record for each set of primary keys, discarding earlier ones. If the latest record is a DELETE, all records with those primary keys are removed.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- Optional: deduplicate is the default
);

Example results:

  • Write +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), +I(1, 8.0, 'cherry') — query returns (1, 8.0, 'cherry').

  • Write +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), -D(1, 4.0, 'banana') — query returns no rows.

Use when: CDC sync, standard upsert pipelines.

first-row

Paimon keeps only the first record for each set of primary keys. Because downstream consumers only ever see INSERT-type changelog events, changelog production is more efficient than with deduplicate.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

Example result:

Write +I(1, 2.0, 'apple'), +I(1, 4.0, 'banana'), +I(1, 8.0, 'cherry') — query returns (1, 2.0, 'apple').

Usage notes:

  • Set changelog-producer to lookup to enable streaming consumption.

  • The first-row engine does not process DELETE and UPDATE_BEFORE changes. To ignore them instead of erroring, set 'first-row.ignore-delete' = 'true'.

  • Sequence fields are not supported.
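
The notes above combine into a table definition such as the following sketch:

```sql
CREATE TABLE T_first (
  k INT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row',
  'changelog-producer' = 'lookup',      -- required for streaming consumption of a first-row table
  'first-row.ignore-delete' = 'true'    -- drop DELETE/UPDATE_BEFORE records instead of failing
);
```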

Use when: Deduplication pipelines where only the first-seen record matters.

aggregation

Paimon applies an aggregate function to each non-primary-key column across records that share the same primary keys. Specify the function per column using fields.<field-name>.aggregate-function. Columns without an explicit function default to last_non_null_value.

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

Example result:

Write +I(1, 23.0, 15) and +I(1, 30.2, 20) — query returns (1, 30.2, 35).

Supported aggregate functions:

| Function | Supported types |
| --- | --- |
| sum | DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE |
| product | DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE |
| count | INTEGER, BIGINT |
| max, min | CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ |
| first_value, last_value | All types, including null |
| first_not_null_value, last_non_null_value | All types |
| listagg | STRING |
| bool_and, bool_or | BOOLEAN |

Only sum, product, and count support retraction changes (UPDATE_BEFORE and DELETE). To ignore retraction changes for a specific column, set 'fields.<field-name>.ignore-retract' = 'true'.

Set changelog-producer to lookup or full-compaction to enable streaming consumption.
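
For a column fed by a retractable upstream (for example, the result of a streaming join), the per-column switch can be applied as in this sketch (names are illustrative):

```sql
CREATE TABLE T_sum (
  product_id BIGINT,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.sales.aggregate-function' = 'sum',
  'fields.sales.ignore-retract' = 'true',   -- ignore UPDATE_BEFORE/DELETE instead of retracting from the sum
  'changelog-producer' = 'lookup'           -- required for streaming consumption
);
```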

Use when: Real-time aggregation metrics, running totals, max/min tracking.

partial-update

Paimon updates individual columns of an existing record using the latest non-null value from incoming records that share the same primary keys. Null values do not overwrite existing values.

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

Example result:

Write +I(1, 23.0, 10, NULL), +I(1, NULL, NULL, 'This is a book'), +I(1, 25.2, NULL, NULL) — query returns (1, 25.2, 10, 'This is a book').

Usage notes:

  • Set changelog-producer to lookup or full-compaction to enable streaming consumption.

  • The partial-update engine does not process DELETE and UPDATE_BEFORE changes. To ignore them, set 'partial-update.ignore-delete' = 'true'.
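
Both notes can be applied in one definition, as in this sketch:

```sql
CREATE TABLE T_wide (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'changelog-producer' = 'lookup',           -- enable streaming consumption
  'partial-update.ignore-delete' = 'true'    -- drop DELETE/UPDATE_BEFORE records instead of failing
);
```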

Sequence groups

When assembling a wide table from multiple upstream tables, use sequence groups to control the update order of different column sets independently, handling out-of-order data across sources.

In the following example, columns a and b are updated in ascending order of g_1, and columns c and d are updated in ascending order of g_2:

CREATE TABLE T (
  k INT,
  a STRING,
  b STRING,
  g_1 INT,
  c STRING,
  d STRING,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.g_2.sequence-group' = 'c,d'
);

Combine sequence groups with aggregate functions by adding fields.<field-name>.aggregate-function for any column in the group. The following example tracks the most recent non-null value of a, the maximum of b, the most recent non-null value of c, and the sum of d:

CREATE TABLE T (
  k INT,
  a STRING,
  b INT,
  g_1 INT,
  c STRING,
  d INT,
  g_2 INT,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'fields.g_1.sequence-group' = 'a,b',
  'fields.b.aggregate-function' = 'max',
  'fields.g_2.sequence-group' = 'c,d',
  'fields.d.aggregate-function' = 'sum'
);

For more information, see Merge Engine.

Use when: Wide table assembly by joining columns from multiple streaming sources.

Out-of-order data handling

By default, Paimon merges records in input order — the last record received is the last to merge. For streams with out-of-order records, set sequence.field in the WITH clause. Records with the same primary keys are then merged in ascending order of the specified column's value.

Supported data types for sequence.field: TINYINT, SMALLINT, INTEGER, BIGINT, TIMESTAMP, TIMESTAMP_LTZ.

When using MySQL as the input source with the op_ts metadata column as the sequence field, the UPDATE_BEFORE and UPDATE_AFTER records of a single change share the same sequence value. To ensure Paimon processes UPDATE_BEFORE before UPDATE_AFTER, set 'sequence.auto-padding' = 'row-kind-flag'.
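
For example, the following sketch (names are illustrative) merges by a TIMESTAMP column and breaks ties by row kind:

```sql
CREATE TABLE T_seq (
  k INT,
  v STRING,
  op_ts TIMESTAMP(3),
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'sequence.field' = 'op_ts',                  -- merge records with equal keys in ascending op_ts order
  'sequence.auto-padding' = 'row-kind-flag'    -- within equal op_ts, UPDATE_BEFORE sorts before UPDATE_AFTER
);
```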

Changelog producer

A primary key table must generate a complete changelog (covering INSERT, DELETE, and UPDATE operations) to support downstream streaming consumption, similar to a database binlog. Configure the generation method with the changelog-producer parameter.

| Value | Description | Latency | Resource use | Use when |
| --- | --- | --- | --- | --- |
| none | No changelog is generated. | – | Lowest | No streaming consumption needed |
| input | Passes input records directly to downstream consumers; no extra computation required. | Lowest | Lowest | Input already contains a complete changelog, such as a database binlog. Most efficient option. |
| lookup (recommended) | Executes a lookup against small-file compaction results to produce a complete changelog; triggered at each Flink checkpoint. | Minute-level | Higher | Any input type; minute-level latency requirements |
| full-compaction | Generates a changelog after each full compaction of small files. | Up to several hours | Lower than lookup | Any input type; higher latency is acceptable |

For full-compaction, set 'full-compaction.delta-commits' = '<num>' to trigger a full compaction after every <num> Flink checkpoints. Full compaction is resource-intensive, so choose <num> so that full compactions run roughly every 30 minutes to 1 hour.

By default, Paimon generates a changelog record even when the updated value is identical to the previous one. To suppress these no-op records, set 'changelog-producer.row-deduplicate' = 'true'. This option applies only to lookup and full-compaction and requires additional computation to compare before/after values, so enable it only when a large number of unnecessary records is expected.
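
As a sketch, a table that tolerates hour-level changelog latency might combine these options as follows (the delta-commit count is an assumption based on a 1-3 minute checkpoint interval):

```sql
CREATE TABLE T_cdc (
  k INT,
  v STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '20',          -- full compaction every 20 checkpoints (roughly 30-60 minutes)
  'changelog-producer.row-deduplicate' = 'true'    -- suppress changelog records whose value did not change
);
```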

Append-only tables

An append-only table has no primary keys and accepts only INSERT operations in streaming mode. Use this table type for workloads that do not require streaming updates, such as log data synchronization.

CREATE TABLE T (
  dt STRING,
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

Subtypes

| Subtype | Configuration | Characteristics |
| --- | --- | --- |
| Append scalable table | 'bucket' = '-1' | High write throughput; no hash partitioning required; data sorting supported; flexible parallelism configuration; supports direct conversion from Hive tables; completely asynchronous file compaction; consumption order differs from write order; data skew possible when upstream parallelism equals writer parallelism |
| Append queue table | 'bucket' = '<num>' (integer > 0) | Order-preserving per bucket; latency of several minutes; a bucket is comparable to a Kafka topic partition or a shard in an ApsaraMQ for MQTT instance |

Bucket assignment

Append scalable table: Data is written in parallel directly into the target partition. The bucket concept is ignored, and no hash partitioning is required, which maximizes write performance.

Append queue table: By default, records are assigned to buckets based on the values in all columns of the data record. To control assignment, set bucket-key in the WITH clause with a comma-separated list of column names.

For example, 'bucket-key' = 'c1,c2' routes records by the hash of c1 and c2.

Specify bucket-key to reduce bucket assignment computation and improve write efficiency.
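
For instance, this sketch (names are illustrative) defines an append queue table that preserves per-user ordering:

```sql
CREATE TABLE click_log (
  dt STRING,
  user_id BIGINT,
  url STRING
) PARTITIONED BY (dt) WITH (   -- no PRIMARY KEY clause: this is an append-only table
  'bucket' = '8',              -- append queue table: write order preserved within each bucket
  'bucket-key' = 'user_id'     -- hash only user_id, so each user's events land in one bucket, in order
);
```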

Data consumption order

Append scalable table: Records may be consumed in an order different from the write order.

Append queue table: Records within each bucket are consumed in write order. For records across different partitions or buckets:

  • Two records from different partitions: if 'scan.plan-sort-partition' = 'true', the record with the smaller partition value is consumed first; otherwise, the record in the earlier-created partition is consumed first.

  • Two records from the same partition, same bucket: the earlier-written record is consumed first.

  • Two records from the same partition, different buckets: consumption order is not guaranteed because different buckets may be processed concurrently.
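
The partition-ordering switch can also be supplied per query through Flink's dynamic table options hint; this sketch assumes a table named T:

```sql
-- Consume partitions in ascending order of partition value
-- rather than in partition-creation order.
SELECT * FROM T /*+ OPTIONS('scan.plan-sort-partition' = 'true') */;
```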

What's next