すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink、MongoDB、Hologres を使用したユーザー行動分析

最終更新日:Nov 10, 2025

ユーザー行動データは、その量の多さとフォーマットの多様性から、処理が困難です。従来のワイドテーブルモデルは効率的なクエリパフォーマンスを提供しますが、高いデータ冗長性、増加するストレージコスト、維持の難しさ、更新の遅さといった問題があります。このトピックでは、Realtime Compute for Apache Flink、ApsaraDB for MongoDB、Hologres を活用して効率的なユーザー行動分析を行う方法について説明します。

アーキテクチャと利点

アーキテクチャ

Realtime Compute for Apache Flink は、大量のリアルタイムデータを効率的に処理できる強力なストリーム処理フレームワークです。ApsaraDB for MongoDB は、柔軟なデータスキーマを提供し、優れた読み書きパフォーマンスを実現し、複雑なクエリ条件をサポートするドキュメント指向の NoSQL データベースです。Hologres は、リアルタイムのデータ書き込みと更新をサポートするオールインワンのデータウェアハウスであり、データは書き込み後すぐにクエリできます。これら 3 つのサービスはシームレスに連携して、多様なデータを一元的に処理し、大規模なデータセットのリアルタイム更新、クエリ、分析を可能にします。アーキテクチャは次のとおりです。

  1. Flink ジョブは MongoDB から変更データキャプチャ (CDC) ストリームを読み取ります。データが更新されると、Flink ジョブは Upsert Kafka コネクタを使用して、更新されたデータのプライマリキー (PK) を Kafka にストリーミングします。

  2. ディメンションテーブルが更新されると、Flink ジョブはルックアップ結合を実行してファクトテーブル内の対応するレコードを特定し、影響を受けるファクトテーブル内のデータの PK を Kafka に書き込みます。

  3. 2 番目の Flink ジョブは Kafka から PK を消費し、MongoDB のファクトテーブルとディメンションテーブルに対してルックアップ結合を実行して完全なワイドテーブルレコードを再構築し、Hologres のデータを更新します。

利点

このソリューションには次の利点があります。

  • 高い書き込みスループットとスケーラブルなストレージ: ApsaraDB for MongoDB は、特にシャードクラスターにおいて、高同時実行性の読み書きワークロードに優れています。パフォーマンスとストレージを拡張して、大量の頻繁なデータ書き込みに対応できるため、潜在的な書き込みのボトルネックやストレージの制限を解消します。

  • 効率的な変更伝播: MongoDB のファクトテーブルまたはディメンションテーブルが更新されると、Flink は影響を受けるファクトテーブルレコードの PK のみを特定して Kafka にストリーミングします。この変更駆動型のアプローチにより、総データ量に関係なく、関連データのみが再処理されるため、効率的な更新が保証されます。

  • リアルタイムのクエリと分析: Hologres は、効率的で低待機時間の更新をサポートし、データが書き込まれた直後にクエリを実行できます。これにより、新しく更新されたテーブルでリアルタイムのデータ分析が可能になります。

ハンズオン

このセクションでは、Flink を使用してゲーマーの購入データをリアルタイムで処理し、ファクトテーブルとディメンションテーブルを結合してワイドテーブルを構築し、結果を Hologres に書き込んで即時クエリを実行する方法をデモンストレーションします。

game_sales

sale_id

game_id

platform_id

sale_date

units_sold

sale_amt

status

game_dimension

game_id

game_name

release_date

developer

publisher

platform_dimension

platform_id

platform_name

type

game_sales_details

sale_id

game_id

platform_id

sale_date

units_sold

sale_amt

status

game_name

release_date

developer

publisher

platform_name

type

ソリューションの説明

このソリューションは、ディメンションテーブルへの変更をリアルタイムで Hologres に複製します。このプロセスは、次の 4 つのステップで構成されます。

  1. 変更のキャプチャ: MongoDB のディメンションテーブルからリアルタイムのデータ変更をキャプチャします。

  2. ディメンションテーブルの変更の伝播: ディメンションテーブルが変更されると、Flink ジョブはルックアップ結合 (例: game_id) を使用して MongoDB ファクトテーブル内の影響を受けるレコードを特定し、sale_id などのプライマリキーを抽出します。

  3. 再計算のトリガー: プライマリキーを Kafka に送信して、下流のジョブに必要なデータのリフレッシュを通知します。

  4. 増分更新: 最新のデータを取得し、ワイドテーブルを再構築し、データを Hologres に Upsert します。

