MongoDB チェンジストリームをサブスクライブすることで、データベース内のデータ変更にリアルタイムで対応できます。このトピックでは、MongoDB チェンジストリームの概要、使用方法、およびベストプラクティスについて説明します。
チェンジストリームとは
チェンジストリームは、データベースから変更イベントのリアルタイムストリームを提供します。クライアントはこのストリームをサブスクライブし、データが挿入・更新・削除された際に即座に通知を受け取ることができます。適用シナリオには以下のようなものがあります。
クラスター間データ同期: MongoDB クラスター間で増分データをレプリケートします。
監査: データベースやコレクションの削除など、高リスク操作を追跡します。
イベント駆動型アーキテクチャ: 変更内容をダウンストリームシステムにプッシュし、リアルタイム分析、キャッシュ更新、通知などを実行します。
制限事項
インスタンスは、レプリカセットインスタンスまたはシャードクラスターインスタンスである必要があります。
チェンジストリームの構成
より多くの DDL イベントをリッスンする
前提条件
MongoDB 6.0 以降(アップグレードガイド)。
操作手順
mongosh を使用してデータベースに接続します。
watchコマンドを実行し、showExpandedEvents: trueを設定します。// mongo shell or mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // より多くの DDL イベントのリッスンを有効化 } ); cursor.next();
結果の確認
新しい mongosh ウィンドウで、
db.createCollection("myCollection1")などの変更コマンドを実行します。元の SQL ウィンドウの出力(Outputs)に SQL 実行の詳細が表示されます。

プリイメージの有効化
MongoDB のプリイメージとは、ドキュメントが変更または削除される前の完全なスナップショットです。これは、変更前のデータの元の値を記録します。
前提条件
MongoDB 6.0 以降(アップグレードガイド)。
操作手順
データベースレベルでプリイメージを有効化します。
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" は oplog 保存期間を使用することを示します } } })コレクションレベルでプリイメージを有効化します。
説明データベース内のすべてのコレクションでプリイメージを有効化するには、まずデータベースレベルで有効化する必要があります。その後、各コレクションに対して個別にこの機能を有効化する必要があります。
既存のコレクションを変更する
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })新規コレクション作成時に指定する
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})チェンジストリームリスナーを作成し、プリイメージオプションを指定します。
// 対象コレクションでリスナーを作成 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // または 'whenAvailable' fullDocumentBeforeChange: 'required' // または 'whenAvailable' } ) cursor.next();required:サーバーは必ずプリイメージまたはポストイメージを返す必要があります。そうでない場合はエラーが報告されます。whenAvailable:サーバーはイメージの返却を試みますが、保証されません。
結果の確認
データベースディメンションのスイッチが有効になっているか確認します。
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )有効化されている場合、以下の出力が返されます。

