
Realtime Compute for Apache Flink:Create and use materialized tables

Last Updated: Nov 11, 2025

This topic describes how to create and use materialized tables, covering key aspects such as backfilling historical data in materialized tables, adjusting their data freshness, and examining their data lineage.

Prerequisites and limitations

  • The engine version is Ververica Runtime (VVR) 8.0.10 or later.

  • You have an Apache Paimon catalog of the filesystem metastore type.

  • You have the permissions to develop and deploy jobs in a namespace. For more information, see Grant permissions on namespaces.

  • Temporary objects, such as temporary tables, functions, and views, are not supported.

Create a materialized table

Syntax

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Define the primary key constraint.
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]

[COMMENT table_comment]
-- Configure the partition key.
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- Configure connector options.
[WITH (key1=val1, key2=val2, ...)]
-- Configure the data freshness.
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Configure the data refresh mode.
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS  <select_statement>

Parameters

Parameter

Required

Description

FRESHNESS

Yes

The freshness of the data in the materialized table. Data freshness specifies the maximum delay time of data updates between materialized tables and base tables.

Note
  • If the base table is a materialized table, make sure that the data freshness of the downstream table is N times that of the upstream table, where N is a positive integer.

  • Data freshness cannot exceed one day.

AS <select_statement>

Yes

This clause is used to define the query for populating materialized table data. An upstream table can be a materialized table, table, or view. The SELECT statement supports all Flink SQL queries.

PRIMARY KEY

No

An optional constraint that defines the columns that uniquely identify each row in the table. Primary key columns must not be null.

PARTITIONED BY

No

An optional clause that defines the columns by which the materialized table is partitioned.

WITH options

No

Defines table properties for the materialized table, including the time format parameters of partitioning columns.

For example, define the time format parameter for a partitioning column as WITH ('partition.fields.#.date-formatter' = 'yyyyMMdd'). For more information, see the examples provided in this topic.

REFRESH_MODE

No

Specifies the refresh mode of a materialized table. The explicitly specified refresh mode takes precedence over the automatically inferred one. Valid values:

  • CONTINUOUS: A streaming job incrementally updates the data in a materialized table. Depending on the sink storage, the data becomes visible immediately or only after a checkpoint completes.

  • FULL: The workflow periodically updates all data in a materialized table. The engine determines whether to update data incrementally or in full. For more information, see Incrementally update a materialized table. The data refresh interval matches the data freshness. By default, all data in the materialized table is overwritten. If a partitioning column exists, you can choose to refresh the latest partition or update all partitions.
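You can also override the inferred mode explicitly. The following sketch, using hypothetical table names, forces streaming updates even though a 1-hour freshness would normally be inferred as batch:

```sql
-- Hypothetical example: force streaming (CONTINUOUS) updates despite a
-- 1-hour freshness, which would otherwise be inferred as batch (FULL) mode.
CREATE MATERIALIZED TABLE mt_order_continuous
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT * FROM `paimon`.`db`.`orders`;
```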

Procedure

  1. Log on to the management console of Realtime Compute for Apache Flink.

  2. Click Console in the Actions column of the target workspace.

  3. In the left-side navigation pane, click Catalogs. In the Catalogs pane, click the target Apache Paimon catalog.

  4. Click the target database. On the page that appears, click Create Materialized Table.

    In this example, a base table named orders is used. In the table, the primary key is order_id, the order name field is order_name, and the date field is ds. The following examples show how to create materialized tables based on the orders table.

    • Create a materialized table named mt_order based on the orders table. Retrieve all columns from the orders table as materialized table fields, and set the data freshness to 5 seconds.

      CREATE MATERIALIZED TABLE mt_order
      FRESHNESS = INTERVAL '5' SECOND
      AS
      SELECT * FROM `paimon`.`db`.`orders`
      ;
    • Create a materialized table named mt_id based on the materialized table mt_order. Retrieve the order_id and ds fields as table fields, specify the order_id field as the primary key and the ds field as the partitioning column, and set the data freshness to 30 minutes.

      CREATE MATERIALIZED TABLE mt_id (
       PRIMARY KEY (order_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      FRESHNESS = INTERVAL '30' MINUTE
      AS
      SELECT order_id, ds FROM mt_order
      ;
    • Create a materialized table named mt_ds based on the materialized table mt_order, and specify the date-formatter time format for the ds partitioning column. Each time the materialized table mt_ds is scheduled, the data freshness is subtracted from the scheduled time, and the result is converted into the corresponding ds partition value. For example, if the data freshness is set to 1 hour and the scheduled time is 2024-01-01 00:00:00, the calculated ds value is 20231231, and only the data in the partition ds = '20231231' is refreshed. If the scheduled time is 2024-01-01 01:00:00, the calculated ds value is 20240101, and the data in the partition ds = '20240101' is refreshed.

      CREATE MATERIALIZED TABLE mt_ds
      PARTITIONED BY(ds)
      WITH (
          'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR
      AS
      SELECT order_id, order_name, ds FROM mt_order
      ;
      Note
      • The # field in partition.fields.#.date-formatter must be a valid partitioning field of the string type.

      • partition.fields.#.date-formatter specifies the time partition format of the materialized table. # is the name of the partitioning column of the string type, which allows the system to refresh the data of the specified partition.

  5. Start or stop the update of a materialized table.

    1. Click the materialized table under the target catalog.

    2. Click Start or Stop in the upper-right corner of the details page to start or stop the update.

      Note

      If a materialized table is updating when you click Stop, the update will not halt until the current update round is complete.

  6. Examine the job.

    Select the Table Schema tab of the details page of the materialized table. In the Basic Information section, click the ID in the Latest Job or Workflow field.

Modify the query of a materialized table

Limits

  • Only new materialized tables powered by VVR 11.1 or later are supported.

  • Only adding columns and modifying computation logic are supported operations. See the table below for details:

    Operation

    Supported

    Description

    Add a column

    Supported

    Appends a new column to the materialized table while maintaining the column order.

    Modify computation logic

    Supported

    Modifies the computation logic of existing columns, without changing column names and types.

    Change the column order

    Not supported

    To change the column order, drop the materialized table and create a new one with the desired column order.

    Modify column names or types

    Not supported

    To modify the column names or types, drop the materialized table and create a new one with the desired column names or types.

Procedure

  1. Click Edit Table and modify the query. Sample code:

    ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
        AS
        SELECT
          *,
          price * quantity AS total_price
        FROM orders
        WHERE price * quantity > 1000
    ;
  2. Click Preview to check the modifications.


  3. Click OK. Select the Table Schema tab to check the added columns and modified query.

Important

If downstream tables rely on dynamic parsing for synchronization, such as SELECT * queries or automatic field mapping, adding new columns may cause job failures or data format mismatch errors. We recommend that downstream tables use fixed column lists rather than dynamic parsing, and that you update their schemas promptly.
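For example, a downstream materialized table that lists its columns explicitly is unaffected when a new column such as total_price is added upstream. The following is a sketch with hypothetical names:

```sql
-- Hypothetical downstream table: pin the columns explicitly instead of
-- using SELECT *, so upstream schema additions do not change this table.
CREATE MATERIALIZED TABLE mt_orders_pinned
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT order_id, price, quantity
FROM `paimon`.`default`.`mt-orders`;
```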

Incrementally update a materialized table

Limits

Only Realtime Compute for Apache Flink that uses VVR 8.0.11 or later supports incremental updates of materialized tables.

Update mode of a materialized table

Materialized tables provide the following update modes: streaming update mode, full batch update mode, and incremental batch update mode.

The data freshness of a materialized table determines whether the table runs in streaming or batch mode: a freshness of less than 30 minutes results in streaming mode, and a freshness of 30 minutes or more results in batch mode. In batch mode, the engine automatically chooses between full and incremental updates. An incremental update computes only the data that has changed since the last update and merges the result into the materialized table. A full update recomputes the entire table or partition and overwrites the existing data. The engine prefers incremental updates and performs full updates only when incremental updates cannot meet the requirements of the materialized table.
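As a sketch with hypothetical table names, the 30-minute threshold works as follows:

```sql
-- Freshness below 30 minutes: the table runs in streaming mode.
CREATE MATERIALIZED TABLE mt_fast
FRESHNESS = INTERVAL '5' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- Freshness of 30 minutes or more: the table runs in batch mode, and the
-- engine chooses between incremental and full updates for each refresh.
CREATE MATERIALIZED TABLE mt_slow
FRESHNESS = INTERVAL '1' HOUR
AS SELECT * FROM `paimon`.`db`.`orders`;
```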

Conditions for incremental updates

Incremental updates are performed only when the materialized table meets all of the following conditions:

  • When you define a materialized table, you do not configure the partition.fields.#.date-formatter parameter to specify the time format of the partitioning column.

  • You do not specify a primary key for the source table.

  • In the following scenarios, query statements in a materialized table support incremental updates.

    SQL statement

    Supported incremental update

    SELECT

    Projected columns and scalar function expressions, including user-defined functions, are supported. Aggregate functions are not supported.

    FROM

    Table name queries or subqueries are supported.

    WITH

    Common table expressions (CTEs) are supported.

    WHERE

    Scalar function expressions, including user-defined functions, are supported in filter conditions. Subqueries are not supported in filter conditions. For example, subquery clauses in the WHERE [NOT] EXISTS <subquery> and WHERE <column name> [NOT] IN <subquery> formats are not supported.

    UNION

    Only UNION ALL is supported.

    JOIN

    • INNER JOIN is supported.

    • LEFT JOIN, RIGHT JOIN, and FULL [OUTER] JOIN are not supported, except for the following scenarios in which Lateral Join and Lookup Join are used.

      • [LEFT [OUTER]] JOIN LATERAL and table function expressions, including user-defined functions, are supported.

      • Only A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() is supported in Lookup Join.

    Note
    • JOIN statements that do not use the JOIN keyword, such as SELECT * FROM a, b WHERE a.id = b.id, are supported.

    • If you use INNER JOIN to perform incremental updates, the full data of the two source tables is still read.

    GROUP BY

    Not supported.

Examples

Example 1: Use scalar functions to process data in the orders source table.

CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT 
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE 
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    UDSF_ProcessFunction(notes) AS notes
FROM 
    orders
WHERE
    status = 'shipped';

Example 2: Use lateral join and lookup join to enrich the orders table.

CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM 
        orders,
        LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT 
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    order_tag
FROM o 
LEFT JOIN 
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON 
    o.product_id = p.product_id;

Backfill historical data

Traditionally, correcting stream processing results with the previous day's full data requires a separate batch job. With materialized tables, you can backfill data directly from the current job. This capability unifies batch and streaming processing and improves development and O&M efficiency.

  1. In the Catalogs pane, click the target materialized table under your Apache Paimon catalog.

  2. On the Data Information tab, refresh the data.

    If you have defined the partition key when creating the materialized table, the table is a partitioned table.

    Partitioned table

    The first time you perform data backfilling, or if no partitions are available, click Trigger Update in the upper-right corner of the Partitions section on the Table Information tab. If a partition exists, find the corresponding partition and click Refresh in the Actions column.


    Parameters:

    • ds: the partition field of the table. For example, if you enter 20241201 in the ds field, all data with ds=20241201 is backfilled.

    • Task Name: the name of the data backfilling task.

    • Update Range (optional): specifies whether to cascade updates to the downstream materialized tables. You can use the partitioned table as the starting point and update all of its downstream materialized tables, up to a maximum of 6 downstream layers.

      Note
      • If you update a partitioned table, the partitioning columns of the downstream materialized table must be the same as those of the partitioned table. Otherwise, the update operation fails.

      • If a materialized table fails to be updated, all downstream nodes fail.

    • Deployment Target: the object on which the table is deployed. You can select queues and session clusters. By default, default-queue is used.

    Non-partitioned table

    Select the Data Information tab, view the details of the materialized table, and click Refresh.


    Parameters:

    • Task Name: the name of the task for data backfilling.

    • Update Range: This parameter is not supported for non-partitioned tables.

      Note
      • All data in downstream tables will be refreshed.

      • If a materialized table fails to be updated, all downstream nodes fail.

      • If the system determines, based on the data freshness of the base table, that the workload runs in streaming mode and the table is non-partitioned, cascading updates are not supported.

    • Deployment Target: the object on which the table is deployed. You can select queues and session clusters. By default, default-queue is used.

Change data freshness

  1. Click the database under the desired catalog, and then click the materialized table that you want to modify.

  2. In the upper-right corner of the details page of the materialized table, click Edit Freshness.

    • If the materialized table does not have a primary key, you cannot change the job between streaming and batch execution by modifying the data freshness. For example, changing the data freshness from 2 seconds to 1 hour would convert a streaming job into a batch job, and changing it from 1 hour to 2 seconds would convert a batch job into a streaming job. Neither operation is allowed. A job is classified as streaming if the data freshness is less than 30 minutes, and as batch if the data freshness is 30 minutes or more.

    • If the base table is a materialized table, make sure that the data freshness of the downstream table is N times that of the upstream table, where N is a positive integer.

    • Data freshness cannot exceed 1 day.
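The freshness constraints above can be illustrated with the following sketch (table names are hypothetical):

```sql
-- Upstream materialized table with a 10-minute freshness.
CREATE MATERIALIZED TABLE mt_upstream
FRESHNESS = INTERVAL '10' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- Valid downstream freshness: 30 minutes = 3 x 10 minutes.
CREATE MATERIALIZED TABLE mt_downstream
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT * FROM mt_upstream;

-- INTERVAL '15' MINUTE would be invalid (1.5 x 10 minutes), and any value
-- greater than INTERVAL '1' DAY is rejected.
```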

View data lineage

On the Data Lineage page, view the data lineage among all materialized tables. You can start or stop the update of the materialized table and change its data freshness. You can also click Details in the upper-right corner of the page to go to the details page of the materialized table.

