Realtime Compute for Apache Flink: Materialized Table (Build a Unified Stream-Batch Lakehouse)

Last Updated: Jun 20, 2026

This topic walks you through building a unified stream-batch lakehouse analytics pipeline using materialized tables. You will also learn how to switch from batch to stream processing by adjusting the freshness of a materialized table to enable real-time data updates.

What is a materialized table?

A materialized table is a new table type introduced in Flink SQL to simplify data pipelines for both batch and stream processing and deliver a unified development experience. When creating a materialized table, you do not need to declare fields or their types. Instead, specify the desired data freshness and a query statement. The Flink engine automatically infers the schema from the query and creates a corresponding data refresh pipeline to meet the specified freshness requirement. For more information, see Materialized Table Management.
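
As a minimal sketch of that idea, the statement below declares no columns: it gives only a freshness target and a query. The names `orders`, `mt_valid_orders`, and the three column names are hypothetical and exist only for illustration, and the statement assumes that the current catalog supports materialized tables.

```sql
-- Hypothetical names: `orders` is an existing table, `mt_valid_orders` is the new materialized table.
CREATE MATERIALIZED TABLE mt_valid_orders
FRESHNESS = INTERVAL '30' MINUTE   -- desired freshness relative to `orders`
AS SELECT
  order_id,                        -- column names and types are inferred from this query
  customer_id,
  amount
FROM orders
WHERE amount > 0;
```

The engine derives the refresh pipeline from the FRESHNESS clause, so the same definition can serve batch-style or stream-style updates depending on the interval.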

Real-Time Lakehouse Pipeline Diagram

  1. Flink writes data from sources into Paimon to form the Operational Data Store (ODS) layer.

  2. Flink enriches and widens ODS-layer data through table joins and writes the result into a materialized table to form the Data Warehouse Detail (DWD) layer.

  3. Multiple materialized tables with different freshness settings perform multi-dimensional business aggregations to form the Data Warehouse Service (DWS) layer, which serves application queries.

Prerequisites

Step 1: Prepare test data

  1. Create a Paimon Catalog.

    Materialized tables are powered by Apache Paimon. You must create a Paimon Catalog with a metastore type of Filesystem. Skip this step if you already have one. For more information, see Create a Paimon Catalog.

    Create a Paimon Catalog

    1. Log on to the Realtime Compute Management Console.

    2. Click Console in the Actions column of your target workspace.

    3. In the navigation pane on the left, choose Data Management and click Create Catalog. Select Apache Paimon and click Next.

      Parameter description:

      | Configuration item | Description | Remarks |
      | --- | --- | --- |
      | metastore | Metastore type. | This example uses filesystem as the metastore type. |
      | catalog name | Name of the Paimon Catalog. | Enter a custom English name. This example uses paimon. |
      | warehouse | Data warehouse directory in OSS. | Use the format oss://<bucket>/<object>, where <bucket> is the name of your OSS bucket and <object> is the path where your data is stored. Check your bucket and object names in the OSS Management Console. |
      | fs.oss.endpoint | Connection address of OSS. | If Flink and OSS are in the same region, use the private network endpoint. Otherwise, use the public network endpoint. For more information, see Regions and endpoints. |
      | fs.oss.accessKeyId | AccessKey ID of your Alibaba Cloud account or RAM user with read and write permissions on OSS. | For instructions on how to obtain it, see Create an AccessKey. To avoid exposing plaintext credentials, use variables instead. For more information, see Variable management. |
      | fs.oss.accessKeySecret | AccessKey secret of your Alibaba Cloud account or RAM user with read and write permissions on OSS. | |
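
    The console form above corresponds to a catalog definition. The following is a rough SQL sketch of the same settings. It assumes that the catalog type identifier is paimon and that your environment accepts CREATE CATALOG statements, neither of which is stated in this topic. The values in angle brackets are placeholders to replace with your own.

    ```sql
    -- Sketch only: mirrors the parameters in the table above.
    CREATE CATALOG `paimon` WITH (
      'type' = 'paimon',                                -- assumption: identifier of the Paimon catalog type
      'metastore' = 'filesystem',                       -- metastore type used in this example
      'warehouse' = 'oss://<bucket>/<object>',          -- data warehouse directory in OSS
      'fs.oss.endpoint' = '<endpoint>',                 -- private or public OSS endpoint
      'fs.oss.accessKeyId' = '<access-key-id>',         -- prefer a variable over a plaintext value
      'fs.oss.accessKeySecret' = '<access-key-secret>'  -- prefer a variable over a plaintext value
    );
    ```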

  2. Create the user behavioral log table ods_user_log and the product information table ods_dim_product.

    1. Log on to the Realtime Compute Management Console.

    2. Click Console in the Actions column of your target workspace.

    3. In the navigation pane on the left, choose Data Development > Data Query. Copy and paste the following code to create source tables.

      This example assumes you have already created a Paimon Catalog named paimon and are using the default database.
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10) NOT NULL
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            -- Set the number of buckets to 4.
        'bucket-key' = 'item_id'   -- Specify the key used to determine bucket assignment. Rows with the same item_id go into the same bucket.
      );
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. Click Run in the upper-right corner to create the tables.

    5. In the navigation pane on the left, choose Data Management, click your Paimon Catalog, and then click Refresh to view the new tables.

  3. Use the Faker connector to generate mock data and write it into the Paimon tables.

    1. In the navigation pane on the left, choose Data Development > ETL.

    2. Click New, select Blank stream draft, click Next, and then click Create.

    3. Copy the following SQL statement into the SQL editor.

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,  -- Product ID
        user_id INT,  -- User ID
        vtime TIMESTAMP,
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',    -- Faker connector
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    -- Generate a random number between 0 and 1000
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           -- Generate data up to 5 hours before the current time
        'rows-per-second' = '3'   -- Generate 3 rows per second
      );
      CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255),
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'connector' = 'faker',    -- Faker connector
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'        -- Generate 3 rows per second
      );
      BEGIN STATEMENT SET; 
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      END; 
    4. Click Deploy in the upper-right corner to deploy the job.

    5. In the navigation pane on the left, choose Operation Center > Job O&M. Click Start in the Actions column of your target job, select Stateless start, and then click Start.

  4. Query the simulated data.

    In the navigation pane on the left, choose Data Development > Data Query. Copy the following SQL statement into the SQL editor and click Run in the upper-right corner.

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
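
Beyond sampling ten rows, a quick aggregate can confirm that the mock-data job is actually writing into both tables. The queries below are a sketch that uses only the tables and columns created in this step; run them in the same Data Query editor.

```sql
-- Rows per partition in the log table; the count keeps growing while the job runs.
SELECT ds, COUNT(*) AS log_rows
FROM `paimon`.`default`.`ods_user_log`
GROUP BY ds;

-- Products loaded so far; item_id is the primary key, so the count is bounded
-- by the item_id range configured in the Faker source.
SELECT COUNT(*) AS product_rows
FROM `paimon`.`default`.`ods_dim_product`;
```

If the first query returns no rows, check that the job started in Operation Center before you continue to Step 2.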

Step 2: Create materialized tables

This section builds a DWD-layer materialized table named dwd_user_log_product by widening source tables. It then builds downstream materialized tables based on dwd_user_log_product for business aggregation, completing the DWS layer.

  1. Build the DWD layer of the data warehouse by creating the dwd_user_log_product materialized table.

    1. In the navigation pane on the left, choose Data Management and click your target Paimon Catalog.

    2. Click your target database (default in this example), then click Create Materialized Table. Copy the following SQL statement into the SQL editor and click Create.

      -- DWD layer widening logic
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      -- Refresh every hour
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. Build the DWS layer by performing multi-dimensional business aggregations based on the dwd_user_log_product materialized table.

    This example creates the dws_overall materialized table, which aggregates page views (PV) and unique visitors (UV) by day and by hour. Follow the procedure in the previous step to create it.

    -- Aggregate PV/UV by day and by hour
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   -- Refresh every hour
    AS SELECT
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
    FROM (
        SELECT ds, date_format(vtime, 'HH') AS hh, user_id
        FROM `paimon`.`default`.`dwd_user_log_product`
    ) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));
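
The DWS layer can hold several materialized tables built on the same DWD table, each with its own freshness. As an illustration only, the following sketch adds a per-brand daily aggregation that refreshes once a day. The table name dws_brand_daily is hypothetical and is not part of this tutorial; the columns come from dwd_user_log_product, and the sentinel value -1 is an arbitrary choice.

```sql
-- Hypothetical second DWS table: daily PV/UV per brand
CREATE MATERIALIZED TABLE dws_brand_daily(
    PRIMARY KEY(ds, brand_id) NOT ENFORCED
)
PARTITIONED BY(ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' DAY   -- Refresh once a day
AS SELECT
    ds,
    COALESCE(brand_id, -1) AS brand_id,   -- brand_id is nullable in the source; primary key columns must not be
    count(*) AS pv,
    count(distinct user_id) AS uv
FROM `paimon`.`default`.`dwd_user_log_product`
GROUP BY ds, COALESCE(brand_id, -1);
```

The COALESCE call plays the same role as in dws_overall: it turns a nullable column into a non-null one so that it can be part of the primary key.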

Step 3: Update materialized tables

Start update

The data freshness in this example is set to 1 hour. After you click Start Update, the materialized table is refreshed on that schedule, so its data can lag behind base table updates by up to roughly 1 hour.

  1. In the navigation pane on the left, choose Operation Center > Data Lineage and search for your target materialized table.

  2. Click the materialized table view and then click Start Update in the lower-right corner of the page.

Data backfill

Data backfill rewrites historical data in specific partitions or in the entire table. Use it to correct stream processing results, or to refresh a batch-updated table immediately instead of waiting for its next scheduled run.

Select the dwd_user_log_product materialized table view and click Manual Update in the lower-right corner. Enter the current date (for example, 20241216) as the partition name, check Cascade update downstream associated materialized tables, and click Confirm. In the confirmation dialog box, click Confirm to overwrite the data immediately.

For more information about data backfill, see Backfill historical data.
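
In open-source Apache Flink (1.20 and later), a partition refresh can also be requested with an ALTER MATERIALIZED TABLE ... REFRESH statement. The sketch below assumes that this statement is accepted in the environment where you run it, which this topic does not confirm; the console procedure above remains the documented path. The partition value is the example date used above.

```sql
-- Refresh one partition of the DWD table.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product`
  REFRESH PARTITION (ds = '20241216');

-- The statement targets a single table, so the downstream table is refreshed separately.
-- Run this only after the refresh of the DWD table has finished.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dws_overall`
  REFRESH PARTITION (ds = '20241216');
```

Unlike the cascade option in the console, nothing here propagates the refresh downstream automatically, which is why the second statement is listed explicitly.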

Modify data freshness

You can adjust data freshness so that materialized tables are updated at day-level, hour-level, minute-level, or even second-level intervals, depending on your business needs.

Update the freshness settings for both dwd_user_log_product and dws_overall materialized tables. Click the materialized table view and then click Modify Data Freshness in the lower-right corner. Set the freshness to minute-level for real-time updates.

For more information about modifying data freshness, see Modify data freshness.

Step 4: Query materialized tables

Data preview

You can preview the latest 100 rows of a materialized table.

  1. In the navigation pane on the left, choose Operation Center > Data Lineage and search for your target materialized table.

  2. Click the materialized table view and then click Details in the lower-right corner of the page.

  3. On the Data Preview tab of the materialized table, click the Query icon.

Data query

In the navigation pane on the left, choose Data Development > Data Query. Copy the following SQL statement into the SQL editor, select the code snippet, and click Run to query the dws_overall materialized table.

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;
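
Because of the GROUPING SETS clause in its definition, dws_overall mixes two aggregation levels in the hh column: rows with hh = 'day' hold the daily totals, and all other rows hold one hour each. The following sketch shows how each level might be read separately. It uses only the columns defined in dws_overall, and the date is an example value.

```sql
-- Daily totals: one row per ds
SELECT ds, pv, uv
FROM `paimon`.`default`.dws_overall
WHERE hh = 'day'
ORDER BY ds;

-- Hourly breakdown for a single day
SELECT hh, pv, uv
FROM `paimon`.`default`.dws_overall
WHERE ds = '20241216' AND hh <> 'day'
ORDER BY hh;
```

Hourly pv values add up to the daily pv, but hourly uv values generally do not add up to the daily uv, because a user who is active in several hours is counted once per hour and only once per day.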
