すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:マテリアライズドテーブルの作成と使用

最終更新日:Nov 09, 2025

このトピックでは、マテリアライズドテーブルの作成と使用方法について説明します。マテリアライズドテーブルへの既存データのバックフィル、データ鮮度の調整、データ系列の確認などの主要な側面について説明します。

前提条件と制限事項

  • エンジンバージョンは Ververica Runtime (VVR) 8.0.10 以降です。

  • ファイルシステム Apache Paimon カタログメタストアタイプの があります。

  • 名前空間でジョブを開発およびデプロイするための権限があります。詳細については、「名前空間に対する権限の付与」をご参照ください。

  • 一時テーブル、関数、ビューなどの一時オブジェクトはサポートされていません。

マテリアライズドテーブルの作成

構文

CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name
-- プライマリキー制約を定義します。
[([CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED)]

[COMMENT table_comment]
-- パーティションキーを設定します。
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
-- コネクタオプションを設定します。
[WITH (key1=val1, key2=val2, ...)]
-- データ鮮度を設定します。
FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY }
-- データリフレッシュモードを設定します。
[REFRESH_MODE = { CONTINUOUS | FULL }]
AS  <select_statement>

パラメーター

パラメーター

必須

説明

FRESHNESS

はい

マテリアライズドテーブルのデータの鮮度。データ鮮度は、マテリアライズドテーブルとベーステーブル間のデータ更新の最大遅延時間を指定します。

説明
  • ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータ鮮度がアップストリームテーブルの 1 ~ N 倍であることを確認してください。N は正の整数です。

  • データ鮮度は 1 日を超えることはできません。

AS <select_statement>

はい

この句は、マテリアライズドテーブルデータを読み込むためのクエリを定義するために使用されます。アップストリームテーブルは、マテリアライズドテーブル、テーブル、またはビューにすることができます。SELECT 文はすべての Flink SQL クエリをサポートしています。

PRIMARY KEY

いいえ

テーブルの各行を一意に識別するオプションの列のセットを定義します。プライマリキーフィールドは NULL にすることはできません。

PARTITIONED BY

いいえ

マテリアライズドテーブルのパーティション分割に使用するオプションの列のグループを定義します。

WITH オプション

いいえ

マテリアライズドテーブルの作成に必要なテーブルプロパティとパーティション分割列の時間形式パラメーターを定義します。

たとえば、パーティション列の時間形式パラメーターを WITH ('partition.fields.#.date-formatter' = 'yyyyMMdd') として定義します。詳細については、このトピックで提供されている例を参照してください。

REFRESH_MODE

いいえ

マテリアライズドテーブルのリフレッシュモードを指定します。明示的に指定されたリフレッシュモードは、自動的に推測されたリフレッシュモードよりも優先されます。有効な値:

  • CONTINUOUS:ストリーミングジョブは、マテリアライズドテーブルのデータを増分的に更新します。ダウンストリームストレージのデータはすぐに表示されるか、チェックポイントが完了した後にのみ表示されます。

  • FULL:ワークフローは、マテリアライズドテーブルのすべてのデータを定期的に更新します。エンジンは、データを増分的に更新するか、完全に更新するかを決定します。詳細については、「マテリアライズドテーブルの増分更新」を参照してください。データリフレッシュ間隔はデータ鮮度と一致します。デフォルトでは、マテリアライズドテーブルのすべてのデータが上書きされます。パーティション分割列が存在する場合は、最新のパーティションをリフレッシュするか、すべてのパーティションを更新するかを選択できます。

