We recommend that you use the Apache Paimon connector together with Apache Paimon catalogs. This topic describes how to use the Apache Paimon connector.
Background information
Apache Paimon is a unified lake storage that allows you to process data in streaming and batch modes. Apache Paimon supports data writing with high throughput and data queries with low latency. Apache Paimon is compatible with common computing engines of Alibaba Cloud E-MapReduce (EMR), such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to deploy your data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS) in an efficient manner, and connect to the preceding computing engines to perform data lake analytics. For more information, see Apache Paimon.
Item | Description |
Table type | Source table, dimension table, and result table |
Running mode | Streaming mode and batch mode |
Data format | N/A |
Metric | N/A |
API type | SQL API |
Data update or deletion in a result table | Supported |
Features
Apache Paimon provides the following features:
Builds a low-cost lightweight data lake storage service based on HDFS or OSS.
Supports read and write operations on large-scale datasets in streaming and batch modes.
Supports batch queries and online analytical processing (OLAP) queries within minutes or even seconds.
Supports consumption and generation of incremental data. Apache Paimon can be used as storage for a traditional offline data warehouse and a streaming data warehouse.
Supports data pre-aggregation to reduce storage costs and downstream computing workloads.
Allows you to trace data of historical versions.
Supports efficient data filtering.
Supports table schema changes.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Apache Paimon connector.
Syntax
If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure the connector parameter. The following sample code shows the syntax for creating an Apache Paimon table in an Apache Paimon catalog:

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  ...
);
Note: If you have already created an Apache Paimon table in the Apache Paimon catalog, you can use the table directly without recreating it.
If you want to create a temporary Apache Paimon table in a catalog of a storage other than Apache Paimon, you must configure the connector parameter and the storage path of the Apache Paimon table. The following sample code shows the syntax for creating an Apache Paimon table in such a catalog.
CREATE TEMPORARY TABLE paimon_table (
  id BIGINT,
  data STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = '<path-to-paimon-table-files>',
  'auto-create' = 'true', -- If no Apache Paimon table file exists in the specified path, a table file is automatically created.
  ...
);
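After the table is created, you can write to and read from it with standard Flink SQL statements. The following is a minimal sketch; the table and column names follow the example above, and the sample values are illustrative:

```sql
-- Write data into the Apache Paimon table (works in streaming or batch mode).
INSERT INTO paimon_table VALUES (1, 'apple'), (2, 'banana');

-- Read the Apache Paimon table as a source.
SELECT id, data FROM paimon_table;
```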
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Remarks |
connector | The type of the table. | STRING | No | No default value | This parameter is required only when you create a temporary Apache Paimon table in a catalog other than an Apache Paimon catalog. In this case, set the value to paimon. |
path | The storage path of the table. | STRING | No | No default value | This parameter is required only when you create a temporary Apache Paimon table in a catalog other than an Apache Paimon catalog. Set the value to the HDFS or OSS directory in which the table files are stored. |
auto-create | Specifies whether to automatically create an Apache Paimon table file if no Apache Paimon table file exists in the specified path when you create a temporary Apache Paimon table. | BOOLEAN | No | false | Valid values: false (default): If no Apache Paimon table file exists in the specified path, an error is returned. true: If no Apache Paimon table file exists in the specified path, the system automatically creates one. |
bucket | The number of buckets in each partition. | INTEGER | No | 1 | Data that is written to the Apache Paimon table is distributed to the buckets based on the columns that are specified by the bucket-key parameter. Note: We recommend that the data in each bucket be less than 5 GB in size. |
bucket-key | The bucket key columns. | STRING | No | No default value | The columns based on which the data written to the Apache Paimon table is distributed to different buckets. Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' distributes data to buckets based on the order_id and cust_id columns. Note: If this parameter is not specified, the primary key columns are used as the bucket key. If the table has no primary key, all columns are used. |
changelog-producer | The incremental data generation mechanism. | STRING | No | none | Apache Paimon can generate complete incremental data for any input data stream to facilitate downstream data consumption. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values: none (default), input, lookup, and full-compaction. For more information about how to select an incremental data generation mechanism, see Incremental data generation mechanism. |
full-compaction.delta-commits | The maximum interval at which full compaction is performed. | INTEGER | No | No default value | A full compaction is triggered each time the number of commit snapshots since the last full compaction reaches the value of this parameter. |
lookup.cache-max-memory-size | The memory cache size of the Apache Paimon dimension table. | STRING | No | 256 MB | The value of this parameter determines the cache sizes of both the dimension table and the lookup changelog producer. |
merge-engine | The mechanism for merging data that has the same primary key. | STRING | No | deduplicate | Valid values: deduplicate (default), partial-update, and aggregation. For more information about the data merging mechanism, see Data merging mechanism. |
partial-update.ignore-delete | Specifies whether to ignore delete messages when the merge-engine parameter is set to partial-update. | BOOLEAN | No | false | Valid values: false (default): An error is returned when a delete message is received. true: Delete messages are ignored. Note: Whether delete messages need to be ignored depends on the actual scenario. Configure this parameter based on your business requirements. |
partition.default-name | The default name of the partition. | STRING | No | __DEFAULT_PARTITION__ | If the value of a partition key column is null or an empty string, the value of this parameter is used as the partition name. |
partition.expiration-check-interval | The interval at which the system checks partition expiration. | STRING | No | 1h | For more information, see How do I configure automatic partition expiration? |
partition.expiration-time | The validity period of a partition. | STRING | No | No default value | If the period of time for which a partition exists exceeds the value of this parameter, the partition expires. By default, a partition never expires. The period of time for which a partition exists is calculated based on the value of the partition. For more information, see How do I configure automatic partition expiration? |
partition.timestamp-formatter | The pattern that is used to convert a time string into a timestamp. | STRING | No | No default value | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. For more information, see How do I configure automatic partition expiration? |
partition.timestamp-pattern | The pattern that is used to convert a partition value into a time string. | STRING | No | No default value | This parameter specifies the pattern that is used to extract the period of time for which a partition exists from the partition value. For more information, see How do I configure automatic partition expiration? |
scan.bounded.watermark | The end condition for bounded streaming mode. When the watermark of the Apache Paimon source table exceeds the value of this parameter, the source table stops reading data. | LONG | No | No default value | N/A. |
scan.mode | The consumer offset of the Apache Paimon source table. | STRING | No | default | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
scan.snapshot-id | The ID of the snapshot from which the Apache Paimon source table starts to consume data. | INTEGER | No | No default value | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
scan.timestamp-millis | The point in time from which the Apache Paimon source table starts to consume data. | INTEGER | No | No default value | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
snapshot.num-retained.max | The maximum number of the latest snapshots that can be retained. | INTEGER | No | 2147483647 | Snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met, and the condition specified by the snapshot.num-retained.min parameter is also met. |
snapshot.num-retained.min | The minimum number of the latest snapshots that can be retained. | INTEGER | No | 10 | N/A. |
snapshot.time-retained | The duration for which snapshots can be retained. | STRING | No | 1h | Snapshot expiration is triggered only if the condition specified by the snapshot.num-retained.max parameter or the snapshot.time-retained parameter is met, and the condition specified by the snapshot.num-retained.min parameter is also met. |
write-mode | The write mode of the Apache Paimon table. | STRING | No | change-log | Valid values: change-log (default) and append-only. For more information about write modes, see Write modes. |
scan.infer-parallelism | Specifies whether to automatically infer the degree of parallelism of the Apache Paimon source table. | BOOLEAN | No | false | Valid values: false (default): The degree of parallelism that is configured for the deployment is used. true: The degree of parallelism is automatically inferred from the source table, such as from the number of splits to read. |
scan.parallelism | The degree of parallelism of the Apache Paimon source table. | INTEGER | No | No default value | N/A. |
sink.parallelism | The degree of parallelism of the Apache Paimon result table. | INTEGER | No | No default value | N/A. |
For more information about the configuration items, see Apache Paimon documentation.
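The parameters above can be combined in a single DDL statement. The following is a minimal sketch; the table name, column names, and parameter values are illustrative and should be adjusted to your own schema and workload:

```sql
CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.orders (
  order_id BIGINT,
  cust_id  BIGINT,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'bucket' = '4',                   -- Distribute data across 4 buckets per partition.
  'bucket-key' = 'order_id',        -- Bucket data by the order_id column.
  'changelog-producer' = 'input',   -- Pass complete CDC input through as incremental data.
  'merge-engine' = 'deduplicate',   -- Keep only the latest record per primary key.
  'snapshot.time-retained' = '2h'   -- Retain snapshots for 2 hours.
);
```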
Feature description
Data freshness and consistency assurance
An Apache Paimon result table uses the two-phase commit protocol (2PC) to commit the written data each time a checkpoint is generated in a Flink deployment. Therefore, the data freshness is based on the checkpoint interval of the Flink deployment. A maximum of two snapshots can be generated each time data is committed.
If two Flink deployments write data to the same Apache Paimon table at the same time and they write to different buckets, serializable consistency is ensured. If they write to the same bucket, only snapshot isolation is ensured. In this case, the table may contain a mix of the results of the two deployments, but no data is lost.
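Because data is committed at each checkpoint, data freshness is controlled by the checkpoint interval of the Flink deployment. As a sketch, this interval can be set in Flink SQL as follows; the 10-minute value is only an example:

```sql
-- Commit data to the Apache Paimon table roughly every 10 minutes.
SET 'execution.checkpointing.interval' = '10min';
```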
Data merging mechanism
When an Apache Paimon result table receives multiple data records that have the same primary key, it merges them into one data record to ensure the uniqueness of the primary key. You can configure the merge-engine parameter to specify the data merging mechanism. The following table describes the supported mechanisms.
Data merging mechanism | Description |
deduplicate | This is the default value of the merge-engine parameter. If the data merging mechanism is deduplicate and multiple data records have the same primary key, the Apache Paimon result table retains only the latest data record and discards other data records. Note If the latest data record is a delete message, all the data records that have the same primary key are discarded. |
partial-update | The partial-update mechanism allows you to use multiple messages to update data and finally obtain complete data. New data that has the same primary key as the existing data overwrites the existing data. However, columns whose values are NULL do not overwrite existing values. For example, the Apache Paimon result table receives the following data records in sequence: <1, 23.0, 10, NULL>, <1, NULL, NULL, 'This is a book'>, and <1, 25.2, NULL, NULL>.
If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>. Note: The partial-update mechanism cannot process delete messages. To ignore delete messages, set the partial-update.ignore-delete parameter to true. |
aggregation | In specific scenarios, you may focus only on the aggregated values. The aggregation mechanism aggregates data that has the same primary key based on the specified aggregate functions. You must use the fields.<field-name>.aggregate-function parameter to specify an aggregate function for each column that is not part of the primary key. For example, 'fields.price.aggregate-function' = 'max' and 'fields.sales.aggregate-function' = 'sum'.
In this example, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. Supported aggregate functions and the data types that they support: sum (numeric data types), max and min (comparable data types), last_value and last_non_null_value (all data types), listagg (STRING), and bool_and and bool_or (BOOLEAN).
Note: Only the sum function supports retraction messages (UPDATE_BEFORE and DELETE messages). Other aggregate functions do not support retraction. |
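The aggregation merge engine can be sketched as the following DDL; the table and column names are illustrative:

```sql
CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.product_stats (
  product_id BIGINT,
  price      DOUBLE,
  sales      BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',  -- Keep the highest price per product.
  'fields.sales.aggregate-function' = 'sum'   -- Accumulate sales per product.
);
```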
Incremental data generation mechanism
The incremental data generation mechanism is specified by the changelog-producer parameter. Apache Paimon can generate complete incremental data for any input data stream. Each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. The following table describes all incremental data generation mechanisms. For more information, see Apache Paimon official documentation.
Incremental data generation mechanism | Description |
none | This is the default value of the changelog-producer parameter. If you use this value, the Apache Paimon table does not generate complete incremental data: downstream consumers can obtain the latest value of a record, but cannot obtain its UPDATE_BEFORE value. For example, if a downstream consumer wants to calculate the sum of a column and obtains only the latest value 5, the consumer cannot determine how to update the sum. If the original value is 4, the sum should be increased by 1. If the original value is 6, the sum should be decreased by 1. This type of consumer is sensitive to UPDATE_BEFORE data. For this type of consumer, we recommend that you do not set the changelog-producer parameter to none. However, the other incremental data generation mechanisms may cause performance loss. Note: If your downstream consumer is a database that is not sensitive to UPDATE_BEFORE data, you can set the changelog-producer parameter to none. We recommend that you configure this parameter based on your business requirements. |
input | If you set the changelog-producer parameter to input, the Apache Paimon table directly passes the input records through as incremental data without additional computation. Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete. |
lookup | If you set the changelog-producer parameter to lookup, the Apache Paimon table generates complete incremental data before each checkpoint by looking up the results of data compaction. The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources. We recommend that you use the lookup mechanism in scenarios where the requirement for the freshness of incremental data is high. For example, incremental data within minutes is required. |
full-compaction | If you set the changelog-producer parameter to full-compaction, the Apache Paimon table generates complete incremental data each time a full compaction is performed. Compared with the lookup mechanism, the full-compaction mechanism is less efficient in generating incremental data. However, it generates incremental data as a by-product of full compaction and does not cause additional computation, so fewer resources are consumed. We recommend that you use the full-compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high. For example, incremental data within hours is acceptable. |
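As a sketch, the lookup mechanism described above can be enabled in the WITH clause as follows; the table and column names are illustrative:

```sql
CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.events (
  event_id BIGINT,
  payload  STRING,
  PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'lookup'  -- Generate complete changelogs before each checkpoint.
);
```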
Write mode
The following table describes the write modes supported by Apache Paimon tables.
Write mode | Description |
Change-log | change-log is the default write mode for Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. In change-log mode, you can also use the data merging mechanism and incremental data generation mechanism. |
Append-only | In append-only mode, the Apache Paimon table accepts only data insertion and does not support operations based on a primary key. The append-only mode is more efficient than the change-log mode. In append-only mode, an Apache Paimon table can be used as a substitute for a message queue in scenarios where the data freshness requirement is not high. For example, data freshness within hours is acceptable. For more information about the append-only mode, see Apache Paimon official documentation. Note: To use the append-only mode, you must explicitly set the write-mode parameter to append-only, and the table cannot have a primary key. |
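An append-only table can be sketched as follows; note the absence of a primary key and the explicit write-mode setting (the table and column names are illustrative):

```sql
CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.access_log (
  log_time TIMESTAMP(3),
  url      STRING
) WITH (
  'write-mode' = 'append-only'  -- Insert-only; no primary-key-based updates or deletes.
);
```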
Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements
You can execute the CREATE TABLE AS or CREATE DATABASE AS statement to synchronize data from a single table or an entire database to an Apache Paimon table in real time. The changes to the schema of the source table can also be synchronized to the related Apache Paimon table in real time during data synchronization of a database or table. For more information, see Use an Apache Paimon catalog as the catalog of the destination store that is used in the CREATE TABLE AS statement and Use an Apache Paimon catalog as the catalog of the destination store that is used in the CREATE DATABASE AS statement.
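As a sketch of the single-table synchronization described above, a CREATE TABLE AS statement might look like the following. The catalog, database, and table names are placeholders; see the linked topics for the exact syntax and supported sources:

```sql
-- Synchronize a source table into an Apache Paimon table in real time,
-- including subsequent schema changes of the source table.
CREATE TABLE IF NOT EXISTS `<your-paimon-catalog>`.`<your-db>`.orders
AS TABLE `<your-source-catalog>`.`<your-db>`.orders;
```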