This topic describes how to create a materialized table. It also covers how to backfill historical data, modify data freshness, and view the data lineage of the materialized table.
Limits
This feature is supported only in Ververica Runtime (VVR) 8.0.10 and later.
This feature supports only Paimon Catalogs that use a Filesystem or DLF metastore. You cannot use custom Paimon Catalogs to create materialized tables.
You must have permission to develop and deploy jobs. For more information, see Authorize the development console.
References to temporary objects, such as temporary tables, temporary functions, and temporary views, are not supported.
Create a materialized table
Syntax
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Primary key constraint
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]
[COMMENT table_comment]
-- Partition key
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- WITH options
[WITH (key1=val1, key2=val2, ...)]
-- Data freshness
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Refresh mode
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>Parameters
Parameter | Required | Description |
FRESHNESS | Yes | Specifies the data freshness of the materialized table. This defines the maximum delay for data updates from the source table. Note
|
AS <select_statement> | Yes | Defines the query that populates the materialized table. The upstream table can be a materialized table, a regular table, or a view. The SELECT statement supports all Flink SQL queries. |
PRIMARY KEY | No | Defines an optional set of columns that uniquely identifies each row in the table. The specified columns must not be null. |
PARTITIONED BY | No | Defines an optional set of columns used to partition the materialized table. |
WITH Options | No | Defines table properties and time format parameters for partition fields that are required to create the materialized table. For example, the time format parameter for a partition field is |
REFRESH_MODE | No | Specifies the refresh mode of the materialized table. The specified refresh mode has a higher priority than the mode that the framework automatically infers from the data freshness. This lets you meet specific scenario requirements.
|
Procedure
Log on to the Realtime Compute for Apache Flink console.
In the Actions column of the target workspace, click Console.
In the navigation pane on the left, choose Data Management and click the target Paimon Catalog.
Click the target database, and then click Create Materialized Table.
Assume you have a base table named `orders` with a primary key `order_id`, a category name `order_name`, and a date field `ds`. The following examples show how to create materialized tables based on this table:
Create a materialized table named `mt_order` based on the `orders` table. The new table includes all fields from the source table and has a data freshness of 5 seconds.
CREATE MATERIALIZED TABLE mt_order FRESHNESS = INTERVAL '5' SECOND AS SELECT * FROM `paimon`.`db`.`orders` ;Create a materialized table named `mt_id` based on the `mt_order` table. The new table includes the `order_id` and `ds` fields. Set `order_id` as the primary key, `ds` as the partition field, and the data freshness to 30 minutes.
CREATE MATERIALIZED TABLE mt_id ( PRIMARY KEY (order_id) NOT ENFORCED ) PARTITIONED BY(ds) FRESHNESS = INTERVAL '30' MINUTE AS SELECT order_id,ds FROM mt_order ;Create a materialized table named `mt_ds` based on the `mt_order` table. Specify the
date-formatter(time format) for thedspartition field. Each time a job is scheduled, the system subtracts the freshness value from the scheduled time and converts the result to the correspondingdspartition value. For example, if the data freshness is 1 hour and the scheduled time is2024-01-01 00:00:00, the calculated `ds` value is `20231231`. Only data in the partitionds = '20231231'is refreshed. If the scheduled time is2024-01-01 01:00:00, the calculated `ds` value is `20240101`, and data in the partitionds = '20240101'is refreshed.CREATE MATERIALIZED TABLE mt_ds PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR AS SELECT order_id,order_name,ds FROM mt_order ;NoteIn
partition.fields.#.date-formatter, the `#` must be a valid partition field of the string type.The
partition.fields.#.date-formatterparameter specifies the time partition format for the materialized table. The `#` represents the name of a partition field of the string type. This parameter tells the system which partition to refresh.
Start or stop updating the materialized table.
Click the target materialized table under the catalog.
In the upper-right corner, click Start Update or Stop Update.
NoteIf an update is in progress when you click Stop Update, the process stops after the current update cycle is complete.
View the details of the materialized table job.
On the Table Schema Details tab, view the Basic Information section. Click the job ID next to Data Update Job or Workflow to view the details.
Modify the query of a materialized table
Limits
You can modify the query only for materialized tables created in Ververica Runtime (VVR) 11.1 or later.
When you modify a query, you can only append columns and modify calculation logic. You cannot change the order of existing columns or modify their definitions.
Operation Type
Support
Description
Append a new column
Support
You can add a new column to the end of the schema while keeping the existing column order.
Modify the calculation logic of an existing column (without changing the column name and type)
Support
For example, you can modify the calculation logic, but the column name and data type must remain the same.
Change the order of existing columns
No
The column order is fixed. You must delete and re-create the materialized table.
Modify the name or data type of an existing column
No
You must delete and re-create the materialized table.
Modification example
Click Edit Table and modify the query. The following code provides an example:
ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders` AS SELECT *, price * quantity AS total_price FROM orders WHERE price * quantity > 1000 ;Click Preview to view the changes.

Click Confirm. You can then view the new column and query logic on the Table Schema Details tab.
Adding a field usually does not affect downstream nodes. However, if a downstream node uses dynamic parsing, such as SELECT * or automatic field mapping, the synchronization task might fail or report a data format mismatch error. We recommend that you avoid dynamic parsing and use fixed fields instead. When the upstream table changes, promptly update the downstream table schema.
Incremental update of a materialized table
Limits
This feature is supported only in Ververica Runtime (VVR) 8.0.11 and later.
Update modes of a materialized table
Materialized tables have three update modes: stream update, full batch update, and incremental batch update.
The stream or batch mode of a materialized table is determined by its data freshness. A freshness of less than 30 minutes indicates stream mode, while 30 minutes or more indicates batch mode. In batch mode, the engine automatically decides whether to perform a full or incremental update. An incremental update calculates only the data that has changed since the last update and merges it into the materialized table. A full update calculates the data for the entire table or partition and overwrites the data in the materialized table. In batch mode, the engine prioritizes incremental updates and uses full updates only when incremental updates are not supported for the materialized table.
Conditions for incremental updates
Incremental updates are used only when the materialized table meets the following conditions:
The
partition.fields.#.date-formatterparameter is not configured for any partition field in the materialized table definition.The source table does not have a primary key.
The query in the materialized table supports incremental updates in the following cases:
SQL Statement
Support
SELECT
Supports selecting columns and scalar function expressions, including user-defined functions (UDFs). Aggregate functions are not supported.
FROM
Supports table names or subqueries.
WITH
Supports common table expressions (CTEs).
WHERE
Supports filter conditions that include various scalar function expressions, including UDFs. Subqueries, such as `WHERE [NOT] EXISTS <subquery>` and `WHERE <column_name> [NOT] IN <subquery>`, are not supported.
UNION
Only UNION ALL is supported.
JOIN
INNER JOIN is supported.
LEFT/RIGHT/FULL [OUTER] JOIN is not supported, except for the LATERAL JOIN and Lookup Join cases described below.
[LEFT [OUTER]] JOIN LATERAL with table function expressions, including user-defined table-valued functions (UDTFs), is supported.
For Lookup Join, only `A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME()` is supported.
NoteJOINs without the `JOIN` keyword are supported, such as `SELECT * FROM a, b WHERE a.id = b.id`.
Currently, an incremental calculation with `INNER JOIN` still reads the full data from both source tables.
GROUP BY
Not supported.
Incremental update examples
Example 1: Process data in the source table `orders` using scalar functions.
CREATE MATERIALIZED TABLE mt_shipped_orders (
PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
order_id,
COALESCE(customer_id, 'Unknown') AS customer_id,
CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
CASE
WHEN status = 'shipped' THEN 'Completed'
WHEN status = 'pending' THEN 'In Progress'
ELSE 'Unknown'
END AS order_status,
DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
UDSF_ProcessFunction(notes) AS notes
FROM
orders
WHERE
status = 'shipped';Example 2: Enrich data in the source table `orders` using a LATERAL JOIN and a Lookup Join.
CREATE MATERIALIZED TABLE mt_enriched_orders (
PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
SELECT
order_id,
product_id,
quantity,
proc_time,
e.tag AS order_tag
FROM
orders,
LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT
o.order_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price,
o.quantity * p.price AS total_amount,
order_tag
FROM o
LEFT JOIN
product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
o.product_id = p.product_id;Backfill historical data
Previously, after running a stream job, you had to develop a separate batch job to reconcile the results with the full dataset from the previous day. With materialized tables, you can directly select a historical data partition to backfill data. This change reduces development and O&M costs and provides unified stream and batch processing.
Click the target materialized table under the catalog.
On the Data Information tab, backfill the data.
If you declared partition fields when you created the materialized table, it is a partitioned table. Otherwise, it is a non-partitioned table.
Partitioned table
View the Data Partitions. If this is the first backfill or the required partition does not exist, click Manual Update. If partitions exist, select the partition to backfill and click Backfill.


Parameters:
Partition Field: The partition field of the table. For example, if you enter `20241201`, all data where `ds=20241201` is backfilled.
Task Name: The name of the data backfill task.
Update Scope (Optional): Specifies whether to cascade updates to associated downstream materialized tables. The update cascades from the current table through all materialized tables in its data lineage. The maximum number of downstream layers is 6.
NoteFor partitioned table updates, the partition fields of the downstream materialized tables must be identical to those of the starting table. Otherwise, the update operation fails.
If a materialized table in the lineage fails to update, all downstream nodes also fail.
Deployment Target: You can select a queue or a session cluster. The default is `default-queue`.
Non-partitioned table
View the Data Status and click Backfill.

Parameters:
Task Name: The name of the data backfill task.
Update Scope: This parameter is not available for non-partitioned tables.
NoteDuring the update, a full refresh is performed on downstream data.
If a materialized table in the lineage fails to update, all downstream nodes also fail.
If the system determines that the starting table is a non-partitioned stream task based on its freshness, cascade updates are not supported.
Deployment Target: You can select a queue or a session cluster. The default is `default-queue`.
Scheduled and batch backfills.
You can use Task Orchestration to create a materialized table workflow for recurring schedules. This enables scheduled backfills. You can also use the data backfill feature of the workflow to select a time range for batch data backfills.
Modify data freshness
In the catalog, click the materialized table database, and then click the target materialized table.
In the upper-right corner, click Modify Data Freshness.
If the materialized table does not have a primary key, you cannot change the job's processing mode between stream and batch. For example, you cannot change the data freshness from 2 seconds (stream) to 1 hour (batch), or vice versa. A job is a stream job if its freshness is less than 30 minutes, and a batch job if its freshness is 30 minutes or more.
If the base table is a materialized table, the data freshness of the downstream table must be an integer multiple of the upstream table's data freshness.
The data freshness cannot exceed one day.
View data lineage
The data lineage page displays the lineage relationships between all materialized tables. From this page, you can also perform operations such as Start/Stop Update and Modify Data Freshness on the materialized tables. To view the details page for a specific materialized table, click Details.

References
For more information about materialized tables, see Materialized table management.
To learn how to build a stream-batch integrated data lakehouse using Paimon and materialized tables, and how to switch from batch to stream processing for real-time data updates by modifying data freshness, see Get started with materialized tables: Build a stream-batch integrated data lakehouse.