MongoDB change streams をサブスクライブすることで、データベース内のデータ変更にリアルタイムで対応できます。このトピックでは、MongoDB change streams について説明し、その使用方法とベストプラクティスを解説します。
change stream とは何か?
change stream は、データベースからの変更イベントのリアルタイムストリームを提供します。クライアントはこのストリームをサブスクライブして、データが挿入、更新、または削除されたときに即時通知を受け取ることができます。一般的なシナリオは次のとおりです:
クラスター間のデータ同期: MongoDB クラスター間で増分データをレプリケートします。
監査: データベースやコレクションの削除など、リスクの高い操作を追跡します。
イベント駆動型アーキテクチャ: リアルタイム分析、キャッシュの更新、または通知のために、変更をダウンストリームシステムにプッシュします。
制限事項
インスタンスはレプリカセットインスタンスまたはシャードクラスターインスタンスである必要があります。
change stream の構成
より多くの DDL イベントをリッスンする
前提条件
MongoDB 6.0 以降 (アップグレードガイド)。
手順
mongosh を使用してデータベースに接続します。
watchコマンドを実行し、showExpandedEvents: trueを設定します:// mongo shell または mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // より多くの DDL イベントのリッスンを有効にする } ); cursor.next();
結果の検証
新しい mongosh ウィンドウで、
db.createCollection("myCollection1")などの変更コマンドを実行します。元の SQL ウィンドウの出力で SQL 実行の詳細を表示できます。

pre-image を有効にする
MongoDB の pre-image は、ドキュメントが変更または削除される前の完全なスナップショットです。変更前のデータの元の値を記録します。
前提条件
MongoDB 6.0 以降 (アップグレードガイド)。
手順
データベースレベルで pre-image を有効にします:
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" は oplog の保存期間が使用されることを示します } } })コレクションレベルで pre-image を有効にします:
説明データベース内のすべてのコレクションで pre-image を有効にするには、まずデータベースレベルで有効にする必要があります。次に、各コレクションに対して個別にこの機能を有効にする必要があります。
既存のコレクションの変更
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })新しいコレクションの作成時に指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})change stream リスナーを作成し、pre-image オプションを指定します:
// ターゲットコレクションにリスナーを作成 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // または 'whenAvailable' fullDocumentBeforeChange: 'required' // または 'whenAvailable' } ) cursor.next();required: サーバーは pre-image または post-image を返す必要があります。そうでない場合、エラーが報告されます。whenAvailable: サーバーはイメージを返そうとしますが、保証はされません。
結果の検証
データベースレベルで pre-image が有効になっているかどうかを確認します:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )有効な場合、次の出力が返されます:

