すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:マテリアライズドテーブルのクイックスタート

最終更新日:Mar 07, 2026

本トピックでは、マテリアライズドテーブルを活用したストリーム/バッチ統合型データレイクハウスの構築方法について説明します。また、マテリアライズドテーブルのデータ新鮮度(Freshness)を調整することで、バッチ処理からストリーム処理へ切り替え、リアルタイムなデータ更新を実現する方法も解説します。

マテリアライズドテーブルの概要

マテリアライズドテーブルは、Flink SQL における新しいテーブルタイプであり、バッチおよびストリーム処理のデータパイプラインを簡素化し、統合された開発体験を提供します。マテリアライズドテーブルを作成する際には、フィールドやデータ型を明示的に宣言する必要はありません。代わりに、データ新鮮度とクエリ文のみを指定します。Flink エンジンが自動的にクエリ文からスキーマを推論し、指定されたデータ新鮮度を維持するためのデータパイプラインを構築します。詳細については、「マテリアライズドテーブル管理」をご参照ください。

リアルタイムデータレイクハウスのパイプライン図

  1. Flink が、データソースから Paimon へデータを書き込むことで、運用データストア(ODS)レイヤーを構築します。

  2. Flink が、ODS レイヤーのデータを結合・拡張し、その結果をマテリアライズドテーブルへ書き込むことで、DWD レイヤーを構築します。

  3. アプリケーション向けのクエリを実行するデータウェアハウスサービス(DWS)レイヤーは、異なるデータ新鮮度レベルを持つ複数のマテリアライズドテーブルを構築し、多次元的なビジネス統計を行うことで作成できます。

前提条件

  • Realtime Compute for Apache Flink ワークスペースが作成されました。詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。

  • Resource Access Management (RAM) ユーザーまたは RAM ロールを使用する場合、当該 RAM ユーザーまたは RAM ロールが Realtime Compute for Apache Flink コンソールへのアクセスに必要な権限を有していることを確認してください。詳細については、「権限管理」をご参照ください。

ステップ 1:テストデータの準備

  1. (任意)Paimon カタログを作成します。

    マテリアライズドテーブル機能は Apache Paimon を基盤としています。メタストアタイプが Filesystem の Paimon カタログを作成する必要があります。すでに作成済みの場合は、このステップをスキップできます。詳細については、「Paimon カタログの作成」をご参照ください。

    Paimon カタログの作成

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースの[アクション] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで、[Data Management] を選択し、[カタログの作成] をクリックします。次に、[Apache Paimon] を選択し、[次へ] をクリックします。

      image

      パラメーターの説明:

      パラメーター

      説明

      備考

      metastore

      メタストアのタイプです。

      本例では、Filesystem を使用します。

      catalog name

      Paimon カタログの名前です。

      任意の名前を入力します。本例では `paimon` を使用します。

      warehouse

      Object Storage Service (OSS) 上のデータウェアハウスディレクトリです。

      形式は oss://<bucket>/<object> です。ここで:

      • <bucket>:ご利用の OSS バケット名です。

      • `` は、データが格納されているパスです。

      OSS コンソールでバケット名およびオブジェクト名を確認できます。OSS コンソール

      fs.oss.endpoint

      OSS サービスのエンドポイントです。

      Flink と OSS が同一リージョンにある場合は、内部エンドポイントを使用します。それ以外の場合は、パブリックエンドポイントを使用します。詳細については、「リージョンとエンドポイント」をご参照ください。

      fs.oss.accessKeyId

      OSS に対する読み取り/書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID です。

      AccessKey ペアの取得方法については、「AccessKey ペアの作成」をご参照ください。AccessKey ペアがプレーンテキストで公開されるのを防ぐため、変数の使用を推奨します。変数の管理方法については、「変数の管理」をご参照ください。

      fs.oss.accessKeySecret

      OSS に対する読み取り/書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret です。

  2. `ods_user_log` および `ods_dim_product` テーブルを作成します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースで、[コンソール][操作] 列でクリックします。

    3. 左側ナビゲーションウィンドウで、Data Studio > Scripts を選択します。以下のコードをコピー&ペーストして、ソーステーブルを作成します。

      本例では、`paimon` という名前の Paimon カタログおよびデフォルトデータベースを使用します。
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10) NOT NULL
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            -- 4 つのバケットを指定。
        'bucket-key' = 'item_id'   -- バケットキーを指定。同じ item_id を持つデータは同一バケットに配置されます。
      );
      
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. 右上隅の[実行]をクリックして、対応するデータテーブルを作成します。

    5. 左側のナビゲーションウィンドウで、[Data Management] を選択します。次に、対応する Paimon カタログをクリックし、[Refresh] をクリックして、新しいテーブルを表示します。

  3. Faker データジェネレーター コネクタを使用して、シミュレートされたデータを生成し、Paimon テーブルへ書き込みます。

    1. 左側ナビゲーションウィンドウで、Data Studio > ETL を選択します。

    2. 新規]をクリックし、[空のストリーム下書き]を選択して、[次へ]をクリックし、次に[作成]をクリックします。

    3. 以下の SQL 文をエディターにコピーします。

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,  // 製品 ID
        user_id INT,  // ユーザー ID
        vtime TIMESTAMP,  
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',    -- Faker コネクタ
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    -- 0 ~ 1000 の乱数を生成。
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           -- 現在時刻から過去 5 時間分のデータを生成。
        'rows-per-second' = '3'   -- 1 秒あたり 3 行を生成。
       );
        
       CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
       ) WITH (
        'connector' = 'faker',    -- Faker コネクタ
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',   
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'        -- 1 秒あたり 3 行を生成。
       );
      
      BEGIN STATEMENT SET; 
      
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      
      END; 
    4. 右上隅で、[デプロイ]をクリックします。

    5. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [デプロイメント] をクリックします。対象のデプロイメントについて、[操作] 列の [開始] をクリックし、[ステートレス開始] を選択してから、[開始] をクリックします。

  4. シミュレートされたデータをクエリします。

    左側のナビゲーションウィンドウで、[データ開発] > [データクエリ] を選択します。次のSQL文をSQL エディターにコピーし、右上隅の[実行]をクリックします。

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;

    image

