全部產品
Search
文件中心

ApsaraDB for MongoDB:使用MongoDB變更流(Change Stream)即時捕獲資料變更

更新時間:Aug 20, 2025

當您需要即時響應資料庫中的資料變更時,可以通過訂閱變更MongoDB變更流(Change Streams)解決。本文將為您介紹MongoDB中變更流(Change Stream)相關的內容、使用方法及最佳實務。

什麼是變更流(Change Stream)

變更流將資料庫的變更事件轉化為即時資料流。用戶端可訂閱此流,在資料插入、更新或刪除時立即收到通知。典型應用情境包括:

  • 跨叢集資料同步: 實現MongoDB叢集間的增量資料複製。

  • Action Trail: 跟蹤高風險操作(如刪除資料庫或集合)。

  • 事件驅動架構: 將變更推送到下遊系統進行即時分析、緩衝更新或通知。

版本演化歷史

版本

更新說明

MongoDB 3.6

  • 首次發布。

  • 僅支援訂閱集合(Collection)維度。

  • events類型有限。

  • 支援故障恢複。

  • 支援查看變更後的視圖(Post-image)。

MongoDB 4.0

  • 支援訂閱庫(Database)以及叢集(Cluster)維度。

  • 支援dropdropDatabaserename事件。

  • resumeToken格式從BinData變更為Hex

MongoDB 4.2

  • 支援$set$unset等更多pipeline操作符。

  • 新增startAfter選項,用於按時間點啟動監聽功能。

  • 修改事件的_id欄位,變更流將拋出異常。

  • 移除對{readConcern: majority}的依賴。

MongoDB 5.1

  • 提升部分彙總架構下stage的執行效率。

  • 提升資源利用效率。

MongoDB 5.3

  • 支援在Chunk遷移期間過濾對孤立文檔的更新。

MongoDB 6.0

  • 支援查看變更前的視圖(Pre-image)。

  • 支援createcreateIndexesmodifyshardCollection等DDL語句,需要指定showExpandedEvents:true來支援;更多資訊,請參見Change Events

  • Change Events新增wallTime欄位,時間戳記支援多種轉換和展示運算元(包括$toDate$tsSeconds$tsIncrement)以方便業務消費。

MongoDB 7.0

  • 支援超大變更事件(>16MB),通過全新的$changeStreamSplitLargeEvent運算元來將超大變更事件進行切分。

  • change events支援refineCollectionShardKey以及reshardCollection事件。

MongoDB 8.0

  • $queryStats命令增強了跟change stream相關的指標。

  • movePrimary命令不再會使得建立了change stream的表產生非法事件了,即change stream可以正常連續處理movePrimary命令引起的資料移轉了。

使用限制

執行個體類型為複本集執行個體或分區叢集執行個體。

配置變更流

監聽更多的DDL事件

前提條件

MongoDB 6.0及以上版本(升級指南)。

操作步驟

  1. 使用 mongosh 串連資料庫

  2. 執行命令watch,設定 showExpandedEvents: true

    // mongo shell or mongosh v1.x
    cursor = db.getSiblingDB("test").watch([],
      {
        showExpandedEvents: true       // 啟用 更多DDL 事件監聽
      }
    );
    cursor.next();

結果驗證

  1. 在新的SQL視窗執行變更SQL,如db.createCollection("myCollection1")

  2. 觀察原SQL視窗輸出上述執行SQL相關資訊。

    image

開啟前鏡像(Pre-image)

MongoDB的pre-image是指文檔在修改或刪除前的完整狀態快照,用於記錄資料變更前的原始值。

前提條件

MongoDB 6.0及以上版本(升級指南)。

操作步驟

  1. 開啟資料庫維度前鏡像 :

    db.adminCommand({
      setClusterParameter: {
        changeStreamOptions: {
          preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留時間
        }
      }
    })
  2. 開啟集合維度前鏡像 :

    說明

    若需為某資料庫內所有集合啟用前鏡像,則必須先開啟資料庫維度前鏡像,然後在該庫的每個集合上單獨啟用集合級前鏡像功能。

    修改現有集合

    db.runCommand({
      collMod: "myCollection",
      changeStreamPreAndPostImages: { enabled: true }
    })

    建立新集合時指定

    db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
  3. 建立變更流監聽 (指定Pre-image選項):

    // 在目的地組合上建立監聽
    cursor = db.getCollection("myCollection").watch([],
      {
        fullDocument: 'required', // 或 'whenAvailable'
        fullDocumentBeforeChange: 'required' // 或 'whenAvailable'
      }
    )
    cursor.next();
    • required: 服務端必須返回Pre/Post-image,否則報錯。

    • whenAvailable: 服務端儘力返回,不保證一定有。

