すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:マテリアライズドテーブルの概要:ストリームバッチ統合データレイクハウスの構築

最終更新日:Feb 09, 2025

このトピックでは、Flink マテリアライズドテーブルを使用して、ストリームバッチ統合データレイクハウスを構築する方法について説明します。 また、マテリアライズドテーブルの鮮度を調整して、バッチ実行モードからストリーミング実行モードに切り替え、リアルタイムのデータ更新を可能にする方法についても説明します。

マテリアライズドテーブルの概要

マテリアライズドテーブルは、Flink SQL の新しいテーブルタイプであり、バッチデータパイプラインとストリーミングデータパイプラインを簡素化して、一貫した開発エクスペリエンスを提供することを目的としています。 マテリアライズドテーブルを作成する場合、フィールドとタイプを宣言する必要はありません。 代わりに、必要なデータの鮮度と使用する SQL クエリを指定するだけです。 Flink エンジンは、マテリアライズドテーブルのスキーマを自動的に導出し、対応するデータリフレッシュパイプラインを作成して、指定された鮮度を実現します。 詳細については、「マテリアライズドテーブルの管理」をご参照ください。

リアルタイム データレイクハウス パイプライン

  1. Operational Data Store(ODS)レイヤーでは、Flink はデータソースから Paimon にデータを取り込みます。

  2. Data Warehouse Detail(DWD)レイヤーでは、Flink は ODS レイヤーのテーブルを結合し、結合されたデータからマテリアライズドテーブルを作成します。

  3. Data Warehouse Service(DWS)レイヤーでは、Flink は DWD レイヤーのデータから複数のマテリアライズドテーブルを作成し、それぞれがユーザーによって指定された特定の鮮度の要件を満たします。 これらのマテリアライズドテーブルは、さまざまなビジネスユースケースに対応し、外部クエリに応答します。

前提条件

  • Realtime Compute for Apache Flink ワークスペースが作成されていること。 詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

  • RAM ユーザーまたは RAM ロールが、Realtime Compute for Apache Flink のコンソールにアクセスするために必要な権限を持っていること。 詳細については、「権限管理」をご参照ください。

