This topic describes how to use the Streaming Data Lakehouse Paimon connector with a Paimon Catalog.
Background information
Apache Paimon is a unified storage format for streaming and batch data lakes. It supports high-throughput writes and low-latency queries. E-MapReduce (EMR), an open source big data platform from Alibaba Cloud, integrates with common compute engines such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to build a data lake storage service on HDFS or Object Storage Service (OSS) and connect these compute engines to analyze data in the data lake. For more information, see Apache Paimon.
Category | Details |
Supported types | Source table, dimension table, sink table, and data ingestion sink |
Running modes | Streaming mode and batch mode |
Data format | Not supported |
Specific monitoring metrics | None |
API types | SQL, data ingestion YAML job |
Supports updating or deleting sink table data | Yes |
Features
Apache Paimon provides the following core features:
Build a low-cost, lightweight data lake storage service on HDFS or Object Storage Service (OSS).
Read and write large-scale datasets in streaming and batch modes.
Support batch queries and Online Analytical Processing (OLAP) queries with data freshness from minutes to seconds.
Consume and generate incremental data. You can use it as storage for traditional offline data warehouses and modern streaming data warehouses.
Pre-aggregate data to reduce storage costs and downstream computing pressure.
Support data backtracking for historical versions.
Support efficient data filtering.
Support schema evolution.
Limits and recommendations
The Paimon connector is supported only by Flink compute engine VVR 6.0.6 and later.
The following table shows the compatibility between Paimon and VVR versions.
Apache Paimon version | VVR version |
1.3 | 11.4 |
1.2 | 11.2, 11.3 |
1.1 | 11 |
1.0 | 8.0.11 |
0.9 | 8.0.7, 8.0.8, 8.0.9, and 8.0.10 |
0.8 | 8.0.6 |
0.7 | 8.0.5 |
0.6 | 8.0.4 |
0.6 | 8.0.3 |
Storage recommendations for concurrent write scenarios
When multiple jobs concurrently update the same Paimon table, standard OSS storage (oss://) might experience rare commit conflicts or job errors because of limitations in the atomicity of file operations.
To ensure continuous and stable writes, use metadata or storage services that have strong atomicity guarantees. We recommend that you use Data Lake Formation (DLF), which provides unified management of Paimon metadata and storage services. Alternatively, you can use OSS-HDFS or HDFS.
SQL
You can use the Paimon connector in SQL jobs as a source table or a sink table.
Syntax
If you create a Paimon table in a Paimon Catalog, you do not need to specify the `connector` parameter. The syntax for creating a Paimon table is as follows:

```sql
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
```

Note: If you have already created a Paimon table in the Paimon Catalog, you can use it directly without recreating it.
If you create a temporary Paimon table in another catalog, you must specify the `connector` parameter and the storage path of the Paimon table. The syntax for creating a Paimon table is as follows:

```sql
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true', -- If no Paimon table data file exists in the specified path, the system automatically creates one.
  ...
);
```

Note:

Path example: `'path' = 'oss://<bucket>/test/order.db/orders'`. Do not omit the `.db` suffix. Paimon uses this naming convention to identify databases.

Multiple jobs that write to a single table must share the same path configuration. If two paths differ, Paimon does not consider them the same table. Even if the physical paths are identical, inconsistent catalog configurations can cause concurrent write conflicts, compaction failures, or data loss. For example, `oss://b/test` and `oss://b/test/` are considered different paths because of the trailing slash, even though they point to the same physical location.
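As a minimal end-to-end sketch (the catalog, database, table, and column names are illustrative, and `source_table` is a hypothetical upstream table), the following statements create a Paimon table in a Paimon Catalog and continuously write query results to it:

```sql
-- Assumes a Paimon Catalog named `my_paimon` and a database `my_db` already exist.
-- No 'connector' parameter is needed because the table lives in a Paimon Catalog.
CREATE TABLE `my_paimon`.`my_db`.`orders` (
  order_id BIGINT,
  cust_id  BIGINT,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'bucket' = '4'
);

-- Written data becomes visible at each checkpoint of the Flink job.
INSERT INTO `my_paimon`.`my_db`.`orders`
SELECT order_id, cust_id, amount FROM source_table;
```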
WITH parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | Table type. | String | No | None | This parameter is required only when you create a temporary Paimon table in another catalog. Set the value to paimon. |
path | Table storage path. | String | No | None | This parameter is required only when you create a temporary Paimon table in another catalog. |
auto-create | When creating a temporary Paimon table, automatically create a table file if the specified path does not contain one. | Boolean | No | false | Values: false (default): If no Paimon table file exists in the specified path, an error is returned. true: If no Paimon table file exists in the specified path, the system automatically creates one. |
file.format | Storage type of data files in the table. | String | No | parquet | Values: parquet (default), orc, and avro. |
bucket | Number of buckets in each partition. | Integer | No | 1 | Data written to the Paimon table is distributed to buckets based on the value of the bucket key. Note Keep the data volume in each bucket below 5 GB. |
bucket-key | Bucket key column. | String | No | None | Specifies the columns whose values determine how data written to the Paimon table is distributed into buckets. Separate column names with commas (,), for example, 'bucket-key' = 'order_id,cust_id'. Note If you do not specify this parameter, the primary key is used as the bucket key. If the table has no primary key, the entire row is used. |
changelog-producer | Incremental data generation mechanism. | String | No | none | Paimon can generate complete incremental data for any input data stream (every update_after record has a corresponding update_before record) to facilitate downstream consumption. Values: none (default), input, lookup, and full-compaction. For more information about selecting an incremental data generation mechanism, see Incremental data generation mechanism. |
full-compaction.delta-commits | Maximum Full Compaction interval. | Integer | No | None | This parameter specifies how many commit snapshots trigger a Full Compaction. |
lookup.cache-max-memory-size | Memory cache size for Paimon dimension tables. | String | No | 256 MB | This parameter affects the cache size for both dimension tables and the lookup changelog-producer. Both mechanisms use this parameter for cache configuration. |
merge-engine | Merge mechanism for data with the same primary key. | String | No | deduplicate | Values: deduplicate (default), partial-update, and aggregation. For a detailed analysis of the data merge mechanism, see Data merge mechanism. |
partial-update.ignore-delete | Ignore delete (-D) messages. | Boolean | No | false | Values: true: Delete messages are ignored. false (default): An error is returned when a delete message is received. Note This parameter takes effect only when merge-engine is set to partial-update. |
ignore-delete | Ignore delete (-D) messages. | Boolean | No | false | Values are the same as for partial-update.ignore-delete. Note Unlike partial-update.ignore-delete, this parameter is not limited to the partial-update merge mechanism. |
partition.default-name | Default partition name. | String | No | __DEFAULT_PARTITION__ | If the partition key column value is null or an empty string, this default name is used as the partition name. |
partition.expiration-check-interval | How often to check for partition expiration. | String | No | 1h | For more information, see Configure automatic partition expiration. |
partition.expiration-time | Partition expiration duration. | String | No | None | When a partition's lifespan exceeds this value, the partition expires. By default, partitions never expire. A partition's lifespan is calculated from its partition value. For more information, see Configure automatic partition expiration. |
partition.timestamp-formatter | Format string to convert a time string to a timestamp. | String | No | None | Set the format to extract the partition's lifespan from the partition value. For more information, see Configure automatic partition expiration. |
partition.timestamp-pattern | Format string to convert a partition value to a time string. | String | No | None | Set the format to extract the partition's lifespan from the partition value. For more information, see Configure automatic partition expiration. |
scan.bounded.watermark | When the watermark of data generated by the Paimon source table exceeds this value, the Paimon source table stops generating data. | Long | No | None | None. |
scan.mode | Specify the consumer offset for the Paimon source table. | String | No | default | For more information, see Specify consumer offset for a Paimon source table. |
scan.snapshot-id | Specify the snapshot from which the Paimon source table starts consuming data. | Integer | No | None | For more information, see Specify consumer offset for a Paimon source table. |
scan.timestamp-millis | Specify the timestamp (in milliseconds) from which the Paimon source table starts consuming data. | Long | No | None | For more information, see Specify consumer offset for a Paimon source table. |
snapshot.num-retained.max | Maximum number of latest snapshots to retain. | Integer | No | 2147483647 | Snapshot expiration is triggered if this configuration or snapshot.time-retained is met, and snapshot.num-retained.min is also met. |
snapshot.num-retained.min | Minimum number of latest snapshots to retain. | Integer | No | 10 | None. |
snapshot.time-retained | How long after creation a snapshot expires. | String | No | 1h | Snapshot expiration is triggered if this configuration or snapshot.num-retained.max is met, and snapshot.num-retained.min is also met. |
write-mode | Paimon table write mode. | String | No | change-log | Values: change-log (default) and append-only. For a detailed introduction to write modes, see Write modes. |
scan.infer-parallelism | Automatically infer the parallelism of the Paimon source table. | Boolean | No | true | Values: true (default): The parallelism of the source is inferred automatically. false: The parallelism is determined by the scan.parallelism parameter or the default parallelism of the job. |
scan.parallelism | Parallelism of the Paimon source table. | Integer | No | None | Note This parameter does not take effect when the resource mode of the deployment is set to Expert mode. |
sink.parallelism | Parallelism of the Paimon sink table. | Integer | No | None | Note This parameter does not take effect when the resource mode of the deployment is set to Expert mode. |
sink.clustering.by-columns | Specify clustering columns for data written to the Paimon sink table. | String | No | None | For Paimon append-only tables (tables without a primary key), configure this parameter in batch jobs to enable clustered writes, which sort data by range on the specified columns and improve query speed. Separate multiple column names with commas (,), for example, 'col1,col2'. For clustering details, see the Apache Paimon official documentation. |
sink.delete-strategy | Validation policy that determines how the sink operator handles retraction (-D/-U) messages. | Enum | No | NONE | For the supported policy values and how the sink operator handles retraction messages under each policy, see the Apache Paimon official documentation. |
For more information about configuration items, see the Apache Paimon official documentation.
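As a sketch of how several of the parameters above combine in practice (all table, column, and path names are illustrative), the following DDL creates a partitioned temporary table with the lookup changelog producer, automatic partition expiration, and snapshot retention limits:

```sql
CREATE TEMPORARY TABLE paimon_sink (
  dt       STRING,
  order_id BIGINT,
  amount   DOUBLE,
  PRIMARY KEY (dt, order_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'bucket' = '4',
  'changelog-producer' = 'lookup',
  -- Partitions older than 7 days expire; the check runs hourly.
  -- Partition values such as 20240101 are parsed with the formatter below.
  'partition.expiration-time' = '7 d',
  'partition.expiration-check-interval' = '1h',
  'partition.timestamp-formatter' = 'yyyyMMdd',
  'partition.timestamp-pattern' = '$dt',
  -- Keep snapshots for at least 1 hour, retaining between 10 and 100 of them.
  'snapshot.time-retained' = '1h',
  'snapshot.num-retained.min' = '10',
  'snapshot.num-retained.max' = '100'
);
```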
Feature details
Data freshness and consistency assurance
A Paimon sink table uses the two-phase commit (2PC) protocol. It commits written data during the checkpoint of each Flink job. Therefore, the data freshness is equivalent to the checkpoint interval of the Flink job. Each commit generates a maximum of two snapshots.
If two Flink jobs write data to a Paimon table concurrently, serializable consistency is guaranteed if the data is written to different buckets. If the data is written to the same bucket, only snapshot isolation consistency is guaranteed. This means that the data in the table might contain mixed results from both jobs, but no data is lost.
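Because commits happen at checkpoints, data freshness is tuned through the checkpoint interval of the Flink job. A minimal sketch of the relevant setting (the 1-minute value is illustrative):

```sql
-- Data written to the Paimon table becomes visible roughly once per checkpoint.
SET 'execution.checkpointing.interval' = '1min';
```

Shortening the interval improves freshness but produces commits, and therefore snapshots, more frequently.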
Data merge mechanism
When a Paimon sink table receives multiple data records that have the same primary key, it merges them into a single record to maintain the uniqueness of the primary key. You can specify the data merge behavior using the merge-engine parameter. The following table describes the data merge mechanisms.
Merge mechanism | Details |
Deduplicate | Deduplicate is the default data merge mechanism. For multiple data records with the same primary key, the Paimon sink table retains only the latest record and discards others. Note If the latest record is a delete message, all data with that primary key is discarded. |
Partial Update | Use the partial-update mechanism to incrementally update data with multiple messages and ultimately assemble a complete record. Specifically, new data with the same primary key overwrites existing data, but columns with null values do not overwrite existing values. For example, suppose the first column is the primary key and the Paimon sink table receives <1, 23.0, 10, NULL> followed by <1, 25.2, NULL, 'This is a book'>. The final result is <1, 25.2, 10, 'This is a book'>. |
Aggregation | In some scenarios, you might need only aggregated values. The aggregation mechanism aggregates data with the same primary key using a specified aggregate function. For each non-primary-key column, specify an aggregate function using 'fields.<field-name>.aggregate-function'. For example, with 'fields.price.aggregate-function' = 'max' and 'fields.sales.aggregate-function' = 'sum', the price column is aggregated with the max function and the sales column with the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result is <1, 30.2, 35>. For the supported aggregate functions and their corresponding data types, see the Apache Paimon official documentation. |
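The merge mechanisms are selected per table through the WITH clause. A minimal sketch (table, column, and path names are illustrative) of a partial-update table and an aggregation table:

```sql
-- Partial update: non-null columns of later records patch earlier ones.
CREATE TEMPORARY TABLE item_wide (
  item_id BIGINT,
  price   DOUBLE,
  stock   INT,
  descr   STRING,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'merge-engine' = 'partial-update'
);

-- Aggregation: each non-primary-key column is merged with its own function.
CREATE TEMPORARY TABLE item_stats (
  item_id BIGINT,
  price   DOUBLE,
  sales   INT,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);
```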
Incremental data generation mechanism
You can set the changelog-producer parameter to configure the incremental data generation mechanism. Paimon generates complete incremental data (all update_after data has corresponding update_before data) for any input data stream. The following list describes all incremental data generation mechanisms. For more information, see the Apache Paimon official documentation.
Mechanism | Details |
None | When you set changelog-producer to none, the Paimon table does not generate complete incremental data: downstream consumers can see only the latest data and cannot obtain the corresponding update_before records. For example, if a downstream consumer needs to calculate the sum of a column and sees only the latest value 5, it cannot determine how to update the sum: if the previous value was 4, the sum should increase by 1; if the previous value was 6, the sum should decrease by 1. Such consumers are sensitive to update_before data. Do not set the incremental data generation mechanism to none for these consumers. However, the other incremental data generation mechanisms incur performance overhead. Note If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the incremental data generation mechanism to none. Configure it as needed. |
Input | When you set changelog-producer to input, the Paimon table passes the input records through directly as incremental data, without additional processing or verification. Therefore, use this incremental data generation mechanism only when the input data stream itself is complete incremental data (such as CDC data). |
Lookup | When you set changelog-producer to lookup, Paimon generates complete incremental data before each snapshot commit by looking up the merged results of the data. Compared with the Full Compaction mechanism described below, the Lookup mechanism generates incremental data with better timeliness but consumes more resources overall. Use it when high freshness of incremental data is required (such as minute-level freshness). |
Full Compaction | When you set changelog-producer to full-compaction, Paimon generates complete incremental data during each full compaction of the table. You can control the compaction interval through the full-compaction.delta-commits parameter. Compared with the Lookup mechanism described above, the Full Compaction mechanism generates incremental data with less timeliness. However, it piggybacks on the table's full compaction process and incurs no additional computation, so it consumes fewer resources overall. Use it when high freshness of incremental data is not required (such as hour-level freshness). |
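The mechanism is set per table in the DDL. A minimal sketch (names and the 5-commit interval are illustrative) contrasting the two resource/freshness trade-offs:

```sql
-- Lookup: fresher incremental data, higher resource cost.
CREATE TEMPORARY TABLE t_lookup (
  id BIGINT,
  v  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'changelog-producer' = 'lookup'
);

-- Full compaction: cheaper but less timely; here a full compaction
-- is triggered at most every 5 commit snapshots.
CREATE TEMPORARY TABLE t_fc (
  id BIGINT,
  v  STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '5'
);
```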
Write modes
Paimon tables support the following write modes:
Mode | Details |
Change-log | Change-log is the default write mode for Paimon tables. This mode supports inserting, deleting, and updating data based on the primary key. You can also use the data merge mechanism and incremental data generation mechanism mentioned above in this write mode. |
Append-only | Append-only write mode supports only data insertion and does not support primary keys. This mode is more efficient than change-log mode and can serve as a lightweight alternative to a message queue in scenarios with moderate data freshness requirements (such as minute-level freshness). For a detailed introduction to append-only write mode, including its usage notes, see the Apache Paimon official documentation. |
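As a minimal sketch (table, column, and path names are illustrative), an append-only table is declared without a primary key and with the write mode set explicitly:

```sql
-- Append-only: inserts only, no primary key, message-queue-style storage.
CREATE TEMPORARY TABLE click_log (
  user_id BIGINT,
  url     STRING,
  ts      TIMESTAMP(3)
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'write-mode' = 'append-only'
);
```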
As CTAS and CDAS target
Paimon tables support real-time synchronization of data at the single-table or full-database level. If the schema of the upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
Data ingestion
You can use the Paimon connector for data ingestion in YAML job development to write data to the destination.
Syntax
```yaml
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
```

Configuration items
Parameter | Description | Required | Data type | Default value | Remarks |
type | Connector type. | Yes | STRING | None | The value is fixed to paimon. |
name | Target end name. | No | STRING | None | Sink name. |
catalog.properties.metastore | Paimon Catalog type. | No | STRING | filesystem | Values: filesystem (default) and rest. |
catalog.properties.* | Parameters for creating a Paimon Catalog. | No | STRING | None | For more information, see Manage Paimon Catalogs. |
table.properties.* | Parameters for creating a Paimon table. | No | STRING | None | For more information, see Paimon table options. |
catalog.properties.warehouse | Root directory for file storage. | No | STRING | None | This parameter takes effect only when catalog.properties.metastore is set to filesystem. |
commit.user-prefix | Username prefix when committing data files. | No | STRING | None | Note Set different usernames for different jobs. This helps locate conflicting jobs during commit conflicts. |
partition.key | Partition fields for each partitioned table. | No | STRING | None | Separate tables with semicolons (;), separate a table from its partition fields with a colon (:), and separate multiple fields with commas (,). For example, testdb.table1:id1,id2;testdb.table2:name. |
sink.cross-partition-upsert.tables | Specify tables that require cross-partition upsert (the primary key does not include all partition fields). | No | STRING | None | Applies only to tables that perform cross-partition upserts. Important Cross-partition upsert maintains an index from primary keys to partitions and consumes additional resources. |
sink.commit.parallelism | Specify the parallelism of the Commit operator. | No | INTEGER | None | If the Commit operator becomes a bottleneck, use this parameter to raise its parallelism and improve performance. Only Realtime Compute for Apache Flink VVR 11.6 and later versions support this parameter. Note Setting this parameter changes operator parallelism. When you restart a stateful job, enable the AllowNonRestoredState option so that the states of operators that cannot be restored are ignored. |
Usage examples
When you use Paimon as a data ingestion target, you can refer to the following examples for configuration based on the Paimon Catalog type.
If the Paimon Catalog is a filesystem, see the following configuration example for writing data to Alibaba Cloud OSS:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
```

For information about the parameters that are prefixed with catalog.properties, see Create a Paimon Filesystem Catalog.
If the Paimon Catalog is rest, see the following configuration example for writing data to Alibaba Cloud Data Lake Formation:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
  # (Optional) Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true
```

For information about the parameters that are prefixed with catalog.properties, see Flink CDC Catalog configuration parameters.
Schema evolution
Currently, Paimon supports the following schema evolution events as a data ingestion target:
CREATE TABLE EVENT
ADD COLUMN EVENT
ALTER COLUMN TYPE EVENT (Modifying primary key column types is not supported.)
RENAME COLUMN EVENT
DROP COLUMN EVENT
TRUNCATE TABLE EVENT
DROP TABLE EVENT
If the downstream Paimon table already exists, the system uses the existing table schema for writes and does not attempt to recreate the table.