This tutorial walks you through building a three-layer streaming data lakehouse (ODS → DWD → DWS) for an e-commerce platform using Realtime Compute for Apache Flink, Apache Paimon, and StarRocks. By the end, your pipeline delivers source database changes to analytics-ready aggregate tables with minute-level latency, and StarRocks can query Paimon tables for ad-hoc analysis.
How it works
Traditional batch data warehouses schedule jobs hourly or daily, leaving consumers with stale results. Updating a partition requires re-reading all existing data, merging it with new changes, and overwriting—a slow and costly process.
Realtime Compute for Apache Flink and Apache Paimon solve this together:
- Flink streams data changes between warehouse layers continuously, without waiting for a batch window.
- Paimon delivers changelogs to the next layer with minute-level latency, using its efficient update mechanism instead of partition overwrites.
- StarRocks reads from Paimon external tables to serve ad-hoc analytical queries.
The architecture processes data in four steps:
- Flink reads from the MySQL source and writes to Paimon, forming the operational data store (ODS) layer.
- Flink subscribes to ODS changelogs, processes them, and writes results to Paimon, forming the data warehouse detail (DWD) layer.
- Flink subscribes to DWD changelogs, aggregates them, and writes results to Paimon, forming the data warehouse service (DWS) layer.
- StarRocks of E-MapReduce (EMR) reads from Paimon external tables and serves queries.
Key Paimon capabilities
This solution relies on three core Apache Paimon capabilities:
| Capability | What it does | Why it matters |
|---|---|---|
| Primary key table updates | Uses the log-structured merge-tree (LSM tree) to apply updates efficiently. For more information, see Primary Key Table and File Layouts. | Avoids rewriting entire partitions on each update |
| Changelog generation (changelog-producer) | Produces complete incremental records for any input stream: each UPDATE_AFTER record is paired with an UPDATE_BEFORE record. For more information, see Incremental data generation mechanism. | Downstream consumers see the full change, enabling streaming consumption of each layer |
| Data merging (merge-engine) | When a primary key table receives multiple records with the same primary key, Paimon merges them into one record. Supported strategies: deduplication, partial-update, aggregation. For more information, see Merge engine. | Enables wide-table construction from multiple streams and incremental metric accumulation |
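To make the three merge strategies concrete, here is a minimal Python sketch of how two records with the same primary key could be combined. It models only the semantics described in the table; the function names are hypothetical, the aggregation case assumes every non-key column uses a sum function, and none of this is Paimon's actual implementation.

```python
from typing import Dict, Optional, Set

Row = Dict[str, Optional[int]]  # column name -> value; None stands for SQL NULL


def merge_deduplicate(old: Row, new: Row) -> Row:
    """Deduplication: the newest record replaces the older one entirely."""
    return dict(new)


def merge_partial_update(old: Row, new: Row) -> Row:
    """Partial update: non-null columns of the new record overwrite the old row."""
    merged = dict(old)
    for column, value in new.items():
        if value is not None or column not in merged:
            merged[column] = value
    return merged


def merge_aggregation(old: Row, new: Row, key_columns: Set[str]) -> Row:
    """Aggregation (sum assumed): key columns stay, other columns are added up."""
    return {
        column: value if column in key_columns else old[column] + value
        for column, value in new.items()
    }


# Two records for the same order: one from the orders stream, one from payments.
orders_side = {"order_id": 100001, "order_fee": 5000, "pay_id": None}
pay_side = {"order_id": 100001, "order_fee": None, "pay_id": 2001}

print(merge_deduplicate(orders_side, pay_side))     # payment record wins, fee is lost
print(merge_partial_update(orders_side, pay_side))  # both sides survive in one row
print(merge_aggregation({"shop_id": 12347, "pv": 1},
                        {"shop_id": 12347, "pv": 1}, {"shop_id"}))
```

The contrast between the first two calls is the reason the wide table in this tutorial uses partial-update, and the third call is the behavior the metric tables rely on.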
Layer design at a glance
| Layer | Table(s) | merge-engine | changelog-producer | Purpose |
|---|---|---|---|---|
| ODS | orders, orders_pay, product_catalog | Not set | Not set | Raw CDC ingestion from MySQL |
| DWD | dwd_orders | partial-update | lookup | Wide table joining orders, payments, and catalog |
| DWM | dwm_users_shops | aggregation | lookup | Intermediate fan-out table; generates changelogs for both DWS tables |
| DWS | dws_users, dws_shops | aggregation | Not set | User- and shop-level metric aggregates |
Prerequisites
Before you begin, make sure you have:
- Activated Data Lake Formation (DLF) and created a catalog. For more information, see Get started with DLF.
- A Realtime Compute for Apache Flink workspace. For more information, see Create a workspace.
- A Serverless StarRocks instance.
Your Flink workspace, StarRocks instance, and DLF catalog must be in the same region.
Version requirements
Your Flink jobs must run Ververica Runtime (VVR) 11.1.0 or later.
Build the streaming data lakehouse
Step 1: Prepare the MySQL CDC source
This tutorial uses three business tables in an ApsaraDB RDS for MySQL database named order_dw.
- Create an ApsaraDB RDS for MySQL instance.
  If your ApsaraDB RDS for MySQL instance and Flink workspace are in different virtual private clouds (VPCs), see How do I access other services across VPCs?
- Create a database and an account. Create a database named `order_dw` and a privileged account (or a standard account with read/write access to `order_dw`).
- Create the three source tables and insert the sample data:

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee bigint not null,
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null
      );

      CREATE TABLE `orders_pay` (
        pay_id bigint not null primary key,
        order_id bigint not null,
        pay_platform int not null,
        create_time timestamp not null
      );

      CREATE TABLE `product_catalog` (
        product_id bigint not null primary key,
        catalog_name varchar(50) not null
      );

      -- Sample data
      INSERT INTO product_catalog VALUES
        (1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee');

      INSERT INTO orders VALUES
        (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
        (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
        (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
        (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
        (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
        (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
        (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

      INSERT INTO orders_pay VALUES
        (2001, 100001, 1, '2023-02-15 17:40:56'),
        (2002, 100002, 1, '2023-02-15 17:40:56'),
        (2003, 100003, 0, '2023-02-15 17:40:56'),
        (2004, 100004, 0, '2023-02-15 17:40:56'),
        (2005, 100005, 0, '2023-02-15 18:40:56'),
        (2006, 100006, 0, '2023-02-15 18:40:56'),
        (2007, 100007, 0, '2023-02-15 18:40:56');
Step 2: Register catalogs
Create a Paimon catalog in Flink
- Log on to the Realtime Compute for Apache Flink console. In the Actions column for your workspace, click Console to open the Development Console.
- In the left navigation pane, click Catalogs. On the Catalogs page, click Create Catalog.
- On the Built-in Catalog tab, select Apache Paimon and click Next.
- Set the parameters and click Confirm:

  | Parameter | Description | Required | Value |
  |---|---|---|---|
  | metastore | Metadata storage type | Yes | Select dlf |
  | catalog name | The DLF catalog name | Yes | Using the latest DLF lets you select an existing DLF catalog without configuring AccessKey pairs. This tutorial uses `paimoncatalog`. For more information, see Manage catalogs. If you use a RAM identity, make sure it has read/write access to DLF. See Data authorization management. |

- Create the `order_dw` database inside the Paimon catalog to receive data synced from MySQL:
  - In the left navigation pane, choose Development > Scripts.
  - Click + > New Script.
  - Paste the following SQL and run it:

        USE CATALOG paimoncatalog;
        CREATE DATABASE order_dw;

  - The message `The following statement has been executed successfully!` confirms the database is created.

  For more information, see Manage Paimon catalogs.
Create a MySQL catalog
- On the Catalogs page, click Create Catalog.
- On the Built-in Catalog tab, select MySQL and click Next.
- Configure the following parameters and click Confirm to create a catalog named `mysqlcatalog`:

  | Parameter | Description | Required | Value |
  |---|---|---|---|
  | catalog name | The catalog name | Yes | Enter `mysqlcatalog` |
  | hostname | The MySQL instance endpoint | Yes | Use the internal endpoint (same region as Flink). See View and manage instance endpoints and ports |
  | port | The MySQL port number | No | Default: 3306. See View and manage instance endpoints and ports |
  | default-database | The default MySQL database name | Yes | Enter `order_dw` |
  | username | The MySQL account username | Yes | The account created in Step 1 |
  | password | The MySQL account password | Yes | The password for the account created in Step 1 |
Step 3: Build the ODS layer
The ODS layer ingests all tables from the MySQL order_dw database into Paimon in real time using Flink CDC.
- Create the data ingestion job:
  - In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft.
  - In the New Draft dialog, enter `ods` in the Name field, select a VVR version, and click Create.
  - Paste the following pipeline configuration:

        source:
          type: mysql
          name: MySQL Source
          hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
          port: 3306
          username: ${secret_values.username}
          password: ${secret_values.password}
          tables: order_dw.\.*   # Read all tables in order_dw
          scan.binlog.newly-added-table.enabled: true   # (Optional) Replicate tables added during the incremental phase
          include-comments.enabled: true   # (Optional) Replicate table and field comments
          scan.incremental.snapshot.unbounded-chunk-first.enabled: true   # (Optional) Scan unbounded chunks first to prevent TaskManager out-of-memory (OOM) errors
          scan.only.deserialize.captured.tables.changelog.enabled: true   # (Optional) Speed up reads by deserializing only captured tables

        sink:
          type: paimon
          name: Paimon Sink
          catalog.properties.metastore: rest
          catalog.properties.uri: http://ap-southeast-5-vpc.dlf.aliyuncs.com
          catalog.properties.warehouse: paimoncatalog
          catalog.properties.token.provider: dlf

        pipeline:
          name: MySQL to Paimon Pipeline

    The Paimon sink parameters:

    | Parameter | Description | Required | Example |
    |---|---|---|---|
    | `catalog.properties.metastore` | Metastore type | Yes | `rest` |
    | `catalog.properties.token.provider` | Token provider | Yes | `dlf` |
    | `catalog.properties.uri` | DLF server URI. Format: `http://[region-id]-vpc.dlf.aliyuncs.com`. Replace `[region-id]` with your region ID. See Endpoints. | Yes | `http://ap-southeast-5-vpc.dlf.aliyuncs.com` |
    | `catalog.properties.warehouse` | DLF catalog name | Yes | `paimoncatalog` |

    To tune write performance, see Performance tuning for Paimon tables.
  - Click Deploy in the upper-right corner.
- Start the job: In the left navigation pane, choose O&M > Deployments. Find the `ods` deployment and click Start in the Actions column. In the Start Job panel, select Initial Mode and click Start.
- Verify the ODS data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following query. You should see 7 rows corresponding to the sample orders you inserted.

      SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;
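The `catalog.properties.uri` value in the sink configuration follows the format `http://[region-id]-vpc.dlf.aliyuncs.com`. As a small illustration, a hypothetical helper (not part of any Alibaba Cloud SDK) could derive it from a region ID, which may help when the same pipeline is templated for several regions:

```python
def dlf_vpc_uri(region_id: str) -> str:
    """Build the DLF VPC endpoint URI from a region ID, following the documented format."""
    region_id = region_id.strip()
    if not region_id:
        raise ValueError("region_id must not be empty")
    return f"http://{region_id}-vpc.dlf.aliyuncs.com"


print(dlf_vpc_uri("ap-southeast-5"))  # http://ap-southeast-5-vpc.dlf.aliyuncs.com
```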
Step 4: Build the DWD layer
The DWD layer joins the orders, orders_pay, and product_catalog tables into a single wide table named dwd_orders. Two key design decisions drive the table configuration:
- `merge-engine = partial-update`: allows different source streams (orders side and payments side) to write partial columns into the same row. Without this, the default deduplication behavior keeps only the latest record for each `order_id`, so a payment record would overwrite the order columns with NULL (and vice versa).
- `changelog-producer = lookup`: generates complete changelogs with low latency so the downstream layers can consume `dwd_orders` as a streaming source. Without this, downstream jobs would have no complete changelog (each UPDATE_AFTER paired with its UPDATE_BEFORE) to subscribe to.
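Why a complete changelog matters can be shown with a small Python sketch. It is a simplified model, not how Paimon's lookup changelog producer is implemented: `changelog` and `apply_to_sum` are hypothetical names, and the fee correction is invented for illustration.

```python
from typing import List, Optional, Tuple

Row = dict
Change = Tuple[str, Row]  # (row kind, row); kinds used here: +I, -U, +U, -D


def changelog(old: Optional[Row], new: Optional[Row]) -> List[Change]:
    """Describe the transition of one primary key as complete change records."""
    if old is None and new is None:
        return []
    if old is None:
        return [("+I", new)]
    if new is None:
        return [("-D", old)]
    if old == new:
        return []
    return [("-U", old), ("+U", new)]  # UPDATE_BEFORE, then UPDATE_AFTER


def apply_to_sum(total: int, changes: List[Change], column: str) -> int:
    """A downstream running sum: add on +I/+U, subtract on -U/-D."""
    for kind, row in changes:
        total += row[column] if kind in ("+I", "+U") else -row[column]
    return total


before = {"order_id": 100001, "order_fee": 5000}
after = {"order_id": 100001, "order_fee": 4500}  # hypothetical fee correction

total = apply_to_sum(0, changelog(None, before), "order_fee")
total = apply_to_sum(total, changelog(before, after), "order_fee")
print(total)  # 4500; with only the +U record it would be 9500
```

The UPDATE_BEFORE record is what lets the downstream sum retract the old value before adding the new one.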
- Create the `dwd_orders` wide table: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following SQL:

      CREATE TABLE paimoncatalog.order_dw.dwd_orders (
        order_id BIGINT,
        order_user_id STRING,
        order_shop_id BIGINT,
        order_product_id BIGINT,
        order_product_catalog_name STRING,
        order_fee BIGINT,
        order_create_time TIMESTAMP,
        order_update_time TIMESTAMP,
        order_state INT,
        pay_id BIGINT,
        pay_platform INT COMMENT 'platform 0: phone, 1: pc',
        pay_create_time TIMESTAMP,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'merge-engine' = 'partial-update',
        'changelog-producer' = 'lookup'
      );

  The message `Query has been executed` confirms the table is created.
- Create the DWD ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it `dwd`, and paste the following SQL. Deploy the draft and start the deployment with Initial Mode. The job joins `orders` with `product_catalog` (as a dimension table lookup) and merges the result with `orders_pay` data. Paimon's `partial-update` engine merges records sharing the same `order_id`: the orders side fills order columns and the payments side fills payment columns.

  Important: Apache Paimon does not allow multiple `INSERT` statements writing to the same table within a single job. Use `UNION ALL` to combine the two source streams into one write operation, as shown below.

      SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
      SET 'table.exec.sink.upsert-materialize' = 'NONE';
      SET 'execution.checkpointing.interval' = '10s';
      SET 'execution.checkpointing.min-pause' = '10s';

      INSERT INTO paimoncatalog.order_dw.dwd_orders
      SELECT
        o.order_id,
        o.user_id,
        o.shop_id,
        o.product_id,
        dim.catalog_name,
        o.buy_fee,
        o.create_time,
        o.update_time,
        o.state,
        NULL,  -- pay_id (filled by orders_pay stream)
        NULL,  -- pay_platform (filled by orders_pay stream)
        NULL   -- pay_create_time (filled by orders_pay stream)
      FROM paimoncatalog.order_dw.orders o
      LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
        ON o.product_id = dim.product_id
      UNION ALL
      SELECT
        order_id,
        NULL,  -- order_user_id (filled by orders stream)
        NULL,  -- order_shop_id (filled by orders stream)
        NULL,  -- order_product_id (filled by orders stream)
        NULL,  -- order_product_catalog_name (filled by orders stream)
        NULL,  -- order_fee (filled by orders stream)
        NULL,  -- order_create_time (filled by orders stream)
        NULL,  -- order_update_time (filled by orders stream)
        NULL,  -- order_state (filled by orders stream)
        pay_id,
        pay_platform,
        create_time
      FROM paimoncatalog.order_dw.orders_pay;

- Verify the DWD data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following query. You should see 7 rows with `order_product_catalog_name` populated from the dimension join and `pay_platform` populated from the `orders_pay` stream.

      SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;
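The shape of the DWD job (a lookup join on one branch, a second branch carrying only payment columns, and a partial-update merge keyed by `order_id`) can be mimicked in plain Python on the Step 1 sample data. This is only a sketch of the expected result: the time and state columns are omitted for brevity, and the merge loop is a stand-in for Paimon's partial-update engine rather than its implementation.

```python
# Sample rows from Step 1, reduced to the columns needed here.
catalog = {1: "phone_aaa", 2: "phone_bbb", 3: "phone_ccc", 4: "phone_ddd", 5: "phone_eee"}
orders = [  # (order_id, user_id, shop_id, product_id, buy_fee)
    (100001, "user_001", 12345, 1, 5000),
    (100002, "user_002", 12346, 2, 4000),
    (100003, "user_003", 12347, 3, 3000),
    (100004, "user_001", 12347, 4, 2000),
    (100005, "user_002", 12348, 5, 1000),
    (100006, "user_001", 12348, 1, 1000),
    (100007, "user_003", 12347, 4, 2000),
]
orders_pay = [  # (pay_id, order_id, pay_platform)
    (2001, 100001, 1), (2002, 100002, 1), (2003, 100003, 0), (2004, 100004, 0),
    (2005, 100005, 0), (2006, 100006, 0), (2007, 100007, 0),
]

COLUMNS = ("order_user_id", "order_shop_id", "order_product_id",
           "order_product_catalog_name", "order_fee", "pay_id", "pay_platform")


def orders_branch():
    """First SELECT: order columns plus the catalog lookup; payment columns stay NULL."""
    for order_id, user_id, shop_id, product_id, fee in orders:
        yield order_id, {"order_user_id": user_id, "order_shop_id": shop_id,
                         "order_product_id": product_id,
                         "order_product_catalog_name": catalog.get(product_id),
                         "order_fee": fee}


def pay_branch():
    """Second SELECT: payment columns only; order columns stay NULL."""
    for pay_id, order_id, platform in orders_pay:
        yield order_id, {"pay_id": pay_id, "pay_platform": platform}


dwd_orders = {}
for branch in (pay_branch(), orders_branch()):  # arrival order does not matter
    for order_id, partial in branch:
        row = dwd_orders.setdefault(order_id, dict.fromkeys(COLUMNS))
        row.update({c: v for c, v in partial.items() if v is not None})

print(len(dwd_orders))     # 7
print(dwd_orders[100001])
```

Processing the payment branch first is deliberate: it shows that the merged row is the same regardless of which stream delivers its half first.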
Step 5: Build the DWS layer
The DWS layer aggregates the wide table into user-level and shop-level metric tables. The architecture uses an intermediate data warehouse middle (DWM) table:
- `dwm_users_shops`: intermediate table keyed by `(user_id, shop_id, ds)`, generating changelogs consumed by both DWS tables
- `dws_users`: user-dimension aggregate (total spend per day)
- `dws_shops`: shop-dimension aggregate (total revenue, number of purchasing users (`uv`), and number of purchases (`pv`) per day)

The key design choice: `merge-engine = aggregation` on the DWM and DWS tables accumulates incremental values automatically. Without this, each incoming record would overwrite the previous value rather than add to it.
- Create the aggregate tables: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following SQL:

      -- User-dimension aggregate metric table
      CREATE TABLE paimoncatalog.order_dw.dws_users (
        user_id STRING,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total payment amount completed on the current day',
        PRIMARY KEY (user_id, ds) NOT ENFORCED
      ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum'
        -- No changelog-producer needed: dws_users is not consumed downstream in streaming mode
      );

      -- Shop-dimension aggregate metric table
      CREATE TABLE paimoncatalog.order_dw.dws_shops (
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total payment amount completed on the current day',
        uv BIGINT COMMENT 'Total number of purchasing users on the current day',
        pv BIGINT COMMENT 'Total number of purchases on the current day',
        PRIMARY KEY (shop_id, ds) NOT ENFORCED
      ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum',
        'fields.uv.aggregate-function' = 'sum',
        'fields.pv.aggregate-function' = 'sum'
        -- No changelog-producer needed: dws_shops is not consumed downstream in streaming mode
      );

      -- Intermediate DWM table: keyed by (user_id, shop_id, ds) to fan out to both dws_users and dws_shops
      CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
        user_id STRING,
        shop_id BIGINT,
        ds STRING,
        paid_buy_fee_sum BIGINT COMMENT 'Total amount paid by the user in the shop on the current day',
        pv BIGINT COMMENT 'Number of purchases by the user in the shop on the current day',
        PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
      ) WITH (
        'merge-engine' = 'aggregation',
        'fields.paid_buy_fee_sum.aggregate-function' = 'sum',
        'fields.pv.aggregate-function' = 'sum',
        'changelog-producer' = 'lookup',  -- generates changelogs for dws_users and dws_shops
        'file.format' = 'avro',           -- row-oriented format for faster writes (DWM is write-heavy)
        'metadata.stats-mode' = 'none'    -- discard stats to improve write throughput (increases OLAP query cost, but DWM is not queried directly)
      );

  The message `Query has been executed` confirms all three tables are created.
- Create the DWM ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it `dwm`, and paste the following SQL. Deploy and start with Initial Mode. The job reads from `dwd_orders`, filters rows where payment is complete (`pay_id IS NOT NULL`), and writes aggregated spend and purchase count into `dwm_users_shops`.

      SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
      SET 'table.exec.sink.upsert-materialize' = 'NONE';
      SET 'execution.checkpointing.interval' = '10s';
      SET 'execution.checkpointing.min-pause' = '10s';

      INSERT INTO paimoncatalog.order_dw.dwm_users_shops
      SELECT
        order_user_id,
        order_shop_id,
        DATE_FORMAT(pay_create_time, 'yyyyMMdd') AS ds,
        order_fee,
        1  -- each input record represents one purchase
      FROM paimoncatalog.order_dw.dwd_orders
      WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;

- Create the DWS ETL job: In the left navigation pane, choose Development > ETL. Click + > New Blank Stream Draft, name it `dws`, and paste the following SQL. Deploy and start with Initial Mode. This job fans out `dwm_users_shops` changelogs to both DWS tables in a single deployment. Unlike the DWD layer (where multiple INSERTs to the same table required UNION ALL), writing to *different* Paimon tables from the same job is supported: use `BEGIN STATEMENT SET`.

      SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
      SET 'table.exec.sink.upsert-materialize' = 'NONE';
      SET 'execution.checkpointing.interval' = '10s';
      SET 'execution.checkpointing.min-pause' = '10s';

      BEGIN STATEMENT SET;

      INSERT INTO paimoncatalog.order_dw.dws_users
      SELECT
        user_id,
        ds,
        paid_buy_fee_sum
      FROM paimoncatalog.order_dw.dwm_users_shops;

      -- shop_id can have highly skewed data (popular shops receive far more records).
      -- local-merge-buffer-size pre-aggregates records in memory before writing to Paimon,
      -- reducing the write amplification from data skew.
      INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
      SELECT
        shop_id,
        ds,
        paid_buy_fee_sum,
        1,  -- each input record represents one user's spend in the shop
        pv
      FROM paimoncatalog.order_dw.dwm_users_shops;

      END;

- Verify the DWS data: In the left navigation pane, choose Development > Scripts. Click + > New Script. Run the following queries. `dws_users` should show 3 rows (one per user) and `dws_shops` should show 4 rows (one per shop that received orders on 2023-02-15).

      SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
      SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
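To check the verification output by hand, the DWM and DWS aggregation can be reproduced in a few lines of Python on the Step 1 sample data. This sketch computes the final state directly and ignores the streaming mechanics; it assumes every sample payment falls on 2023-02-15, which holds for the rows inserted in Step 1.

```python
from collections import defaultdict

# Paid orders as they appear in dwd_orders: (user_id, shop_id, order_fee).
paid_orders = [
    ("user_001", 12345, 5000), ("user_002", 12346, 4000), ("user_003", 12347, 3000),
    ("user_001", 12347, 2000), ("user_002", 12348, 1000), ("user_001", 12348, 1000),
    ("user_003", 12347, 2000),
]
DS = "20230215"  # DATE_FORMAT(pay_create_time, 'yyyyMMdd') for every sample payment

# DWM: one row per (user_id, shop_id, ds) holding [paid_buy_fee_sum, pv].
dwm = defaultdict(lambda: [0, 0])
for user_id, shop_id, fee in paid_orders:
    dwm[(user_id, shop_id, DS)][0] += fee
    dwm[(user_id, shop_id, DS)][1] += 1

# DWS: fan the DWM rows out to the two aggregate tables.
dws_users = defaultdict(int)                 # (user_id, ds) -> paid_buy_fee_sum
dws_shops = defaultdict(lambda: [0, 0, 0])   # (shop_id, ds) -> [fee_sum, uv, pv]
for (user_id, shop_id, ds), (fee_sum, pv) in dwm.items():
    dws_users[(user_id, ds)] += fee_sum
    shop = dws_shops[(shop_id, ds)]
    shop[0] += fee_sum
    shop[1] += 1   # one DWM row corresponds to one distinct user in this shop
    shop[2] += pv

for key in sorted(dws_users):
    print(key, dws_users[key])
for key in sorted(dws_shops):
    print(key, dws_shops[key])
```

Keying the intermediate table by `(user_id, shop_id, ds)` is what makes `uv` a simple sum: each user contributes exactly one DWM row per shop and day, so adding 1 per row counts distinct users.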

Step 6: Test change capture
With all three layers running, test that changes in the source database flow through the entire pipeline.
- Insert new orders into MySQL:

      INSERT INTO orders VALUES
        (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
        (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
        (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);

      INSERT INTO orders_pay VALUES
        (2008, 100008, 1, '2023-02-15 18:40:56'),
        (2009, 100009, 1, '2023-02-15 19:40:56'),
        (2010, 100010, 0, '2023-02-15 20:40:56');

- Wait a moment for the pipeline to propagate, then re-run the DWS verification queries. The aggregate totals for `user_001`, `user_002`, `user_003`, and shops `12345` and `12348` should increase to reflect the new orders.
  - `dws_users` table:

        SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

  - `dws_shops` table:

        SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
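The following Python sketch traces how the three new paid orders could move through the DWM changelog into `dws_shops`. It is a simplified model under two assumptions: the starting state is what the Step 1 sample data produces for the affected keys, and the sum aggregation retracts the values carried by an UPDATE_BEFORE record. The helper names are hypothetical.

```python
# State after Step 5 for the affected keys; ds is '20230215' for all rows.
dwm = {  # (user_id, shop_id) -> (paid_buy_fee_sum, pv)
    ("user_001", 12345): (5000, 1),
    ("user_002", 12348): (1000, 1),
    ("user_001", 12348): (1000, 1),
}
dws_shops = {12345: [5000, 1, 1], 12348: [2000, 2, 2]}  # shop_id -> [fee_sum, uv, pv]

new_paid_orders = [  # (user_id, shop_id, order_fee) for orders 100008 to 100010
    ("user_001", 12345, 3000),
    ("user_002", 12348, 1000),
    ("user_003", 12348, 2000),
]


def upsert_dwm(key, fee):
    """Aggregate into the DWM row and return the change records it emits."""
    old = dwm.get(key)
    new = (fee, 1) if old is None else (old[0] + fee, old[1] + 1)
    dwm[key] = new
    return [("+I", new)] if old is None else [("-U", old), ("+U", new)]


def apply_to_shop(shop_id, changes):
    """Apply change records to dws_shops; -U retracts what the old row contributed."""
    shop = dws_shops.setdefault(shop_id, [0, 0, 0])
    for kind, (fee_sum, pv) in changes:
        sign = -1 if kind == "-U" else 1
        shop[0] += sign * fee_sum
        shop[1] += sign * 1   # the constant 1 that the dws job writes to uv
        shop[2] += sign * pv


for user_id, shop_id, fee in new_paid_orders:
    apply_to_shop(shop_id, upsert_dwm((user_id, shop_id), fee))

print(dws_shops)  # {12345: [8000, 1, 2], 12348: [5000, 3, 4]}
```

Note that `uv` for shop 12348 grows only by one: `user_002` already had a DWM row there, so its update retracts and re-adds the same 1, while `user_003` is new to the shop.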
Query the lakehouse with StarRocks
Use StarRocks to run analytical queries against the Paimon tables you built.
Log on to the StarRocks instance and create a Paimon external catalog:
CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
'type' = 'paimon',
'paimon.catalog.type' = 'filesystem',
'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);
| Parameter | Required | Description |
|---|---|---|
| `type` | Yes | Set to `paimon` |
| `paimon.catalog.type` | Yes | Metadata storage type. Set to `filesystem` |
| `aliyun.oss.endpoint` | Yes | The OSS or OSS-HDFS endpoint. Required when `paimon.catalog.warehouse` points to an OSS path |
| `paimon.catalog.warehouse` | Yes | The OSS warehouse path. Format: `oss://<bucket>/<object>`. Find the bucket name and object path in the OSS console |
Ranking queries
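Query the top three shops by transaction volume on February 15, 2023. The queries in this section use unqualified table names, so they assume the current database is `order_dw` in `paimon_catalog`; otherwise, qualify the names (for example, `paimon_catalog.order_dw.dws_shops`).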
SELECT ROW_NUMBER() OVER (ORDER BY paid_buy_fee_sum DESC) AS rn, shop_id, paid_buy_fee_sum
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
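As a cross-check on what this query should return, the ranking can be reproduced in Python. The input rows below are not query output; they are worked out by hand from the tutorial's sample data after the Step 6 inserts, so treat them as an assumption.

```python
# dws_shops rows for ds = '20230215' after Step 6: (shop_id, paid_buy_fee_sum).
dws_shops = [(12345, 8000), (12346, 4000), (12347, 7000), (12348, 5000)]

# ROW_NUMBER() OVER (ORDER BY paid_buy_fee_sum DESC), then keep the first three rows.
ranked = sorted(dws_shops, key=lambda row: row[1], reverse=True)
top3 = [(rn, shop_id, fee) for rn, (shop_id, fee) in enumerate(ranked[:3], start=1)]

print(top3)  # [(1, 12345, 8000), (2, 12347, 7000), (3, 12348, 5000)]
```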
Details queries
Query order details for a specific user on a specific payment platform in February 2023:
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;
Report queries
Query order count and total amount by product category in February 2023:
SELECT
  DATE_FORMAT(order_create_time, '%Y-%m-%d') AS order_create_date,
  order_product_catalog_name,
  COUNT(*) AS order_count,
  SUM(order_fee) AS order_fee_sum
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;
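A report by date and category only aggregates as intended when the timestamp is truncated to a calendar day before grouping; grouping on the full timestamp would put nearly every order in its own group. The Python sketch below illustrates this on the tutorial's ten sample orders (including the Step 6 inserts). The expected figures are derived by hand from that data, not taken from a StarRocks run.

```python
from collections import defaultdict

catalog = {1: "phone_aaa", 2: "phone_bbb", 3: "phone_ccc", 4: "phone_ddd", 5: "phone_eee"}
# (order_create_time, product_id, buy_fee) for orders 100001 to 100010.
orders = [
    ("2023-02-15 16:40:56", 1, 5000), ("2023-02-15 15:40:56", 2, 4000),
    ("2023-02-15 14:40:56", 3, 3000), ("2023-02-15 13:40:56", 4, 2000),
    ("2023-02-15 12:40:56", 5, 1000), ("2023-02-15 11:40:56", 1, 1000),
    ("2023-02-15 10:40:56", 4, 2000), ("2023-02-15 17:40:56", 3, 3000),
    ("2023-02-15 18:40:56", 4, 1000), ("2023-02-15 19:40:56", 2, 2000),
]

report = defaultdict(lambda: [0, 0])  # (date, catalog_name) -> [order_count, fee_sum]
for create_time, product_id, fee in orders:
    # Same half-open February 2023 window as the WHERE clause.
    if "2023-02-01 00:00:00" <= create_time < "2023-03-01 00:00:00":
        key = (create_time[:10], catalog[product_id])  # truncate timestamp to the date
        report[key][0] += 1
        report[key][1] += fee

for key in sorted(report):
    print(key, report[key])
```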