このトピックでは、マテリアライズドテーブルの作成方法について説明します。また、既存データのバックフィル、データフレッシュネスの変更、マテリアライズドテーブルのデータリネージの表示方法についても説明します。
制限事項
この機能は、Ververica Runtime (VVR) 8.0.10 以降のバージョンでのみサポートされています。
この機能は、ファイルシステムまたは DLF メタストアを使用する Paimon カタログ のみをサポートします。カスタム Paimon カタログを使用してマテリアライズドテーブルを作成することはできません。
ジョブを開発およびデプロイする権限が必要です。詳細については、「開発コンソールの権限付与」をご参照ください。
一時テーブル、一時関数、一時ビューなど、一時オブジェクトへの参照はサポートされていません。
マテリアライズドテーブルの作成
構文
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Primary key constraint
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]
[COMMENT table_comment]
-- Partition key
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- WITH options
[WITH (key1=val1, key2=val2, ...)]
-- Data freshness
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Refresh mode
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>パラメーター
パラメーター | 必須 | 説明 |
FRESHNESS | はい | マテリアライズドテーブルのデータフレッシュネスを指定します。これは、ソーステーブルからのデータ更新の最大遅延を定義します。 説明
|
AS <select_statement> | はい | マテリアライズドテーブルにデータを投入するクエリを定義します。アップストリームテーブルには、マテリアライズドテーブル、通常テーブル、またはビューを指定できます。SELECT 文はすべての Flink SQL クエリをサポートします。 |
PRIMARY KEY | いいえ | テーブル内の各行を一意に識別するオプションの列セットを定義します。指定された列に null は許可されません。 |
PARTITIONED BY | いいえ | マテリアライズドテーブルをパーティション分割するために使用されるオプションの列セットを定義します。 |
WITH Options | いいえ | マテリアライズドテーブルの作成に必要な、テーブルプロパティとパーティションフィールドの時刻フォーマットパラメーターを定義します。 たとえば、パーティションフィールドの時刻フォーマットパラメーターは |
REFRESH_MODE | いいえ | マテリアライズドテーブルのリフレッシュモードを指定します。指定されたリフレッシュモードは、フレームワークがデータフレッシュネスから自動的に推測するモードよりも優先されます。これにより、特定のシナリオ要件を満たすことができます。
|
手順
対象のワークスペースの アクション 列で、コンソール をクリックします。
左側のナビゲーションウィンドウで、Data Management を選択し、対象の Paimon カタログをクリックします。
対象のデータベースをクリックし、マテリアライズドテーブルの作成 をクリックします。
プライマリキー `order_id`、カテゴリ名 `order_name`、日付フィールド `ds` を持つ `orders` という名前のベーステーブルがあると仮定します。以下の例は、このテーブルに基づいてマテリアライズドテーブルを作成する方法を示しています。
`orders` テーブルに基づいて `mt_order` という名前のマテリアライズドテーブルを作成します。新しいテーブルには、ソーステーブルのすべてのフィールドが含まれ、データフレッシュネスは 5 秒です。
CREATE MATERIALIZED TABLE mt_order FRESHNESS = INTERVAL '5' SECOND AS SELECT * FROM `paimon`.`db`.`orders` ;`mt_order` テーブルに基づいて `mt_id` という名前のマテリアライズドテーブルを作成します。新しいテーブルには `order_id` と `ds` フィールドが含まれます。`order_id` をプライマリキー、`ds` をパーティションフィールドに設定し、データフレッシュネスを 30 分に設定します。
CREATE MATERIALIZED TABLE mt_id ( PRIMARY KEY (order_id) NOT ENFORCED ) PARTITIONED BY(ds) FRESHNESS = INTERVAL '30' MINUTE AS SELECT order_id,ds FROM mt_order ;`mt_order` テーブルに基づいて、`mt_ds` という名前のマテリアライズドテーブルを作成します。
dsパーティションフィールドにdate-formatter(時間フォーマット) を指定します。ジョブがスケジュールされるたびに、システムはスケジュールされた時間から鮮度の値を減算し、その結果を対応するdsパーティション値に変換します。たとえば、データの鮮度が 1 時間で、スケジュールされた時間が2024-01-01 00:00:00の場合、計算される `ds` 値は `20231231` になります。パーティションds = '20231231'のデータのみがリフレッシュされます。スケジュールされた時間が2024-01-01 01:00:00の場合、計算される `ds` 値は `20240101` になり、パーティションds = '20240101'のデータがリフレッシュされます。CREATE MATERIALIZED TABLE mt_ds PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR AS SELECT order_id,order_name,ds FROM mt_order ;説明partition.fields.#.date-formatterでは、`#` は文字列型の有効なパーティションフィールドでなければなりません。partition.fields.#.date-formatterパラメーターは、マテリアライズドテーブルの時刻パーティションフォーマットを指定します。`#` は文字列型のパーティションフィールドの名前を表します。このパラメーターは、どのパーティションをリフレッシュするかをシステムに伝えます。
マテリアライズドテーブルの更新を開始または停止します。
カタログの下にある対象のマテリアライズドテーブルをクリックします。
右上隅で、更新の開始 または 更新の停止 をクリックします。
説明[更新の停止] をクリックしたときに更新が進行中の場合、現在の更新サイクルが完了した後にプロセスが停止します。
マテリアライズドテーブルジョブの詳細を表示します。
テーブルスキーマ詳細 タブで、基本情報 セクションを表示します。データ更新ジョブ または ワークフロー の横にあるジョブ ID をクリックして詳細を表示します。
マテリアライズドテーブルのクエリの変更
制限
クエリを変更できるのは、Ververica Runtime (VVR) 11.1 以降で作成されたマテリアライズドテーブルのみです。
クエリを変更する場合、列の追加と計算ロジックの変更のみが可能です。既存の列の順序を変更したり、その定義を変更したりすることはできません。
操作タイプ
サポート
説明
新しい列の追加
サポート
既存の列の順序を維持したまま、スキーマの末尾に新しい列を追加できます。
既存の列の計算ロジックの変更 (列名と型は変更しない)
サポート
たとえば、計算ロジックを変更できますが、列名とデータ型は同じままである必要があります。
既存の列の順序の変更
いいえ
列の順序は固定です。マテリアライズドテーブルを削除して再作成する必要があります。
既存の列の名前またはデータ型の変更
いいえ
マテリアライズドテーブルを削除して再作成する必要があります。
変更例
テーブルの編集 をクリックし、クエリを変更します。以下のコードは例です。
ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders` AS SELECT *, price * quantity AS total_price FROM orders WHERE price * quantity > 1000 ;プレビュー をクリックして変更内容を表示します。

確認 をクリックします。その後、テーブルスキーマ詳細 タブで新しい列とクエリロジックを表示できます。
フィールドを追加しても、通常はダウンストリームノードに影響はありません。ただし、ダウンストリームノードが SELECT * や自動フィールドマッピングなどの動的解析を使用している場合、同期タスクが失敗したり、データ形式の不一致エラーが報告されたりする可能性があります。動的解析を避け、代わりに固定フィールドを使用することを推奨します。アップストリームテーブルが変更された場合は、速やかにダウンストリームテーブルのスキーマを更新してください。
マテリアライズドテーブルの増分更新
制限
この機能は、Ververica Runtime (VVR) 8.0.11 以降のバージョンでのみサポートされています。
マテリアライズドテーブルの更新モード
マテリアライズドテーブルには、ストリーム更新、完全バッチ更新、増分バッチ更新の 3 つの更新モードがあります。
マテリアライズドテーブルのストリームモードまたはバッチモードは、そのデータフレッシュネスによって決まります。フレッシュネスが 30 分未満の場合はストリームモード、30 分以上の場合はバッチモードを示します。バッチモードでは、エンジンは完全更新を実行するか増分更新を実行するかを自動的に決定します。増分更新は、最後の更新以降に変更されたデータのみを計算し、それをマテリアライズドテーブルにマージします。完全更新は、テーブル全体またはパーティション全体のデータを計算し、マテリアライズドテーブルのデータを上書きします。バッチモードでは、エンジンは増分更新を優先し、マテリアライズドテーブルで増分更新がサポートされていない場合にのみ完全更新を使用します。
増分更新の条件
増分更新は、マテリアライズドテーブルが以下の条件を満たす場合にのみ使用されます。
マテリアライズドテーブルの定義で、どのパーティションフィールドにも
partition.fields.#.date-formatterパラメーターが設定されていない。ソーステーブルにプライマリキーがない。
マテリアライズドテーブルのクエリが、以下の場合に増分更新をサポートしている。
SQL 文
サポート
SELECT
ユーザー定義関数 (UDF) を含む、列およびスカラー関数式の選択をサポートします。集計関数はサポートされていません。
FROM
テーブル名またはサブクエリをサポートします。
WITH
共通テーブル式 (CTE) をサポートします。
WHERE
UDF を含む、さまざまなスカラー関数式を含むフィルター条件をサポートします。`WHERE [NOT] EXISTS <subquery>` や `WHERE <column_name> [NOT] IN <subquery>` などのサブクエリはサポートされていません。
UNION
UNION ALL のみがサポートされています。
JOIN
INNER JOIN がサポートされています。
以下で説明する LATERAL JOIN およびルックアップ結合の場合を除き、LEFT/RIGHT/FULL [OUTER] JOIN はサポートされていません。
ユーザー定義テーブル値関数 (UDTF) を含む、テーブル関数式との [LEFT [OUTER]] JOIN LATERAL はサポートされています。
ルックアップ結合については、`A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME()` のみがサポートされています。
説明`JOIN` キーワードのない JOIN (`SELECT * FROM a, b WHERE a.id = b.id` など) はサポートされています。
現在、`INNER JOIN` を使用した増分計算では、両方のソーステーブルから完全なデータを読み取ります。
GROUP BY
サポートされていません。
増分更新の例
例 1:スカラー関数を使用してソーステーブル `orders` のデータを処理します。
CREATE MATERIALIZED TABLE mt_shipped_orders (
PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
order_id,
COALESCE(customer_id, 'Unknown') AS customer_id,
CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
CASE
WHEN status = 'shipped' THEN 'Completed'
WHEN status = 'pending' THEN 'In Progress'
ELSE 'Unknown'
END AS order_status,
DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
UDSF_ProcessFunction(notes) AS notes
FROM
orders
WHERE
status = 'shipped';例 2:LATERAL JOIN とルックアップ結合を使用してソーステーブル `orders` のデータをエンリッチします。
CREATE MATERIALIZED TABLE mt_enriched_orders (
PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
SELECT
order_id,
product_id,
quantity,
proc_time,
e.tag AS order_tag
FROM
orders,
LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT
o.order_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price,
o.quantity * p.price AS total_amount,
order_tag
FROM o
LEFT JOIN
product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
o.product_id = p.product_id;既存データのバックフィル
以前は、ストリームジョブを実行した後、結果を前日の完全なデータセットと調整するために、別のバッチジョブを開発する必要がありました。マテリアライズドテーブルを使用すると、既存データのパーティションを直接選択してデータをバックフィルできます。この変更により、開発と O&M のコストが削減され、ストリームとバッチの統合処理が提供されます。
カタログの下にある対象のマテリアライズドテーブルをクリックします。
データ情報 タブで、データをバックフィルします。
マテリアライズドテーブルを作成したときにパーティションフィールドを宣言した場合、それはパーティションテーブルです。それ以外の場合は、非パーティションテーブルです。
パーティションテーブル
データパーティション を表示します。これが最初のバックフィルであるか、必要なパーティションが存在しない場合は、手動更新 をクリックします。パーティションが存在する場合は、バックフィルするパーティションを選択し、バックフィル をクリックします。


パラメーター:
パーティションフィールド:テーブルのパーティションフィールド。たとえば、`20241201` と入力すると、`ds=20241201` のすべてのデータがバックフィルされます。
タスク名:データバックフィルタスクの名前。
更新範囲 (オプション):関連するダウンストリームのマテリアライズドテーブルに更新をカスケードするかどうかを指定します。更新は、現在のテーブルからそのデータリネージ内のすべてのマテリアライズドテーブルにカスケードされます。ダウンストリームレイヤーの最大数は 6 です。
説明パーティションテーブルの更新では、ダウンストリームのマテリアライズドテーブルのパーティションフィールドが開始テーブルのパーティションフィールドと同一でなければなりません。そうでない場合、更新操作は失敗します。
リネージ内のマテリアライズドテーブルの更新が失敗した場合、すべてのダウンストリームノードも失敗します。
デプロイターゲット:キューまたはセッションクラスターを選択できます。デフォルトは `default-queue` です。
非パーティションテーブル
データステータス を表示し、バックフィル をクリックします。

パラメーター:
タスク名:データバックフィルタスクの名前。
更新範囲:このパラメーターは非パーティションテーブルでは使用できません。
説明更新中、ダウンストリームデータに対して完全更新が実行されます。
リネージ内のマテリアライズドテーブルの更新が失敗した場合、すべてのダウンストリームノードも失敗します。
システムがそのフレッシュネスに基づいて開始テーブルが非パーティションのストリームタスクであると判断した場合、カスケード更新はサポートされません。
デプロイターゲット:キューまたはセッションクラスターを選択できます。デフォルトは `default-queue` です。
スケジュールされたバッチバックフィル。
タスクオーケストレーション を使用して、定期的なスケジュールのためのマテリアライズドテーブルワークフローを作成できます。これにより、スケジュールされたバックフィルが可能になります。また、ワークフローのデータバックフィル機能を使用して、バッチデータバックフィルの時間範囲を選択することもできます。
データフレッシュネスの変更
カタログで、マテリアライズドテーブル データベースをクリックし、次に対象の マテリアライズドテーブル をクリックします。
右上隅で、データフレッシュネスの変更 をクリックします。
マテリアライズドテーブルにプライマリキーがない場合、ジョブの処理モードをストリームとバッチの間で変更することはできません。たとえば、データフレッシュネスを 2 秒 (ストリーム) から 1 時間 (バッチ) に変更したり、その逆を行ったりすることはできません。ジョブは、フレッシュネスが 30 分未満の場合はストリームジョブ、30 分以上の場合はバッチジョブになります。
ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータフレッシュネスは、アップストリームテーブルのデータフレッシュネスの整数倍でなければなりません。
データフレッシュネスは 1 日を超えることはできません。
データ系列の表示
データリネージページには、すべてのマテリアライズドテーブル間のリネージ関係が表示されます。このページから、マテリアライズドテーブルに対して 更新の開始/停止 や データフレッシュネスの変更 などの操作を実行することもできます。特定のマテリアライズドテーブルの詳細ページを表示するには、詳細 をクリックします。

参照資料
マテリアライズドテーブルの詳細については、「マテリアライズドテーブル管理」をご参照ください。
Paimon とマテリアライズドテーブルを使用してストリームとバッチを統合したデータレイクハウスを構築する方法、およびデータフレッシュネスを変更してリアルタイムのデータ更新のためにバッチ処理からストリーム処理に切り替える方法については、「マテリアライズドテーブル入門:ストリームとバッチを統合したデータレイクハウスの構築」をご参照ください。