All Products
Search
Document Center

Realtime Compute for Apache Flink:Stream database data to Apache Paimon

Last Updated:Jun 21, 2026

Apache Paimon is a unified lake storage format for both streaming and batch processing that supports high-throughput writes and low-latency queries. This tutorial describes how to use a Paimon catalog and the MySQL connector to import order data and table schema changes from an ApsaraDB RDS for MySQL instance into a Paimon table. You can also learn how to use Flink to perform simple analysis on the Paimon table.

Background

Apache Paimon is a unified lake storage format for both streaming and batch processing that supports high-throughput writes and low-latency queries. Currently, Realtime Compute for Apache Flink and common compute engines on E-MapReduce, such as Spark, Hive, or Trino, integrate well with Paimon. You can use Apache Paimon to quickly build your own data lake storage service on HDFS or OSS and connect to a compute engine to analyze the data in your data lake.

Prerequisites

  • If you use a RAM user or RAM role to perform operations, ensure the RAM user or RAM role has the required permissions to access the Realtime Compute for Apache Flink console. For more information, see Permission management.

  • You have created a Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.

Step 1: Prepare a data source

  1. Create an ApsaraDB RDS for MySQL instance and configure a database.

    Note

    The ApsaraDB RDS for MySQL instance must be in the same VPC as the Flink workspace. If the instance and the workspace are in different VPCs, see Network connectivity.

    Create a database named orders and an account with permissions to read from and write to the orders database.

  2. Connect to the ApsaraDB RDS for MySQL instance, and create the orders_1 and orders_2 tables in the orders database.

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. Insert the following test data.

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

Step 2: Create catalogs

  1. Go to the Data Management page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the workspace that you want to manage, click Console.

    3. Click Data Management.

  2. Create a Paimon catalog.

    1. Click Create Catalog. On the Built-in Catalog tab, select Apache Paimon and click Next.

    2. Configure the parameters.

      Parameter

      Description

      Example

      catalog name

      The name of the Paimon catalog.

      paimon-catalog

      metastore

      The storage type of metadata about Paimon tables. Valid values:

      • filesystem: Metadata is stored only in OSS.

      • dlf: Metadata is stored in OSS and synchronized to Data Lake Formation (DLF).

      This example uses filesystem.

      warehouse

      The root directory for storing the Paimon catalog. The directory must be an OSS directory. You can select the OSS bucket used to activate Realtime Compute for Apache Flink, or another OSS bucket that belongs to the same Alibaba Cloud account and is in the same region as the workspace.

      The value must be in the oss://<bucket>/<object> format. In this format,

      • bucket: the name of the OSS bucket that you created.

      • object: the path to the directory where your data is stored.

      You can view the names of your bucket and object in the OSS console.

      fs.oss.endpoint

      The endpoint of the OSS service.

      If Flink and DLF are deployed in the same region, use a VPC endpoint. Otherwise, use a public endpoint. For more information, see Regions and endpoints.

      fs.oss.accessKeyId

      The AccessKey ID of the Alibaba Cloud account or RAM user with permissions to read from and write to OSS.

      If you do not have an AccessKey ID, see Create an AccessKey pair.

      fs.oss.accessKeySecret

      The AccessKey secret of the Alibaba Cloud account or RAM user.

      This example uses a namespace variable for the AccessKey Secret to prevent plaintext leaks. For more information, see Namespace variables.

    3. Click OK.

  3. Create a MySQL catalog.

    1. Click Create Catalog. On the Built-in Catalog tab, select MySQL and click Next.

    2. Configure the parameters.

      Parameter

      Description

      Example

      catalog name

      The name of the MySQL catalog.

      mysql-catalog

      hostname

      The IP address or hostname of the MySQL database.

      In this example, enter the internal endpoint of the ApsaraDB RDS for MySQL instance.

      port

      The port number of the MySQL database service.

      The default value is 3306.

      default-database

      The name of the default MySQL database.

      In this example, enter the orders database that you created in Step 1: Prepare a data source.

      username

      The username for the MySQL database.

      Enter the username of your database.

      password

      The password for the MySQL database.

      In this example, a namespace variable is used as the password to prevent plaintext leaks. For more information, see Namespace variables.

    3. Click OK.

