全部产品
Search
文档中心

云数据库 MongoDB:使用MongoDB变更流(Change Stream)实时捕获数据变更

更新时间:Mar 09, 2026

当您需要实时响应数据库中的数据变更时,可以通过订阅变更MongoDB变更流(Change Streams)解决。本文将为您介绍MongoDB中变更流(Change Stream)相关的内容、使用方法及最佳实践。

什么是变更流(Change Stream)

变更流将数据库的变更事件转化为实时流。客户端可订阅此流,在数据插入、更新或删除时立即收到通知。典型应用场景包括:

  • 跨集群数据同步: 实现MongoDB集群间的增量数据复制。

  • 操作审计: 跟踪高风险操作(如删除数据库或集合)。

  • 事件驱动架构: 将变更推送到下游系统进行实时分析、缓存更新或通知。

版本演进历史

版本

更新说明

MongoDB 3.6

  • 首次发布。

  • 仅支持订阅集合(Collection)维度。

  • events类型有限。

  • 支持故障恢复。

  • 支持查看变更后的视图(Post-image)。

MongoDB 4.0

  • 支持订阅库(Database)以及集群(Cluster)维度。

  • 支持dropdropDatabaserename事件。

  • resumeToken格式从BinData变更为Hex

MongoDB 4.2

  • 支持$set$unset等更多pipeline操作符。

  • 新增startAfter选项,用于按时间点启动监听功能。

  • 修改事件的_id字段,变更流将抛出异常。

  • 移除对{readConcern: majority}的依赖。

MongoDB 5.1

  • 提升部分聚合框架下stage的执行效率。

  • 提升资源利用效率。

MongoDB 5.3

  • 支持在Chunk迁移期间过滤对孤立文档的更新。

MongoDB 6.0

  • 支持查看变更前的视图(Pre-image)。

  • 支持createcreateIndexesmodifyshardCollection等DDL语句,需要指定showExpandedEvents:true来支持;更多信息,请参见Change Events

  • Change Events新增wallTime字段,时间戳支持多种转换和展示算子(包括$toDate$tsSeconds$tsIncrement)以方便业务消费。

MongoDB 7.0

  • 支持超大变更事件(>16MB),通过全新的$changeStreamSplitLargeEvent算子来将超大变更事件进行切分。

  • change events支持refineCollectionShardKey以及reshardCollection事件。

MongoDB 8.0

  • $queryStats命令增强了跟change stream相关的指标。

  • movePrimary命令不再会使得建立了change stream的表产生非法事件了,即change stream可以正常连续处理movePrimary命令引起的数据迁移了。

使用限制

实例类型为副本集实例或分片集群实例。

配置变更流

监听更多的DDL事件

前提条件

MongoDB 6.0及以上版本(升级指南)。

操作步骤

  1. 使用 mongosh 连接数据库

  2. 执行命令watch,设置 showExpandedEvents: true

    // mongo shell or mongosh v1.x
    cursor = db.getSiblingDB("test").watch([],
      {
        showExpandedEvents: true       // 启用更多DDL 事件监听
      }
    );
    cursor.next();

结果验证

  1. 在新的SQL窗口执行变更SQL,如db.createCollection("myCollection1")

  2. 观察原SQL窗口输出上述执行SQL相关信息。

    image

开启前镜像(Pre-image)

MongoDB的pre-image是指文档在修改或删除前的完整状态快照,用于记录数据变更前的原始值。

前提条件

MongoDB 6.0及以上版本(升级指南)。

操作步骤

  1. 开启数据库维度前镜像 :

    db.adminCommand({
      setClusterParameter: {
        changeStreamOptions: {
          preAndPostImages: { expireAfterSeconds: "off" } // "off" 表示使用oplog保留时间
        }
      }
    })
  2. 开启集合维度前镜像 :

    说明

    若需为某数据库内所有集合启用前镜像,则必须先开启数据库维度前镜像,然后在该库的每个集合上单独启用集合级前镜像功能。

    修改现有集合

    db.runCommand({
      collMod: "myCollection",
      changeStreamPreAndPostImages: { enabled: true }
    })

    创建新集合时指定

    db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
  3. 建立变更流监听 (指定Pre-image选项):

    // 在目标集合上建立监听
    cursor = db.getCollection("myCollection").watch([],
      {
        fullDocument: 'required', // 或 'whenAvailable'
        fullDocumentBeforeChange: 'required' // 或 'whenAvailable'
      }
    )
    cursor.next();
    • required: 服务端必须返回Pre/Post-image,否则报错。

    • whenAvailable: 服务端尽力返回,不保证一定有。

