You can use the Paimon connector with a Paimon Catalog. This topic describes how to use the Paimon connector.
Background information
Apache Paimon is a data lake storage format that unifies streaming and batch processing. It supports high-throughput writes and low-latency queries. Common compute engines on Alibaba Cloud's E-MapReduce big data platform, such as Flink, Spark, Hive, or Trino, are well-integrated with Paimon. You can use Apache Paimon to quickly build your data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS). You can then connect to these compute engines to perform data lake analytics. For more information, see Apache Paimon.
Item | Description |
Supported types | Source table, dimension table, sink table, and data ingestion sink |
Running modes | Streaming and batch |
Data format | Not supported |
Specific monitoring metrics | None |
API types | SQL, data ingestion YAML job |
Update or delete data in sink tables | Supported. You can update and delete data in Paimon sink tables. |
Features
Apache Paimon provides the following core features:
Low-cost, lightweight data lake storage services based on HDFS or OSS.
Streaming and batch reads and writes for large datasets.
Batch and Online Analytical Processing (OLAP) queries with data freshness from minutes to seconds.
Consumption and production of incremental data. Paimon can serve as a storage layer for both traditional offline data warehouses and modern streaming data warehouses.
Support for pre-aggregated data to reduce storage costs and downstream computing pressure.
Support for backtracking to historical data versions.
Efficient data filtering.
Table schema evolution.
Limits
The Paimon connector is supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later.
The following table shows the version compatibility between Paimon and VVR.
Apache Paimon version | VVR version |
1.1 | 11 |
1.0 | 8.0.11 |
0.9 | 8.0.7, 8.0.8, 8.0.9, and 8.0.10 |
0.8 | 8.0.6 |
0.7 | 8.0.5 |
0.6 | 8.0.4 |
0.6 | 8.0.3 |
SQL
You can use the Paimon connector in SQL jobs as a source table, a dimension table, or a sink table.
Syntax
If you create a Paimon table in a Paimon Catalog, you do not need to specify the connector parameter. The syntax is as follows.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
Note: If you have already created a Paimon table in a Paimon Catalog, you can use the table directly without creating it again.
If you create a temporary Paimon table in another catalog, you must specify the connector parameter and the storage path of the Paimon table. The syntax is as follows.
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true', -- If no Paimon table data file exists in the specified path, a file is automatically created.
  ...
);
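After the table is defined, you can write to and read from it like any other Flink table. The following is a minimal sketch; the upstream table datagen_source and its columns are hypothetical and only illustrate the statement shape.
-- Write upstream data into the Paimon table defined above.
INSERT INTO paimon_table
SELECT id, data
FROM datagen_source;

-- Read the Paimon table, for example in another job or in batch mode.
SELECT * FROM paimon_table WHERE id > 100;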
WITH parameters
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The table type. | String | No | None | If you create a temporary Paimon table in another catalog, set this parameter to paimon. If you create the table in a Paimon Catalog, you do not need to set this parameter. |
path | The storage path of the table. | String | No | None | This parameter is required only when you create a temporary Paimon table in another catalog. |
auto-create | Specifies whether to automatically create a Paimon table file if no file exists in the specified path when you create a temporary Paimon table. | Boolean | No | false | Valid values: false (default): If no Paimon table file exists in the specified path, an error is reported. true: If no Paimon table file exists in the specified path, the system automatically creates one. |
bucket | The number of buckets in each partition. | Integer | No | 1 | Data written to the Paimon table is distributed to each bucket based on the values of the columns specified by bucket-key. Note: Keep the data size in each bucket under 5 GB. |
bucket-key | The key columns for bucketing. | String | No | None | Specifies the columns whose values are used to distribute data to different buckets when writing to a Paimon table. Separate column names with a comma (,). For example, 'product_id,user_id' distributes data based on the values of the product_id and user_id columns. Note: If this parameter is not specified, the primary key columns are used. For a table without a primary key, all columns are used. |
changelog-producer | The mechanism for producing incremental data. | String | No | none | Paimon can produce a complete changelog (all update_after data has corresponding update_before data) for any input data stream, which is convenient for downstream consumers. The available options are none (default), input, lookup, and full-compaction. For more information about choosing a changelog producer, see Incremental data production mechanism. |
full-compaction.delta-commits | The maximum interval for a full compaction. | Integer | No | None | A full compaction is triggered when the number of snapshot commits since the last full compaction reaches this value. |
lookup.cache-max-memory-size | The memory cache size for a Paimon dimension table. | String | No | 256 MB | This parameter sets the cache size for both dimension table lookups and the lookup changelog producer. |
merge-engine | The merge mechanism for data with the same primary key. | String | No | deduplicate | Valid values: deduplicate (default), partial-update, and aggregation. For a detailed analysis of data merge mechanisms, see Data merging mechanism. |
partial-update.ignore-delete | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | Valid values: true: Delete messages are ignored. false (default): An error is reported when a delete message is received. Note: This parameter takes effect only when merge-engine is set to partial-update. |
ignore-delete | Specifies whether to ignore delete (-D) messages. | Boolean | No | false | The valid values are the same as for partial-update.ignore-delete. Note: Unlike partial-update.ignore-delete, this parameter applies regardless of the merge-engine setting. |
partition.default-name | The default partition name. | String | No | __DEFAULT_PARTITION__ | If the value of a partition key column is null or an empty string, this default name is used as the partition name. |
partition.expiration-check-interval | The interval for checking partition expiration. | String | No | 1h | For more information, see How do I configure automatic partition expiration? |
partition.expiration-time | The expiration time for a partition. | String | No | None | A partition expires when its lifetime exceeds this value. By default, partitions never expire. A partition's lifetime is calculated from its partition value. For more information, see How do I configure automatic partition expiration? |
partition.timestamp-formatter | The format string to convert a time string to a timestamp. | String | No | None | Sets the format for extracting the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration? |
partition.timestamp-pattern | The format string to convert a partition value to a time string. | String | No | None | Sets the format for extracting the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration? |
scan.bounded.watermark | When the watermark of data produced by a Paimon source table exceeds this value, the source table stops producing data. | Long | No | None | None. |
scan.mode | Specifies the consumer offset for the Paimon source table. | String | No | default | For more information, see How do I set the consumer offset for a Paimon source table? |
scan.snapshot-id | Specifies the snapshot from which the Paimon source table starts consumption. | Integer | No | None | For more information, see How do I set the consumer offset for a Paimon source table? |
scan.timestamp-millis | Specifies the point in time from which the Paimon source table starts consumption. | Integer | No | None | For more information, see How do I set the consumer offset for a Paimon source table? |
snapshot.num-retained.max | The maximum number of latest snapshots to retain. | Integer | No | 2147483647 | A snapshot expires if this condition or the snapshot.time-retained condition is met, and the snapshot.num-retained.min condition is also met. |
snapshot.num-retained.min | The minimum number of latest snapshots to retain. | Integer | No | 10 | None. |
snapshot.time-retained | The retention period for a snapshot after it is created. | String | No | 1h | A snapshot expires if this condition or the snapshot.num-retained.max condition is met, and the snapshot.num-retained.min condition is also met. |
write-mode | The write mode of the Paimon table. | String | No | change-log | Valid values: change-log (default) and append-only. For more information about write modes, see Write modes. |
scan.infer-parallelism | Specifies whether to automatically infer the parallelism of the Paimon source table. | Boolean | No | true | Valid values: true (default): The parallelism of the Paimon source table is automatically inferred. false: The parallelism configured for the job or by scan.parallelism is used. |
scan.parallelism | The parallelism of the Paimon source table. | Integer | No | None | Note: This parameter does not take effect if Resource Mode is set to Expert Mode for the deployment. |
sink.parallelism | The parallelism of the Paimon sink table. | Integer | No | None | Note: This parameter does not take effect if Resource Mode is set to Expert Mode for the deployment. |
sink.clustering.by-columns | Specifies the clustering columns for writing to the Paimon sink table. | String | No | None | For a Paimon append-only table (non-primary key table), configure this parameter in a batch job to enable clustered writing. This clusters data in specific columns by size range, which improves the query speed of the table. Use a comma (,) to separate multiple column names, for example, 'col1,col2'. For more information about the clustering feature, see the Apache Paimon official documentation. |
sink.delete-strategy | Sets a validation policy to ensure that the system correctly handles retraction (-D/-U) messages. | Enum | No | NONE | The validation policy determines the expected behavior of the sink operator when it receives retraction messages. |
For more information about the configuration items, see the official Apache Paimon documentation.
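The following sketch shows how several of these options are typically combined when you create a Paimon table in a Paimon Catalog. The catalog, database, column, and partition names are hypothetical, and the option values are illustrative only.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders (
  order_id BIGINT,
  dt STRING,
  amount DOUBLE,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',                               -- 4 buckets per partition
  'changelog-producer' = 'lookup',              -- produce a complete changelog for downstream consumers
  'snapshot.time-retained' = '2h',              -- keep snapshots for 2 hours
  'partition.expiration-time' = '7d',           -- partitions older than 7 days expire
  'partition.expiration-check-interval' = '1h',
  'partition.timestamp-formatter' = 'yyyyMMdd'  -- how to parse the dt partition value
);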
Feature details
Data freshness and consistency guarantee
A Paimon sink table uses the two-phase commit protocol and commits written data during each checkpoint of a Flink job. Therefore, the data freshness is the same as the Flink job's checkpoint interval. Each commit produces a maximum of two snapshots.
If two Flink jobs write to the same Paimon table at the same time, serializable-level consistency is guaranteed, provided the data from the two jobs is not written to the same bucket. If the data is written to the same bucket, only snapshot isolation-level consistency is guaranteed. This means the data in the table might be a mix of results from both jobs, but no data is lost.
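Because commits happen at checkpoints, the checkpoint interval directly controls the freshness of the data in the Paimon table. The following is a minimal sketch of adjusting the interval with a SET statement; the 1-minute value is only an illustrative assumption, and whether SET statements are available depends on how you submit the SQL job (alternatively, set execution.checkpointing.interval in the deployment's Flink configuration).
-- Commit to the Paimon table roughly once per minute (illustrative value).
SET 'execution.checkpointing.interval' = '1min';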
Data merging mechanism
When a Paimon sink table receives multiple records with the same primary key, it merges these records into a single record to maintain the uniqueness of the primary key. You can specify the data merging behavior by setting the merge-engine parameter. The following table describes the available data merging mechanisms.
Merging mechanism | Details |
Deduplicate | The deduplicate mechanism is the default data merging mechanism. For multiple records with the same primary key, the Paimon sink table retains only the latest record and discards the others. Note If the latest record is a delete message, all records with that primary key are discarded. |
Partial Update | By specifying the partial-update mechanism, you can progressively update data through multiple messages and eventually obtain the complete data. Specifically, new data with the same primary key overwrites the old data, but columns with null values do not overwrite existing values. For example, assume a Paimon sink table receives the following three records in order: <1, 23.0, 10, NULL>, <1, NULL, NULL, 'This is a book'>, and <1, 25.2, NULL, NULL>. If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>. Note: If downstream consumers need to read a complete changelog from the table, set changelog-producer to lookup or full-compaction when you use the partial-update mechanism. By default, the partial-update mechanism cannot process delete messages; to ignore them, set partial-update.ignore-delete or ignore-delete to true. |
Aggregation | In some scenarios, you may only care about the aggregated value. The aggregation mechanism aggregates data with the same primary key based on the aggregate functions you specify. For each column that is not part of the primary key, you must specify an aggregate function by using the fields.<field-name>.aggregate-function parameter in the WITH clause. For example, if you set 'fields.price.aggregate-function' = 'max' and 'fields.sales.aggregate-function' = 'sum', the price column is aggregated using the max function, and the sales column is aggregated using the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result is <1, 30.2, 35>. The supported aggregate functions include sum, min, max, last_value, last_non_null_value, listagg, bool_and, and bool_or. Note: If downstream consumers need to read a complete changelog from the table, set changelog-producer to lookup or full-compaction when you use the aggregation mechanism. |
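The following sketch shows how a partial-update table and an aggregation table might be declared. The catalog, table, and column names are hypothetical, and the aggregate functions shown are only examples.
-- Partial-update: later records fill in the columns they carry; NULL columns do not overwrite existing values.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.item_wide (
  item_id BIGINT,
  price DOUBLE,
  sales INT,
  title STRING,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'changelog-producer' = 'lookup'  -- needed if downstream consumers require a complete changelog
);

-- Aggregation: non-primary-key columns are merged with the specified aggregate functions.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.item_stats (
  item_id BIGINT,
  price DOUBLE,
  sales INT,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);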
Incremental data production mechanism
By setting the changelog-producer parameter, you can configure Paimon to generate complete incremental data for any input data stream. In this case, every update_after record has a corresponding update_before record. The following section describes all the incremental data generation mechanisms. For more information, see the official Apache Paimon documentation.
Mechanism | Details |
None | If you set changelog-producer to none, Paimon does not produce extra incremental data. Downstream consumers can read the latest data but cannot obtain the corresponding update_before records. For example, if a downstream consumer needs to calculate the sum of a column and only sees the latest value of 5, it cannot determine how to update the sum. If the previous value was 4, it should increase the sum by 1. If the previous value was 6, it should decrease the sum by 1. Such consumers are sensitive to update_before data, so we recommend that you do not set the changelog producer to none in these cases. However, the other mechanisms incur a performance overhead. Note: If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the changelog producer to none. Configure the changelog producer based on your actual needs. |
Input | If you set changelog-producer to input, Paimon passes the records of the input data stream through to downstream consumers as the changelog and does not generate additional incremental data itself. Therefore, this mechanism can only be used when the input data stream itself is a complete changelog, such as CDC data. |
Lookup | If you set changelog-producer to lookup, Paimon produces the complete changelog before each checkpoint completes by looking up the merged results of the data. Compared to the Full Compaction mechanism, the Lookup mechanism provides better timeliness for changelog production but consumes more resources overall. Use this mechanism when you have high requirements for the freshness of incremental data, such as at the minute level. |
Full Compaction | If you set changelog-producer to full-compaction, Paimon produces the complete changelog each time a full compaction occurs. You can use the full-compaction.delta-commits parameter to control the maximum interval between full compactions. Compared to the Lookup mechanism, the Full Compaction mechanism has worse timeliness for changelog production. However, it reuses the full compaction process and does not generate extra computation, so it consumes fewer resources overall. Use this mechanism when the freshness requirement for incremental data is not high, such as at the hour level. |
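A minimal sketch of enabling the full-compaction changelog producer on a table; the table name and the delta-commits value are illustrative assumptions.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.events (
  event_id BIGINT,
  payload STRING,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '5'  -- trigger a full compaction at least every 5 snapshot commits
);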
Write modes
Paimon tables support the following write modes.
Mode | Details |
Change-log | The change-log write mode is the default for Paimon tables. This mode supports data insertion, deletion, and updates based on a primary key. You can also use the data merging and incremental data production mechanisms mentioned earlier in this mode. |
Append-only | The append-only write mode only supports data insertion and does not support a primary key. This mode is more efficient than the change-log mode and can be used as an alternative to a message queue in scenarios where data freshness requirements are moderate, such as at the minute level. For more information about the append-only write mode, see the official Apache Paimon documentation. When you use the append-only write mode, note the following two points:
We recommend that you set the bucket-key parameter based on your needs. Otherwise, data is distributed to buckets based on the values of all columns, which is less efficient.
The order of data is guaranteed only within the same bucket. If downstream consumption depends on the order of data, write data that must stay in order to the same bucket. |
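A minimal sketch of an append-only (non-primary-key) table used as a lightweight queue, as described above; the table, column, and bucket settings are hypothetical.
CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.click_log (
  user_id BIGINT,
  url STRING,
  ts TIMESTAMP(3)
) WITH (
  'write-mode' = 'append-only',  -- insert-only, no primary key and no merging
  'bucket' = '4',
  'bucket-key' = 'user_id'       -- keep records of the same user in the same bucket
);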
As a sink for CTAS and CDAS
Paimon tables support real-time data synchronization for a single table or an entire database. If the schema of the upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
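The following sketch shows the general shape of a CTAS statement that synchronizes a single MySQL table into a Paimon Catalog. The catalog, database, and table names are hypothetical; for the exact syntax supported by your VVR version, see the documents linked above.
CREATE TABLE IF NOT EXISTS `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.orders
WITH (
  'bucket' = '4'  -- illustrative Paimon table option
)
AS TABLE `<YOUR-MYSQL-CATALOG>`.`<YOUR-DB>`.orders;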
Data ingestion
You can use the Paimon connector in a data ingestion YAML job to write data to a sink.
Syntax
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse
Parameters
Parameter | Description | Required | Data type | Default value | Remarks |
type | The connector type. | Yes | STRING | None | The value must be paimon. |
name | The name of the sink. | No | STRING | None | The name of the sink. |
catalog.properties.metastore | The type of the Paimon Catalog. | No | STRING | filesystem | Valid values include filesystem and rest. |
catalog.properties.* | Parameters for creating a Paimon Catalog. | No | STRING | None | For more information, see Manage Paimon Catalogs. |
table.properties.* | Parameters for creating a Paimon table. | No | STRING | None | For more information, see Paimon table options. |
catalog.properties.warehouse | The root directory of the file storage. | No | STRING | None | This parameter takes effect only when catalog.properties.metastore is set to filesystem. |
commit.user | The username for committing data files. | No | STRING | None | Note Set different usernames for different jobs to help locate jobs that cause commit conflicts. |
partition.key | The partition fields for each partitioned table. | No | STRING | None | Use a semicolon (;) to separate the settings for different tables and a comma (,) to separate multiple partition fields of one table, for example, testdb.table1:id1,id2;testdb.table2:name. |
Examples
When you use Paimon as a data ingestion sink, you can refer to the following examples for configuration based on the Paimon Catalog type.
Example of writing data to Alibaba Cloud OSS when the Paimon Catalog type is filesystem:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

For more information about the parameters that have the catalog.properties prefix, see Create a Paimon Filesystem Catalog.
Example of writing data to Alibaba Cloud DLF 2.5 when the Paimon Catalog type is rest:
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: dlf_uri
  catalog.properties.warehouse: your_warehouse
  catalog.properties.token.provider: dlf

For more information about the parameters that have the catalog.properties prefix, see Flink CDC Paimon Catalog configuration parameters.