Realtime Compute for Apache Flink:Paimon streaming data lakehouse

Last Updated: Sep 16, 2025

You can use the Paimon connector together with a Paimon catalog. This topic describes how to use the Paimon connector in SQL jobs and data ingestion YAML jobs.

Background information

Apache Paimon is a data lake storage format that unifies streaming and batch processing. It supports high-throughput writes and low-latency queries. Common compute engines on Alibaba Cloud's E-MapReduce big data platform, such as Flink, Spark, Hive, or Trino, are well-integrated with Paimon. You can use Apache Paimon to quickly build your data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS). You can then connect to these compute engines to perform data lake analytics. For more information, see Apache Paimon.

The following list summarizes the capabilities of the Paimon connector:

  • Supported types: source table, dimension table, sink table, and data ingestion sink

  • Running modes: streaming and batch

  • Data format: not supported

  • Specific monitoring metrics: none

  • API types: SQL and data ingestion YAML jobs

  • Update or delete data in sink tables: supported

Features

Apache Paimon provides the following core features:

  • Low-cost, lightweight data lake storage services based on HDFS or OSS.

  • Streaming and batch reads and writes for large datasets.

  • Batch and Online Analytical Processing (OLAP) queries with data freshness from minutes to seconds.

  • Consumption and production of incremental data. Paimon can serve as a storage layer for both traditional offline data warehouses and modern streaming data warehouses.

  • Support for pre-aggregated data to reduce storage costs and downstream computing pressure.

  • Support for backtracking to historical data versions.

  • Efficient data filtering.

  • Table schema evolution.

Limits

  • The Paimon connector is supported only in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.6 or later.

  • The following list shows the version compatibility between Paimon and VVR:

    Apache Paimon 1.1: VVR 11

    Apache Paimon 1.0: VVR 8.0.11

    Apache Paimon 0.9: VVR 8.0.7, 8.0.8, 8.0.9, and 8.0.10

    Apache Paimon 0.8: VVR 8.0.6

    Apache Paimon 0.7: VVR 8.0.5

    Apache Paimon 0.6: VVR 8.0.3 and 8.0.4

SQL

You can use the Paimon connector in SQL jobs as a source table, a dimension table, or a sink table.

Syntax

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify the connector parameter. The syntax is as follows.

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    Note

    If you have already created a Paimon table in a Paimon Catalog, you can use the table directly without creating it again.

  • If you create a temporary Paimon table in another catalog, you must specify the connector parameter and the storage path of the Paimon table. The syntax is as follows.

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- If no Paimon table data file exists in the specified path, a file is automatically created.
      ...
    );
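
After the table is defined, you can read from and write to it like any other Flink table. The following is a minimal sketch, assuming an upstream table named kafka_orders exists with a compatible schema; the dynamic OPTIONS hint only illustrates how to override scan parameters for a single query.

-- Write to the Paimon table. The upstream table kafka_orders is a placeholder.
INSERT INTO paimon_table SELECT id, data FROM kafka_orders;

-- In another job, stream-read the Paimon table starting from the latest snapshot.
SELECT * FROM paimon_table /*+ OPTIONS('scan.mode' = 'latest') */;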

WITH parameters

Parameter

Description

Data type

Required

Default value

Remarks

connector

The table type.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify this parameter.

  • If you create a temporary Paimon table in another catalog, set the value to paimon.

path

The storage path of the table.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify this parameter.

  • If you create a temporary Paimon table in another catalog, set this to the storage directory of the table in HDFS or OSS.

auto-create

Specifies whether to automatically create a Paimon table file if no file exists in the specified path when you create a temporary Paimon table.

Boolean

No

false

Valid values:

  • false (default): An error is reported if no Paimon table file exists in the specified path.

  • true: If no Paimon table file exists in the specified path, Flink automatically creates one.

bucket

The number of buckets in each partition.

Integer

No

1

Data written to the Paimon table is distributed to each bucket based on the bucket-key.

Note

Keep the data size in each bucket under 5 GB.

bucket-key

The key columns for bucketing.

String

No

None

Specifies the columns whose values are used to distribute data to different buckets when writing to a Paimon table.

Separate column names with a comma (,). For example, 'bucket-key' = 'order_id,cust_id' distributes data based on the values of the order_id and cust_id columns.

Note
  • If this parameter is not specified, data is distributed based on the primary key.

  • If no primary key is specified for the Paimon table, data is distributed based on the values of all columns.

changelog-producer

The mechanism for producing incremental data.

String

No

none

Paimon can produce a complete changelog (all update_after data has corresponding update_before data) for any input data stream, which is convenient for downstream consumers. The available options are:

  • none (default): No extra changelog is produced. The downstream can still stream-read the Paimon table, but the changelog is incomplete (only update_after data, without corresponding update_before data).

  • input: The input data stream is dual-written to the changelog file as incremental data.

  • full-compaction: A complete changelog is produced during each full compaction.

  • lookup: A complete changelog is produced before each snapshot commit.

For more information about choosing a changelog producer, see Incremental data production mechanism.

full-compaction.delta-commits

The maximum interval for a full compaction.

Integer

No

None

This parameter specifies that a full compaction is triggered after a certain number of snapshot commits.

lookup.cache-max-memory-size

The memory cache size for a Paimon dimension table.

String

No

256 MB

This parameter configures the cache size for both the dimension table lookup and the lookup changelog-producer mechanisms.

merge-engine

The merge mechanism for data with the same primary key.

String

No

deduplicate

Valid values:

  • deduplicate: Retains only the latest record.

  • partial-update: Updates the existing data that has the same primary key with non-null columns from the latest data. Other columns remain unchanged.

  • aggregation: Performs pre-aggregation using specified aggregate functions.

For a detailed analysis of data merge mechanisms, see Data merging mechanism.

partial-update.ignore-delete

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

Valid values:

  • true: Ignores delete messages.

  • false: Does not ignore delete messages. You must configure how the sink handles delete data using options such as sequence.field. Otherwise, an IllegalStateException or IllegalArgumentException error may be thrown.

Note
  • In Realtime Compute for Apache Flink VVR 8.0.6 and earlier, this parameter takes effect only in partial update scenarios when merge-engine = partial-update.

  • In Realtime Compute for Apache Flink VVR 8.0.7 and later, this parameter is compatible with non-partial update scenarios and has the same function as the ignore-delete parameter. We recommend replacing it with ignore-delete.

  • Determine whether to enable this parameter as needed and whether the delete messages are expected. If the semantics of the delete messages are not as expected, throwing an error is a better choice.

ignore-delete

Specifies whether to ignore delete (-D) messages.

Boolean

No

false

The valid values are the same as for partial-update.ignore-delete.

Note
  • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.7 and later.

  • This parameter has the same function as the partial-update.ignore-delete parameter. Use the ignore-delete parameter and avoid configuring both parameters at the same time.

partition.default-name

The default partition name.

String

No

__DEFAULT_PARTITION__

If the value of a partition key column is null or an empty string, this default name is used as the partition name.

partition.expiration-check-interval

The interval for checking partition expiration.

String

No

1h

For more information, see How do I configure automatic partition expiration?

partition.expiration-time

The expiration time for a partition.

String

No

None

A partition expires when its lifetime exceeds this value. By default, partitions never expire.

A partition's lifetime is calculated from its partition value. For more information, see How do I configure automatic partition expiration?

partition.timestamp-formatter

The format string to convert a time string to a timestamp.

String

No

None

Sets the format for extracting the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration?

partition.timestamp-pattern

The format string to convert a partition value to a time string.

String

No

None

Sets the format for extracting the partition lifetime from the partition value. For more information, see How do I configure automatic partition expiration?

scan.bounded.watermark

When the watermark of data produced by a Paimon source table exceeds this value, the source table stops producing data.

Long

No

None

None.

scan.mode

Specifies the consumer offset for the Paimon source table.

String

No

default

For more information, see How do I set the consumer offset for a Paimon source table?

scan.snapshot-id

Specifies the snapshot from which the Paimon source table starts consumption.

Integer

No

None

For more information, see How do I set the consumer offset for a Paimon source table?

scan.timestamp-millis

Specifies the point in time from which the Paimon source table starts consumption.

Integer

No

None

For more information, see How do I set the consumer offset for a Paimon source table?

snapshot.num-retained.max

The maximum number of latest snapshots to retain.

Integer

No

2147483647

A snapshot expires when it exceeds either this limit or the snapshot.time-retained limit, provided that at least snapshot.num-retained.min snapshots remain.

snapshot.num-retained.min

The minimum number of latest snapshots to retain.

Integer

No

10

None.

snapshot.time-retained

The retention period for a snapshot after it is created.

String

No

1h

A snapshot expires when it exceeds either this retention period or the snapshot.num-retained.max limit, provided that at least snapshot.num-retained.min snapshots remain.

write-mode

The write mode for the Paimon table.

String

No

change-log

Valid values:

  • change-log: The Paimon table supports data insertion, deletion, and updates based on the primary key.

  • append-only: The Paimon table only accepts data insertions and does not support a primary key. This mode is more efficient than the change-log mode.

For more information about write modes, see Write modes.

scan.infer-parallelism

Specifies whether to automatically infer the parallelism of the Paimon source table.

Boolean

No

true

Valid values:

  • true: Automatically infers the parallelism of the Paimon source table based on the number of buckets.

  • false: Uses the default parallelism configured in VVP. If you use the expert mode, the user-configured parallelism is used.

scan.parallelism

The parallelism of the Paimon source table.

Integer

No

None

Note

This parameter does not take effect if Resource Mode is set to Expert Mode on the Deployment Details > Resource Configuration tab.

sink.parallelism

The parallelism of the Paimon sink table.

Integer

No

None

Note

This parameter does not take effect if Resource Mode is set to Expert Mode on the Deployment Details > Resource Configuration tab.

sink.clustering.by-columns

Specifies the clustering columns for writing to the Paimon sink table.

String

No

None

For a Paimon append-only table (non-primary key table), configure this parameter in a batch job to enable clustered writing. This clusters data in specific columns by size range, which improves the query speed of the table.

Use a comma (,) to separate multiple column names, for example, 'col1,col2'.

For more information about the clustering feature, see the Apache Paimon official documentation.

sink.delete-strategy

Sets a validation policy to ensure that the system correctly handles retraction (-D/-U) messages.

Enum

No

NONE

The valid values for the validation policy and the expected behavior of the sink operator for handling retraction messages are as follows:

  • NONE (default): No validation is performed.

  • IGNORE_DELETE: The sink operator should ignore -U and -D messages. No retraction occurs.

  • NON_PK_FIELD_TO_NULL: The sink operator should ignore -U messages. When it receives a -D message, it keeps the primary key value and retracts other non-primary key values in the schema.

    This is mainly used for partial updates when multiple sinks write to the same table.

  • DELETE_ROW_ON_PK: The sink operator should ignore -U messages but delete the row corresponding to the primary key when it receives a -D message.

  • CHANGELOG_STANDARD: The sink operator should delete the row corresponding to the primary key when it receives either -U or -D messages.

Note
  • This parameter is supported only in Realtime Compute for Apache Flink VVR 8.0.8 and later.

  • The behavior of the Paimon sink for retraction messages is determined by other options, such as ignore-delete and merge-engine. This option validates whether the behavior meets the expected policy. If the behavior does not meet the policy, the validation fails, and an error is reported. The error message provides guidance on how to modify the other options to meet the policy.

Note

For more information about the configuration items, see the official Apache Paimon documentation.
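
The following example is an illustrative sketch that combines several of the WITH parameters described above in a single temporary sink table. The path, table, and column names are placeholders; adjust the values to your workload.

CREATE TEMPORARY TABLE paimon_sink (
  order_id BIGINT,
  cust_id  BIGINT,
  amount   DOUBLE,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'paimon',
  'path' = 'oss://<your-bucket>/warehouse/paimon_sink',  -- placeholder storage path
  'auto-create' = 'true',                  -- create the table files if none exist in the path
  'bucket' = '4',                          -- 4 buckets per partition
  'bucket-key' = 'order_id',               -- distribute data by order_id
  'merge-engine' = 'deduplicate',          -- keep only the latest record for each primary key
  'changelog-producer' = 'lookup',         -- produce a complete changelog before each commit
  'snapshot.time-retained' = '2h'          -- keep each snapshot for two hours
);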

Feature details

Data freshness and consistency guarantee

A Paimon sink table uses the two-phase commit protocol and commits written data during each checkpoint of a Flink job. Therefore, the data freshness is the same as the Flink job's checkpoint interval. Each commit produces a maximum of two snapshots.

If two Flink jobs write to the same Paimon table at the same time, serializable-level consistency is guaranteed, provided the data from the two jobs is not written to the same bucket. If the data is written to the same bucket, only snapshot isolation-level consistency is guaranteed. This means the data in the table might be a mix of results from both jobs, but no data is lost.
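
Because data is committed only at checkpoints, the checkpoint interval effectively controls data freshness. The following is a minimal sketch that sets a one-minute checkpoint interval before writing; in Realtime Compute for Apache Flink, you can also configure this in the deployment's runtime configuration, and paimon_sink and source_table are placeholder names.

-- Data becomes visible in the Paimon table at each checkpoint,
-- so a 1-minute interval yields roughly minute-level freshness.
SET 'execution.checkpointing.interval' = '1min';

INSERT INTO paimon_sink
SELECT order_id, cust_id, amount FROM source_table;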

Data merging mechanism

When a Paimon sink table receives multiple records with the same primary key, it merges these records into a single record to maintain the uniqueness of the primary key. You can specify the data merging behavior by setting the merge-engine parameter. The following table describes the available data merging mechanisms.

Merging mechanism

Details

Deduplicate

The deduplicate mechanism is the default data merging mechanism. For multiple records with the same primary key, the Paimon sink table retains only the latest record and discards the others.

Note

If the latest record is a delete message, all records with that primary key are discarded.

Partial Update

With the partial-update mechanism, you can progressively update a record through multiple messages until the complete data is obtained. Specifically, new data with the same primary key overwrites the old data column by column, but null columns in the new data do not overwrite the existing values.

For example, assume a Paimon sink table receives the following three records in order:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>.

Note
  • To stream-read the result of a partial-update, you must set the changelog-producer parameter to lookup or full-compaction.

  • Partial-update cannot handle delete messages. You can set the partial-update.ignore-delete parameter to ignore delete messages.
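
The following sketch shows a table definition that would produce the merge behavior in the example above. The column names are hypothetical, and the table is assumed to be created in a Paimon catalog, so no connector parameter is needed.

CREATE TABLE partial_update_demo (
  id          BIGINT,
  price       DOUBLE,
  quantity    INT,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  'changelog-producer' = 'lookup',          -- required if downstream jobs stream-read the merged result
  'partial-update.ignore-delete' = 'true'   -- optional: drop delete messages instead of raising an error
);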

Aggregation

In some scenarios, you may only care about the aggregated value. The aggregation mechanism aggregates data with the same primary key based on the aggregate functions you specify. For each column that is not part of the primary key, specify an aggregate function using fields.<field-name>.aggregate-function; columns without a specified function use the last_non_null_value aggregate function by default. For example, consider the following Paimon table definition.

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

The price column will be aggregated using the max function, and the sales column will be aggregated using the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result will be <1, 30.2, 35>. The currently supported aggregate functions and their corresponding data types are:

  • sum: Supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.

  • min and max: Supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

  • last_value and last_non_null_value: Supports all data types.

  • listagg: Supports STRING.

  • bool_and and bool_or: Supports BOOLEAN.

Note
  • Only the sum function supports data retraction and deletion. Other aggregate functions do not. If you need certain columns to ignore retraction and deletion messages, you can set 'fields.${field_name}.ignore-retract'='true'.

  • To stream-read the result of an aggregation, you must set the changelog-producer parameter to lookup or full-compaction.
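
For example, if the input stream of the MyTable definition above can contain retraction messages, the max aggregate on the price column cannot process them. The following sketch ignores retractions for that column only; whether this is acceptable depends on your data.

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.price.ignore-retract' = 'true',    -- max cannot handle retraction; ignore -U/-D messages for this column
  'fields.sales.aggregate-function' = 'sum'  -- sum supports retraction and deletion natively
);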

Incremental data production mechanism

By setting the changelog-producer parameter, you can configure Paimon to generate complete incremental data for any input data stream. In this case, every update_after record has a corresponding update_before record. The following section describes all the incremental data generation mechanisms. For more information, see the official Apache Paimon documentation.

Mechanism

Details

None

If you set changelog-producer to none (the default), the downstream Paimon source table can only see the latest state of the data for the same primary key. However, this latest state does not provide enough information for downstream consumers to understand the complete incremental changes for correct calculations. It can only determine if the data was deleted or what the latest data is, but not what the data was before the change.

For example, if a downstream consumer needs to calculate the sum of a column and only sees the latest value of 5, it cannot determine how to update the sum. If the previous value was 4, it should increase the sum by 1. If the previous value was 6, it should decrease the sum by 1. Such consumers are sensitive to update_before data. We recommend not setting the changelog producer to None for these cases. However, other mechanisms will incur a performance overhead.

Note

If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the changelog producer to None. Configure the changelog producer based on your actual needs.

Input

If you set changelog-producer to input, the Paimon sink table will dual-write the input data stream to the changelog file as incremental data.

Therefore, this mechanism can only be used when the input data stream itself is a complete changelog, such as CDC data.

Lookup

If you set changelog-producer to lookup, the Paimon sink table uses a point-query mechanism similar to a dimension table to produce a complete changelog for the current snapshot before each commit. This mechanism can produce a complete changelog regardless of whether the input data is a complete changelog.

Compared to the Full Compaction mechanism, the Lookup mechanism provides better timeliness for changelog production but consumes more resources overall.

Use this mechanism when you have high requirements for the freshness of incremental data, such as at the minute level.

Full Compaction

If you set changelog-producer to full-compaction, the Paimon sink table produces a complete changelog during each full compaction. This mechanism can produce a complete changelog regardless of whether the input data is a complete changelog. The interval for full compaction is specified by the full-compaction.delta-commits parameter.

Compared to the Lookup mechanism, the Full Compaction mechanism has worse timeliness for changelog production. However, it utilizes the data's full compaction process and does not generate extra computation, so it consumes fewer resources overall.

Use this mechanism when the freshness requirement for incremental data is not high, such as at the hour level.
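
The following sketch shows a sink table that uses the Full Compaction mechanism; the table and column names are hypothetical, and the value of full-compaction.delta-commits is only an example.

CREATE TABLE ods_orders (
  order_id BIGINT,
  status STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'changelog-producer' = 'full-compaction',
  'full-compaction.delta-commits' = '10'  -- force a full compaction at least every 10 snapshot commits
);

For minute-level freshness, use the Lookup mechanism instead by setting changelog-producer to lookup and removing full-compaction.delta-commits.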

Write modes

Paimon tables support the following write modes.

Mode

Details

Change-log

The change-log write mode is the default for Paimon tables. This mode supports data insertion, deletion, and updates based on a primary key. You can also use the data merging and incremental data production mechanisms mentioned earlier in this mode.

Append-only

The append-only write mode only supports data insertion and does not support a primary key. This mode is more efficient than the change-log mode and can be used as an alternative to a message queue in scenarios where data freshness requirements are moderate, such as at the minute level.

For more information about the append-only write mode, see the official Apache Paimon documentation. When you use the append-only write mode, note the following two points:

  • Set the bucket-key parameter based on your actual needs. Otherwise, the Paimon table will perform bucketing based on the values of all columns, which is less efficient.

  • The append-only write mode can guarantee the order of data output to a certain extent. The specific output order is as follows:

    1. If two records are from different partitions and the scan.plan-sort-partition parameter is set, the record with the smaller partition value will be produced first. Otherwise, data from the earlier-created partition will be produced first.

    2. If two records are from the same bucket in the same partition, the earlier-written record will be produced first.

    3. If two records are from different buckets in the same partition, their output order is not guaranteed because different buckets are processed by different concurrent tasks.
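
Putting the points above together, the following sketch defines an append-only table with an explicit bucket-key; the table and column names are hypothetical.

CREATE TABLE log_events (
  event_time TIMESTAMP(3),
  event_type STRING,
  payload    STRING
  -- No primary key: the append-only write mode does not support one.
) WITH (
  'write-mode' = 'append-only',
  'bucket' = '8',
  'bucket-key' = 'event_type'  -- avoid the less efficient default of bucketing by all columns
);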

As a sink for CTAS and CDAS

Paimon tables support real-time data synchronization for a single table or an entire database. If the schema of the upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
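
The following is an illustrative sketch of what a single-table CTAS synchronization into Paimon might look like; the catalog, database, and table names are placeholders, and the referenced topics describe the exact syntax and options. Setting changelog-producer to input is a common choice here because the CDC input is already a complete changelog.

CREATE TABLE IF NOT EXISTS `<paimon-catalog>`.`<target-db>`.`orders`
WITH ('changelog-producer' = 'input')
AS TABLE `<mysql-catalog>`.`<source-db>`.`orders`;

A CDAS statement follows the same pattern at the database level and synchronizes all tables in the source database.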

Data ingestion

You can use the Paimon connector in a data ingestion YAML job to write data to a sink.

Syntax

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

Parameters

Parameter

Description

Required

Data type

Default value

Remarks

type

The connector type.

Yes

STRING

None

The value must be paimon.

name

The name of the sink.

No

STRING

None

None.

catalog.properties.metastore

The type of the Paimon Catalog.

No

STRING

filesystem

Valid values:

  • filesystem (default)

  • dlf-paimon (Only DLF 2.0 and later versions are supported)

catalog.properties.*

Parameters for creating a Paimon Catalog.

No

STRING

None

For more information, see Manage Paimon Catalogs.

table.properties.*

Parameters for creating a Paimon table.

No

STRING

None

For more information, see Paimon table options.

catalog.properties.warehouse

The root directory of the file storage.

No

STRING

None

This parameter takes effect only when catalog.properties.metastore is set to filesystem.

commit.user

The username for committing data files.

No

STRING

None

Note

Set different usernames for different jobs to help locate jobs that cause commit conflicts.

partition.key

The partition fields for each partitioned table.

No

STRING

None

Use a semicolon (;) to separate different tables, a comma (,) to separate different fields, and a colon (:) to separate a table from its fields. Example: testdb.table1:id1,id2;testdb.table2:name.

Examples

When you use Paimon as a data ingestion sink, you can refer to the following examples for configuration based on the Paimon Catalog type.

  • Example of writing data to Alibaba Cloud OSS when the Paimon Catalog type is filesystem:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    For more information about the parameters that have the catalog.properties prefix, see Create a Paimon Filesystem Catalog.

  • Example of writing data to Alibaba Cloud DLF 2.5 when the Paimon Catalog type is rest:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf

    For more information about the parameters that have the catalog.properties prefix, see Flink CDC Paimon Catalog configuration parameters.

FAQ