Apache Paimon is a unified lake storage format for both streaming and batch processing that supports high-throughput writes and low-latency queries. This tutorial describes how to use a Paimon catalog and the MySQL connector to import order data and table schema changes from an ApsaraDB RDS for MySQL instance into a Paimon table. It also shows how to use Flink to run a simple analysis on the Paimon table.
Background
Realtime Compute for Apache Flink and common compute engines on E-MapReduce, such as Spark, Hive, and Trino, integrate well with Paimon. You can use Apache Paimon to quickly build your own data lake storage service on HDFS or OSS and connect to a compute engine to analyze the data in your data lake.
Prerequisites
-
If you use a RAM user or RAM role to perform operations, ensure the RAM user or RAM role has the required permissions to access the Realtime Compute for Apache Flink console. For more information, see Permission management.
-
You have created a Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.
Step 1: Prepare a data source
-
Create an ApsaraDB RDS for MySQL instance and configure a database.
Note: The ApsaraDB RDS for MySQL instance must be in the same VPC as the Flink workspace. If the instance and the workspace are in different VPCs, see Network connectivity.
Create a database named orders and an account with permissions to read from and write to the orders database.
-
Connect to the ApsaraDB RDS for MySQL instance, and create the orders_1 and orders_2 tables in the orders database.
CREATE TABLE `orders_1` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);

CREATE TABLE `orders_2` (
  orderkey BIGINT NOT NULL,
  custkey BIGINT,
  order_status VARCHAR(100),
  total_price DOUBLE,
  order_date DATE,
  order_priority VARCHAR(100),
  clerk VARCHAR(100),
  ship_priority INT,
  comment VARCHAR(100),
  PRIMARY KEY (orderkey)
);
-
Insert the following test data.
INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
Step 2: Create catalogs
-
Go to the Data Management page.
-
Log on to the Realtime Compute for Apache Flink console.
-
In the Actions column of the workspace that you want to manage, click Console.
-
Click Data Management.
-
Create a Paimon catalog.
-
Click Create Catalog. On the Built-in Catalog tab, select Apache Paimon and click Next.
-
Configure the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| catalog name | The name of the Paimon catalog. | paimon-catalog |
| metastore | The storage type of metadata about Paimon tables. Valid values: filesystem (metadata is stored only in OSS) and dlf (metadata is stored in OSS and synchronized to Data Lake Formation (DLF)). | This example uses filesystem. |
| warehouse | The root directory for storing the Paimon catalog. The directory must be an OSS directory. You can select the OSS bucket used to activate Realtime Compute for Apache Flink, or another OSS bucket that belongs to the same Alibaba Cloud account and is in the same region as the workspace. | The value must be in the oss://<bucket>/<object> format, where bucket is the name of the OSS bucket that you created and object is the path to the directory where your data is stored. You can view the names of your bucket and object in the OSS console. |
| fs.oss.endpoint | The endpoint of the OSS service. | If Flink and DLF are deployed in the same region, use a VPC endpoint. Otherwise, use a public endpoint. For more information, see Regions and endpoints. |
| fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user with permissions to read from and write to OSS. | If you do not have an AccessKey ID, see Create an AccessKey pair. |
| fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user. | This example uses a namespace variable for the AccessKey Secret to prevent plaintext leaks. For more information, see Namespace variables. |
-
Click OK.
-
Create a MySQL catalog.
-
Click Create Catalog. On the Built-in Catalog tab, select MySQL and click Next.
-
Configure the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| catalog name | The name of the MySQL catalog. | mysql-catalog |
| hostname | The IP address or hostname of the MySQL database. | In this example, enter the internal endpoint of the ApsaraDB RDS for MySQL instance. |
| port | The port number of the MySQL database service. | The default value is 3306. |
| default-database | The name of the default MySQL database. | In this example, enter the orders database that you created in Step 1: Prepare a data source. |
| username | The username for the MySQL database. | Enter the username of your database. |
| password | The password for the MySQL database. | In this example, a namespace variable is used as the password to prevent plaintext leaks. For more information, see Namespace variables. |
-
Click OK.
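The warehouse parameter of the Paimon catalog must follow the oss://<bucket>/<object> format. The following Python snippet is a minimal sketch of how you might sanity-check such a value before you enter it in the console. The function name parse_warehouse is hypothetical, and treating both the bucket and the object path as required is an assumption of this sketch, not a documented rule of the console.

```python
from urllib.parse import urlsplit


def parse_warehouse(uri: str) -> tuple[str, str]:
    """Split an oss://<bucket>/<object> URI into (bucket, object path).

    Raises ValueError if the URI does not follow that format.
    """
    parts = urlsplit(uri)
    if parts.scheme != "oss":
        raise ValueError(f"expected the oss:// scheme, got {uri!r}")
    bucket = parts.netloc
    object_path = parts.path.strip("/")
    # Assumption of this sketch: both parts must be present.
    if not bucket or not object_path:
        raise ValueError(f"expected oss://<bucket>/<object>, got {uri!r}")
    return bucket, object_path


# The bucket and directory names below are placeholders for illustration.
print(parse_warehouse("oss://my-bucket/paimon/warehouse"))
```

The check only validates the shape of the string. It does not verify that the bucket exists or that the account has access to it.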
Step 3: Create a Flink job
-
Go to the development console of Realtime Compute for Apache Flink and create a data ingestion job.
-
Log on to the Realtime Compute for Apache Flink console.
-
Click Console in the Actions column of the target workspace.
-
In the left-side navigation pane, choose .
-
Click the icon, click New Data Ingestion Draft, specify File Name, and then select an Engine Version.
| Parameter | Description | Example |
| --- | --- | --- |
| File name | The name of the job. Note: The job name must be unique within the project. | flink-test |
| Engine version | The Flink engine version for the job. We recommend that you use a version with the Recommended or Stable tag. Versions with these tags provide higher reliability and better performance. For more information about engine versions, see Release notes and Engine versions. | vvr-11.5-jdk11-flink-1.20 |
-
Click Create.
-
Enter the following statements to capture changes to the specified tables in the orders database in real time and synchronize the changes to Paimon tables.
source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: 3306
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: orders.orders_\d+
  server-id: 8601-8604
  # (Optional) Synchronize data from newly added tables during the incremental phase.
  scan.binlog.newly-added-table.enabled: true
  # (Optional) Synchronize table and field comments.
  include-comments.enabled: true
  # (Optional) Prioritize the distribution of unbounded shards to prevent potential Out Of Memory (OOM) issues on TaskManagers.
  scan.incremental.snapshot.unbounded-chunk-first.enabled: true
  # (Optional) Enable parsing and filtering to accelerate data reads.
  scan.only.deserialize.captured.tables.changelog.enabled: true

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
  # (Optional) Enable deletion vectors to improve data read performance.
  table.properties.deletion-vectors.enabled: true

# If you use a DLF catalog, use the following configurations.
# sink:
#   type: paimon
#   catalog.properties.metastore: rest
#   catalog.properties.token.provider: dlf
#   catalog.properties.uri: dlf_uri
#   catalog.properties.warehouse: your_warehouse
#   table.properties.deletion-vectors.enabled: true

# Capture changes from the MySQL tables whose names match the regular expression orders_\d+ and synchronize the changes to the orders table in the default Paimon database.
route:
  - source-table: orders.orders_\d+
    sink-table: default.orders
For more information about how to configure data ingestion jobs, see Develop Flink CDC-based data ingestion jobs.
-
In the upper-right corner of the page, click Deploy. In the message that appears, click OK.
-
In the left-side navigation pane, choose . Then, click the name of your job to open its Deployments tab.
-
In the Running Parameters section, click Edit.
To see job results sooner, set System Checkpoint Interval and Minimum Interval Between System Checkpoints to 10s and click Save.
-
On the Deployments tab of the job, click Start. In the Start Job dialog box, select Stateless Start and click Start.
-
Query Paimon data.
-
On the Query Script tab of the page, copy the following code to the query script.
select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
-
Select the code that you want to run and click Run that appears on the left side of the code.
After the SQL statement runs, the query status changes to Completed. The result table shows the sum of total_price for each custkey in the orders table. The query returns seven rows, one for each custkey value from 0 to 6.
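To cross-check the query result, the following Python snippet is a minimal sketch that mimics the job's route rule and the aggregation on the Step 1 test data. It is not how Flink CDC or Paimon work internally. The sketch assumes that the table-name part of the route pattern must match the whole table name, and the customers table is a hypothetical entry that only demonstrates a non-matching name.

```python
import re
from collections import defaultdict

# (orderkey, custkey, total_price) taken from the Step 1 test data.
ORDERS_1 = [
    (1, 1, 131251.81), (2, 3, 40183.29), (3, 6, 160882.76), (4, 6, 31084.79),
    (5, 2, 86615.25), (6, 2, 36468.55), (7, 2, 171488.73), (8, 6, 116923.00),
    (9, 3, 99798.76), (10, 3, 41670.02),
]
ORDERS_2 = [
    (11, 6, 148789.52), (12, 5, 38988.98), (13, 4, 113701.89), (14, 6, 46366.56),
    (15, 4, 219707.84), (16, 1, 20065.73), (17, 0, 65883.92), (18, 6, 79258.24),
    (19, 2, 116227.05), (20, 1, 215135.72),
]

# Tables in the orders database. "customers" is hypothetical and only shows
# that names outside the pattern are ignored.
SOURCE_TABLES = {"orders_1": ORDERS_1, "orders_2": ORDERS_2, "customers": []}

# Table-name part of the route pattern orders.orders_\d+ (full match assumed).
TABLE_PATTERN = re.compile(r"orders_\d+")


def merged_totals(tables):
    """Merge rows of all matching tables and sum total_price per custkey."""
    totals = defaultdict(float)
    for name, rows in tables.items():
        if TABLE_PATTERN.fullmatch(name) is None:
            continue  # not routed to default.orders
        for _orderkey, custkey, total_price in rows:
            totals[custkey] += total_price
    return dict(sorted(totals.items()))


totals = merged_totals(SOURCE_TABLES)
for custkey, total in totals.items():
    print(custkey, round(total, 2))
print(len(totals), "groups")
```

The number of groups printed by the sketch should equal the number of rows that the Flink query returns, and the per-custkey sums should agree up to floating-point rounding.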
Step 4: Update MySQL schema
This section describes how to synchronize schema changes from a MySQL table to a Paimon table.
-
Log on to the ApsaraDB RDS console.
-
In the orders database, enter the following SQL statements to add a column to both tables and update the data. Then, click Execute.
ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT;
ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT;
UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
-
On the Query Script tab of the page in the Realtime Compute for Apache Flink console, copy the following code to the query script.
select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;
Select the code that you want to run and click Run that appears on the left side of the code.
After the query succeeds, nine records are returned. The data is from the orders_1 and orders_2 tables, and the value in the quantity column of each record is 100.
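The count of nine follows from the two UPDATE predicates applied to the Step 1 test data. The following Python snippet is a minimal sketch that replays the statements on in-memory rows. The function name rows_with_quantity is hypothetical, and the rows carry only the two columns that matter here.

```python
def rows_with_quantity():
    """Replay the ALTER and UPDATE statements and return the matching rows."""
    # orderkey 1-10 were inserted into orders_1 and 11-20 into orders_2.
    # ADD COLUMN quantity BIGINT leaves existing rows with NULL (None here).
    orders_1 = [{"orderkey": k, "quantity": None} for k in range(1, 11)]
    orders_2 = [{"orderkey": k, "quantity": None} for k in range(11, 21)]

    # UPDATE orders_1 SET quantity = 100 WHERE orderkey < 5;
    for row in orders_1:
        if row["orderkey"] < 5:
            row["quantity"] = 100

    # UPDATE orders_2 SET quantity = 100 WHERE orderkey > 15;
    for row in orders_2:
        if row["orderkey"] > 15:
            row["quantity"] = 100

    # select * from orders where quantity is not null;
    return [r for r in orders_1 + orders_2 if r["quantity"] is not None]


matched = rows_with_quantity()
print(len(matched))  # 4 rows from orders_1 plus 5 rows from orders_2
print([r["orderkey"] for r in matched])
```

If the Paimon query returns fewer rows than the sketch, the updates may not have been synchronized yet. In that case, wait for the next checkpoint and run the query again.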
Related documents
-
The Streaming data lakehouse Paimon connector works with Paimon catalogs. For more information about how to use the connector and the features that it supports, see Streaming data lakehouse Paimon.
-
For more information about how to use Paimon catalogs, see Manage Paimon catalogs.
-
For best practices on configuring data ingestion jobs in complex scenarios, see Best practices for Flink CDC-based data ingestion.
-
Build a streaming data lakehouse by using Paimon and StarRocks