Running Spark queries on data that Flink is actively writing typically requires pausing the write stream or managing separate metadata stores. This tutorial shows you how to eliminate that constraint: Realtime Compute for Apache Flink writes streaming data continuously into an Apache Paimon table stored in Object Storage Service (OSS), while EMR Serverless Spark reads from the same table for interactive queries and scheduled compaction — all without interrupting the Flink write stream.
The two engines share metadata through Data Lake Formation (DLF). Flink and Spark each connect to the same Paimon catalog through DLF, so you can query data while Flink is still writing, with no manual synchronization required.
How it works
Realtime Compute for Apache Flink        EMR Serverless Spark
─────────────────────────────────        ────────────────────
DataGen source (100 rows/s)              Interactive queries
               │                         Compaction workflows
               ▼
Paimon Catalog (DLF metastore)
               │
               ▼
OSS (Paimon table storage)
Flink handles stream writes. Spark handles batch reads and compaction. DLF is the shared integration point: both engines register the same Paimon catalog through DLF, so databases, tables, and schemas created from Flink are visible to Spark without a separate metadata sync step.
Prerequisites
Before you begin, ensure that you have:
- An active Realtime Compute for Apache Flink workspace (Ververica Runtime (VVR) 8.0.4 or later required)
- An EMR Serverless Spark workspace
- A Data Lake Formation (DLF) data catalog, with its catalog ID, region, and endpoint
- An AccessKey ID and AccessKey secret with permissions to access DLF. See Create an AccessKey.
- An OSS path to use as the Paimon warehouse location
Step 1: Set up Paimon in Realtime Compute for Apache Flink
Create a Paimon catalog
A Paimon catalog manages all Paimon tables under a single warehouse directory and exposes them to multiple compute engines through DLF. For background on catalogs, see Manage Paimon Catalogs.
- Log on to the Realtime Compute for Apache Flink console.
- Click Console in the Actions column of your workspace.
- In the navigation pane, choose Development > Scripts.
- Create a query script and paste the following SQL, then replace each placeholder with your actual values:

  CREATE CATALOG `paimon` WITH (
    'type' = 'paimon',
    'metastore' = 'dlf',
    'warehouse' = '<warehouse>',
    'dlf.catalog.id' = '<dlf.catalog.id>',
    'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>',
    'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>',
    'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>',
    'dlf.catalog.region' = '<dlf.catalog.region>'
  );

  | Parameter | Description | Notes |
  |---|---|---|
  | type | Catalog type | Must be paimon |
  | metastore | Metadata backend | Set to dlf to share metadata across engines |
  | warehouse | OSS path for table data | For example, oss://my-bucket/paimon-warehouse |
  | dlf.catalog.id | DLF catalog ID | Find this in the Data Lake Formation console |
  | dlf.catalog.accessKeyId | AccessKey ID for DLF access | |
  | dlf.catalog.accessKeySecret | AccessKey secret for DLF access | |
  | dlf.catalog.endpoint | DLF service endpoint | Use the VPC endpoint when Flink and DLF are in the same region; otherwise use the public endpoint. See Regions and endpoints |
  | dlf.catalog.region | Region where DLF is deployed | Must match the region in dlf.catalog.endpoint |
- In the lower-right corner, click Execution Environment and select a session cluster running VVR 8.0.4 or later. If you don't have one, create one first. See Create a session cluster.
- Select the SQL statement and click Run.
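The table in the next section lives in the test_paimon_db database, which none of the steps above create explicitly. The following is a minimal sketch, assuming the catalog was registered under the name paimon as shown and that the database does not exist yet. Run it from the same Scripts editor, one statement at a time.

```sql
-- Switch to the catalog registered above and list the databases DLF already knows about.
USE CATALOG `paimon`;
SHOW DATABASES;

-- Assumption: test_paimon_db has not been created yet.
-- IF NOT EXISTS makes the statement safe to re-run.
CREATE DATABASE IF NOT EXISTS `paimon`.`test_paimon_db`;
```

If SHOW DATABASES fails, the usual suspects are the DLF endpoint, region, or AccessKey values in the catalog definition.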
Create a Paimon table
In the Scripts editor, run the following SQL to create the target table. The write-only = true option tells the Flink writer to skip compaction and snapshot expiration, which keeps the streaming write path lightweight. Compaction is instead handled by the dedicated Spark job in Step 3.
CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl`
(
id STRING,
data STRING,
category INT,
ts STRING,
dt STRING,
hh STRING
) PARTITIONED BY (dt, hh)
WITH (
'write-only' = 'true'
);
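To confirm that the table was registered with the expected partition keys and options, you could inspect it from the same Scripts editor. This is a sketch using standard Flink SQL statements. The exact output layout depends on the engine version.

```sql
-- Show the stored definition, including PARTITIONED BY and the WITH options.
SHOW CREATE TABLE `paimon`.`test_paimon_db`.`test_append_tbl`;

-- List column names and types only.
DESCRIBE `paimon`.`test_paimon_db`.`test_append_tbl`;
```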
Create and start a stream job
- In the navigation pane, choose Development > ETL.
- Click New, configure the job in the New Blank Stream Draft dialog, and click Create:

  | Parameter | Description |
  |---|---|
  | Name | A unique name within the current project |
  | Engine version | The Flink engine version. See Engine versions |
- In the job editor, paste the following code. It uses the DataGen connector to generate 100 rows per second with randomized category values (1-10) and inserts them into the Paimon table:

  CREATE TEMPORARY TABLE datagen (
    id STRING,
    data STRING,
    category INT
  ) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.category.kind' = 'random',
    'fields.category.min' = '1',
    'fields.category.max' = '10'
  );

  INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl`
  SELECT
    id,
    data,
    category,
    CAST(LOCALTIMESTAMP AS STRING) AS ts,
    CAST(CURRENT_DATE AS STRING) AS dt,
    CAST(HOUR(LOCALTIMESTAMP) AS STRING) AS hh
  FROM datagen;
- Click Deploy to publish the job to the production environment.
- On the Deployments page, start the job. See Start a job.
The job is now running and writing data to the Paimon table continuously.
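If you want to check what the generated rows look like before they reach Paimon, one option is a separate debugging draft that sends the same projection to the print connector. The sketch below is not part of the tutorial flow. The table names datagen_debug and print_sink are hypothetical, and the rate and string lengths are arbitrary values chosen to keep the output readable.

```sql
-- Hypothetical debugging variant: same generator, but one row per second
-- and short random strings so the output is easy to read.
CREATE TEMPORARY TABLE datagen_debug (
  id STRING,
  data STRING,
  category INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1',
  'fields.id.length' = '8',
  'fields.data.length' = '16',
  'fields.category.kind' = 'random',
  'fields.category.min' = '1',
  'fields.category.max' = '10'
);

-- The print connector writes each row to the task manager output instead of a table.
CREATE TEMPORARY TABLE print_sink (
  id STRING,
  data STRING,
  category INT,
  ts STRING,
  dt STRING,
  hh STRING
) WITH (
  'connector' = 'print'
);

-- Same projection as the production job, so dt and hh show the partition values
-- the Paimon table receives.
INSERT INTO print_sink
SELECT
  id,
  data,
  category,
  CAST(LOCALTIMESTAMP AS STRING) AS ts,
  CAST(CURRENT_DATE AS STRING) AS dt,
  CAST(HOUR(LOCALTIMESTAMP) AS STRING) AS hh
FROM datagen_debug;
```

Note that HOUR returns an integer, so hh is written without a leading zero (for example 9 rather than 09). Partition filters in later queries need to use the same form.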
Step 2: Create an SQL session in EMR Serverless Spark
An SQL session maintains a long-running Spark context for interactive SQL queries, so each query runs immediately without waiting for cluster initialization. For more details, see Session Manager.
- Log on to the EMR console.
- In the navigation pane, choose EMR Serverless > Spark.
- Click the name of your workspace.
- In the navigation pane, choose Operation Center > Sessions.
- On the SQL Sessions tab, click Create SQL Session.
- Configure the session, leave other parameters at their defaults, and click Create:

  | Parameter | Value |
  |---|---|
  | Name | A descriptive name, such as paimon_compute |
  | Spark configuration | See the configuration block below |

  Enter the following Spark configuration to connect the session to the Paimon catalog in DLF:

  spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
  spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog
  spark.sql.catalog.paimon.metastore dlf
  spark.sql.catalog.paimon.warehouse <warehouse>
  spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>

  Replace <warehouse> and <dlf.catalog.id> with the same values you used when creating the Flink catalog.
- In the Actions column, click Start.
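Once the session is running and attached to a SparkSQL job (as described in Step 3), a few metadata statements can confirm that the Spark configuration points at the same catalog Flink writes to. This is a sketch using standard Spark SQL. It assumes the catalog name paimon from the configuration above.

```sql
-- List the databases (namespaces) visible through the paimon catalog.
SHOW NAMESPACES IN paimon;

-- The table created from Flink should appear here.
SHOW TABLES IN paimon.test_paimon_db;

-- Column names, types, and partition columns as Spark sees them.
DESCRIBE TABLE paimon.test_paimon_db.test_append_tbl;
```

If the database or table is missing, compare the warehouse path and DLF catalog ID with the values used in the Flink catalog definition.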
Step 3: Query and compact the Paimon table
EMR Serverless Spark supports two modes for working with Paimon data:
- Interactive query: run SQL statements directly in the editor for fast iteration and debugging
- Task orchestration: publish jobs into scheduled workflows for production automation
Start with interactive query to verify data is landing correctly. Once your query logic is validated, switch to task orchestration to automate compaction for production.
Interactive query
- In the navigation pane, choose Development.
- On the Development tab, click New.
- In the dialog, set Name to paimon_compact, set Type to SparkSQL, and click OK.
- In the upper-right corner, select the data catalog, database, and the SQL session you started in Step 2.
- Run any of the following queries to inspect the live data:

  Query the first 10 rows:

  SELECT * FROM paimon.test_paimon_db.test_append_tbl LIMIT 10;

  Count rows in a specific partition:

  SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';
- Click Run. Results appear on the Execution Results tab. If errors occur, check the Run Issues tab.
- After confirming the results look correct, click Publish in the upper-right corner. Add a release description and click OK.
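The partition values in the count query above are examples, and your table will contain whatever dates and hours the Flink job has written. As a sketch, the following queries list the partitions that exist and show how rows are distributed across them. They use only the columns defined in Step 1.

```sql
-- Row count per hourly partition; re-run to watch the newest partition grow.
-- hh is a STRING, so the ordering is lexicographic ('10' sorts before '9').
SELECT dt, hh, COUNT(*) AS row_count
FROM paimon.test_paimon_db.test_append_tbl
GROUP BY dt, hh
ORDER BY dt, hh;

-- Distribution of the generated category values (expected range: 1 to 10).
SELECT category, COUNT(*) AS row_count
FROM paimon.test_paimon_db.test_append_tbl
GROUP BY category
ORDER BY category;
```

Running the first query twice a few seconds apart is a simple way to confirm that Spark sees new data while the Flink job is still writing.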
Task orchestration
Streaming writes produce many small files over time. Compaction merges them into larger files, which reduces read latency and storage overhead. Use task orchestration to schedule compaction automatically for production workloads.
Check files before compaction
On the Development tab, create a Spark SQL job to query the Paimon $files system table. This gives you a baseline to compare after compaction. For instructions on creating a job, see Develop Spark SQL tasks.
SELECT file_path, record_count, file_size_in_bytes
FROM paimon.test_paimon_db.test_append_tbl$files
WHERE partition = '[2024-06-24, 19]';
Note the number of files and individual file sizes. After compaction, you should see fewer files with higher record counts per file and larger individual file sizes.
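Comparing long file listings by eye is tedious, so a per-partition summary can serve as the baseline instead. The sketch below aggregates the same $files columns used above. The table name and the partition column are wrapped in backticks as a precaution against parser conflicts with the $ character and the PARTITION keyword.

```sql
-- One-row summary of the partition's data files before compaction.
SELECT
  COUNT(*)                AS file_count,
  SUM(record_count)       AS total_records,
  AVG(file_size_in_bytes) AS avg_file_size_bytes,
  MAX(file_size_in_bytes) AS max_file_size_bytes
FROM paimon.test_paimon_db.`test_append_tbl$files`
WHERE `partition` = '[2024-06-24, 19]';
```

Because Flink keeps writing, the comparison is cleanest on a partition that no longer receives data. For such a partition, total_records should stay the same after compaction while file_count drops.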
Write and publish the compaction job
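Create another Spark SQL job with the following compaction call. The workflow steps below refer to this job as paimon_compact. If you already used that name for the interactive query job, give the compaction job a different name and select that name when you add the workflow node.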
CALL paimon.sys.compact (
table => 'test_paimon_db.test_append_tbl',
partitions => 'dt="2024-06-24",hh="19"',
order_strategy => 'zorder',
order_by => 'category'
);
The order_strategy and order_by parameters control how data is sorted during compaction:
| Parameter | Value used | When to use it |
|---|---|---|
| order_strategy | zorder | Use zorder when queries filter on multiple columns simultaneously (multi-dimensional queries). Use order for single-column sort optimization. With a single order_by column, as in this example, the two strategies produce a similar layout. |
| order_by | category | The column used for sort ordering. Choose a column that appears frequently in query filters. In this example, category is a common filter field. |
Publish the job after writing it.
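For comparison, here are two variants of the same procedure call, sketched under the assumption that your Paimon version accepts the same named arguments as the call above. Neither is required by this tutorial. The first merges small files without re-sorting. The second uses the order strategy mentioned in the table.

```sql
-- Variant 1: plain compaction of one partition (merge small files, no re-sorting).
CALL paimon.sys.compact(
  table => 'test_paimon_db.test_append_tbl',
  partitions => 'dt="2024-06-24",hh="19"'
);

-- Variant 2: single-column sort using the 'order' strategy.
CALL paimon.sys.compact(
  table => 'test_paimon_db.test_append_tbl',
  partitions => 'dt="2024-06-24",hh="19"',
  order_strategy => 'order',
  order_by => 'category'
);
```

Plain compaction is usually the cheaper choice when the goal is only to reduce the file count. Sorting adds work during compaction in exchange for better data skipping on the sorted column.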
Create a workflow
- In the navigation pane, choose Task Orchestration.
- Click Create Workflow.
- In the Create Workflow panel, enter a Workflow name (for example, paimon_workflow_task) and click Next. Configure any additional settings in Other Settings. See Manage workflows.
- On the workflow canvas, click Add Node.
- From the Source File Path list, select the published paimon_compact job. Enter the Spark configuration below and click Save:

  spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions
  spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog
  spark.sql.catalog.paimon.metastore dlf
  spark.sql.catalog.paimon.warehouse <warehouse>
  spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>
- Click Publish Workflow, then click OK.
Run and verify
- On the Task Orchestration page, click the workflow name (paimon_workflow_task).
- On the Workflow Instance List page, click Run Manually.
- In the Trigger Run dialog, click OK.
- After the workflow completes, run the same $files query again to verify the compaction result:

  SELECT file_path, record_count, file_size_in_bytes
  FROM paimon.test_paimon_db.test_append_tbl$files
  WHERE partition = '[2024-06-24, 19]';

  Successful compaction produces fewer files with higher record counts per file and larger individual file sizes compared to the pre-compaction baseline. If the file count and sizes remain unchanged, check the workflow run logs for errors.
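Another way to confirm that the compaction job committed is to look at the table's snapshot history. The sketch below reads the $snapshots system table. Which commit_kind a sorted compaction reports (for example COMPACT or OVERWRITE) is an assumption to verify on your Paimon version, so the query lists recent snapshots rather than filtering on one value.

```sql
-- Most recent snapshots first; the compaction run should appear as a commit
-- whose kind differs from the APPEND commits made by the Flink job.
SELECT snapshot_id, commit_kind, commit_time, total_record_count
FROM paimon.test_paimon_db.`test_append_tbl$snapshots`
ORDER BY snapshot_id DESC
LIMIT 10;
```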

What's next
- Schedule the compaction workflow to run periodically so small files are merged automatically. See Manage workflows.
- Explore additional Paimon system tables (such as $snapshots and $manifests) to monitor table health. See Apache Paimon documentation.
- Learn more about EMR Serverless Spark SQL sessions in Session Manager.