
Realtime Compute for Apache Flink:Build a streaming lakehouse with Paimon and StarRocks

Last Updated:Mar 26, 2026

This tutorial walks you through building a three-layer streaming data lakehouse (ODS → DWD → DWS) for an e-commerce platform using Realtime Compute for Apache Flink, Apache Paimon, and StarRocks. By the end, your pipeline delivers source database changes to analytics-ready aggregate tables with minute-level latency, and StarRocks can query Paimon tables for ad-hoc analysis.

How it works

Traditional batch data warehouses schedule jobs hourly or daily, leaving consumers with stale results. Updating a partition requires re-reading all existing data, merging it with new changes, and overwriting—a slow and costly process.

Realtime Compute for Apache Flink and Apache Paimon solve this together:

  • Flink streams data changes between warehouse layers continuously, without waiting for a batch window.

  • Paimon delivers changelogs to the next layer with minute-level latency, using its efficient update mechanism instead of partition overwrites.

  • StarRocks reads from Paimon external tables to serve ad-hoc analytical queries.

The architecture processes data in four steps:

  1. Flink reads from the MySQL source and writes to Paimon, forming the operational data store (ODS) layer.

  2. Flink subscribes to ODS changelogs, processes them, and writes results to Paimon, forming the data warehouse detail (DWD) layer.

  3. Flink subscribes to DWD changelogs, aggregates them, and writes results to Paimon, forming the data warehouse service (DWS) layer.

  4. A StarRocks instance on E-MapReduce (EMR) reads from Paimon external tables and serves queries.

Architecture diagram

Key Paimon capabilities

This solution relies on three core Apache Paimon capabilities:

| Capability | What it does | Why it matters |
| --- | --- | --- |
| Primary key table updates | Uses a log-structured merge-tree (LSM tree) to apply updates efficiently. For more information, see Primary Key Table and File Layouts. | Avoids rewriting entire partitions on each update. |
| Changelog generation (`changelog-producer`) | Produces complete incremental records for any input stream: each UPDATE_AFTER record is paired with an UPDATE_BEFORE record. For more information, see Incremental data generation mechanism. | Downstream consumers see the full change, enabling streaming consumption of each layer. |
| Data merging (`merge-engine`) | When a primary key table receives multiple records with the same primary key, Paimon merges them into one record. Supported strategies: deduplicate, partial-update, and aggregation. For more information, see Merge engine. | Enables wide-table construction from multiple streams and incremental metric accumulation. |
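The changelog-generation behavior can be illustrated with a toy sketch in plain Python (a model of the semantics, not Paimon's implementation): an update to a primary key table emits the old row as UPDATE_BEFORE and the new row as UPDATE_AFTER, which lets a downstream consumer maintain an aggregate incrementally by retracting the old value and adding the new one.

```python
# Toy model of Paimon's changelog-producer: an update to a primary key
# table emits the old row (UPDATE_BEFORE) and the new row (UPDATE_AFTER),
# so a downstream consumer can maintain aggregates incrementally.

def apply_upsert(table, changelog, key, row):
    """Upsert a row into the keyed table and append the change records."""
    if key in table:
        changelog.append(("UPDATE_BEFORE", table[key]))
        changelog.append(("UPDATE_AFTER", row))
    else:
        changelog.append(("INSERT", row))
    table[key] = row

def consume(changelog):
    """Downstream SUM(fee): retract old values, add new ones."""
    total = 0
    for kind, row in changelog:
        if kind == "UPDATE_BEFORE":
            total -= row["fee"]
        else:  # INSERT or UPDATE_AFTER
            total += row["fee"]
    return total

table, changelog = {}, []
apply_upsert(table, changelog, 100001, {"fee": 5000})
apply_upsert(table, changelog, 100002, {"fee": 4000})
apply_upsert(table, changelog, 100001, {"fee": 5500})  # order amount corrected

print(consume(changelog))  # 9500 == 5500 + 4000, not 5000 + 4000 + 5500
```

Because the consumer sees both halves of every update, it never double-counts a corrected record, which is exactly what makes each lakehouse layer streaming-consumable.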

Layer design at a glance

| Layer | Table(s) | merge-engine | changelog-producer | Purpose |
| --- | --- | --- | --- | --- |
| ODS | orders, orders_pay, product_catalog | — | — | Raw CDC ingestion from MySQL |
| DWD | dwd_orders | partial-update | lookup | Wide table joining orders, payments, and catalog |
| DWM | dwm_users_shops | aggregation | lookup | Intermediate fan-out table; generates changelogs for both DWS tables |
| DWS | dws_users, dws_shops | aggregation | — | User- and shop-level metric aggregates |

Prerequisites

Before you begin, make sure you have:

  • Activated Data Lake Formation (DLF) and created a catalog. For more information, see Get started with DLF.

  • A Realtime Compute for Apache Flink workspace. For more information, see Create a workspace.

  • A Serverless StarRocks instance.

Your Flink workspace, StarRocks instance, and DLF catalog must be in the same region.

Version requirements

Your Flink jobs must run Ververica Runtime (VVR) 11.1.0 or later.

Build the streaming data lakehouse

Step 1: Prepare the MySQL CDC source

This tutorial uses three business tables in an ApsaraDB RDS for MySQL database named order_dw.

  1. Create an ApsaraDB RDS for MySQL instance.

    If your ApsaraDB RDS for MySQL instance and Flink workspace are in different virtual private clouds (VPCs), see How do I access other services across VPCs?
  2. Create a database and an account. Create a database named order_dw and a privileged account (or a standard account with read/write access to order_dw).

  3. Create the three source tables and insert the sample data:

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee bigint not null,
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null
    );
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null,
      create_time timestamp not null
    );
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- Sample data
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

Step 2: Register catalogs

Create a Paimon catalog in Flink

  1. Log on to the Realtime Compute for Apache Flink console. In the Actions column for your workspace, click Console to open the Development Console.

  2. In the left navigation pane, click Catalogs. On the Catalogs page, click Create Catalog.

  3. On the Built-in Catalog tab, select Apache Paimon and click Next.

  4. Set the parameters and click Confirm:

    | Parameter | Description | Required | Value |
    | --- | --- | --- | --- |
    | metastore | Metadata storage type | Yes | Select dlf. |
    | catalog name | The DLF catalog name | Yes | This tutorial uses paimoncatalog. Using the latest DLF lets you select an existing DLF catalog without configuring AccessKey pairs. For more information, see Manage catalogs. If you use a RAM identity, make sure it has read/write access to DLF. See Data authorization management. |
  5. Create the order_dw database inside the Paimon catalog to receive data synced from MySQL:

    1. In the left navigation pane, choose Development > Scripts.

    2. Click + > New Script.

    3. Paste the following SQL and run it:

      USE CATALOG paimoncatalog;
      CREATE DATABASE order_dw;

    4. The message "The following statement has been executed successfully!" confirms the database is created.

For more information, see Manage Paimon catalogs.

Create a MySQL catalog

  1. On the Catalogs page, click Create Catalog.

  2. On the Built-in Catalog tab, select MySQL and click Next.

  3. Configure the following parameters and click Confirm to create a catalog named mysqlcatalog:

    | Parameter | Description | Required | Value |
    | --- | --- | --- | --- |
    | catalog name | The catalog name | Yes | Enter mysqlcatalog. |
    | hostname | The MySQL instance endpoint | Yes | Use the internal endpoint (same region as Flink). See View and manage instance endpoints and ports. |
    | port | The MySQL port number | No | Default: 3306. See View and manage instance endpoints and ports. |
    | default-database | The default MySQL database name | Yes | Enter order_dw. |
    | username | The MySQL account username | Yes | The account created in Step 1. |
    | password | The MySQL account password | Yes | The password for the account created in Step 1. |

Step 3: Build the ODS layer

The ODS layer ingests all tables from the MySQL order_dw database into Paimon in real time using Flink CDC.

  1. Create the data ingestion job:

    1. In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft.

    2. In the New Draft dialog, enter ods in the Name field, select a VVR version, and click Create.

    3. Paste the following pipeline configuration:

      source:
        type: mysql
        name: MySQL Source
        hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
        port: 3306
        username: ${secret_values.username}
        password: ${secret_values.password}
        tables: order_dw.\.*  # Read all tables in order_dw
        scan.binlog.newly-added-table.enabled: true       # (Optional) Replicate tables added during incremental phase
        include-comments.enabled: true                    # (Optional) Replicate table and field comments
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true   # (Optional) Scan unbounded chunks first to prevent Task Manager OOMs
        scan.only.deserialize.captured.tables.changelog.enabled: true   # (Optional) Speed up reads by deserializing only captured tables
      
      sink:
        type: paimon
        name: Paimon Sink
        catalog.properties.metastore: rest
        catalog.properties.uri: http://ap-southeast-5-vpc.dlf.aliyuncs.com
        catalog.properties.warehouse: paimoncatalog
        catalog.properties.token.provider: dlf
      
      pipeline:
        name: MySQL to Paimon Pipeline

      The Paimon sink parameters:

      | Parameter | Description | Required | Example |
      | --- | --- | --- | --- |
      | catalog.properties.metastore | Metastore type | Yes | rest |
      | catalog.properties.token.provider | Token provider | Yes | dlf |
      | catalog.properties.uri | DLF server URI. Format: http://[region-id]-vpc.dlf.aliyuncs.com. Replace [region-id] with your region ID. See Endpoints. | Yes | http://ap-southeast-5-vpc.dlf.aliyuncs.com |
      | catalog.properties.warehouse | DLF catalog name | Yes | paimoncatalog |

      To tune write performance, see Performance tuning for Paimon tables.

    4. Click Deploy in the upper-right corner.

  2. Start the job: In the left navigation pane, choose O&M > Deployments. Find the ods deployment and click Start in the Actions column. In the Start Job panel, select Initial Mode and click Start.

  3. Verify the ODS data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following query. You should see 7 rows corresponding to the sample orders you inserted.

    SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;

    ODS verification result

Step 4: Build the DWD layer

The DWD layer joins the orders, orders_pay, and product_catalog tables into a single wide table named dwd_orders. Two key design decisions drive the table configuration:

  • `merge-engine = partial-update` — allows different source streams (orders side and payments side) to write partial columns into the same row. Without this, writing to dwd_orders from two separate INSERT streams would fail.

  • `changelog-producer = lookup` — generates complete changelogs with low latency so the DWS layer can consume them as a streaming source. Without this, the DWS layer would have no changelog to subscribe to.
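The partial-update semantics can be sketched in plain Python (an illustration only, not Paimon's implementation): for records sharing a primary key, each non-NULL field overwrites the stored value, so the orders stream and the payments stream can each fill their own columns of the same wide row.

```python
# Toy model of Paimon's partial-update merge engine: for records sharing
# a primary key, each non-NULL (here: non-None) field overwrites the
# stored value, so separate streams can fill disjoint columns.

def partial_update(stored, incoming):
    """Merge an incoming partial record into the stored row."""
    return {col: (val if val is not None else stored.get(col))
            for col, val in incoming.items()}

wide_row = {}

# The orders stream fills the order-side columns.
wide_row = partial_update(wide_row, {
    "order_id": 100001, "order_user_id": "user_001",
    "order_fee": 5000, "pay_id": None, "pay_platform": None,
})

# The payments stream later fills the payment-side columns of the same key.
wide_row = partial_update(wide_row, {
    "order_id": 100001, "order_user_id": None,
    "order_fee": None, "pay_id": 2001, "pay_platform": 1,
})

print(wide_row)
# {'order_id': 100001, 'order_user_id': 'user_001', 'order_fee': 5000,
#  'pay_id': 2001, 'pay_platform': 1}
```

This is why the SQL in this step writes NULL for the columns each branch does not own: NULL values never clobber data written by the other branch.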

  1. Create the dwd_orders wide table: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following SQL:

    CREATE TABLE paimoncatalog.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'partial-update',
        'changelog-producer' = 'lookup'
    );

    The message "Query has been executed" confirms the table is created.

  2. Create the DWD ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it dwd, and paste the following SQL. Deploy the draft and start the deployment with Initial Mode. The job joins orders with product_catalog (as a dimension table lookup) and merges the result with orders_pay data. Paimon's partial-update engine merges records sharing the same order_id—the orders side fills order columns and the payments side fills payment columns.

    Important

    Apache Paimon does not allow multiple INSERT statements writing to the same table within a single job. Use UNION ALL to combine the two source streams into one write operation, as shown below.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO paimoncatalog.order_dw.dwd_orders
    SELECT
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,  -- pay_id (filled by orders_pay stream)
        NULL,  -- pay_platform (filled by orders_pay stream)
        NULL   -- pay_create_time (filled by orders_pay stream)
    FROM
        paimoncatalog.order_dw.orders o
        LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
    UNION ALL
    SELECT
        order_id,
        NULL,  -- order_user_id (filled by orders stream)
        NULL,  -- order_shop_id (filled by orders stream)
        NULL,  -- order_product_id (filled by orders stream)
        NULL,  -- order_product_catalog_name (filled by orders stream)
        NULL,  -- order_fee (filled by orders stream)
        NULL,  -- order_create_time (filled by orders stream)
        NULL,  -- order_update_time (filled by orders stream)
        NULL,  -- order_state (filled by orders stream)
        pay_id,
        pay_platform,
        create_time
    FROM
        paimoncatalog.order_dw.orders_pay;
  3. Verify the DWD data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following query. You should see 7 rows with order_product_catalog_name populated from the dimension join and pay_platform populated from the orders_pay stream.

    SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;

    DWD verification result

Step 5: Build the DWS layer

The DWS layer aggregates the wide table into user-level and shop-level metric tables. The architecture uses an intermediate data warehouse middle (DWM) table:

  • dwm_users_shops — intermediate table keyed by (user_id, shop_id, ds), generating changelogs consumed by both DWS tables

  • dws_users — user-dimension aggregate (total spend per day)

  • dws_shops — shop-dimension aggregate (total revenue, unique visitors (UV), and page views (PV) per day)

The key design choice: `merge-engine = aggregation` on DWS tables accumulates incremental values automatically. Without this, each incoming record would overwrite the previous value rather than add to it.
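The difference between the two behaviors can be shown with a short Python sketch (a model of the semantics, not Paimon code): the default deduplicate engine keeps only the latest record per key, while aggregation with a sum function accumulates each incoming value.

```python
# Toy model contrasting Paimon merge engines for records sharing a key:
# 'deduplicate' (the default) keeps only the latest record, while
# 'aggregation' with a sum function accumulates every incoming value.

def deduplicate(records):
    result = {}
    for key, value in records:
        result[key] = value  # last write wins
    return result

def aggregate_sum(records):
    result = {}
    for key, value in records:
        result[key] = result.get(key, 0) + value  # values accumulate
    return result

# Three paid amounts for user_001 arriving over the course of a day.
records = [("user_001", 5000), ("user_001", 2000), ("user_001", 1000)]

print(deduplicate(records))    # {'user_001': 1000}  -- overwritten
print(aggregate_sum(records))  # {'user_001': 8000}  -- accumulated
```

With the default engine, the daily total would silently collapse to the last payment; the aggregation engine is what turns the DWS tables into running metric accumulators.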

  1. Create the aggregate tables: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following SQL:

    -- User-dimension aggregate metric table
    CREATE TABLE paimoncatalog.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total payment amount completed on the current day',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum'
        -- No changelog-producer needed: dws_users is not consumed downstream in streaming mode
    );
    
    -- Shop-dimension aggregate metric table
    CREATE TABLE paimoncatalog.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total payment amount completed on the current day',
        uv BIGINT COMMENT 'Total number of purchasing users on the current day',
        pv BIGINT COMMENT 'Total number of purchases on the current day',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum',
        'fields.uv.aggregate-function' = 'sum',
        'fields.pv.aggregate-function' = 'sum'
        -- No changelog-producer needed: dws_shops is not consumed downstream in streaming mode
    );
    
    -- Intermediate DWM table: keyed by (user_id, shop_id, ds) to fan out to both dws_users and dws_shops
    CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total amount paid by the user in the shop on the current day',
        pv BIGINT COMMENT 'Number of purchases by the user in the shop on the current day',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
    ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum',
        'fields.pv.aggregate-function' = 'sum',
        'changelog-producer' = 'lookup',  -- generates changelogs for dws_users and dws_shops
        'file.format' = 'avro',           -- row-oriented format for faster writes (DWM is write-heavy)
        'metadata.stats-mode' = 'none'    -- discard stats to improve write throughput (increases OLAP query cost, but DWM is not queried directly)
    );

    The message "Query has been executed" confirms all three tables are created.

  2. Create the DWM ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it dwm, and paste the following SQL. Deploy and start with Initial Mode. The job reads from dwd_orders, filters rows where payment is complete (pay_id IS NOT NULL), and writes aggregated spend and purchase count into dwm_users_shops.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    INSERT INTO paimoncatalog.order_dw.dwm_users_shops
    SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
        order_fee,
        1  -- each input record represents one purchase
    FROM paimoncatalog.order_dw.dwd_orders
    WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;
  3. Create the DWS ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it dws, and paste the following SQL. Deploy and start with Initial Mode. This job fans out dwm_users_shops changelogs to both DWS tables in a single deployment. Unlike the DWD layer (where multiple INSERTs to the same table required UNION ALL), writing to *different* Paimon tables from the same job is supported—use BEGIN STATEMENT SET.

    SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    
    BEGIN STATEMENT SET;
    
    INSERT INTO paimoncatalog.order_dw.dws_users
    SELECT
        user_id,
        ds,
        paid_buy_fee_sum
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    -- shop_id can have highly skewed data (popular shops receive far more records).
    -- local-merge-buffer-size pre-aggregates records in memory before writing to Paimon,
    -- reducing the write amplification from data skew.
    INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
    SELECT
        shop_id,
        ds,
        paid_buy_fee_sum,
        1,  -- each input record represents one user's spend in the shop
        pv
    FROM paimoncatalog.order_dw.dwm_users_shops;
    
    END;
  4. Verify the DWS data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following queries. dws_users should show 3 rows (one per user) and dws_shops should show 4 rows (one per shop that received orders on 2023-02-15).

    SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
    SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

    dws_users verification result

    dws_shops verification result
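As a cross-check, the expected DWS numbers can be derived directly from the Step 1 sample data with plain Python that mirrors the pipeline's aggregation logic (a verification aid, not part of the deployment):

```python
# Recompute the expected DWS results from the Step 1 sample data:
# group paid orders by (user, shop, day) as the DWM layer does,
# then roll up to the user and shop dimensions.

orders = {  # order_id -> (user_id, shop_id, buy_fee)
    100001: ("user_001", 12345, 5000), 100002: ("user_002", 12346, 4000),
    100003: ("user_003", 12347, 3000), 100004: ("user_001", 12347, 2000),
    100005: ("user_002", 12348, 1000), 100006: ("user_001", 12348, 1000),
    100007: ("user_003", 12347, 2000),
}
paid_order_ids = [100001, 100002, 100003, 100004, 100005, 100006, 100007]

# DWM: (user_id, shop_id, ds) -> [paid_buy_fee_sum, pv]
dwm = {}
for oid in paid_order_ids:
    user, shop, fee = orders[oid]
    key = (user, shop, "20230215")  # all sample payments fall on this day
    agg = dwm.setdefault(key, [0, 0])
    agg[0] += fee
    agg[1] += 1

# DWS roll-ups
dws_users, dws_shops = {}, {}
for (user, shop, ds), (fee, pv) in dwm.items():
    dws_users[user] = dws_users.get(user, 0) + fee
    shop_agg = dws_shops.setdefault(shop, [0, 0, 0])  # [fee, uv, pv]
    shop_agg[0] += fee
    shop_agg[1] += 1  # one DWM row per purchasing user in the shop
    shop_agg[2] += pv

print(dws_users)  # {'user_001': 8000, 'user_002': 5000, 'user_003': 5000}
print(dws_shops)  # 4 shop rows, e.g. 12347 -> [7000, 2, 3]
```

These totals are what the verification queries above should return: 3 user rows and 4 shop rows for ds 20230215.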

Step 6: Test change capture

With all three layers running, test that changes in the source database flow through the entire pipeline.

  1. Insert new orders into MySQL:

    INSERT INTO orders VALUES
    (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
    (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
    (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2008, 100008, 1, '2023-02-15 18:40:56'),
    (2009, 100009, 1, '2023-02-15 19:40:56'),
    (2010, 100010, 0, '2023-02-15 20:40:56');
  2. Wait a moment for the pipeline to propagate, then re-run the DWS verification queries. The aggregate totals for user_001, user_002, user_003, and shops 12345 and 12348 should increase to reflect the new orders.

    • dws_users table:

      SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

      dws_users verification result

    • dws_shops table:

      SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;

      dws_shops verification result
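The expected post-change totals follow directly from the sample data: three new paid orders are added on top of the Step 5 results. A small Python check (verification aid only, not pipeline code):

```python
# Expected aggregate changes after the Step 6 inserts: three new paid
# orders (100008-100010), all paid on 2023-02-15, added to the totals
# computed from the original seven orders.

base_users = {"user_001": 8000, "user_002": 5000, "user_003": 5000}
base_shops = {12345: 5000, 12346: 4000, 12347: 7000, 12348: 2000}

# (user_id, shop_id, buy_fee) for the new orders.
new_orders = [
    ("user_001", 12345, 3000),
    ("user_002", 12348, 1000),
    ("user_003", 12348, 2000),
]

for user, shop, fee in new_orders:
    base_users[user] += fee
    base_shops[shop] += fee

print(base_users)  # {'user_001': 11000, 'user_002': 6000, 'user_003': 7000}
print(base_shops)  # {12345: 8000, 12346: 4000, 12347: 7000, 12348: 5000}
```

If your DWS queries converge to these numbers within a minute or so, every layer of the pipeline is propagating changes end to end.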

Query the lakehouse with StarRocks

Use StarRocks to run analytical queries against the Paimon tables you built.

Log on to the StarRocks instance and create a Paimon external catalog:

CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
    'type' = 'paimon',
    'paimon.catalog.type' = 'filesystem',
    'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
    'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);
| Parameter | Required | Description |
| --- | --- | --- |
| type | Yes | Set to paimon. |
| paimon.catalog.type | Yes | Metadata storage type. Set to filesystem. |
| aliyun.oss.endpoint | Yes | The OSS or OSS-HDFS endpoint. Required when paimon.catalog.warehouse points to an OSS path. |
| paimon.catalog.warehouse | Yes | The OSS warehouse path. Format: oss://<bucket>/<object>. Find the bucket name and object path in the OSS console. |

Ranking queries

Query the top three shops by transaction volume on February 15, 2023:

SELECT ROW_NUMBER() OVER (ORDER BY paid_buy_fee_sum DESC) AS rn, shop_id, paid_buy_fee_sum
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
Ranking query result

Details queries

Query order details for a specific user on a specific payment platform in February 2023:

SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;
Details query result

Report queries

Query order count and total amount by product category in February 2023:

SELECT
  to_date(order_create_time) AS order_create_date,  -- truncate to day so the report groups by date, not by timestamp
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;
Report query result

What's next