結果驗證

  1. 檢查資料庫維度開關是否已開啟:

    db.adminCommand( { getClusterParameter: "changeStreamOptions" } )

    開啟成功後輸出如下資訊:image

  2. 檢查集合配置:

    db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})

    預期輸出:在返回的文檔中找到類似 "options" : { "changeStreamPreAndPostImages" : { "enabled" : true } } 的欄位。 

    image

  3. 在另一個mongosh視窗更新 myCollection 中的文檔。

  4. 觀察 cursor 返回的事件,應包含 fullDocumentBeforeChange 欄位(變更前文檔)。

    image

更多資訊參考Change Streams with Document Pre- and Post-Images

開啟後鏡像(Post-image)

MongoDB的Post-image是變更發生後文檔的最新完整狀態快照,用於記錄變更後的完整文檔內容。

前提條件

MongoDB 3.6及以上版本(升級指南)。

操作步驟

執行watch命令時設定fullDocument: 'updateLookup'

cursor = db.getSiblingDB("test").myCollection.watch([], 
  {
    fullDocument: 'updateLookup'
  }
);
cursor.next();

結果驗證

  1. 在另一個mongosh視窗新增或更新 myCollection 中的文檔。

  2. 觀察 cursor 返回的事件,應包含 fullDocument 欄位(變更後文檔)。image

說明

返回的完整文檔可能為空白或者不是point-in-time的。比如:

  • 當連續對同一個文檔進行多次更新時,第一個update的change events可能返回的是最近update操作完成後的文檔內容;

  • 當一個文檔更新後立即刪除時,其對應的change events中因查不到變更後的文檔內容而展示空的fullDocument欄位。

更多資訊參考Lookup Full Document for Update Operations

處理超大變更事件 (>16MB)

前提條件

MongoDB 7.0及以上版本(升級指南)。

操作步驟 

在 watch() 的管道中包含 $changeStreamSplitLargeEvent 階段:

myChangeStreamCursor = db.myCollection.watch(
  [ { $changeStreamSplitLargeEvent: {} } ], // 添加切分階段
  {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
  }
)

結果驗證

  • 執行一個會導致變更事件 >16MB 的操作(如更新一個包含超大數組的文檔)。

  • 觀察返回的事件流會被拆分成多個連續的fragment事件,最終以一個fragment事件結束。

降低前鏡像儲存開銷

預設Pre-image隨oplog到期。可設定更短的到期時間以節省空間的:

警告

設定過短的 expireAfterSeconds 且下遊消費速度不足時,可能導致 ChangeStreamHistoryLost 錯誤(因 Pre-image 過早到期),詳情可參考Change Streams with Document Pre- and Post-Images

db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 } // 單位:秒
    }
  }
})

變更流最佳實務 

  1. 謹慎開啟前後鏡像:

    • 啟用 fullDocumentBeforeChange (Pre-image) 和 fullDocument (Post-image) 會增加儲存開銷 (config.system.preimages 表) 和請求延遲。

    • 僅在業務確實需要文檔變更前後完整內容時開啟。

  2. 分區叢集部署要點:

    • 始終在 mongos 上建立變更流監聽,以保證全域事件順序。

    • 高寫入負載下,變更流可能成為瓶頸(mongos 需排序合并分區事件)。

    • 分區間寫入不均衡(如設計不佳的分區鍵)會顯著增加變更流延遲。

  3. 避免 updateLookup

    • updateLookup會為每個更新事件執行單獨的findOne查詢,效率低下。

    • 在分區叢集中,moveChunk操作會進一步加劇updateLookup的延遲問題。

  4. 防範變更流中斷:

    • ⚠️以下情境會導致變更流遊標失效 (operationType: "invalidate") 或錯誤。

      • 下遊消費滯後:消費速度慢於事件產生速度,導致resumeToken超出 oplog 視窗。

      • 無效 resumeToken使用過舊的resumeToken恢複,其對應時間點已不在 oplog 中。

      • 容錯移轉影響:Failover 後新主節點的 oplog 可能不包含原resumeToken

      • 中繼資料變更:droprenamedropDatabase操作可能會觸發invalidate事件。

      • Pre-image 到期:expireAfterSeconds 設定過短且消費慢導致 Pre-image 丟失。

    • 應對方案:

      • 監控變更流延遲。

      • 確保 oplog 視窗足夠大。

      • 設計健壯的錯誤處理和恢複邏輯(捕獲invalidate事件,記錄最後有效resumeToken,重建監聽)。

      • 設定合理的expireAfterSeconds

  5. 範圍選擇策略:

    • 單一變更流 vs 多個集合級變更流:

      • 單一流 (庫/執行個體級):資源開銷較小(單線程拉取 oplog),但需下遊自行過濾分發事件。高事件量下 mongos 可能成為瓶頸。

      • 多個集合級流:可利用服務端過濾減少網路傳輸,並發性更好。但過多流會增加 oplog 讀取爭搶和資源消耗。

    • 建議: 根據業務負載(事件量、集合數量)進行測試,選擇最優方案。通常,對少量高活躍度集合使用獨立流,對大量低活躍度集合使用庫/執行個體級流配合下遊過濾。