
Realtime Compute for Apache Flink: Apache Paimon connector

Last Updated: Nov 08, 2023

We recommend that you use the Apache Paimon connector together with Apache Paimon catalogs. This topic describes how to use the Apache Paimon connector.

Background information

Apache Paimon is a unified lake storage that allows you to process data in streaming and batch modes. Apache Paimon supports data writing with high throughput and data queries with low latency. Apache Paimon is compatible with common computing engines of Alibaba Cloud E-MapReduce (EMR), such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to deploy your data lake storage service on Hadoop Distributed File System (HDFS) or Alibaba Cloud Object Storage Service (OSS) in an efficient manner, and connect to the preceding computing engines to perform data lake analytics. For more information, see Apache Paimon.

| Item | Description |
| --- | --- |
| Table type | Source table, dimension table, and result table |
| Running mode | Streaming mode and batch mode |
| Data format | N/A |
| Metric | N/A |
| API type | SQL API |
| Data update or deletion in a result table | Supported |

Features

Apache Paimon provides the following features:

  • Builds a low-cost lightweight data lake storage service based on HDFS or OSS.

  • Supports read and write operations on large-scale datasets in streaming and batch modes.

  • Supports batch queries and online analytical processing (OLAP) queries within minutes or even seconds.

  • Supports consumption and generation of incremental data. Apache Paimon can be used as storage for a traditional offline data warehouse and a streaming data warehouse.

  • Supports data pre-aggregation to reduce storage costs and downstream computing workloads.

  • Allows you to trace data of historical versions.

  • Supports efficient data filtering.

  • Supports table schema changes.

Limits

Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later supports the Apache Paimon connector.

Syntax

  • If you create an Apache Paimon table in an Apache Paimon catalog, you do not need to configure the connector parameter. The following sample code shows the syntax for creating an Apache Paimon table in an Apache Paimon catalog.

    CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    Note

    If you have created an Apache Paimon table in the Apache Paimon catalog, you can directly use the table without the need to recreate a table.

  • If you want to create a temporary Apache Paimon table in a catalog that is not an Apache Paimon catalog, you must configure the connector parameter and the storage path of the Apache Paimon table. The following sample code shows the syntax for creating such a table.

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create'='true', -- If no Apache Paimon table file exists in the specified path, a file is automatically created. 
      ...
    );
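
After the table is defined, you can use it as a source table or a result table. The following minimal sketch is for illustration only: upstream_source is an assumed upstream table, and the scan.mode value in the OPTIONS hint is one example offset; the valid values are described in How do I specify a consumer offset for an Apache Paimon source table?

-- Write data to the Apache Paimon table.
INSERT INTO paimon_table SELECT id, data FROM upstream_source;

-- Read the table in streaming mode in another deployment,
-- starting from the latest snapshot.
SELECT * FROM paimon_table /*+ OPTIONS('scan.mode' = 'latest') */;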

Parameters in the WITH clause

| Parameter | Description | Data type | Required | Default value | Remarks |
| --- | --- | --- | --- | --- | --- |
| connector | The type of the table. | STRING | No | No default value | Not required if you create the Apache Paimon table in an Apache Paimon catalog. If you create a temporary Apache Paimon table in another catalog, set the value to paimon. |
| path | The storage path of the table. | STRING | No | No default value | Not required if you create the Apache Paimon table in an Apache Paimon catalog. If you create a temporary Apache Paimon table in another catalog, set this parameter to the HDFS or OSS directory in which the table is stored. |
| auto-create | Specifies whether to automatically create Apache Paimon table files if no table file exists in the specified path when you create a temporary Apache Paimon table. | BOOLEAN | No | false | Valid values: false (default): if no Apache Paimon table file exists in the specified path, an error is returned. true: if the specified path does not exist, Flink automatically creates the table files. |
| bucket | The number of buckets in each partition. | INTEGER | No | 1 | Data written to the Apache Paimon table is distributed to buckets based on the columns specified by the bucket-key parameter. Note: We recommend that you keep the data in each bucket smaller than 5 GB. |
| bucket-key | The bucket key columns. | STRING | No | No default value | The columns based on which written data is distributed to buckets. Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' distributes data based on the order_id and cust_id columns. Note: If this parameter is not configured, data is distributed based on the primary key. If no primary key is specified, data is distributed based on the values of all columns. |
| changelog-producer | The incremental data generation mechanism. | STRING | No | none | Apache Paimon can generate complete incremental data for any input data stream to facilitate downstream consumption: each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. Valid values: none (default): no incremental data is generated; downstream consumers can still read the table in streaming mode, but receive only UPDATE_AFTER data and no UPDATE_BEFORE data. input: the input data streams are written to an incremental data file as incremental data in dual-write mode. full-compaction: complete incremental data is generated each time full compaction is performed. lookup: complete incremental data is generated before each snapshot is committed. For more information about how to select a mechanism, see Incremental data generation mechanism. |
| full-compaction.delta-commits | The maximum interval at which full compaction is performed. | INTEGER | No | No default value | A full compaction is always triggered when the number of commit snapshots reaches the value of this parameter. |
| lookup.cache-max-memory-size | The memory cache size of the Apache Paimon dimension table. | STRING | No | 256 MB | This value determines the cache sizes of both the dimension table and the lookup changelog producer. |
| merge-engine | The mechanism for merging data records that have the same primary key. | STRING | No | deduplicate | Valid values: deduplicate: only the latest data record is retained. partial-update: the latest non-null column values overwrite the existing data that has the same primary key; other columns remain unchanged. aggregation: the specified aggregate functions are used to perform pre-aggregation. For more information, see Data merging mechanism. |
| partial-update.ignore-delete | Specifies whether to ignore delete messages when the merge-engine parameter is set to partial-update. | BOOLEAN | No | false | Valid values: true: delete messages are ignored. false: an error is reported when a delete message is received. Note: Whether delete messages need to be ignored depends on the actual scenario. Configure this parameter based on your business requirements. |
| partition.default-name | The default partition name. | STRING | No | __DEFAULT_PARTITION__ | If the value of a partition key column is null or an empty string, this value is used as the partition name. |
| partition.expiration-check-interval | The interval at which the system checks for expired partitions. | STRING | No | 1h | For more information, see How do I configure automatic partition expiration? |
| partition.expiration-time | The validity period of a partition. | STRING | No | No default value | A partition expires when it has existed for longer than this value. By default, partitions never expire. The lifetime of a partition is calculated from the partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-formatter | The pattern used to convert a time string into a timestamp. | STRING | No | No default value | Used together with partition.timestamp-pattern to extract the lifetime of a partition from the partition value. For more information, see How do I configure automatic partition expiration? |
| partition.timestamp-pattern | The pattern used to convert a partition value into a time string. | STRING | No | No default value | Used together with partition.timestamp-formatter to extract the lifetime of a partition from the partition value. For more information, see How do I configure automatic partition expiration? |
| scan.bounded.watermark | The end condition for bounded streaming mode. Reading from the Apache Paimon source table ends when the watermark of the data exceeds this value. | LONG | No | No default value | N/A. |
| scan.mode | The consumer offset of the Apache Paimon source table. | STRING | No | default | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
| scan.snapshot-id | The ID of the snapshot from which the Apache Paimon source table starts to consume data. | INTEGER | No | No default value | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
| scan.timestamp-millis | The point in time from which the Apache Paimon source table starts to consume data. | INTEGER | No | No default value | For more information, see How do I specify a consumer offset for an Apache Paimon source table? |
| snapshot.num-retained.max | The maximum number of the latest snapshots to retain. | INTEGER | No | 2147483647 | Snapshot expiration is triggered only when the condition specified by snapshot.num-retained.max or snapshot.time-retained is met and the condition specified by snapshot.num-retained.min is also met. |
| snapshot.num-retained.min | The minimum number of the latest snapshots to retain. | INTEGER | No | 10 | N/A. |
| snapshot.time-retained | The duration for which snapshots are retained. | STRING | No | 1h | Snapshot expiration is triggered only when the condition specified by snapshot.num-retained.max or snapshot.time-retained is met and the condition specified by snapshot.num-retained.min is also met. |
| write-mode | The write mode of the Apache Paimon table. | STRING | No | change-log | Valid values: change-log: data is inserted into, deleted from, and updated in the table based on the primary key. append-only: the table allows only data insertion and does not support primary key-based operations; this mode is more efficient than the change-log mode. For more information, see Write modes. |
| scan.infer-parallelism | Specifies whether to automatically infer the parallelism of the Apache Paimon source table. | BOOLEAN | No | false | Valid values: true: the parallelism of the source table is inferred from the number of buckets. false: the default parallelism configured in Ververica Platform (VVP) is used; if the resource configuration is in expert mode, the configured parallelism is used. |
| scan.parallelism | The parallelism of the Apache Paimon source table. | INTEGER | No | No default value | N/A. |
| sink.parallelism | The parallelism of the Apache Paimon result table. | INTEGER | No | No default value | N/A. |

Note

For more information about the configuration items, see Apache Paimon documentation.
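
The following sketch combines several of the preceding parameters in one table definition. It is for illustration only: the catalog, database, table, column names, and all parameter values are assumptions, and the partition expiration settings are explained in the FAQ topic referenced above.

CREATE TABLE `<your-paimon-catalog>`.`<your-db>`.orders (
  order_id BIGINT,
  cust_id BIGINT,
  amount DOUBLE,
  dt STRING,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4',                               -- 4 buckets per partition
  'bucket-key' = 'order_id',                    -- distribute data by order_id
  'changelog-producer' = 'lookup',              -- generate complete incremental data before each commit
  'snapshot.time-retained' = '24h',             -- retain snapshots for one day, subject to the min/max conditions
  'partition.expiration-time' = '7d',           -- expire partitions that are older than 7 days
  'partition.timestamp-formatter' = 'yyyyMMdd'  -- parse partition values such as 20231108
);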

Feature description

Data freshness and consistency assurance

An Apache Paimon result table uses the two-phase commit (2PC) protocol to commit the written data each time a checkpoint is generated in a Flink deployment. Therefore, data freshness is determined by the checkpoint interval of the Flink deployment. Each commit generates at most two snapshots.

If two Flink deployments write data to an Apache Paimon table at the same time but write to different buckets, serializable consistency is ensured. If they write to the same bucket, only snapshot isolation is ensured. In that case, the table may contain a mix of the results of the two deployments, but no data is lost.
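
Because data becomes visible only after a checkpoint is completed, you can tune the checkpoint interval to balance data freshness against the number of snapshots and small files. A minimal sketch, assuming you configure the deployment through SQL statements (execution.checkpointing.interval is the standard Flink configuration key):

-- Commit data to the Apache Paimon table approximately once per minute.
-- Shorter intervals improve freshness but create more snapshots and small files.
SET 'execution.checkpointing.interval' = '1min';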

Data merging mechanism

When an Apache Paimon result table receives multiple data records that have the same primary key, it merges them into one data record to ensure the uniqueness of the primary key. You can configure the merge-engine parameter to specify the data merging mechanism. The following data merging mechanisms are supported.

deduplicate

This is the default value of the merge-engine parameter. Among multiple data records that have the same primary key, the Apache Paimon result table retains only the latest record and discards the others.

Note

If the latest data record is a delete message, all the data records that have the same primary key are discarded.

partial-update

The partial-update mechanism allows you to assemble a complete data record from multiple update messages. New data overwrites the existing data that has the same primary key, but NULL values in the new data do not overwrite existing column values.

For example, the Apache Paimon result table receives the following data records in sequence:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>.

Note
  • If you want downstream consumers to read the result of the partial-update mechanism in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction, as shown in the sketch after this note.

  • The partial-update mechanism cannot process delete messages. You can set the partial-update.ignore-delete parameter to true to ignore delete messages.
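
A minimal sketch of a partial-update table created in an Apache Paimon catalog; the table and column names are illustrative:

CREATE TABLE product_wide (
  product_id BIGINT,
  price DOUBLE,
  description STRING,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'partial-update.ignore-delete' = 'true', -- ignore delete messages
  'changelog-producer' = 'lookup'          -- required to consume the merged result in streaming mode
);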

aggregation

In specific scenarios, you may focus only on the aggregated values. The aggregation mechanism can aggregate data that has the same primary key based on the specified aggregate function. You must use fields.<field-name>.aggregate-function to specify an aggregate function for each column that is not the primary key. If you do not specify an aggregate function for a column that is not the primary key, the column uses the last_non_null_value aggregate function by default. The following sample code provides an example.

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max', -- retain the maximum price
  'fields.sales.aggregate-function' = 'sum'  -- accumulate the sales
);

In this example, data in the price column is aggregated based on the max function, and data in the sales column is aggregated based on the sum function. If the input data records are <1, 23.0, 15> and <1, 30.2, 20>, the result is <1, 30.2, 35>. Mappings between the supported aggregate functions and data types:

  • sum: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE

  • min and max: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ

  • last_value and last_non_null_value: all data types

  • listagg: STRING

  • bool_and and bool_or: BOOLEAN

Note
  • Only the sum function supports data retraction and deletion. If you want specific columns to ignore retraction and deletion messages, you can configure 'fields.${field_name}.ignore-retract'='true'.

  • If you want downstream consumers to read the aggregation result in streaming mode, you must set the changelog-producer parameter to lookup or full-compaction.

Incremental data generation mechanism

The incremental data generation mechanism is specified by the changelog-producer parameter. Apache Paimon can generate complete incremental data for any input data stream: each UPDATE_AFTER data record corresponds to an UPDATE_BEFORE data record. The following incremental data generation mechanisms are supported. For more information, see the Apache Paimon official documentation.

none

This is the default value of the changelog-producer parameter. With this value, a downstream consumer that reads the Apache Paimon table in streaming mode obtains only the latest value for each primary key. The consumer can see the latest data and detect that existing data has been deleted, but cannot learn the previous values of the updated or deleted data, which makes some incremental computations impossible.

For example, suppose the downstream consumer wants to compute the sum of a column and receives only the latest value 5. It cannot determine how to update the sum: if the original value was 4, the sum should increase by 1; if the original value was 6, the sum should decrease by 1. Consumers of this type are sensitive to UPDATE_BEFORE data. For such consumers, we recommend that you do not set the changelog-producer parameter to none, although the other incremental data generation mechanisms incur some performance overhead.

Note

If your downstream consumer is not sensitive to UPDATE_BEFORE data, for example, a database that simply applies the latest values, you can set the changelog-producer parameter to none. Configure this parameter based on your business requirements.

input

If you set the changelog-producer parameter to input, the Apache Paimon result table writes input data streams to an incremental data file as incremental data in dual-write mode.

Therefore, this incremental data generation mechanism can be used only when the input data streams, such as Change Data Capture (CDC) data, are complete.

lookup

If you set the changelog-producer parameter to lookup, the Apache Paimon result table uses a point query mechanism that is similar to a dimension table to generate complete incremental data that corresponds to the snapshot before commit snapshot is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete.

The lookup mechanism is more efficient than the full-compaction mechanism in the generation of incremental data. However, the lookup mechanism consumes more resources.

We recommend that you use the lookup mechanism in scenarios where the requirement for the freshness of incremental data is high. For example, incremental data within minutes is required.

full-compaction

If you set the changelog-producer parameter to full-compaction, the Apache Paimon result table generates complete incremental data each time full compaction is performed. The incremental data generation mechanism allows the Apache Paimon result table to generate complete incremental data regardless of whether the input incremental data is complete. The time interval at which full compaction is performed is specified by the full-compaction.delta-commits parameter.

Compared with the lookup mechanism, the full-compaction mechanism is less efficient in generating incremental data. However, it reuses the full compaction process that is performed anyway and therefore introduces no additional computation, so fewer resources are consumed.

We recommend that you use the full compaction mechanism in scenarios where the requirement for the freshness of incremental data is not high. For example, incremental data within hours is required.
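
A minimal sketch of a table that uses the full-compaction mechanism, created in an Apache Paimon catalog; the table name, columns, and the commit threshold are illustrative:

CREATE TABLE orders_changelog (
  order_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction', -- generate complete incremental data at each full compaction
  'full-compaction.delta-commits' = '10'    -- trigger a full compaction at least every 10 commits
);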

Write mode

Apache Paimon tables support the following write modes.

Change-log

change-log is the default write mode for Apache Paimon tables. In change-log mode, data can be inserted into, deleted from, and updated in an Apache Paimon table based on the primary key of the table. In change-log mode, you can also use the data merging mechanism and incremental data generation mechanism.

Append-only

In append-only mode, the Apache Paimon table allows only data insertion and does not support primary key-based operations. The append-only mode is more efficient than the change-log mode. In this mode, an Apache Paimon table can be used as a substitute for a message queue in scenarios where the data freshness requirement is not high, for example, when data delays within hours are acceptable.

For more information about the append-only mode, see Apache Paimon official documentation. When you use the append-only mode, take note of the following points:

  • We recommend that you configure the bucket-key parameter based on your business requirements. Otherwise, data of the Apache Paimon table is distributed to buckets based on the values of all columns. This results in low computing efficiency.

  • The append-only mode ensures the data generation order based on the following rules:

    1. If two data records come from different partitions and the scan.plan-sort-partition parameter is configured, data that has the smaller partition value is preferentially generated. Otherwise, data from the partition that is created earlier is preferentially generated.

    2. If two data records come from the same bucket in the same partition, the data that is written earlier is preferentially generated.

    3. If two data records come from different buckets in the same partition, the data generation order cannot be ensured because data in different buckets is processed in different parallel subtasks.
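
A minimal sketch of an append-only table created in an Apache Paimon catalog; the table name, columns, and bucket settings are illustrative:

CREATE TABLE click_log (
  user_id BIGINT,
  page STRING,
  event_time TIMESTAMP(3)
) WITH (
  'write-mode' = 'append-only',
  'bucket' = '4',          -- number of buckets per partition
  'bucket-key' = 'user_id' -- recommended, so that data is not bucketed by all columns
);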

Data synchronization based on the CREATE TABLE AS and CREATE DATABASE AS statements

You can execute the CREATE TABLE AS or CREATE DATABASE AS statement to synchronize data from a single table or an entire database to an Apache Paimon table in real time. The changes to the schema of the source table can also be synchronized to the related Apache Paimon table in real time during data synchronization of a database or table. For more information, see Use an Apache Paimon catalog as the catalog of the destination store that is used in the CREATE TABLE AS statement and Use an Apache Paimon catalog as the catalog of the destination store that is used in the CREATE DATABASE AS statement.
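
A minimal sketch of the CREATE TABLE AS statement; the catalog, database, and table names are placeholders, and the full syntax, including WITH clauses and source options, is described in the referenced topics:

-- Synchronize a source table, including future schema changes,
-- to a table in an Apache Paimon catalog.
CREATE TABLE IF NOT EXISTS `<your-paimon-catalog>`.`<your-db>`.orders
AS TABLE `<your-source-catalog>`.`<your-db>`.orders;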

FAQ