当您需要实时响应数据库中的数据变更时,可以通过订阅变更MongoDB变更流(Change Streams)解决。本文将为您介绍MongoDB中变更流(Change Stream)相关的内容、使用方法及最佳实践。
什么是变更流(Change Stream)
变更流将数据库的变更事件转化为实时流。客户端可订阅此流,在数据插入、更新或删除时立即收到通知。典型应用场景包括:
跨集群数据同步: 实现MongoDB集群间的增量数据复制。
操作审计: 跟踪高风险操作(如删除数据库或集合)。
事件驱动架构: 将变更推送到下游系统进行实时分析、缓存更新或通知。
使用限制
实例类型为副本集实例或分片集群实例。
配置变更流
监听更多的DDL事件
前提条件
MongoDB 6.0及以上版本(升级指南)。
操作步骤
使用 mongosh 连接数据库。
执行命令
watch,设置showExpandedEvents: true:// mongo shell or mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // 启用更多DDL 事件监听 } ); cursor.next();
结果验证
在新的SQL窗口执行变更SQL,如
db.createCollection("myCollection1")。观察原SQL窗口输出上述执行SQL相关信息。

开启前镜像(Pre-image)
MongoDB的pre-image是指文档在修改或删除前的完整状态快照,用于记录数据变更前的原始值。
前提条件
MongoDB 6.0及以上版本(升级指南)。
操作步骤
开启数据库维度前镜像 :
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留时间 } } })开启集合维度前镜像 :
说明若需为某数据库内所有集合启用前镜像,则必须先开启数据库维度前镜像,然后在该库的每个集合上单独启用集合级前镜像功能。
修改现有集合
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })创建新集合时指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})建立变更流监听 (指定Pre-image选项):
// 在目标集合上建立监听 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // 或 'whenAvailable' fullDocumentBeforeChange: 'required' // 或 'whenAvailable' } ) cursor.next();required: 服务端必须返回Pre/Post-image,否则报错。whenAvailable: 服务端尽力返回,不保证一定有。
结果验证
检查数据库维度的开关是否已开启:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )开启成功后输出如下信息:

