全部產品
Search
文件中心

Realtime Compute for Apache Flink:MongoDB+Hologres使用者行為分析

更新時間:Sep 19, 2025

使用者行為資料通常具有龐大的資料量,儲存成本較高,且缺乏統一的格式,導致處理難度較大。常用的寬表模型雖查詢效率高,但冗餘度高、儲存空間大、維護複雜,更新慢。本文基於Flink+MongoDB+Hologres更好地實現寬表的資料分析,以遊戲行業的使用者行為資料分析為樣本,構建使用者行為資料寬表進行資料分析的方案。

方案架構和優勢

架構

Realtime ComputeFlink版是強大的流式計算引擎,支援對海量即時資料高效處理。ApsaraDB for MongoDB是一種文檔型的NoSQL資料庫,具有資料存放區結構靈活、讀寫效能高、支援複雜的查詢條件等特點。即時數倉Hologres是一站式即時數倉,支援資料即時寫入與更新,即時資料寫入即可查。三者結合相輔相成,能夠將複雜多變的資料進行聯合計算處理,實現對大型資料即時更新、查詢與分析。本文的樣本方案架構如下:

  1. MongoDB的業務資料更新後,將資料主鍵通過訊息佇列Kafka採用Upsert的方式更新寫入。

  2. 無論是事實表或維度資料表的更新,都會將所影響的資料主鍵通過Flink寫入訊息佇列Kafka。

  3. Flink通過消費訊息佇列Kafka中的主鍵資訊,根據主鍵擷取MongoDB的完整的業務資料後更新寫入到Hologres。

優勢

該方案有如下優勢:

  • ApsaraDB for MongoDB適用於高並發讀寫的情境的分區叢集架構,針對高頻寫入的海量資料,可無限擴充性能及儲存空間,解決寫入效率低和儲存空間不足的問題。

  • 無論是事實表還是維度資料表的更新,都會讓結果表的最新資料進行更新。這一過程確保了資料更新的及時性,並且在處理海量資料時,僅對發生變更的資料進行更新,從而有效解決了更新效率低下的問題。

  • 即時數倉Hologres支援高效更新與修正,寫入即可查,由其對外提供寬表模型的資料查詢,有效提升資料查詢與分析的效率。

實踐情境

本文以某遊戲廠商為例,實現了將使用者在平台上購買遊戲的行為資料進行即時預先處理並寫入Hologres即時查詢的業務情境。

game_sales(遊戲銷售表)

sale_id(銷售ID)

game_id(遊戲ID)

platform_id(平台ID)

sale_date(銷售日期)

units_sold(銷售量)

sale_amt(銷售金額)

status(邏輯欄位)

game_dimension(遊戲維度資料表)

game_id(遊戲ID)

game_name(遊戲名稱)

release_date(發售日期)

developer(開發人員)

publisher(發行商)

gameplatform_dimension(平台維度資料表)

platform_id(平台ID)

platform_name(平台名稱)

type(終端類型)

game_sales_details(遊戲銷售明細表)

sale_id(銷售ID)

game_id(遊戲ID)

platform_id(平台ID)

sale_date(銷售日期)

units_sold(銷售量)

sale_amt(銷售金額)

status(邏輯欄位)

game_name(遊戲名稱)

release_date(發售日期)

developer(開發人員)

publisher(發行商)

platform_name(平台名稱)

type(終端類型)

方案闡述

在該即時寬表方案中,維表變更(如遊戲資訊、平台類型更新)能夠即時同步到 Hologres 寬表,核心依賴四步鏈路:

  1. 捕獲變更:Flink 即時監聽 MongoDB 維表的資料變化。

  2. 定位影響:通過關聯鍵(如 game_id)尋找受影響的事實記錄,提取主鍵(如 sale_id)。

  3. 觸發重算:將主鍵寫入 Kafka,通知下遊作業“哪些資料需要重新整理”。

  4. 累加式更新:下遊作業拉取最新資料,重組寬表,並通過 INSERT_OR_UPDATE 模式寫入 Hologres。

前提條件