Step 3: Create a Flink job

  1. Go to the development console of Realtime Compute for Apache Flink and create a data ingestion job.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. Click Console in the Actions column of the target workspace.

    3. In the left-side navigation pane, choose Data Development > >Data Ingestion.

    4. Click the image icon, click New Data Ingestion Draft, specify File Name, and then select an Engine Version.

      Parameter

      Description

      Example

      File name

      The name of the job.

      Note

      The job name must be unique within the project.

      flink-test

      Engine version

      The Flink engine version for the job.

      We recommend that you use a version with the Recommended or Stable tag. Versions with these tags provide higher reliability and better performance. For more information about engine versions, see Release notes and Engine versions.

      vvr-11.5-jdk11-flink-1.20

    5. Click Create.

  2. Enter the following statements to capture changes to the specified tables in the orders database in real time and synchronize the changes to Paimon tables.

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: 3306
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: orders.orders_\d+
      server-id: 8601-8604
      #(Optional) Synchronize data from newly added tables during the incremental phase.
      scan.binlog.newly-added-table.enabled: true
      #(Optional) Synchronize table and field comments.
      include-comments.enabled: true
      #(Optional) Prioritize the distribution of unbounded shards to prevent potential Out Of Memory (OOM) issues on TaskManagers.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      #(Optional) Enable parsing and filtering to accelerate data reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
      #(Optional) Enable deletion vectors to improve data read performance.
      table.properties.deletion-vectors.enabled: true
    # If you use a DLF catalog, use the following configurations.
    # sink:
    #  type: paimon
    #  catalog.properties.metastore: rest
    #  catalog.properties.token.provider: dlf
    #  catalog.properties.uri: dlf_uri
    #  catalog.properties.warehouse: your_warehouse
    #  table.properties.deletion-vectors.enabled: true   
    # Capture changes from the MySQL tables whose names match the regular expression orders_\\d+ and synchronize the changes to the orders table in the default Paimon database.  
    route:
      - source-table: orders.orders_\d+
        sink-table: default.orders

    For more information about how to configure data ingestion jobs, see Develop Flink CDC-based data ingestion jobs.

  3. In the upper-right corner of the page, click Deploy. In the message that appears, click OK.

  4. In the left-side navigation pane, choose O&M Center > >Job O&M. Then, click the name of your job to open its Deployments tab.

  5. In the Running Parameters section, click Edit.

    To see job results sooner, set System Checkpoint Interval and Minimum Interval Between System Checkpoints to 10s and click Save.

  6. On the Deployments tab of the job, click Start. In the Start Job dialog box, select Stateless Start and click Start.

  7. Query Paimon data.

    1. On the Query Script tab of the Data Development > >Data Query page, copy the following code to the query script.

      select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
    2. Select the code that you want to run and click Run that appears on the left side of the code.

      After the select custkey, sum(total_price) from paimon-catalog.default.orders group by custkey; SQL statement runs, the query status changes to Completed. The result table shows the sum of total_price values from the orders table, grouped by custkey. The query returns a total of 7 rows of data for custkey values from 0 to 6.

Step 4: Update MySQL schema

This section describes how to synchronize schema changes from a MySQL table to a Paimon table.

  1. Log on to the ApsaraDB RDS console.

  2. In the orders database, enter and run the following SQL statements to add a column to both data tables and update the data. Then, click Execute.

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. On the Query Script tab of the Data Development > >Data Query page in the Realtime Compute for Apache Flink console, copy the following code to the query script.

    select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;

    Select the code that you want to run and click Run that appears on the left side of the code.

    After the query succeeds, nine records are returned. The data is from the orders_1 and orders_2 tables, and the value in the quantity column of each record is 100.

Related documents