Realtime Compute for Apache Flink: Getting started: Real-time data ingestion into Paimon databases

Last Updated: Jan 26, 2026

Apache Paimon is a unified lake storage format for streaming and batch processing. It supports high-throughput writes and low-latency queries. This topic describes how to use a Paimon Catalog and the MySQL connector to import order data and table schema changes from an ApsaraDB RDS for MySQL instance into a Paimon table. You will also use Flink to perform simple analysis on the Paimon table.

Background information

Realtime Compute for Apache Flink and common compute engines on the open source big data platform E-MapReduce (EMR), such as Spark, Hive, and Trino, integrate well with Paimon. You can use Apache Paimon to quickly build a data lake storage service on Hadoop Distributed File System (HDFS) or Object Storage Service (OSS), and then connect a compute engine to analyze the data in your data lake.

Prerequisites

  • If you use a Resource Access Management (RAM) user or RAM role, make sure that the user or role has the required permissions to access the Realtime Compute for Apache Flink console. For more information, see Permission management.

  • You have created a Realtime Compute for Apache Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.

Step 1: Prepare the data source

  1. Create an ApsaraDB RDS for MySQL instance and configure a database.

    Note

    The ApsaraDB RDS for MySQL instance must be in the same virtual private cloud (VPC) as the Flink workspace. If the instance and workspace are not in the same VPC, see Network connectivity.

    Create a database named orders and a privileged account or a standard account that has read and write permissions for the orders database.

  2. Connect to the ApsaraDB RDS for MySQL instance and create the tables orders_1 and orders_2 in the orders database.

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  3. Insert the following test data.

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');
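Before moving on, it can help to confirm that the test data landed as expected. The following query is a minimal sketch, assuming it is run in the orders database on the ApsaraDB RDS for MySQL instance; the column aliases `table_name` and `row_count` are chosen here for illustration only.

```sql
-- Count the rows inserted into each source table.
SELECT 'orders_1' AS table_name, COUNT(*) AS row_count FROM orders_1
UNION ALL
SELECT 'orders_2', COUNT(*) FROM orders_2;
```

With only the INSERT statements above applied, each table should report 10 rows.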

Step 2: Create Catalogs

  1. Go to the Data Management page.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. Click Data Management.

  2. Create a Paimon catalog.

    1. Click Create Catalog. On the Built-in Catalog tab, select Apache Paimon and click Next.

    2. Enter the configuration information.

      Each entry below gives the parameter, its description, and a note on the value to use.

      • catalog name
        Description: A custom name for the Paimon catalog.
        Note: This topic uses paimon-catalog.

      • metastore
        Description: The type of metastore for Paimon tables.
          • filesystem: Stores metadata in OSS only.
          • dlf: Stores metadata in OSS and synchronizes it to Data Lake Formation (DLF).
        Note: This topic uses filesystem.

      • warehouse
        Description: The root directory of the Paimon catalog. This must be an OSS directory. You can use the OSS bucket that was created when you activated Realtime Compute for Apache Flink or another OSS bucket in the same region under your Alibaba Cloud account.
        Note: The format is oss://<bucket>/<object>, where:
          • bucket: The name of your OSS bucket.
          • object: The path to the directory where your data is stored.
        You can view the bucket and object names in the OSS console.

      • fs.oss.endpoint
        Description: The connection endpoint for the OSS service.
        Note: If Flink and DLF are in the same region, use the VPC endpoint. Otherwise, use the public endpoint. For more information about how to obtain an endpoint, see Regions and endpoints.

      • fs.oss.accessKeyId
        Description: The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions for OSS.
        Note: If you do not have an AccessKey ID, see Create an AccessKey pair.

      • fs.oss.accessKeySecret
        Description: The AccessKey secret for the Alibaba Cloud account or RAM user.
        Note: This topic uses a variable for the AccessKey secret to prevent plaintext exposure. For more information, see Project variables.

    3. Click OK.

  3. Create a MySQL catalog.

    1. Click Create Catalog. On the Built-in Catalog tab, select MySQL and click Next.

    2. Enter the configuration information.

      Each entry below gives the parameter, its description, and a note on the value to use.

      • catalogname
        Description: The name of the MySQL catalog.
        Note: This topic uses mysql-catalog.

      • hostname
        Description: The IP address or hostname of the MySQL database.
        Note: This topic uses the private endpoint of the RDS instance.

      • port
        Description: The port of the MySQL database service.
        Note: The default value is 3306.

      • default-database
        Description: The name of the default database.
        Note: Enter the orders database created in Step 1: Prepare the data source.

      • username
        Description: The username used to connect to the MySQL database.
        Note: Enter your database username.

      • password
        Description: The password for the MySQL database service.
        Note: This topic uses a variable for the password to prevent plaintext exposure. For more information, see Project variables.

    3. Click OK.
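The two catalogs created in the console above can also be pictured as Flink SQL DDL, which makes the mapping between form fields and catalog options easier to see. The following is a sketch only: it assumes that the catalog options use the same keys as the console parameters and that the `paimon` and `mysql` catalog types are available in your engine version. Values in angle brackets are placeholders, and the variable names `ak_id`, `ak_secret`, and `mysql_pw` are hypothetical.

```sql
-- Sketch: Paimon catalog with a filesystem metastore on OSS.
-- Angle-bracket values are placeholders, not values from this topic.
CREATE CATALOG `paimon-catalog` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = 'oss://<bucket>/<object>',
  'fs.oss.endpoint' = '<oss-endpoint>',
  -- Hypothetical project variables, used to avoid plaintext credentials.
  'fs.oss.accessKeyId' = '${secret_values.ak_id}',
  'fs.oss.accessKeySecret' = '${secret_values.ak_secret}'
);

-- Sketch: MySQL catalog that points at the orders database from Step 1.
CREATE CATALOG `mysql-catalog` WITH (
  'type' = 'mysql',
  'hostname' = '<rds-private-endpoint>',
  'port' = '3306',
  'username' = '<username>',
  -- Hypothetical project variable for the database password.
  'password' = '${secret_values.mysql_pw}',
  'default-database' = 'orders'
);
```

Check the option names against the catalog reference for your engine version before relying on this form. The console wizard remains the procedure this topic follows.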

Step 3: Create a Flink job

  1. Log on to the Realtime Compute development console and create a data ingestion job.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. In the navigation pane on the left, click Data Studio > Data Ingestion.

    4. Click the icon for creating a draft, and then click Create Data Ingestion Draft. Enter a File Name and select an Engine Version.

      Each entry below gives the job parameter, its description, and an example value.

      • File Name
        Description: The name of the job.
        Note: The job name must be unique in the current project.
        Example: flink-test

      • Engine Version
        Description: The Flink engine version for the current job. We recommend that you use versions with the Recommended or Stable tag. These versions offer higher reliability and better performance. For more information about engine versions, see Feature Release Notes and Introduction to Engine Versions.
        Example: vvr-11.5-jdk11-flink-1.20

    5. Click Create.

  2. Enter the following YAML configuration to capture real-time changes from the matching tables in the orders database and synchronize them to a Paimon table.

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: 3306
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: orders.orders_\d+
      server-id: 8601-8604
      # Optional. Synchronize data from newly created tables during the incremental phase.
      scan.binlog.newly-added-table.enabled: true
      # Optional. Synchronize table and field comments.
      include-comments.enabled: true
      # Optional. Distribute unbounded chunks first to prevent potential TaskManager OutOfMemory issues.
      scan.incremental.snapshot.unbounded-chunk-first.enabled: true
      # Optional. Enable parsing filters to accelerate reads.
      scan.only.deserialize.captured.tables.changelog.enabled: true
    
    sink:
      type: paimon
      name: Paimon Sink
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: oss://default/test
      catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
      catalog.properties.fs.oss.accessKeyId: xxxxxxxx
      catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
      # Optional. Enable deletion vectors to improve read performance.
      table.properties.deletion-vectors.enabled: true
      
    # If you use a DLF Catalog, use the following configuration
    # sink:
    #  type: paimon
    #  catalog.properties.metastore: rest
    #  catalog.properties.token.provider: dlf
    #  catalog.properties.uri: dlf_uri
    #  catalog.properties.warehouse: your_warehouse
    #  table.properties.deletion-vectors.enabled: true   
    
    # Captures MySQL tables whose names match the regular expression orders_\d+ and synchronizes changes to the orders table in the Paimon default database.
    route:
      - source-table: orders.orders_\d+
        sink-table: default.orders

    For more information about how to configure data ingestion jobs, see Flink CDC data ingestion job development reference.

  3. In the upper-right corner, click Deploy. Then, click OK.

  4. In the navigation pane on the left, click Operation Center > Deployments. On the Deployments page, click the name of the target job to go to the deployment details page of the job.

  5. In the upper-right corner of the Parameters section, click Edit.

    To see results sooner, set both Checkpointing Interval and Min Interval Between Checkpoints to 10 s, and then click Save. Paimon commits written data when a checkpoint completes, so a shorter interval makes new data queryable sooner.

  6. At the top of the deployment details page of the target job, click Start, select Stateless Start, and then click Start.


  7. Query the Paimon data.

    1. On the Data Studio > Data Query page, on the Query Script tab, copy the following code into the query script editor.

      select custkey, sum(total_price) from `paimon-catalog`.`default`.`orders` group by custkey;
    2. Select the statement that you want to run, and then click Run to the left of the code line.

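To cross-check the Paimon query result against the source, the same aggregate can be recomputed in MySQL. The queries below are a sketch, assuming they are run on the ApsaraDB RDS for MySQL instance in the orders database. The first query approximates the `orders_\d+` routing pattern with a MySQL REGEXP, using `[0-9]+` in place of `\d+` for portability; the aliases `merged` and `total_price_sum` are illustrative.

```sql
-- 1. List the source tables whose names fit the routing pattern.
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'orders'
  AND table_name REGEXP '^orders_[0-9]+$';

-- 2. Recompute the per-customer total directly from both source tables.
SELECT custkey, SUM(total_price) AS total_price_sum
FROM (
    SELECT custkey, total_price FROM orders_1
    UNION ALL
    SELECT custkey, total_price FROM orders_2
) AS merged
GROUP BY custkey
ORDER BY custkey;
```

With only the test data from Step 1, the second query returns seven rows, one for each custkey from 0 to 6. The Paimon query should return the same groups. Because total_price is a DOUBLE, the sums can differ in the last decimal places.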

Step 4: Update the MySQL table schema

This section demonstrates how to synchronize MySQL table schema changes to the Paimon table.

  1. Log on to the ApsaraDB RDS console.

  2. In the orders database, enter the following SQL statements and click Execute to add a column to both tables and populate the column with data.

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. On the Scripts tab of the Development > Scripts page in the Realtime Compute for Apache Flink console, copy the following code into the script editor.

    select * from `paimon-catalog`.`default`.`orders` where `quantity` is not null;

    Select the statement that you want to run, and then click Run to the left of the code line.
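To know what the Paimon query should return, you can list the updated rows at the source. This is a minimal sketch, assuming it is run in the orders database on the ApsaraDB RDS for MySQL instance after the ALTER TABLE and UPDATE statements above.

```sql
-- Rows whose new quantity column was populated by the UPDATE statements.
SELECT orderkey, quantity FROM orders_1 WHERE quantity IS NOT NULL
UNION ALL
SELECT orderkey, quantity FROM orders_2 WHERE quantity IS NOT NULL
ORDER BY orderkey;
```

Given the test data from Step 1, this returns nine rows: orderkey 1 to 4 from orders_1 and orderkey 16 to 20 from orders_2, each with a quantity of 100. The Paimon orders table should show the same nine rows once the schema change and updates have been synchronized.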
