ユーザー行動データは、その量の多さとフォーマットの多様性から、処理が困難です。従来のワイドテーブルモデルは効率的なクエリパフォーマンスを提供しますが、高いデータ冗長性、増加するストレージコスト、維持の難しさ、更新の遅さといった問題があります。このトピックでは、Realtime Compute for Apache Flink、ApsaraDB for MongoDB、Hologres を活用して効率的なユーザー行動分析を行う方法について説明します。
アーキテクチャと利点
アーキテクチャ
Realtime Compute for Apache Flink は、大量のリアルタイムデータを効率的に処理できる強力なストリーム処理フレームワークです。ApsaraDB for MongoDB は、柔軟なデータスキーマを提供し、優れた読み書きパフォーマンスを実現し、複雑なクエリ条件をサポートするドキュメント指向の NoSQL データベースです。Hologres は、リアルタイムのデータ書き込みと更新をサポートするオールインワンのデータウェアハウスであり、データは書き込み後すぐにクエリできます。これら 3 つのサービスはシームレスに連携して、多様なデータを一元的に処理し、大規模なデータセットのリアルタイム更新、クエリ、分析を可能にします。アーキテクチャは次のとおりです。
Flink ジョブは MongoDB から変更データキャプチャ (CDC) ストリームを読み取ります。データが更新されると、Flink ジョブは Upsert Kafka コネクタを使用して、更新されたデータのプライマリキー (PK) を Kafka にストリーミングします。
ディメンションテーブルが更新されると、Flink ジョブはルックアップ結合を実行してファクトテーブル内の対応するレコードを特定し、影響を受けるファクトテーブル内のデータの PK を Kafka に書き込みます。
2 番目の Flink ジョブは Kafka から PK を消費し、MongoDB のファクトテーブルとディメンションテーブルに対してルックアップ結合を実行して完全なワイドテーブルレコードを再構築し、Hologres のデータを更新します。
利点
このソリューションには次の利点があります。
高い書き込みスループットとスケーラブルなストレージ: ApsaraDB for MongoDB は、特にシャードクラスターにおいて、高同時実行性の読み書きワークロードに優れています。パフォーマンスとストレージを拡張して、大量の頻繁なデータ書き込みに対応できるため、潜在的な書き込みのボトルネックやストレージの制限を解消します。
効率的な変更伝播: MongoDB のファクトテーブルまたはディメンションテーブルが更新されると、Flink は影響を受けるファクトテーブルレコードの PK のみを特定して Kafka にストリーミングします。この変更駆動型のアプローチにより、総データ量に関係なく、関連データのみが再処理されるため、効率的な更新が保証されます。
リアルタイムのクエリと分析: Hologres は、効率的で低待機時間の更新をサポートし、データが書き込まれた直後にクエリを実行できます。これにより、新しく更新されたテーブルでリアルタイムのデータ分析が可能になります。
ハンズオン
このセクションでは、Flink を使用してゲーマーの購入データをリアルタイムで処理し、ファクトテーブルとディメンションテーブルを結合してワイドテーブルを構築し、結果を Hologres に書き込んで即時クエリを実行する方法をデモンストレーションします。
game_sales
sale_id | game_id | platform_id | sale_date | units_sold | sale_amt | status |
game_dimension
game_id | game_name | release_date | developer | publisher |
platform_dimension
platform_id | platform_name | type |
game_sales_details
sale_id | game_id | platform_id | sale_date | units_sold | sale_amt | status |
game_name | release_date | developer | publisher | platform_name | type |
ソリューションの説明
このソリューションは、ディメンションテーブルへの変更をリアルタイムで Hologres に複製します。このプロセスは、次の 4 つのステップで構成されます。
変更のキャプチャ: MongoDB のディメンションテーブルからリアルタイムのデータ変更をキャプチャします。
ディメンションテーブルの変更の伝播: ディメンションテーブルが変更されると、Flink ジョブはルックアップ結合 (例:
game_id) を使用して MongoDB ファクトテーブル内の影響を受けるレコードを特定し、sale_idなどのプライマリキーを抽出します。再計算のトリガー: プライマリキーを Kafka に送信して、下流のジョブに必要なデータのリフレッシュを通知します。
増分更新: 最新のデータを取得し、ワイドテーブルを再構築し、データを Hologres に Upsert します。
前提条件
Realtime Compute for Apache Flink ワークスペースが作成されています。Realtime Compute for Apache Flink は VVR 8.0.5 以降を使用する必要があります。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
バージョン 4.0 以降の ApsaraDB for MongoDB インスタンスが作成されています。詳細については、「シャードクラスターインスタンスを作成する」をご参照ください。
バージョン 1.3 以降の専用の Hologres インスタンスが作成されています。詳細については、「Hologres インスタンスを購入する」をご参照ください。
ApsaraMQ for Kafka インスタンスが作成されていること。詳細については、「ApsaraMQ for Kafka インスタンスのデプロイ」をご参照ください。
Realtime Compute for Apache Flink ワークスペース、ApsaraDB for MongoDB インスタンス、Hologres インスタンス、および ApsaraMQ for Kafka インスタンスは、同じ VPC に存在します。同じ VPC に存在しない場合は、VPC 間の接続を確立するか、Realtime Compute for Apache Flink がインターネット経由で他のサービスにアクセスできるようにする必要があります。詳細については、「Realtime Compute for Apache Flink は VPC 間でサービスにどのようにアクセスしますか?」および「Realtime Compute for Apache Flink はインターネットにどのようにアクセスしますか?」をご参照ください。
RAM ユーザーまたは RAM ロールは、関連リソースへのアクセス権を持っています。
ステップ 1: データの準備
ApsaraDB for MongoDB インスタンスにデータベースと 3 つのテーブルを作成します。
Flink ワークスペースの CIDR ブロックを ApsaraDB for MongoDB インスタンスのホワイトリストに追加します。詳細については、「インスタンスのホワイトリストの設定」および「ホワイトリストを設定するにはどうすればよいですか?」をご参照ください。
Data Management (DMS) コンソールの SQL エディターで、次のコマンドを実行して
mongo_testという名前のデータベースを作成します。use mongo_test;データベースに
game_sales、game_dimension、platform_dimensionテーブルを作成し、テーブルにデータを挿入します。// ゲーム販売テーブル (status は論理フィールドで、1 は存在、0 は削除を意味します) db.game_sales.insert( [ {sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1}, ] ); // ゲームディメンションテーブル db.game_dimension.insert( [ {game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"}, {game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"}, {game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"}, {game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"}, ] ); // プラットフォームディメンションテーブル db.platform_dimension.insert( [ {platform_id:1,"platform_name":"PCGaming","type":"PC"}, {platform_id:2,"platform_name":"PlayStation","type":"Console"}, {platform_id:3,"platform_name":"Mobile","type":"Mobile"} ] );テーブル内のデータをクエリします。
db.game_sales.find(); db.game_dimension.find(); db.platform_dimension.find();
Hologres にテーブルを作成します。
Hologres コンソールにログオンし、左側のナビゲーションウィンドウで [インスタンス] をクリックし、アクセスしたい Hologres インスタンスをクリックします。右上隅にある [インスタンスに接続] をクリックします。
上部のナビゲーションバーで [メタデータ管理] をクリックし、次に [データベースの作成] をクリックします。ポップアップウィンドウの [データベース名] フィールドに
testと入力し、[ポリシー] を [SPM] に設定します。詳細については、「データベースの作成」をご参照ください。
上部のナビゲーションバーで [SQL エディター] をクリックします。クエリサイドペインの右上隅にある SQL アイコンをクリックして SQL クエリを作成します。ターゲットインスタンスとデータベースを選択し、次のコードスニペットをコピーして
game_sales_detailsという名前のテーブルを作成します。CREATE TABLE game_sales_details( sale_id INT not null primary key, game_id INT, platform_id INT, sale_date VARCHAR(50), units_sold INT, sale_amt INT, status INT, game_name VARCHAR(50), release_date VARCHAR(50), developer VARCHAR(50), publisher VARCHAR(50), platform_name VARCHAR(50), type VARCHAR(50) );
ApsaraMQ for Kafka トピックを作成します。
ApsaraMQ for Kafka コンソールにログオンします。左側のナビゲーションウィンドウで [インスタンス] をクリックし、お使いのインスタンスをクリックします。
左側のナビゲーションウィンドウで [ホワイトリスト管理] をクリックします。Flink ワークスペースの CIDR ブロックを、新しく作成した、または既存のホワイトリストに追加します。
左側のナビゲーションウィンドウで [Topics] をクリックします。[Topic の作成] を選択します。表示される右側のペインで、名前フィールドに
game_sales_factと入力し、説明フィールドに説明を入力し、他のフィールドはデフォルト値を使用します。[OK] をクリックします。
ステップ 2: ストリームジョブの作成
ジョブ 1: game_sales のデータの PK を Kafka に書き込む
game_sales テーブルまたはディメンションテーブルが更新されると、影響を受けるプライマリキー (sale_id) が Kafka に書き込まれます。game_sales が更新された場合、その sale_id は直接書き込まれます。ディメンションテーブルが更新された場合、game_sales とのルックアップ結合により関連する sale_id が取得され、それが Kafka に書き込まれます。
Realtime Compute for Apache Flink コンソール にログオンします。
ワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションメニューで、 をクリックします。
をクリックします。
[新しいドラフト] ダイアログで、[名前] に
dwd_mongo_kafkaと入力し、エンジンバージョンを選択します。[作成] をクリックします。
コードを記述します。
このコードでは、MongoDB コネクタを使用してソーステーブル
game_salesを作成し、Upsert Kafka コネクタを使用して Kafka Topicgame_sales_factを作成します。セキュリティを確保するため、コードにプレーンテキストパスワードをハードコーディングすることは避けてください。代わりに、パスワードなどの機密情報には変数を使用します。詳細については、「変数の管理」をご参照ください。3 つの
INSERT文は、game_sales、game_dimension、platform_dimensionからの変更を独立してキャプチャします。これらの変更は、sale_idを介してリアルタイムで下流のジョブにストリーミングされ、Hologres テーブルが正確、完全、かつリアルタイムに更新されることを保証します。-- game_sales テーブルを作成 CREATE TEMPORARY TABLE game_sales ( `_id` STRING, -- MongoDB が自動生成した ID sale_id INT, -- 販売 ID PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', -- MongoDB コネクタを使用 'uri' = '${secret_values.MongoDB-URI}', -- MongoDB URI の変数 'database' = 'mongo_test', -- MongoDB データベース名 'collection' = 'game_sales' -- MongoDB テーブル名 ); -- game_dimension テーブルを作成 CREATE TEMPORARY TABLE game_dimension ( `_id` STRING, game_id INT, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = '${secret_values.MongoDB-URI}', 'database' = 'mongo_test', 'collection' = 'game_dimension' ); -- platform_dimension テーブルを作成 CREATE TEMPORARY TABLE platform_dimension ( `_id` STRING, platform_id INT, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = '${secret_values.MongoDB-URI}', 'database' = 'mongo_test', 'collection' = 'platform_dimension' ); -- game_sales_dim テーブルを作成 CREATE TEMPORARY TABLE game_sales_dim ( `_id` STRING, sale_id INT, game_id INT, platform_id INT, PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', 'uri' = '${secret_values.MongoDB-URI}', 'database' = 'mongo_test', 'collection' = 'game_sales' ); -- PK を格納するための Kafka シンクを作成 CREATE TEMPORARY TABLE game_sales_fact ( sale_id INT, PRIMARY KEY (sale_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}', 'topic' = 'game_sales_fact', 'key.format' = 'json', 'value.format' = 'json', 'properties.enable.idempotence' = 'false' -- ApsaraMQ for Kafka シンクに書き込む場合は、このオプションを無効にします ); BEGIN STATEMENT SET; -- game_sales テーブルの PK を挿入 INSERT INTO game_sales_fact ( sale_id ) SELECT sale_id FROM game_sales ; -- game_dimension テーブルを game_sales_dim テーブルと結合します。データが更新された場合は、影響を受ける sale_id の PK を Kafka シンクに挿入します。 INSERT INTO game_sales_fact ( sale_id ) select gs.sale_id from game_dimension as gd join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs on gd.game_id = gs.game_id; -- platform_dimension テーブルを game_sales_dim テーブルと結合します。データが更新された場合は、影響を受ける sale_id を Kafka シンクに挿入します。 INSERT INTO game_sales_fact ( sale_id ) select gs.sale_id from platform_dimension as pd join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs on pd.platform_id = gs.platform_id; END;説明この例では、Upsert Kafka コネクタを使用しています。標準の Kafka コネクタとの違いについては、「Kafka、Upsert Kafka、または Kafka JSON カタログの選択」をご参照ください。
ルックアップ結合: ルックアップ結合は、外部データソースのディメンションテーブルをファクトテーブルに関連付けて、データストリームをエンリッチします。前提条件は次のとおりです。処理時間属性が定義され、同等の結合条件が存在すること。ディメンションテーブルとファクトテーブルのルックアップ結合の詳細については、「ディメンションテーブルの JOIN 文」をご参照ください。次のコードスニペットでは、処理時間属性は
FOR SYSTEM_TIME AS OF句を使用して定義されています。これにより、各game_sales行が、game_sales行が結合演算子によって処理される時点で結合述語に一致する対応するディメンションテーブル行と結合されることが保証されます。処理時間属性を定義すると、結合されたディメンションテーブル行が将来更新されたときに結合結果が更新されるのを防ぐこともできます。次の SQL コードスニペットでは、同等の結合条件はgd.game_id = gsf.game_idとpd.platform_id = gsf.platform_idです。
右上隅で [デプロイ] をクリックします。ダイアログボックスで [確認] をクリックします。
詳細については、「ジョブのデプロイ」をご参照ください。
ジョブ 2: Kafka からのプライマリキーに基づいて MongoDB テーブルを結合し、Hologres テーブルに部分的な更新を実行する
game_sales を変更された PK および個々のディメンションテーブルと結合するジョブを作成します。このジョブは、いずれかのテーブルのデータが更新されたときに、影響を受けるデータを Hologres にストリーミングします。
ジョブ 1 の手順に従って、dws_kafka_mongo_holo という名前の新しいドラフトを作成し、デプロイします。次のコードでは、Hologres コネクタを使用してシンクテーブル game_sales_details を作成します。
このジョブは、game_sales_factKafka Topic からsale_idを消費します。次に、これらの Kafka から供給されたsale_idに基づいて、MongoDB のgame_salesファクトテーブルと関連するディメンションテーブルの間で時間的なルックアップ結合 (FOR SYSTEM_TIME AS OF PROCTIME()を使用) を実行します。結果は、Hologres のgame_sales_detailsテーブルにデータを Upsert するために使用され、リアルタイムで正確かつ完全な状態の反映を保証します。
-- プライマリキーを格納および消費するための Kafka テーブルを作成
CREATE TEMPORARY TABLE game_sales_fact
(
sale_id INT
,PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
'connector' = 'upsert-kafka'
,'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}'
,'topic' = 'game_sales_fact'
,'key.format' = 'json'
,'value.format' = 'json'
,'properties.group.id' = 'game_sales_fact'
,'properties.auto.offset.reset' = 'earliest'
);
-- game_sales テーブルを作成
CREATE TEMPORARY TABLE game_sales
(
`_id` STRING,
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
status INT,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'uri' = '${secret_values.MongoDB-URI}',
'database' = 'mongo_test',
'collection' = 'game_sales'
);
-- game_dimension テーブルを作成
CREATE TEMPORARY TABLE game_dimension
(
`_id` STRING,
game_id INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'uri' = '${secret_values.MongoDB-URI}',
'database' = 'mongo_test',
'collection' = 'game_dimension'
);
-- platform_dimension テーブルを作成
CREATE TEMPORARY TABLE platform_dimension
(
`_id` STRING
,platform_id INT
,platform_name STRING
,type STRING
,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'uri' = '${secret_values.MongoDB-URI}',
'database' = 'mongo_test',
'collection' = 'platform_dimension'
);
-- game_sales_details シンクテーブルを作成
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
status INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
platform_name STRING,
type STRING,
PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
-- VVR 11 用の Hologres コネクタオプション。
'connector' = 'hologres',
'dbname' = 'test', -- Hologres データベース名
'tablename' = 'public.game_sales_details', -- Hologres テーブル名
'username' = '${secret_values.AccessKeyID}', -- Alibaba Cloud アカウントの AccessKey ID
'password' = '${secret_values.AccessKeySecret}', -- Alibaba Cloud アカウントの AccessKey シークレット
'endpoint' = '${secret_values.Hologres-endpoint}', -- Hologres インスタンスの VPC エンドポイント
'sink.delete-strategy'='IGNORE_DELETE', -- リトラクトされたメッセージを処理するためのポリシー。IGNORE_DELETE はデータを挿入または更新し、削除はしません。
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- 更新モード。これを有効にすると、部分的な列の更新が実行されます。
'sink.partial-insert.enabled'='true'
);
INSERT INTO game_sales_details (
sale_id,
game_id,
platform_id,
sale_date,
units_sold,
sale_amt,
status,
game_name,
release_date,
developer,
publisher,
platform_name,
type
)
select
gsf.sale_id,
gs.game_id,
gs.platform_id,
gs.sale_date,
gs.units_sold,
gs.sale_amt,
gs.status,
gd.game_name,
gd.release_date,
gd.developer,
gd.publisher,
pd.platform_name,
pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id
join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id
join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;ステップ 3: ジョブの開始
開発コンソールの左側のナビゲーションメニューで、 を選択します。[デプロイメント] ページで、2 つのジョブデプロイメントを開始します。
ジョブのステータスが実行中に変わったら、HoloWeb に移動して
game_sales_detailsテーブルをクエリします。SELECT * FROM game_sales_details;テーブルに新しいデータ行が挿入されていることがわかります。