前提条件

ステップ 1: データの準備

  1. ApsaraDB for MongoDB インスタンスにデータベースと 3 つのテーブルを作成します。

    1. ApsaraDB for MongoDB インスタンスにログオンします

    2. Flink ワークスペースの CIDR ブロックを ApsaraDB for MongoDB インスタンスのホワイトリストに追加します。詳細については、「インスタンスのホワイトリストの設定」および「ホワイトリストを設定するにはどうすればよいですか?」をご参照ください。

    3. Data Management (DMS) コンソールの SQL エディターで、次のコマンドを実行して mongo_test という名前のデータベースを作成します。

      use mongo_test;
    4. データベースに game_salesgame_dimensionplatform_dimension テーブルを作成し、テーブルにデータを挿入します。

      // ゲーム販売テーブル (status は論理フィールドで、1 は存在、0 は削除を意味します)
      db.game_sales.insert(
        [
      	{sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1},
        ]
      );
      
      // ゲームディメンションテーブル
      db.game_dimension.insert(
        [
      	{game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"},
      	{game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"},
      	{game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"},
      	{game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"},
        ]
      );
      
      // プラットフォームディメンションテーブル
      db.platform_dimension.insert(
        [
      	{platform_id:1,"platform_name":"PCGaming","type":"PC"},
      	{platform_id:2,"platform_name":"PlayStation","type":"Console"},
      	{platform_id:3,"platform_name":"Mobile","type":"Mobile"}
        ]
      );
    5. テーブル内のデータをクエリします。

      db.game_sales.find();
      db.game_dimension.find();
      db.platform_dimension.find();

      image

  2. Hologres にテーブルを作成します。

    1. Hologres コンソールにログオンし、左側のナビゲーションウィンドウで [インスタンス] をクリックし、アクセスしたい Hologres インスタンスをクリックします。右上隅にある [インスタンスに接続] をクリックします。

    2. 上部のナビゲーションバーで [メタデータ管理] をクリックし、次に [データベースの作成] をクリックします。ポップアップウィンドウの [データベース名] フィールドに test と入力し、[ポリシー][SPM] に設定します。詳細については、「データベースの作成」をご参照ください。

      image

    3. 上部のナビゲーションバーで [SQL エディター] をクリックします。クエリサイドペインの右上隅にある SQL アイコンをクリックして SQL クエリを作成します。ターゲットインスタンスとデータベースを選択し、次のコードスニペットをコピーして game_sales_details という名前のテーブルを作成します。

      CREATE TABLE game_sales_details(
        sale_id INT not null primary key,
        game_id INT,
        platform_id INT,
        sale_date VARCHAR(50),
        units_sold INT,
        sale_amt INT,
        status INT,
        game_name VARCHAR(50),
        release_date VARCHAR(50),
        developer VARCHAR(50),
        publisher VARCHAR(50),
        platform_name VARCHAR(50),
        type VARCHAR(50)
      );
  3. ApsaraMQ for Kafka トピックを作成します。

    1. ApsaraMQ for Kafka コンソールにログオンします。左側のナビゲーションウィンドウで [インスタンス] をクリックし、お使いのインスタンスをクリックします。

    2. 左側のナビゲーションウィンドウで [ホワイトリスト管理] をクリックします。Flink ワークスペースの CIDR ブロックを、新しく作成した、または既存のホワイトリストに追加します。

    3. 左側のナビゲーションウィンドウで [Topics] をクリックします。[Topic の作成] を選択します。表示される右側のペインで、名前フィールドに game_sales_fact と入力し、説明フィールドに説明を入力し、他のフィールドはデフォルト値を使用します。[OK] をクリックします。

ステップ 2: ストリームジョブの作成

ジョブ 1: game_sales のデータの PK を Kafka に書き込む

game_sales テーブルまたはディメンションテーブルが更新されると、影響を受けるプライマリキー (sale_id) が Kafka に書き込まれます。game_sales が更新された場合、その sale_id は直接書き込まれます。ディメンションテーブルが更新された場合、game_sales とのルックアップ結合により関連する sale_id が取得され、それが Kafka に書き込まれます。

  1. Realtime Compute for Apache Flink コンソール にログオンします。

  2. ワークスペースの [操作] 列で、[コンソール] をクリックします。

  3. 左側のナビゲーションメニューで、[開発] > [ETL] をクリックします。

  4. [新しい空のストリームドラフト] をクリックします。

    [新しいドラフト] ダイアログで、[名前]dwd_mongo_kafka と入力し、エンジンバージョンを選択します。

  5. [作成] をクリックします。

  6. コードを記述します。

    このコードでは、MongoDB コネクタを使用してソーステーブル game_sales を作成し、Upsert Kafka コネクタを使用して Kafka Topic game_sales_fact を作成します。セキュリティを確保するため、コードにプレーンテキストパスワードをハードコーディングすることは避けてください。代わりに、パスワードなどの機密情報には変数を使用します。詳細については、「変数の管理」をご参照ください。

    3 つの INSERT 文は、game_salesgame_dimensionplatform_dimension からの変更を独立してキャプチャします。これらの変更は、sale_id を介してリアルタイムで下流のジョブにストリーミングされ、Hologres テーブルが正確、完全、かつリアルタイムに更新されることを保証します。
    -- game_sales テーブルを作成
    CREATE TEMPORARY TABLE game_sales
    (
      `_id`       STRING,    -- MongoDB が自動生成した ID
      sale_id     INT,       -- 販売 ID
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',      -- MongoDB コネクタを使用
      'uri' = '${secret_values.MongoDB-URI}', -- MongoDB URI の変数
      'database' = 'mongo_test',    -- MongoDB データベース名
      'collection' = 'game_sales'   -- MongoDB テーブル名
    );
    
    -- game_dimension テーブルを作成
    CREATE TEMPORARY TABLE game_dimension
    (
      `_id`        STRING,
      game_id      INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'game_dimension'
    );
    
    -- platform_dimension テーブルを作成
    CREATE TEMPORARY TABLE platform_dimension
    (
      `_id`         STRING,
      platform_id   INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'platform_dimension'
    );
    
    -- game_sales_dim テーブルを作成
    CREATE TEMPORARY TABLE game_sales_dim
    (
      `_id`       STRING,
      sale_id     INT,
      game_id     INT,
      platform_id INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'game_sales'
    );
    
    -- PK を格納するための Kafka シンクを作成
    CREATE TEMPORARY TABLE game_sales_fact (
      sale_id      INT,
      PRIMARY KEY (sale_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
      'topic' = 'game_sales_fact',
      'key.format' = 'json',
      'value.format' = 'json',
      'properties.enable.idempotence' = 'false'  -- ApsaraMQ for Kafka シンクに書き込む場合は、このオプションを無効にします
      );
    
    BEGIN STATEMENT SET;
    
    -- game_sales テーブルの PK を挿入
    INSERT INTO game_sales_fact (
      sale_id
    )
    SELECT
      sale_id
    FROM game_sales
    ;
    
    -- game_dimension テーブルを game_sales_dim テーブルと結合します。データが更新された場合は、影響を受ける sale_id の PK を Kafka シンクに挿入します。
    INSERT INTO game_sales_fact (
       sale_id
    )
    select
      gs.sale_id
    from game_dimension as gd
    join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs
    on gd.game_id = gs.game_id;
    
    -- platform_dimension テーブルを game_sales_dim テーブルと結合します。データが更新された場合は、影響を受ける sale_id を Kafka シンクに挿入します。
    INSERT INTO game_sales_fact (
       sale_id
    )
    select
      gs.sale_id
    from platform_dimension as pd
    join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs
    on pd.platform_id = gs.platform_id;
    
    END;
    説明
    • この例では、Upsert Kafka コネクタを使用しています。標準の Kafka コネクタとの違いについては、「Kafka、Upsert Kafka、または Kafka JSON カタログの選択」をご参照ください。

    • ルックアップ結合: ルックアップ結合は、外部データソースのディメンションテーブルをファクトテーブルに関連付けて、データストリームをエンリッチします。前提条件は次のとおりです。処理時間属性が定義され、同等の結合条件が存在すること。ディメンションテーブルとファクトテーブルのルックアップ結合の詳細については、「ディメンションテーブルの JOIN 文」をご参照ください。次のコードスニペットでは、処理時間属性は FOR SYSTEM_TIME AS OF 句を使用して定義されています。これにより、各 game_sales 行が、game_sales 行が結合演算子によって処理される時点で結合述語に一致する対応するディメンションテーブル行と結合されることが保証されます。処理時間属性を定義すると、結合されたディメンションテーブル行が将来更新されたときに結合結果が更新されるのを防ぐこともできます。次の SQL コードスニペットでは、同等の結合条件は gd.game_id = gsf.game_idpd.platform_id = gsf.platform_id です。

  7. 右上隅で [デプロイ] をクリックします。ダイアログボックスで [確認] をクリックします。

    詳細については、「ジョブのデプロイ」をご参照ください。

ジョブ 2: Kafka からのプライマリキーに基づいて MongoDB テーブルを結合し、Hologres テーブルに部分的な更新を実行する

game_sales を変更された PK および個々のディメンションテーブルと結合するジョブを作成します。このジョブは、いずれかのテーブルのデータが更新されたときに、影響を受けるデータを Hologres にストリーミングします。

ジョブ 1 の手順に従って、dws_kafka_mongo_holo という名前の新しいドラフトを作成し、デプロイします。次のコードでは、Hologres コネクタを使用してシンクテーブル game_sales_details を作成します。

このジョブは、game_sales_fact Kafka Topic から sale_id を消費します。次に、これらの Kafka から供給された sale_id に基づいて、MongoDB の game_sales ファクトテーブルと関連するディメンションテーブルの間で時間的なルックアップ結合 (FOR SYSTEM_TIME AS OF PROCTIME() を使用) を実行します。結果は、Hologres の game_sales_details テーブルにデータを Upsert するために使用され、リアルタイムで正確かつ完全な状態の反映を保証します。
-- プライマリキーを格納および消費するための Kafka テーブルを作成
CREATE TEMPORARY TABLE game_sales_fact
(
  sale_id  INT
  ,PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka'
  ,'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}'
  ,'topic' = 'game_sales_fact'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
  ,'properties.group.id' = 'game_sales_fact'
  ,'properties.auto.offset.reset' = 'earliest'
);

-- game_sales テーブルを作成
CREATE TEMPORARY TABLE game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  status      INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);


-- game_dimension テーブルを作成
CREATE TEMPORARY TABLE game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);

-- platform_dimension テーブルを作成
CREATE TEMPORARY TABLE platform_dimension
(
  `_id`          STRING
  ,platform_id   INT
  ,platform_name STRING
  ,type          STRING
  ,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
);

-- game_sales_details シンクテーブルを作成
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
  sale_id       INT,
  game_id       INT,
  platform_id   INT,
  sale_date     STRING,
  units_sold    INT,
  sale_amt      INT,
  status        INT,
  game_name     STRING,
  release_date  STRING,
  developer     STRING,
  publisher     STRING,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  -- VVR 11 用の Hologres コネクタオプション。
  'connector' = 'hologres',
  'dbname' = 'test', -- Hologres データベース名
  'tablename' = 'public.game_sales_details', -- Hologres テーブル名
  'username' = '${secret_values.AccessKeyID}', -- Alibaba Cloud アカウントの AccessKey ID
  'password' = '${secret_values.AccessKeySecret}', -- Alibaba Cloud アカウントの AccessKey シークレット
  'endpoint' = '${secret_values.Hologres-endpoint}', -- Hologres インスタンスの VPC エンドポイント
  'sink.delete-strategy'='IGNORE_DELETE',       -- リトラクトされたメッセージを処理するためのポリシー。IGNORE_DELETE はデータを挿入または更新し、削除はしません。
  'sink.on-conflict-action'='INSERT_OR_UPDATE', -- 更新モード。これを有効にすると、部分的な列の更新が実行されます。
  'sink.partial-insert.enabled'='true'
);

INSERT INTO game_sales_details (
  sale_id,
  game_id,
  platform_id,
  sale_date,
  units_sold,
  sale_amt,
  status,
  game_name,
  release_date,
  developer,
  publisher,
  platform_name,
  type
)
select
  gsf.sale_id,
  gs.game_id,
  gs.platform_id,
  gs.sale_date,
  gs.units_sold,
  gs.sale_amt,
  gs.status,
  gd.game_name,
  gd.release_date,
  gd.developer,
  gd.publisher,
  pd.platform_name,
  pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id

join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id

join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;

ステップ 3: ジョブの開始

  1. 開発コンソールの左側のナビゲーションメニューで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、2 つのジョブデプロイメントを開始します。

  2. ジョブのステータスが実行中に変わったら、HoloWeb に移動して game_sales_details テーブルをクエリします。

    SELECT * FROM game_sales_details;

    テーブルに新しいデータ行が挿入されていることがわかります。

    image

ステップ 4: データを更新してクエリする

MongoDB の game_sales テーブルとディメンションテーブルへの更新は、クエリと分析のために Hologres に同期されます。次の例は、データがリアルタイムでどのように更新されるかを示しています。

game_sales テーブルの更新

  1. MongoDB の game_sales テーブルに 5 行を挿入します。

    db.game_sales.insert(
      [
    	{sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1},
    	{sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000,status:1},
    	{sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500,status:1},
    	{sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000,status:1},
    	{sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000,status:1}
      ]
    );

    Hologres の game_sales_details テーブルをクエリすると、対応する行が挿入されていることがわかります。

    image

  2. MongoDB の game_sales テーブルの sale_date フィールドの値を '2024-01-01' から '2024-08-01' に変更します。

    db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});

    Hologres の game_sales_details テーブルをクエリします。sale_date フィールドの値が '2024-01-01' から '2024-08-01' に変更されていることがわかります。

    image

  3. ApsaraDB for MongoDB の game_sales テーブルで、sale_id が 5 の行の status の値を 0 に変更して、論理削除を実行します。

    db.game_sales.updateMany({"sale_id": 5}, {$set: {"status": 0}});

    Hologres の game_sales_details テーブルをクエリします。sale_id が 5 のレコードの status 列の値が 0 に変更されていることがわかります。これで論理削除は完了です。

    image

