本トピックでは、マテリアライズドテーブルを活用したストリーム/バッチ統合型データレイクハウスの構築方法について説明します。また、マテリアライズドテーブルのデータ新鮮度(Freshness)を調整することで、バッチ処理からストリーム処理へ切り替え、リアルタイムなデータ更新を実現する方法も解説します。
マテリアライズドテーブルの概要
マテリアライズドテーブルは、Flink SQL における新しいテーブルタイプであり、バッチおよびストリーム処理のデータパイプラインを簡素化し、統合された開発体験を提供します。マテリアライズドテーブルを作成する際には、フィールドやデータ型を明示的に宣言する必要はありません。代わりに、データ新鮮度とクエリ文のみを指定します。Flink エンジンが自動的にクエリ文からスキーマを推論し、指定されたデータ新鮮度を維持するためのデータパイプラインを構築します。詳細については、「マテリアライズドテーブル管理」をご参照ください。
リアルタイムデータレイクハウスのパイプライン図
-
Flink が、データソースから Paimon へデータを書き込むことで、運用データストア(ODS)レイヤーを構築します。
-
Flink が、ODS レイヤーのデータを結合・拡張し、その結果をマテリアライズドテーブルへ書き込むことで、DWD レイヤーを構築します。
-
アプリケーション向けのクエリを実行するデータウェアハウスサービス(DWS)レイヤーは、異なるデータ新鮮度レベルを持つ複数のマテリアライズドテーブルを構築し、多次元的なビジネス統計を行うことで作成できます。
前提条件
-
Realtime Compute for Apache Flink ワークスペースが作成されました。詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。
-
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用する場合、当該 RAM ユーザーまたは RAM ロールが Realtime Compute for Apache Flink コンソールへのアクセスに必要な権限を有していることを確認してください。詳細については、「権限管理」をご参照ください。
ステップ 1:テストデータの準備
-
(任意)Paimon カタログを作成します。
マテリアライズドテーブル機能は Apache Paimon を基盤としています。メタストアタイプが Filesystem の Paimon カタログを作成する必要があります。すでに作成済みの場合は、このステップをスキップできます。詳細については、「Paimon カタログの作成」をご参照ください。
-
`ods_user_log` および `ods_dim_product` テーブルを作成します。
-
Realtime Compute for Apache Flink コンソール にログインします。
-
対象のワークスペースで、[コンソール] を [操作] 列でクリックします。
-
左側ナビゲーションウィンドウで、 を選択します。以下のコードをコピー&ペーストして、ソーステーブルを作成します。
本例では、`paimon` という名前の Paimon カタログおよびデフォルトデータベースを使用します。
CREATE TABLE `paimon`.`default`.`ods_user_log` ( item_id INT NOT NULL, user_id INT NOT NULL, vtime TIMESTAMP(6), ds VARCHAR(10) NOT NULL ) PARTITIONED BY(ds) WITH ( 'bucket' = '4', -- 4 つのバケットを指定。 'bucket-key' = 'item_id' -- バケットキーを指定。同じ item_id を持つデータは同一バケットに配置されます。 ); CREATE TABLE `paimon`.`default`.`ods_dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'bucket' = '4', 'bucket-key' = 'item_id' ); -
右上隅の[実行]をクリックして、対応するデータテーブルを作成します。
-
左側のナビゲーションウィンドウで、[Data Management] を選択します。次に、対応する Paimon カタログをクリックし、[Refresh] をクリックして、新しいテーブルを表示します。
-
-
Faker データジェネレーター コネクタを使用して、シミュレートされたデータを生成し、Paimon テーブルへ書き込みます。
-
左側ナビゲーションウィンドウで、 を選択します。
-
[新規]をクリックし、[空のストリーム下書き]を選択して、[次へ]をクリックし、次に[作成]をクリックします。
-
以下の SQL 文をエディターにコピーします。
CREATE TEMPORARY TABLE `user_log` ( item_id INT, // 製品 ID user_id INT, // ユーザー ID vtime TIMESTAMP, ds AS DATE_FORMAT(CURRENT_DATE,'yyyyMMdd') ) WITH ( 'connector' = 'faker', -- Faker コネクタ 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', -- 0 ~ 1000 の乱数を生成。 'fields.user_id.expression'='#{number.numberBetween ''0'',''100''}', 'fields.vtime.expression'='#{date.past ''5'',''HOURS''}', -- 現在時刻から過去 5 時間分のデータを生成。 'rows-per-second' = '3' -- 1 秒あたり 3 行を生成。 ); CREATE TEMPORARY TABLE `dim_product` ( item_id INT NOT NULL, title VARCHAR(255), pict_url VARCHAR(255), brand_id INT, seller_id INT, PRIMARY KEY(item_id) NOT ENFORCED ) WITH ( 'connector' = 'faker', -- Faker コネクタ 'fields.item_id.expression'='#{number.numberBetween ''0'',''1000''}', 'fields.title.expression'='#{book.title}', 'fields.pict_url.expression'='#{internet.domainName}', 'fields.brand_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'fields.seller_id.expression'='#{number.numberBetween ''1000'',''10000''}', 'rows-per-second' = '3' -- 1 秒あたり 3 行を生成。 ); BEGIN STATEMENT SET; INSERT INTO `paimon`.`default`.`ods_user_log` SELECT item_id, user_id, vtime, CAST(ds AS VARCHAR(10)) AS ds FROM `user_log`; INSERT INTO `paimon`.`default`.`ods_dim_product` SELECT item_id, title, pict_url, brand_id, seller_id FROM `dim_product`; END; -
右上隅で、[デプロイ]をクリックします。
-
左側のナビゲーションウィンドウで、 をクリックします。対象のデプロイメントについて、[操作] 列の [開始] をクリックし、[ステートレス開始] を選択してから、[開始] をクリックします。
-
-
シミュレートされたデータをクエリします。
左側のナビゲーションウィンドウで、 を選択します。次のSQL文をSQL エディターにコピーし、右上隅の[実行]をクリックします。
SELECT * FROM `paimon`.`default`.ods_dim_product LIMIT 10; SELECT * FROM `paimon`.`default`.ods_user_log LIMIT 10;
ステップ 2:マテリアライズドテーブルの作成
本セクションでは、ソーステーブルを結合して DWD レイヤー向けの `dwd_user_log_product` マテリアライズドテーブルを作成する方法について説明します。その後、`dwd_user_log_product` テーブルを基に、ビジネス分析を実行するための下流マテリアライズドテーブル(DWS レイヤー)を構築します。
-
データウェアハウスの DWD レイヤーを、`dwd_user_log_product` マテリアライズドテーブルの作成によって構築します。
-
左側のナビゲーションウィンドウで、Data Management を選択し、対象の Paimon カタログをクリックします。
-
対象のデータベース (この例では「default」) をクリックしてから、[マテリアライズドテーブルの作成] をクリックします。次の SQL 文を SQL エディターにコピーし、[作成] をクリックします。
-- DWD レイヤーのワイドニングロジック CREATE MATERIALIZED TABLE dwd_user_log_product( PRIMARY KEY (item_id) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR -- 1 時間ごとに更新。 AS SELECT l.ds, l.item_id, l.user_id, l.vtime, r.brand_id, r.seller_id FROM `paimon`.`default`.`ods_user_log` l INNER JOIN `paimon`.`default`.`ods_dim_product` r ON l.item_id = r.item_id;
-
-
データウェアハウスの DWS レイヤーを構築し、`dwd_user_log_product` マテリアライズドテーブルに基づいて多次元的なビジネス分析を実行します。
本トピックでは、`dws_overall` マテリアライズドテーブルの作成を例として説明します。このテーブルは、時間単位での日次ページビュー(PV)およびユニークビジター(UV)を計算します。前のセクションの手順に従って、`dws_overall` マテリアライズドテーブルを作成できます。
// 日次 PV および UV の計算。 CREATE MATERIALIZED TABLE dws_overall( PRIMARY KEY(ds, hh) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'partition.fields.ds.date-formatter' = 'yyyyMMdd' ) FRESHNESS = INTERVAL '1' HOUR -- 1 時間ごとに更新。 AS SELECT ds, COALESCE(hh, 'day') AS hh, count(*) AS pv, count(distinct user_id) AS uv FROM (SELECT ds, date_format(vtime, 'HH') AS hh, user_id FROM `paimon`.`default`.`dwd_user_log_product`) tmp GROUP BY GROUPING SETS(ds, (ds, hh));
ステップ 3:マテリアライズドテーブルの更新
更新の開始
この例では、データの新鮮度は 1 時間です。[Start Update] をクリックすると、データ更新はベーステーブルより少なくとも 1 時間遅れます。
-
左側のナビゲーションウィンドウで、[データ系統] をクリックし、その後、対象のマテリアライズドテーブルを検索します。

-
対応するマテリアライズドテーブルビューをクリックし、ページの右下隅にある[更新を開始]をクリックします。
既存データのバックフィル
既存データのバックフィルにより、特定のパーティションまたはテーブル全体の履歴データを再書き込みできます。これは、ストリーム処理結果の修正に利用可能です。また、まだスケジュール時刻に達していないバッチジョブに対しても、即時に書き込み・更新を行うためにバックフィルを実行できます。
dwd_user_log_product マテリアライズドテーブルビューを選択し、ページの右下隅にある [手動更新] をクリックします。 表示されるダイアログボックスで、パーティション名として実行日 (例: 20241216) を入力します。 [下流の関連マテリアライズドテーブルのカスケード更新] チェックボックスを選択し、[確認] をクリックします。 確認ダイアログボックスで、もう一度 [確認] をクリックして、対応するデータを上書きし、更新を開始します。

既存データのバックフィル方法については、「既存データのバックフィル」をご参照ください。
データ新鮮度の変更
必要に応じて、データ新鮮度を日次、時間単位、分単位、秒単位などに変更し、マテリアライズドテーブルの更新頻度を調整できます。
dwd_user_log_product および dws_overall のマテリアライズドテーブルのデータの鮮度を変更するには、対象テーブルのビューをクリックします。次に、ページの右下隅にある [Modify Data Freshness] をクリックし、データの鮮度をリアルタイム更新用に分レベルに設定します。

データ新鮮度の変更方法については、「データ新鮮度の変更」をご参照ください。
ステップ 4:マテリアライズドテーブルのクエリ
データのプレビュー
マテリアライズドテーブルの最新 100 行のデータをプレビューできます。
-
左側のナビゲーションウィンドウで、[データ系統] をクリックして、対象のマテリアライズドテーブルを検索できます。
-
対象のマテリアライズドテーブルのビューをクリックした後、ページの右下の隅にある[詳細]をクリックします。
-
「[データプレビュー]」タブで、[クエリ] アイコンをクリックします。

データのクエリ
左側のナビゲーションウィンドウで に移動し、次の SQL 文を SQL エディターにコピーしてコードスニペットを選択してから、[実行] をクリックして dws_overall マテリアライズドテーブルにクエリを実行します。
SELECT * FROM `paimon`.`default`.dws_overall ORDER BY hh;

リファレンス
-
マテリアライズドテーブルの詳細については、「マテリアライズドテーブル管理」をご参照ください。
-
マテリアライズドテーブルの作成および使用方法については、「マテリアライズドテーブルの作成と使用」をご参照ください。