结果验证

  1. 检查数据库维度的开关是否已开启:

    db.adminCommand( { getClusterParameter: "changeStreamOptions" } )

    开启成功后输出如下信息:image

  2. 检查集合配置:

    db.getCollectionInfos({name: "myCollection"}) // 或 db.runCommand({listCollections: 1})

    预期输出:在返回的文档中找到类似 "options" : { "changeStreamPreAndPostImages" : { "enabled" : true } } 的字段。 

    image

  3. 在另一个mongosh窗口更新 myCollection 中的文档。

  4. 观察 cursor 返回的事件,应包含 fullDocumentBeforeChange 字段(变更前文档)。

    image

更多信息参考Change Streams with Document Pre- and Post-Images

开启后镜像(Post-image)

MongoDB的Post-image是变更发生后文档的最新完整状态快照,用于记录变更后的完整文档内容。

前提条件

MongoDB 3.6及以上版本(升级指南)。

操作步骤

执行watch命令时设置fullDocument: 'updateLookup'

cursor = db.getSiblingDB("test").myCollection.watch([], 
  {
    fullDocument: 'updateLookup'
  }
);
cursor.next();

结果验证

  1. 在另一个mongosh窗口新增或更新 myCollection 中的文档。

  2. 观察 cursor 返回的事件,应包含 fullDocument 字段(变更后文档)。image

说明

返回的完整文档可能为空或者不是point-in-time的。比如:

  • 当连续对同一个文档进行多次更新时,第一个update的change events可能返回的是最近update操作完成后的文档内容;

  • 当一个文档更新后立即删除时,其对应的change events中因查不到变更后的文档内容而展示空的fullDocument字段。

更多信息参考Lookup Full Document for Update Operations

处理超大变更事件 (>16MB)

前提条件

MongoDB 7.0及以上版本(升级指南)。

操作步骤 

在 watch() 的管道中包含 $changeStreamSplitLargeEvent 阶段:

myChangeStreamCursor = db.myCollection.watch(
  [ { $changeStreamSplitLargeEvent: {} } ], // 添加切分阶段
  {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
  }
)

结果验证

  • 执行一个会导致变更事件 >16MB 的操作(如更新一个包含超大数组的文档)。

  • 观察返回的事件流会被拆分成多个连续的fragment事件,最终以一个fragment事件结束。

降低前镜像存储开销

默认Pre-image随oplog过期。可设置更短的过期时间以节省空间:

警告

设置过短的 expireAfterSeconds 且下游消费速度不足时,可能导致 ChangeStreamHistoryLost 错误(因 Pre-image 过早过期),详情可参考Change Streams with Document Pre- and Post-Images

db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 } // 单位:秒
    }
  }
})

