データベースの変更にリアルタイムで対応する必要がある場合は、MongoDB の変更ストリームをサブスクライブします。このトピックでは、変更ストリームの概要、使用方法、およびベストプラクティスについて説明します。
変更ストリームとは
変更ストリームは、データベースの変更イベントをリアルタイムのストリームに変換します。クライアントはこのストリームをサブスクライブすることで、データが挿入、更新、または削除されたときに即座に通知を受け取ることができます。代表的なシナリオは次のとおりです。
-
クラスター間のデータ同期: MongoDB クラスター間で増分データレプリケーションを実行します。
-
操作監査: データベースやコレクションの削除など、高リスクな操作を追跡します。
-
イベント駆動型アーキテクチャ: リアルタイム分析、キャッシュ更新、または通知のために、変更を下流システムにプッシュします。
制限事項
サポートされるインスタンスタイプ:レプリカセットインスタンスまたはシャードクラスターインスタンス。
変更ストリームの設定
追加の DDL イベントのリッスン
前提条件
MongoDB 6.0 以降 (アップグレードガイド)。
操作手順
-
showExpandedEvents: trueを指定してwatchコマンドを実行します。// mongo shell または mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // 追加の DDL イベントのリッスンを有効化 } ); cursor.next();
結果の検証
-
新しい SQL ウィンドウで、
db.createCollection("myCollection1")などの変更文を実行します。 -
元の SQL ウィンドウで、実行された文に関連する出力を確認します。
mg xxx test> cursor = db.getSiblingDB("test").watch([], ... { ... showExpandedEvents: true // 追加の DDL イベントのリッスンを有効化 ... } ... ); ... cursor.next(); Warning: If there are no documents in the batch, next will block. Use tryNext if you want to check if there are any documents without waiting. { _id: { _data: '8268777614000000012B042C0100296E5A100434919C64814940F3A61F9D97A3F60D5C463C6Fxxx03C63726561746500466F70657274696F6E4465737372697074696F6E0046466964496E6465787800461E76002B04466B657990046E5F6964002B02003C6E616D65003C5F69645F000000004' }, operationType: 'create', clusterTime: Timestamp({ t: 1752659476, i: 1 }), collectionUUID: UUID('34919c64-8149-40f3-a61f-9xxx'), wallTime: ISODate('2025-07-16T09:51:16.043Z'), ns: { db: 'test', coll: 'myCollection1' }, operationDescription: { idIndex: { v: 2, key: { _id: 1 }, name: '_id_' } } }
プレイメージの有効化
MongoDB のプレイメージは、ドキュメントが変更または削除される前の完全なスナップショットです。変更が発生する前の元の値を記録します。
前提条件
MongoDB 6.0 以降 (アップグレードガイド)。
操作手順
-
データベースレベルでプレイメージを有効にします。
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" は oplog の保持期間を使用 } } }) -
コレクションレベルでプレイメージを有効にします。
説明データベース内のすべてのコレクションでプレイメージを有効にするには、まずデータベースレベルで有効にします。その後、そのデータベース内の各コレクションで個別に有効にします。
既存のコレクションの変更
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })新規コレクション作成時の指定
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }}) -
変更ストリームリスナーを作成します (プレイメージオプションを指定)。
// ターゲットコレクションにリスナーを作成 cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // または 'whenAvailable' fullDocumentBeforeChange: 'required' // または 'whenAvailable' } ) cursor.next();-
required:サーバーはプレ/ポストイメージを返す必要があります。そうでない場合、エラーが返されます。 -
whenAvailable:サーバーはイメージを返そうとしますが、保証はしません。
-
結果の検証
-
データベースレベルの設定が有効になっているか確認します。
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )成功した場合、コマンドは次のような出力を返します。
test> db.adminCommand( { getClusterParameter: "changeStreamOptions" } ) { clusterParameters: [ { _id: 'changeStreamOptions', clusterParameterTime: Timestamp({ t: 1752655937, i: 1 }), preAndPostImages: { expireAfterSeconds: Long('100') } } ], ok: 1, '$clusterTime': { clusterTime: Timestamp({ t: 1752656717, i: 1 }), signature: { hash: Binary.createFromBase64('xxx=', 0), keyId: Long('xxx') } }, operationTime: Timestamp({ t: 1752656717, i: 1 }) } -
コレクション設定を確認します。
db.getCollectionInfos({name: "myCollection"}) // または db.runCommand({listCollections: 1})期待される出力: 返されたドキュメント内で
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }のようなフィールドを見つけます。test> db.getCollectionInfos({name: "myCollection"}) [ { name: 'myCollection', type: 'collection', options: { changeStreamPreAndPostImages: { enabled: true } }, info: { readOnly: false, uuid: UUID('55ed1b7c-7575-4ba3-8afa-xxx') }, idIndex: { v: 2, key: { _id: 1 }, name: '_id_' } } ] -
別の mongosh ウィンドウで、
myCollection内のドキュメントを更新します。 -
cursorによって返されたイベントを確認します。これにはfullDocumentBeforeChangeフィールド (変更前のドキュメント) が含まれているはずです。... cursor.next(); { _id: { _data: '8268786B15000000012B042C0100296E5A100455ED1B7C75754BA38AFA0C5B56A360BC463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046xxx697CDDF6EE279xxx' }, operationType: 'update', clusterTime: Timestamp({ t: 1752722197, i: 1 }), wallTime: ISODate('2025-07-17T03:16:37.626Z'), fullDocument: { _id: ObjectId('6878697cddfxxx'), name: 'test', count: 111 }, ns: { db: 'test', coll: 'myCollection' }, documentKey: { _id: ObjectId('6878697cddfxxx') }, updateDescription: { updatedFields: { count: 111 }, removedFields: [], truncatedArrays: [] }, fullDocumentBeforeChange: { _id: ObjectId('6878697cddf6xxx'), name: 'test' } }
詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
ポストイメージの有効化
MongoDB のポストイメージは、変更が発生した後のドキュメントの完全なスナップショットです。変更後の完全なドキュメントコンテンツを記録します。
前提条件
MongoDB 3.6 以降 (アップグレードガイド)。
操作手順
watch コマンドを実行する際に、fullDocument: 'updateLookup' を設定します。
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
結果の検証
-
別の mongosh ウィンドウで、
myCollectionにドキュメントを挿入または更新します。 -
cursorによって返されたイベントを確認します。これにはfullDocumentフィールド (変更後のドキュメント) が含まれているはずです。xxx [primary] test> cursor = db.getSiblingDB("test").myCollection.watch([], ... { ... fullDocument: 'updateLookup' ... } ... ); ... cursor.next(); [... { _id: { _data: 'xxx100296E5A10040D6C3FBC28484F08240555576066945463C6F7065726174696F6E54797065003C757064617465500046646F63756D656E744B6579005F6964004B657390046645F696400646881ADE741E7D4B638ED7CA1000004' }, operationType: 'update', clusterTime: Timestamp({ t: 1753329166, i: 1 }), wallTime: ISODate('2025-07-24T03:52:46.966Z'), fullDocument: { _id: ObjectId('xxx8ed7ca1'), name: 'test1', age: 12, count: 2222 }, ns: { db: 'test', coll: 'myCollection' }, documentKey: { _id: ObjectId('xxx7ca1') }, updateDescription: { updatedFields: { count: 2222 }, removedFields: [], truncatedArrays: [] } }]
返される完全なドキュメントは空であるか、正確な特定時点の状態を反映していない可能性があります。例:
-
同じドキュメントが短時間に複数回更新された場合、最初の更新の変更イベントは、最新の更新が完了した後のドキュメント状態を返すことがあります。
-
ドキュメントが更新された直後に削除された場合、変更後のドキュメントが存在しなくなるため、その変更イベントは空の fullDocument フィールドを表示します。
詳細については、「Lookup Full Document for Update Operations」をご参照ください。
非常に大きな変更イベント (>16 MB) の処理
前提条件
MongoDB 7.0 以降 (アップグレードガイド)。
操作手順
watch() コマンドのパイプラインに $changeStreamSplitLargeEvent ステージを含めます。
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // 分割ステージを追加
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)
結果の検証
-
16 MB を超える変更イベントを生成する操作 (非常に大きな配列を含むドキュメントの更新など) を実行します。
-
返されるイベントストリームが、複数の連続した
fragmentイベントに分割され、最後のfragmentイベントで終わることを確認します。
プレイメージのストレージオーバーヘッドの削減
デフォルトでは、プレイメージは oplog と共に有効期限が切れます。より短い有効期限を設定して、スペースを節約します。
expireAfterSeconds を短く設定しすぎると、下流のコンシューマーが追いつけない場合に ChangeStreamHistoryLost エラーが発生する可能性があります (プレイメージが早期に期限切れになるため)。詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // 単位:秒
}
}
})
変更ストリームのベストプラクティス
-
プレイメージとポストイメージは慎重に使用する:
-
fullDocumentBeforeChange(プレイメージ) とfullDocument(ポストイメージ) を有効にすると、ストレージのオーバーヘッド (config.system.preimagesコレクション内) とリクエストのレイテンシーが増加します。 -
これらの機能は、アプリケーションが変更前後の完全なドキュメントコンテンツを本当に必要とする場合にのみ有効にしてください。
-
-
シャードクラスターのデプロイにおける主な考慮事項:
-
グローバルなイベント順序を保証するために、変更ストリームリスナーは常に
mongos上に作成してください。 -
高い書き込み負荷の下では、変更ストリームがボトルネックになる可能性があります (
mongosがシャードからのイベントをソートしてマージする必要があるため)。 -
シャード間の書き込みの不均等な分散 (不適切なシャーディングキーによるものなど) は、変更ストリームのレイテンシーを大幅に増加させます。
-
-
updateLookupを避ける:-
updateLookupは、更新イベントごとに個別のfindOneクエリを実行するため、非効率です。 -
シャードクラスターでは、
moveChunk操作がupdateLookupのレイテンシーをさらに悪化させます。
-
-
変更ストリームの中断を防ぐ:
-
⚠️ 次のシナリオでは、変更ストリームカーソルが無効 (
operationType: "invalidate") になったり、エラーが発生したりします。-
下流コンシューマーの遅延: コンシューマーがイベント生成よりも遅くイベントを処理するため、
resumeTokenが oplog ウィンドウから外れます。 -
無効な
resumeToken: タイムスタンプが oplog にもはや存在しない古いresumeTokenを使用する。 -
フェイルオーバーの影響: フェイルオーバー後、新しいプライマリノードの oplog に元の
resumeTokenが含まれていない場合があります。 -
メタデータの変更:
drop、rename、dropDatabaseなどの操作は、invalidateイベントをトリガーする可能性があります。 -
プレイメージの有効期限切れ: 消費が遅い状態で
expireAfterSecondsを短く設定しすぎると、プレイメージが失われます。
-
-
緩和戦略:
-
変更ストリームのレイテンシーを監視する。
-
oplog ウィンドウが十分に大きいことを確認する。
-
堅牢なエラーハンドリングと回復ロジックを実装する (
invalidateイベントをキャッチし、最後の有効なresumeTokenを記録し、リスナーを再作成する)。 -
合理的な
expireAfterSeconds値を設定する。
-
-
-
スコープ選択戦略:
-
単一の変更ストリーム vs. 複数のコレクションレベルの変更ストリーム:
-
単一ストリーム (データベース/インスタンスレベル): リソースのオーバーヘッドが低い (シングルスレッドの oplog フェッチ) が、下流でのフィルタリングとディスパッチが必要です。イベントボリュームが大きい場合、
mongosがボトルネックになる可能性があります。 -
複数のコレクションレベルのストリーム: サーバーサイドフィルタリングを活用してネットワークトラフィックを削減し、より良い同時実行性を提供できます。ただし、ストリームが多すぎると
oplogの読み取り競合とリソース消費が増加します。
-
-
推奨事項: ワークロード (イベントボリューム、コレクション数) に基づいてテストし、最適なアプローチを選択してください。通常、非常にアクティブな少数のコレクションには専用のストリームを使用し、アクティビティの低い多数のコレクションには下流フィルタリングを備えたデータベース/インスタンスレベルのストリームを使用します。
-
よくある質問
1. 変更ストリームのスローログに常に COLLSCAN が表示されるのはなぜですか?
この動作は正常であり、想定内のものです。最適化は必要ありません。 変更ストリームカーソルは最終的に local.oplog.rs (インスタンスのすべての変更を記録する唯一の場所) から読み取ります。このコレクションにはインデックスがないため、常に COLLSCAN を実行します。これは避けられず、最適化の余地はありません。
最適化するクエリを検索する際に COLLSCAN をキーワードとしてスローログをフィルタリングする場合は、$changeStream キーワードも除外してください。
変更ストリーム関連のスローログを調査するのは、パフォーマンスの問題 (下流の消費におけるレイテンシーの増加など) が発生した場合のみにしてください。
2. シャードインスタンスで、非シャードコレクションをリッスンしているのに、プライマリシャード以外のシャードで変更ストリームカーソルが表示されるのはなぜですか?
これは、将来の shardCollection 操作に備えるためです。 アクティブなリスナーを持つ非シャードコレクションは、いつでもシャードコレクションに変換でき、そのデータはすべてのシャードに分散されます (movePrimary に似ています)。他のシャードにあらかじめ変更ストリームカーソルを作成しておくことで、このシナリオに対応します。通常、他のシャード上のこれらのカーソルは変更イベントを返さず、パフォーマンスのオーバーヘッドは最小限です。
同様に、将来の addShard/removeShard 操作に対応するため、シャードインスタンスにリスナーを設定すると、mongos は Config Server にも対応するカーソルを作成します。
3. カーソル作成時に readPreference:secondary を指定したにもかかわらず、変更ストリームカーソルのスローログがプライマリノードに表示されるのはなぜですか?
カーソルは作成後に特定のノードに「ピン留め」され、ノードの役割が変更されても自動的に移行しません。 変更ストリームカーソルが作成されると、特定の mongod ノード (プライマリまたはセカンダリ) に固定され、getMore を介して消費を続けます。プライマリ/セカンダリのスイッチオーバー、インスタンスのサイズ変更、または移行などのイベントの後、元々セカンダリにあったカーソルがプライマリに配置されることがあります。
この負荷がプライマリノードに持続することを望まない場合は、killCursors を使用して関連する変更ストリームカーソルをクリーンアップしてください。その後、下流のコンシューマーロジックは resumeToken と readPreference を使用してカーソルをセカンダリノードに復元できます。この操作はイベントの損失や中断を引き起こしません。
db.runCommand(
{
killCursors: <collection>,
cursors: [ <cursor id1>, ... ], comment: <any>
}
)
db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } )
4. Mongos で 1000 ms 続く変更ストリームに関するスローログが多数表示されるのはなぜですか?
2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000ms
これは MongoDB 6.0 より前のバージョンでは正常な動作です。パフォーマンスのボトルネックではなく、待機タイムアウトを示しています。 シャードクラスターアーキテクチャでは、mongos は tailable:true、awaitData:true、および maxTimeMS:1000 を使用してシャード上に変更ストリームカーソルを作成し、1000 ms 以内にできるだけ多くの変更イベントを返します。
6.0 より前のメジャーバージョンでは、mongos はこれらの 1000 ms のスローログを記録し、ユーザーを誤解させる可能性があります。MongoDB はこの動作を最適化しました。詳細については、SERVER-50559 をご参照ください。