This topic walks you through building a unified stream-batch lakehouse analytics pipeline using materialized tables. You will also learn how to switch from batch to stream processing by adjusting the freshness of a materialized table to enable real-time data updates.
What is a materialized table?
A materialized table is a new table type introduced in Flink SQL to simplify data pipelines for both batch and stream processing and deliver a unified development experience. When creating a materialized table, you do not need to declare fields or their types. Instead, specify the desired data freshness and a query statement. The Flink engine automatically infers the schema from the query and creates a corresponding data refresh pipeline to meet the specified freshness requirement. For more information, see Materialized Table Management.
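As a minimal sketch of that idea, the statement below declares only a primary key constraint, a freshness target, and a query. It lists no columns or column types. The table name mt_item_pv_sketch is hypothetical and used for illustration only, and the statement assumes that the ods_user_log source table from Step 1 already exists in a Paimon Catalog named paimon.

```sql
-- Hypothetical example: mt_item_pv_sketch is an illustrative name, not part of this tutorial's pipeline.
-- No column list is declared; the schema (item_id, pv) is inferred from the SELECT.
CREATE MATERIALIZED TABLE `paimon`.`default`.`mt_item_pv_sketch` (
  PRIMARY KEY (item_id) NOT ENFORCED   -- item_id is NOT NULL in ods_user_log, so it can serve as a key
)
FRESHNESS = INTERVAL '1' HOUR          -- Desired data freshness: refresh roughly every hour
AS SELECT
  item_id,
  COUNT(*) AS pv                       -- Page views per product
FROM `paimon`.`default`.`ods_user_log`
GROUP BY item_id;
```

The full statements used in this tutorial follow the same shape and add partitioning on ds.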
Real-Time Lakehouse Pipeline Diagram
-
Flink writes data from sources into Paimon to form the Operational Data Store (ODS) layer.
-
Flink enriches and widens ODS-layer data through table joins and writes the result into a materialized table to form the Data Warehouse Detail (DWD) layer.
-
Multiple materialized tables with different freshness settings perform multi-dimensional business aggregations to form the Data Warehouse Service (DWS) layer, which serves application queries.
Prerequisites
-
You have created a Flink workspace. For more information, see Activate Realtime Compute for Apache Flink.
-
If you access resources as a Resource Access Management (RAM) user or RAM role, confirm that you have the required permissions for the Flink console. For more information, see Permission Management.
Step 1: Prepare test data
-
Create a Paimon Catalog.
Materialized tables are powered by Apache Paimon. You must create a Paimon Catalog with a metastore type of Filesystem. Skip this step if you already have one. For more information, see Create a Paimon Catalog.
-
Create the user behavioral log table ods_user_log and the product information table ods_dim_product.
-
Log on to the Realtime Compute Management Console.
-
Click Console in the Actions column of your target workspace.
-
In the navigation pane on the left, choose . Copy and paste the following code to create source tables.
This example assumes you have already created a Paimon Catalog named paimon and are using the default database.
CREATE TABLE `paimon`.`default`.`ods_user_log` (
  item_id INT NOT NULL,
  user_id INT NOT NULL,
  vtime TIMESTAMP(6),
  ds VARCHAR(10) NOT NULL
)
PARTITIONED BY(ds)
WITH (
  'bucket' = '4',            -- Set the number of buckets to 4.
  'bucket-key' = 'item_id'   -- Specify the key used to determine bucket assignment. Rows with the same item_id go into the same bucket.
);

CREATE TABLE `paimon`.`default`.`ods_dim_product` (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
  'bucket' = '4',
  'bucket-key' = 'item_id'
);
-
Click Run in the upper-right corner to create the tables.
-
In the navigation pane on the left, choose Data Management, click your Paimon Catalog, and then click Refresh to view the new tables.
-
Use the Faker connector to generate mock data and write it into the Paimon tables.
-
In the navigation pane on the left, choose .
-
Click New, select Blank stream draft, click Next, and then click Create.
-
Copy the following SQL statement into the SQL editor.
CREATE TEMPORARY TABLE `user_log` (
  item_id INT,   -- Product ID
  user_id INT,   -- User ID
  vtime TIMESTAMP,
  ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
) WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',   -- Generate a random number between 0 and 1000
  'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
  'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',   -- Generate data up to 5 hours before the current time
  'rows-per-second' = '3'   -- Generate 3 rows per second
);

CREATE TEMPORARY TABLE `dim_product` (
  item_id INT NOT NULL,
  title VARCHAR(255),
  pict_url VARCHAR(255),
  brand_id INT,
  seller_id INT,
  PRIMARY KEY(item_id) NOT ENFORCED
) WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
  'fields.title.expression'='#{book.title}',
  'fields.pict_url.expression'='#{internet.domainName}',
  'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',
  'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
  'rows-per-second' = '3'   -- Generate 3 rows per second
);

BEGIN STATEMENT SET;

INSERT INTO `paimon`.`default`.`ods_user_log`
SELECT item_id, user_id, vtime, CAST(ds AS VARCHAR(10)) AS ds
FROM `user_log`;

INSERT INTO `paimon`.`default`.`ods_dim_product`
SELECT item_id, title, pict_url, brand_id, seller_id
FROM `dim_product`;

END;
-
Click Deploy in the upper-right corner to deploy the job.
-
In the navigation pane on the left, choose . Click Start in the Actions column of your target job, select Stateless start, and then click Start.
-
Query the simulated data.
In the navigation pane on the left, choose . Copy the following SQL statement into the SQL editor and click Run in the upper-right corner.
SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
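Beyond sampling ten rows, a couple of aggregate queries can help confirm that the generator job is actually writing data. The following is a small sketch that assumes the same paimon catalog and default database as above. The exact counts depend on how long the job has been running, so no expected output is shown.

```sql
-- Row count per day partition of the behavioral log; the count should keep growing while the job runs.
SELECT ds, COUNT(*) AS row_cnt
FROM `paimon`.`default`.`ods_user_log`
GROUP BY ds;

-- Number of products currently in the dimension table.
-- item_id is the primary key, so this is bounded by the range of the Faker expression for item_id.
SELECT COUNT(*) AS product_cnt
FROM `paimon`.`default`.`ods_dim_product`;
```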
Step 2: Create materialized tables
This section builds a DWD-layer materialized table named dwd_user_log_product by widening source tables. It then builds downstream materialized tables based on dwd_user_log_product for business aggregation, completing the DWS layer.
-
Build the DWD layer of the data warehouse by creating the dwd_user_log_product materialized table.
-
In the navigation pane on the left, choose Data Management and click your target Paimon Catalog.
-
Click your target database (default in this example), then click Create Materialized Table. Copy the following SQL statement into the SQL editor and click Create.
-- DWD layer widening logic
CREATE MATERIALIZED TABLE dwd_user_log_product(
  PRIMARY KEY (item_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR   -- Refresh every hour
AS SELECT
  l.ds,
  l.item_id,
  l.user_id,
  l.vtime,
  r.brand_id,
  r.seller_id
FROM `paimon`.`default`.`ods_user_log` l
INNER JOIN `paimon`.`default`.`ods_dim_product` r
ON l.item_id = r.item_id;
-
Build the DWS layer by performing multi-dimensional business aggregations based on the dwd_user_log_product materialized table.
This topic shows how to create the dws_overall materialized table that aggregates hourly PV/UV counts by day. Follow the previous step to create the dws_overall materialized table.
-- Aggregate PV/UV by day and by hour within each day
CREATE MATERIALIZED TABLE dws_overall(
  PRIMARY KEY(ds, hh) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR   -- Refresh every hour
AS SELECT
  ds,
  COALESCE(hh, 'day') AS hh,   -- Rows from the ds-only grouping set carry 'day' instead of an hour
  count(*) AS pv,
  count(distinct user_id) AS uv
FROM (
  SELECT ds, date_format(vtime, 'HH') AS hh, user_id
  FROM `paimon`.`default`.`dwd_user_log_product`
) tmp
GROUP BY GROUPING SETS(ds, (ds, hh));
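Other dimensions can be aggregated in the same way. As a hedged sketch, the statement below builds a second DWS table that counts PV/UV per brand and day. The table name dws_brand_sketch is hypothetical and not part of the original pipeline. Because brand_id is nullable in the DWD table, the sketch maps NULL to the placeholder value -1 so that the column can be part of the primary key, mirroring how dws_overall uses COALESCE for hh.

```sql
-- Hypothetical DWS table: dws_brand_sketch is an illustrative name only.
CREATE MATERIALIZED TABLE dws_brand_sketch(
  PRIMARY KEY(ds, brand_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR   -- Refresh every hour, same as dws_overall
AS SELECT
  ds,
  brand_id,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM (
  SELECT
    ds,
    COALESCE(brand_id, -1) AS brand_id,   -- Assumption: -1 is a placeholder for a missing brand
    user_id
  FROM `paimon`.`default`.`dwd_user_log_product`
) tmp
GROUP BY ds, brand_id;
```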
Step 3: Update materialized tables
Start update
The data freshness in this example is set to 1 hour. After you click Start Update, the materialized table is refreshed about once an hour, so its data can lag behind base table updates by up to roughly 1 hour.
-
In the navigation pane on the left, choose and search for your target materialized table.
-
Click the materialized table view and then click Start Update in the lower-right corner of the page.
Data backfill
Data backfill rewrites historical data into specific partitions or the entire table. Use it to correct stream processing results or immediately update data for batch jobs that have not yet reached their scheduled time.
Select the dwd_user_log_product materialized table view and click Manual Update in the lower-right corner. Enter the current date (for example, 20241216) as the partition name, check Cascade update downstream associated materialized tables, and click Confirm. In the confirmation dialog box, click Confirm to overwrite the data immediately.
For more information about data backfill, see Backfill historical data.
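For comparison, open-source Apache Flink SQL provides a statement for refreshing a single partition of a materialized table. The sketch below uses the example partition value from above. Whether the SQL editor in the console accepts this statement is an assumption, and it refreshes only the named table, so it does not replace the cascade option in the console dialog.

```sql
-- Sketch using open-source Flink SQL syntax; availability in the console is an assumption.
-- Recomputes and overwrites the ds = '20241216' partition of the DWD table only.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product`
  REFRESH PARTITION (ds = '20241216');
```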
Modify data freshness
You can adjust data freshness so that materialized tables are updated at day-, hour-, minute-, or even second-level intervals, depending on your business needs.
Update the freshness settings for both dwd_user_log_product and dws_overall materialized tables. Click the materialized table view and then click Modify Data Freshness in the lower-right corner. Set the freshness to minute-level for real-time updates.
For more information about modifying data freshness, see Modify data freshness.
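Freshness can also be set to minute level when a table is first created. The following sketch declares a 1-minute freshness in the CREATE statement. The table name dws_seller_rt_sketch is hypothetical, and -1 is an assumed placeholder for a missing seller_id. In open-source Apache Flink, a freshness below the `materialized-table.refresh-mode.freshness-threshold` setting (30 minutes by default) results in a continuously running refresh job rather than scheduled batch refreshes; whether the managed service applies the same threshold is an assumption.

```sql
-- Hypothetical table: dws_seller_rt_sketch is an illustrative name only.
CREATE MATERIALIZED TABLE dws_seller_rt_sketch(
  PRIMARY KEY(ds, seller_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' MINUTE   -- Minute-level freshness for near-real-time updates
AS SELECT
  ds,
  seller_id,
  COUNT(*) AS pv
FROM (
  SELECT
    ds,
    COALESCE(seller_id, -1) AS seller_id   -- Make the key column non-null; -1 is a placeholder
  FROM `paimon`.`default`.`dwd_user_log_product`
) tmp
GROUP BY ds, seller_id;
```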
Step 4: Query materialized tables
Data preview
You can preview the latest 100 rows of a materialized table.
-
In the navigation pane on the left, choose and search for your target materialized table.
-
Click the materialized table view and then click Details in the lower-right corner of the page.
-
On the Data Preview tab of the materialized table, click the Query icon.
Data query
In the navigation pane on the left, choose . Copy the following SQL statement into the SQL editor, select the code snippet, and click Run to query the dws_overall materialized table.
SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;
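Because dws_overall stores both daily totals and hourly rows, with the literal 'day' in the hh column marking the daily totals, it can be convenient to query the two kinds of rows separately. The following sketch assumes vtime is never NULL in the source data, so 'day' appears only on the daily total rows.

```sql
-- Daily totals: one row per ds, marked by hh = 'day'.
SELECT ds, pv, uv
FROM `paimon`.`default`.`dws_overall`
WHERE hh = 'day'
ORDER BY ds;

-- Hourly breakdown: hh holds the two-digit hour ('00' to '23').
SELECT ds, hh, pv, uv
FROM `paimon`.`default`.`dws_overall`
WHERE hh <> 'day'
ORDER BY ds, hh;
```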
References
-
For more information about materialized tables, see Materialized Table Management.
-
For more information about creating and using materialized tables, see Create and use materialized tables.