E-MapReduce (EMR) lets you use the Flink SQL client to read data from and write data to Paimon. Configure dependencies, create a catalog, stream data in and out of Paimon tables, run an online analytical processing (OLAP) query, and clean up resources.
Prerequisites
Before you begin, make sure you have:
A Dataflow cluster or custom cluster with the Flink and Paimon services installed. For more information, see Create a cluster.
For a Hive catalog: a custom cluster that includes the Flink, Paimon, and Hive services, with Metadata set to Self-managed RDS or Built-in MySQL during cluster creation.
For a Data Lake Formation (DLF) catalog: a cluster with Metadata set to DLF Unified Metadata during cluster creation.
Limitations
The following table summarizes EMR version support for Paimon and Flink integration:
| EMR version | Filesystem catalog | Hive catalog | DLF catalog |
|---|---|---|---|
| V3.46.0 | Supported | Not supported | Not supported |
| V3.46.1–V3.50.X | Supported | Supported | Supported |
| V5.12.0–V5.16.X | Supported | Supported | Supported |
| V3.51.X or later | Configure based on requirements | Configure based on requirements | Configure based on requirements |
| V5.17.X or later | Configure based on requirements | Configure based on requirements | Configure based on requirements |
How it works
Paimon stores data and metadata in a distributed file system such as Hadoop Distributed File System (HDFS) or an object storage system such as Object Storage Service (OSS)-HDFS. The root path is specified by the warehouse parameter. If the path does not exist, it is created automatically. If the path already exists, the catalog uses it to access existing tables.
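For example, only the warehouse path changes when the root is moved from OSS to HDFS; the catalog definition is otherwise identical. The HDFS address below is an illustrative placeholder, not a value from this cluster:

```sql
-- The warehouse path is the root directory for all Paimon databases and tables.
-- It is created automatically if it does not exist; if it exists, existing
-- tables under it become accessible through the catalog.
CREATE CATALOG hdfs_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'filesystem',
    'warehouse' = 'hdfs://<namenode>:<port>/paimon/warehouse'  -- placeholder HDFS path
);
```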
Paimon supports three catalog types:
| Catalog type | Metadata storage | Use case |
|---|---|---|
| Filesystem | File system or object storage | No external metadata service required |
| Hive | Hive Metastore | Allows Hive to query Paimon tables directly |
| DLF | DLF | Allows DLF-integrated services to access Paimon data |
Step 1: Configure dependencies
Copy the required JAR files to the Flink library directory based on the catalog type you plan to use.
Filesystem catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
Hive catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/
DLF catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/
Step 2: Start the EMR cluster
This example uses YARN session mode. For other deployment modes, see Basic usage.
Start a detached YARN session:
yarn-session.sh --detached
Step 3: Create a catalog
Create a filesystem catalog
A filesystem catalog stores metadata and table files in a file system or object storage. No external metadata service is needed.
Start the Flink SQL client:
sql-client.sh
Create a filesystem catalog. Replace <yourBucketName> with your OSS bucket name:
CREATE CATALOG test_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'filesystem',
    'warehouse' = 'oss://<yourBucketName>/warehouse'
);
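To confirm that the catalog is registered, you can list the catalogs and switch to the new one. This is an optional check; the statements below are standard Flink SQL and assume the catalog name used in the example above:

```sql
-- List registered catalogs; test_catalog should appear in the output.
SHOW CATALOGS;
-- Switch to the new catalog and list its databases.
USE CATALOG test_catalog;
SHOW DATABASES;
```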
Create a Hive catalog
A Hive catalog syncs metadata to Hive Metastore, making Paimon tables directly queryable from Hive. For details on querying Paimon data from Hive, see Integrate Paimon with Hive.
Start the Flink SQL client:
The startup command is the same regardless of your Hive version.
sql-client.sh
Create a Hive catalog. The uri parameter specifies the Hive Metastore address. Replace <yourBucketName> with your OSS bucket name:
CREATE CATALOG test_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'hive',
    'uri' = 'thrift://master-1-1:9083', -- The address of Hive Metastore.
    'warehouse' = 'oss://<yourBucketName>/warehouse'
);
Create a DLF catalog
A DLF catalog syncs metadata to DLF, enabling other DLF-integrated services to access Paimon data.
When creating the EMR cluster, set Metadata to DLF Unified Metadata.
Start the Flink SQL client:
The startup command is the same regardless of your Hive version.
sql-client.sh
Create a DLF catalog. The hive-conf-dir parameter points to the Flink configuration directory that contains DLF connection settings. Replace <yourBucketName> with your OSS bucket name:
CREATE CATALOG test_catalog WITH (
    'type' = 'paimon',
    'metastore' = 'dlf',
    'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
    'warehouse' = 'oss://<yourBucketName>/warehouse'
);
Step 4: Read and write data in streaming mode
The following SQL uses the Datagen connector to generate random data, writes it to a Paimon table in streaming mode with a 10-second checkpoint interval, then reads aggregated results from the same table.
-- Set the execution mode to streaming.
SET 'execution.runtime-mode' = 'streaming';
-- Set a checkpoint interval for Paimon.
SET 'execution.checkpointing.interval' = '10s';
-- Use the catalog created in the previous step.
USE CATALOG test_catalog;
-- Create a test database.
CREATE DATABASE test_db;
USE test_db;
-- Create a Datagen source table that generates random data.
CREATE TEMPORARY TABLE datagen_source (
uuid int,
kind int,
price int
) WITH (
'connector' = 'datagen',
'fields.kind.min' = '0',
'fields.kind.max' = '9',
'rows-per-second' = '10'
);
-- Create a Paimon table with a primary key.
CREATE TABLE test_tbl (
uuid int,
kind int,
price int,
PRIMARY KEY (uuid) NOT ENFORCED
);
-- Write data from the Datagen source to the Paimon table.
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- Read aggregated data from the Paimon table (runs concurrently with the write job).
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
Step 5: Run an OLAP query
Switch to batch mode to run a one-time analytical query against the Paimon table. The tableau result mode displays results inline in the Flink SQL client.
-- Reset the checkpoint interval and switch to batch mode.
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- Display query results in the terminal.
SET 'sql-client.execution.result-mode' = 'tableau';
-- Query aggregated data from the Paimon table.
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
Step 6: Clean up resources
Stop the write job before dropping the table to prevent a resource leak.
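The streaming INSERT job from Step 4 keeps running until it is canceled. One way to stop it is from the command line with the Flink CLI; this is a sketch, and the job ID and YARN application ID shown are placeholders you must look up first:

```shell
# List running Flink jobs to find the job ID of the streaming INSERT job.
flink list

# Cancel the write job by its job ID (placeholder).
flink cancel <jobId>

# Optionally, stop the detached YARN session itself once you are done (placeholder).
yarn application -kill <applicationId>
```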
Drop the Paimon table:
DROP TABLE test_tbl;
What's next
Integrate Paimon with Hive — query Paimon tables directly from Hive using the Hive catalog
Apache Paimon Quick Start — configure dependencies for EMR V3.51.X or later and EMR V5.17.X or later