すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:マテリアライズドテーブルの作成と使用

最終更新日:Jan 05, 2026

このトピックでは、マテリアライズドテーブルの作成方法について説明します。また、既存データのバックフィル、データフレッシュネスの変更、マテリアライズドテーブルのデータリネージの表示方法についても説明します。

制限事項

  • この機能は、Ververica Runtime (VVR) 8.0.10 以降のバージョンでのみサポートされています。

  • この機能は、ファイルシステムまたは DLF メタストアを使用する Paimon カタログ のみをサポートします。カスタム Paimon カタログを使用してマテリアライズドテーブルを作成することはできません。

  • ジョブを開発およびデプロイする権限が必要です。詳細については、「開発コンソールの権限付与」をご参照ください。

  • 一時テーブル、一時関数、一時ビューなど、一時オブジェクトへの参照はサポートされていません。

マテリアライズドテーブルの作成

構文

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- Primary key constraint
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]

[COMMENT table_comment]
-- Partition key
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- WITH options
[WITH (key1=val1, key2=val2, ...)]
-- Data freshness
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- Refresh mode
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS  <select_statement>

パラメーター

パラメーター

必須

説明

FRESHNESS

はい

マテリアライズドテーブルのデータフレッシュネスを指定します。これは、ソーステーブルからのデータ更新の最大遅延を定義します。

説明
  • ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータフレッシュネスがアップストリームテーブルの 1〜N 倍 (N は正の整数) であることを確認してください。

  • データフレッシュネスは 1 日を超えることはできません。

AS <select_statement>

はい

マテリアライズドテーブルにデータを投入するクエリを定義します。アップストリームテーブルには、マテリアライズドテーブル、通常テーブル、またはビューを指定できます。SELECT 文はすべての Flink SQL クエリをサポートします。

PRIMARY KEY

いいえ

テーブル内の各行を一意に識別するオプションの列セットを定義します。指定された列に null は許可されません。

PARTITIONED BY

いいえ

マテリアライズドテーブルをパーティション分割するために使用されるオプションの列セットを定義します。

WITH Options

いいえ

マテリアライズドテーブルの作成に必要な、テーブルプロパティとパーティションフィールドの時刻フォーマットパラメーターを定義します。

たとえば、パーティションフィールドの時刻フォーマットパラメーターは WITH ('partition.fields.#.date-formatter' = 'yyyyMMdd') です。パラメーターの使用方法の詳細については、以下の例をご参照ください。

REFRESH_MODE

いいえ

マテリアライズドテーブルのリフレッシュモードを指定します。指定されたリフレッシュモードは、フレームワークがデータフレッシュネスから自動的に推測するモードよりも優先されます。これにより、特定のシナリオ要件を満たすことができます。

  • CONTINUOUS:ストリームジョブがマテリアライズドテーブルを増分更新します。ダウンストリームデータは即時、またはチェックポイント完了後に可視になります。

  • FULL:ワークフローが定期的にマテリアライズドテーブルの更新をトリガーします。エンジンは、完全更新を実行するか増分更新を実行するかを決定します。詳細については、「マテリアライズドテーブルの増分更新」をご参照ください。データリフレッシュサイクルはデータフレッシュネスと一致します。デフォルトでは、更新はテーブルレベルでデータを上書きします。パーティションフィールドが存在する場合、最新のパーティションのみをリフレッシュするか、すべてのパーティションをリフレッシュするかを選択できます。