ステップ 4: データを更新してクエリする
MongoDB の game_sales テーブルとディメンションテーブルへの更新は、クエリと分析のために Hologres に同期されます。次の例は、データがリアルタイムでどのように更新されるかを示しています。
game_sales テーブルの更新
MongoDB の
game_salesテーブルに 5 行を挿入します。db.game_sales.insert( [ {sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1}, {sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000,status:1}, {sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500,status:1}, {sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000,status:1}, {sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000,status:1} ] );Hologres の
game_sales_detailsテーブルをクエリすると、対応する行が挿入されていることがわかります。
MongoDB の
game_salesテーブルのsale_dateフィールドの値を '2024-01-01' から '2024-08-01' に変更します。db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});Hologres の
game_sales_detailsテーブルをクエリします。sale_date フィールドの値が '2024-01-01' から '2024-08-01' に変更されていることがわかります。
ApsaraDB for MongoDB の
game_salesテーブルで、sale_idが 5 の行のstatusの値を 0 に変更して、論理削除を実行します。db.game_sales.updateMany({"sale_id": 5}, {$set: {"status": 0}});Hologres の
game_sales_detailsテーブルをクエリします。sale_idが 5 のレコードのstatus列の値が 0 に変更されていることがわかります。これで論理削除は完了です。
ディメンションテーブルの更新
game_dimensionテーブルとplatform_dimensionテーブルに新しい行を挿入します。// ゲームディメンションテーブル db.game_dimension.insert( [ {game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"}, {game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"} ] ); // プラットフォームディメンションテーブル db.platform_dimension.insert( [ {platform_id:4,"platform_name":"Steam","type":"PC"}, {platform_id:5,"platform_name":"Epic","type":"PC"} ] );ディメンションテーブルに新しいデータを挿入するだけでは、MongoDB から Hologres へのデータ同期は開始されません。この同期プロセスは、購入やダウンロードに起因する
game_salesテーブルへの変更によってトリガーされます。したがって、game_salesテーブルにデータを挿入する必要があります。// ゲーム販売テーブル db.game_sales.insert( [ {sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000,,status:1}, {sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500,,status:1} ] );Hologres の
game_sales_detailsテーブルをクエリして更新を確認します。2 つの新しいデータレコードが挿入されていることがわかります。
MongoDB の
game_dimensionテーブルとplatform_dimensionテーブルのデータを更新します。// リリース日を更新 db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}}); // プラットフォームタイプを更新 db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});Hologres テーブルで関連フィールドが更新されていることがわかります。