步驟一:準備資料

  1. 在MongoDB建立資料庫和三張業務表並插入資料。

    1. 登入執行個體

    2. 將Flink工作空間網段設定為白名單,詳情請參見設定白名單如何設定白名單

    3. Data Management控制台SQL Console頁面中,建立mongo_test資料庫,命令如下:

      use mongo_test;
    4. 在mongo_test資料庫建立game_sales(遊戲銷售表),game_dimension(遊戲維度資料表)和platform_dimension(平台維度資料表)並插入資料,命令如下:

      //遊戲銷售表(status為邏輯欄位,1為存在,0為刪除)
      db.game_sales.insert(
        [
      	{sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1},
        ]
      );
      
      //遊戲維度資料表
      db.game_dimension.insert(
        [
      	{game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"},
      	{game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"},
      	{game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"},
      	{game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"},
        ]
      );
      
      //平台維度資料表
      db.platform_dimension.insert(
        [
      	{platform_id:1,"platform_name":"PCGaming","type":"PC"},
      	{platform_id:2,"platform_name":"PlayStation","type":"Console"},
      	{platform_id:3,"platform_name":"Mobile","type":"Mobile"}
        ]
      );
    5. 查詢建立的表資訊,命令如下:

      db.game_sales.find();
      db.game_dimension.find();
      db.platform_dimension.find();

      image

  2. Hologres建立資料分析寬表。

    1. 登入Hologres控制台,在執行個體列表頁面,單擊目標執行個體,單擊右上方的登入執行個體

    2. 單擊上方導覽列的建立庫,建立資料庫名稱為test,本樣本權限原則選擇SPM,詳情請參見使用Hologres管理主控台建立資料庫

      image

    3. 選擇上方導覽列中的SQL編輯器,單擊左側導覽列上方的SQL表徵圖,建立SQL查詢,選擇對應執行個體名和資料庫,填入下方代碼建立銷售明細表。

      CREATE TABLE game_sales_details(
        sale_id INT not null primary key,
        game_id INT,
        platform_id INT,
        sale_date VARCHAR(50),
        units_sold INT,
        sale_amt INT,
        status INT,
        game_name VARCHAR(50),
        release_date VARCHAR(50),
        developer VARCHAR(50),
        publisher VARCHAR(50),
        platform_name VARCHAR(50),
        type VARCHAR(50)
      );
  3. 訊息佇列Kafka建立Topic。

    1. 登入Kafka控制台,在執行個體列表頁面,單擊目標執行個體名稱。

    2. 單擊左側導覽列的白名單管理,添加或修改白名單分組,將Flink工作空間網段設定為白名單。

    3. 左側導覽列選擇Topic管理,單擊建立Topic,名稱為game_sales_fact,其餘選擇預設設定,單擊確認

步驟二:建立流作業

作業一:將銷售表主鍵寫入訊息佇列

將銷售表和維度資料表的更新都以銷售表的主鍵(sale_id)寫入訊息佇列Kafka。 銷售表的更新直接寫入主鍵(sale_id)。維度資料表的更新,通過與銷售表進行維表JOIN後擷取關聯的銷售表資料的對應主鍵(sale_id)寫入訊息佇列Kafka。作業流程如下圖:

  1. 登入Realtime Compute控制台

  2. 單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊資料開發 > ETL

  4. 單擊image後,單擊建立流作業,填寫檔案名稱並選擇引擎版本

    修改檔案名稱為dwd_mongo_kafka。

  5. 單擊建立

  6. 編寫作業代碼。

    下列代碼中使用MongoDB連接器建立了源表game_sales,使用Upsert Kafka連接器建立了Kafka的Topic為game_sales_fact。為避免作業中出現純文字密碼,造成安全隱患,可以參見變數管理來配置密碼和地址等變數。

    三個INSERT語句分別捕獲事實表自身、遊戲維度資料表和平台維度資料表的變更,通過sale_id將變化即時傳遞至下遊。該設計確保Hologres寬表實現“即時、準確、完整”更新。任一INSERT缺失都將導致對應情境變更遺漏,引發資料滯後或不一致。
    //建立遊戲銷售表
    CREATE TEMPORARY TABLE game_sales
    (
      `_id`       STRING,    --MongoDB自產生Id
      sale_id     INT,       --銷售Id
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',      --使用的連接器
      'uri' = '${secret_values.MongoDB-URI}', --MongoDB的URI
      'database' = 'mongo_test',    --資料庫名稱
      'collection' = 'game_sales'   --資料庫表名
    );
    
    //建立遊戲維度資料表
    CREATE TEMPORARY TABLE game_dimension
    (
      `_id`        STRING,
      game_id      INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'game_dimension'
    );
    
    //建立平台維度資料表
    CREATE TEMPORARY TABLE platform_dimension
    (
      `_id`         STRING,
      platform_id   INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'platform_dimension'
    );
    
    // 建立遊戲銷售表(維表)
    CREATE TEMPORARY TABLE game_sales_dim
    (
      `_id`       STRING,
      sale_id     INT,
      game_id     INT,
      platform_id INT,
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',
      'uri' = '${secret_values.MongoDB-URI}',
      'database' = 'mongo_test',
      'collection' = 'game_sales'
    );
    
    //建立Kafka Topic儲存主鍵資訊的事實表(結果表)
    CREATE TEMPORARY TABLE game_sales_fact (
      sale_id      INT,
      PRIMARY KEY (sale_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
      'topic' = 'game_sales_fact',
      'key.format' = 'json',
      'value.format' = 'json',
      'properties.enable.idempotence' = 'false'  --使用阿里雲訊息佇列Kafka,寫入結果表需關閉此參數
      );
    
    BEGIN STATEMENT SET;
    
    // 插入遊戲銷售表主鍵
    INSERT INTO game_sales_fact (
      sale_id
    )
    SELECT
      sale_id
    FROM game_sales
    ;
    
    //遊戲維度資料表與遊戲銷售表關聯,有資料更新則將所影響的銷售表主鍵插入Kafka Topic
    INSERT INTO game_sales_fact (
       sale_id
    )
    select
      gs.sale_id
    from game_dimension as gd
    join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs
    on gd.game_id = gs.game_id;
    
    //平台維度資料表與銷售表關聯,有資料更新則將所影響的銷售主鍵插入Kafka事實表
    INSERT INTO game_sales_fact (
       sale_id
    )
    select
      gs.sale_id
    from platform_dimension as pd
    join game_sales_dim FOR SYSTEM_TIME AS OF PROCTIME() as gs
    on pd.platform_id = gs.platform_id;
    
    END;
    說明
    • 本樣本使用的是Upsert Kafka連接器,以Upsert方式向Kafka寫入資料,其與Kafka連接器的區別請參見Kafka、Upsert Kafka或Kafka JSON catalog的選擇

    • 維表JOIN:通常用於使用從外部系統查詢的資料來豐富表。join 要求一個表具有處理時間屬性,還需要一個強制的相等串連條件,詳情請參見維表JOIN語句。在作業中,帶有process time屬性的FOR SYSTEM_TIME AS OF 子句在確保聯結處理game_sales表每一行時,都能與join條件匹配的維錶行串連。它還防止串連的維表在未來發生更新時變更串連的結果。強制的串連條件則是gd.game_id = gsf.game_id和pd.platform_id = gsf.platform_id。

  7. 單擊右上方的部署,單擊確認

    更多部署參數詳情請參見部署作業

作業二:消費主鍵資訊同步更新Hologres明細表

消費Kafka Topic當中的主鍵資訊(sale_id),通過三個維表Join操作得到銷售明細資訊,寫入最終的明細表。作業流程如下圖:

參考作業一建立並部署作業(dws_kafka_mongo_holo)。下列代碼中使用Hologres連接器建立了結果表game_sales_details。

通過消費Kafka中的sale_id,基於主鍵即時關聯三張維度資料表,利用FOR SYSTEM_TIME AS OF PROCTIME() 實現變更資料的動態捕獲與快照關聯,將最新維度資訊與事實資料拼接後寫入Hologres寬表。該JOIN邏輯確保了寬表能即時、完整地反映事實表與維度資料表的最新狀態,支撐“即時、準確、完整”的資料融合與更新。
//建立Kafka Topic儲存主鍵的事實表,消費主鍵資訊
CREATE TEMPORARY TABLE game_sales_fact
(
  sale_id  INT
  ,PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  'connector' = 'upsert-kafka'
  ,'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}'
  ,'topic' = 'game_sales_fact'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
  ,'properties.group.id' = 'game_sales_fact'
  ,'properties.auto.offset.reset' = 'earliest'
);

//遊戲銷售表(維表)
CREATE TEMPORARY TABLE game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  status      INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);


//遊戲維度資料表(維表)
CREATE TEMPORARY TABLE game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);

//平台維度資料表(維表)
CREATE TEMPORARY TABLE platform_dimension
(
  `_id`          STRING
  ,platform_id   INT
  ,platform_name STRING
  ,type          STRING
  ,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'uri' = '${secret_values.MongoDB-URI}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
);

//遊戲銷售明細表(結果表)
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
  sale_id       INT,
  game_id       INT,
  platform_id   INT,
  sale_date     STRING,
  units_sold    INT,
  sale_amt      INT,
  status        INT,
  game_name     STRING,
  release_date  STRING,
  developer     STRING,
  publisher     STRING,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  // 下列參數採用了VVR11版本的參數配置
  'connector' = 'hologres',
  'dbname' = 'test', --Hologres的資料庫名稱
  'tablename' = 'public.game_sales_details', --Hologres用於接收資料的表名稱
  'username' = '${secret_values.AccessKeyID}', --當前阿里雲帳號的AccessKey ID
  'password' = '${secret_values.AccessKeySecret}', --當前阿里雲帳號的AccessKey Secret
  'endpoint' = '${secret_values.Hologres-endpoint}', --當前Hologres執行個體VPC網路的Endpoint
  'sink.delete-strategy'='IGNORE_DELETE',       -- 撤回訊息的處理策略,IGNORE_DELETE適用於僅需插入或更新資料,而無需刪除資料的情境
  'sink.on-conflict-action'='INSERT_OR_UPDATE', -- 更新模式。寬表merge需要開啟此參數,實現部分列更新。
  'sink.partial-insert.enabled'='true'
);

INSERT INTO game_sales_details (
  sale_id,
  game_id,
  platform_id,
  sale_date,
  units_sold,
  sale_amt,
  status,
  game_name,
  release_date,
  developer,
  publisher,
  platform_name,
  type
)
select
  gsf.sale_id,
  gs.game_id,
  gs.platform_id,
  gs.sale_date,
  gs.units_sold,
  gs.sale_amt,
  gs.status,
  gd.game_name,
  gd.release_date,
  gd.developer,
  gd.publisher,
  pd.platform_name,
  pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id

join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id

join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;

步驟三:啟動作業

  1. 完成兩個作業開發部署後,在左側導航選擇營運中心 > 作業營運啟動兩個作業。

  2. 作業運行後,前往Hologres控制台,對明細表game_sales_details進行查詢。

    SELECT * FROM game_sales_details;

    此時game_sales_details表中插入了一條資料。

    image

步驟四:資料更新和查詢

對於銷售表和維度資料表的資料變更,能直接更新反饋到明細表上進行查詢分析。下面將列舉幾種常見的資料變更操作,以展示Hologres寬表資料即時更新的情況。

銷售表資料變更

  1. 對MongoDB的game_sales表插入五條資料,並觀察Hologres的game_sale_details表的結果。

    db.game_sales.insert(
      [
    	{sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500,status:1},
    	{sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000,status:1},
    	{sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500,status:1},
    	{sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000,status:1},
    	{sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000,status:1}
      ]
    );

    查詢Hologres的game_sale_details表,可以看到,該表增加了五條相應的資料。

    image

  2. 將MongoDB表game_sales中sale_date=2024-01-01的資料均修改為2024-08-01。

    db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});

    查詢Hologres的game_sale_details表,可以看到,該表中sale_date=2024-01-01的資料都被修改成2024-08-01了。

    image

  3. 對MongoDB的game_sales表進行邏輯刪除,將sale_id=5的資料中的status欄位改為0。

    db.game_sales.updateMany({"sale_id": 5}, {$set: {"status": 0}});

    查詢Hologres的game_sale_details表,可以看到,該表中sale_id=5的資料中的status欄位值被修改成0,實現了邏輯刪除。

    image

