This topic describes how to create and use materialized tables, covering key aspects such as backfilling historical data in materialized tables, adjusting their data freshness, and examining their data lineage.
Prerequisites and limitations
The engine version is Ververica Runtime (VVR) 8.0.10 or later.
You have an Apache Paimon catalog of the filesystem metastore type.
You have the permissions to develop and deploy jobs in a namespace. For more information, see Grant permissions on namespaces.
Temporary objects, such as temporary tables, functions, and views, are not supported.
Create a materialized table
Syntax
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Define the primary key constraint.
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]
[COMMENT table_comment]
-- Configure the partition key.
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- Configure connector options.
[WITH (key1=val1, key2=val2, ...)]
-- Configure the data freshness.
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Configure the data refresh mode.
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>

Parameters
| Parameter | Required | Description |
| --- | --- | --- |
| FRESHNESS | Yes | The freshness of the data in the materialized table. Data freshness specifies the maximum delay allowed for data updates between the materialized table and its base tables. |
| AS <select_statement> | Yes | Defines the query that populates the materialized table. An upstream table can be a materialized table, a table, or a view. The SELECT statement supports all Flink SQL queries. |
| PRIMARY KEY | No | Defines an optional set of columns that uniquely identify each row in the table. The primary key fields must not be null. |
| PARTITIONED BY | No | Defines an optional group of columns that are used to partition the materialized table. |
| WITH options | No | Defines table properties and the time format parameters for partitioning columns required to create a materialized table. For example, define the time format parameter for a partitioning column as 'partition.fields.ds.date-formatter' = 'yyyyMMdd'. |
| REFRESH_MODE | No | Specifies the refresh mode of the materialized table. An explicitly specified refresh mode takes precedence over the automatically inferred one. Valid values: CONTINUOUS (data is refreshed continuously by a streaming job) and FULL (data is refreshed periodically by scheduled batch jobs). |
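To illustrate how these clauses combine, the following hedged sketch (the table name mt_daily_stats and the aggregation are hypothetical, not from this topic) explicitly forces FULL refresh mode instead of letting the engine infer the mode from the freshness:

```sql
-- Hypothetical sketch: mt_daily_stats and the COUNT(*) aggregation are assumptions.
-- REFRESH_MODE = FULL forces periodic full refreshes regardless of the mode
-- the engine would otherwise infer from the 1-hour freshness.
CREATE MATERIALIZED TABLE mt_daily_stats
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '1' HOUR
REFRESH_MODE = FULL
AS SELECT ds, COUNT(*) AS order_cnt
   FROM `paimon`.`db`.`orders`
   GROUP BY ds;
```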
Procedure
Log on to the management console of Realtime Compute for Apache Flink.
Click Console in the Actions column of the target workspace.
In the left-side navigation pane, click Catalogs. In the Catalogs pane, click the target Apache Paimon catalog.
Click the target database. On the page that appears, click Create Materialized Table.
In this example, a base table named orders is used. In the table, the primary key is order_id, the category name is order_name, and the date field is ds. The following example shows how to create materialized tables based on the orders table.
Create a materialized table named mt_order based on the orders table. Retrieve all columns from the orders table as materialized table fields, and set the data freshness to 5 seconds.
CREATE MATERIALIZED TABLE mt_order
FRESHNESS = INTERVAL '5' SECOND
AS SELECT * FROM `paimon`.`db`.`orders`;

Create a materialized table named mt_id based on the materialized table mt_order. Retrieve the order_id and ds fields as table fields, specify the order_id field as the primary key and the ds field as the partitioning column, and set the data freshness to 30 minutes.

CREATE MATERIALIZED TABLE mt_id (
  PRIMARY KEY (order_id) NOT ENFORCED
)
PARTITIONED BY (ds)
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT order_id, ds FROM mt_order;

Create a materialized table named mt_ds based on the materialized table mt_order. Specify the date-formatter time format for the ds partitioning column. Each time the materialized table mt_ds is scheduled, the value obtained by subtracting the data freshness from the scheduled time is converted into the corresponding ds partition value. For example, if the data freshness is set to 1 hour and the scheduled time is 2024-01-01 00:00:00, the calculated ds value is 20231231, and only the data in the partition ds = '20231231' is refreshed. If the scheduled time is 2024-01-01 01:00:00, the calculated ds value is 20240101, and the data in the partition ds = '20240101' is refreshed.

CREATE MATERIALIZED TABLE mt_ds
PARTITIONED BY (ds)
WITH (
  'partition.fields.ds.date-formatter' = 'yyyyMMdd'
)
FRESHNESS = INTERVAL '1' HOUR
AS SELECT order_id, order_name, ds FROM mt_order;

Note: partition.fields.#.date-formatter specifies the time partition format of the materialized table. The # placeholder must be the name of a valid partitioning column of the string type; this setting allows the system to refresh the data of the specified partition.
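The partition value calculation described above can be sketched with standard Flink SQL date functions. This is a hedged illustration of the arithmetic only, not part of the materialized table DDL, and it assumes the scheduler computes "scheduled time minus freshness" exactly as stated:

```sql
-- Hedged sketch: reproduces the ds derivation described above.
-- Scheduled time 2024-01-01 00:00:00 minus a 1-hour freshness falls on
-- 2023-12-31 23:00:00, so the formatted partition value is '20231231'.
SELECT DATE_FORMAT(TIMESTAMP '2024-01-01 00:00:00' - INTERVAL '1' HOUR, 'yyyyMMdd');

-- Scheduled time 2024-01-01 01:00:00 minus 1 hour falls on
-- 2024-01-01 00:00:00, so the formatted partition value is '20240101'.
SELECT DATE_FORMAT(TIMESTAMP '2024-01-01 01:00:00' - INTERVAL '1' HOUR, 'yyyyMMdd');
```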
Start or stop the update of a materialized table.
Click the materialized table under the target catalog.
Click Start or Stop in the upper-right corner of the details page to start or stop the update.
Note: If a materialized table is being updated when you click Stop, the update does not halt until the current update round is complete.
Examine the job.
Select the Table Schema tab of the details page of the materialized table. In the Basic Information section, click the ID in the Latest Job or Workflow field.
Modify the query of a materialized table
Limits
Only new materialized tables powered by VVR 11.1 or later are supported.
Only adding columns and modifying computation logic are supported operations. See the table below for details:
| Operation | Supported | Description |
| --- | --- | --- |
| Add a column | Supported | Appends a new column to the materialized table while maintaining the column order. |
| Modify computation logic | Supported | Modifies the computation logic of existing columns without changing column names or types. |
| Change the column order | Not supported | To change the column order, drop the materialized table and create a new one with the desired column order. |
| Modify column names or types | Not supported | To modify column names or types, drop the materialized table and create a new one with the desired column names or types. |
Procedure
Click Edit Table and modify the query. Sample code:
ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
AS SELECT *, price * quantity AS total_price
   FROM orders
   WHERE price * quantity > 1000;

Click Preview to check the modifications.

Click OK. Select the Table Schema tab to check the added columns and modified query.
If downstream tables rely on dynamic parsing for synchronization, such as SELECT * FROM queries or automatic field mapping, adding new columns may result in job failures or unmatched data format errors. We recommend using fixed columns rather than dynamic parsing for downstream tables and promptly updating their schemas.
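For example, a downstream materialized table can pin its column list explicitly instead of relying on SELECT *. This is a hedged sketch that reuses mt_order and its columns from the earlier examples; the name mt_order_fixed is hypothetical:

```sql
-- Hedged sketch: an explicit column list keeps this downstream table stable
-- when new columns are later added to mt_order via ALTER MATERIALIZED TABLE.
CREATE MATERIALIZED TABLE mt_order_fixed
FRESHNESS = INTERVAL '1' HOUR
AS SELECT order_id, order_name, ds   -- fixed columns instead of SELECT *
   FROM mt_order;
```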
Incrementally update a materialized table
Limits
Incremental updates of materialized table data require Realtime Compute for Apache Flink that uses VVR 8.0.11 or later.
Update mode of a materialized table
Materialized tables provide the following update modes: streaming update mode, full batch update mode, and incremental batch update mode.
The data freshness of a materialized table determines whether the table runs in streaming mode or batch mode:

If the data freshness is less than 30 minutes, the table is in streaming mode.
If the data freshness is greater than or equal to 30 minutes, the table is in batch mode.

In batch mode, the engine automatically performs full or incremental updates. Incremental updates calculate only the data added since the last update and merge the results into the materialized table. Full updates calculate the data of the entire table or the entire partition and overwrite the data in the materialized table. The engine preferentially performs incremental updates and falls back to full updates only when incremental updates cannot meet the requirements of the materialized table.
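The 30-minute threshold can be illustrated with two minimal definitions. This is a hedged sketch that reuses the orders table from the earlier examples; the table names mt_stream and mt_batch are hypothetical:

```sql
-- Freshness below 30 minutes: the table runs in streaming mode
-- (continuous refresh by a streaming job).
CREATE MATERIALIZED TABLE mt_stream
FRESHNESS = INTERVAL '1' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- Freshness of 30 minutes or more: the table runs in batch mode
-- (scheduled refresh, incremental where possible, otherwise full).
CREATE MATERIALIZED TABLE mt_batch
FRESHNESS = INTERVAL '1' HOUR
AS SELECT * FROM `paimon`.`db`.`orders`;
```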
Conditions for incremental updates
Incremental updates are performed only when the materialized table meets the following conditions:

The partition.fields.#.date-formatter parameter, which specifies the time format of the partitioning column, is not configured when you define the materialized table.
No primary key is specified for the source table.
In the following scenarios, query statements in a materialized table support incremental updates.
| SQL statement | Incremental update support |
| --- | --- |
| SELECT | Columns can be queried, and scalar function expressions, including user-defined functions, are supported. Aggregate functions are not supported. |
| FROM | Table name queries and subqueries are supported. |
| WITH | Common table expressions (CTEs) are supported. |
| WHERE | Scalar function expressions, including user-defined functions, are supported in filter conditions. Subqueries are not supported in filter conditions, such as WHERE [NOT] EXISTS <subquery> and WHERE <column_name> [NOT] IN <subquery>. |
| UNION | Only UNION ALL is supported. |
| JOIN | INNER JOIN is supported. LEFT JOIN, RIGHT JOIN, and FULL [OUTER] JOIN are not supported, except for the following lateral join and lookup join scenarios: [LEFT [OUTER]] JOIN LATERAL with table function expressions, including user-defined functions, is supported; only A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() is supported in lookup joins. Note: JOIN statements that do not use the JOIN keyword, such as SELECT * FROM a, b WHERE a.id = b.id, are supported. If you use INNER JOIN to perform incremental updates, the full data of the two source tables is still read. |
| GROUP BY | Not supported. |
Examples
Example 1: Use scalar functions to process data in the orders source table.
CREATE MATERIALIZED TABLE mt_shipped_orders (
PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
order_id,
COALESCE(customer_id, 'Unknown') AS customer_id,
CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
CASE
WHEN status = 'shipped' THEN 'Completed'
WHEN status = 'pending' THEN 'In Progress'
ELSE 'Unknown'
END AS order_status,
DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
UDSF_ProcessFunction(notes) AS notes
FROM
orders
WHERE
  status = 'shipped';

Example 2: Use a lateral join and a lookup join to enrich the orders table.
CREATE MATERIALIZED TABLE mt_enriched_orders (
PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
SELECT
order_id,
product_id,
quantity,
proc_time,
e.tag AS order_tag
FROM
orders,
LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT
o.order_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price,
o.quantity * p.price AS total_amount,
order_tag
FROM o
LEFT JOIN
product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
  o.product_id = p.product_id;

Backfill historical data
Traditionally, you need a separate batch job to correct stream processing results with all of the previous day's data. With materialized tables, you can intuitively backfill data using the current job. This capability unifies batch and stream processing and improves development and O&M efficiency.
In the Catalogs pane, click the target materialized table under your Apache Paimon catalog.
On the Data Information tab, refresh the data.
If you have defined the partition key when creating the materialized table, the table is a partitioned table.
Partitioned table
In the upper-right corner of the Partitions section on the Table Information tab, click Trigger Update the first time you perform data backfilling or if no partitions are available. If a partition exists, find the corresponding partition and click Refresh in the Actions column.


Parameters:
ds: the partition field of the table. For example, if you enter 20241201 in the ds field, all data with ds=20241201 is backfilled.
Task Name: the name of the data backfilling task.
Update Range (optional): specifies whether to cascade updates to the downstream materialized tables. You can use the partitioned table as the starting point to update all materialized tables. The maximum number of downstream layers is 6.
Note: If you update a partitioned table, the partitioning columns of the downstream materialized tables must be the same as those of the partitioned table. Otherwise, the update operation fails.
If a materialized table fails to be updated, all downstream nodes fail.
Deployment Target: the object on which the table is deployed. You can select queues and session clusters. By default, default-queue is used.
Non-partitioned table
Select the Data Information tab, view the details of the materialized table, and click Refresh.

Parameters:
Task Name: the name of the task for data backfilling.
Update Range: This parameter is not supported for non-partitioned tables.
Note: All data in downstream tables will be refreshed.
If a materialized table fails to be updated, all downstream nodes fail.
Cascade updates are not supported if the table is non-partitioned and the system determines, based on the data freshness of the base table, that the workload is streaming.
Deployment Target: the object on which the table is deployed. You can select queues and session clusters. By default, default-queue is used.
Change data freshness
Click the database under the desired catalog, and then click the materialized table that you want to modify.
In the upper-right corner of the details page of the materialized table, click Edit Freshness.
If the materialized table does not have a primary key, you cannot change the data freshness in a way that switches the job between streaming and batch modes. For example, changing the data freshness from 2 seconds to 1 hour would convert a streaming job into a batch job, and changing it from 1 hour to 2 seconds would convert a batch job into a streaming one; neither operation is allowed. If the data freshness is less than 30 minutes, the job is classified as streaming. If the data freshness is equal to or greater than 30 minutes, the job is classified as batch.
If the base table is a materialized table, make sure that the data freshness of the downstream table is 1 to N times that of the upstream table. N is a positive integer.
Data freshness cannot exceed 1 day.
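For example, with an upstream freshness of 30 minutes, a downstream freshness of 1 hour (2x) satisfies the multiple rule, while a value such as 45 minutes (1.5x) would not. The following hedged sketch uses hypothetical table names and the orders table from the earlier examples:

```sql
-- Upstream materialized table: freshness of 30 minutes.
CREATE MATERIALIZED TABLE mt_upstream
FRESHNESS = INTERVAL '30' MINUTE
AS SELECT * FROM `paimon`.`db`.`orders`;

-- Downstream materialized table: 1 hour = 2 x 30 minutes, a valid
-- integer multiple of the upstream freshness. A freshness such as
-- 45 minutes (1.5 x) would violate the rule.
CREATE MATERIALIZED TABLE mt_downstream
FRESHNESS = INTERVAL '1' HOUR
AS SELECT * FROM mt_upstream;
```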
View data lineage
On the Data Lineage page, view the data lineage among all materialized tables. You can start or stop the update of the materialized table and change its data freshness. You can also click Details in the upper-right corner of the page to go to the details page of the materialized table.