ディメンションテーブルの更新

  1. game_dimension テーブルと platform_dimension テーブルに新しい行を挿入します。

    // ゲームディメンションテーブル
    db.game_dimension.insert(
      [
    	{game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"},
    	{game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"}
      ]
    );
    
    // プラットフォームディメンションテーブル
    db.platform_dimension.insert(
      [
    	{platform_id:4,"platform_name":"Steam","type":"PC"},
    	{platform_id:5,"platform_name":"Epic","type":"PC"}
      ]
    );

    ディメンションテーブルに新しいデータを挿入するだけでは、MongoDB から Hologres へのデータ同期は開始されません。この同期プロセスは、購入やダウンロードに起因する game_sales テーブルへの変更によってトリガーされます。したがって、game_sales テーブルにデータを挿入する必要があります。

    // ゲーム販売テーブル
    db.game_sales.insert(
      [
    	{sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000,,status:1},
    	{sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500,,status:1}
      ]
    );

    Hologres の game_sales_details テーブルをクエリして更新を確認します。2 つの新しいデータレコードが挿入されていることがわかります。

    image

  2. MongoDB の game_dimension テーブルと platform_dimension テーブルのデータを更新します。

    // リリース日を更新
    db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}});
    
    // プラットフォームタイプを更新
    db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});

    Hologres テーブルで関連フィールドが更新されていることがわかります。

    image

参考資料