維度資料表資料變更

  1. 對MongoDB的game_dimension表和platform_dimension表插入資料,插入新的遊戲和新的平台資料。

    //遊戲維度資料表
    db.game_dimension.insert(
      [
    	{game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"},
    	{game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"}
      ]
    );
    
    //平台維度資料表
    db.platform_dimension.insert(
      [
    	{platform_id:4,"platform_name":"Steam","type":"PC"},
    	{platform_id:5,"platform_name":"Epic","type":"PC"}
      ]
    );

    新增遊戲資料和新增平台資料,並不會增加明細表資料。使用者需要產生購買或者下載行為才可以,所以繼續添加game_sales表的資料。

    // 遊戲銷售表
    db.game_sales.insert(
      [
    	{sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000,,status:1},
    	{sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500,,status:1}
      ]
    );

    而後查詢Hologres的game_sale_details表,可以看到,明細表新增了兩條關於新遊戲和新平台的資料記錄。

    image

  2. 對MongoDB的game_dimension表和platform_dimension表更新資料,更新遊戲資訊和平台資訊。

    //更新開發日期
    db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}});
    
    //更新平台類型
    db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});

    而後查詢Hologres的game_sale_details表,可以看到相關資訊的變更,開發日期和平台類型都被即時更新了。

    image

相關文檔