Realtime Compute for Apache Flink: Quick start with materialized tables

Last Updated: Mar 06, 2026

This topic describes how to build a stream-batch-integrated data lakehouse with materialized tables. It also explains how to adjust the data freshness of a materialized table to switch from batch to stream processing and enable real-time data updates.

Introduction to materialized tables

A materialized table is a new table type in Flink SQL that simplifies batch and stream processing data pipelines and provides an integrated development experience. When you create a materialized table, you do not need to declare its fields or types. You only need to specify the data freshness and a query statement. The Flink engine automatically infers the schema of the materialized table from the query statement and creates a data pipeline to maintain the specified data freshness. For more information, see Materialized table management.

Real-time data lakehouse pipeline diagram

  1. Flink creates the Operational Data Store (ODS) layer by writing data from a data source to Paimon.

  2. Flink creates the data warehouse detail (DWD) layer by joining and enriching data from the ODS layer and writing the result to a materialized table.

  3. You can create a data warehouse service (DWS) layer for application queries by building multiple materialized tables with different data freshness levels and performing multi-dimensional business statistics.

Prerequisites

  • A Realtime Compute for Apache Flink workspace has been created. For more information, see Activate Realtime Compute for Apache Flink.

  • If you use a Resource Access Management (RAM) user or RAM role, ensure that the RAM user or role has the required permissions to access the Realtime Compute for Apache Flink console. For more information, see Permission management.

Step 1: Prepare test data

  1. (Optional) Create a Paimon catalog.

    The materialized table feature is based on Apache Paimon, so you need a Paimon catalog whose metastore type is Filesystem. If you have already created one, skip this step. For more information, see Create a Paimon catalog.

    Create a Paimon catalog

    1. Log on to the Realtime Compute for Apache Flink console.

    2. In the Actions column of the target workspace, click Console.

    3. In the left navigation pane, select Data Management and click Create Catalog. Then, select Apache Paimon and click Next.

      Parameter descriptions:

      | Parameter | Description | Notes |
      | --- | --- | --- |
      | metastore | The type of metastore. | In this example, Filesystem is used. |
      | catalog name | The name of the Paimon catalog. | Enter a custom name. This example uses paimon. |
      | warehouse | The data warehouse directory in Object Storage Service (OSS). | The format is oss://<bucket>/<object>, where <bucket> is the name of your OSS bucket and <object> is the path where your data is stored. You can view your bucket and object names in the OSS console. |
      | fs.oss.endpoint | The endpoint of the OSS service. | If Flink and OSS are in the same region, use the internal endpoint. Otherwise, use the public endpoint. For more information, see Regions and endpoints. |
      | fs.oss.accessKeyId | The AccessKey ID of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | For more information about how to obtain an AccessKey pair, see Create an AccessKey pair. To prevent your AccessKey pair from being exposed in plaintext, we recommend that you use variables. For more information, see Manage variables. |
      | fs.oss.accessKeySecret | The AccessKey secret of the Alibaba Cloud account or RAM user that has read and write permissions on OSS. | Same as fs.oss.accessKeyId. |
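
    The same parameters can also be expressed as a catalog DDL statement. The following is a minimal sketch, not the only supported way to create the catalog. It assumes that a SQL script is an acceptable alternative to the console form in your workspace, and every value in angle brackets is a placeholder that you must replace.

    ```sql
    -- Sketch: register a Paimon catalog named paimon with a Filesystem metastore.
    -- All <...> values are placeholders (assumptions), not real settings.
    CREATE CATALOG `paimon` WITH (
      'type' = 'paimon',
      'metastore' = 'filesystem',                 -- metastore type used in this example
      'warehouse' = 'oss://<bucket>/<object>',    -- OSS warehouse directory
      'fs.oss.endpoint' = '<endpoint>',           -- internal endpoint if Flink and OSS share a region
      'fs.oss.accessKeyId' = '<accessKeyId>',     -- prefer a variable over a plaintext value
      'fs.oss.accessKeySecret' = '<accessKeySecret>'
    );
    ```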

  2. Create the ods_user_log and ods_dim_product tables.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. For the target workspace, click Console in the Actions column.

    3. In the navigation pane on the left, select Data Studio > Scripts. Copy and paste the following code to create the source table.

      This example uses a Paimon catalog named paimon and the default database.
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10) NOT NULL
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            -- Specify 4 buckets.
        'bucket-key' = 'item_id'   -- Specify the bucket key. Data with the same item_id is placed in the same bucket.
      );
      
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. Click Run in the upper-right corner to create the corresponding data tables.

    5. In the left navigation pane, select Data Management. Then, click the corresponding Paimon Catalog and click Refresh to view the new tables.
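
    As an alternative to checking the console, the new tables can be inspected with SQL. This is a small sketch that assumes the Scripts tab accepts SHOW and DESCRIBE statements and that the catalog is named paimon, as in this example.

    ```sql
    -- List the tables in the default database of the paimon catalog.
    SHOW TABLES FROM `paimon`.`default`;

    -- Inspect the columns, types, and keys of each new table.
    DESCRIBE `paimon`.`default`.`ods_user_log`;
    DESCRIBE `paimon`.`default`.`ods_dim_product`;
    ```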

  3. Use the Faker data generator connector to generate simulated data and write the data to the Paimon tables.

    1. In the left navigation pane, select Data Studio > ETL.

    2. Click New, select Blank Stream Draft, click Next, and then click Create.

    3. Copy the following SQL statement to the editor.

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,  -- Product ID
        user_id INT,  -- User ID
        vtime TIMESTAMP,  
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',    -- Faker connector
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    -- Generate a random number between 0 and 1000.
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           -- Generate data from the last 5 hours based on the current date and time.
        'rows-per-second' = '3'   -- Generate 3 rows per second.
       );
        
       CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
       ) WITH (
        'connector' = 'faker',    -- Faker connector
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',   
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'        -- Generate 3 rows per second.
       );
      
      BEGIN STATEMENT SET; 
      
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      
      END; 
    4. In the upper-right corner, click Deploy.

    5. In the navigation pane on the left, click Operation Center > Deployments. For the target deployment, click Start in the Actions column, select Stateless Start, and then click Start.

  4. Query the simulated data.

    In the left navigation pane, select Data Development > Data Query. Copy the following SQL statement into the SQL editor and click Run in the upper-right corner.

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
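
    To confirm that the streaming job keeps writing into the expected partition, a row count per partition can help. The query below is an illustrative check only. Because the job is still running, the counts grow between runs.

    ```sql
    -- Count the rows written to each ds partition of the log table.
    SELECT ds, COUNT(*) AS row_cnt
    FROM `paimon`.`default`.ods_user_log
    GROUP BY ds;
    ```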

Step 2: Create a materialized table

This section describes how to create the `dwd_user_log_product` materialized table for the DWD layer by joining the source tables. You will then build the DWS layer by creating downstream materialized tables based on the `dwd_user_log_product` table to perform business analytics.

  1. Build the DWD layer of the data warehouse by creating the dwd_user_log_product materialized table.

    1. In the navigation pane on the left, choose Data Management and click the target Paimon Catalog.

    2. Click the target database ("default" in this example), and then click Create Materialized Table. Copy the following SQL statement into the SQL editor and click Create.

      -- DWD layer widening logic
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      -- Refresh every 1 hour.
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. Build the DWS layer of the data warehouse and perform multi-dimensional business analytics based on the dwd_user_log_product materialized table.

    This topic uses the `dws_overall` materialized table as an example. The table calculates page views (PVs) and unique visitors (UVs) for each day and for each hour of the day. Create it by following the same steps that you used for the `dwd_user_log_product` table.

    -- Calculate daily and hourly PVs and UVs.
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   -- Refresh every 1 hour.
    AS SELECT 
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
    FROM (
        SELECT ds, date_format(vtime, 'HH') AS hh, user_id
        FROM `paimon`.`default`.`dwd_user_log_product`
    ) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));
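
    Other DWS tables can follow the same pattern with a different freshness. The sketch below is an illustration only: the table name dws_item_daily and the 1-day freshness are hypothetical choices, not part of the original example. It assumes that item_id and ds stay non-null in dwd_user_log_product, which a primary key requires.

    ```sql
    -- Hypothetical DWS table: PVs and UVs per item and day, refreshed once a day.
    CREATE MATERIALIZED TABLE dws_item_daily(
        PRIMARY KEY(ds, item_id) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' DAY   -- A coarser freshness than dws_overall.
    AS SELECT
        ds,
        item_id,
        count(*) AS pv,
        count(distinct user_id) AS uv
    FROM `paimon`.`default`.`dwd_user_log_product`
    GROUP BY ds, item_id;
    ```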

Step 3: Update the materialized table

Start updates

In this example, the data freshness is 1 hour. After you click Start Update, the materialized table is refreshed about once an hour, so its data can lag behind the base tables by roughly that interval.

  1. In the navigation pane on the left, click Data Lineage and then search for the target materialized table.

  2. Click the corresponding materialized table view and then click Start Update in the lower-right corner of the page.

Backfill data

Data backfilling lets you rewrite historical data for a specific partition or an entire table. This can be used to correct stream processing results. For batch jobs that have not yet reached their scheduled time, you can also backfill data to immediately write and update it.

Select the dwd_user_log_product materialized table view and click Manual Update in the lower-right corner of the page. In the dialog box that appears, enter the runtime date as the partition name, such as 20241216. Select the Cascade Update Downstream Associated Materialized Tables checkbox and click Confirm. In the confirmation dialog box, click Confirm again to overwrite the corresponding data and start the update.

For more information about how to backfill data, see Backfill historical data.
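
In open-source Flink SQL, a manual refresh of a single partition is written as an ALTER MATERIALIZED TABLE ... REFRESH statement. The sketch below shows that form for the partition used above. Whether the console accepts this statement is an assumption, and the statement does not by itself cascade to downstream tables the way the checkbox does.

```sql
-- Sketch: refresh one partition of the DWD materialized table.
-- 20241216 is the sample partition name from this topic.
ALTER MATERIALIZED TABLE `paimon`.`default`.`dwd_user_log_product`
  REFRESH PARTITION (ds = '20241216');
```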

Modify data freshness

You can change the data freshness so that the materialized table is updated every day, hour, minute, or second, as needed.

To change the data freshness of the dwd_user_log_product and dws_overall materialized tables, click the view for the target table. Then, click Modify Data Freshness in the lower-right corner of the page and set the data freshness to the minute level for real-time updates.

For more information about how to modify data freshness, see Modify data freshness.

Step 4: Query the materialized table

Preview data

You can preview the 100 most recent rows of data in the materialized table.

  1. In the navigation pane on the left, click Data Lineage and search for the target materialized table.

  2. After clicking the target materialized table view, click Details in the lower-right corner of the page.

  3. On the Data Preview tab, click the Query icon.

Query data

In the navigation pane on the left, go to Data Development > Data Query. Copy the following SQL statement to the SQL editor, select the code snippet, and then click Run to query the dws_overall materialized table.

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;
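
Because dws_overall stores the daily total in rows where hh is 'day' (the value substituted by COALESCE in the table definition), daily and hourly figures can be read separately. The following queries are a small sketch of that split.

```sql
-- Daily totals: one row per ds, marked with hh = 'day'.
SELECT ds, pv, uv
FROM `paimon`.`default`.dws_overall
WHERE hh = 'day';

-- Hourly figures: all remaining rows, ordered by day and hour.
SELECT ds, hh, pv, uv
FROM `paimon`.`default`.dws_overall
WHERE hh <> 'day'
ORDER BY ds, hh;
```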
