Realtime Compute for Apache Flink: Create and use materialized tables

Last Updated: Jan 04, 2026

This topic describes how to create a materialized table. It also covers how to backfill historical data, modify data freshness, and view the data lineage of the materialized table.

Limits

  • This feature is supported only in Ververica Runtime (VVR) 8.0.10 and later.

  • This feature supports only Paimon Catalogs that use a Filesystem or DLF metastore. You cannot use custom Paimon Catalogs to create materialized tables.

  • You must have permission to develop and deploy jobs. For more information, see Authorize the development console.

  • References to temporary objects, such as temporary tables, temporary functions, and temporary views, are not supported.

Create a materialized table

Syntax

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Primary key constraint
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]

[COMMENT table_comment]
-- Partition key
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- WITH options
[WITH (key1=val1, key2=val2, ...)]
-- Data freshness
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Refresh mode
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS  <select_statement>

Parameters

Parameter | Required | Description

FRESHNESS | Yes | Specifies the data freshness of the materialized table, which is the maximum delay allowed before an update in the source table is reflected in the materialized table.

  Note
  • If the base table is a materialized table, the data freshness of the downstream table must be an integer multiple (1 to N times, where N is a positive integer) of the data freshness of the upstream table.

  • The data freshness cannot exceed one day.

AS <select_statement> | Yes | Defines the query that populates the materialized table. The upstream table can be a materialized table, a regular table, or a view. The SELECT statement supports all Flink SQL queries.

PRIMARY KEY | No | Defines an optional set of columns that uniquely identifies each row in the table. The specified columns must not be null.

PARTITIONED BY | No | Defines an optional set of columns used to partition the materialized table.

WITH options | No | Defines the table properties of the materialized table, including the time format parameters of partition fields.

  For example, WITH ('partition.fields.#.date-formatter' = 'yyyyMMdd') sets the time format of a partition field, where `#` is the name of the partition field. For more information about how to use the parameters, see the examples in the Procedure section.

REFRESH_MODE | No | Specifies the refresh mode of the materialized table. An explicitly specified refresh mode takes precedence over the mode that the framework infers from the data freshness, so you can override the inferred mode when a scenario requires it.

  • CONTINUOUS: A stream job incrementally updates the materialized table. The downstream data is visible immediately or after a checkpoint is completed.

  • FULL: A workflow periodically triggers an update of the materialized table. The engine determines whether to perform a full or incremental update. For more information, see Incremental update of a materialized table. The data refresh cycle matches the data freshness. By default, the update overwrites data at the table level. If partition fields exist, you can choose to refresh only the latest partition or all partitions.
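As an illustration of how the clauses in the syntax combine, the following sketch uses every optional clause in a single statement. It assumes a base table `paimon`.`db`.`orders` with the columns `order_id`, `order_name`, and `ds` (a string in yyyyMMdd form). The table name `mt_order_hourly`, the constraint name `pk_order`, and the comment text are hypothetical.

```sql
-- Sketch only: object names are illustrative, not taken from a real workspace.
CREATE MATERIALIZED TABLE `paimon`.`db`.`mt_order_hourly` (
    -- Optional primary key constraint
    CONSTRAINT pk_order PRIMARY KEY (order_id, ds) NOT ENFORCED
)
COMMENT 'Orders refreshed every hour, partitioned by day'
-- Optional partition key
PARTITIONED BY (ds)
-- Optional table properties; `ds` replaces the `#` placeholder
WITH (
    'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
-- Required: maximum delay relative to the source table
FRESHNESS = INTERVAL '1' HOUR
-- Optional: takes precedence over the mode inferred from FRESHNESS
REFRESH_MODE = FULL
AS
SELECT order_id, order_name, ds
FROM `paimon`.`db`.`orders`;
```

Only FRESHNESS and the AS clause are required. The other clauses can be dropped independently of each other.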

Procedure

  1. Log on to the Realtime Compute for Apache Flink console.

  2. In the Actions column of the target workspace, click Console.

  3. In the navigation pane on the left, choose Data Management and click the target Paimon Catalog.

  4. Click the target database, and then click Create Materialized Table.

    Assume you have a base table named `orders` with a primary key `order_id`, a name field `order_name`, and a date field `ds`. The following examples show how to create materialized tables based on this table:

    • Create a materialized table named `mt_order` based on the `orders` table. The new table includes all fields from the source table and has a data freshness of 5 seconds.

      CREATE MATERIALIZED TABLE mt_order
      FRESHNESS = INTERVAL '5' SECOND
      AS
      SELECT * FROM `paimon`.`db`.`orders`
      ;
    • Create a materialized table named `mt_id` based on the `mt_order` table. The new table includes the `order_id` and `ds` fields. Set `order_id` as the primary key, `ds` as the partition field, and the data freshness to 30 minutes.

      CREATE MATERIALIZED TABLE mt_id (
       PRIMARY KEY (order_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      FRESHNESS = INTERVAL '30' MINUTE
      AS
      SELECT order_id,ds FROM mt_order
      ;
    • Create a materialized table named `mt_ds` based on the `mt_order` table. Specify the date-formatter (time format) for the ds partition field. Each time a job is scheduled, the system subtracts the freshness value from the scheduled time and converts the result to the corresponding ds partition value. For example, if the data freshness is 1 hour and the scheduled time is 2024-01-01 00:00:00, the calculated `ds` value is `20231231`. Only data in the partition ds = '20231231' is refreshed. If the scheduled time is 2024-01-01 01:00:00, the calculated `ds` value is `20240101`, and data in the partition ds = '20240101' is refreshed.

      CREATE MATERIALIZED TABLE mt_ds
      PARTITIONED BY(ds)
      WITH (
          'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR
      AS
      SELECT order_id,order_name,ds FROM mt_order
      ;
      Note
      • The partition.fields.#.date-formatter parameter specifies the time format of a partition field of the materialized table. The system uses this format to determine which partition to refresh.

      • Replace `#` with the name of a valid partition field. The partition field must be of the string type.

  5. Start or stop updating the materialized table.

    1. Click the target materialized table under the catalog.

    2. In the upper-right corner, click Start Update or Stop Update.

      Note

      If an update is in progress when you click Stop Update, the process stops after the current update cycle is complete.

  6. View the details of the materialized table job.

    On the Table Schema Details tab, view the Basic Information section. Click the job ID next to Data Update Job or Workflow to view the details.
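The partition calculation described for `mt_ds` in step 4 can be reproduced with plain timestamp arithmetic. The query below is only a sketch of that rule (scheduled time minus freshness, formatted with the date-formatter pattern) and is not the scheduler's actual implementation. The column aliases are made up for illustration.

```sql
SELECT
    -- Scheduled at 2024-01-01 00:00:00 with a freshness of 1 hour:
    -- 2023-12-31 23:00:00 is formatted as '20231231'
    DATE_FORMAT(TIMESTAMP '2024-01-01 00:00:00' - INTERVAL '1' HOUR, 'yyyyMMdd') AS ds_for_midnight_run,
    -- Scheduled at 2024-01-01 01:00:00 with a freshness of 1 hour:
    -- 2024-01-01 00:00:00 is formatted as '20240101'
    DATE_FORMAT(TIMESTAMP '2024-01-01 01:00:00' - INTERVAL '1' HOUR, 'yyyyMMdd') AS ds_for_1am_run;
```

The first run of each day therefore refreshes the previous day's partition, because the subtraction moves the timestamp back across midnight.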

Modify the query of a materialized table

Limits

  • You can modify the query only for materialized tables created in Ververica Runtime (VVR) 11.1 or later.

  • When you modify a query, you can only append columns and modify calculation logic. You cannot change the order of existing columns or modify their definitions.

    Operation type | Supported | Description

    Append a new column | Yes | You can add a new column to the end of the schema. The order of the existing columns is preserved.

    Modify the calculation logic of an existing column | Yes | The column name and data type must remain the same.

    Change the order of existing columns | No | The column order is fixed. You must delete and re-create the materialized table.

    Modify the name or data type of an existing column | No | You must delete and re-create the materialized table.

Modification example

  1. Click Edit Table and modify the query. The following code provides an example:

    ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
        AS
        SELECT
          *,
          price * quantity AS total_price
        FROM orders
        WHERE price * quantity > 1000
    ;
  2. Click Preview to view the changes.

  3. Click Confirm. You can then view the new column and query logic on the Table Schema Details tab.

Important

Adding a field usually does not affect downstream nodes. However, if a downstream node uses dynamic parsing, such as SELECT * or automatic field mapping, the synchronization task might fail or report a data format mismatch error. We recommend that you avoid dynamic parsing and use fixed fields instead. When the upstream table changes, promptly update the downstream table schema.
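One way to follow this recommendation is to list the columns explicitly in the downstream query. The sketch below assumes that the upstream table `mt-orders` exposes the columns `order_id`, `price`, and `quantity`. The downstream table name `mt_orders_report` and its freshness are hypothetical.

```sql
-- Fragile: the result schema changes as soon as the upstream table gains total_price.
-- SELECT * FROM `paimon`.`default`.`mt-orders`

-- More robust: name the columns, so an appended upstream column is ignored
-- until you add it here deliberately.
CREATE MATERIALIZED TABLE `paimon`.`default`.`mt_orders_report`
FRESHNESS = INTERVAL '1' HOUR  -- assumed to be an integer multiple of the upstream freshness
AS
SELECT
    order_id,   -- assumed column of the upstream table
    price,
    quantity
FROM `paimon`.`default`.`mt-orders`;
```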

Incremental update of a materialized table

Limits

This feature is supported only in Ververica Runtime (VVR) 8.0.11 and later.

Update modes of a materialized table

Materialized tables have three update modes: stream update, full batch update, and incremental batch update.

Unless you specify REFRESH_MODE explicitly, the data freshness determines whether a materialized table runs in stream or batch mode: a freshness of less than 30 minutes results in stream mode, and a freshness of 30 minutes or more results in batch mode. In batch mode, the engine automatically decides whether to perform a full or incremental update. An incremental update calculates only the data that has changed since the last update and merges it into the materialized table. A full update recalculates the data of the entire table or partition and overwrites the data in the materialized table. The engine prefers incremental updates and falls back to full updates only when the materialized table does not support incremental updates.
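To make the 30-minute threshold concrete, the sketch below declares three tables over the `orders` base table used earlier in this topic. The table names are hypothetical, and the mode stated in each comment is what the rules in this topic imply, not output captured from a running job.

```sql
-- Freshness below 30 minutes: inferred as stream mode.
CREATE MATERIALIZED TABLE mt_stream_inferred
FRESHNESS = INTERVAL '5' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- Freshness of 30 minutes or more: inferred as batch mode.
-- The engine then chooses between an incremental and a full update.
CREATE MATERIALIZED TABLE mt_batch_inferred
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- An explicit REFRESH_MODE takes precedence over the inferred mode:
-- a stream job maintains this table despite the 1-hour freshness.
CREATE MATERIALIZED TABLE mt_stream_forced
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = CONTINUOUS
AS SELECT * FROM `paimon`.`db`.`orders`;
```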

Conditions for incremental updates

Incremental updates are used only when the materialized table meets the following conditions:

  • The partition.fields.#.date-formatter parameter is not configured for any partition field in the materialized table definition.

  • The source table does not have a primary key.

  • The query in the materialized table supports incremental updates in the following cases:

    SQL clause | Support

    SELECT | Supports selecting columns and scalar function expressions, including user-defined functions (UDFs). Aggregate functions are not supported.

    FROM | Supports table names or subqueries.

    WITH | Supports common table expressions (CTEs).

    WHERE | Supports filter conditions that include various scalar function expressions, including UDFs. Subqueries, such as `WHERE [NOT] EXISTS <subquery>` and `WHERE <column_name> [NOT] IN <subquery>`, are not supported.

    UNION | Only UNION ALL is supported.

    JOIN | Supported with the following restrictions:

      • INNER JOIN is supported.

      • LEFT/RIGHT/FULL [OUTER] JOIN is not supported, except for the LATERAL JOIN and Lookup Join cases described below.

      • [LEFT [OUTER]] JOIN LATERAL with table function expressions, including user-defined table-valued functions (UDTFs), is supported.

      • For Lookup Join, only `A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME()` is supported.

      Note
      • JOINs without the `JOIN` keyword are supported, such as `SELECT * FROM a, b WHERE a.id = b.id`.

      • Currently, an incremental calculation with `INNER JOIN` still reads the full data from both source tables.

    GROUP BY | Not supported.

Incremental update examples

Example 1: Process data in the source table `orders` using scalar functions.

CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT 
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE 
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    UDSF_ProcessFunction(notes) AS notes
FROM 
    orders
WHERE
    status = 'shipped';

Example 2: Enrich data in the source table `orders` using a LATERAL JOIN and a Lookup Join.

CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM 
        orders,
        LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT 
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    order_tag
FROM o 
LEFT JOIN 
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON 
    o.product_id = p.product_id;
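For contrast, the following sketch shows a query that does not meet the conditions above, because it uses GROUP BY and aggregate functions. According to the rules in this section, the engine would refresh such a table with full updates instead of incremental updates. The table name `mt_daily_order_totals` is hypothetical, and the columns `order_ts` and `order_amount` are assumed to exist as in Example 1.

```sql
CREATE MATERIALIZED TABLE mt_daily_order_totals
FRESHNESS = INTERVAL '1' HOUR
AS
SELECT
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    COUNT(*) AS order_cnt,            -- aggregate function: not incrementally updatable
    SUM(order_amount) AS total_amount -- aggregate function: not incrementally updatable
FROM
    orders
GROUP BY
    DATE_FORMAT(order_ts, 'yyyyMMdd'); -- GROUP BY: not incrementally updatable
```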

Backfill historical data

Previously, after running a stream job, you had to develop a separate batch job to reconcile the results with the full dataset from the previous day. With materialized tables, you can directly select a historical data partition to backfill data. This change reduces development and O&M costs and provides unified stream and batch processing.

  1. Click the target materialized table under the catalog.

  2. On the Data Information tab, backfill the data.

    If you declared partition fields when you created the materialized table, it is a partitioned table. Otherwise, it is a non-partitioned table.

    Partitioned table

    View the Data Partitions. If this is the first backfill or the required partition does not exist, click Manual Update. If partitions exist, select the partition to backfill and click Backfill.

    Parameters:

    • Partition Field: The partition field of the table. For example, if you enter `20241201`, all data where `ds=20241201` is backfilled.

    • Task Name: The name of the data backfill task.

    • Update Scope (Optional): Specifies whether to cascade updates to associated downstream materialized tables. The update cascades from the current table through all materialized tables in its data lineage. The maximum number of downstream layers is 6.

      Note
      • For partitioned table updates, the partition fields of the downstream materialized tables must be identical to those of the starting table. Otherwise, the update operation fails.

      • If a materialized table in the lineage fails to update, all downstream nodes also fail.

    • Deployment Target: You can select a queue or a session cluster. The default is `default-queue`.

    Non-partitioned table

    View the Data Status and click Backfill.

    Parameters:

    • Task Name: The name of the data backfill task.

    • Update Scope: This parameter is not available for non-partitioned tables.

      Note
      • During the update, a full refresh is performed on downstream data.

      • If a materialized table in the lineage fails to update, all downstream nodes also fail.

      • If the system determines that the starting table is a non-partitioned stream task based on its freshness, cascade updates are not supported.

    • Deployment Target: You can select a queue or a session cluster. The default is `default-queue`.

  3. Scheduled and batch backfills.

    You can use Task Orchestration to create a materialized table workflow for recurring schedules. This enables scheduled backfills. You can also use the data backfill feature of the workflow to select a time range for batch data backfills.
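This topic describes backfills through the console only. For reference, open-source Apache Flink (1.20 and later) documents an `ALTER MATERIALIZED TABLE ... REFRESH` statement that triggers the same kind of manual refresh. Whether the SQL editor of Realtime Compute for Apache Flink accepts this statement is an assumption that this topic does not confirm. The sketch reuses the `mt_ds` table from the creation examples.

```sql
-- Assumption: open-source Flink syntax; not confirmed for VVR by this topic.
-- Backfill a single partition, equivalent to entering 20241201 as the partition field.
ALTER MATERIALIZED TABLE mt_ds REFRESH PARTITION (ds = '20241201');

-- Refresh the table without a partition filter.
ALTER MATERIALIZED TABLE mt_ds REFRESH;
```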

Modify data freshness

  1. In the catalog, click the database that contains the materialized table, and then click the target materialized table.

  2. In the upper-right corner, click Modify Data Freshness.

    • If the materialized table does not have a primary key, you cannot change the job's processing mode between stream and batch. For example, you cannot change the data freshness from 2 seconds (stream) to 1 hour (batch), or vice versa. A job is a stream job if its freshness is less than 30 minutes, and a batch job if its freshness is 30 minutes or more.

    • If the base table is a materialized table, the data freshness of the downstream table must be an integer multiple of the upstream table's data freshness.

    • The data freshness cannot exceed one day.
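The integer-multiple rule and the one-day limit above are simple arithmetic on the freshness values expressed in seconds. The following query is a sketch of that check for the earlier examples, where `mt_order` has a freshness of 5 seconds and its downstream table `mt_id` has a freshness of 30 minutes. The column aliases are made up for illustration, and the query is not how the console validates the value.

```sql
SELECT
    -- Downstream 30 minutes = 1800 seconds, upstream 5 seconds:
    -- 1800 / 5 = 360 with no remainder, so the rule holds.
    MOD(30 * 60, 5) = 0 AS thirty_minutes_is_multiple,
    -- A downstream freshness of 12 seconds leaves a remainder of 2,
    -- so it would violate the rule.
    MOD(12, 5) = 0 AS twelve_seconds_is_multiple,
    -- Upper limit: one day = 24 * 60 * 60 = 86400 seconds.
    30 * 60 <= 24 * 60 * 60 AS within_one_day;
```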

View data lineage

The data lineage page displays the lineage relationships between all materialized tables. From this page, you can also perform operations such as Start/Stop Update and Modify Data Freshness on the materialized tables. To view the details page for a specific materialized table, click Details.

References