变更流最佳实践 

  1. 谨慎开启前后镜像:

    • 启用 fullDocumentBeforeChange (Pre-image) 和 fullDocument (Post-image) 会增加存储开销 (config.system.preimages 表) 和请求延迟。

    • 仅在业务确实需要文档变更前后完整内容时开启。

  2. 分片集群部署要点:

    • 始终在 mongos 上建立变更流监听,以保证全局事件顺序。

    • 高写入负载下,变更流可能成为瓶颈(mongos 需排序合并分片事件)。

    • 分片间写入不均衡(如设计不佳的分片键)会显著增加变更流延迟。

  3. 避免 updateLookup

    • updateLookup会为每个更新事件执行单独的findOne查询,效率低下。

    • 在分片集群中,moveChunk操作会进一步加剧updateLookup的延迟问题。

  4. 防范变更流中断:

    • ⚠️以下场景会导致变更流游标失效 (operationType: "invalidate") 或错误。

      • 下游消费滞后:消费速度慢于事件产生速度,导致resumeToken超出 oplog 窗口。

      • 无效 resumeToken使用过旧的resumeToken恢复,其对应时间点已不在 oplog 中。

      • 故障转移影响:Failover 后新主节点的 oplog 可能不包含原resumeToken

      • 元数据变更:droprenamedropDatabase操作可能会触发invalidate事件。

      • Pre-image 过期:expireAfterSeconds 设置过短且消费慢导致 Pre-image 丢失。

    • 应对方案:

      • 监控变更流延迟。

      • 确保 oplog 窗口足够大。

      • 设计健壮的错误处理和恢复逻辑(捕获invalidate事件,记录最后有效resumeToken,重建监听)。

      • 设置合理的expireAfterSeconds

  5. 作用域选择策略:

    • 单一变更流 vs 多个集合级变更流:

      • 单一流 (库/实例级):资源开销较小(单线程拉取 oplog),但需下游自行过滤分发事件。高事件量下 mongos 可能成为瓶颈。

      • 多个集合级流:可利用服务端过滤减少网络传输,并发性更好。但过多流会增加 oplog 读取争抢和资源消耗。

    • 建议: 根据业务负载(事件量、集合数量)进行测试,选择最优方案。通常,对少量高活跃度集合使用独立流,对大量低活跃度集合使用库/实例级流配合下游过滤。

常见问题

1. 为什么 Change Stream 的慢日志总是显示 COLLSCAN

这是正常且符合预期的行为,无需优化。Change Stream的cursor最终会建立在local.oplog.rs上(只有这一个地方记录了实例所有的修改),而该表没有索引。所以固定是COLLSCAN,无法避免也没有优化空间。

如果您在寻找待优化查询语句时使用了COLLSCAN作为日志过滤的关键字,推荐也额外过滤掉$changeStream关键字。

推荐在遇到changeStream性能问题(比如下游消费拉取存在延迟上涨)时才需要关注Change Stream相关的慢日志

2. 为什么分片实例上对于非分片表建立监听,也能在除了primary shard之外的其他分片上观察到changeStream cursor?

为了应对可能存在的分片(shardCollection)操作。您可以随时对一个已经建立监听的非分片表执行shardCollection操作来将其数据分布到所有分片上(movePrimary类似)。提前在其他分片上建立Change Stream cursors就是为了应对这种情况。通常这些分片上的cursor并不会返回任何变更事件,性能开销也比较小。

与之类似的,为了应对可能存在的addShard/removeShard操作,分片实例上建立监听时,mongos也会到Config Server上去建立相关的cursor。

3. 为什么建立Change Stream cursor时指定了readPreference:secondary,但这个cursor的慢日志却出现在primary中?

游标(cursor)创建后会“固定”在某个节点上,不会因节点角色变化而自动迁移。因此一旦Change Stream的cursor建立之后,就固定在某个mongod节点(主或从节点)上并持续通过getMore消费。当发生过主从切换、变配、迁移或其他可能导致主从切换的事件时,原本在secondary上的cursor可能会转移到primary上。

如果不期望这部分负载持续在primary节点上消耗资源,则可以通过killCursors的方式来清理相关的changeStream cursor,下游消费逻辑处理会根据resumeToken并结合readPreference将cursor恢复到secondary节点上去,此操作不会导致变更事件的丢失或中断

db.runCommand(
   {
     killCursors: <collection>,
     cursors: [ <cursor id1>, ... ], comment: <any>
   }
)

db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 

4. 为什么在mongos上看到大量关于Change Stream的长达1000ms的慢日志?

2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000ms

这是旧版本 MongoDB (6.0 之前) 的正常现象,它代表的是等待超时而非性能瓶颈。在分片实例架构中,mongos到分片建立的Change Stream cursors会指定tailable:trueawaitData:true以及maxTimeMS:1000来尽可能在1000ms内返回更多的变更事件。

在6.0之前的大版本中,mongos上会记录这些长达1000ms的慢日志,存在误导客户的可能性。官方已经对相关行为进行了优化,细节可参考SERVER-50559