This topic describes how to build a streaming data lakehouse using Realtime Compute for Apache Flink, Apache Paimon, and StarRocks.
Background information
As society becomes more digital, businesses require faster access to data. Traditional offline data warehouses use scheduled jobs to merge new changes from the previous period into hierarchical data warehouse layers, such as the Operational Data Store (ODS), Data Warehouse Detail (DWD), Data Warehouse Summary (DWS), and Application Data Store (ADS). However, this approach has two major drawbacks: high latency and high cost. Offline jobs typically run hourly or daily, which means data consumers can only view data from the previous hour or day. Additionally, data updates often overwrite entire partitions. This process requires rereading the original data in the partition to merge it with new changes and generate new results.
Building a streaming data lakehouse with Realtime Compute for Apache Flink and Apache Paimon solves these issues. The real-time computing capabilities of Flink allow data to flow between data warehouse layers in real time. The efficient update capabilities of Paimon deliver data changes to downstream consumers with minute-level latency. Therefore, a streaming data lakehouse offers advantages in terms of both latency and cost.
For more information about the features of Apache Paimon, see Features and visit the Apache Paimon official website.
Architecture and benefits
Architecture
Realtime Compute for Apache Flink is a powerful stream computing engine that efficiently processes large amounts of real-time data. Apache Paimon is a unified streaming and batch data lake storage format that supports high-throughput updates and low-latency queries. Paimon is deeply integrated with Flink to provide an all-in-one streaming data lakehouse solution. The architecture of the streaming data lakehouse built with Flink and Paimon is as follows:
Flink writes data from a data source to Paimon to create the ODS layer.
Flink subscribes to changelogs at the ODS layer for processing and then rewrites the data to Paimon to create the DWD layer.
Flink subscribes to changelogs at the DWD layer for processing and then rewrites the data to Paimon to create the DWS layer.
Finally, StarRocks on the open source big data platform E-MapReduce (EMR) reads the Paimon tables through an external catalog to serve application queries.

Benefits
This solution provides the following benefits:
Each Paimon layer can deliver changes to the downstream with minute-level latency. This reduces the latency of traditional offline data warehouses from hours or days to minutes.
Each Paimon layer can directly accept change data without overwriting partitions. This significantly reduces the cost of data updates and corrections in traditional offline data warehouses. It also solves the issue of data at intermediate layers being difficult to query, update, or correct.
The model is unified and the architecture is simplified. The logic of the extract, transform, and load (ETL) pipeline is implemented using Flink SQL. Data at the ODS, DWD, and DWS layers is stored uniformly in Paimon. This reduces architectural complexity and improves data processing efficiency.
This solution relies on three core capabilities of Paimon, as shown in the following table.
| Core capabilities of Paimon | Details |
| --- | --- |
| Primary key table updates | Paimon uses a log-structured merge-tree (LSM tree) data structure at the underlying layer to achieve efficient data updates. For more information about Paimon primary key tables and underlying data structures, see Primary Key Table and File Layouts. |
| Changelog producer | Paimon can produce a complete changelog for any input data stream. Every `update_after` record has a corresponding `update_before` record, which ensures that data changes are passed to the downstream in full. For more information, see Changelog production mechanism. |
| Merge engine | When a Paimon primary key table receives multiple records with the same primary key, the sink table merges them into a single record to maintain primary key uniqueness. Paimon supports various merge behaviors, such as deduplication, partial updates, and pre-aggregation. For more information, see Data merging mechanism. |
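To make the merge behavior concrete, the following Flink SQL sketch is illustrative only and is not part of the tutorial steps; `t_merge_demo` is a hypothetical table. It shows how the partial-update merge engine combines two records that share a primary key, with the non-NULL fields of the later record filling in the gaps left by the earlier one.

```sql
-- Illustrative sketch only: t_merge_demo is a hypothetical table, not part of this tutorial.
CREATE TABLE t_merge_demo (
    id    BIGINT,
    col_a STRING,
    col_b STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update'  -- Non-NULL fields of later records fill in the fields of earlier records with the same primary key.
);

-- Two records with the same primary key ...
INSERT INTO t_merge_demo VALUES
    (1, 'a', CAST(NULL AS STRING)),
    (1, CAST(NULL AS STRING), 'b');

-- ... are merged into a single row: (1, 'a', 'b').
SELECT * FROM t_merge_demo;
```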
Scenarios
This topic uses an e-commerce platform as an example to demonstrate how to build a streaming data lakehouse to process and cleanse data and to support data queries from upper-layer applications. The streaming data lakehouse implements data layering and reuse. It supports various business scenarios, such as report queries for transaction dashboards, behavioral data analytics, user persona tagging, and personalized recommendations.

Build the ODS layer: Ingest data from a business database into the data warehouse in real time.
A MySQL database contains three business tables: `orders`, `orders_pay`, and `product_catalog`. Flink writes the data from these tables to OSS in real time and stores it in the Paimon format to create the ODS layer.
Build the DWD layer: Create a topic-based wide table.
Use the partial-update merge mechanism of Paimon to widen the `orders`, `product_catalog`, and `orders_pay` tables. This generates a DWD layer wide table and produces a changelog with minute-level latency.
Build the DWS layer: Calculate metrics.
Flink consumes the changelog of the wide table in real time. It uses the aggregation merge mechanism of Paimon to produce the `dwm_users_shops` intermediate table (user-merchant aggregation) at the DWM layer. Finally, it produces the `dws_users` (user aggregation metrics) and `dws_shops` (merchant aggregation metrics) tables at the DWS layer.
Prerequisites
Data Lake Formation (DLF) is activated. Use DLF 2.5 as the storage service. For more information, see Get started with DLF.
Fully managed Flink is activated. For more information, see Activate Realtime Compute for Apache Flink.
StarRocks on EMR is activated. For more information, see Quickly use a compute-storage separated instance.
The StarRocks instance and DLF must be in the same region as the Flink workspace.
Limits
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 11.1.0 or later supports this streaming data lakehouse solution.
Build a streaming data lakehouse
Prepare a MySQL CDC data source
This example uses an ApsaraDB RDS for MySQL instance. Create a database named `order_dw` and three business tables with data.
Create an ApsaraDB RDS for MySQL instance.
Important: The ApsaraDB RDS for MySQL instance must be in the same VPC as the Flink workspace. If they are not in the same VPC, see How do I access other services across VPCs?
Create a database and an account.
Create a database named `order_dw`. Create a privileged account or a standard account with read and write permissions on the `order_dw` database.
Create three tables and insert data.
CREATE TABLE `orders` (
  order_id bigint not null primary key,
  user_id varchar(50) not null,
  shop_id bigint not null,
  product_id bigint not null,
  buy_fee bigint not null,
  create_time timestamp not null,
  update_time timestamp not null default now(),
  state int not null
);

CREATE TABLE `orders_pay` (
  pay_id bigint not null primary key,
  order_id bigint not null,
  pay_platform int not null,
  create_time timestamp not null
);

CREATE TABLE `product_catalog` (
  product_id bigint not null primary key,
  catalog_name varchar(50) not null
);

-- Prepare data.
INSERT INTO product_catalog VALUES
  (1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');

INSERT INTO orders VALUES
  (100001, 'user_001', 12345, 1, 5000, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
  (100002, 'user_002', 12346, 2, 4000, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
  (100003, 'user_003', 12347, 3, 3000, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
  (100004, 'user_001', 12347, 4, 2000, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
  (100005, 'user_002', 12348, 5, 1000, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
  (100006, 'user_001', 12348, 1, 1000, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
  (100007, 'user_003', 12347, 4, 2000, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);

INSERT INTO orders_pay VALUES
  (2001, 100001, 1, '2023-02-15 17:40:56'),
  (2002, 100002, 1, '2023-02-15 17:40:56'),
  (2003, 100003, 0, '2023-02-15 17:40:56'),
  (2004, 100004, 0, '2023-02-15 17:40:56'),
  (2005, 100005, 0, '2023-02-15 18:40:56'),
  (2006, 100006, 0, '2023-02-15 18:40:56'),
  (2007, 100007, 0, '2023-02-15 18:40:56');
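Optionally, you can run a quick sanity check in MySQL before starting the synchronization job. The expected row counts simply reflect the INSERT statements above.

```sql
-- Optional sanity check: orders and orders_pay should each contain 7 rows, product_catalog 5 rows.
SELECT COUNT(*) AS orders_cnt     FROM orders;
SELECT COUNT(*) AS orders_pay_cnt FROM orders_pay;
SELECT COUNT(*) AS product_cnt    FROM product_catalog;
```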
Manage metadata
Create a Paimon catalog
Log on to the Realtime Compute for Apache Flink console.
In the navigation pane on the left, click Catalogs, and then click Create Catalog.
On the Built-in Catalog tab, click Apache Paimon, then click Next.
Configure the following parameters, select DLF as the storage type, and click OK.
| Parameter | Description | Required | Note |
| --- | --- | --- | --- |
| metastore | The metastore type. | Yes | In this example, select dlf. |
| catalog name | The DLF data catalog name. Important: When you use a Resource Access Management (RAM) user or role, make sure that you have read and write permissions on DLF. For more information, see Authorization management. | Yes | Use DLF 2.5. You do not need to enter an AccessKey pair. You can select an existing DLF data catalog. For more information about how to create a data catalog, see Data Catalog. In this example, select a data catalog named paimoncatalog. |
Create the `order_dw` database in the data catalog to synchronize all table data from the `order_dw` database in MySQL.
In the left navigation pane, choose and click New to create a temporary query.
-- Use the paimoncatalog data source.
USE CATALOG paimoncatalog;

-- Create the order_dw database.
CREATE DATABASE order_dw;

The message `The following statement has been executed successfully!` indicates that the database is created.
For more information about how to use Paimon catalogs, see Manage Paimon catalogs.
Create a MySQL catalog
On the Catalogs page, click Create Catalog.
On the Built-in Catalog tab, click MySQL, then click Next.
To create a MySQL catalog named mysqlcatalog, configure the following parameters and click OK.
| Parameter | Description | Required | Note |
| --- | --- | --- | --- |
| catalog name | The name of the catalog. | Yes | Enter a custom name. This example uses mysqlcatalog. |
| hostname | The IP address or hostname of the MySQL database. | Yes | For more information, see View and manage instance endpoints and ports. Because the ApsaraDB RDS for MySQL instance and the fully managed Flink workspace are in the same VPC, enter the internal endpoint. |
| port | The port number of the MySQL database service. The default value is 3306. | No | For more information, see View and manage instance endpoints and ports. |
| default-database | The name of the default MySQL database. | Yes | Enter the name of the database to synchronize, `order_dw`. |
| username | The username for the MySQL database service. | Yes | This is the account created in the Prepare a MySQL CDC data source section. |
| password | The password for the MySQL database service. | Yes | This is the password created in the Prepare a MySQL CDC data source section. |
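Optionally, depending on your environment, you can run a short preview query in a temporary query session to confirm that the MySQL catalog can reach the database. This check is not required by the rest of the tutorial.

```sql
-- Optional connectivity check through the MySQL catalog.
SELECT * FROM mysqlcatalog.order_dw.orders LIMIT 10;
```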
Build the ODS layer: Ingest data from a business database into the data warehouse in real time
Use Flink CDC to synchronize data from MySQL to Paimon through a YAML data ingestion job. This builds the ODS layer in one step.
Create and start the YAML data ingestion job.
In the Realtime Compute for Apache Flink console, navigate to the page and create a blank YAML draft named ods.
Copy the following code to the editor. Make sure to modify parameters such as the hostname, username, and password to match your environment.
source:
  type: mysql
  name: MySQL Source
  hostname: rm-bp1e********566g.mysql.rds.aliyuncs.com
  port: 3306
  username: ${secret_values.username}
  password: ${secret_values.password}
  tables: order_dw.\.*  # Use a regular expression to read all tables in the order_dw database.

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: rest
  catalog.properties.uri: http://cn-beijing-vpc.dlf.aliyuncs.com
  catalog.properties.warehouse: paimoncatalog
  catalog.properties.token.provider: dlf

pipeline:
  name: MySQL to Paimon Pipeline

| Parameter | Description | Required | Example |
| --- | --- | --- | --- |
| catalog.properties.metastore | The metastore type. Set this to `rest`. | Yes | rest |
| catalog.properties.token.provider | The token provider. Set this to `dlf`. | Yes | dlf |
| catalog.properties.uri | The URI used to access the DLF REST Catalog Server. The format is `http://[region-id]-vpc.dlf.aliyuncs.com`. For more information, see the region ID in Service endpoints. | Yes | http://cn-beijing-vpc.dlf.aliyuncs.com |
| catalog.properties.warehouse | The DLF catalog name. | Yes | paimoncatalog |
For more information about how to optimize Paimon write performance, see Paimon performance optimization.
In the upper-right corner, click Deploy.
Go to . Find the `ods` job that you just deployed. In the Actions column, click Start and select Start Without Initial State. For more information about job startup configurations, see Start a job.
View the data of the three tables that are synchronized from MySQL to Paimon.
In the Realtime Compute for Apache Flink console, navigate to the page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.
SELECT * FROM paimoncatalog.order_dw.orders ORDER BY order_id;
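The other two synchronized tables can be checked in the same way. The following optional queries confirm that all three ODS tables were created and populated by the ingestion job.

```sql
-- Optional: confirm that the other two ODS tables were synchronized as well.
SELECT * FROM paimoncatalog.order_dw.orders_pay ORDER BY pay_id;
SELECT * FROM paimoncatalog.order_dw.product_catalog ORDER BY product_id;
```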
Build the DWD layer: Create a topic-based wide table
Create the DWD layer Paimon wide table dwd_orders
In the Realtime Compute for Apache Flink console, navigate to the page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.
CREATE TABLE paimoncatalog.order_dw.dwd_orders (
    order_id BIGINT,
    order_user_id STRING,
    order_shop_id BIGINT,
    order_product_id BIGINT,
    order_product_catalog_name STRING,
    order_fee BIGINT,
    order_create_time TIMESTAMP,
    order_update_time TIMESTAMP,
    order_state INT,
    pay_id BIGINT,
    pay_platform INT COMMENT 'platform 0: phone, 1: pc',
    pay_create_time TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'partial-update',  -- Use the partial-update merge engine to generate a wide table.
    'changelog-producer' = 'lookup'     -- Use the lookup changelog producer to generate a changelog with low latency.
);

The message `Query has been executed` indicates that the table is created.
Consume the changelog of the ODS layer orders and orders_pay tables in real time
In the Realtime Compute for Apache Flink console, go to the page. Create a new SQL streaming job named `dwd`, and copy the following code into the SQL editor. Then, Deploy the job and Start it without an initial state.
This SQL job joins the `orders` table with the `product_catalog` table. The joined result and the `orders_pay` table are written to the `dwd_orders` table. The partial-update merge engine of Paimon widens the data from the `orders` and `orders_pay` tables that have the same `order_id`.
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';

-- Paimon does not currently support multiple INSERT statements into the same table in a single job. Therefore, use UNION ALL.
INSERT INTO paimoncatalog.order_dw.dwd_orders
SELECT
    o.order_id,
    o.user_id,
    o.shop_id,
    o.product_id,
    dim.catalog_name,
    o.buy_fee,
    o.create_time,
    o.update_time,
    o.state,
    NULL,
    NULL,
    NULL
FROM
    paimoncatalog.order_dw.orders o
    LEFT JOIN paimoncatalog.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
    ON o.product_id = dim.product_id
UNION ALL
SELECT
    order_id,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    NULL,
    pay_id,
    pay_platform,
    create_time
FROM
    paimoncatalog.order_dw.orders_pay;

View the data of the dwd_orders wide table
In the Realtime Compute for Apache Flink console, go to the page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.
SELECT * FROM paimoncatalog.order_dw.dwd_orders ORDER BY order_id;
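If you want to confirm that the wide table commits snapshots with minute-level latency, you can also inspect Paimon's snapshots system table. This is an optional check; system table names follow Paimon's `<table>$snapshots` convention and must be quoted with backticks in Flink SQL.

```sql
-- Optional: inspect the commit history of the wide table through the Paimon snapshots system table.
SELECT snapshot_id, commit_kind, commit_time
FROM paimoncatalog.order_dw.`dwd_orders$snapshots`
ORDER BY snapshot_id;
```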
Build the DWS layer: Calculate metrics
Create the DWS layer aggregation tables dws_users and dws_shops
In the Realtime Compute for Apache Flink console, go to the page. On the Query Scripts tab, copy the following code into the query script, select the code snippet, and click Run in the upper-right corner.
-- User dimension aggregation metric table.
CREATE TABLE paimoncatalog.order_dw.dws_users (
    user_id STRING,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'Total amount of payments completed on the day',
    PRIMARY KEY (user_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',  -- Use the aggregation merge engine to generate an aggregation table.
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum'  -- Sum the payed_buy_fee_sum data to produce the aggregation result.
    -- Because the dws_users table is no longer consumed downstream in a streaming fashion, you do not need to specify a changelog producer.
);

-- Merchant dimension aggregation metric table.
CREATE TABLE paimoncatalog.order_dw.dws_shops (
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'Total amount of payments completed on the day',
    uv BIGINT COMMENT 'Total number of distinct purchasing users on the day',
    pv BIGINT COMMENT 'Total number of purchases by users on the day',
    PRIMARY KEY (shop_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',  -- Use the aggregation merge engine to generate an aggregation table.
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum',  -- Sum the payed_buy_fee_sum data to produce the aggregation result.
    'fields.uv.aggregate-function' = 'sum',  -- Sum the uv data to produce the aggregation result.
    'fields.pv.aggregate-function' = 'sum'   -- Sum the pv data to produce the aggregation result.
    -- Because the dws_shops table is no longer consumed downstream in a streaming fashion, you do not need to specify a changelog producer.
);

-- To calculate both the user-perspective and merchant-perspective aggregation tables, create an intermediate table with user + merchant as the primary key.
CREATE TABLE paimoncatalog.order_dw.dwm_users_shops (
    user_id STRING,
    shop_id BIGINT,
    ds STRING,
    payed_buy_fee_sum BIGINT COMMENT 'Total amount paid by the user at the merchant on the day',
    pv BIGINT COMMENT 'Number of purchases made by the user at the merchant on the day',
    PRIMARY KEY (user_id, shop_id, ds) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',  -- Use the aggregation merge engine to generate an aggregation table.
    'fields.payed_buy_fee_sum.aggregate-function' = 'sum',  -- Sum the payed_buy_fee_sum data to produce the aggregation result.
    'fields.pv.aggregate-function' = 'sum',  -- Sum the pv data to produce the aggregation result.
    'changelog-producer' = 'lookup',  -- Use the lookup changelog producer to generate a changelog with low latency.
    -- The intermediate table at the DWM layer is generally not queried directly by upper-layer applications, so you can optimize for write performance.
    'file.format' = 'avro',  -- The avro row store format provides more efficient write performance.
    'metadata.stats-mode' = 'none'  -- Discarding statistics information increases the cost of OLAP queries (with no effect on continuous stream processing), but makes write performance more efficient.
);

The message `Query has been executed` indicates that the tables are created.
Consume the changelog of the DWD layer dwd_orders table
In the Realtime Compute for Apache Flink console, navigate to the tab. Create a SQL streaming job named `dwm`. Copy the following code into the SQL editor. Then, Deploy and Start the job without an initial state.
This SQL job writes data from the `dwd_orders` table to the `dwm_users_shops` table. It uses the pre-aggregation merge engine of Paimon to automatically sum the `order_fee` to calculate the user's total spending at the merchant. It also sums `1` to calculate the number of times the user has purchased from the merchant.
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';

INSERT INTO paimoncatalog.order_dw.dwm_users_shops
SELECT
    order_user_id,
    order_shop_id,
    DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
    order_fee,
    1  -- One input record represents one purchase.
FROM paimoncatalog.order_dw.dwd_orders
WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL;

Consume the changelog of the DWM layer dwm_users_shops table in real time
In the Realtime Compute for Apache Flink console, go to the page. Create a new SQL streaming job named `dws`. Copy the following code to the SQL editor. Then, Deploy and Start the job without an initial state.
This SQL job writes data from the `dwm_users_shops` table to the `dws_users` and `dws_shops` tables. It uses the pre-aggregation merge engine of Paimon to calculate each user's total spending (`payed_buy_fee_sum`) in the `dws_users` table. In the `dws_shops` table, it calculates the merchant's total revenue (`payed_buy_fee_sum`), the number of purchasing users by summing `1`, and the total number of purchases (`pv`).
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';

-- Unlike DWD, each INSERT statement here writes to a different Paimon table, so they can be in the same job.
BEGIN STATEMENT SET;

INSERT INTO paimoncatalog.order_dw.dws_users
SELECT
    user_id,
    ds,
    payed_buy_fee_sum
FROM paimoncatalog.order_dw.dwm_users_shops;

-- With merchant as the primary key, some popular merchants may have much more data than others.
-- Therefore, use local merge to pre-aggregate in memory before writing to Paimon to alleviate data skew.
INSERT INTO paimoncatalog.order_dw.dws_shops /*+ OPTIONS('local-merge-buffer-size' = '64mb') */
SELECT
    shop_id,
    ds,
    payed_buy_fee_sum,
    1,  -- One input record represents all of a user's purchases at this merchant.
    pv
FROM paimoncatalog.order_dw.dwm_users_shops;

END;

View the data in the dws_users and dws_shops tables
In the Realtime Compute for Apache Flink console, navigate to . On the Query Scripts tab, copy the following code into the editor. Select the code snippet and click Run in the upper-right corner.
-- View dws_users table data.
SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;

-- View dws_shops table data.
SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
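The DWM intermediate table can be inspected in the same way. The following optional query helps verify that `dwm_users_shops` is aggregating as expected before the DWS jobs consume it.

```sql
-- Optional: view the DWM intermediate table.
SELECT * FROM paimoncatalog.order_dw.dwm_users_shops ORDER BY user_id, shop_id;
```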
Capture changes in the business database
Now that you have built the streaming data lakehouse, the following steps test its ability to capture changes from the business database.
Insert the following data into the `order_dw` database in MySQL.
INSERT INTO orders VALUES
  (100008, 'user_001', 12345, 3, 3000, '2023-02-15 17:40:56', '2023-02-15 18:42:56', 1),
  (100009, 'user_002', 12348, 4, 1000, '2023-02-15 18:40:56', '2023-02-15 19:42:56', 1),
  (100010, 'user_003', 12348, 2, 2000, '2023-02-15 19:40:56', '2023-02-15 20:42:56', 1);

INSERT INTO orders_pay VALUES
  (2008, 100008, 1, '2023-02-15 18:40:56'),
  (2009, 100009, 1, '2023-02-15 19:40:56'),
  (2010, 100010, 0, '2023-02-15 20:40:56');

View the data in the dws_users and dws_shops tables. In the Realtime Compute for Apache Flink console, navigate to the page. On the Query Scripts tab, copy the following code into the query script. Select the code snippet and click Run in the upper-right corner.
dws_users table
SELECT * FROM paimoncatalog.order_dw.dws_users ORDER BY user_id;
dws_shops table
SELECT * FROM paimoncatalog.order_dw.dws_shops ORDER BY shop_id;
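You can also check that the three new orders landed in the DWD wide table with their payment information merged in. The following query is optional.

```sql
-- Optional: confirm that the new orders were widened with payment information.
SELECT order_id, order_user_id, order_fee, pay_platform, pay_create_time
FROM paimoncatalog.order_dw.dwd_orders
WHERE order_id IN (100008, 100009, 100010)
ORDER BY order_id;
```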
Use the streaming data lakehouse
The previous section showed how to create a Paimon catalog and write to Paimon tables in Flink. This section describes simple data analysis scenarios using StarRocks after the streaming data lakehouse is built.
First, log on to the StarRocks instance and create a Paimon external catalog that points to the Paimon warehouse on OSS. For more information, see Paimon Catalog.
CREATE EXTERNAL CATALOG paimon_catalog
PROPERTIES
(
'type' = 'paimon',
'paimon.catalog.type' = 'filesystem',
'aliyun.oss.endpoint' = 'oss-cn-beijing-internal.aliyuncs.com',
'paimon.catalog.warehouse' = 'oss://<bucket>/<object>'
);

| Property | Required | Remarks |
| --- | --- | --- |
| type | Yes | The data source type. Set this to `paimon`. |
| paimon.catalog.type | Yes | The metastore type used by Paimon. This example uses `filesystem` as the metastore type. |
| aliyun.oss.endpoint | Yes | If you use OSS or OSS-HDFS as the warehouse, you must specify the corresponding endpoint. |
| paimon.catalog.warehouse | Yes | The format is `oss://<bucket>/<object>`. You can view your bucket and object names in the OSS console. |
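The example queries that follow reference tables such as `dws_shops` without a catalog or database prefix. This assumes that the session has switched to the new catalog and the `order_dw` database first; a minimal sketch of that switch is shown below.

```sql
-- Switch the current StarRocks session to the Paimon catalog and the order_dw database.
SET CATALOG paimon_catalog;
USE order_dw;

-- Optional: list the tables visible through the catalog.
SHOW TABLES;
```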
Ranking query
To analyze the DWS layer aggregation table, the following example code shows how to use StarRocks to query the top three merchants with the highest transaction amounts on February 15, 2023.
SELECT ROW_NUMBER() OVER (ORDER BY payed_buy_fee_sum DESC) AS rn, shop_id, payed_buy_fee_sum
FROM dws_shops
WHERE ds = '20230215'
ORDER BY rn LIMIT 3;
Details query
To analyze the wide table at the DWD layer, the following sample code shows how to use StarRocks to query the order details paid by a customer on a specific payment platform in February 2023:
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' AND order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time;
Data reports
To analyze the wide table at the DWD layer, the following sample code shows how to use StarRocks to query the total number of orders and the total order amount for each category in February 2023:
SELECT
order_create_time AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
References
Build an offline Paimon data lakehouse using the batch processing capabilities of Flink. For more information, see Get started with Flink batch processing.