ステップ 1:テストデータを準備する

  1. Paimon カタログを作成します。

    マテリアライズドテーブル機能は、ストレージシステムとして Apache Paimon に依存しています。 したがって、ファイルシステムメタストアタイプの Apache Paimon カタログを作成する必要があります。 既存の Paimon カタログがすでにセットアップされている場合は、この手順をスキップできます。 詳細については、「Apache Paimon カタログの管理」をご参照ください。

    Apache Paimon カタログを作成する

    1. Realtime Compute for Apache Flink の 管理コンソール にログインします。

    2. ターゲットワークスペースを見つけ、[コンソール] 列の [アクション] をクリックします。

    3. 左側のナビゲーションウィンドウで、[カタログ] をクリックします。 カタログリストページで、[カタログの作成] をクリックします。 表示されるウィザードの組み込みカタログタブで、[apache Paimon] を選択し、[次へ] をクリックします。

      imageパラメーター:

      パラメーター

      説明

      備考

      metastore

      メタデータストレージタイプ。

      この例では、metastore パラメーターを filesystem に設定します。

      カタログ名

      Apache Paimon カタログの名前。

      カスタム名を入力します。 この例では、paimon が使用されています。

      warehouse

      OSS で指定されたデータウェアハウスディレクトリ。

      パスは oss://<bucket>/<object> 形式である必要があります。 フィールドの説明:

      • <bucket>:OSS バケットの名前。

      • <object>:データが格納されているパス。

      OSS コンソール で bucket フィールドと object フィールドの値を表示できます。

      fs.oss.endpoint

      OSS のエンドポイント。

      OSS が Realtime Compute for Apache Flink と同じリージョンにある場合は、リージョンの内部エンドポイントを使用します。 それ以外の場合は、パブリックエンドポイントを使用します。 詳細については、「リージョンとエンドポイント」をご参照ください。

      fs.oss.accessKeyId

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

      AccessKey ID の取得方法については、「AccessKey ペアの作成」をご参照ください。 セキュリティリスクを防ぐため、プレーンテキストの AccessKey ペアではなく変数を使用してください。 詳細については、「変数の管理」をご参照ください。

      fs.oss.accessKeySecret

      OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。

  2. Paimon カタログに ods_user_log テーブルと ods_dim_product テーブルを作成します。

    1. Realtime Compute for Apache Flink の 管理コンソール にログインします。

    2. ターゲットワークスペースを見つけ、[コンソール] 列の [アクション] をクリックします。

    3. 左側のナビゲーションで、[開発] > [スクリプト] を選択します。

      この例では、テーブルは paimon という名前のカタログのデフォルトデータベースに作成されます。
      CREATE TABLE `paimon`.`default`.`ods_user_log` (
        item_id INT NOT NULL,
        user_id INT NOT NULL,
        vtime TIMESTAMP(6),
        ds VARCHAR(10)
      ) 
      PARTITIONED BY(ds)
      WITH (
        'bucket' = '4',            -- バケット数を 4 に設定します。
        'bucket-key' = 'item_id'   -- バケットキー列を指定します。 同じバケットキー(item_id)を持つデータは、同じバケットに入れられます。
      );
      
      CREATE TABLE `paimon`.`default`.`ods_dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
      ) WITH (
        'bucket' = '4',
        'bucket-key' = 'item_id'
      );
    4. 右上隅にある [実行] をクリックしてコードスニペットを実行し、テーブルを作成します。

    5. 左側のナビゲーションウィンドウで、[カタログ] を選択します。 カタログウィンドウで、paimon カタログを選択し、[更新] アイコンをクリックして、作成されたテーブルを表示します。

  3. Faker コネクタ を使用してテストデータを生成し、Paimon テーブルにロードします。

    1. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。

    2. SQL エディターウィンドウの左上隅にある [新規] をクリックします。 新規ドラフトダイアログボックスの SQL スクリプトタブで、[空のストリームドラフト] を選択し、[次へ] をクリックします。 [作成] をクリックします。

    3. 次のコードスニペットを SQL エディターにコピーします。

      CREATE TEMPORARY TABLE `user_log` (
        item_id INT,
        user_id INT,
        vtime TIMESTAMP,  
        ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd')
      ) WITH (
        'connector' = 'faker',
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',    -- 0 から 1000 までの乱数を生成します。
        'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}',
        'fields.vtime.expression'='#{date.past ''5'',''HOURS''}',           -- 過去 5 時間以内の vtime フィールドの値を生成します。
        'rows-per-second' = '3'   -- 1 秒あたり 3 レコードを生成します。
       );
        
       CREATE TEMPORARY TABLE `dim_product` (
        item_id INT NOT NULL,
        title VARCHAR(255),
        pict_url VARCHAR(255), 
        brand_id INT,
        seller_id INT,
        PRIMARY KEY(item_id) NOT ENFORCED
       ) WITH (
        'connector' = 'faker',
        'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}',
        'fields.title.expression'='#{book.title}',
        'fields.pict_url.expression'='#{internet.domainName}',
        'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}',   
        'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}',
        'rows-per-second' = '3'   -- 1 秒あたり 3 レコードを生成します。
       );
      
      BEGIN STATEMENT SET; 
      
      INSERT INTO `paimon`.`default`.`ods_user_log` 
        SELECT 
        item_id,
        user_id,
        vtime,
        CAST(ds AS VARCHAR(10)) AS ds
      FROM `user_log`;
      INSERT INTO `paimon`.`default`.`ods_dim_product`
        SELECT 
        item_id,
        title,
        pict_url,
        brand_id,
        seller_id
      FROM `dim_product`;
      
      END; 
    4. SQL エディターウィンドウの右上隅にある [デプロイ] をクリックします。

    5. 左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。 デプロイメントページで、ターゲットデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。 表示されるジョブの開始パネルで、[初期モード] を選択し、[開始] をクリックします。

  4. テストデータをクエリします。

    左側のナビゲーションで、[開発] > [スクリプト] を選択します。 次のコードスニペットを SQL エディターにコピーし、右上隅にある [実行] をクリックします。

    SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10;
    
    SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;

    image

ステップ 2:マテリアライズドテーブルを作成する

このステップでは、Flink を使用して 2 つの Paimon ベーステーブルを結合し、結合された行から dwd_user_log_product という名前のマテリアライズドテーブルを作成することにより、データレイクハウスの DWD レイヤーを確立します。 その後、さまざまなビジネスアプリケーションのために dwd_user_log_product マテリアライズドテーブルから追加のマテリアライズドテーブルを作成し、それによって DWS レイヤーを構築します。

  1. dwd_user_log_product マテリアライズドテーブルを作成し、DWD レイヤーを確立します。

    1. 左側のナビゲーションウィンドウで、[カタログ] をクリックします。 カタログウィンドウで、ターゲット Paimon カタログをクリックします。

    2. ターゲットデータベース(この例では「default」)を選択し、右側のウィンドウで [マテリアライズドテーブルの作成] をクリックします。 サイドパネルで、次のコードスニペットを貼り付けて、[作成] をクリックします。

      -- DWD レイヤーを構築します。
      CREATE MATERIALIZED TABLE dwd_user_log_product(
          PRIMARY KEY (item_id) NOT ENFORCED
      )
      PARTITIONED BY(ds)
      WITH (
        'partition.fields.ds.date-formatter' = 'yyyyMMdd'
      )
      FRESHNESS = INTERVAL '1' HOUR      -- 1 時間ごとにデータを更新します。
      AS SELECT
        l.ds,
        l.item_id,
        l.user_id,
        l.vtime,
        r.brand_id,
        r.seller_id
      FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r
      ON l.item_id = r.item_id;
  2. dwd_user_log_product マテリアライズドテーブルからマテリアライズドテーブルを作成することにより、DWS レイヤーを構築します。

    例として、次のコードスニペットは、1 時間ごとのページビュー数とユニークビジター数を集計した dws_overall マテリアライズドテーブルを作成します。 マテリアライズドテーブルの作成手順については、前のサブステップを参照してください。

    -- 1 日のページビュー数とユニークビジター数を集計します。
    CREATE MATERIALIZED TABLE dws_overall(
        PRIMARY KEY(ds, hh) NOT ENFORCED
    )
    PARTITIONED BY(ds)
    WITH (
      'partition.fields.ds.date-formatter' = 'yyyyMMdd'
    )
    FRESHNESS = INTERVAL '1' HOUR   -- 1 時間ごとにデータを更新します。
    AS SELECT 
        ds,
        COALESCE(hh, 'day') AS hh,
        count(*) AS pv,
        count(distinct user_id) AS uv
        FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id 
    FROM `paimon`.`default`.`dwd_user_log_product`) tmp
    GROUP BY GROUPING SETS(ds, (ds, hh));

ステップ 3:マテリアライズドテーブルを更新する

マテリアライズドテーブルの更新を開始する

この例では、データの鮮度は 1 時間に設定されています。 ジョブが開始されると、マテリアライズドテーブルのデータは、ベーステーブルの更新から少なくとも 1 時間後に更新されます。

  1. 左側のナビゲーションウィンドウで、[データリネージ] をクリックし、ターゲットマテリアライズドテーブルを見つけます。

    image

  2. ターゲットマテリアライズドテーブルを選択し、ページの右下隅にある [開始] をクリックします。

データをバックフィルする

データバックフィルとは、パーティションまたはテーブル内の古いストリーミングデータを既存データに置き換えて、ストリーム処理結果の不正確さを修正することです。 バッチジョブのコンテキストでは、データバックフィルにより、事前に定義された更新間隔を待機するのではなく、すぐにデータを更新できます。

古いデータを置き換えるか、すぐにデータを更新するには、次の手順に従います。 データリネージウィンドウで、dwd_user_log_product というラベルのボックスを選択し、右下隅にある [更新のトリガー] をクリックします。 更新のトリガーダイアログボックスで、ds フィールドに今日の日付(例:20241216)を入力し、[更新範囲] チェックボックスをオンにして、更新をダウンストリームマテリアライズドテーブルにカスケードします。 [確認] をクリックします。 後続の更新の確認ダイアログボックスで、[OK] をクリックします。

image

鮮度を変更する

必要に応じて、マテリアライズドテーブルの鮮度設定を調整して、1 日以上、数時間、数分、または数秒の間隔でデータを更新できます。

dwd_user_log_product マテリアライズドテーブルと dws_overall マテリアライズドテーブルの鮮度設定を変更します。 ターゲットマテリアライズドテーブルを表すボックスを選択します。 ページの右下隅にある [鮮度の編集] をクリックします。 鮮度の編集ダイアログボックスで、ドロップダウンリストから分を選択し、目的の値を入力します。

image

ステップ 4:マテリアライズドテーブルをクエリする

データをプレビューする

マテリアライズドテーブルの最新の 100 レコードをプレビューできます。

  1. 左側のナビゲーションウィンドウで、[データリネージ] をクリックし、ターゲットマテリアライズドテーブルを見つけます。

  2. ターゲットマテリアライズドテーブルを選択し、ページの右下隅にある [詳細] をクリックします。

  3. マテリアライズドテーブルの詳細ウィンドウが表示されます。 [データプレビュー] タブで、右上隅にある [クエリ] アイコンをクリックします。

    image

データをクエリする

左側のナビゲーションウィンドウで、[開発] > [スクリプト] を選択します。 次のコードスニペットを SQL エディターにコピーし、[実行] をクリックして dws_overall マテリアライズドテーブルをクエリします。

SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;

image

関連情報