Apache Paimon is a unified lake storage format for streaming and batch processing. It supports high-throughput writes and low-latency queries. This topic describes how to use a Paimon catalog and the MySQL connector to import order data and table schema changes from an ApsaraDB RDS for MySQL instance into a Paimon table, and then use Flink to run a simple analysis on the Paimon table.
Background information
Realtime Compute for Apache Flink and common compute engines on the open source big data platform E-MapReduce (EMR), such as Spark, Hive, and Trino, integrate well with Paimon. You can use Paimon to quickly build a data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS), and then connect a compute engine to analyze the data in your data lake.
Prerequisites
If you use a Resource Access Management (RAM) user or RAM role, make sure that the user or role has the required permissions to access the Realtime Compute for Apache Flink console. For more information, see Permission management.
You have created a Realtime Compute for Apache Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.
Step 1: Prepare the data source
Create an ApsaraDB RDS for MySQL instance and configure a database.
Note: The ApsaraDB RDS for MySQL instance must be in the same virtual private cloud (VPC) as the Flink workspace. If the instance and the workspace are not in the same VPC, see Network connectivity.
Create a database named orders and a privileged account or a standard account that has read and write permissions for the orders database.
Connect to the ApsaraDB RDS for MySQL instance and create the tables orders_1 and orders_2 in the orders database.
CREATE TABLE `orders_1` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);

CREATE TABLE `orders_2` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);
Insert the following test data.
INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
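The data ingestion job in Step 3 routes orders_1 and orders_2 into a single Paimon table. Assuming that table keeps orderkey as its primary key, as the MySQL tables do, two source rows with the same orderkey would be written as the same Paimon row, and one would overwrite the other. The following Python sketch checks that the shards use disjoint primary keys before you start the job. The helper find_colliding_keys is hypothetical and only inspects key lists you pass in; it does not connect to MySQL. The test data above uses orderkey 1 to 10 in orders_1 and 11 to 20 in orders_2.

```python
from collections import defaultdict


def find_colliding_keys(shards: dict[str, list[int]]) -> dict[int, list[str]]:
    """Map each primary key found in more than one shard to the shard names.

    Hypothetical helper: it works on key lists you supply, for example the
    result of SELECT orderkey FROM each table.
    """
    owners: dict[int, list[str]] = defaultdict(list)
    for table, keys in shards.items():
        for key in set(keys):  # a primary key is already unique within one table
            owners[key].append(table)
    return {key: tables for key, tables in sorted(owners.items()) if len(tables) > 1}


if __name__ == "__main__":
    # orderkey values used by the Step 1 test data
    shards = {
        "orders_1": list(range(1, 11)),
        "orders_2": list(range(11, 21)),
    }
    print(find_colliding_keys(shards))  # {} : no key appears in both shards
```

If the result is not empty, the merged Paimon table would contain fewer rows than the source tables combined.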
Step 2: Create catalogs
Go to the Data Management page.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
Click Data Management.
Create a Paimon catalog.
Click Create Catalog. On the Built-in Catalog tab, select Apache Paimon and click Next.
Enter the configuration information.

catalog name: A custom name for the Paimon catalog. This topic uses paimon-catalog.
metastore: The metastore type for Paimon tables. Valid values:
filesystem: Stores metadata only in OSS.
dlf: Stores metadata in OSS and also synchronizes it to Data Lake Formation (DLF).
This topic uses filesystem.
warehouse: The root directory of the Paimon catalog. It must be an OSS directory. You can use the OSS bucket that was created when you activated Realtime Compute for Apache Flink, or another OSS bucket in the same region under your Alibaba Cloud account. The format is oss://<bucket>/<object>, where bucket is the name of your OSS bucket and object is the path to the directory that stores your data. You can view the bucket and object names in the OSS console.
fs.oss.endpoint: The endpoint of the OSS service. If Flink and the OSS bucket are in the same region, use the VPC endpoint. Otherwise, use the public endpoint. For more information about how to obtain an endpoint, see Regions and endpoints.
fs.oss.accessKeyId: The AccessKey ID of an Alibaba Cloud account or RAM user that has read and write permissions on OSS. If you do not have an AccessKey pair, see Create an AccessKey pair.
fs.oss.accessKeySecret: The AccessKey secret of the same Alibaba Cloud account or RAM user. This topic uses a variable for the AccessKey secret to avoid exposing it in plaintext. For more information, see Project variables.
Click OK.
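The warehouse and fs.oss.endpoint values are easy to mistype. The following Python sketch, built on the standard urllib.parse module, splits a warehouse URI into its bucket and object parts and builds an endpoint name. The helpers parse_warehouse and oss_endpoint are hypothetical, and the endpoint naming convention is an assumption inferred from the example endpoint oss-cn-beijing-internal.aliyuncs.com used in the Step 3 job configuration; always confirm the exact endpoint in Regions and endpoints.

```python
from urllib.parse import urlparse


def parse_warehouse(uri: str) -> tuple[str, str]:
    """Split an oss://<bucket>/<object> warehouse URI into (bucket, object)."""
    parsed = urlparse(uri)
    if parsed.scheme != "oss":
        raise ValueError(f"warehouse must start with oss://, got {uri!r}")
    if not parsed.netloc:
        raise ValueError(f"warehouse is missing the bucket name: {uri!r}")
    return parsed.netloc, parsed.path.strip("/")


def oss_endpoint(region_id: str, same_region: bool) -> str:
    """Build an OSS endpoint name (assumed naming convention).

    same_region=True  -> VPC endpoint, for example oss-cn-beijing-internal.aliyuncs.com
    same_region=False -> public endpoint, for example oss-cn-beijing.aliyuncs.com
    """
    suffix = "-internal" if same_region else ""
    return f"oss-{region_id}{suffix}.aliyuncs.com"


if __name__ == "__main__":
    print(parse_warehouse("oss://default/test"))  # ('default', 'test')
    print(oss_endpoint("cn-beijing", same_region=True))
```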
Create a MySQL catalog.
Click Create Catalog. On the Built-in Catalog tab, select MySQL and click Next.
Enter the configuration information.

catalog name: The name of the MySQL catalog. This topic uses mysql-catalog.
hostname: The IP address or hostname of the MySQL database. This topic uses the private endpoint of the RDS instance.
port: The port of the MySQL database service. Default value: 3306.
default-database: The name of the default database. Enter orders, the database created in Step 1: Prepare the data source.
username: The username used to connect to the MySQL database. Enter your database username.
password: The password used to connect to the MySQL database. This topic uses a variable for the password to avoid exposing it in plaintext. For more information, see Project variables.
Click OK.
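If the catalog cannot connect, the usual cause is network reachability rather than credentials. The following Python sketch checks whether a TCP connection to the MySQL hostname and port can be opened. It assumes you run it on a machine in the same VPC as the Flink workspace, such as an ECS instance; is_reachable is a hypothetical helper built on the standard socket module, and it does not log on to MySQL or verify the username and password.

```python
import socket


def is_reachable(host: str, port: int = 3306, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    A True result only shows that the network path (VPC, IP whitelist, port)
    is open; it says nothing about the MySQL account.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # DNS failure, connection refused, or timeout
        return False
```

For example, call is_reachable with the private endpoint of your RDS instance and port 3306. A False result points to the VPC or whitelist settings described in Network connectivity.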
Step 3: Create a Flink job
Log on to the Realtime Compute development console and create a data ingestion job.
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, click .
Click
, and then click Create Data Ingestion Draft. Enter a File Name and select an Engine Version:
File Name: The name of the job. The job name must be unique in the current project. Example: flink-test.
Engine Version: The Flink engine version used by the job. We recommend that you use a version tagged Recommended or Stable, because these versions offer higher reliability and better performance. For more information about engine versions, see Feature Release Notes and Introduction to Engine Versions. Example: vvr-11.5-jdk11-flink-1.20.
Click Create.
Enter the following statements to capture real-time changes from the relevant tables in the orders database and synchronize them to the Paimon table.
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: 3306
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: orders.orders_\d+
  server-id: 8601-8604
  # Optional. Synchronize data from tables created during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # Optional. Synchronize table and column comments.
  include-comments.enabled: true
  # Optional. Assign unbounded chunks first to reduce the risk of TaskManager OutOfMemory errors.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # Optional. Deserialize only change events of captured tables to speed up binlog reading.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  name: Paimon Sink
  # Use the same metastore and warehouse as paimon-catalog in Step 2 so that the catalog can query the synchronized table.
  # Replace the warehouse, endpoint, and AccessKey placeholders with your own values.
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
  # Optional. Enable deletion vectors to improve read performance.
  table.properties.deletion-vectors.enabled: true

# If you use a DLF catalog, use the following sink configuration instead:
# sink:
#   type: paimon
#   catalog.properties.metastore: rest
#   catalog.properties.token.provider: dlf
#   catalog.properties.uri: dlf_uri
#   catalog.properties.warehouse: your_warehouse
#   table.properties.deletion-vectors.enabled: true

# Capture MySQL tables whose names match the regular expression orders_\d+ and synchronize their changes to the orders table in the Paimon default database.
route:
  - source-table: orders.orders_\d+
    sink-table: default.orders

For more information about how to configure data ingestion jobs, see Flink CDC data ingestion job development reference.
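The tables and route settings select source tables with a regular expression, and server-id is given as a range. The following Python sketch models both under stated assumptions: the first dot in the selector separates the database name from the table name, and each part must match the whole name (full-match semantics). The helpers match_tables and server_id_count are hypothetical and do not call Flink CDC. In the MySQL CDC source each parallel reader uses its own server ID, so the number of IDs in the range should be at least the source parallelism; 8601-8604 provides four.

```python
import re


def match_tables(selector: str, tables: list[str]) -> list[str]:
    r"""Return the db.table names matched by a selector such as orders.orders_\d+.

    Simplified model (assumption): the first '.' splits the selector into a
    database regex and a table regex, and each must match the whole name.
    """
    db_regex, table_regex = selector.split(".", 1)
    matched = []
    for qualified in tables:
        db, table = qualified.split(".", 1)
        if re.fullmatch(db_regex, db) and re.fullmatch(table_regex, table):
            matched.append(qualified)
    return matched


def server_id_count(spec: str) -> int:
    """Number of server IDs in a value such as 8601-8604 or 5400."""
    start, _, end = spec.partition("-")
    return int(end or start) - int(start) + 1


if __name__ == "__main__":
    candidates = ["orders.orders_1", "orders.orders_2", "orders.orders_10",
                  "orders.orders_backup", "shop.orders_1"]
    # ['orders.orders_1', 'orders.orders_2', 'orders.orders_10']
    print(match_tables(r"orders.orders_\d+", candidates))
    print(server_id_count("8601-8604"))  # 4
```

Because orders_10 also matches, any orders_N table that you create later is picked up by the same route, provided scan.binlog.newly-added-table.enabled is set as in the configuration above.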
In the upper-right corner, click Deploy. Then, click OK.
In the navigation pane on the left, go to the Deployments page and click the name of the target job to open its deployment details page.
In the upper-right corner of the Parameters section, click Edit.
The Paimon sink commits written data when a checkpoint completes, so the checkpoint settings determine how soon results become visible. To observe the results sooner, set both Checkpointing Interval and Min Interval Between Checkpoints to 10 s, and then click Save.
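The effect of these two settings on result latency can be estimated with a small model. The following Python sketch assumes one checkpoint at a time, a first checkpoint one interval after the job starts, and that each later checkpoint waits for both the interval since the previous trigger and the minimum pause since the previous completion. It also assumes that a change becomes queryable when the first checkpoint triggered after it completes. The functions checkpoint_windows and visible_at are hypothetical illustrations, not Flink APIs, and the durations are made-up inputs.

```python
from typing import Optional


def checkpoint_windows(interval: float, min_pause: float, durations: list[float],
                       start: float = 0.0) -> list[tuple[float, float]]:
    """Return (trigger, completion) times of consecutive checkpoints (simplified model)."""
    windows = []
    trigger = start + interval
    for duration in durations:
        end = trigger + duration
        windows.append((trigger, end))
        # Next trigger: at least one interval after this trigger and
        # at least min_pause after this checkpoint completes.
        trigger = max(trigger + interval, end + min_pause)
    return windows


def visible_at(write_time: float, windows: list[tuple[float, float]]) -> Optional[float]:
    """Completion time of the first checkpoint triggered at or after write_time."""
    for trigger, end in windows:
        if trigger >= write_time:
            return end
    return None


if __name__ == "__main__":
    windows = checkpoint_windows(interval=10, min_pause=10, durations=[2, 2, 2])
    print(windows)  # [(10.0, 12.0), (22.0, 24.0), (34.0, 36.0)]
    print(visible_at(15, windows))  # 24.0
```

In this model the minimum pause, not the interval, sets the spacing once checkpoints take time to complete, which is why both values are lowered together.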

At the top of the deployment details page of the target job, click Start, select Stateless Start, and then click Start.

Query the Paimon data.
On the page, on the Query Script tab, copy the following code into the query script editor.
select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
Select the target segment and click Run to the left of the code line.
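To check the query result, you can compute the expected totals from the Step 1 test data outside Flink. The following Python sketch keeps only orderkey, custkey, and total_price from the INSERT statements and sums total_price per custkey, which is what the GROUP BY query returns once all 20 rows have been synchronized. The helper revenue_by_customer is hypothetical, and the totals are rounded to two decimals because total_price is a DOUBLE column.

```python
from collections import defaultdict

# (orderkey, custkey, total_price) taken from the Step 1 INSERT statements.
ORDERS = [
    (1, 1, 131251.81), (2, 3, 40183.29), (3, 6, 160882.76), (4, 6, 31084.79),
    (5, 2, 86615.25), (6, 2, 36468.55), (7, 2, 171488.73), (8, 6, 116923.00),
    (9, 3, 99798.76), (10, 3, 41670.02), (11, 6, 148789.52), (12, 5, 38988.98),
    (13, 4, 113701.89), (14, 6, 46366.56), (15, 4, 219707.84), (16, 1, 20065.73),
    (17, 0, 65883.92), (18, 6, 79258.24), (19, 2, 116227.05), (20, 1, 215135.72),
]


def revenue_by_customer(orders: list[tuple[int, int, float]]) -> dict[int, float]:
    """Sum total_price per custkey, like GROUP BY custkey in the Flink query."""
    totals: dict[int, float] = defaultdict(float)
    for _orderkey, custkey, total_price in orders:
        totals[custkey] += total_price
    # Floating-point sums carry small errors, so round to two decimals.
    return {custkey: round(total, 2) for custkey, total in sorted(totals.items())}


if __name__ == "__main__":
    for custkey, total in revenue_by_customer(ORDERS).items():
        print(custkey, total)
```

With the test data, the sketch yields seven groups (custkey 0 to 6); for example, custkey 6 has six orders totaling 583304.87. The Flink query may list the groups in a different order and show more decimal places.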

Step 4: Update the MySQL table schema
This section demonstrates how to synchronize MySQL table schema changes to the Paimon table.
Log on to the ApsaraDB RDS console.
In the orders database, enter the following SQL statements and click Execute. The statements add a quantity column to both tables and set it to 100 for some rows.
ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT;
ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT;
UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
On the Scripts tab of the page in the Realtime Compute for Apache Flink console, copy the following code into the script editor.
select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;
Select the target segment and click Run to the left of the code line.
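The expected result of this query follows from the statements above: orderkey 1 to 4 in orders_1 and 16 to 20 in orders_2 receive quantity = 100, while the other rows keep NULL in the new column. The following Python sketch replays the same ALTER TABLE and UPDATE statements against an in-memory SQLite database, which accepts the backtick-quoted identifiers, and uses UNION ALL to stand in for the route that merges both tables into default.orders. It is a local simulation under these assumptions, not a test of the Flink job; the simplified tables keep only the orderkey column, and rows_with_quantity is a hypothetical helper.

```python
import sqlite3


def rows_with_quantity() -> list[int]:
    """Replay the Step 4 statements locally and return orderkeys with a quantity."""
    conn = sqlite3.connect(":memory:")
    try:
        cur = conn.cursor()
        # Simplified schema (assumption): only the primary key column is kept.
        for table in ("orders_1", "orders_2"):
            cur.execute(f"CREATE TABLE `{table}` (orderkey BIGINT NOT NULL PRIMARY KEY)")
        cur.executemany("INSERT INTO `orders_1` VALUES (?)", [(k,) for k in range(1, 11)])
        cur.executemany("INSERT INTO `orders_2` VALUES (?)", [(k,) for k in range(11, 21)])

        # The same schema change and updates as in Step 4.
        cur.execute("ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT")
        cur.execute("ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT")
        cur.execute("UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5")
        cur.execute("UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15")

        # UNION ALL stands in for the route that merges both tables into default.orders.
        cur.execute(
            "SELECT orderkey FROM ("
            "  SELECT orderkey, quantity FROM `orders_1`"
            "  UNION ALL"
            "  SELECT orderkey, quantity FROM `orders_2`"
            ") AS merged WHERE quantity IS NOT NULL ORDER BY orderkey"
        )
        return [row[0] for row in cur.fetchall()]
    finally:
        conn.close()


if __name__ == "__main__":
    print(rows_with_quantity())  # [1, 2, 3, 4, 16, 17, 18, 19, 20]
```

If the Paimon query returns these nine orders with quantity 100, both the added column and the updates were synchronized.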

References
The Paimon connector for streaming data lakehouses is used with Paimon catalogs. For more information about its usage and features, see Paimon connector for streaming data lakehouses.
For more information about how to use Paimon catalogs, see Manage Paimon Catalogs.
For best practices on configuring data ingestion jobs in complex scenarios, see Best practices for Flink CDC data ingestion.