ステップ 2:マテリアライズドテーブルの作成

本セクションでは、ソーステーブルを結合して DWD レイヤー向けの `dwd_user_log_product` マテリアライズドテーブルを作成する方法について説明します。その後、`dwd_user_log_product` テーブルを基に、ビジネス分析を実行するための下流マテリアライズドテーブル(DWS レイヤー)を構築します。

  1. データウェアハウスの DWD レイヤーを、`dwd_user_log_product` マテリアライズドテーブルの作成によって構築します。

    1. 左側のナビゲーションウィンドウで、Data Management を選択し、対象の Paimon カタログをクリックします。

    2. 対象のデータベース (この例では「default」) をクリックしてから、[マテリアライズドテーブルの作成] をクリックします。次の SQL 文を SQL エディターにコピーし、[作成] をクリックします。

      -- DWD レイヤーのワイドニングロジック
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      -- 1 時間ごとに更新。
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. データウェアハウスの DWS レイヤーを構築し、`dwd_user_log_product` マテリアライズドテーブルに基づいて多次元的なビジネス分析を実行します。

    本トピックでは、`dws_overall` マテリアライズドテーブルの作成を例として説明します。このテーブルは、時間単位での日次ページビュー(PV)およびユニークビジター(UV)を計算します。前のセクションの手順に従って、`dws_overall` マテリアライズドテーブルを作成できます。

    // 日次 PV および UV の計算。
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   -- 1 時間ごとに更新。
    AS SELECT 
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
        FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id 
    FROM `paimon`.`default`.`dwd_user_log_product`) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));

ステップ 3:マテリアライズドテーブルの更新

更新の開始

この例では、データの新鮮度は 1 時間です。[Start Update] をクリックすると、データ更新はベーステーブルより少なくとも 1 時間遅れます。

  1. 左側のナビゲーションウィンドウで、[データ系統] をクリックし、その後、対象のマテリアライズドテーブルを検索します。

    image

  2. 対応するマテリアライズドテーブルビューをクリックし、ページの右下隅にある[更新を開始]をクリックします。

既存データのバックフィル

既存データのバックフィルにより、特定のパーティションまたはテーブル全体の履歴データを再書き込みできます。これは、ストリーム処理結果の修正に利用可能です。また、まだスケジュール時刻に達していないバッチジョブに対しても、即時に書き込み・更新を行うためにバックフィルを実行できます。

dwd_user_log_product マテリアライズドテーブルビューを選択し、ページの右下隅にある [手動更新] をクリックします。 表示されるダイアログボックスで、パーティション名として実行日 (例: 20241216) を入力します。 [下流の関連マテリアライズドテーブルのカスケード更新] チェックボックスを選択し、[確認] をクリックします。 確認ダイアログボックスで、もう一度 [確認] をクリックして、対応するデータを上書きし、更新を開始します。

image

既存データのバックフィル方法については、「既存データのバックフィル」をご参照ください。

データ新鮮度の変更

必要に応じて、データ新鮮度を日次、時間単位、分単位、秒単位などに変更し、マテリアライズドテーブルの更新頻度を調整できます。

dwd_user_log_product および dws_overall のマテリアライズドテーブルのデータの鮮度を変更するには、対象テーブルのビューをクリックします。次に、ページの右下隅にある [Modify Data Freshness] をクリックし、データの鮮度をリアルタイム更新用に分レベルに設定します。

image

データ新鮮度の変更方法については、「データ新鮮度の変更」をご参照ください。

ステップ 4:マテリアライズドテーブルのクエリ

データのプレビュー

マテリアライズドテーブルの最新 100 行のデータをプレビューできます。

  1. 左側のナビゲーションウィンドウで、[データ系統] をクリックして、対象のマテリアライズドテーブルを検索できます。

  2. 対象のマテリアライズドテーブルのビューをクリックした後、ページの右下の隅にある[詳細]をクリックします。

  3. [データプレビュー]」タブで、[クエリ] アイコンをクリックします。

    image

データのクエリ

左側のナビゲーションウィンドウで [データ開発] > [データクエリ] に移動し、次の SQL 文を SQL エディターにコピーしてコードスニペットを選択してから、[実行] をクリックして dws_overall マテリアライズドテーブルにクエリを実行します。

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;

image

リファレンス