E-MapReduce:Integrate Paimon with Flink

Last Updated: Mar 26, 2026

E-MapReduce (EMR) lets you use the Flink SQL client to read data from and write data to Paimon. This topic walks you through configuring dependencies, creating a catalog, streaming data into and out of a Paimon table, running an online analytical processing (OLAP) query, and cleaning up resources.

Prerequisites

Before you begin, make sure you have:

  • A Dataflow cluster or custom cluster with the Flink and Paimon services installed. For more information, see Create a cluster.

  • For a Hive catalog: a custom cluster that includes the Flink, Paimon, and Hive services, with Metadata set to Self-managed RDS or Built-in MySQL during cluster creation.

  • For a Data Lake Formation (DLF) catalog: a cluster with Metadata set to DLF Unified Metadata during cluster creation.

Limitations

The following table summarizes EMR version support for Paimon and Flink integration:

| EMR version | Filesystem catalog | Hive catalog | DLF catalog |
| --- | --- | --- | --- |
| V3.46.0 | Supported | Not supported | Not supported |
| V3.46.1–V3.50.X | Supported | Supported | Supported |
| V5.12.0–V5.16.X | Supported | Supported | Supported |
| V3.51.X or later | Configure based on requirements | Configure based on requirements | Configure based on requirements |
| V5.17.X or later | Configure based on requirements | Configure based on requirements | Configure based on requirements |
Note For EMR V3.51.X or later and EMR V5.17.X or later, configure dependencies based on your business requirements. See Quick Start in the Apache Paimon documentation.

How it works

Paimon stores data and metadata in a distributed file system such as Hadoop Distributed File System (HDFS) or an object storage system such as Object Storage Service (OSS)-HDFS. The root path is specified by the warehouse parameter. If the path does not exist, it is created automatically. If the path already exists, the catalog uses it to access existing tables.
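For reference, a warehouse directory typically contains one subdirectory per database and per table, with metadata and data files stored underneath. The layout below is an illustrative sketch using the names from this topic; the exact contents vary by Paimon version:

```
oss://<yourBucketName>/warehouse/    # root path set by the warehouse parameter
└── test_db.db/                      # one directory per database
    └── test_tbl/                    # one directory per table
        ├── schema/                  # table schema versions
        ├── snapshot/                # snapshot metadata
        ├── manifest/                # manifest files
        └── bucket-0/                # data files
```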

Paimon supports three catalog types:

| Catalog type | Metadata storage | Use case |
| --- | --- | --- |
| Filesystem | File system or object storage | No external metadata service required |
| Hive | Hive Metastore | Allows Hive to query Paimon tables directly |
| DLF | DLF | Allows DLF-integrated services to access Paimon data |

Step 1: Configure dependencies

Copy the required JAR files to the Flink library directory based on the catalog type you plan to use.

Filesystem catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/

Hive catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

DLF catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

Step 2: Start the EMR cluster

This example uses YARN session mode. For other deployment modes, see Basic usage.

Start a detached YARN session:

yarn-session.sh --detached
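Before submitting jobs, you can confirm that the session is up. The standard YARN CLI lists running applications; the Flink session should appear in the output (the application name may vary by Flink version):

```shell
# Run on the cluster; the detached Flink session shows up as a RUNNING application.
yarn application -list -appStates RUNNING
```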

Step 3: Create a catalog

Create a filesystem catalog

A filesystem catalog stores metadata and table files in a file system or object storage. No external metadata service is needed.

  1. Start the Flink SQL client:

    sql-client.sh
  2. Create a filesystem catalog. Replace <yourBucketName> with your OSS bucket name:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
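To verify that the catalog was registered, you can list the catalogs and switch to the new one. These are standard Flink SQL client statements:

```sql
-- The new catalog should appear in the list.
SHOW CATALOGS;

-- Switch to it and list its databases.
USE CATALOG test_catalog;
SHOW DATABASES;
```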

Create a Hive catalog

A Hive catalog syncs metadata to Hive Metastore, making Paimon tables directly queryable from Hive. For details on querying Paimon data from Hive, see Integrate Paimon with Hive.

  1. Start the Flink SQL client. The startup command is the same regardless of your Hive version:

    sql-client.sh
  2. Create a Hive catalog. The uri parameter specifies the Hive Metastore address. Replace <yourBucketName> with your OSS bucket name:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- The address of Hive Metastore.
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
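Because metadata is synced to Hive Metastore, tables created through this catalog (such as test_tbl from Step 4) become visible to Hive. As a quick check, a query from the Hive CLI on the cluster might look like the following; adjust the database and table names to your environment:

```sql
-- Run in the Hive CLI, not the Flink SQL client.
SHOW TABLES IN test_db;
SELECT * FROM test_db.test_tbl LIMIT 10;
```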

Create a DLF catalog

A DLF catalog syncs metadata to DLF, enabling other DLF-integrated services to access Paimon data.

Important

When creating the EMR cluster, set Metadata to DLF Unified Metadata.

  1. Start the Flink SQL client. The startup command is the same regardless of your Hive version:

    sql-client.sh
  2. Create a DLF catalog. The hive-conf-dir parameter points to the Flink configuration directory that contains DLF connection settings. Replace <yourBucketName> with your OSS bucket name:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

Step 4: Read and write data in streaming mode

The following SQL uses the Datagen connector to generate random data, writes it to a Paimon table in streaming mode with a 10-second checkpoint interval, then reads aggregated results from the same table.

Note The write job and read job run concurrently. Make sure the cluster has enough task slots for both. If resources are insufficient, the read job will fail.

-- Set the execution mode to streaming.
SET 'execution.runtime-mode' = 'streaming';

-- Set a checkpoint interval for Paimon.
SET 'execution.checkpointing.interval' = '10s';

-- Use the catalog created in the previous step.
USE CATALOG test_catalog;

-- Create a test database.
CREATE DATABASE test_db;
USE test_db;

-- Create a Datagen source table that generates random data.
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- Create a Paimon table with a primary key.
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- Write data from the Datagen source to the Paimon table.
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- Read aggregated data from the Paimon table (runs concurrently with the write job).
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
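If the session does not have enough free task slots for both jobs, you can lower the default parallelism before running the INSERT and SELECT statements above. This is a standard Flink SQL client setting:

```sql
-- Reduce slot demand so the write and read jobs fit in one session.
SET 'parallelism.default' = '1';
```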

Step 5: Run an OLAP query

Switch to batch mode to run a one-time analytical query against the Paimon table. The tableau result mode displays results inline in the Flink SQL client.

-- Reset the checkpoint interval and switch to batch mode.
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- Display query results in the terminal.
SET 'sql-client.execution.result-mode' = 'tableau';

-- Query aggregated data from the Paimon table.
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

Step 6: Clean up resources

Important

Stop the streaming write job before dropping the table. Otherwise, the job keeps running, holding cluster resources and attempting to write to a table that no longer exists.
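The streaming write job from Step 4 can be cancelled from the command line. The sketch below uses the standard Flink CLI against a YARN session; replace the placeholder IDs with the values shown for your session and job:

```shell
# List running jobs in the YARN session, then cancel the write job by ID.
flink list -yid <applicationId>
flink cancel -yid <applicationId> <jobId>
```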

Drop the Paimon table:

DROP TABLE test_tbl;

What's next