コレクションの構成を確認します:
db.getCollectionInfos({name: "myCollection"}) // または db.runCommand({listCollections: 1})期待される出力: 返されたドキュメントで、
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }のようなフィールドを探します。
別の mongosh ウィンドウで、
myCollection内のドキュメントを更新します。cursorによって返されたイベントを監視します。これには、pre-image であるfullDocumentBeforeChangeフィールドが含まれている必要があります。
詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
post-image を有効にする
MongoDB の post-image は、変更が発生した後のドキュメントの完全なスナップショットです。変更後のドキュメントの完全な内容を記録します。
前提条件
MongoDB 3.6 以降 (アップグレードガイド)。
手順
watch コマンドを実行するときに、fullDocument: 'updateLookup' を設定します。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
結果の検証
別の mongosh ウィンドウで、
myCollectionにドキュメントを追加または更新します。cursorによって返されたイベントを監視します。これには、post-image であるfullDocumentフィールドが含まれている必要があります。
返された完全なドキュメントは空であるか、ポイントインタイムのスナップショットではない場合があります。例:
同じドキュメントを短時間で複数回更新した場合、最初の更新の変更イベントは、最終的な更新が完了した後のドキュメントの内容を返すことがあります。
更新直後にドキュメントが削除された場合、対応する変更イベントの `fullDocument` フィールドは、ドキュメントがもはや存在しないため空になります。
詳細については、「Lookup Full Document for Update Operations」をご参照ください。
大規模な変更イベント (> 16 MB) の処理
前提条件
MongoDB 7.0 以降 (アップグレードガイド)。
手順
watch() パイプラインに $changeStreamSplitLargeEvent ステージを含めることができます:
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 分割ステージを追加
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)結果の検証
非常に大きな配列を含むドキュメントの更新など、16 MB を超える変更イベントを引き起こす操作を実行します。
返されたイベントストリームを監視します。それは複数の連続した
fragmentイベントに分割され、最後のfragmentイベントで終わります。
pre-image のストレージオーバーヘッドを削減する
デフォルトでは、pre-image は oplog とともに有効期限が切れます。より短い有効期限を設定して、ストレージスペースを節約できます:
expireAfterSeconds を非常に短い期間に設定し、ダウンストリームの消費が遅い場合、pre-image が早期に期限切れになるため、ChangeStreamHistoryLost エラーが発生する可能性があります。詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 単位: 秒
}
}
})change stream のベストプラクティス
pre-image と post-image は注意して使用する:
fullDocumentBeforeChange(pre-image) とfullDocument(post-image) を有効にすると、ストレージオーバーヘッド (config.system.preimagesコレクション内) とリクエストのレイテンシが増加します。これらのオプションは、サービスが変更前後の完全なドキュメントコンテンツを必要とする場合にのみ有効にしてください。
シャードクラスターデプロイのキーポイント:
イベントのグローバルな順序を保証するために、常に
mongosインスタンス上に change stream リスナーを作成してください。高い書き込み負荷の下では、
mongosインスタンスが異なるシャードからのイベントをソートしてマージする必要があるため、change stream がボトルネックになる可能性があります。不適切なシャーディングキーなどが原因でシャード間で書き込みが不均衡になると、change stream のレイテンシが大幅に増加する可能性があります。
updateLookupを避ける:updateLookupオプションは、更新イベントごとに個別のfindOneクエリを実行するため、非効率です。シャードクラスターでは、
moveChunk操作がupdateLookupによって引き起こされるレイテンシの問題を悪化させる可能性があります。
change stream の中断を防ぐ:
⚠️ 次のシナリオでは、change stream カーソルが無効 (
operationType: "invalidate") になったり、エラーがスローされたりする可能性があります。ダウンストリームの消費が遅い: 消費速度がイベント生成速度よりも遅いため、
resumeTokenが oplog ウィンドウの外に出てしまいます。無効な
resumeToken: 対応するタイムスタンプが oplog にもはや存在しない古いresumeTokenで再開しようとする。フェールオーバーの影響: フェールオーバー後、新しいプライマリノードの oplog には元の
resumeTokenが含まれていない場合があります。メタデータの変更:
drop、rename、dropDatabaseなどの操作は、invalidateイベントをトリガーする可能性があります。pre-image の有効期限切れ: 短い
expireAfterSeconds設定と遅い消費が組み合わさると、pre-image が失われる可能性があります。
ソリューション:
change stream のレイテンシを監視します。
oplog ウィンドウが十分に大きいことを確認します。
堅牢なエラー処理と回復ロジックを設計します。これには、
invalidateイベントをキャッチし、最後に有効だったresumeTokenを記録し、リスナーを再作成することが含まれます。expireAfterSecondsに妥当な値を設定します。
スコープ選択戦略:
単一の change stream と複数のコレクションレベルの change stream の比較:
単一ストリーム (データベースまたはインスタンスレベル): このアプローチは、oplog からプルするために単一のスレッドを使用するため、リソースのオーバーヘッドが低くなります。ただし、ダウンストリームシステムはイベントを自身でフィルタリングして配布する必要があります。イベント量が多い場合、
mongosインスタンスがボトルネックになる可能性があります。複数のコレクションレベルのストリーム: このアプローチは、サーバーサイドのフィルタリングを使用してネットワークトラフィックを削減し、より良い並行性を提供します。ただし、ストリームが多すぎると
oplogの読み取り競合とリソース消費が増加します。
推奨事項: イベント量やコレクション数など、サービスのワークロードに基づいてテストし、最適なソリューションを選択してください。通常、アクティブなコレクションが少ない場合は個別のストリームを使用します。アクティブでないコレクションが多い場合は、データベースレベルまたはインスタンスレベルのストリームとダウンストリームのフィルタリングを組み合わせて使用します。