This topic describes how to use materialized tables to build a data lakehouse that unifies stream and batch processing. It also explains how to change the data freshness of a materialized table to switch from batch processing to stream processing and enable real-time data updates.
Introduction to materialized tables
A materialized table is a new table type in Flink SQL that simplifies batch and stream processing data pipelines and provides an integrated development experience. When you create a materialized table, you do not need to declare its fields or types. You only need to specify the data freshness and a query statement. The Flink engine automatically infers the schema of the materialized table from the query statement and creates a data pipeline to maintain the specified data freshness. For more information, see Materialized table management.
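For orientation, here is a sketch of the general shape of the statement as defined in open-source Apache Flink 1.20. The statement declares only an optional primary key, optional partitioning, table options, FRESHNESS, and the query. The columns are inferred from the SELECT. The REFRESH_MODE clause belongs to the Apache Flink syntax. Whether Realtime Compute for Apache Flink accepts it in exactly this form is an assumption; check it against Materialized table management. All table and column names below are hypothetical.

```sql
-- Sketch of the Apache Flink 1.20 syntax; all names are hypothetical.
CREATE MATERIALIZED TABLE my_catalog.my_db.my_summary (
  PRIMARY KEY (ds, category) NOT ENFORCED   -- optional table constraint
)
PARTITIONED BY (ds)                         -- optional partitioning
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '10' MINUTE            -- maximum lag behind the base tables
-- REFRESH_MODE = CONTINUOUS                -- optional; normally derived from FRESHNESS
AS SELECT ds, category, COUNT(*) AS cnt     -- the table schema is inferred from this query
FROM my_catalog.my_db.my_source
GROUP BY ds, category;
```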
Real-time data lakehouse pipeline diagram
- Flink builds the Operational Data Store (ODS) layer by writing data from a data source to Paimon.
- Flink builds the Data Warehouse Detail (DWD) layer by joining and enriching data from the ODS layer and writing the result to a materialized table.
- You can build a Data Warehouse Service (DWS) layer for application queries by creating multiple materialized tables with different data freshness levels that compute multi-dimensional business statistics.
Prerequisites
- A Realtime Compute for Apache Flink workspace has been created. For more information, see Activate Realtime Compute for Apache Flink.
- If you use a Resource Access Management (RAM) user or RAM role, make sure that it has the permissions required to access the Realtime Compute for Apache Flink console. For more information, see Permission management.
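Step 1 below relies on a Paimon catalog whose metastore type is Filesystem. The following is a minimal sketch of declaring such a catalog in Flink SQL with the Apache Paimon catalog options `type`, `metastore`, and `warehouse`. The catalog name `paimon` matches the examples in this topic. The warehouse URI is a placeholder. Object storage such as OSS usually also needs endpoint and credential options, which are omitted here. The console procedure in Create a Paimon catalog remains the authoritative path.

```sql
-- Minimal sketch: a Paimon catalog backed by the filesystem metastore.
-- '<warehouse-uri>' is a placeholder, for example an OSS path. Add the
-- storage endpoint and credential options that your environment requires.
CREATE CATALOG `paimon` WITH (
  'type' = 'paimon',
  'metastore' = 'filesystem',
  'warehouse' = '<warehouse-uri>'
);
```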
Step 1: Prepare test data
- (Optional) Create a Paimon catalog.
  The materialized table feature is based on Apache Paimon, so you need a Paimon catalog whose metastore type is Filesystem. If you have already created one, skip this step. For more information, see Create a Paimon catalog.
- Create the ods_user_log and ods_dim_product tables.
  - Log on to the Realtime Compute for Apache Flink console.
  - For the target workspace, click Console in the Actions column.
  - In the navigation pane on the left, select . Copy and paste the following code to create the source tables.
    This example uses a Paimon catalog named paimon and the default database.
CREATE TABLE `paimon`.`default`.`ods_user_log` (
  item_id INT NOT NULL,
  user_id INT NOT NULL,
  vtime TIMESTAMP(6),
  ds VARCHAR(10) NOT NULL
) PARTITIONED BY (ds) WITH (
  'bucket' = '4',           -- Specify 4 buckets.
  'bucket-key' = 'item_id'  -- Specify the bucket key. Rows with the same item_id are placed in the same bucket.
);

CREATE TABLE `paimon`.`default`.`ods_dim_product` (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'bucket-key' = 'item_id'
);
  - Click Run in the upper-right corner to create the tables.
  - In the left navigation pane, select Data Management, click the target Paimon catalog, and then click Refresh to view the new tables.
- Use the Faker data generator connector to generate simulated data and write the data to the Paimon tables.
  - In the left navigation pane, select .
  - Click New, select Blank Stream Draft, click Next, and then click Create.
  - Copy the following SQL statements to the editor.
CREATE TEMPORARY TABLE `user_log` (
  item_id INT,  -- Product ID
  user_id INT,  -- User ID
  vtime TIMESTAMP,
  ds AS DATE_FORMAT(CURRENT_DATE, 'yyyyMMdd')  -- Computed column: the current date as yyyyMMdd.
) WITH (
  'connector' = 'faker',  -- Faker connector
  'fields.item_id.expression' = '#{number.numberBetween ''0'',''1000''}',  -- Generate a random number between 0 and 1000.
  'fields.user_id.expression' = '#{number.numberBetween ''0'',''100''}',
  'fields.vtime.expression' = '#{date.past ''5'',''HOURS''}',  -- Generate times within the last 5 hours.
  'rows-per-second' = '3'  -- Generate 3 rows per second.
);

CREATE TEMPORARY TABLE `dim_product` (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY (item_id) NOT ENFORCED
) WITH (
  'connector' = 'faker',  -- Faker connector
  'fields.item_id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.title.expression' = '#{book.title}',
  'fields.pict_url.expression' = '#{internet.domainName}',
  'fields.brand_id.expression' = '#{number.numberBetween ''1000'',''10000''}',
  'fields.seller_id.expression' = '#{number.numberBetween ''1000'',''10000''}',
  'rows-per-second' = '3'  -- Generate 3 rows per second.
);

BEGIN STATEMENT SET;

INSERT INTO `paimon`.`default`.`ods_user_log`
SELECT item_id, user_id, vtime, CAST(ds AS VARCHAR(10)) AS ds
FROM `user_log`;

INSERT INTO `paimon`.`default`.`ods_dim_product`
SELECT item_id, title, pict_url, brand_id, seller_id
FROM `dim_product`;

END;
  - In the upper-right corner, click Deploy.
  - In the navigation pane on the left, click . For the target deployment, click Start in the Actions column, select Stateless Start, and then click Start.
- Query the simulated data.
  In the left navigation pane, select . Copy the following SQL statements into the SQL editor and click Run in the upper-right corner.
SELECT * FROM `paimon`.`default`.`ods_dim_product` LIMIT 10;
SELECT * FROM `paimon`.`default`.`ods_user_log` LIMIT 10;
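Before moving on to Step 2, you can run two optional checks to confirm that both ODS tables are receiving data. The first query counts the log rows in each partition. The second counts the log rows whose item_id already exists in ods_dim_product. Only those rows survive the INNER JOIN that builds the DWD layer. The two Faker sources draw item_id values independently, so this count can be lower than the total, especially shortly after the deployment starts. This is a sketch to run in the same SQL editor as the queries above.

```sql
-- Number of simulated log rows per ds partition.
SELECT ds, COUNT(*) AS log_rows
FROM `paimon`.`default`.`ods_user_log`
GROUP BY ds;

-- Log rows that have a matching product and would therefore be kept by
-- the INNER JOIN in the dwd_user_log_product definition.
SELECT COUNT(*) AS joinable_rows
FROM `paimon`.`default`.`ods_user_log`
WHERE item_id IN (
  SELECT item_id FROM `paimon`.`default`.`ods_dim_product`
);
```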
Step 2: Create a materialized table
This section describes how to create the `dwd_user_log_product` materialized table for the DWD layer by joining the source tables. You will then build the DWS layer by creating downstream materialized tables based on the `dwd_user_log_product` table to perform business analytics.
- Build the DWD layer of the data warehouse by creating the dwd_user_log_product materialized table.
  - In the navigation pane on the left, choose Data Management and click the target Paimon catalog.
  - Click the target database (default in this example), and then click Create Materialized Table. Copy the following SQL statement into the SQL editor and click Create.
-- DWD layer: widen user logs with product attributes.
CREATE MATERIALIZED TABLE dwd_user_log_product (
  PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY (ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR  -- Refresh every 1 hour.
AS SELECT
  l.ds,
  l.item_id,
  l.user_id,
  l.vtime,
  r.brand_id,
  r.seller_id
FROM `paimon`.`default`.`ods_user_log` l
INNER JOIN `paimon`.`default`.`ods_dim_product` r
ON l.item_id = r.item_id;
- Build the DWS layer of the data warehouse and perform multi-dimensional business analytics based on the dwd_user_log_product materialized table.
  This topic uses the `dws_overall` materialized table as an example. This table calculates page views (PVs) and unique visitors (UVs) for each hour of a day and for the whole day. Create the `dws_overall` materialized table in the same way as dwd_user_log_product.
-- Calculate hourly and daily PVs and UVs.
CREATE MATERIALIZED TABLE dws_overall (
  PRIMARY KEY (ds, hh) NOT ENFORCED
)
PARTITIONED BY (ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR  -- Refresh every 1 hour.
AS SELECT
  ds,
  COALESCE(hh, 'day') AS hh,  -- The ds-only grouping set has hh = NULL; label it 'day'.
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM (
  SELECT ds, DATE_FORMAT(vtime, 'HH') AS hh, user_id
  FROM `paimon`.`default`.`dwd_user_log_product`
) tmp
GROUP BY GROUPING SETS (ds, (ds, hh));
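The query below makes the GROUPING SETS clause easier to read. It is an equivalent aggregation written as two ordinary GROUP BY queries combined with UNION ALL. The (ds, hh) grouping set produces one row per hour. The ds grouping set produces one row per day whose hh is NULL, which COALESCE replaces with 'day'. The daily uv is a distinct count over the whole day. A user who is active in several hours is counted once, so the daily uv can be smaller than the sum of the hourly uv values. The equivalence assumes that vtime is never NULL, which holds for the Faker data. A NULL vtime would produce a NULL hh that COALESCE also maps to 'day'.

```sql
-- Hourly rows: the (ds, hh) grouping set.
SELECT ds, DATE_FORMAT(vtime, 'HH') AS hh,
       COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv
FROM `paimon`.`default`.`dwd_user_log_product`
GROUP BY ds, DATE_FORMAT(vtime, 'HH')
UNION ALL
-- Daily rows: the ds grouping set, labeled 'day'.
SELECT ds, 'day' AS hh,
       COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv
FROM `paimon`.`default`.`dwd_user_log_product`
GROUP BY ds;
```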
Step 3: Update the materialized table
Start updates
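Data freshness is the maximum amount of time by which the data in a materialized table may lag behind its base tables. In this example, the data freshness is 1 hour, so after you click Start Update, the data in the materialized table can lag behind the base tables by up to 1 hour.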
- In the navigation pane on the left, click Data Lineage and search for the target materialized table.
- Click the materialized table view, and then click Start Update in the lower-right corner of the page.
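In open-source Apache Flink 1.20, ALTER MATERIALIZED TABLE statements control the refresh pipeline of a materialized table. SUSPEND pauses the pipeline and RESUME restarts it. The sketch below assumes that your environment accepts these statements. In Realtime Compute for Apache Flink, the documented way to start updates is the Start Update button described above. Treating that button as equivalent to RESUME is an assumption.

```sql
-- Pause the refresh pipeline of the DWD materialized table.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product` SUSPEND;

-- Restart the refresh pipeline later.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product` RESUME;
```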
Backfill data
Data backfilling lets you rewrite the historical data of a specific partition or of an entire table, for example to correct the results of stream processing. For a table that is refreshed by batch jobs, you can also backfill a partition before its scheduled refresh time to write and update its data immediately.
Select the dwd_user_log_product materialized table view and click Manual Update in the lower-right corner of the page. In the dialog box that appears, enter the runtime date as the partition name, such as 20241216. Select the Cascade Update Downstream Associated Materialized Tables checkbox and click Confirm. In the confirmation dialog box, click Confirm again to overwrite the corresponding data and start the update.

For more information about how to backfill data, see Backfill historical data.
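Apache Flink 1.20 also defines a SQL form of manual refresh. It overwrites the data of a materialized table, or of one partition, by running a batch job. Below is a minimal sketch for the partition used above, assuming the statement is available in your environment. It does not assume that a single statement cascades like the Cascade Update Downstream Associated Materialized Tables option in the console. Instead, the downstream table is refreshed explicitly afterwards, in dependency order.

```sql
-- Backfill partition 20241216 of the DWD table first.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product`
  REFRESH PARTITION (ds = '20241216');

-- Then backfill the same partition of the DWS table that reads from it.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dws_overall`
  REFRESH PARTITION (ds = '20241216');
```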
Modify data freshness
You can change the data freshness so that the materialized table is updated daily, hourly, every minute, or every second, as needed.
To change the data freshness of the dwd_user_log_product and dws_overall materialized tables, click the view for the target table. Then, click Modify Data Freshness in the lower-right corner of the page and set the data freshness to the minute level for real-time updates.

For more information about how to modify data freshness, see Modify data freshness.
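In open-source Apache Flink 1.20, the switch from batch to stream processing follows from the freshness value when no REFRESH_MODE is given. A FRESHNESS below the `materialized-table.refresh-mode.freshness-threshold` option (30 minutes by default) selects CONTINUOUS mode, which runs a streaming job. A larger value selects FULL mode, which schedules periodic batch jobs. For comparison, the sketch below declares a minute-level variant of the DWD table with the mode stated explicitly. The table name `dwd_user_log_product_rt` is hypothetical. Whether Realtime Compute for Apache Flink uses the same threshold is an assumption.

```sql
-- Hypothetical minute-level variant of the DWD table that runs as a streaming job.
CREATE MATERIALIZED TABLE dwd_user_log_product_rt (
  PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' MINUTE  -- Below the default 30-minute threshold.
REFRESH_MODE = CONTINUOUS        -- Explicit here; would also be derived from FRESHNESS.
AS SELECT l.ds, l.item_id, l.user_id, l.vtime, r.brand_id, r.seller_id
FROM `paimon`.`default`.`ods_user_log` l
INNER JOIN `paimon`.`default`.`ods_dim_product` r
ON l.item_id = r.item_id;
```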
Step 4: Query the materialized table
Preview data
You can preview the 100 most recent rows of data in the materialized table.
- In the navigation pane on the left, click Data Lineage and search for the target materialized table.
- Click the target materialized table view, and then click Details in the lower-right corner of the page.
- On the Data Preview tab, click the Query icon.

Query data
In the navigation pane on the left, go to . Copy the following SQL statement to the SQL editor, select the code snippet, and then click Run to query the dws_overall materialized table.
SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;
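The rows with hh = 'day' hold the whole-day totals. The hour values '00' to '23' start with digits, which sort before the letter d, so ORDER BY hh places the 'day' row after the hourly rows. Two narrower queries are sketched below. The partition value 20241216 is only an example.

```sql
-- Whole-day totals, one row per partition.
SELECT ds, pv, uv
FROM `paimon`.`default`.`dws_overall`
WHERE hh = 'day'
ORDER BY ds;

-- Hourly breakdown for a single day.
SELECT hh, pv, uv
FROM `paimon`.`default`.`dws_overall`
WHERE ds = '20241216' AND hh <> 'day'
ORDER BY hh;
```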

References
- For more information about materialized tables, see Materialized table management.
- For more information about how to create and use materialized tables, see Create and use materialized tables.
