當您需要即時響應資料庫中的資料變更時,可以通過訂閱變更MongoDB變更流(Change Streams)解決。本文將為您介紹MongoDB中變更流(Change Stream)相關的內容、使用方法及最佳實務。
什麼是變更流(Change Stream)
變更流將資料庫的變更事件轉化為即時資料流。用戶端可訂閱此流,在資料插入、更新或刪除時立即收到通知。典型應用情境包括:
跨叢集資料同步: 實現MongoDB叢集間的增量資料複製。
Action Trail: 跟蹤高風險操作(如刪除資料庫或集合)。
事件驅動架構: 將變更推送到下遊系統進行即時分析、緩衝更新或通知。
使用限制
執行個體類型為複本集執行個體或分區叢集執行個體。
配置變更流
監聽更多的DDL事件
前提條件
MongoDB 6.0及以上版本(升級指南)。
操作步驟
使用 mongosh 串連資料庫。
執行命令
watch,設定showExpandedEvents: true:// mongo shell or mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // 啟用 更多DDL 事件監聽 } ); cursor.next();
結果驗證
在新的SQL視窗執行變更SQL,如
db.createCollection("myCollection1")。觀察原SQL視窗輸出上述執行SQL相關資訊。

開啟前鏡像(Pre-image)
MongoDB的pre-image是指文檔在修改或刪除前的完整狀態快照,用於記錄資料變更前的原始值。
前提條件
MongoDB 6.0及以上版本(升級指南)。
操作步驟
開啟資料庫維度前鏡像 :
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留時間 } } })開啟集合維度前鏡像 :
說明若需為某資料庫內所有集合啟用前鏡像,則必須先開啟資料庫維度前鏡像,然後在該庫的每個集合上單獨啟用集合級前鏡像功能。
修改現有集合
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })建立新集合時指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})建立變更流監聽 (指定Pre-image選項):
// 在目的地組合上建立監聽 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // 或 'whenAvailable' fullDocumentBeforeChange: 'required' // 或 'whenAvailable' } ) cursor.next();required: 服務端必須返回Pre/Post-image,否則報錯。whenAvailable: 服務端儘力返回,不保證一定有。
結果驗證
檢查資料庫維度開關是否已開啟:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )開啟成功後輸出如下資訊:

檢查集合配置:
db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})預期輸出:在返回的文檔中找到類似
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }的欄位。
在另一個mongosh視窗更新
myCollection中的文檔。觀察
cursor返回的事件,應包含fullDocumentBeforeChange欄位(變更前文檔)。
開啟後鏡像(Post-image)
MongoDB的Post-image是變更發生後文檔的最新完整狀態快照,用於記錄變更後的完整文檔內容。
前提條件
MongoDB 3.6及以上版本(升級指南)。
操作步驟
執行watch命令時設定fullDocument: 'updateLookup'。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
結果驗證
在另一個mongosh視窗新增或更新
myCollection中的文檔。觀察
cursor返回的事件,應包含fullDocument欄位(變更後文檔)。
返回的完整文檔可能為空白或者不是point-in-time的。比如:
當連續對同一個文檔進行多次更新時,第一個update的change events可能返回的是最近update操作完成後的文檔內容;
當一個文檔更新後立即刪除時,其對應的change events中因查不到變更後的文檔內容而展示空的fullDocument欄位。
處理超大變更事件 (>16MB)
前提條件
MongoDB 7.0及以上版本(升級指南)。
操作步驟
在 watch() 的管道中包含 $changeStreamSplitLargeEvent 階段:
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 添加切分階段
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)結果驗證
執行一個會導致變更事件 >16MB 的操作(如更新一個包含超大數組的文檔)。
觀察返回的事件流會被拆分成多個連續的
fragment事件,最終以一個fragment事件結束。
降低前鏡像儲存開銷
預設Pre-image隨oplog到期。可設定更短的到期時間以節省空間的:
設定過短的 expireAfterSeconds 且下遊消費速度不足時,可能導致 ChangeStreamHistoryLost 錯誤(因 Pre-image 過早到期),詳情可參考Change Streams with Document Pre- and Post-Images。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 單位:秒
}
}
})變更流最佳實務
謹慎開啟前後鏡像:
啟用
fullDocumentBeforeChange(Pre-image) 和fullDocument(Post-image) 會增加儲存開銷 (config.system.preimages表) 和請求延遲。僅在業務確實需要文檔變更前後完整內容時開啟。
分區叢集部署要點:
始終在
mongos上建立變更流監聽,以保證全域事件順序。高寫入負載下,變更流可能成為瓶頸(
mongos需排序合并分區事件)。分區間寫入不均衡(如設計不佳的分區鍵)會顯著增加變更流延遲。
避免
updateLookup:updateLookup會為每個更新事件執行單獨的findOne查詢,效率低下。在分區叢集中,
moveChunk操作會進一步加劇updateLookup的延遲問題。
防範變更流中斷:
⚠️以下情境會導致變更流遊標失效 (
operationType: "invalidate") 或錯誤。下遊消費滯後:消費速度慢於事件產生速度,導致
resumeToken超出 oplog 視窗。無效
resumeToken:使用過舊的resumeToken恢複,其對應時間點已不在 oplog 中。容錯移轉影響:Failover 後新主節點的 oplog 可能不包含原
resumeToken。中繼資料變更:
drop、rename、dropDatabase操作可能會觸發invalidate事件。Pre-image 到期:
expireAfterSeconds設定過短且消費慢導致 Pre-image 丟失。
應對方案:
監控變更流延遲。
確保 oplog 視窗足夠大。
設計健壯的錯誤處理和恢複邏輯(捕獲
invalidate事件,記錄最後有效resumeToken,重建監聽)。設定合理的
expireAfterSeconds。
範圍選擇策略:
單一變更流 vs 多個集合級變更流:
單一流 (庫/執行個體級):資源開銷較小(單線程拉取 oplog),但需下遊自行過濾分發事件。高事件量下
mongos可能成為瓶頸。多個集合級流:可利用服務端過濾減少網路傳輸,並發性更好。但過多流會增加
oplog讀取爭搶和資源消耗。
建議: 根據業務負載(事件量、集合數量)進行測試,選擇最優方案。通常,對少量高活躍度集合使用獨立流,對大量低活躍度集合使用庫/執行個體級流配合下遊過濾。