检查集合配置:
db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})预期输出:在返回的文档中找到类似
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }的字段。
在另一个mongosh窗口更新
myCollection中的文档。观察
cursor返回的事件,应包含fullDocumentBeforeChange字段(变更前文档)。
开启后镜像(Post-image)
MongoDB的Post-image是变更发生后文档的最新完整状态快照,用于记录变更后的完整文档内容。
前提条件
MongoDB 3.6及以上版本(升级指南)。
操作步骤
执行watch命令时设置fullDocument: 'updateLookup'。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
结果验证
在另一个mongosh窗口新增或更新
myCollection中的文档。观察
cursor返回的事件,应包含fullDocument字段(变更后文档)。
返回的完整文档可能为空或者不是point-in-time的。比如:
当连续对同一个文档进行多次更新时,第一个update的change events可能返回的是最近update操作完成后的文档内容;
当一个文档更新后立即删除时,其对应的change events中因查不到变更后的文档内容而展示空的fullDocument字段。
处理超大变更事件 (>16MB)
前提条件
MongoDB 7.0及以上版本(升级指南)。
操作步骤
在 watch() 的管道中包含 $changeStreamSplitLargeEvent 阶段:
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 添加切分阶段
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)结果验证
执行一个会导致变更事件 >16MB 的操作(如更新一个包含超大数组的文档)。
观察返回的事件流会被拆分成多个连续的
fragment事件,最终以一个fragment事件结束。
降低前镜像存储开销
默认Pre-image随oplog过期。可设置更短的过期时间以节省空间:
设置过短的 expireAfterSeconds 且下游消费速度不足时,可能导致 ChangeStreamHistoryLost 错误(因 Pre-image 过早过期),详情可参考Change Streams with Document Pre- and Post-Images。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 单位:秒
}
}
})变更流最佳实践
谨慎开启前后镜像:
启用
fullDocumentBeforeChange(Pre-image) 和fullDocument(Post-image) 会增加存储开销 (config.system.preimages表) 和请求延迟。仅在业务确实需要文档变更前后完整内容时开启。
分片集群部署要点:
始终在
mongos上建立变更流监听,以保证全局事件顺序。高写入负载下,变更流可能成为瓶颈(
mongos需排序合并分片事件)。分片间写入不均衡(如设计不佳的分片键)会显著增加变更流延迟。
避免
updateLookup:updateLookup会为每个更新事件执行单独的findOne查询,效率低下。在分片集群中,
moveChunk操作会进一步加剧updateLookup的延迟问题。
防范变更流中断:
⚠️以下场景会导致变更流游标失效 (
operationType: "invalidate") 或错误。下游消费滞后:消费速度慢于事件产生速度,导致
resumeToken超出 oplog 窗口。无效
resumeToken:使用过旧的resumeToken恢复,其对应时间点已不在 oplog 中。故障转移影响:Failover 后新主节点的 oplog 可能不包含原
resumeToken。元数据变更:
drop、rename、dropDatabase操作可能会触发invalidate事件。Pre-image 过期:
expireAfterSeconds设置过短且消费慢导致 Pre-image 丢失。
应对方案:
监控变更流延迟。
确保 oplog 窗口足够大。
设计健壮的错误处理和恢复逻辑(捕获
invalidate事件,记录最后有效resumeToken,重建监听)。设置合理的
expireAfterSeconds。
作用域选择策略:
单一变更流 vs 多个集合级变更流:
单一流 (库/实例级):资源开销较小(单线程拉取 oplog),但需下游自行过滤分发事件。高事件量下
mongos可能成为瓶颈。多个集合级流:可利用服务端过滤减少网络传输,并发性更好。但过多流会增加
oplog读取争抢和资源消耗。
建议: 根据业务负载(事件量、集合数量)进行测试,选择最优方案。通常,对少量高活跃度集合使用独立流,对大量低活跃度集合使用库/实例级流配合下游过滤。
常见问题
1. 为什么 Change Stream 的慢日志总是显示 COLLSCAN?
这是正常且符合预期的行为,无需优化。Change Stream的cursor最终会建立在local.oplog.rs上(只有这一个地方记录了实例所有的修改),而该表没有索引。所以固定是COLLSCAN,无法避免也没有优化空间。
如果您在寻找待优化查询语句时使用了COLLSCAN作为日志过滤的关键字,推荐也额外过滤掉$changeStream关键字。
推荐在遇到changeStream性能问题(比如下游消费拉取存在延迟上涨)时才需要关注Change Stream相关的慢日志。
2. 为什么分片实例上对于非分片表建立监听,也能在除了primary shard之外的其他分片上观察到changeStream cursor?
为了应对可能存在的分片(shardCollection)操作。您可以随时对一个已经建立监听的非分片表执行shardCollection操作来将其数据分布到所有分片上(movePrimary类似)。提前在其他分片上建立Change Stream cursors就是为了应对这种情况。通常这些分片上的cursor并不会返回任何变更事件,性能开销也比较小。
与之类似的,为了应对可能存在的addShard/removeShard操作,分片实例上建立监听时,mongos也会到Config Server上去建立相关的cursor。
3. 为什么建立Change Stream cursor时指定了readPreference:secondary,但这个cursor的慢日志却出现在primary中?
游标(cursor)创建后会“固定”在某个节点上,不会因节点角色变化而自动迁移。因此一旦Change Stream的cursor建立之后,就固定在某个mongod节点(主或从节点)上并持续通过getMore消费。当发生过主从切换、变配、迁移或其他可能导致主从切换的事件时,原本在secondary上的cursor可能会转移到primary上。
如果不期望这部分负载持续在primary节点上消耗资源,则可以通过killCursors的方式来清理相关的changeStream cursor,下游消费逻辑处理会根据resumeToken并结合readPreference将cursor恢复到secondary节点上去,此操作不会导致变更事件的丢失或中断。
db.runCommand(
{
killCursors: <collection>,
cursors: [ <cursor id1>, ... ], comment: <any>
}
)
db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 4. 为什么在mongos上看到大量关于Change Stream的长达1000ms的慢日志?
2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000ms这是旧版本 MongoDB (6.0 之前) 的正常现象,它代表的是等待超时而非性能瓶颈。在分片实例架构中,mongos到分片建立的Change Stream cursors会指定tailable:true、awaitData:true以及maxTimeMS:1000来尽可能在1000ms内返回更多的变更事件。
在6.0之前的大版本中,mongos上会记录这些长达1000ms的慢日志,存在误导客户的可能性。官方已经对相关行为进行了优化,细节可参考SERVER-50559。