Apache Paimon supports two table types: primary key tables and append-only tables. Each type has distinct semantics for writes, updates, and streaming consumption. Choose the type that matches your data pipeline requirements.
## Choose a table type
| Scenario | Recommended configuration |
|---|---|
| CDC sync, standard upserts | Primary key table, deduplicate engine |
| Deduplication where only the first record matters | Primary key table, first-row engine |
| Real-time aggregation (running totals, max/min tracking) | Primary key table, aggregation engine |
| Wide table assembly from multiple streaming sources | Primary key table, partial-update engine |
| Log ingestion, event streaming | Append-only table (scalable) |
| Order-preserving message queue replacement | Append queue table |
## Primary key tables
A primary key table requires one or more primary keys, specified at table creation. Records with the same primary keys are merged according to the configured merge engine.
```sql
CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);
```
In this example, the partition key is `dt`; the primary keys are `dt`, `shop_id`, and `user_id`; and the table uses 4 fixed buckets.
### Bucket modes
A bucket is the smallest unit for reads and writes in a Paimon table. Partitions (or the entire table for unpartitioned tables) are subdivided into buckets, enabling parallel reads and writes.
| Mode | Configuration | Notes |
|---|---|---|
| Dynamic bucket mode (default) | Omit `bucket` or set `'bucket' = '-1'` | Does not support concurrent writes from multiple Flink deployments; supports cross-partition updates |
| Fixed bucket mode | Set `'bucket' = '<num>'` (an integer greater than 0) | `<num>` is the number of buckets for the whole table (unpartitioned) or per partition (partitioned); the bucket count can be changed later |
Sizing buckets: We recommend keeping the total size of data in each bucket around 2 GB, and no more than 5 GB. Too few buckets limit write parallelism; too many produce excessive small files.
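As an illustrative sketch (the table and sizes are hypothetical), a table whose daily partitions hold roughly 20 GB would call for about 10 buckets to stay near the 2 GB target:

```sql
-- Hypothetical sizing: ~20 GB of data per dt partition / ~2 GB per bucket ≈ 10 buckets.
CREATE TABLE orders (
  dt STRING,
  order_id BIGINT,
  amount INT,
  PRIMARY KEY (dt, order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '10'
);
```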
#### Dynamic bucket mode: how data is updated
Cross-partition updates occur when the primary keys do not include all partition keys. Paimon cannot determine the target partition from the primary key alone, so it uses RocksDB to maintain a primary-key-to-partition/bucket mapping. On large tables, this can significantly reduce performance compared to fixed bucket mode, and deployment initialization takes longer while the mapping loads into RocksDB.
The update behavior depends on the merge engine:
- `deduplicate`: deletes the existing record and inserts a new record in the target partition.
- `aggregation` or `partial-update`: updates the existing record in place within the current partition.
- `first-row`: retains the existing record and discards the incoming record.
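As a minimal sketch (the table and column names are hypothetical), cross-partition updates arise with a schema like the following, where the primary key `user_id` does not include the partition key `dt`:

```sql
-- The primary key omits the partition key dt, so Paimon must maintain the
-- key-to-partition mapping in RocksDB; dynamic bucket mode is required.
CREATE TABLE user_latest (
  dt STRING,
  user_id BIGINT,
  status STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);
```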
Intra-partition updates occur when the primary keys include all partition keys. Paimon can determine the partition from the primary key but not the bucket, so it maintains an in-memory index mapping primary keys to buckets. Every 100 million mapping entries consume approximately 1 GB of heap memory, charged only to the partitions actively being written.
#### Dynamic bucket mode: bucket assignment
New records are written into existing buckets first. If all buckets are at capacity, a new bucket is created automatically.
Configure this behavior with the following parameters in the WITH clause:
| Parameter | Description | Default |
|---|---|---|
| `dynamic-bucket.target-row-num` | Maximum records per bucket | 2000000 |
| `dynamic-bucket.initial-buckets` | Initial bucket count | Writer operator parallelism |
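For example, both parameters can be tuned in the WITH clause (the schema and values below are illustrative, not recommendations):

```sql
CREATE TABLE t (
  id BIGINT,
  v STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'bucket' = '-1',                             -- dynamic bucket mode
  'dynamic-bucket.target-row-num' = '5000000', -- create a new bucket past 5 million rows
  'dynamic-bucket.initial-buckets' = '8'       -- start with 8 buckets
);
```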
#### Fixed bucket mode: bucket assignment
By default, Paimon assigns records to buckets using a hash of the primary key values. To use a different set of columns, set `bucket-key` in the WITH clause. Specify multiple columns separated by commas. The columns in `bucket-key` must be a subset of the primary keys.
For example, `'bucket-key' = 'c1,c2'` routes records by the hash of `c1` and `c2`.
In fixed bucket mode, include all partition keys in the primary keys to prevent cross-partition updates.
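Putting these rules together, a fixed-bucket table routed by a subset of its primary keys might look like this (the schema is hypothetical):

```sql
CREATE TABLE t (
  dt STRING,
  c1 BIGINT,
  c2 BIGINT,
  v STRING,
  PRIMARY KEY (dt, c1, c2) NOT ENFORCED  -- includes the partition key dt
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',
  'bucket-key' = 'c1,c2'  -- must be a subset of the primary keys
);
```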
#### Change the number of buckets in fixed bucket mode
The bucket count determines read and write parallelism. To change it for an existing table:
1. Pause all deployments that read from or write to the table.

2. Create a script and execute the following SQL statement to configure the bucket parameter:

   ```sql
   ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
   ```

3. Reorganize the data by running a batch deployment.

   - Unpartitioned table: Create a Blank Batch Draft, paste the following SQL, then click Deploy and Start:

     ```sql
     INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
     SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
     ```

   - Partitioned table: Run one batch per partition you want to reorganize:

     ```sql
     INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (<partition-spec>)
     SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE <partition-condition>;
     ```

     For example, to reorganize the partition where `dt = '20240312'` and `hh = '08'`:

     ```sql
     INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (dt = '20240312', hh = '08')
     SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE dt = '20240312' AND hh = '08';
     ```

4. After the batch deployment completes successfully, resume the paused deployments.
### Merge engines
When multiple records share the same primary keys, Paimon merges them according to the `merge-engine` setting.
If your input stream contains out-of-order records, configure the `sequence.field` parameter to control merge order. See Out-of-order data handling.
The available merge engines are `deduplicate` (the default), `first-row`, `aggregation`, and `partial-update`.
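For instance, the aggregation engine keeps running totals per key. A sketch using per-field aggregate functions (the schema is hypothetical):

```sql
-- Incoming rows with the same (dt, shop_id) are merged by summing
-- num_orders and total_amount instead of replacing the old row.
CREATE TABLE shop_stats (
  dt STRING,
  shop_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'merge-engine' = 'aggregation',
  'fields.num_orders.aggregate-function' = 'sum',
  'fields.total_amount.aggregate-function' = 'sum'
);
```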
### Out-of-order data handling
By default, Paimon merges records in input order: the last record received is the last to merge. For streams with out-of-order records, set `sequence.field` in the WITH clause. Records with the same primary keys are then merged in ascending order of the specified column's value.
Supported data types for `sequence.field`: `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `TIMESTAMP`, `TIMESTAMP_LTZ`.
When using MySQL as the input source with the `op_t` metadata column as the sequence field, UPDATE_BEFORE and UPDATE_AFTER change pairs share the same sequence value. To ensure Paimon processes UPDATE_BEFORE before UPDATE_AFTER, set `'sequence.auto-padding' = 'row-kind-flag'`.
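A sketch combining both settings (the table schema is hypothetical):

```sql
CREATE TABLE t (
  id BIGINT,
  v STRING,
  op_t TIMESTAMP(3),  -- e.g. populated from the MySQL op_t metadata column
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'sequence.field' = 'op_t',
  'sequence.auto-padding' = 'row-kind-flag'  -- orders UPDATE_BEFORE before UPDATE_AFTER
);
```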
### Changelog producer
A primary key table must generate a complete changelog (covering INSERT, DELETE, and UPDATE operations) to support downstream streaming consumption, similar to a database binlog. Configure the generation method with the changelog-producer parameter.
| Value | Description | Latency | Resource use | Use when |
|---|---|---|---|---|
| `none` | No changelog is generated. | N/A | Lowest | No streaming consumption needed |
| `input` | Passes input records directly to downstream consumers. | Lowest | Lowest | Input already contains a complete changelog, such as a database binlog. The most efficient option; no extra computation required. |
| `lookup` (recommended) | Executes a lookup against small file compaction results to produce a complete changelog. Triggered at each Flink checkpoint. | Minute-level | Higher | Any input type; latency requirements at the minute level |
| `full-compaction` | Generates a changelog after each full compaction of small files. | Up to several hours | Lower than `lookup` | Any input type; higher latency is acceptable |
For `full-compaction`, set `'full-compaction.delta-commits' = '<num>'` to trigger a full compaction after every `<num>` Flink checkpoints. Full compaction is resource-intensive, so set the interval to 30 minutes to 1 hour.
By default, Paimon generates a changelog record even when the updated value is identical to the previous one. To suppress these no-op records, set `'changelog-producer.row-deduplicate' = 'true'`. This option applies only to `lookup` and `full-compaction` and requires additional computation to compare before and after values, so enable it only when a large number of unnecessary records is expected.
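For example, with a 60-second checkpoint interval, 30 delta commits yield a full compaction roughly every 30 minutes. A sketch (the schema and values are illustrative):

```sql
CREATE TABLE t (
  id BIGINT,
  v STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '30',        -- 30 checkpoints between full compactions
  'changelog-producer.row-deduplicate' = 'true'  -- drop no-op changelog records
);
```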
## Append-only tables
An append-only table has no primary keys and accepts only INSERT operations in streaming mode. Use this table type for workloads that do not require streaming updates, such as log data synchronization.
```sql
CREATE TABLE T (
  dt STRING,
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);
```
### Subtypes
| Subtype | Configuration | Characteristics |
|---|---|---|
| Append scalable table | `'bucket' = '-1'` | High write throughput; no hash partitioning required; data sorting supported; flexible parallelism configuration; supports direct conversion from Hive tables; fully asynchronous file compaction; consumption order differs from write order; data skew possible when upstream parallelism equals writer parallelism |
| Append queue table | `'bucket' = '<num>'` (an integer greater than 0) | Order-preserving per bucket; latency of several minutes; each bucket is analogous to a partition of a Kafka topic or a shard of an ApsaraMQ for MQTT instance |
### Bucket assignment
Append scalable table: Data is written directly to a single partition in parallel. The bucket concept is ignored, and no hash partitioning is required, which maximizes write performance.
Append queue table: By default, records are assigned to buckets based on the values of all columns in the record. To control assignment, set `bucket-key` in the WITH clause with a comma-separated list of column names.
For example, `'bucket-key' = 'c1,c2'` routes records by the hash of `c1` and `c2`.
Specify `bucket-key` to reduce bucket assignment computation and improve write efficiency.
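An append queue table combining both settings might look like this (the schema is hypothetical):

```sql
-- No PRIMARY KEY clause: this is an append-only table.
CREATE TABLE events (
  dt STRING,
  event_id BIGINT,
  payload STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '8',            -- fixed buckets make this an append queue table
  'bucket-key' = 'event_id'  -- route by event_id instead of all columns
);
```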
### Data consumption order
Append scalable table: Records may be consumed in an order different from the write order.
Append queue table: Records within each bucket are consumed in write order. For records across different partitions or buckets:
- Two records from different partitions: if `'scan.plan-sort-partition' = 'true'`, the record with the smaller partition value is consumed first; otherwise, the record in the earlier-created partition is consumed first.
- Two records from the same partition and the same bucket: the earlier-written record is consumed first.
- Two records from the same partition but different buckets: consumption order is not guaranteed because different buckets may be processed concurrently.
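The scan option can also be supplied per query through Flink's dynamic table options hint rather than in the table's WITH clause (the table name is hypothetical):

```sql
-- Partitions are scanned in ascending order of their partition values.
SELECT * FROM events /*+ OPTIONS('scan.plan-sort-partition' = 'true') */;
```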
## What's next
- To create a Paimon catalog and table, see Manage Paimon catalogs.
- To optimize primary key table performance, see Performance optimization.