手順

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. 対象のワークスペースの アクション 列で、コンソール をクリックします。

  3. 左側のナビゲーションウィンドウで、Data Management を選択し、対象の Paimon カタログをクリックします。

  4. 対象のデータベースをクリックし、マテリアライズドテーブルの作成 をクリックします。

    プライマリキー `order_id`、カテゴリ名 `order_name`、日付フィールド `ds` を持つ `orders` という名前のベーステーブルがあると仮定します。以下の例は、このテーブルに基づいてマテリアライズドテーブルを作成する方法を示しています。

    • `orders` テーブルに基づいて `mt_order` という名前のマテリアライズドテーブルを作成します。新しいテーブルには、ソーステーブルのすべてのフィールドが含まれ、データフレッシュネスは 5 秒です。

      CREATE MATERIALIZED TABLE mt_order
      FRESHNESS = INTERVAL '5' SECOND
      AS
      SELECT * FROM `paimon`.`db`.`orders`
      ;
    • `mt_order` テーブルに基づいて `mt_id` という名前のマテリアライズドテーブルを作成します。新しいテーブルには `order_id` と `ds` フィールドが含まれます。`order_id` をプライマリキー、`ds` をパーティションフィールドに設定し、データフレッシュネスを 30 分に設定します。

      CREATE MATERIALIZED TABLE mt_id (
       PRIMARY KEY (order_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      FRESHNESS = INTERVAL '30' MINUTE
      AS
      SELECT order_id,ds FROM mt_order
      ;
    • `mt_order` テーブルに基づいて、`mt_ds` という名前のマテリアライズドテーブルを作成します。ds パーティションフィールドに date-formatter (時間フォーマット) を指定します。ジョブがスケジュールされるたびに、システムはスケジュールされた時間から鮮度の値を減算し、その結果を対応する ds パーティション値に変換します。たとえば、データの鮮度が 1 時間で、スケジュールされた時間が 2024-01-01 00:00:00 の場合、計算される `ds` 値は `20231231` になります。パーティション ds = '20231231' のデータのみがリフレッシュされます。スケジュールされた時間が 2024-01-01 01:00:00 の場合、計算される `ds` 値は `20240101` になり、パーティション ds = '20240101' のデータがリフレッシュされます。

      CREATE MATERIALIZED TABLE mt_ds
      PARTITIONED BY(ds)
      WITH (
          'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR
      AS
      SELECT order_id,order_name,ds FROM mt_order
      ;
      説明
      • partition.fields.#.date-formatter では、`#` は文字列型の有効なパーティションフィールドでなければなりません。

      • partition.fields.#.date-formatter パラメーターは、マテリアライズドテーブルの時刻パーティションフォーマットを指定します。`#` は文字列型のパーティションフィールドの名前を表します。このパラメーターは、どのパーティションをリフレッシュするかをシステムに伝えます。

  5. マテリアライズドテーブルの更新を開始または停止します。

    1. カタログの下にある対象のマテリアライズドテーブルをクリックします。

    2. 右上隅で、更新の開始 または 更新の停止 をクリックします。

      説明

      [更新の停止] をクリックしたときに更新が進行中の場合、現在の更新サイクルが完了した後にプロセスが停止します。

  6. マテリアライズドテーブルジョブの詳細を表示します。

    テーブルスキーマ詳細 タブで、基本情報 セクションを表示します。データ更新ジョブ または ワークフロー の横にあるジョブ ID をクリックして詳細を表示します。

マテリアライズドテーブルのクエリの変更

制限

  • クエリを変更できるのは、Ververica Runtime (VVR) 11.1 以降で作成されたマテリアライズドテーブルのみです。

  • クエリを変更する場合、列の追加と計算ロジックの変更のみが可能です。既存の列の順序を変更したり、その定義を変更したりすることはできません。

    操作タイプ

    サポート

    説明

    新しい列の追加

    サポート

    既存の列の順序を維持したまま、スキーマの末尾に新しい列を追加できます。

    既存の列の計算ロジックの変更 (列名と型は変更しない)

    サポート

    たとえば、計算ロジックを変更できますが、列名とデータ型は同じままである必要があります。

    既存の列の順序の変更

    いいえ

    列の順序は固定です。マテリアライズドテーブルを削除して再作成する必要があります。

    既存の列の名前またはデータ型の変更

    いいえ

    マテリアライズドテーブルを削除して再作成する必要があります。

変更例

  1. テーブルの編集 をクリックし、クエリを変更します。以下のコードは例です。

    ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
        AS
        SELECT
          *,
          price * quantity AS total_price
        FROM orders
        WHERE price * quantity > 1000
    ;
  2. プレビュー をクリックして変更内容を表示します。

    image

  3. 確認 をクリックします。その後、テーブルスキーマ詳細 タブで新しい列とクエリロジックを表示できます。

重要

フィールドを追加しても、通常はダウンストリームノードに影響はありません。ただし、ダウンストリームノードが SELECT * や自動フィールドマッピングなどの動的解析を使用している場合、同期タスクが失敗したり、データ形式の不一致エラーが報告されたりする可能性があります。動的解析を避け、代わりに固定フィールドを使用することを推奨します。アップストリームテーブルが変更された場合は、速やかにダウンストリームテーブルのスキーマを更新してください。

マテリアライズドテーブルの増分更新

制限

この機能は、Ververica Runtime (VVR) 8.0.11 以降のバージョンでのみサポートされています。

マテリアライズドテーブルの更新モード

マテリアライズドテーブルには、ストリーム更新、完全バッチ更新、増分バッチ更新の 3 つの更新モードがあります。

マテリアライズドテーブルのストリームモードまたはバッチモードは、そのデータフレッシュネスによって決まります。フレッシュネスが 30 分未満の場合はストリームモード、30 分以上の場合はバッチモードを示します。バッチモードでは、エンジンは完全更新を実行するか増分更新を実行するかを自動的に決定します。増分更新は、最後の更新以降に変更されたデータのみを計算し、それをマテリアライズドテーブルにマージします。完全更新は、テーブル全体またはパーティション全体のデータを計算し、マテリアライズドテーブルのデータを上書きします。バッチモードでは、エンジンは増分更新を優先し、マテリアライズドテーブルで増分更新がサポートされていない場合にのみ完全更新を使用します。

増分更新の条件

増分更新は、マテリアライズドテーブルが以下の条件を満たす場合にのみ使用されます。

  • マテリアライズドテーブルの定義で、どのパーティションフィールドにも partition.fields.#.date-formatter パラメーターが設定されていない。

  • ソーステーブルにプライマリキーがない。

  • マテリアライズドテーブルのクエリが、以下の場合に増分更新をサポートしている。

    SQL 文

    サポート

    SELECT

    ユーザー定義関数 (UDF) を含む、列およびスカラー関数式の選択をサポートします。集計関数はサポートされていません。

    FROM

    テーブル名またはサブクエリをサポートします。

    WITH

    共通テーブル式 (CTE) をサポートします。

    WHERE

    UDF を含む、さまざまなスカラー関数式を含むフィルター条件をサポートします。`WHERE [NOT] EXISTS <subquery>` や `WHERE <column_name> [NOT] IN <subquery>` などのサブクエリはサポートされていません。

    UNION

    UNION ALL のみがサポートされています。

    JOIN

    • INNER JOIN がサポートされています。

    • 以下で説明する LATERAL JOIN およびルックアップ結合の場合を除き、LEFT/RIGHT/FULL [OUTER] JOIN はサポートされていません。

    • ユーザー定義テーブル値関数 (UDTF) を含む、テーブル関数式との [LEFT [OUTER]] JOIN LATERAL はサポートされています。

    • ルックアップ結合については、`A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME()` のみがサポートされています。

    説明
    • `JOIN` キーワードのない JOIN (`SELECT * FROM a, b WHERE a.id = b.id` など) はサポートされています。

    • 現在、`INNER JOIN` を使用した増分計算では、両方のソーステーブルから完全なデータを読み取ります。

    GROUP BY

    サポートされていません。

増分更新の例

例 1:スカラー関数を使用してソーステーブル `orders` のデータを処理します。

CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT 
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE 
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    UDSF_ProcessFunction(notes) AS notes
FROM 
    orders
WHERE
    status = 'shipped';

例 2:LATERAL JOIN とルックアップ結合を使用してソーステーブル `orders` のデータをエンリッチします。

CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM 
        orders,
        LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT 
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    order_tag
FROM o 
LEFT JOIN 
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON 
    o.product_id = p.product_id;

既存データのバックフィル

以前は、ストリームジョブを実行した後、結果を前日の完全なデータセットと調整するために、別のバッチジョブを開発する必要がありました。マテリアライズドテーブルを使用すると、既存データのパーティションを直接選択してデータをバックフィルできます。この変更により、開発と O&M のコストが削減され、ストリームとバッチの統合処理が提供されます。

  1. カタログの下にある対象のマテリアライズドテーブルをクリックします。

  2. データ情報 タブで、データをバックフィルします。

    マテリアライズドテーブルを作成したときにパーティションフィールドを宣言した場合、それはパーティションテーブルです。それ以外の場合は、非パーティションテーブルです。

    パーティションテーブル

    データパーティション を表示します。これが最初のバックフィルであるか、必要なパーティションが存在しない場合は、手動更新 をクリックします。パーティションが存在する場合は、バックフィルするパーティションを選択し、バックフィル をクリックします。

    image

    image

    パラメーター:

    • パーティションフィールド:テーブルのパーティションフィールド。たとえば、`20241201` と入力すると、`ds=20241201` のすべてのデータがバックフィルされます。

    • タスク名:データバックフィルタスクの名前。

    • 更新範囲 (オプション):関連するダウンストリームのマテリアライズドテーブルに更新をカスケードするかどうかを指定します。更新は、現在のテーブルからそのデータリネージ内のすべてのマテリアライズドテーブルにカスケードされます。ダウンストリームレイヤーの最大数は 6 です。

      説明
      • パーティションテーブルの更新では、ダウンストリームのマテリアライズドテーブルのパーティションフィールドが開始テーブルのパーティションフィールドと同一でなければなりません。そうでない場合、更新操作は失敗します。

      • リネージ内のマテリアライズドテーブルの更新が失敗した場合、すべてのダウンストリームノードも失敗します。

    • デプロイターゲット:キューまたはセッションクラスターを選択できます。デフォルトは `default-queue` です。

    非パーティションテーブル

    データステータス を表示し、バックフィル をクリックします。

    image

    パラメーター:

    • タスク名:データバックフィルタスクの名前。

    • 更新範囲:このパラメーターは非パーティションテーブルでは使用できません。

      説明
      • 更新中、ダウンストリームデータに対して完全更新が実行されます。

      • リネージ内のマテリアライズドテーブルの更新が失敗した場合、すべてのダウンストリームノードも失敗します。

      • システムがそのフレッシュネスに基づいて開始テーブルが非パーティションのストリームタスクであると判断した場合、カスケード更新はサポートされません。

    • デプロイターゲット:キューまたはセッションクラスターを選択できます。デフォルトは `default-queue` です。

  3. スケジュールされたバッチバックフィル。

    タスクオーケストレーション を使用して、定期的なスケジュールのためのマテリアライズドテーブルワークフローを作成できます。これにより、スケジュールされたバックフィルが可能になります。また、ワークフローのデータバックフィル機能を使用して、バッチデータバックフィルの時間範囲を選択することもできます。

データフレッシュネスの変更

  1. カタログで、マテリアライズドテーブル データベースをクリックし、次に対象の マテリアライズドテーブル をクリックします。

  2. 右上隅で、データフレッシュネスの変更 をクリックします。

    • マテリアライズドテーブルにプライマリキーがない場合、ジョブの処理モードをストリームとバッチの間で変更することはできません。たとえば、データフレッシュネスを 2 秒 (ストリーム) から 1 時間 (バッチ) に変更したり、その逆を行ったりすることはできません。ジョブは、フレッシュネスが 30 分未満の場合はストリームジョブ、30 分以上の場合はバッチジョブになります。

    • ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータフレッシュネスは、アップストリームテーブルのデータフレッシュネスの整数倍でなければなりません。

    • データフレッシュネスは 1 日を超えることはできません。

データ系列の表示

データリネージページには、すべてのマテリアライズドテーブル間のリネージ関係が表示されます。このページから、マテリアライズドテーブルに対して 更新の開始/停止データフレッシュネスの変更 などの操作を実行することもできます。特定のマテリアライズドテーブルの詳細ページを表示するには、詳細 をクリックします。

image

参照資料