This tutorial walks you through the core Apache Paimon operations in Realtime Compute for Apache Flink: creating a catalog, building a table, writing data, consuming changes in streaming mode, and cleaning up resources.
What you will learn:
| Step | Operation | Where |
| --- | --- | --- |
| 1 | Create a Paimon catalog backed by OSS | Scripts tab |
| 2 | Create a partitioned table with a primary key | Scripts tab |
| 3 | Write sample data through a streaming job | ETL > Drafts |
| 4 | Consume data in streaming mode | ETL > Drafts |
| 5 | Update existing rows and observe changes | ETL > Drafts |
| 6 | Cancel jobs and clean up resources | Scripts tab |
Steps 1 and 2 use the Scripts tab for one-time DDL operations (catalog and table management). Steps 3 through 5 use streaming drafts under ETL > Drafts for data processing jobs that go through a deploy-and-start lifecycle.
Prerequisites
Before you begin, make sure you have:
- A Realtime Compute for Apache Flink workspace. For details, see Activate Realtime Compute for Apache Flink.
- An Object Storage Service (OSS) bucket with the Standard storage class. For details, see Get started by using the OSS console.
- Ververica Runtime (VVR) 8.0.5 or later, which adds Apache Paimon support.
- (If applicable) Permissions for your RAM user or RAM role to access the Realtime Compute for Apache Flink console. For details, see Permission management.
Step 1: Create an Apache Paimon catalog
A catalog connects Realtime Compute for Apache Flink to your OSS warehouse directory, where all Paimon databases and tables are stored.
Log on to the Realtime Compute for Apache Flink console.
Find the target workspace and click Console in the Actions column.
In the left-side navigation pane, choose Development > Scripts. Create a script on the Scripts tab.
Enter the following SQL in the script editor and replace the placeholders with your actual values:

```sql
CREATE CATALOG `my-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse>',
  'fs.oss.endpoint' = '<fs.oss.endpoint>',
  'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>',
  'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>'
);
```

Note: To store the Apache Paimon table in OSS-HDFS, configure `fs.oss.endpoint`, `fs.oss.accessKeyId`, and `fs.oss.accessKeySecret`. Set the endpoint in the format `cn-<region>.oss-dls.aliyuncs.com`, for example `cn-hangzhou.oss-dls.aliyuncs.com`.

| Parameter | Required | Description |
| --- | --- | --- |
| `type` | Yes | Catalog type. Set to `paimon`. |
| `metastore` | Yes | Metadata storage type. This example uses `filesystem`. For other types, see Manage Apache Paimon catalogs. |
| `warehouse` | Yes | OSS path for the data warehouse directory, in the format `oss://<bucket>/<object>`. Find the bucket name and object path in the OSS console. |
| `fs.oss.endpoint` | Conditional | OSS endpoint. Required when the OSS bucket is in a different region from the workspace, or belongs to a different Alibaba Cloud account. For endpoint values, see Regions and endpoints. |
| `fs.oss.accessKeyId` | Conditional | AccessKey ID with read and write permissions on OSS. Required in the same scenarios as `fs.oss.endpoint`. For details, see Create an AccessKey pair. |
| `fs.oss.accessKeySecret` | Conditional | AccessKey secret corresponding to the AccessKey ID above. Required in the same scenarios as `fs.oss.endpoint`. |

Select the `CREATE CATALOG` statement and click Run on the left side of the script editor. Expected output:

```text
The following statement has been executed successfully!
```
Step 2: Create an Apache Paimon table
In the same script editor on the Scripts tab, enter the following SQL to create a database named `my_db` and a table named `my_tbl`. The table uses a composite primary key `(dt, id)` and is partitioned by `dt`. The `changelog-producer` option is set to `lookup`, which enables downstream jobs to consume changes in streaming mode. For details, see Change data generation mechanism.

```sql
CREATE DATABASE `my-catalog`.`my_db`;

CREATE TABLE `my-catalog`.`my_db`.`my_tbl` (
  dt STRING,
  id BIGINT,
  content STRING,
  PRIMARY KEY (dt, id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'changelog-producer' = 'lookup'
);
```

Select both statements and click Run. Expected output:

```text
The following statement has been executed successfully!
```
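To confirm that the objects were created, one option is to run `SHOW` statements in the same script. This is a sketch using standard Flink SQL; the exact result rendering depends on the console:

```sql
-- Switch into the new catalog and list its contents.
USE CATALOG `my-catalog`;
SHOW DATABASES;   -- my_db should appear in the result
USE `my_db`;
SHOW TABLES;      -- my_tbl should appear in the result
```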
Step 3: Write data to the Apache Paimon table
Writing data requires a streaming job. Create the job under ETL > Drafts.
In the left-side navigation pane, choose Development > ETL. On the Drafts tab, click New. On the SQL Scripts tab of the New Draft dialog box, click Blank Stream Draft. For details, see Develop an SQL draft.
Copy the following SQL into the SQL editor:

```sql
-- Paimon commits data only after each checkpoint completes.
-- A 10-second interval lets you see results quickly in this tutorial.
-- In production, set the interval to 1 to 10 minutes based on your latency requirements.
SET 'execution.checkpointing.interval' = '10s';

INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES
  ('20240108', 1, 'apple'),
  ('20240108', 2, 'banana'),
  ('20240109', 1, 'cat'),
  ('20240109', 2, 'dog');
```

In the upper-right corner of the SQL editor, click Deploy. In the Deploy draft dialog box, configure the parameters and click Confirm.
In the left-side navigation pane, choose O&M > Deployments. Find the deployment and click Start in the Actions column.
In the Start Job panel, select Initial Mode and click Start.
Wait for the deployment status to change to FINISHED. This confirms that the data has been written.
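If you want to spot-check the written rows before moving on, a short query draft can read the table once. This is an optional sketch and assumes your environment supports running a `SELECT` query for debugging:

```sql
-- Read the table to verify that the four rows were committed.
SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;
```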
Step 4: Consume data in streaming mode
This step creates a streaming job that reads all rows from the Paimon table and outputs them to the Task Manager logs through the Print connector.
Create a new Blank Stream Draft (under Development > ETL > Drafts) and enter the following SQL:

```sql
CREATE TEMPORARY TABLE Print (
  dt STRING,
  id BIGINT,
  content STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO Print SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;
```

Click Deploy, configure the parameters in the Deploy draft dialog box, and click Confirm.
On the O&M > Deployments page, find the deployment and click Start in the Actions column. In the Start Job panel, select Initial Mode and click Start.
To view the output:

On the O&M > Deployments page, click the deployment name.
On the Logs tab, under Running Task Managers, click the value in the Path, ID column.
Click the Stdout tab.

Expected output:

```text
+I[20240108, 1, apple]
+I[20240108, 2, banana]
+I[20240109, 1, cat]
+I[20240109, 2, dog]
```
Step 5: Update data in the Apache Paimon table
Paimon tables with primary keys support upserts. When you insert a row with an existing primary key, the old value is replaced.
Create a new Blank Stream Draft and enter the following SQL. It replaces `apple` with `hello` for key `(20240108, 1)` and `dog` with `world` for key `(20240109, 2)`.

```sql
SET 'execution.checkpointing.interval' = '10s';

INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES
  ('20240108', 1, 'hello'),
  ('20240109', 2, 'world');
```

Click Deploy, configure the parameters, and click Confirm.
On the O&M > Deployments page, find the deployment and click Start. Select Initial Mode and click Start.
Wait for the deployment status to change to FINISHED.
Go to the Stdout tab of the streaming consumption job started in Step 4. The updated rows appear in the log. The `-U` prefix indicates the old value and `+U` indicates the new value. Expected output:

```text
-U[20240108, 1, apple]
+U[20240108, 1, hello]
-U[20240109, 2, dog]
+U[20240109, 2, world]
```
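These retraction messages are what keep downstream stateful operators consistent. As an illustrative sketch (not one of this tutorial's jobs), a streaming aggregate over the table would first subtract the `-U` row and then add the `+U` row, so its state never double-counts an updated key:

```sql
-- Counts distinct content values per partition. When (20240109, 2)
-- changes from 'dog' to 'world', the -U/+U pair retracts the old
-- value before adding the new one, keeping the count correct.
SELECT dt, COUNT(DISTINCT content) AS distinct_contents
FROM `my-catalog`.`my_db`.`my_tbl`
GROUP BY dt;
```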
(Optional) Step 6: Cancel the streaming job and clean up resources
After testing, cancel the streaming consumption job and remove the Paimon resources.
On the O&M > Deployments page, find the streaming consumption deployment and click Cancel in the Actions column.
Go to the Scripts tab. In the script editor, run the following SQL to delete the database and catalog:

```sql
-- Delete the database and all associated data files in OSS
DROP DATABASE `my-catalog`.`my_db` CASCADE;

-- Remove the catalog metadata from Realtime Compute for Apache Flink
-- Data files in OSS are not deleted by this statement
DROP CATALOG `my-catalog`;
```

Expected output:

```text
The following statement has been executed successfully!
```
Next steps
Write data to or consume data from an Apache Paimon table: explore more data read and write patterns.
Modify the schema of an Apache Paimon table: add columns, change data types, or temporarily modify table parameters.
Optimize performance of Apache Paimon tables: tune primary key tables and Append Scalable tables for production workloads.
Troubleshoot upstream and downstream storage issues: resolve common Apache Paimon issues.