當您需要即時響應資料庫中的資料變更時,可以通過訂閱變更MongoDB變更流(Change Streams)解決。本文將為您介紹MongoDB中變更流(Change Stream)相關的內容、使用方法及最佳實務。
什麼是變更流(Change Stream)
變更流將資料庫的變更事件轉化為即時資料流。用戶端可訂閱此流,在資料插入、更新或刪除時立即收到通知。典型應用情境包括:
跨叢集資料同步: 實現MongoDB叢集間的增量資料複製。
Action Trail: 跟蹤高風險操作(如刪除資料庫或集合)。
事件驅動架構: 將變更推送到下遊系統進行即時分析、緩衝更新或通知。
使用限制
執行個體類型為複本集執行個體或分區叢集執行個體。
配置變更流
監聽更多的DDL事件
前提條件
MongoDB 6.0及以上版本(升級指南)。
操作步驟
使用 mongosh 串連資料庫。
執行命令
watch,設定showExpandedEvents: true:// mongo shell or mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // 啟用更多DDL 事件監聽 } ); cursor.next();
結果驗證
在新的SQL視窗執行變更SQL,如
db.createCollection("myCollection1")。觀察原SQL視窗輸出上述執行SQL相關資訊。

開啟前鏡像(Pre-image)
MongoDB的pre-image是指文檔在修改或刪除前的完整狀態快照,用於記錄資料變更前的原始值。
前提條件
MongoDB 6.0及以上版本(升級指南)。
操作步驟
開啟資料庫維度前鏡像 :
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留時間 } } })開啟集合維度前鏡像 :
說明若需為某資料庫內所有集合啟用前鏡像,則必須先開啟資料庫維度前鏡像,然後在該庫的每個集合上單獨啟用集合級前鏡像功能。
修改現有集合
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })建立新集合時指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})建立變更流監聽 (指定Pre-image選項):
// 在目的地組合上建立監聽 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // 或 'whenAvailable' fullDocumentBeforeChange: 'required' // 或 'whenAvailable' } ) cursor.next();required: 服務端必須返回Pre/Post-image,否則報錯。whenAvailable: 服務端儘力返回,不保證一定有。
結果驗證
檢查資料庫維度開關是否已開啟:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )開啟成功後輸出如下資訊:

檢查集合配置:
db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})預期輸出:在返回的文檔中找到類似
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }的欄位。
在另一個mongosh視窗更新
myCollection中的文檔。觀察
cursor返回的事件,應包含fullDocumentBeforeChange欄位(變更前文檔)。
開啟後鏡像(Post-image)
MongoDB的Post-image是變更發生後文檔的最新完整狀態快照,用於記錄變更後的完整文檔內容。
前提條件
MongoDB 3.6及以上版本(升級指南)。
操作步驟
執行watch命令時設定fullDocument: 'updateLookup'。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
結果驗證
在另一個mongosh視窗新增或更新
myCollection中的文檔。觀察
cursor返回的事件,應包含fullDocument欄位(變更後文檔)。
返回的完整文檔可能為空白或者不是point-in-time的。比如:
當連續對同一個文檔進行多次更新時,第一個update的change events可能返回的是最近update操作完成後的文檔內容;
當一個文檔更新後立即刪除時,其對應的change events中因查不到變更後的文檔內容而展示空的fullDocument欄位。
處理超大變更事件 (>16MB)
前提條件
MongoDB 7.0及以上版本(升級指南)。
操作步驟
在 watch() 的管道中包含 $changeStreamSplitLargeEvent 階段:
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 添加切分階段
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)結果驗證
執行一個會導致變更事件 >16MB 的操作(如更新一個包含超大數組的文檔)。
觀察返回的事件流會被拆分成多個連續的
fragment事件,最終以一個fragment事件結束。
降低前鏡像儲存開銷
預設Pre-image隨oplog到期。可設定更短的到期時間以節省空間的:
設定過短的 expireAfterSeconds 且下遊消費速度不足時,可能導致 ChangeStreamHistoryLost 錯誤(因 Pre-image 過早到期),詳情可參考Change Streams with Document Pre- and Post-Images。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 單位:秒
}
}
})變更流最佳實務
謹慎開啟前後鏡像:
啟用
fullDocumentBeforeChange(Pre-image) 和fullDocument(Post-image) 會增加儲存開銷 (config.system.preimages表) 和請求延遲。僅在業務確實需要文檔變更前後完整內容時開啟。
分區叢集部署要點:
始終在
mongos上建立變更流監聽,以保證全域事件順序。高寫入負載下,變更流可能成為瓶頸(
mongos需排序合并分區事件)。分區間寫入不均衡(如設計不佳的分區鍵)會顯著增加變更流延遲。
避免
updateLookup:updateLookup會為每個更新事件執行單獨的findOne查詢,效率低下。在分區叢集中,
moveChunk操作會進一步加劇updateLookup的延遲問題。
防範變更流中斷:
⚠️以下情境會導致變更流遊標失效 (
operationType: "invalidate") 或錯誤。下遊消費滯後:消費速度慢於事件產生速度,導致
resumeToken超出 oplog 視窗。無效
resumeToken:使用過舊的resumeToken恢複,其對應時間點已不在 oplog 中。容錯移轉影響:Failover 後新主節點的 oplog 可能不包含原
resumeToken。中繼資料變更:
drop、rename、dropDatabase操作可能會觸發invalidate事件。Pre-image 到期:
expireAfterSeconds設定過短且消費慢導致 Pre-image 丟失。
應對方案:
監控變更流延遲。
確保 oplog 視窗足夠大。
設計健壯的錯誤處理和恢複邏輯(捕獲
invalidate事件,記錄最後有效resumeToken,重建監聽)。設定合理的
expireAfterSeconds。
範圍選擇策略:
單一變更流 vs 多個集合級變更流:
單一流 (庫/執行個體級):資源開銷較小(單線程拉取 oplog),但需下遊自行過濾分發事件。高事件量下
mongos可能成為瓶頸。多個集合級流:可利用服務端過濾減少網路傳輸,並發性更好。但過多流會增加
oplog讀取爭搶和資源消耗。
建議: 根據業務負載(事件量、集合數量)進行測試,選擇最優方案。通常,對少量高活躍度集合使用獨立流,對大量低活躍度集合使用庫/執行個體級流配合下遊過濾。
常見問題
1. 為什麼 Change Stream 的慢日誌總是顯示 COLLSCAN?
這是正常且符合預期的行為,無需最佳化。Change Stream的cursor最終會建立在local.oplog.rs上(只有這一個地方記錄了執行個體所有的修改),而該表沒有索引。所以固定是COLLSCAN,無法避免也沒有最佳化空間。
如果您在尋找待最佳化查詢語句時使用了COLLSCAN作為日誌過濾的關鍵字,推薦也額外過濾掉$changeStream關鍵字。
推薦在遇到changeStream效能問題(比如下遊消費拉取存在延遲上漲)時才需要關注Change Stream相關的慢日誌。
2. 為什麼分區執行個體上對於非分區表建立監聽,也能在除了primary shard之外的其他分區上觀察到changeStream cursor?
為了應對可能存在的分區(shardCollection)操作。您可以隨時對一個已經建立監聽的非分區表執行shardCollection操作來將其資料分布到所有分區上(movePrimary類似)。提前在其他分區上建立Change Stream cursors就是為了應對這種情況。通常這些分區上的cursor並不會返回任何變更事件,效能開銷也比較小。
與之類似的,為了應對可能存在的addShard/removeShard操作,分區執行個體上建立監聽時,mongos也會到Config Server上去建立相關的cursor。
3. 為什麼建立Change Stream cursor時指定了readPreference:secondary,但這個cursor的慢日誌卻出現在primary中?
遊標(cursor)建立後會“固定”在某個節點上,不會因節點角色變化而自動遷移。因此一旦Change Stream的cursor建立之後,就固定在某個mongod節點(主或從節點)上並持續通過getMore消費。當發生過主從切換、變更配置、遷移或其他可能導致主從切換的事件時,原本在secondary上的cursor可能會轉移到primary上。
如果不期望這部分負載持續在primary節點上消耗資源,則可以通過killCursors的方式來清理相關的changeStream cursor,下遊消費邏輯處理會根據resumeToken並結合readPreference將cursor恢複到secondary節點上去,此操作不會導致變更事件的丟失或中斷。
db.runCommand(
{
killCursors: <collection>,
cursors: [ <cursor id1>, ... ], comment: <any>
}
)
db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 4. 為什麼在mongos上看到大量關於Change Stream的長達1000ms的慢日誌?
2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000ms這是舊版本 MongoDB (6.0 之前) 的正常現象,它代表的是等待逾時而非效能瓶頸。在分區執行個體架構中,mongos到分區建立的Change Stream cursors會指定tailable:true、awaitData:true以及maxTimeMS:1000來儘可能在1000ms內返回更多的變更事件。
在6.0之前的大版本中,mongos上會記錄這些長達1000ms的慢日誌,存在誤導客戶的可能性。官方已經對相關行為進行了最佳化,細節可參考SERVER-50559。