手順

  1. Realtime Compute for Apache Flink の管理コンソール にログオンします。

  2. 対象のワークスペースの [アクション] 列で [コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、[カタログ] をクリックします。[カタログ] ペインで、対象の Apache Paimon カタログをクリックします。

  4. 対象のデータベースをクリックします。表示されるページで、[マテリアライズドテーブルの作成] をクリックします。

    この例では、orders という名前のベーステーブルを使用します。テーブルでは、プライマリキーは order_id、カテゴリ名は order_name、日付フィールドは ds です。次の例は、orders テーブルに基づいてマテリアライズドテーブルを作成する方法を示しています。

    • orders テーブルに基づいて mt_order という名前のマテリアライズドテーブルを作成します。orders テーブルからすべての列をマテリアライズドテーブルフィールドとして取得し、データ鮮度を 5 秒に設定します。

      CREATE MATERIALIZED TABLE mt_order
      FRESHNESS = INTERVAL '5' SECOND
      AS
      SELECT * FROM `paimon`.`db`.`orders`
      ;
    • マテリアライズドテーブル mt_order に基づいて mt_id という名前のマテリアライズドテーブルを作成します。order_id フィールドと ds フィールドをテーブルフィールドとして取得し、order_id フィールドをプライマリキーとして、ds フィールドをパーティション分割列として指定し、データ鮮度を 30 分に設定します。

      CREATE MATERIALIZED TABLE mt_id (
       PRIMARY KEY (order_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      FRESHNESS = INTERVAL '30' MINUTE
      AS
      SELECT order_id,ds FROM mt_order
      ;
    • マテリアライズドテーブル mt_order に基づいて、mt_ds という名前のマテリアライズドテーブルを作成します。date-formatter 時間形式を ds パーティション列に指定します。マテリアライズドテーブル mt_ds がスケジュールされるたびに、スケジュールされた時間からデータの鮮度を引いた値が、対応する ds パーティション値に変換されます。たとえば、データの鮮度が 1 時間に設定され、スケジュールされた時間が 2024-01-01 00:00:00 の場合、計算された ds 値は 20231231 になります。この場合、パーティション ds = '20231231' のデータのみがリフレッシュされます。スケジュールされた時間が 2024-01-01 01:00:00 で、計算された ds 値が 20240101 の場合、パーティション ds = '20240101' のデータがリフレッシュされます。

      CREATE MATERIALIZED TABLE mt_ds
      PARTITIONED BY(ds)
      WITH (
          'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR
      AS
      SELECT order_id,order_name,ds FROM mt_order
      ;
      説明
      • partition.fields.#.date-formatter の # フィールドは、文字列タイプの有効なパーティションフィールドである必要があります。

      • partition.fields.#.date-formatter は、マテリアライズドテーブルの時間パーティション形式を指定します。# は文字列タイプのパーティション列の名前で、システムが指定されたパーティションのデータをリフレッシュできるようにします。

  5. マテリアライズドテーブルの更新を開始または停止します。

    1. 対象のカタログの下にあるマテリアライズドテーブルをクリックします。

    2. 詳細ページの右上隅にある [開始] または [停止] をクリックして、更新を開始または停止します。

      説明

      [停止] をクリックしたときにマテリアライズドテーブルが更新されている場合、現在の更新ラウンドが完了するまで更新は停止しません。

  6. ジョブを確認します。

    マテリアライズドテーブルの詳細ページの [テーブルスキーマ] タブを選択します。[基本情報] セクションで、[最新のジョブ] または [ワークフロー] フィールドの ID をクリックします。

マテリアライズドテーブルのクエリの変更

制限

  • VVR 11.1 以降で動作する新しいマテリアライズドテーブルのみがサポートされています。

  • 列の追加計算ロジックの変更のみがサポートされている操作です。詳細は、以下の表を参照してください。

    操作

    サポートされている

    説明

    列を追加する

    サポートされている

    列の順序を維持しながら、マテリアライズドテーブルに新しい列を追加します。

    計算ロジックを変更する

    サポートされている

    列名とタイプを変更せずに、既存の列の計算ロジックを変更します。

    列の順序を変更する

    サポートされていない

    列の順序を変更するには、マテリアライズドテーブルを削除し、目的の列の順序で新しいテーブルを作成します。

    列名またはタイプを変更する

    サポートされていない

    列名またはタイプを変更するには、マテリアライズドテーブルを削除し、目的の列名またはタイプで新しいテーブルを作成します。

手順

  1. [テーブルの編集] をクリックし、クエリを変更します。サンプルコード:

    ALTER MATERIALIZED TABLE `paimon`.`default`.`mt-orders`
        AS
        SELECT
          *,
          price * quantity AS total_price
        FROM orders
        WHERE price * quantity > 1000
    ;
  2. [プレビュー] をクリックして変更を確認します。

    image

  3. [OK] をクリックします。[テーブルスキーマ] タブを選択して、追加された列と変更されたクエリを確認します。

重要

ダウンストリームテーブルが SELECT * FROM クエリや自動フィールドマッピングなどの動的解析に依存している場合、新しい列を追加すると、ジョブの失敗やデータ形式の不一致エラーが発生する可能性があります。ダウンストリームテーブルには動的解析ではなく固定列を使用し、スキーマを速やかに更新することをお勧めします。

マテリアライズドテーブルの増分更新

制限

VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink でのみ、マテリアライズドテーブルの増分データを更新できます。

マテリアライズドテーブルの更新モード

マテリアライズドテーブルは、ストリーミング更新モード、フルバッチ更新モード、増分バッチ更新モードの更新モードを提供します。

マテリアライズドテーブルのデータ鮮度は、テーブルがストリーミングモードかバッチモードかを定義します。マテリアライズドテーブルのデータ鮮度が 30 分未満の場合、テーブルはストリーミングモードになります。マテリアライズドテーブルのデータ鮮度が 30 分以上の場合、テーブルはバッチモードになります。バッチモードでは、エンジンは自動的にフルアップデートまたは増分アップデートを実行します。増分アップデートは、最後のアップデート以降の増分データのみを計算し、結果をマテリアライズドテーブルにマージします。フルアップデートは、テーブル全体またはパーティション全体のデータを計算し、マテリアライズドテーブルのデータを上書きします。バッチモードでは、エンジンは優先的に増分アップデートを実行します。フルアップデートは、増分アップデートがマテリアライズドテーブルの要件を満たせない場合にのみ実行されます。

増分更新の条件

増分更新は、マテリアライズドテーブルが次の条件を満たす場合にのみ実行されます。

  • マテリアライズドテーブルを定義するときに、partition.fields.#.date-formatter パラメーターを設定してパーティション列の時間形式を指定しません。

  • ソーステーブルのプライマリキーを指定しません。

  • 次のシナリオでは、マテリアライズドテーブルのクエリ文は増分更新をサポートしています。

    SQL 文

    サポートされている増分更新

    SELECT

    列をクエリでき、ユーザー定義関数を含むスカラー関数式がサポートされています。集計関数はサポートされていません。

    FROM

    テーブル名クエリまたはサブクエリがサポートされています。

    WITH

    共通テーブル式 (CTE) がサポートされています。

    WHERE

    ユーザー定義関数を含むスカラー関数式は、フィルター条件でサポートされています。サブクエリはフィルター条件ではサポートされていません。たとえば、WHERE [NOT] EXISTS <subquery> および WHERE <列名> [NOT] IN <subquery> 形式のサブクエリ句はサポートされていません。

    UNION

    UNION ALL のみがサポートされています。

    JOIN

    • INNER JOIN がサポートされています。

    • LEFT JOIN、RIGHT JOIN、および FULL [OUTER] JOIN は、Lateal Join と Lookup Join が使用される次のシナリオを除いて、サポートされていません。

      • [LEFT [OUTER]] JOIN LATERAL と、ユーザー定義関数を含むテーブル関数式がサポートされています。

      • Lookup Join では、A [LEFT [OUTER]] JOIN B FOR SYSTEM_TIME AS OF PROCTIME() のみがサポートされています。

    説明
    • JOIN キーワードを使用しない JOIN 文 (例:SELECT * FROM a, b WHERE a.id = b.id) はサポートされています。

    • INNER JOIN を使用して増分更新を実行する場合でも、2 つのソーステーブルの完全データが読み取られます。

    GROUP BY

    サポートされていません。

例 1:スカラー関数を使用して orders ソーステーブルのデータを処理します。

CREATE MATERIALIZED TABLE mt_shipped_orders (
    PRIMARY KEY (order_id) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
SELECT 
    order_id,
    COALESCE(customer_id, 'Unknown') AS customer_id,
    CAST(order_amount AS DECIMAL(10, 2)) AS order_amount,
    CASE 
        WHEN status = 'shipped' THEN 'Completed'
        WHEN status = 'pending' THEN 'In Progress'
        ELSE 'Unknown'
    END AS order_status,
    DATE_FORMAT(order_ts, 'yyyyMMdd') AS order_date,
    UDSF_ProcessFunction(notes) AS notes
FROM 
    orders
WHERE
    status = 'shipped';

例 2:lateral join と lookup join を使用して orders テーブルを強化します。

CREATE MATERIALIZED TABLE mt_enriched_orders (
    PRIMARY KEY (order_id, order_tag) NOT ENFORCED
)
FRESHNESS = INTERVAL '30' MINUTE
AS
WITH o AS (
    SELECT
        order_id,
        product_id,
        quantity,
        proc_time,
        e.tag AS order_tag
    FROM 
        orders,
        LATERAL TABLE(UDTF_StringSplitFunction(tags, ',')) AS e(tag))
SELECT 
    o.order_id,
    o.product_id,
    p.product_name,
    p.category,
    o.quantity,
    p.price,
    o.quantity * p.price AS total_amount,
    order_tag
FROM o 
LEFT JOIN 
    product_info FOR SYSTEM_TIME AS OF PROCTIME() AS p
ON 
    o.product_id = p.product_id;

既存データのバックフィル

従来は、前日のすべてのデータを使用してストリーム処理結果を修正するために、別のバッチジョブが必要でした。マテリアライズドテーブルを使用すると、現在のジョブを使用して直感的にデータをバックフィルできます。マテリアライズドテーブルのこの機能は、バッチ処理とストリーミング処理を統合し、開発と O&M の効率を高めます。

  1. [カタログ] ペインで、Apache Paimon カタログの下にある対象のマテリアライズドテーブルをクリックします。

  2. [データ情報] タブで、データをリフレッシュします。

    マテリアライズドテーブルの作成時にパーティションキーを定義した場合、テーブルはパーティションテーブルになります。

    パーティションテーブル

    [テーブル情報] タブの [パーティション] セクションの右上隅で、初めてデータバックフィルを実行する場合、またはパーティションが使用できない場合は、[更新のトリガー] をクリックします。パーティションが存在する場合は、対応するパーティションを見つけて、[アクション] 列の [リフレッシュ] をクリックします。

    image

    image

    パラメーター:

    • ds:テーブルのパーティションフィールド。たとえば、ds フィールドに 20241201 と入力すると、ds=20241201 のすべてのデータがバックフィルされます。

    • タスク名:データバックフィルタスクの名前。

    • 更新範囲(オプション):ダウンストリームマテリアライズドテーブルへのカスケード更新を行うかどうかを指定します。パーティションテーブルを開始点として使用して、すべてのマテリアライズドテーブルを更新できます。ダウンストリームレイヤーの最大数は 6 です。

      説明
      • パーティションテーブルを更新する場合、ダウンストリームマテリアライズドテーブルのパーティション分割列は、パーティションテーブルのパーティション分割列と同じである必要があります。そうでない場合、更新操作は失敗します。

      • マテリアライズドテーブルの更新に失敗した場合、すべてのダウンストリームノードが失敗します。

    • デプロイメントターゲット:テーブルがデプロイされるオブジェクト。キューとセッションクラスターを選択できます。デフォルトでは、default-queue が使用されます。

    非パーティションテーブル

    [データ情報] タブを選択し、マテリアライズドテーブルの詳細を表示して、[リフレッシュ] をクリックします。

    image

    パラメーター:

    • タスク名:データバックフィルタスクの名前。

    • 更新範囲:このパラメーターは、非パーティションテーブルではサポートされていません。

      説明
      • ダウンストリームテーブルのすべてのデータがリフレッシュされます。

      • マテリアライズドテーブルの更新に失敗した場合、すべてのダウンストリームノードが失敗します。

      • システムがベーステーブルのデータ鮮度に基づいてワークロードがストリーミングであると判断し、テーブルが非パーティションテーブルである場合、カスケード更新はサポートされません。

    • デプロイメントターゲット:テーブルがデプロイされるオブジェクト。キューとセッションクラスターを選択できます。デフォルトでは、default-queue が使用されます。

データ鮮度の変更

  1. 目的のカタログの下にある [マテリアライズドテーブル] データベースをクリックし、表示するマテリアライズドテーブルをクリックします。

  2. マテリアライズドテーブルの詳細ページの右上隅で、[鮮度の編集] をクリックします。

    • マテリアライズドテーブルがプライマリキーを持たないテーブルの場合、タスクのストリーミングプロパティとバッチプロパティを変更することはできません。たとえば、データ鮮度を 2 秒から 1 時間に変更すると、Realtime Compute for Apache Flink はストリーミングジョブをバッチジョブに変換します。データ鮮度を 1 時間から 2 秒に変更すると、Realtime Compute for Apache Flink はバッチジョブをストリーミングジョブに変換します。このような操作は実行できません。データ鮮度が 30 分未満の場合、ジョブはストリーミングとして分類されます。データ鮮度が 30 分以上の場合、ジョブはバッチになります。

    • ベーステーブルがマテリアライズドテーブルの場合、ダウンストリームテーブルのデータ鮮度がアップストリームテーブルの 1 ~ N 倍であることを確認してください。N は正の整数です。

    • データ鮮度は 1 日を超えることはできません。

データ系列の表示

[データ系列] ページで、すべてのマテリアライズドテーブル間のデータ系列を表示します。マテリアライズドテーブルの更新を開始または停止し、データ鮮度を変更できます。また、ページの右上隅にある [詳細] をクリックして、マテリアライズドテーブルの詳細ページに移動することもできます。

image

参照資料