このトピックでは、マテリアライズドテーブルの作成と使用方法について説明します。マテリアライズドテーブルへの既存データのバックフィル、データ鮮度の調整、データ系列の確認などの主要な側面について説明します。
前提条件と制限事項
エンジンバージョンは Ververica Runtime (VVR) 8.0.10 以降です。
ファイルシステム Apache Paimon カタログメタストアタイプの があります。
名前空間でジョブを開発およびデプロイするための権限があります。詳細については、「名前空間に対する権限の付与」をご参照ください。
一時テーブル、関数、ビューなどの一時オブジェクトはサポートされていません。
マテリアライズドテーブルの作成
構文
CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- プライマリキー制約を定義します。
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]
[COMMENT table_comment]
-- パーティションキーを設定します。
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- コネクタオプションを設定します。
[WITH (key1=val1, key2=val2, ...)]
-- データ鮮度を設定します。
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- データリフレッシュモードを設定します。
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS <select_statement>パラメーター
パラメーター | 必須 | 説明 |
FRESHNESS | はい | マテリアライズドテーブルのデータの鮮度。データ鮮度は、マテリアライズドテーブルとベーステーブル間のデータ更新の最大遅延時間を指定します。 説明
|
AS <select_statement> | はい | この句は、マテリアライズドテーブルデータを読み込むためのクエリを定義するために使用されます。アップストリームテーブルは、マテリアライズドテーブル、テーブル、またはビューにすることができます。SELECT 文はすべての Flink SQL クエリをサポートしています。 |
PRIMARY KEY | いいえ | テーブルの各行を一意に識別するオプションの列のセットを定義します。プライマリキーフィールドは NULL にすることはできません。 |
PARTITIONED BY | いいえ | マテリアライズドテーブルのパーティション分割に使用するオプションの列のグループを定義します。 |
WITH オプション | いいえ | マテリアライズドテーブルの作成に必要なテーブルプロパティとパーティション分割列の時間形式パラメーターを定義します。 たとえば、パーティション列の時間形式パラメーターを |
REFRESH_MODE | いいえ | マテリアライズドテーブルのリフレッシュモードを指定します。明示的に指定されたリフレッシュモードは、自動的に推測されたリフレッシュモードよりも優先されます。有効な値:
|
手順
対象のワークスペースの [アクション] 列で [コンソール] をクリックします。
左側のナビゲーションウィンドウで、[カタログ] をクリックします。[カタログ] ペインで、対象の Apache Paimon カタログをクリックします。
対象のデータベースをクリックします。表示されるページで、[マテリアライズドテーブルの作成] をクリックします。
この例では、orders という名前のベーステーブルを使用します。テーブルでは、プライマリキーは order_id、カテゴリ名は order_name、日付フィールドは ds です。次の例は、orders テーブルに基づいてマテリアライズドテーブルを作成する方法を示しています。
orders テーブルに基づいて mt_order という名前のマテリアライズドテーブルを作成します。orders テーブルからすべての列をマテリアライズドテーブルフィールドとして取得し、データ鮮度を 5 秒に設定します。
CREATE MATERIALIZED TABLE mt_order FRESHNESS = INTERVAL '5' SECOND AS SELECT * FROM `paimon`.`db`.`orders` ;マテリアライズドテーブル mt_order に基づいて mt_id という名前のマテリアライズドテーブルを作成します。order_id フィールドと ds フィールドをテーブルフィールドとして取得し、order_id フィールドをプライマリキーとして、ds フィールドをパーティション分割列として指定し、データ鮮度を 30 分に設定します。
CREATE MATERIALIZED TABLE mt_id ( PRIMARY KEY (order_id) NOT ENFORCED ) PARTITIONED BY(ds) FRESHNESS = INTERVAL '30' MINUTE AS SELECT order_id,ds FROM mt_order ;マテリアライズドテーブル mt_order に基づいて、mt_ds という名前のマテリアライズドテーブルを作成します。
date-formatter時間形式をdsパーティション列に指定します。マテリアライズドテーブル mt_ds がスケジュールされるたびに、スケジュールされた時間からデータの鮮度を引いた値が、対応するdsパーティション値に変換されます。たとえば、データの鮮度が 1 時間に設定され、スケジュールされた時間が2024-01-01 00:00:00の場合、計算された ds 値は 20231231 になります。この場合、パーティションds = '20231231'のデータのみがリフレッシュされます。スケジュールされた時間が2024-01-01 01:00:00で、計算された ds 値が 20240101 の場合、パーティションds = '20240101'のデータがリフレッシュされます。CREATE MATERIALIZED TABLE mt_ds PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR AS SELECT order_id,order_name,ds FROM mt_order ;説明partition.fields.#.date-formatterの # フィールドは、文字列タイプの有効なパーティションフィールドである必要があります。partition.fields.#.date-formatterは、マテリアライズドテーブルの時間パーティション形式を指定します。# は文字列タイプのパーティション列の名前で、システムが指定されたパーティションのデータをリフレッシュできるようにします。
マテリアライズドテーブルの更新を開始または停止します。
対象のカタログの下にあるマテリアライズドテーブルをクリックします。
詳細ページの右上隅にある [開始] または [停止] をクリックして、更新を開始または停止します。
説明[停止] をクリックしたときにマテリアライズドテーブルが更新されている場合、現在の更新ラウンドが完了するまで更新は停止しません。
ジョブを確認します。
マテリアライズドテーブルの詳細ページの [テーブルスキーマ] タブを選択します。[基本情報] セクションで、[最新のジョブ] または [ワークフロー] フィールドの ID をクリックします。
マテリアライズドテーブルのクエリの変更
制限
VVR 11.1 以降で動作する新しいマテリアライズドテーブルのみがサポートされています。
列の追加と計算ロジックの変更のみがサポートされている操作です。詳細は、以下の表を参照してください。
操作
サポートされている
説明
列を追加する
サポートされている
列の順序を維持しながら、マテリアライズドテーブルに新しい列を追加します。
計算ロジックを変更する
サポートされている
列名とタイプを変更せずに、既存の列の計算ロジックを変更します。
列の順序を変更する
サポートされていない
列の順序を変更するには、マテリアライズドテーブルを削除し、目的の列の順序で新しいテーブルを作成します。
列名またはタイプを変更する
サポートされていない
列名またはタイプを変更するには、マテリアライズドテーブルを削除し、目的の列名またはタイプで新しいテーブルを作成します。
手順
[テーブルの編集] をクリックし、クエリを変更します。サンプルコード:
ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders` AS SELECT *, price * quantity AS total_price FROM orders WHERE price * quantity > 1000 ;[プレビュー] をクリックして変更を確認します。

[OK] をクリックします。[テーブルスキーマ] タブを選択して、追加された列と変更されたクエリを確認します。
ダウンストリームテーブルが SELECT * FROM クエリや自動フィールドマッピングなどの動的解析に依存している場合、新しい列を追加すると、ジョブの失敗やデータ形式の不一致エラーが発生する可能性があります。ダウンストリームテーブルには動的解析ではなく固定列を使用し、スキーマを速やかに更新することをお勧めします。
マテリアライズドテーブルの増分更新
制限
VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみ、マテリアライズドテーブルの増分データを更新できます。
マテリアライズドテーブルの更新モード
マテリアライズドテーブルは、ストリーミング更新モード、フルバッチ更新モード、増分バッチ更新モードの更新モードを提供します。
マテリアライズドテーブルのデータ鮮度は、テーブルがストリーミングモードかバッチモードかを定義します。マテリアライズドテーブルのデータ鮮度が 30 分未満の場合、テーブルはストリーミングモードになります。マテリアライズドテーブルのデータ鮮度が 30 分以上の場合、テーブルはバッチモードになります。バッチモードでは、エンジンは自動的にフルアップデートまたは増分アップデートを実行します。増分アップデートは、最後のアップデート以降の増分データのみを計算し、結果をマテリアライズドテーブルにマージします。フルアップデートは、テーブル全体またはパーティション全体のデータを計算し、マテリアライズドテーブルのデータを上書きします。バッチモードでは、エンジンは優先的に増分アップデートを実行します。フルアップデートは、増分アップデートがマテリアライズドテーブルの要件を満たせない場合にのみ実行されます。
増分更新の条件
増分更新は、マテリアライズドテーブルが次の条件を満たす場合にのみ実行されます。
マテリアライズドテーブルを定義するときに、
partition.fields.#.date-formatterパラメーターを設定してパーティション列の時間形式を指定しません。ソーステーブルのプライマリキーを指定しません。
次のシナリオでは、マテリアライズドテーブルのクエリ文は増分更新をサポートしています。
SQL 文
サポートされている増分更新
SELECT
列をクエリでき、ユーザー定義関数を含むスカラー関数式がサポートされています。集計関数はサポートされていません。
FROM
テーブル名クエリまたはサブクエリがサポートされています。
WITH
共通テーブル式 (CTE) がサポートされています。
WHERE
ユーザー定義関数を含むスカラー関数式は、フィルター条件でサポートされています。サブクエリはフィルター条件ではサポートされていません。たとえば、WHERE [NOT] EXISTS <subquery> および WHERE <列名> [NOT] IN <subquery> 形式のサブクエリ句はサポートされていません。
UNION
UNION ALL のみがサポートされています。
JOIN
INNER JOIN がサポートされています。
LEFT JOIN、RIGHT JOIN、および FULL [OUTER] JOIN は、Lateal Join と Lookup Join が使用される次のシナリオを除いて、サポートされていません。
[LEFT [OUTER]] JOIN LATERAL と、ユーザー定義関数を含むテーブル関数式がサポートされています。
Lookup Join では、A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() のみがサポートされています。
説明JOIN キーワードを使用しない JOIN 文 (例:SELECT * FROM a, b WHERE a.id = b.id) はサポートされています。
INNER JOIN を使用して増分更新を実行する場合でも、2 つのソーステーブルの完全データが読み取られます。
GROUP BY
サポートされていません。
例
例 1:スカラー関数を使用して orders ソーステーブルのデータを処理します。
CREATE MATERIALIZED TABLE mt_shipped_orders (
PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT
order_id,
COALESCE(customer_id, 'Unknown') AS customer_id,
CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
CASE
WHEN status = 'shipped' THEN 'Completed'
WHEN status = 'pending' THEN 'In Progress'
ELSE 'Unknown'
END AS order_status,
DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
UDSF_ProcessFunction(notes) AS notes
FROM
orders
WHERE
status = 'shipped';例 2:lateral join と lookup join を使用して orders テーブルを強化します。
CREATE MATERIALIZED TABLE mt_enriched_orders (
PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
SELECT
order_id,
product_id,
quantity,
proc_time,
e.tag AS order_tag
FROM
orders,
LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT
o.order_id,
o.product_id,
p.product_name,
p.category,
o.quantity,
p.price,
o.quantity * p.price AS total_amount,
order_tag
FROM o
LEFT JOIN
product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON
o.product_id = p.product_id;既存データのバックフィル
従来は、前日のすべてのデータを使用してストリーム処理結果を修正するために、別のバッチジョブが必要でした。マテリアライズドテーブルを使用すると、現在のジョブを使用して直感的にデータをバックフィルできます。マテリアライズドテーブルのこの機能は、バッチ処理とストリーミング処理を統合し、開発と O&M の効率を高めます。
[カタログ] ペインで、Apache Paimon カタログの下にある対象のマテリアライズドテーブルをクリックします。
[データ情報] タブで、データをリフレッシュします。
マテリアライズドテーブルの作成時にパーティションキーを定義した場合、テーブルはパーティションテーブルになります。
パーティションテーブル
[テーブル情報] タブの [パーティション] セクションの右上隅で、初めてデータバックフィルを実行する場合、またはパーティションが使用できない場合は、[更新のトリガー] をクリックします。パーティションが存在する場合は、対応するパーティションを見つけて、[アクション] 列の [リフレッシュ] をクリックします。


パラメーター:
ds:テーブルのパーティションフィールド。たとえば、ds フィールドに 20241201 と入力すると、ds=20241201 のすべてのデータがバックフィルされます。
タスク名:データバックフィルタスクの名前。
更新範囲(オプション):ダウンストリームマテリアライズドテーブルへのカスケード更新を行うかどうかを指定します。パーティションテーブルを開始点として使用して、すべてのマテリアライズドテーブルを更新できます。ダウンストリームレイヤーの最大数は 6 です。
説明パーティションテーブルを更新する場合、ダウンストリームマテリアライズドテーブルのパーティション分割列は、パーティションテーブルのパーティション分割列と同じである必要があります。そうでない場合、更新操作は失敗します。
マテリアライズドテーブルの更新に失敗した場合、すべてのダウンストリームノードが失敗します。
デプロイメントターゲット:テーブルがデプロイされるオブジェクト。キューとセッションクラスターを選択できます。デフォルトでは、default-queue が使用されます。
非パーティションテーブル
[データ情報] タブを選択し、マテリアライズドテーブルの詳細を表示して、[リフレッシュ] をクリックします。

パラメーター:
タスク名:データバックフィルタスクの名前。
更新範囲:このパラメーターは、非パーティションテーブルではサポートされていません。
説明ダウンストリームテーブルのすべてのデータがリフレッシュされます。
マテリアライズドテーブルの更新に失敗した場合、すべてのダウンストリームノードが失敗します。
システムがベーステーブルのデータ鮮度に基づいてワークロードがストリーミングであると判断し、テーブルが非パーティションテーブルである場合、カスケード更新はサポートされません。
デプロイメントターゲット:テーブルがデプロイされるオブジェクト。キューとセッションクラスターを選択できます。デフォルトでは、default-queue が使用されます。
データ鮮度の変更
目的のカタログの下にある [マテリアライズドテーブル] データベースをクリックし、表示するマテリアライズドテーブルをクリックします。
マテリアライズドテーブルの詳細ページの右上隅で、[鮮度の編集] をクリックします。
マテリアライズドテーブルがプライマリキーを持たないテーブルの場合、タスクのストリーミングプロパティとバッチプロパティを変更することはできません。たとえば、データ鮮度を 2 秒から 1 時間に変更すると、Realtime Compute for Apache Flink はストリーミングジョブをバッチジョブに変換します。データ鮮度を 1 時間から 2 秒に変更すると、Realtime Compute for Apache Flink はバッチジョブをストリーミングジョブに変換します。このような操作は実行できません。データ鮮度が 30 分未満の場合、ジョブはストリーミングとして分類されます。データ鮮度が 30 分以上の場合、ジョブはバッチになります。
ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータ鮮度がアップストリームテーブルの 1 ~ N 倍であることを確認してください。N は正の整数です。
データ鮮度は 1 日を超えることはできません。
データ系列の表示
[データ系列] ページで、すべてのマテリアライズドテーブル間のデータ系列を表示します。マテリアライズドテーブルの更新を開始または停止し、データ鮮度を変更できます。また、ページの右上隅にある [詳細] をクリックして、マテリアライズドテーブルの詳細ページに移動することもできます。
