The Paimon connector provides access to Apache Paimon, a streaming data lakehouse format, and is typically used together with a Paimon Catalog. This topic describes how to use the Paimon connector.
Background information
Apache Paimon is a lake storage format that unifies streaming and batch processing. It supports high-throughput writes and low-latency queries. Common compute engines on the Alibaba Cloud open-source big data platform E-MapReduce, such as Flink, Spark, Hive, and Trino, are well-integrated with Paimon. You can use Apache Paimon to build a data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS) and connect to these compute engines for data lake analytics. For more information, see Apache Paimon.
| Category | Details |
| --- | --- |
| Supported types | Source table, dimension table, sink table, and data ingestion destination |
| Run modes | Streaming mode and batch mode |
| Data format | Not supported |
| Specific monitoring metrics | None |
| API types | SQL, and YAML jobs for data ingestion |
| Supports updating or deleting data in sink tables | Yes |
Features
Apache Paimon provides the following core capabilities:
- Build a low-cost, lightweight data lake storage service based on HDFS or OSS.
- Read and write large-scale datasets in streaming and batch modes.
- Perform batch queries and Online Analytical Processing (OLAP) queries with data freshness ranging from seconds to minutes.
- Consume and produce incremental data. Paimon can be used as a storage layer for traditional offline data warehouses and modern streaming data warehouses.
- Pre-aggregate data to reduce storage costs and downstream computing pressure.
- Retrieve historical versions of data.
- Filter data efficiently.
- Support schema evolution.
Limits
The Paimon connector is supported only when the Flink compute engine is Ververica Runtime (VVR) 6.0.6 or later.
The following table describes the version mapping between Paimon and VVR.
| Apache Paimon version | VVR version |
| --- | --- |
| 1.1 | 11 |
| 1.0 | 8.0.11 |
| 0.9 | 8.0.7, 8.0.8, 8.0.9, and 8.0.10 |
| 0.8 | 8.0.6 |
| 0.7 | 8.0.5 |
| 0.6 | 8.0.4 |
| 0.6 | 8.0.3 |
SQL
The Paimon connector can be used in SQL jobs as a source table, dimension table, or sink table.
Syntax
If you create a Paimon table in a Paimon Catalog, you do not need to specify the `connector` parameter. The syntax for creating a Paimon table is as follows:

```sql
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
```

Note: If you have already created a Paimon table in a Paimon Catalog, you can use it directly without creating it again.
If you create a Paimon temporary table in another catalog, you must specify the connector parameter and the storage path of the Paimon table. The syntax for creating a Paimon table is as follows:
```sql
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true',  -- If no Paimon table data file exists at the specified path, a file is automatically created.
  ...
);
```

Note:

- Path example: `'path' = 'oss://<bucket>/test/order.db/orders'`. Do not omit the `.db` suffix. Paimon uses this naming convention to identify the database.
- Multiple jobs that write to the same table must use the same path configuration. If two path values are different, Paimon considers them to be different tables. Even if the physical paths are the same, inconsistent Catalog configurations can cause concurrent write conflicts, compaction failures, and data loss. For example, `oss://b/test` and `oss://b/test/` are considered different paths because of the trailing slash, even though they point to the same physical location.
WITH parameters
| Parameter | Description | Data type | Required | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| connector | The table type. | String | No | None | If you create a Paimon table in a Paimon Catalog, do not specify this parameter. If you create a Paimon temporary table in another catalog, set the value to paimon. |
| path | The table storage path. | String | No | None | Required only when you create a Paimon temporary table in another catalog. Set the value to the storage path of the Paimon table, for example a directory on OSS or HDFS. |
| auto-create | When you create a Paimon temporary table, specifies whether to automatically create a Paimon table file if none exists at the specified path. | Boolean | No | false | Valid values: false (an error is returned if no Paimon table file exists at the specified path) and true (the file is automatically created). |
| bucket | The number of buckets in each partition. | Integer | No | 1 | Data written to the Paimon table is distributed to buckets based on the columns specified by the bucket-key parameter. We recommend that you keep the data volume of each bucket below 5 GB. |
| bucket-key | The key columns for bucketing. | String | No | None | Specifies the columns based on which data written to the Paimon table is distributed to different buckets. Separate column names with commas (,). If you do not specify this parameter, the primary key columns are used; if the table has no primary key, all columns of each row are used. |
| changelog-producer | The mechanism for generating incremental data. | String | No | none | Paimon can generate complete incremental data (every update_after record has a corresponding update_before record) for any input data stream, which is convenient for downstream consumers. Valid values: none, input, lookup, and full-compaction. For more information about how to choose a mechanism, see Incremental data generation mechanisms. |
| full-compaction.delta-commits | The maximum interval for full compaction. | Integer | No | None | Specifies that a full compaction must be performed after at most this number of snapshot commits. |
| lookup.cache-max-memory-size | The memory cache size for the Paimon dimension table. | String | No | 256 MB | This parameter configures the cache size for both the dimension table and the lookup changelog-producer mechanism. |
| merge-engine | The mechanism for merging records that have the same primary key. | String | No | deduplicate | Valid values: deduplicate, partial-update, and aggregation. For a detailed analysis of data merge mechanisms, see Data merge mechanisms. |
| partial-update.ignore-delete | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | Valid values: true (delete messages are ignored) and false (delete messages are not ignored). Configure this parameter based on how your job should handle delete messages. |
| ignore-delete | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | The valid values are the same as those of partial-update.ignore-delete. |
| partition.default-name | The default partition name. | String | No | `__DEFAULT_PARTITION__` | If the value of a partition key column is null or an empty string, this default name is used as the partition name. |
| partition.expiration-check-interval | The interval at which expired partitions are checked. | String | No | 1h | For more information, see How do I configure automatic partition expiration? |
| partition.expiration-time | The expiration duration for partitions. | String | No | None | A partition expires when its lifetime exceeds this value. By default, partitions never expire. The lifetime of a partition is calculated from its partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-formatter | The format string used to convert a time string to a UNIX timestamp. | String | No | None | Sets the format used to extract the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-pattern | The format string used to convert a partition value to a time string. | String | No | None | Sets the format used to extract the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration? |
| scan.bounded.watermark | The watermark at which the Paimon source table stops generating data. | Long | No | None | When the watermark of the data generated by the Paimon source table exceeds this value, the source table stops generating data. |
| scan.mode | The consumer offset of the Paimon source table. | String | No | default | For more information, see How do I set the consumer offset for a Paimon source table? |
| scan.snapshot-id | The ID of the snapshot from which the Paimon source table starts consumption. | Integer | No | None | For more information, see How do I set the consumer offset for a Paimon source table? |
| scan.timestamp-millis | The point in time from which the Paimon source table starts consumption. | Integer | No | None | For more information, see How do I set the consumer offset for a Paimon source table? |
| snapshot.num-retained.max | The maximum number of the most recent snapshots to retain. | Integer | No | 2147483647 | A snapshot expires when this condition or snapshot.time-retained is met, provided that snapshot.num-retained.min is also satisfied. |
| snapshot.num-retained.min | The minimum number of the most recent snapshots to retain. | Integer | No | 10 | None. |
| snapshot.time-retained | The duration after which a snapshot expires. | String | No | 1h | A snapshot expires when this condition or snapshot.num-retained.max is met, provided that snapshot.num-retained.min is also satisfied. |
| write-mode | The write mode of the Paimon table. | String | No | change-log | Valid values: change-log and append-only. For more information, see Write modes. |
| scan.infer-parallelism | Specifies whether to automatically infer the parallelism of the Paimon source table. | Boolean | No | true | Valid values: true (the parallelism is inferred automatically) and false (the parallelism configured for the deployment is used). |
| scan.parallelism | The parallelism of the Paimon source table. | Integer | No | None | This parameter does not take effect when the resource mode of the deployment is set to Expert. |
| sink.parallelism | The parallelism of the Paimon sink table. | Integer | No | None | This parameter does not take effect when the resource mode of the deployment is set to Expert. |
| sink.clustering.by-columns | The clustering columns used when data is written to the Paimon sink table. | String | No | None | For Paimon append-only tables (non-primary-key tables), configuring this parameter in a batch job enables the clustering write feature, which clusters data on the specified columns by value range and improves query speed. Separate multiple column names with commas (,). For more information about clustering, see the official Apache Paimon documentation. |
| sink.delete-strategy | The validation policy that ensures retraction (-D and -U) messages are handled as expected. | Enum | No | NONE | Each valid value defines how the sink operator is expected to handle retraction messages. |
For more information about configuration items, see the official Apache Paimon documentation.
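The following snippet shows how several of these parameters might be combined in practice. It is a minimal sketch, not a definitive configuration: the catalog name (`paimon_catalog`), database (`test_db`), table names (`orders_sink`, `orders_source`), and column names are assumptions for this example, and it assumes the upstream table already produces complete change logs (for example, CDC data).

```sql
-- Minimal sketch: create a partitioned Paimon table in a Paimon Catalog
-- (the 'connector' parameter is not needed inside a Paimon Catalog).
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`test_db`.`orders_sink` (
  order_id BIGINT,
  cust_id  BIGINT,
  amount   DOUBLE,
  dt       STRING,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',                   -- four buckets per partition; keep each bucket below roughly 5 GB
  'bucket-key' = 'order_id',        -- distribute records to buckets by order_id
  'changelog-producer' = 'input',   -- the input stream (for example, CDC data) already carries complete change logs
  'snapshot.time-retained' = '2h'   -- expire snapshots that are older than 2 hours
);

-- Write to the table; the data becomes visible after each checkpoint of the job.
INSERT INTO `paimon_catalog`.`test_db`.`orders_sink`
SELECT order_id, cust_id, amount, dt FROM orders_source;
```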
Feature details
Data freshness and consistency guarantees
The Paimon sink table uses a two-phase commit protocol to commit written data during each checkpoint of a Flink job. Therefore, the data freshness is the same as the checkpoint interval of the Flink job. Each commit generates up to two snapshots.
When two Flink jobs write to the same Paimon table at the same time, if the data from the two jobs is not written to the same bucket, serializable consistency is guaranteed. If the data from the two jobs is written to the same bucket, only snapshot isolation consistency is guaranteed. This means that the data in the table may be a mix of results from both jobs, but no data will be lost.
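For example, if you need roughly minute-level freshness, you can set the checkpoint interval of the deployment to one minute. The following statement is a sketch that uses the standard Flink configuration key `execution.checkpointing.interval`; depending on your environment, you may need to set this key in the deployment configuration instead of using a SET statement.

```sql
-- Data written to a Paimon sink becomes visible when the checkpoint that
-- contains it completes, so a 1-minute interval gives minute-level freshness.
SET 'execution.checkpointing.interval' = '1min';
```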
Data merge mechanisms
When a Paimon sink table receives multiple records with the same primary key, it merges these records into a single record to maintain the uniqueness of the primary key. You can specify the data merge behavior by setting the merge-engine parameter. The following table describes the data merge mechanisms.
| Mechanism | Details |
| --- | --- |
| Deduplicate | The deduplicate mechanism is the default data merge mechanism. For multiple records with the same primary key, the Paimon sink table keeps only the latest record and discards the others. Note: If the latest record is a delete message, all records with that primary key are discarded. |
| Partial Update | The partial-update mechanism lets you progressively update a row through multiple messages and eventually obtain complete data: new data with the same primary key overwrites the old data, but columns whose values are null do not overwrite existing values. For example, assume that a Paimon sink table receives the following three records in order, where the first column is the primary key: <1, 23.0, 10, NULL>, <1, NULL, NULL, 'This is a book'>, and <1, 25.2, NULL, NULL>. The final result is <1, 25.2, 10, 'This is a book'>. Note: Tables that use the partial-update mechanism cannot directly process delete messages; to ignore them, set partial-update.ignore-delete or ignore-delete. To consume the change log of such a table in streaming mode, set changelog-producer to lookup or full-compaction. |
| Aggregation | In some scenarios, you may only care about aggregated values. The aggregation mechanism aggregates records with the same primary key based on the aggregate functions that you specify. For each column that is not part of the primary key, specify an aggregate function by using the `fields.<field-name>.aggregate-function` table property (a configuration sketch follows this table). For example, if the price column is aggregated with the max function and the sales column with the sum function, the input records <1, 23.0, 15> and <1, 30.2, 20> produce the final result <1, 30.2, 35>. Commonly supported aggregate functions include sum, min, max, last_value, last_non_null_value, listagg, bool_and, and bool_or. |
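As an illustration of the aggregation mechanism, the following sketch creates a table that keeps the maximum price and the total sales for each product. The catalog, database, table, and column names are assumptions for this example.

```sql
-- Minimal sketch of the aggregation merge engine: records with the same
-- primary key are merged by the aggregate function declared per column.
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`test_db`.`product_stats` (
  product_id BIGINT,
  price      DOUBLE,
  sales      BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',   -- keep the highest price seen so far
  'fields.sales.aggregate-function' = 'sum'    -- accumulate sales across records
);
```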
Incremental data generation mechanisms
By setting the changelog-producer parameter to the appropriate incremental data generation mechanism, Paimon can generate complete incremental data for any input data stream. Complete incremental data means that all update_after data has corresponding update_before data. The following lists all incremental data generation mechanisms. For more information, see the official Apache Paimon documentation.
| Mechanism | Details |
| --- | --- |
| None | If you set changelog-producer to none (the default value), the Paimon sink table does not provide update_before data, so downstream consumers can see only the latest value of each primary key. For example, assume a downstream consumer needs to calculate the sum of a column. If the consumer only sees the latest value, which is 5, it cannot determine how to update the sum: if the previous value was 4, the sum should increase by 1; if the previous value was 6, the sum should decrease by 1. Consumers of this kind are sensitive to update_before data, and we recommend that you do not use the none mechanism for them. The other mechanisms, however, incur additional performance overhead. Note: If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the mechanism to none. Configure the mechanism based on your actual needs. |
| Input | If you set changelog-producer to input, the Paimon sink table passes the input records through as the change log without additional processing. Therefore, this mechanism can be used only when the input data stream itself is complete incremental data, such as Change Data Capture (CDC) data. |
| Lookup | If you set changelog-producer to lookup, the Paimon sink table generates complete incremental data before each snapshot is committed by looking up the existing data of the table. Compared with the full-compaction mechanism described below, the lookup mechanism provides better timeliness for generating incremental data but consumes more resources overall. We recommend this mechanism when you have high requirements for data freshness, such as minute-level freshness. A configuration sketch follows this table. |
| Full Compaction | If you set changelog-producer to full-compaction, the Paimon sink table generates complete incremental data each time a full compaction is performed. You can control the full compaction interval with the full-compaction.delta-commits parameter. Compared with the lookup mechanism, the full-compaction mechanism has poorer timeliness for generating incremental data. However, it reuses the full compaction process and does not introduce extra computation, so it consumes fewer resources overall. We recommend this mechanism when you do not have high requirements for data freshness, such as hour-level freshness. |
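The following sketch shows how the lookup mechanism might be declared when a table is created and how a downstream streaming job can then read the complete change log. The catalog, database, table, and column names are assumptions for this example.

```sql
-- Minimal sketch: produce a complete change log with the lookup mechanism.
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`test_db`.`user_profile` (
  user_id BIGINT,
  city    STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'lookup'
);

-- A downstream streaming job can now consume complete incremental data
-- (update_before and update_after records) from this table.
SELECT * FROM `paimon_catalog`.`test_db`.`user_profile`;
```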
Write modes
Paimon tables currently support the following write modes.
| Mode | Details |
| --- | --- |
| Change-log | The change-log write mode is the default write mode for Paimon tables. This mode supports inserting, deleting, and updating data based on the primary key. You can also use the data merge and incremental data generation mechanisms described above in this mode. |
| Append-only | The append-only write mode supports only data insertion and does not support primary keys. This mode is more efficient than the change-log mode and can be used as a lightweight alternative to a message queue in scenarios where data freshness requirements are not high, such as minute-level freshness. For more information about the append-only write mode, see the official Apache Paimon documentation. When you use this mode, we recommend that you explicitly specify the bucket-key parameter; otherwise, data is bucketed based on the values of all columns in each row. A sketch of an append-only table follows this table. |
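The following sketch creates an append-only table as a lightweight queue for log-style data. It declares no primary key and sets the write mode explicitly; the catalog, database, table, and column names are assumptions for this example.

```sql
-- Minimal sketch of an append-only Paimon table (no primary key).
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`test_db`.`access_log` (
  log_time TIMESTAMP(3),
  user_id  BIGINT,
  url      STRING
) WITH (
  'write-mode' = 'append-only',
  'bucket' = '4',
  'bucket-key' = 'user_id'   -- recommended; otherwise bucketing uses all columns of each row
);
```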
As a destination for CTAS and CDAS
Paimon tables support real-time synchronization of data at the single-table or entire-database level. If the schema of an upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
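For instance, a single-table synchronization might look like the following sketch. The statement form and the catalog names (`mysql_catalog`, `paimon_catalog`) are assumptions here; see Manage Paimon Catalogs and the CTAS/CDAS documentation for the exact syntax supported in your environment.

```sql
-- Minimal CTAS sketch: synchronize one MySQL table into a Paimon table,
-- including schema changes of the source table during synchronization.
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`test_db`.`orders`
AS TABLE `mysql_catalog`.`test_db`.`orders`;
```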
Data ingestion
The Paimon connector can be used in YAML job development for data ingestion as a sink.
Syntax
```yaml
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
```

Configuration items
| Parameter | Description | Required | Data type | Default value | Notes |
| --- | --- | --- | --- | --- | --- |
| type | The connector type. | Yes | STRING | None | Set this parameter to paimon. |
| name | The sink name. | No | STRING | None | The name of the sink. |
| catalog.properties.metastore | The type of the Paimon Catalog. | No | STRING | filesystem | Valid values include filesystem and rest. See the examples below. |
| catalog.properties.* | The parameters for creating a Paimon Catalog. | No | STRING | None | For more information, see Manage Paimon Catalogs. |
| table.properties.* | The parameters for creating a Paimon table. | No | STRING | None | For more information, see Paimon table options. |
| catalog.properties.warehouse | The root directory for file storage. | No | STRING | None | This parameter takes effect only for specific values of catalog.properties.metastore, such as filesystem. |
| commit.user-prefix | The username prefix for committing data files. | No | STRING | None | We recommend that you set different usernames for different jobs so that you can easily locate the conflicting jobs when commit conflicts occur. |
| partition.key | The partition fields of each partitioned table. | No | STRING | None | Use semicolons (;) to separate the settings of different tables and commas (,) to separate multiple partition fields of the same table. |
| sink.cross-partition-upsert.tables | The tables that require cross-partition updates, that is, tables whose primary keys do not include all partition fields. | No | STRING | None | None. |
Examples
The following examples show how to configure Paimon as a data ingestion sink based on the Paimon Catalog type.
Example configuration for writing to Alibaba Cloud OSS when the Paimon Catalog is filesystem:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
```

For more information about the parameters that are prefixed with catalog.properties, see Create a Paimon Filesystem Catalog.
Example configuration for writing to Alibaba Cloud DLF 2.5 when the Paimon Catalog is rest:
```yaml
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf
```

For more information about the parameters that are prefixed with catalog.properties, see Flink CDC Catalog configuration parameters.
Schema evolution
Currently, Paimon as a data ingestion sink supports the following schema evolution events:
- CREATE TABLE EVENT
- ADD COLUMN EVENT
- ALTER COLUMN TYPE EVENT (modifying the type of a primary key column is not supported)
- RENAME COLUMN EVENT
- DROP COLUMN EVENT
- TRUNCATE TABLE EVENT
- DROP TABLE EVENT
If the downstream Paimon table already exists, its existing schema is used for writing. The system does not attempt to create the table again.