Realtime Compute for Apache Flink: Paimon connector

Last Updated: Dec 03, 2025

The Paimon connector is a streaming data lakehouse connector that you can use together with a Paimon Catalog. This topic describes how to use the Paimon connector.

Background information

Apache Paimon is a lake storage format that unifies streaming and batch processing. It supports high-throughput writes and low-latency queries. Common compute engines on the Alibaba Cloud open-source big data platform E-MapReduce, such as Flink, Spark, Hive, and Trino, are well-integrated with Paimon. You can use Apache Paimon to build a data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS) and connect to these compute engines for data lake analytics. For more information, see Apache Paimon.

  • Supported types: Source table, dimension table, sink table, and data ingestion destination.

  • Run modes: Streaming and batch modes.

  • Data formats: Not supported.

  • Specific monitoring metrics: None.

  • API types: SQL and YAML jobs for data ingestion.

  • Supports updating or deleting data in sink tables: Yes.

Features

Apache Paimon provides the following core capabilities:

  • Build a low-cost, lightweight data lake storage service based on HDFS or OSS.

  • Read and write large-scale datasets in streaming and batch modes.

  • Perform batch queries and Online Analytical Processing (OLAP) queries with data freshness ranging from seconds to minutes.

  • Consume and produce incremental data. Paimon can be used as a storage layer for traditional offline data warehouses and modern streaming data warehouses.

  • Pre-aggregate data to reduce storage costs and downstream computing pressure.

  • Retrieve historical versions of data.

  • Filter data efficiently.

  • Support schema evolution.

Limits

  • The Paimon connector is supported only in Ververica Runtime (VVR) 6.0.6 and later versions of the Flink compute engine.

  • The following list shows the mapping between Apache Paimon versions and VVR versions:

    • Apache Paimon 1.1: VVR 11

    • Apache Paimon 1.0: VVR 8.0.11

    • Apache Paimon 0.9: VVR 8.0.7, 8.0.8, 8.0.9, and 8.0.10

    • Apache Paimon 0.8: VVR 8.0.6

    • Apache Paimon 0.7: VVR 8.0.5

    • Apache Paimon 0.6: VVR 8.0.3 and 8.0.4

SQL

The Paimon connector can be used in SQL jobs as a source table or sink table.

Syntax

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify the connector parameter. The syntax for creating a Paimon table is as follows:

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    Note

    If you have already created a Paimon table in a Paimon Catalog, you can use it directly without creating it again.

  • If you create a Paimon temporary table in another catalog, you must specify the connector parameter and the storage path of the Paimon table. The syntax for creating a Paimon table is as follows:

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- If no Paimon table data file exists at the specified path, a file is automatically created.
      ...
    );
    Note
    • Path example: 'path' = 'oss://<bucket>/test/order.db/orders'. Do not omit the .db suffix. Paimon uses this naming convention to identify the database.

    • Multiple jobs that write to the same table must use the same path configuration.

    • If two path values differ as strings, Paimon treats them as different tables, even if they point to the same physical location. For example, oss://b/test and oss://b/test/ differ only by a trailing slash but are considered different paths. Such inconsistent Catalog configurations can cause concurrent write conflicts, compaction failures, and data loss.
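
After the Paimon table is created in either of the preceding ways, you can read from and write to it with standard SQL statements. The following is a minimal sketch for illustration: orders_source is a hypothetical upstream table, and the lookup join assumes that orders_source declares a processing-time attribute named proctime.

-- Write to the Paimon table in streaming or batch mode.
INSERT INTO paimon_table
SELECT id, data FROM orders_source;

-- Read the Paimon table. A hint can override options such as scan.mode.
SELECT * FROM paimon_table /*+ OPTIONS('scan.mode' = 'latest') */;

-- Use the Paimon table as a dimension table in a lookup join.
SELECT o.id, d.data
FROM orders_source AS o
JOIN paimon_table FOR SYSTEM_TIME AS OF o.proctime AS d
ON o.id = d.id;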

WITH parameters

Parameter

Description

Data type

Required

Default value

Notes

connector

The table type.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify this parameter.

  • If you create a Paimon temporary table in another catalog, set this parameter to paimon.

path

The table storage path.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify this parameter.

  • If you create a Paimon temporary table in another catalog, this parameter specifies the storage directory of the table in HDFS or OSS.

auto-create

When you create a Paimon temporary table, this parameter specifies whether to automatically create Paimon table files if none exist at the specified path.

Boolean

No

false

Valid values:

  • false (default): If no Paimon table file exists at the specified path, an error is reported.

  • true: If the specified path does not exist, the Flink system automatically creates a Paimon table file.

bucket

The number of buckets in each partition.

Integer

No

1

Data written to the Paimon table is distributed to each bucket based on the bucket-key.

Note

We recommend that the data volume of each bucket be less than 5 GB.

bucket-key

The key columns for bucketing.

String

No

None

Specifies the columns based on which the data written to the Paimon table is distributed to different buckets.

Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' distributes data based on the values in the order_id and cust_id columns.

Note
  • If this parameter is not specified, data is distributed based on the primary key.

  • If no primary key is specified for the Paimon table, data is distributed based on the values of all columns.

changelog-producer

The mechanism for generating incremental data.

String

No

none

Paimon can generate complete incremental data (all update_after data has corresponding update_before data) for any input data stream, which is convenient for downstream consumers. Valid values for the incremental data generation mechanism:

  • none (default): No extra incremental data is generated. The downstream can still stream-read the Paimon table, but the incremental data read is incomplete. It contains only update_after data, without corresponding update_before data.

  • input: The input data stream is written to both the main table and the incremental data files.

  • full-compaction: Complete incremental data is generated during each full compaction.

  • lookup: Complete incremental data is generated before each snapshot is committed.

For more information about how to select a mechanism for generating incremental data, see Incremental data generation mechanisms.

full-compaction.delta-commits

The maximum interval for full compaction.

Integer

No

None

This parameter specifies the number of snapshot commits after which a full compaction must be performed.

lookup.cache-max-memory-size

The memory cache size for the Paimon dimension table.

String

No

256 MB

This parameter configures the memory cache size for both the Paimon dimension table and the lookup changelog-producer mechanism.

merge-engine

The mechanism for merging data with the same primary key.

String

No

deduplicate

Valid values:

  • deduplicate: Only the latest record is kept.

  • partial-update: Updates the existing data that has the same primary key with the non-null columns of the latest data. Other columns remain unchanged.

  • aggregation: Performs pre-aggregation using specified aggregate functions.

For a detailed analysis of data merge mechanisms, see Data merge mechanisms.

partial-update.ignore-delete

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

Valid values:

  • true: Ignores delete messages.

  • false: Does not ignore delete messages. You need to configure the policy for the sink to process delete data using configuration items such as sequence.field. Otherwise, an IllegalStateException or IllegalArgumentException error may be thrown.

Note
  • In Realtime Compute for Apache Flink VVR 8.0.6 and earlier, this parameter takes effect only in partial update scenarios where merge-engine = partial-update.

  • In Realtime Compute for Apache Flink VVR 8.0.7 and later, this parameter is compatible with non-partial update scenarios and has the same function as the ignore-delete parameter. We recommend that you use ignore-delete instead.

  • Decide whether to enable this parameter based on your business scenario and whether delete messages are expected. If delete messages are not expected by the job semantics, letting the job throw an error is the better choice.

ignore-delete

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

The valid values are the same as for partial-update.ignore-delete.

Note
  • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.7 and later.

  • This parameter has the same function as `partial-update.ignore-delete`. We recommend that you use the `ignore-delete` parameter and avoid configuring both parameters at the same time.

partition.default-name

The default name for partitions.

String

No

__DEFAULT_PARTITION__

If the value of a partition key column is null or an empty string, this default name is used as the partition name.

partition.expiration-check-interval

The interval at which to check for expired partitions.

String

No

1h

For more information, see How do I configure automatic partition expiration?

partition.expiration-time

The expiration duration for partitions.

String

No

None

When the survival time of a partition exceeds this value, the partition expires. By default, partitions never expire.

The survival time of a partition is calculated from its partition value. For more information, see How do I configure automatic partition expiration?

partition.timestamp-formatter

The format string to convert a time string to a UNIX timestamp.

String

No

None

Sets the format for extracting the partition survival time from the partition value. For more information, see How do I configure automatic partition expiration?

partition.timestamp-pattern

The format string to convert a partition value to a time string.

String

No

None

Sets the format for extracting the partition survival time from the partition value. For more information, see How do I configure automatic partition expiration?

scan.bounded.watermark

When the watermark of the data generated by the Paimon source table exceeds this value, the Paimon source table stops generating data.

Long

No

None

None.

scan.mode

Specifies the consumer offset for the Paimon source table.

String

No

default

For more information, see How do I set the consumer offset for a Paimon source table?

scan.snapshot-id

Specifies the snapshot from which the Paimon source table starts consumption.

Integer

No

None

For more information, see How do I set the consumer offset for a Paimon source table?

scan.timestamp-millis

Specifies the point in time from which the Paimon source table starts consumption.

Integer

No

None

For more information, see How do I set the consumer offset for a Paimon source table?

snapshot.num-retained.max

The maximum number of latest snapshots to retain without expiring.

Integer

No

2147483647

A snapshot expires if this configuration or `snapshot.time-retained` is met, and `snapshot.num-retained.min` is also met.

snapshot.num-retained.min

The minimum number of latest snapshots to retain without expiring.

Integer

No

10

None.

snapshot.time-retained

The duration after which a snapshot expires.

String

No

1h

A snapshot expires if this configuration or `snapshot.num-retained.max` is met, and `snapshot.num-retained.min` is also met.

write-mode

The write mode for the Paimon table.

String

No

change-log

Valid values:

  • change-log: The Paimon table supports inserting, deleting, and updating data based on the primary key.

  • append-only: The Paimon table only accepts data insertions and does not support a primary key. This mode is more efficient than the change-log mode.

For more information about write modes, see Write modes.

scan.infer-parallelism

Specifies whether to automatically infer the concurrency of the Paimon source table.

Boolean

No

true

Valid values:

  • true: The concurrency of the Paimon source table is automatically inferred based on the number of buckets.

  • false: The default concurrency configured in VVP is used. In expert mode, the user-configured concurrency is used.

scan.parallelism

The concurrency of the Paimon source table.

Integer

No

None

Note

On the Deployment Details > Resource Configuration tab, this parameter does not take effect when Resource Mode is set to Expert.

sink.parallelism

The concurrency of the Paimon sink table.

Integer

No

None

Note

On the Deployment Details > Resource Configuration tab, this parameter does not take effect when Resource Mode is set to Expert.

sink.clustering.by-columns

Specifies the clustering columns for writing to the Paimon sink table.

String

No

None

For Paimon append-only tables (non-primary key tables), configuring this parameter in a batch job enables the clustering write feature. This feature clusters data on specific columns by value range, which improves the query speed of the table.

Separate multiple column names with commas (,), for example, 'col1,col2'.

For more information about clustering, see the official Apache Paimon documentation.

sink.delete-strategy

Sets a validation policy to ensure that the system correctly handles retraction (-D/-U) messages.

Enum

No

NONE

The valid values for the validation policy and the expected behavior of the sink operator for handling retraction messages are as follows:

  • NONE (default): No validation is performed.

  • IGNORE_DELETE: The sink operator should ignore -U and -D messages. No retraction occurs.

  • NON_PK_FIELD_TO_NULL: The sink operator should ignore -U messages. However, when it receives a -D message, it keeps the primary key value and retracts other non-primary key values in the schema.

    This is mainly used in scenarios where multiple sinks write to the same table simultaneously for partial updates.

  • DELETE_ROW_ON_PK: The sink operator should ignore -U messages. However, when it receives a -D message, it deletes the row corresponding to the primary key.

  • CHANGELOG_STANDARD: The sink operator should delete the row corresponding to the primary key when it receives either -U or -D data.

Note
  • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.8 and later.

  • The actual behavior of the Paimon sink when it processes retraction messages is determined by other configuration items, such as `ignore-delete` and `merge-engine`. This parameter does not change that behavior. Instead, it validates whether the behavior matches the expected policy. If it does not, validation fails and the job error message explains how to modify configuration items such as `ignore-delete` and `merge-engine` to meet the expectation.

Note

For more information about configuration items, see the official Apache Paimon documentation.
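
The following is a minimal sketch that combines several of the preceding WITH parameters in one temporary table definition. The OSS path, table name, and column names are placeholders; adjust the values to your own scenario.

CREATE TEMPORARY TABLE orders_sink (
  order_id BIGINT,
  cust_id BIGINT,
  amount DOUBLE,
  dt STRING,
  PRIMARY KEY (order_id, dt) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<bucket>/test/order.db/orders',
  'auto-create' = 'true',
  'bucket' = '4',                               -- 4 buckets per partition
  'bucket-key' = 'order_id',                    -- distribute data by order_id
  'changelog-producer' = 'lookup',              -- generate complete incremental data before each commit
  'merge-engine' = 'deduplicate',               -- keep only the latest record for each primary key
  'snapshot.time-retained' = '2h',              -- snapshots expire after 2 hours
  'partition.expiration-time' = '7 d',          -- partitions expire after 7 days
  'partition.timestamp-formatter' = 'yyyyMMdd'  -- how to parse the dt partition value into a timestamp
);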

Feature details

Data freshness and consistency guarantees

The Paimon sink table uses a two-phase commit protocol to commit written data during each checkpoint of a Flink job. Therefore, the data freshness is the same as the checkpoint interval of the Flink job. Each commit generates up to two snapshots.

When two Flink jobs write to the same Paimon table at the same time, if the data from the two jobs is not written to the same bucket, serializable consistency is guaranteed. If the data from the two jobs is written to the same bucket, only snapshot isolation consistency is guaranteed. This means that the data in the table may be a mix of results from both jobs, but no data will be lost.
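
As noted above, data freshness follows the checkpoint interval, which is controlled by the standard Flink configuration option execution.checkpointing.interval. The following is only a sketch; in Realtime Compute for Apache Flink, this option is typically set in the deployment's runtime parameters rather than in the SQL script.

-- Commit data to the Paimon table approximately once per minute.
SET 'execution.checkpointing.interval' = '1min';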

Data merge mechanisms

When a Paimon sink table receives multiple records with the same primary key, it merges these records into a single record to maintain the uniqueness of the primary key. You can specify the data merge behavior by setting the merge-engine parameter. The following table describes the data merge mechanisms.

Mechanism

Details

Deduplicate

The deduplicate mechanism is the default data merge mechanism. For multiple records with the same primary key, the Paimon sink table keeps only the latest record and discards the others.

Note

If the latest record is a delete message, all records with that primary key are discarded.

Partial Update

By specifying the partial-update mechanism, you can progressively update data through multiple messages and eventually obtain complete data. Specifically, a new record with the same primary key overwrites the existing record, but columns that are null in the new record do not overwrite the existing values.

For example, assume a Paimon sink table receives the following three records in order:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

The first column is the primary key. The final result is <1, 25.2, 10, 'This is a book'>.

Note
  • To stream-read the results of a partial-update, you must set the changelog-producer parameter to `lookup` or `full-compaction`.

  • The partial-update mechanism cannot process delete messages. You can set the partial-update.ignore-delete parameter to ignore delete messages.
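
For example, the following is a minimal sketch of the scenario above. The table and column names are hypothetical; the first column is the primary key.

CREATE TEMPORARY TABLE book_info (
  id BIGINT,
  price DOUBLE,
  stock INT,
  title STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<bucket>/test/books.db/book_info',
  'auto-create' = 'true',
  'merge-engine' = 'partial-update',
  'changelog-producer' = 'lookup'  -- required if downstream jobs stream-read the merged results
);

-- The three records from the example above. Null columns in a new record do not overwrite existing values.
INSERT INTO book_info VALUES
  (1, 23.0, 10, CAST(NULL AS STRING)),
  (1, CAST(NULL AS DOUBLE), CAST(NULL AS INT), 'This is a book'),
  (1, 25.2, CAST(NULL AS INT), CAST(NULL AS STRING));
-- After merging, the row for id = 1 is (1, 25.2, 10, 'This is a book').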

Aggregation

In some scenarios, you may only care about the aggregated value. The aggregation mechanism aggregates data with the same primary key based on the aggregate functions that you specify. For each column that is not part of the primary key, you can specify an aggregate function by using fields.<field-name>.aggregate-function. If you do not specify one, the last_non_null_value aggregate function is used by default for that column. For example, consider the following Paimon table definition.

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

The price column is aggregated using the max function, and the sales column is aggregated using the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result is <1, 30.2, 35>. The currently supported aggregate functions and their corresponding data types are as follows:

  • sum: Supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.

  • min and max: Supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

  • last_value and last_non_null_value: Supports all data types.

  • listagg: Supports STRING.

  • bool_and and bool_or: Supports BOOLEAN.

Note
  • Only the sum function supports data retraction and deletion. Other aggregate functions do not. If you need certain columns to ignore retraction and deletion messages, you can set 'fields.${field_name}.ignore-retract'='true'.

  • To stream-read the results of an aggregation, you must set the changelog-producer parameter to `lookup` or `full-compaction`.
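
For illustration, using the MyTable definition above, the two input records from the example can be written as follows. This is only a sketch of the expected merge result.

-- price is aggregated with max, and sales is aggregated with sum.
INSERT INTO MyTable VALUES (1, 23.0, 15), (1, 30.2, 20);
-- After merging, the row for product_id = 1 is (1, 30.2, 35).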

Incremental data generation mechanisms

By setting the changelog-producer parameter to the appropriate incremental data generation mechanism, Paimon can generate complete incremental data for any input data stream. Complete incremental data means that all update_after data has corresponding update_before data. The following lists all incremental data generation mechanisms. For more information, see the official Apache Paimon documentation.

Mechanism

Details

None

If you set changelog-producer to `none` (the default value), the downstream Paimon source table can only see the latest state of the data for the same primary key. However, this latest state does not allow downstream consumers to easily understand the complete incremental data for correct calculations. This is because it can only determine whether the corresponding data was deleted or what the latest data is, but not what the data was before the change.

For example, assume a downstream consumer needs to calculate the sum of a column. If the consumer only sees the latest data, which is 5, it cannot determine how to update the sum: if the previous data was 4, the sum should increase by 1; if the previous data was 6, the sum should decrease by 1. Such consumers are sensitive to update_before data. For these consumers, we recommend that you do not set the incremental data generation mechanism to `none`. Note, however, that the other mechanisms incur additional performance overhead.

Note

If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the incremental data generation mechanism to `none`. Configure the mechanism based on your actual needs.

Input

If you set changelog-producer to `input`, the Paimon sink table writes the input data stream to both the main table and the incremental data files.

Therefore, this mechanism can be used only when the input data stream itself is complete incremental data, such as Change Data Capture (CDC) data.

Lookup

If you set changelog-producer to `lookup`, the Paimon sink table uses a mechanism similar to a dimension table point query to generate complete incremental data for the current snapshot before each commit. This mechanism can generate complete incremental data regardless of whether the input data is complete incremental data.

Compared with the Full Compaction mechanism described below, the Lookup mechanism provides better timeliness for generating incremental data but consumes more resources overall.

We recommend that you use this mechanism when you have high requirements for data freshness, such as minute-level freshness.

Full Compaction

If you set changelog-producer to `full-compaction`, the Paimon sink table generates complete incremental data during each full compaction. This mechanism can generate complete incremental data regardless of whether the input data is complete incremental data. The interval for full compaction is specified by the full-compaction.delta-commits parameter.

Compared with the Lookup mechanism, the Full Compaction mechanism has poorer timeliness for generating incremental data. However, it utilizes the full compaction process of the data and does not generate extra computation, thus consuming fewer resources overall.

We recommend that you use this mechanism when you do not have high requirements for data freshness, such as hour-level freshness.
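
For example, the following sketch enables the full-compaction mechanism and ensures that a full compaction, and therefore complete incremental data, is produced at least once every 5 snapshot commits. The table name and path are placeholders.

CREATE TEMPORARY TABLE orders_changelog (
  order_id BIGINT,
  amount DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<bucket>/test/order.db/orders_changelog',
  'auto-create' = 'true',
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '5'  -- trigger a full compaction at least once every 5 commits
);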

Write modes

Paimon tables currently support the following write modes.

Mode

Details

Change-log

The change-log write mode is the default write mode for Paimon tables. This mode supports inserting, deleting, and updating data based on the primary key. You can also use the data merge and incremental data generation mechanisms mentioned above in this mode.

Append-only

The append-only write mode only supports data insertion and does not support a primary key. This mode is more efficient than the change-log mode and can be used as an alternative to a message queue in scenarios where data freshness requirements are not high, such as minute-level freshness.

For more information about the append-only write mode, see the official Apache Paimon documentation. When you use the append-only write mode, note the following:

  • We recommend that you set the bucket-key parameter based on your actual needs. Otherwise, the Paimon table will perform bucketing based on the values of all columns, which is less efficient.

  • The append-only write mode can guarantee the output order of data to a certain extent. The specific output order is as follows:

    1. If two records are from different partitions, the record with the smaller partition value is output first if the scan.plan-sort-partition parameter is set. Otherwise, the data from the earlier created partition is output first.

    2. If two records are from the same bucket in the same partition, the earlier written data is output first.

    3. If two records are from different buckets in the same partition, their output order is not guaranteed because different buckets are processed by different concurrent tasks.
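
The following is a minimal sketch of an append-only table. The names are hypothetical; note that no primary key is declared and that bucket-key is set explicitly, as recommended above.

CREATE TEMPORARY TABLE click_log (
  user_id BIGINT,
  url STRING,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<bucket>/test/log.db/click_log',
  'auto-create' = 'true',
  'write-mode' = 'append-only',
  'bucket' = '4',
  'bucket-key' = 'user_id'  -- without this, all columns are used for bucketing, which is less efficient
);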

As a destination for CTAS and CDAS

Paimon tables support real-time synchronization of data at the single-table or entire-database level. If the schema of an upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
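
The following is a sketch of typical CTAS and CDAS statements that use a Paimon Catalog as the destination. The catalog, database, and table names are placeholders; see the topics referenced above for the exact syntax and prerequisites.

-- Single-table synchronization (CTAS). Upstream schema changes are synchronized in real time.
CREATE TABLE IF NOT EXISTS `paimon_catalog`.`target_db`.`orders`
AS TABLE `mysql_catalog`.`source_db`.`orders`;

-- Entire-database synchronization (CDAS).
CREATE DATABASE IF NOT EXISTS `paimon_catalog`.`target_db`
AS DATABASE `mysql_catalog`.`source_db` INCLUDING ALL TABLES;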

Data ingestion

The Paimon connector can be used as a sink in YAML jobs for data ingestion.

Syntax

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

Configuration items

Parameter

Description

Required

Data type

Default value

Notes

type

The connector type.

Yes

STRING

None

Set this parameter to paimon.

name

The sink name.

No

STRING

None

The name of the sink.

catalog.properties.metastore

The type of the Paimon Catalog.

No

STRING

filesystem

Valid values:

  • filesystem (default)

  • rest (Only DLF is supported. DLF-Legacy is not supported.)

catalog.properties.*

The parameters for creating a Paimon Catalog.

No

STRING

None

For more information, see Manage Paimon Catalogs.

table.properties.*

The parameters for creating a Paimon table.

No

STRING

None

For more information, see Paimon table options.

catalog.properties.warehouse

The root directory for file storage.

No

STRING

None

This parameter takes effect only when catalog.properties.metastore is set to `filesystem`.

commit.user-prefix

The username prefix for committing data files.

No

STRING

None

Note

We recommend that you set different usernames for different jobs to easily locate conflicting jobs when commit conflicts occur.

partition.key

The partition field for each partitioned table.

No

STRING

None

Use semicolons (;) to separate different tables, commas (,) to separate different fields, and colons (:) to separate tables and fields. For example: testdb.table1:id1,id2;testdb.table2:name.

sink.cross-partition-upsert.tables

Specifies the tables that require cross-partition updates, where the primary key does not include all partition fields.

No

STRING

None

The tables to update across partitions.

  • Format: Use a semicolon (;) to separate table names.

  • Performance recommendation: This operation consumes a large amount of compute resources. Create separate jobs for these tables.

Important
  • List all eligible tables. Omitting a table name causes data duplication.

Examples

The following examples show how to configure Paimon as a data ingestion sink based on the Paimon Catalog type.

  • Example configuration for writing to Alibaba Cloud OSS when the Paimon Catalog is filesystem:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    For more information about the parameters that are prefixed with catalog.properties, see Create a Paimon Filesystem Catalog.

  • Example configuration for writing to Alibaba Cloud DLF 2.5 when the Paimon Catalog is rest:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf

    For more information about the parameters that are prefixed with catalog.properties, see Flink CDC Catalog configuration parameters.

Schema evolution

Currently, Paimon as a data ingestion sink supports the following schema evolution events:

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (Modifying the type of a primary key column is not supported)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

Note

If the downstream Paimon table already exists, its existing schema is used for writing. The system does not attempt to create the table again.

FAQ