
Realtime Compute for Apache Flink:Streaming Data Lakehouse Paimon

Last Updated: Mar 05, 2026

This topic describes how to use the Streaming Data Lakehouse Paimon connector with a Paimon Catalog.

Background information

Apache Paimon is a unified storage format for streaming and batch data lakes. It supports high-throughput writes and low-latency queries. E-MapReduce (EMR), an open source big data platform from Alibaba Cloud, integrates with common compute engines such as Flink, Spark, Hive, and Trino. You can use Apache Paimon to build a data lake storage service on HDFS or Object Storage Service (OSS) and connect these compute engines to analyze data in the data lake. For more information, see Apache Paimon.

Category: Details

  • Supported types: Source table, dimension table, sink table, and data ingestion sink

  • Running modes: Streaming mode and batch mode

  • Data format: Not supported

  • Specific monitoring metrics: None

  • API types: SQL and data ingestion (YAML) jobs

  • Supports updating or deleting sink table data: Yes

Features

Apache Paimon provides the following core features:

  • Build a low-cost, lightweight data lake storage service on HDFS or Object Storage Service (OSS).

  • Read and write large-scale datasets in streaming and batch modes.

  • Support batch queries and Online Analytical Processing (OLAP) queries with data freshness from minutes to seconds.

  • Consume and generate incremental data. You can use it as storage for traditional offline data warehouses and modern streaming data warehouses.

  • Pre-aggregate data to reduce storage costs and downstream computing pressure.

  • Support data backtracking for historical versions.

  • Support efficient data filtering.

  • Support schema evolution.

Limits and recommendations

  • The Paimon connector is supported only by Flink compute engine VVR 6.0.6 and later.

  • The following table shows the compatibility between Paimon and VVR versions.

    Apache Paimon version    VVR version
    1.3                      11.4
    1.2                      11.2 and 11.3
    1.1                      11
    1.0                      8.0.11
    0.9                      8.0.7, 8.0.8, 8.0.9, and 8.0.10
    0.8                      8.0.6
    0.7                      8.0.5
    0.6                      8.0.3 and 8.0.4

  • Storage recommendations for concurrent write scenarios

    When multiple jobs concurrently update the same Paimon table, standard OSS storage (oss://) might experience rare commit conflicts or job errors because of limitations in the atomicity of file operations.

    To ensure continuous and stable writes, use metadata or storage services that have strong atomicity guarantees. We recommend that you use Data Lake Formation (DLF), which provides unified management of Paimon metadata and storage services. Alternatively, you can use OSS-HDFS or HDFS.

SQL

You can use the Paimon connector in SQL jobs as a source table or a sink table.

Syntax

  • If you create a Paimon table in a Paimon Catalog, you do not need to specify the connector parameter. The syntax for creating a Paimon table is as follows:

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    Note

    If you have already created a Paimon table in the Paimon Catalog, you can use it directly without recreating the table.

  • If you create a temporary Paimon table in another Catalog, you must specify the connector parameter and the storage path of the Paimon table. The syntax for creating a Paimon table is as follows:

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- If no Paimon table data file exists in the specified path, the system automatically creates a file.
      ...
    );
    Note
    • Path example: 'path' = 'oss://<bucket>/test/order.db/orders'. Do not omit the .db suffix. Paimon uses this naming convention to identify databases.

    • Multiple jobs that write to a single table must share the same path configuration.

    • If two paths differ, Paimon does not consider them the same table. Even if the physical paths are identical, inconsistent Catalog configurations can cause concurrent write conflicts, compaction failures, or data loss. For example, oss://b/test and oss://b/test/ are considered different paths because of the trailing slash, even though their physical paths are the same.
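Once declared, a temporary Paimon table is used like any other table. The following sketch assumes the temporary table above and a hypothetical upstream table named orders_source:

```sql
-- Write upstream data into the temporary Paimon table.
INSERT INTO paimon_table SELECT id, data FROM orders_source;

-- Batch-read the current table content.
SELECT * FROM paimon_table;
```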

WITH parameters

Parameter

Description

Data type

Required

Default value

Remarks

connector

Table type.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, do not specify this parameter.

  • If you create a temporary Paimon table in another Catalog, the value is fixed to paimon.

path

Table storage path.

String

No

None

  • If you create a Paimon table in a Paimon Catalog, do not specify this parameter.

  • If you create a temporary Paimon table in another Catalog, this is the table's storage directory in HDFS or OSS.

auto-create

Whether to automatically create Paimon table files when you create a temporary Paimon table and the specified path contains none.

Boolean

No

false

Values:

  • false (default): An error is reported if the specified path does not contain Paimon table files.

  • true: Flink automatically creates Paimon table files if the specified path contains none.

file.format

Storage type of data files in the table.

String

No

parquet

Values:

  • orc

  • parquet

  • avro

  • lance (VVR 11.6 and later)

bucket

Number of buckets in each partition.

Integer

No

1

Data written to the Paimon table is distributed to each bucket based on the bucket-key.

Note

Keep the data volume in each bucket below 5 GB.

bucket-key

Bucket key column.

String

No

None

Specify which column values distribute data written to the Paimon table into different buckets.

Separate column names with commas (,). For example, 'bucket-key' = 'order_id,cust_id' distributes data by the order_id and cust_id column values.

Note
  • If this parameter is not specified, data is distributed by the primary key.

  • If the Paimon table does not specify a primary key, data is distributed by all column values.

changelog-producer

Incremental data generation mechanism.

String

No

none

Paimon generates complete incremental data for any input data stream (all update_after data has corresponding update_before data) to facilitate downstream consumers. Incremental data generation mechanism values:

  • none (default): No incremental data is generated. The downstream consumer can stream read the Paimon table, but the incremental data read is incomplete (only update_after data, no corresponding update_before data).

  • input: Dual-write the input data stream to an incremental data file as incremental data.

  • full-compaction: Generate complete incremental data during each Full Compaction.

  • lookup: Generate complete incremental data before each commit snapshot.

For more information about selecting an incremental data generation mechanism, see Incremental data generation mechanism.

full-compaction.delta-commits

Maximum Full Compaction interval.

Integer

No

None

This parameter specifies how many commit snapshots trigger a Full Compaction.

lookup.cache-max-memory-size

Memory cache size for Paimon dimension tables.

String

No

256 MB

This parameter affects the cache size for both dimension tables and the lookup changelog-producer. Both mechanisms use this parameter for cache configuration.

merge-engine

Merge mechanism for data with the same primary key.

String

No

deduplicate

Values:

  • deduplicate: Only the latest record is retained.

  • partial-update: Update existing data with the same primary key using non-null columns from the latest data. Other columns remain unchanged.

  • aggregation: Pre-aggregate data using a specified aggregate function.

For a detailed analysis of the data merge mechanism, see Data merge mechanism.

partial-update.ignore-delete

Ignore delete (-D) messages.

Boolean

No

false

Values:

  • true: Delete messages are ignored.

  • false: Delete messages are not ignored. You must set how the Sink handles delete data using sequence.field or other configuration items. Otherwise, IllegalStateException or IllegalArgumentException errors might occur.

Note
  • In Realtime Compute for Apache Flink VVR 8.0.6 and earlier, this parameter takes effect only in partial update scenarios (merge-engine = partial-update).

  • In Realtime Compute for Apache Flink VVR 8.0.7 and later, this parameter also works in non-partial update scenarios and has the same function as the ignore-delete parameter. Use ignore-delete instead.

  • Determine whether delete messages are expected in your business scenario and enable this parameter only if needed. If the job semantics represented by delete messages are not expected, letting the job report an error is usually the more appropriate choice.

ignore-delete

Ignore delete (-D) messages.

Boolean

No

false

Values are the same as partial-update.ignore-delete.

Note
  • Only Realtime Compute for Apache Flink VVR 8.0.7 and later versions support this parameter.

  • This parameter has the same function as partial-update.ignore-delete. Use the ignore-delete parameter and avoid configuring both parameters simultaneously.

partition.default-name

Default partition name.

String

No

__DEFAULT_PARTITION__

If the partition key column value is null or an empty string, this default name is used as the partition name.

partition.expiration-check-interval

How often to check for partition expiration.

String

No

1h

For more information, see Configure automatic partition expiration.

partition.expiration-time

Partition expiration duration.

String

No

None

When a partition's lifespan exceeds this value, the partition expires. By default, partitions never expire.

A partition's lifespan is calculated from its partition value. For more information, see Configure automatic partition expiration.

partition.timestamp-formatter

Format string to convert a time string to a timestamp.

String

No

None

Set the format to extract the partition's lifespan from the partition value. For more information, see Configure automatic partition expiration.

partition.timestamp-pattern

Format string to convert a partition value to a time string.

String

No

None

Set the format to extract the partition's lifespan from the partition value. For more information, see Configure automatic partition expiration.

scan.bounded.watermark

When the watermark of data generated by the Paimon source table exceeds this value, the Paimon source table stops generating data.

Long

No

None

None.

scan.mode

Specify the consumer offset for the Paimon source table.

String

No

default

For more information, see Specify consumer offset for a Paimon source table.

scan.snapshot-id

Specify the snapshot from which the Paimon source table starts consuming data.

Integer

No

None

For more information, see Specify consumer offset for a Paimon source table.

scan.timestamp-millis

Specify the timestamp from which the Paimon source table starts consuming data.

Integer

No

None

For more information, see Specify consumer offset for a Paimon source table.

snapshot.num-retained.max

Maximum number of latest snapshots to retain.

Integer

No

2147483647

Snapshot expiration is triggered if this configuration or snapshot.time-retained is met, and snapshot.num-retained.min is also met.

snapshot.num-retained.min

Minimum number of latest snapshots to retain.

Integer

No

10

None.

snapshot.time-retained

How long after creation a snapshot expires.

String

No

1h

Snapshot expiration is triggered if this configuration or snapshot.num-retained.max is met, and snapshot.num-retained.min is also met.

write-mode

Paimon table write mode.

String

No

change-log

Values:

  • change-log: The Paimon table supports inserting, deleting, and updating data based on the primary key.

  • append-only: The Paimon table supports only data insertion and does not support primary keys. This mode is more efficient than change-log mode.

For a detailed introduction to write modes, see Write modes.

scan.infer-parallelism

Automatically infer the parallelism of the Paimon source table.

Boolean

No

true

Values:

  • true: Automatically infer the parallelism of the Paimon source table based on the number of buckets.

  • false: Use the default parallelism configured in VVP. In expert mode, the user-configured parallelism is used.

scan.parallelism

Parallelism of the Paimon source table.

Integer

No

None

Note

On the Deployment Details > Resource Configuration tab of the job, this parameter does not take effect when the Resource Mode is set to Expert mode.

sink.parallelism

Parallelism of the Paimon sink table.

Integer

No

None

Note

On the Deployment Details > Resource Configuration tab of the job, this parameter does not take effect when the Resource Mode is set to Expert mode.

sink.clustering.by-columns

Specify clustering columns for data written to the Paimon sink table.

String

No

None

For Paimon Append-Only tables (non-primary key tables), configure this parameter in batch jobs to enable clustering writes. This sorts and clusters data by the value ranges of the specified columns, which speeds up queries on the table.

Separate multiple column names with commas (,). For example, 'col1,col2'.

For clustering details, see the Apache Paimon official documentation.

sink.delete-strategy

Set the validation policy to ensure the system correctly handles retraction (-D/-U) messages.

Enum

No

NONE

Validation policy values and how the Sink operator should handle retraction messages:

  • NONE (default): No validation is performed.

  • IGNORE_DELETE: The Sink operator should ignore -U and -D messages, preventing retraction.

  • NON_PK_FIELD_TO_NULL: The Sink operator should ignore -U messages. When receiving -D messages, it should retain primary key values and retract other non-primary key values in the schema.

    Use this primarily for partial update scenarios where multiple Sinks write to the same table.

  • DELETE_ROW_ON_PK: The Sink operator should ignore -U messages. When receiving -D messages, it should delete the row corresponding to the primary key.

  • CHANGELOG_STANDARD: The Sink operator should delete the row corresponding to the primary key when receiving both -U and -D messages.

Note
  • Only Realtime Compute for Apache Flink VVR 8.0.8 and later versions support this parameter.

  • The Paimon Sink's handling of retraction messages is determined by other configuration items, such as ignore-delete and merge-engine. This parameter does not change that behavior; it only validates that the behavior matches the expected policy. If it does not, validation fails and the job error message explains how to adjust ignore-delete, merge-engine, or other items to meet expectations.

Note

For more information about configuration items, see the Apache Paimon official documentation.

Feature details

Data freshness and consistency assurance

A Paimon sink table uses the two-phase commit (2PC) protocol. It commits written data during the checkpoint of each Flink job. Therefore, the data freshness is equivalent to the checkpoint interval of the Flink job. Each commit generates a maximum of two snapshots.

If two Flink jobs write data to a Paimon table concurrently, serializable consistency is guaranteed if the data is written to different buckets. If the data is written to the same bucket, only snapshot isolation consistency is guaranteed. This means that the data in the table might contain mixed results from both jobs, but no data is lost.
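Because data is committed on checkpoints, data freshness is tuned through the checkpoint interval. A minimal sketch of the relevant deployment configuration, using the standard Flink option (adjust the interval to your freshness requirements):

```yaml
# Commit to the Paimon table roughly once per minute.
execution.checkpointing.interval: 1min
```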

Data merge mechanism

When a Paimon sink table receives multiple data records that have the same primary key, it merges them into a single record to maintain the uniqueness of the primary key. You can specify the data merge behavior using the merge-engine parameter. The following table describes the data merge mechanisms.

Merge mechanism

Details

Deduplicate

Deduplicate is the default data merge mechanism. For multiple data records with the same primary key, the Paimon sink table retains only the latest record and discards others.

Note

If the latest record is a delete message, all data with that primary key is discarded.

Partial Update

Use the partial-update mechanism to incrementally update data with multiple messages, ultimately achieving complete data. Specifically, new data with the same primary key overwrites existing data, but columns with null values do not overwrite existing data.

For example, the Paimon sink table receives the following data records in sequence:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

If the first column is the primary key, the final result is <1, 25.2, 10, 'This is a book'>.

Note
  • To stream read partial-update results, set the changelog-producer parameter to lookup or full-compaction.

  • Partial-update cannot process delete messages. Set the partial-update.ignore-delete parameter to ignore delete messages.
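The partial-update behavior described above can be declared as in the following sketch; the table and column names are illustrative:

```sql
CREATE TABLE orders_wide (
  order_id BIGINT,
  price DOUBLE,
  quantity INT,
  comment STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update',
  -- Needed to stream read partial-update results:
  'changelog-producer' = 'lookup',
  -- Partial-update cannot process delete messages, so ignore them:
  'partial-update.ignore-delete' = 'true'
);
```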

Aggregation

In some scenarios, you might only need aggregated values. The aggregation mechanism aggregates data with the same primary key using a specified aggregate function. For each non-primary key column, specify an aggregate function using fields.<field-name>.aggregate-function. Otherwise, the column defaults to the last_non_null_value aggregate function. For example, consider the following Paimon table definition:

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

The price column aggregates using the max function, and the sales column aggregates using the sum function. Given two input data records <1, 23.0, 15> and <1, 30.2, 20>, the final result is <1, 30.2, 35>. Supported aggregate functions and their corresponding data types:

  • sum: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE.

  • min and max: DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ.

  • last_value and last_non_null_value support all data types.

  • listagg: Supports STRING.

  • bool_and and bool_or: BOOLEAN.

Note
  • Only the sum function supports data retraction and deletion. Other aggregate functions do not. If you need certain columns to ignore retraction and deletion messages, set 'fields.${field_name}.ignore-retract'='true'.

  • To stream read aggregation results, set the changelog-producer parameter to lookup or full-compaction.
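Building on the MyTable example above, the per-column ignore-retract option mentioned in the note can be added to the WITH clause, shown here for the price column as an illustration:

```sql
-- Additional option in MyTable's WITH clause:
'fields.price.ignore-retract' = 'true'  -- the price column ignores retraction and deletion messages
```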

Incremental data generation mechanism

You can set the changelog-producer parameter to configure the incremental data generation mechanism. Paimon generates complete incremental data (all update_after data has corresponding update_before data) for any input data stream. The following list describes all incremental data generation mechanisms. For more information, see the Apache Paimon official documentation.

Mechanism

Details

None

When you set changelog-producer to none (default), the downstream Paimon source table only sees the latest data for a given primary key. This latest data does not provide downstream consumers with complete incremental data for correct calculations. It only shows if data was deleted or what the latest data is, not what the data was before the change.

For example, if a downstream consumer needs to calculate the sum of a column and only sees the latest data as 5, it cannot determine how to update the sum. If the previous data was 4, it should increase the sum by 1. If the previous data was 6, it should decrease the sum by 1. Such consumers are sensitive to update_before data. Do not configure the incremental data generation mechanism to None for these consumers. However, other incremental data generation mechanisms incur performance overhead.

Note

If your downstream consumer, such as a database, is not sensitive to update_before data, you can set the incremental data generation mechanism to none. Choose the mechanism based on your consumers' needs.

Input

When you set changelog-producer to input, the Paimon sink table dual-writes the input data stream to an incremental data file as incremental data.

Therefore, use this incremental data generation mechanism only when the input data stream itself is complete incremental data (such as CDC data).

Lookup

When you set changelog-producer to lookup, the Paimon sink table uses a point query mechanism, similar to a dimension table, to generate complete incremental data for the current snapshot before each commit snapshot. This mechanism generates complete incremental data regardless of whether the input data is complete incremental data.

Compared to the Full Compaction mechanism described below, the Lookup mechanism generates incremental data with better timeliness but consumes more resources overall.

Use it when high freshness of incremental data is required (such as minute-level freshness).

Full Compaction

When you set changelog-producer to full-compaction, the Paimon sink table generates complete incremental data during each full compaction. This mechanism generates complete incremental data regardless of whether the input data is complete incremental data. The full compaction interval is specified by the full-compaction.delta-commits parameter.

Compared to the Lookup mechanism described above, the Full Compaction mechanism generates incremental data with less timeliness. However, it leverages the data's full compaction process, incurring no additional computation, thus consuming fewer resources overall.

Use it when high freshness of incremental data is not required (such as hour-level freshness).
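The trade-off between the Lookup and Full Compaction mechanisms comes down to table options, as in the following sketch (values are illustrative):

```sql
-- Minute-level changelog freshness, higher resource usage:
'changelog-producer' = 'lookup'

-- Alternatively, hour-level freshness at lower cost:
'changelog-producer' = 'full-compaction',
'full-compaction.delta-commits' = '10'  -- trigger a full compaction every 10 commits
```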

Write modes

Paimon tables support the following write modes:

Mode

Details

Change-log

Change-log is the default write mode for Paimon tables. This mode supports inserting, deleting, and updating data based on the primary key. You can also use the data merge mechanism and incremental data generation mechanism mentioned above in this write mode.

Append-only

Append-only write mode supports only data insertion and does not support primary keys. This mode is more efficient than change-log mode. Use it as a message queue alternative in scenarios where data freshness requirements are moderate (such as minute-level freshness).

For a detailed introduction to append-only write mode, see the Apache Paimon official documentation. When using append-only write mode, note the following:

  • Set the bucket-key parameter as needed. Otherwise, the Paimon table buckets data based on all column values, leading to lower computational efficiency.

  • Append-only write mode ensures data output order to some extent. The specific output order is:

    1. If two data records come from different partitions, and the scan.plan-sort-partition parameter is set, data with smaller partition values is output first. Otherwise, data from earlier created partitions is output first.

    2. If two data records come from the same bucket within the same partition, earlier written data is output first.

    3. If two data records come from different buckets within the same partition, the output order is not guaranteed because different buckets are processed by different concurrent tasks.
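An append-only table with an explicit bucket key can be declared as in the following sketch; the table and column names are illustrative:

```sql
CREATE TABLE click_log (
  user_id BIGINT,
  page STRING,
  ts TIMESTAMP(3)
) WITH (
  'write-mode' = 'append-only',
  'bucket' = '4',
  -- Set a bucket key to avoid bucketing by all column values:
  'bucket-key' = 'user_id'
);
```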

As CTAS and CDAS target

Paimon tables support real-time synchronization of data at the single-table or full-database level. If the schema of the upstream table changes during synchronization, the change is also synchronized to the Paimon table in real time. For more information, see Manage Paimon tables and Manage Paimon Catalogs.
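A single-table synchronization of this kind can be sketched with a CTAS statement; the catalog, database, and table names below are placeholders:

```sql
CREATE TABLE IF NOT EXISTS `paimon-catalog`.`target_db`.`orders`
AS TABLE `mysql-catalog`.`source_db`.`orders`;
```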

Data ingestion

You can use the Paimon connector for data ingestion in YAML job development to write data to the destination.

Syntax

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

Configuration items

Parameter

Description

Required

Data type

Default value

Remarks

type

Connector type.

Yes

STRING

None

The value is fixed to paimon.

name

Target end name.

No

STRING

None

Sink name.

catalog.properties.metastore

Paimon Catalog type.

No

STRING

filesystem

Values:

  • filesystem (default)

  • rest (supports only DLF, not DLF-Legacy)

catalog.properties.*

Parameters for creating a Paimon Catalog.

No

STRING

None

For more information, see Manage Paimon Catalogs.

table.properties.*

Parameters for creating a Paimon table.

No

STRING

None

For more information, see Paimon table options.

catalog.properties.warehouse

Root directory for file storage.

No

STRING

None

This takes effect only when catalog.properties.metastore is set to filesystem.

commit.user-prefix

Username prefix when committing data files.

No

STRING

None

Note

Set different usernames for different jobs. This helps locate conflicting jobs during commit conflicts.

partition.key

Partition field for each partitioned table.

No

STRING

None

Separate different tables with ;. Separate different fields with ,. Separate tables and fields with :. For example: testdb.table1:id1,id2;testdb.table2:name.

sink.cross-partition-upsert.tables

Specify tables that require cross-partition upsert (primary key does not include all partition fields).

No

STRING

None

Applies to tables with cross-partition upserts.

  • Format: Separate table names with semicolons ;.

  • Performance recommendation: This operation consumes significant compute resources. Create separate jobs for these tables.

Important
  • List all eligible tables. Omitting table names leads to data duplication.

sink.commit.parallelism

Specify the parallelism of the Commit operator.

No

INTEGER

None

If the Commit operator becomes a bottleneck, specify its parallelism using this parameter to improve performance.

Only Realtime Compute for Apache Flink VVR 11.6 and later versions support this parameter.

Note

Setting this parameter changes operator parallelism. When you restart a stateful job, enable AllowNonRestoredState so that the states of removed operators can be ignored.

Usage examples

When you use Paimon as a data ingestion target, you can refer to the following examples for configuration based on the Paimon Catalog type.

  • If the Paimon Catalog is a filesystem, see the following configuration example for writing data to Alibaba Cloud OSS:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

    For information about the parameters that are prefixed with catalog.properties, see Create a Paimon Filesystem Catalog.

  • If the Paimon Catalog is rest, see the following configuration example for writing data to Alibaba Cloud Data Lake Formation:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: rest
      catalog.properties.uri: dlf_uri
      catalog.properties.warehouse: your_warehouse
      catalog.properties.token.provider: dlf
      # (Optional) Enable deletion vectors to improve read performance
      table.properties.deletion-vectors.enabled: true

    For information about the parameters that are prefixed with catalog.properties, see Flink CDC Catalog configuration parameters.

Schema evolution

Currently, Paimon supports the following schema evolution events as a data ingestion target:

  • CREATE TABLE EVENT

  • ADD COLUMN EVENT

  • ALTER COLUMN TYPE EVENT (Modifying primary key column types is not supported.)

  • RENAME COLUMN EVENT

  • DROP COLUMN EVENT

  • TRUNCATE TABLE EVENT

  • DROP TABLE EVENT

Note

If the downstream Paimon table already exists, the system uses the existing table schema for writes and does not attempt to recreate the table.

FAQ