コレクション構成を確認します。
db.getCollectionInfos({name: "myCollection"}) // または db.runCommand({listCollections: 1})期待される出力:返されたドキュメント内で、
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }に類似したフィールドを探します。
別の mongosh ウィンドウで、
myCollection内のドキュメントを更新します。cursorによって返されたイベントを観察します。fullDocumentBeforeChangeフィールド(プリイメージ)が含まれているはずです。
詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
ポストイメージの有効化
MongoDB のポストイメージとは、変更が発生した後のドキュメントの完全なスナップショットです。これは、変更後のドキュメントの完全な内容を記録します。
前提条件
MongoDB 3.6 以降(アップグレードガイド)。
操作手順
fullDocument: 'updateLookup' を設定して watch コマンドを実行します。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
結果の確認
別の mongosh ウィンドウで、
myCollectionにドキュメントを追加または更新します。cursorによって返されたイベントを観察します。fullDocumentフィールド(ポストイメージ)が含まれているはずです。
返されたフルドキュメントは空である場合や、時点のスナップショットでない場合があります。例として以下が挙げられます。
同一ドキュメントを短時間で複数回更新した場合、最初の更新に対応するチェンジイベントが、最終更新完了後のドキュメント内容を返す可能性があります。
ドキュメントが更新直後に削除された場合、対応するチェンジイベントの fullDocument フィールドは空になります。これは、ドキュメントがすでに存在しないためです。
詳細については、「Lookup Full Document for Update Operations」をご参照ください。
大規模なチェンジイベント(> 16 MB)の処理
前提条件
MongoDB 7.0 以降(アップグレードガイド)。
操作手順
watch() パイプラインに $changeStreamSplitLargeEvent ステージを含めます。
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 分割ステージを追加
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)結果の確認
非常に大きな配列を含むドキュメントを更新するなど、16 MB を超えるチェンジイベントを発生させる操作を実行します。
返されたイベントストリームを観察します。これは複数の連続する
fragmentイベントに分割され、最後のfragmentイベントで終了します。
プリイメージのストレージオーバーヘッドを削減する
デフォルトでは、プリイメージは oplog とともに有効期限切れになります。ストレージスペースを節約するために、より短い有効期限を設定できます。
expireAfterSeconds を非常に短い時間に設定し、かつダウンストリームの消費が遅い場合、プリイメージが早期に有効期限切れとなり、ChangeStreamHistoryLost エラーが発生する可能性があります。詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 単位:秒
}
}
})チェンジストリームのベストプラクティス
プリイメージおよびポストイメージの慎重な使用:
fullDocumentBeforeChange(プリイメージ)およびfullDocument(ポストイメージ)を有効化すると、ストレージオーバーヘッド(config.system.preimagesコレクション内)およびリクエストレイテンシが増加します。これらのオプションは、サービスが変更前後のフルドキュメントコンテンツを必要とする場合にのみ有効化してください。
シャードクラスターデプロイ時の重要ポイント:
イベントのグローバル順序を保証するため、常に
mongosインスタンス上にチェンジストリームリスナーを作成してください。書き込み負荷が高い場合、
mongosインスタンスが異なるシャードからのイベントをソートおよびマージする必要があるため、チェンジストリームがボトルネックになる可能性があります。シャーディングキーの設計が不適切なためにシャード間の書き込みが不均衡になると、チェンジストリームのレイテンシが大幅に増加する可能性があります。
非推奨の
updateLookup:updateLookupオプションは、各更新イベントに対して別途findOneクエリを実行するため、非効率的です。シャードクラスターでは、
moveChunk操作により、updateLookupが引き起こすレイテンシの問題が悪化する可能性があります。
チェンジストリームの中断を防止する:
⚠️ 以下のシナリオにより、チェンジストリームカーソルが無効(
operationType: "invalidate")になったり、エラーがスローされたりする可能性があります。ダウンストリーム消費の遅延: 消費速度がイベント生成速度より遅く、
resumeTokenが oplog ウィンドウ外に落ちる場合。無効な
resumeToken: oplog に該当タイムスタンプがもう存在しない古いresumeTokenを使用して再開を試みる場合。フェールオーバーの影響: フェールオーバー後、新しいプライマリノードの oplog に元の
resumeTokenが含まれていない可能性があります。メタデータの変更:
drop、rename、およびdropDatabaseなどの操作により、invalidateイベントがトリガーされる可能性があります。プリイメージの有効期限切れ:
expireAfterSeconds設定が短く、かつ消費が遅いと、プリイメージが失われる可能性があります。
ソリューション:
チェンジストリームのレイテンシを監視します。
oplog ウィンドウが十分に大きいことを保証します。
堅牢なエラー処理および復旧ロジックを設計します。これには、
invalidateイベントのキャッチ、最新の有効なresumeTokenの記録、およびリスナーの再作成が含まれます。expireAfterSecondsに妥当な値を設定します。
スコープ選択戦略:
単一チェンジストリーム vs 複数のコレクションレベルチェンジストリーム:
単一ストリーム(データベースまたはインスタンスレベル): oplog からプルするための単一スレッドを使用するため、リソースオーバーヘッドが低くなります。ただし、ダウンストリームシステムが自らイベントをフィルタリングおよび分配する必要があります。イベント量が多い場合、
mongosインスタンスがボトルネックになる可能性があります。複数のコレクションレベルストリーム: サーバー側フィルタリングによりネットワークトラフィックを削減し、同時実行性を向上させます。ただし、ストリーム数が多すぎると、
oplog読み取り競合およびリソース消費が増加します。
推奨事項: イベント量やコレクション数を含むサービスワークロードに基づき、最適なソリューションをテストして選択してください。通常、アクティブ度の高い少数のコレクションには個別のストリームを使用し、アクティブ度の低い多数のコレクションにはデータベースレベルまたはインスタンスレベルのストリームを用いてダウンストリームでフィルタリングを行います。
よくある質問
1. チェンジストリームのスロークエリログに常に COLLSCAN と表示されるのはなぜですか?
これは正常かつ予想される動作であり、最適化は不要です。 チェンジストリームカーソルは最終的に local.oplog.rs(インスタンスのすべての変更を記録する唯一のコレクション)上に作成されます。このコレクションにはインデックスが存在しないため、操作は常に COLLSCAN となり、回避または最適化できません。
最適化対象のクエリを検索する際に COLLSCAN をキーワードとしてログをフィルタリングする場合は、$changeStream キーワードも併せてフィルタリングしてください。
ダウンストリーム消費のレイテンシ増加など、パフォーマンスの問題が発生した場合にのみ、チェンジストリーム関連のスロークエリログに注目してください。
2. シャードクラスター上で、アンシャードコレクションに対してリスナーを作成した場合、プライマリシャード以外のシャード上にもチェンジストリームカーソルが観測されるのはなぜですか?
これは、潜在的なシャーディング(shardCollection)操作に対応するためです。 既存のリスナーを持つアンシャードコレクションに対して、いつでも shardCollection 操作を実行し、そのデータをすべてのシャードに分散できます(movePrimary と同様)。他のシャード上に事前にチェンジストリームカーソルを作成しておくことで、このシナリオに備えています。通常、これらの他のシャード上のカーソルは変更イベントを一切返さず、パフォーマンスオーバーヘッドも最小限です。
同様に、シャードクラスター上でリスナーを確立すると、mongos は潜在的な addShard または removeShard 操作を処理するために、Config Server 上にも関連カーソルを作成します。
3. カーソル作成時に readPreference:secondary を指定したにもかかわらず、チェンジストリームカーソルのスロークエリログがプライマリノード上に表示されるのはなぜですか?
カーソル作成後、特定のノードに「ピン留め」され、ノードのロール変更があっても自動的に移動しません。 そのため、一度確立されたチェンジストリームカーソルは特定の mongod ノード(プライマリまたはセカンダリ)上に留まり、getMore を使用してイベントを継続的に消費します。プライマリ/セカンダリのスイッチオーバー、構成変更、移行、またはスイッチオーバーをトリガーする可能性のある他のイベントが発生した場合、元々セカンダリノード上にあったカーソルがプライマリノードに移動する可能性があります。
この負荷がプライマリノード上でリソースを継続的に消費することを望まない場合は、killCursors を使用して関連するチェンジストリームカーソルを閉じてください。ダウンストリームの消費ロジックは、resumeToken および readPreference を使用して、セカンダリノード上にカーソルを再確立します。この操作により、チェンジイベントの損失または中断は発生しません。
db.runCommand(
{
killCursors: <collection>,
cursors: [ <cursor id1>, ... ], comment: <any>
}
)
db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 4. mongos 上でチェンジストリームに関する 1000 ms のスロークエリログが多数表示されるのはなぜですか?
2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000msこれは MongoDB 6.0 より前のバージョンにおける正常な動作です。これはパフォーマンスボトルネックではなく、待機タイムアウトを示しています。 シャードクラスター アーキテクチャでは、mongos からシャードへのチェンジストリームカーソルが tailable:true、awaitData:true、および maxTimeMS:1000 を指定し、1000 ms 以内に可能な限り多くのチェンジイベントを返します。
6.0 より前のメジャーバージョンでは、mongos がこれらの 1000 ms のスロークエリログを記録していましたが、これは誤解を招く可能性がありました。この動作は最適化されています。詳細については、「SERVER-50559」をご参照ください。