Use the Paimon connector with the Paimon Catalog for best results. This topic explains how to use the Paimon connector for streaming data lakehouses.
Background information
Apache Paimon is a unified streaming and batch lake storage format. It supports high-throughput writes and low-latency queries. Major compute engines on Alibaba Cloud’s open source big data platform E-MapReduce—including Flink, Spark, Hive, and Trino—integrate well with Paimon. You can use Apache Paimon to quickly build your own data lake storage service on HDFS or cloud-based Object Storage Service. Then connect supported compute engines to analyze your data lake. For more information, see Apache Paimon.
Category | Details |
Supported types | Source tables, dimension tables, and sink tables (data ingestion targets) |
Execution mode | Streaming and batch modes |
Data format | Not supported |
Unique monitoring metrics | None |
API types | SQL and YAML-based data ingestion jobs |
Supports updating or deleting data in sink tables | Yes |
Key features
Apache Paimon provides the following core capabilities:
Build lightweight, low-cost data lake storage services on HDFS or Object Storage Service.
Read and write large-scale data sets in both streaming and batch modes.
Run batch and OLAP queries with data freshness from minutes down to seconds.
Ingest and produce incremental data. Serve as storage at all layers of traditional offline data warehouses and modern streaming data warehouses.
Pre-aggregate data to reduce storage costs and downstream compute load.
Roll back to historical versions.
Filter data efficiently.
Support schema evolution.
Limits and recommendations
The Paimon connector is supported only on Ververica Runtime (VVR) version 6.0.6 and later.
The following table shows the mapping between Paimon community versions and Realtime Compute for Apache Flink engine versions (VVR).
Apache Paimon version | VVR version |
1.3 | 11.4 |
1.2 | 11.2 and 11.3 |
1.1 | 11 |
1.0 | 8.0.11 |
0.9 | 8.0.7, 8.0.8, 8.0.9, and 8.0.10 |
0.8 | 8.0.6 |
0.7 | 8.0.5 |
0.6 | 8.0.4 |
0.6 | 8.0.3 |
Storage recommendation for concurrent writes
When multiple jobs concurrently update the same Paimon table, standard OSS storage (oss://) may cause rare commit conflicts or job failures because of atomicity limits on file operations.
To ensure stable, continuous writes, use metadata or storage services that guarantee strong atomicity. We recommend using Data Lake Formation (DLF). DLF unifies Paimon metadata and storage management. Alternatively, use OSS-HDFS or HDFS.
When parameter changes take effect
After you change Paimon table configuration parameters, restart related jobs for the new settings to take effect. Running jobs cannot dynamically detect or load such changes.
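As a minimal sketch of this workflow, a table option can be changed with Flink SQL's ALTER TABLE ... SET statement. The table name orders and the retention value are hypothetical and only illustrate the shape of the statement.

```sql
-- Hypothetical table name; replace the catalog and database placeholders with your own.
ALTER TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders
SET ('snapshot.time-retained' = '2h');

-- Jobs that already read from or write to this table keep using the old value
-- until they are restarted.
```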
Physical cleanup delay after dropping partitions
When you run DROP PARTITION, the system does not delete underlying physical data files immediately.
This operation performs a logical deletion only. Paimon removes the partition’s metadata from the latest snapshot. Because Paimon supports time travel, historical snapshots retain references to the partition’s data files. Physical files are deleted only after all snapshots referencing the partition expire and are cleaned up by Paimon’s snapshot expiration mechanism.
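The following sketch shows what such a statement might look like, assuming a hypothetical table orders that is partitioned by a string column dt. Both names are illustrative.

```sql
-- Logical deletion only: the partition disappears from the latest snapshot.
ALTER TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders
DROP PARTITION (dt = '20240101');

-- The data files are physically removed later, once every snapshot that still
-- references them has expired (see snapshot.time-retained and
-- snapshot.num-retained.min in the WITH parameters).
```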
SQL
You can use the Paimon connector in SQL jobs as a source table or sink table.
Syntax
If you create a Paimon table in the Paimon Catalog, omit the connector parameter. The syntax is as follows.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
Note: If you already created a Paimon table in the Paimon Catalog, use it directly without recreating it.
If you create a Paimon temporary table in another catalog, specify the connector parameter and the path to the Paimon table files. The syntax is as follows.
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true', -- Creates files automatically if the path does not contain Paimon table data.
  ...
);
Note: Path example: 'path' = 'oss://<bucket>/test/order.db/orders'. Do not omit the .db suffix. Paimon uses this suffix to identify databases.
Multiple jobs writing to one table must share the same path.
Different paths mean different tables, even if they point to the same physical location. For example, oss://b/test and oss://b/test/ differ only by a trailing slash but refer to different tables. Mismatched catalog configurations cause concurrent write conflicts, compaction failures, or data loss.
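To show how the syntax above fits into a complete job, here is a minimal sketch that writes generated rows into a Paimon temporary table. The source table orders_source and its use of Flink's built-in datagen connector are assumptions made for illustration, and the path reuses the placeholder from the note above.

```sql
-- Hypothetical source: Flink's built-in datagen connector produces random rows.
CREATE TEMPORARY TABLE orders_source (
  id BIGINT,
  data STRING
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Paimon sink declared as a temporary table outside a Paimon Catalog.
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<bucket>/test/order.db/orders',
  'auto-create' = 'true'
);

-- Write the generated rows into the Paimon table.
INSERT INTO paimon_table
SELECT id, data FROM orders_source;
```

A second job that declares paimon_table with the same path can then read the data with a plain SELECT.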
WITH parameters
Parameter | Description | Data type | Required | Default value | Notes |
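connector | Table type. | String | No | None | Omit this parameter for tables created in a Paimon Catalog. For temporary tables created in other catalogs, set it to paimon. |
path | Table storage path. | String | No | None | Omit this parameter for tables created in a Paimon Catalog. For temporary tables created in other catalogs, set it to the path of the Paimon table files. |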
auto-create | When creating a Paimon temporary table, automatically create files if the specified path does not contain Paimon table files. | Boolean | No | false | Valid values: true and false. |
file.format | Storage format for data files in the table. | String | No | parquet | Valid values include parquet, orc, and avro. |
bucket | Number of buckets per partition. | Integer | No | 1 | Data written to the Paimon table is distributed across buckets based on the bucket-key. Note: We recommend keeping each bucket under 5 GB. |
bucket-key | Bucket key column. | String | No | None | Specify the columns whose values determine how data is distributed across buckets. Separate multiple column names with commas (,). |
changelog-producer | Mechanism for producing incremental data. | String | No | none | Paimon can generate complete changelogs (with matching update_before and update_after records) for any input data stream. This helps downstream consumers process updates correctly. Valid values: none, input, lookup, and full-compaction. For guidance on choosing a changelog producer, see Changelog producer. |
full-compaction.delta-commits | Maximum interval between full compactions. | Integer | No | None | This parameter sets how many snapshot commits occur before a full compaction runs. |
lookup.cache-max-memory-size | Memory cache size for Paimon dimension tables. | String | No | 256 MB | This setting controls both the dimension table cache size and the lookup changelog-producer cache size. |
merge-engine | Mechanism for merging rows with the same primary key. | String | No | deduplicate | Valid values: deduplicate, partial-update, and aggregation. For details about merge engines, see Merge engine. |
partial-update.ignore-delete | Whether to ignore delete (-D) messages. | Boolean | No | false | Valid values: true and false. |
ignore-delete | Whether to ignore delete (-D) messages. | Boolean | No | false | Same valid values as partial-update.ignore-delete. |
partition.default-name | Default partition name. | String | No | __DEFAULT_PARTITION__ | Used as the partition name when the partition column value is null or an empty string. |
partition.expiration-check-interval | How often to check for expired partitions. | String | No | 1h | For details, see How to configure automatic partition expiration? |
partition.expiration-time | Partition retention period. | String | No | None | A partition expires after this duration. By default, partitions never expire. The duration is calculated from the partition value. For details, see How to configure automatic partition expiration? |
partition.timestamp-formatter | Convert a time string to a formatted timestamp. | String | No | None | Specifies the format used to extract partition age from partition values. For details, see How to configure automatic partition expiration? |
partition.timestamp-pattern | Format string to convert partition values to timestamp strings. | String | No | None | Specifies the format used to extract partition age from partition values. For details, see How to configure automatic partition expiration? |
scan.bounded.watermark | Stops reading from the Paimon source table when its watermark exceeds this value. | Long | No | None | None. |
scan.mode | Consumer offset for the Paimon source table. | String | No | default | For details, see How to configure the consumer offset for a Paimon source table? |
scan.snapshot-id | Snapshot ID to start reading from. | Integer | No | None | For details, see How to configure the consumer offset for a Paimon source table? |
scan.timestamp-millis | Timestamp to start reading from. | Integer | No | None | For details, see How to configure the consumer offset for a Paimon source table? |
snapshot.num-retained.max | Maximum number of recent snapshots to retain. | Integer | No | 2147483647 | A snapshot expires if it meets either this limit or snapshot.time-retained, and also satisfies snapshot.num-retained.min. |
snapshot.num-retained.min | Minimum number of recent snapshots to retain. | Integer | No | 10 | None. |
snapshot.time-retained | How long a snapshot remains before expiring. | String | No | 1h | A snapshot expires if it meets either this limit or snapshot.num-retained.max, and also satisfies snapshot.num-retained.min. |
write-mode | Write mode for the Paimon table. | String | No | change-log | Valid values: change-log and append-only. For details, see Write mode. |
scan.infer-parallelism | Whether to infer parallelism for the Paimon source table automatically. | Boolean | No | true | Valid values: true and false. |
scan.parallelism | Parallelism for the Paimon source table. | Integer | No | None | Note: This parameter has no effect when the resource allocation mode is set to Expert Mode. |
sink.parallelism | Parallelism for the Paimon sink table. | Integer | No | None | Note: This parameter has no effect when the resource allocation mode is set to Expert Mode. |
sink.clustering.by-columns | Clustering columns for the Paimon sink table. | String | No | None | For Paimon append-only tables (non-primary-key tables), configure this parameter in batch jobs to enable clustering writes. Clustering improves query speed by grouping data by value ranges on the specified columns. Separate multiple column names with commas (,). For details, see the Apache Paimon documentation. |
sink.delete-strategy | Validation strategy to ensure correct handling of retract (-D/-U) messages. | Enum | No | NONE | Valid strategies and required sink behavior:
Note
|
For more configuration options, see the Apache Paimon documentation.
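The sketch below combines several of the parameters above in one table definition and then reads the table from a specific snapshot through a dynamic table options hint. The table name orders_by_day, its columns, and every option value are hypothetical; pick values that match your own data volume and retention needs.

```sql
-- Hypothetical partitioned primary-key table.
-- The primary key includes the partition column dt.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders_by_day (
  dt STRING,
  order_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (dt, order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  -- Four buckets per partition, distributed by order_id.
  'bucket' = '4',
  'bucket-key' = 'order_id',
  'file.format' = 'parquet',
  -- Snapshot retention.
  'snapshot.time-retained' = '1h',
  'snapshot.num-retained.min' = '10',
  -- Expire partitions whose dt value (formatted as yyyyMMdd) is older than 7 days.
  'partition.expiration-time' = '7 d',
  'partition.expiration-check-interval' = '1 d',
  'partition.timestamp-formatter' = 'yyyyMMdd',
  'partition.timestamp-pattern' = '$dt'
);

-- Start reading from snapshot 1 instead of the default consumer offset.
SELECT *
FROM `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders_by_day
/*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;
```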
Feature details
Data freshness and consistency guarantees
The Paimon sink table uses the two-phase commit protocol to commit data during each checkpoint of a Flink job. Therefore, the data freshness corresponds to the checkpoint interval of the Flink job. Each commit generates up to two snapshots.
When two Flink jobs simultaneously write to the same Paimon table, serializable consistency is guaranteed if their data is written to different buckets. If their data is written to the same bucket, however, only snapshot isolation consistency can be guaranteed. This means the table's data may be a mix of results from both jobs, but no data loss will occur.
Merge engine
When a Paimon sink table receives multiple rows with the same primary key, it merges them into one row to preserve uniqueness. Set the merge-engine parameter to control how merging works. The following table describes each option.
Merge engine | Details |
Deduplicate | Deduplicate is the default merge engine. For rows with the same primary key, the Paimon sink table keeps only the most recent row and discards the rest. Note If the most recent row is a delete message, all rows with that primary key are discarded. |
Partial update | With partial update, you can incrementally update data across multiple messages. A new row with the same primary key overwrites the existing row column by column, but columns that are null in the new row keep their existing values. For example, suppose the Paimon sink table receives these rows in order: <1, 23.0, 10, NULL>, <1, NULL, NULL, 'This is a book'>, <1, 25.2, NULL, NULL>. If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>. |
Aggregation | In some cases, you only need aggregated values. With aggregation, Paimon merges rows with the same primary key using your specified aggregate functions. For each non-primary-key column, define an aggregate function using fields.<field-name>.aggregate-function. For example, if the price column aggregates using max and the sales column uses sum, then given inputs <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. For supported aggregate functions and data types, see the Apache Paimon documentation. |
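The merge engines in the table above can be sketched in SQL as follows. The table names product_stats and product_info and their columns are hypothetical. The aggregate functions are set through the Apache Paimon option fields.<field-name>.aggregate-function.

```sql
-- Aggregation: keep the maximum price and the total sales per product.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.product_stats (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

-- Two rows with the same primary key are merged into one row.
INSERT INTO `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.product_stats
VALUES (1, 23.0, 15), (1, 30.2, 20);

-- Partial update: later rows fill in or overwrite only their non-null columns.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.product_info (
  product_id BIGINT,
  price DOUBLE,
  stock BIGINT,
  description STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);
```

With the aggregation inputs shown, the merged row should match the example in the table: the larger price and the summed sales.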
Incremental data generation mechanism
Set the changelog-producer parameter to generate complete changelogs (with matching update_before and update_after records) for any input data stream. The following table lists all available changelog producers. For more detail, see the Apache Paimon documentation.
Mechanism | Details |
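None | When changelog-producer is set to none (the default), downstream consumers of the Paimon table see only the latest state of each primary key, without the matching update_before record. For example, suppose a downstream consumer calculates the sum of a column. If it sees only the latest value 5, it cannot decide how to update the sum. If the previous value was 4, it should add 1. If it was 6, it should subtract 1. Consumers sensitive to update_before data should avoid none, although the other producers incur performance overhead. Note: If your downstream consumer, such as a database, does not need update_before data, none is acceptable. Choose based on your needs. |
Input | Set changelog-producer to input to have the Paimon sink table write the input stream to the changelog files as-is. Use this only when the input stream itself is a complete changelog, such as CDC data. |
Lookup | When you set changelog-producer to lookup, the Paimon sink table generates a complete changelog for each committed snapshot by looking up the previous values of the changed keys. Compared to Full Compaction, Lookup delivers fresher changelogs but consumes more resources. We recommend it when incremental data must be fresh, for example at minute-level latency. |
Full Compaction | After you set changelog-producer to full-compaction, the Paimon sink table generates a complete changelog each time a full compaction runs. Compared to Lookup, Full Compaction delivers less-fresh changelogs but uses fewer resources because it piggybacks on existing compaction work. We recommend it when the freshness requirement for incremental data is not strict, for example when hourly updates are acceptable. |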
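As an illustration of the trade-off described above, the following sketch defines one table per changelog producer and a downstream aggregation that depends on update_before records. The table names, columns, and the delta-commits value are hypothetical.

```sql
-- Fresher changelog at higher resource cost.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders_lookup (
  order_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'lookup'
);

-- Less-fresh changelog at lower cost: a changelog is produced when a full
-- compaction runs, here at most every 10 snapshot commits.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders_full_compaction (
  order_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '10'
);

-- A streaming consumer that needs update_before records to keep the sum correct.
SELECT SUM(amount) AS total_amount
FROM `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders_lookup;
```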
Write mode
Paimon tables support the following write modes.
Mode | Details |
Change-log | Change-log is the default write mode. It supports insert, delete, and update operations based on the primary key. You can also use merge engines and changelog producers with this mode. |
Append-only | Append-only mode supports only inserts and does not use primary keys. It is more efficient than change-log mode. Use it as a message queue alternative when moderate data freshness is acceptable—such as minute-level. For details, see the Apache Paimon documentation. When using append-only mode, note the following:
|
Target for CTAS and CDAS
Paimon tables support real-time synchronization of single tables or entire databases. Schema changes in upstream tables sync to Paimon tables in real time. For details, see Manage Paimon tables and Manage Paimon Catalog.
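As a rough sketch only, single-table and whole-database synchronization might look like the statements below. It assumes that a MySQL catalog named mysql and a Paimon Catalog named paimon already exist and that a database order_db with a table orders is present upstream. All of these names are hypothetical, and the exact CTAS and CDAS syntax should be confirmed in Manage Paimon tables and Manage Paimon Catalog.

```sql
-- CTAS: synchronize a single upstream table, including its schema changes.
CREATE TABLE IF NOT EXISTS `paimon`.`order_db`.`orders`
AS TABLE `mysql`.`order_db`.`orders`;

-- CDAS: synchronize every table of an upstream database.
CREATE DATABASE IF NOT EXISTS `paimon`.`order_db`
AS DATABASE `mysql`.`order_db` INCLUDING ALL TABLES;
```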
Data ingestion
You can use the Paimon connector in YAML-based data ingestion jobs as a sink.
Syntax
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
Configuration options
Parameter | Description | Required | Data type | Default value | Notes |
type | Connector type. | Yes | STRING | None | Fixed value: paimon. |
name | Sink name. | No | STRING | None | Name of the sink. |
catalog.properties.metastore | Type of Paimon Catalog. | No | STRING | filesystem | Valid values: filesystem and rest. |
catalog.properties.* | Parameters for creating a Paimon Catalog. | No | STRING | None | For details, see Manage Paimon Catalog. |
table.properties.* | Parameters for creating a Paimon table. | No | STRING | None | For details, see Paimon table options. |
catalog.properties.warehouse | Root directory for file storage. | No | STRING | None | This parameter takes effect only when |
commit.user-prefix | Username prefix used when committing data files. | No | STRING | None | Note Use different usernames for different jobs to help identify conflicting jobs during commit failures. |
partition.key | Partition field for each partitioned table. | No | STRING | None | Different tables are separated by |
sink.cross-partition-upsert.tables | Tables that require cross-partition upserts (primary key does not include all partition fields). | No | STRING | None | Applies to tables requiring cross-partition upserts.
Important
|
sink.commit.parallelism | Parallelism for the commit operator. | No | INTEGER | None | Increase this value to improve performance when the commit operator becomes a bottleneck. Supported only in VVR 11.6 and later. Note Setting this parameter changes operator parallelism. When restarting stateful jobs, enable AllowNonRestoredState. |
Usage examples
Configure the Paimon connector as a data ingestion sink based on your Paimon Catalog type.
Example configuration for Paimon Catalog as a filesystem, writing to Alibaba Cloud OSS:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
For meanings of parameters prefixed with catalog.properties, see Create a Paimon Filesystem Catalog.
Example: REST catalog writing to Alibaba Cloud Data Lake Formation.
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
For meanings of parameters prefixed with catalog.properties, see Flink CDC catalog configuration parameters.
Schema evolution
Paimon supports the following schema evolution events when used as a data ingestion sink:
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (does not support changing primary key column types)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
If the downstream Paimon table already exists, Paimon uses the existing schema for writes and does not attempt to recreate the table.
FAQ
How do I configure the consumer offset for a Paimon source table?
Why does my Paimon job fail with “Heartbeat of TaskManager timed out”?
Why does my Paimon job fail with “Sink materializer must not be used with Paimon sink”?
Why does my Paimon job fail with “File deletion conflicts detected” or “LSM conflicts detected”?
Why does my Paimon job fail with “File xxx not found, Possible causes”?
Is Paimon connector data visibility related to checkpoint intervals?