すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for MongoDB:MongoDB change streams を使用してリアルタイムでデータ変更をキャプチャする

最終更新日:Nov 09, 2025

MongoDB change streams をサブスクライブすることで、データベース内のデータ変更にリアルタイムで対応できます。このトピックでは、MongoDB change streams について説明し、その使用方法とベストプラクティスを解説します。

change stream とは何か?

change stream は、データベースからの変更イベントのリアルタイムストリームを提供します。クライアントはこのストリームをサブスクライブして、データが挿入、更新、または削除されたときに即時通知を受け取ることができます。一般的なシナリオは次のとおりです:

  • クラスター間のデータ同期: MongoDB クラスター間で増分データをレプリケートします。

  • 監査: データベースやコレクションの削除など、リスクの高い操作を追跡します。

  • イベント駆動型アーキテクチャ: リアルタイム分析、キャッシュの更新、または通知のために、変更をダウンストリームシステムにプッシュします。

バージョン履歴

バージョン

更新内容

MongoDB 3.6

  • 初回リリース。

  • コレクションレベルでのサブスクリプションのみをサポート。

  • 限定された events タイプ。

  • エラー回復をサポート。

  • post-image をサポート。

MongoDB 4.0

  • データベースおよびクラスターレベルでのサブスクリプションをサポート。

  • dropdropDatabase、および rename イベントをサポート。

  • resumeToken のフォーマットが BinData から Hex に変更。

MongoDB 4.2

  • $set$unset などのより多くのパイプラインオペレーターをサポート。

  • リスナーを特定の時点で開始するための startAfter オプションを追加。

  • イベントの _id フィールドを変更すると、change stream が例外をスローするようになります。

  • {readConcern: majority} への依存関係を削除。

MongoDB 5.1

  • 集約フレームワークの一部のステージの実行効率を向上。

  • リソース使用率を向上。

MongoDB 5.3

  • チャンク移行中に孤立したドキュメントへの更新をフィルタリングすることをサポート。

MongoDB 6.0

  • pre-image をサポート。

  • DDL 文 (createcreateIndexesmodifyshardCollection など) をサポート。この機能を使用するには、showExpandedEvents を true に設定する必要があります。詳細については、「Change Events」をご参照ください。

  • 変更イベントに wallTime フィールドを追加。タイムスタンプは、$toDate$tsSeconds$tsIncrement などの複数の変換および表示オペレーターをサポートし、サービスによる消費を簡素化。

MongoDB 7.0

  • 大規模な変更イベント (> 16 MB) をサポート。新しい $changeStreamSplitLargeEvent オペレーターを使用して、大規模な変更イベントを分割できます。

  • 変更イベントは refineCollectionShardKey および reshardCollection イベントをサポート。

MongoDB 8.0

  • $queryStats コマンドが change streams に関連するメトリックを強化。

  • movePrimary コマンドは、開いている change stream を持つコレクションに対して無効なイベントを引き起こさなくなりました。これは、change stream が movePrimary コマンドによって引き起こされるデータ移行を継続的に処理できるようになったことを意味します。

制限事項

インスタンスはレプリカセットインスタンスまたはシャードクラスターインスタンスである必要があります。

change stream の構成

より多くの DDL イベントをリッスンする

前提条件

MongoDB 6.0 以降 (アップグレードガイド)。

手順

  1. mongosh を使用してデータベースに接続します。

  2. watch コマンドを実行し、showExpandedEvents: true を設定します:

    // mongo shell または mongosh v1.x
    cursor = db.getSiblingDB("test").watch([],
      {
        showExpandedEvents: true       // より多くの DDL イベントのリッスンを有効にする
      }
    );
    cursor.next();

結果の検証

  1. 新しい mongosh ウィンドウで、db.createCollection("myCollection1") などの変更コマンドを実行します。

  2. 元の SQL ウィンドウの出力で SQL 実行の詳細を表示できます。

    image

pre-image を有効にする

MongoDB の pre-image は、ドキュメントが変更または削除される前の完全なスナップショットです。変更前のデータの元の値を記録します。

前提条件

MongoDB 6.0 以降 (アップグレードガイド)。

手順

  1. データベースレベルで pre-image を有効にします:

    db.adminCommand({
      setClusterParameter: {
        changeStreamOptions: {
          preAndPostImages: { expireAfterSeconds: "off" } // "off" は oplog の保存期間が使用されることを示します
        }
      }
    })
  2. コレクションレベルで pre-image を有効にします:

    説明

    データベース内のすべてのコレクションで pre-image を有効にするには、まずデータベースレベルで有効にする必要があります。次に、各コレクションに対して個別にこの機能を有効にする必要があります。

    既存のコレクションの変更

    db.runCommand({
      collMod: "myCollection",
      changeStreamPreAndPostImages: { enabled: true }
    })

    新しいコレクションの作成時に指定

    db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
  3. change stream リスナーを作成し、pre-image オプションを指定します:

    // ターゲットコレクションにリスナーを作成
    cursor = db.getCollection("myCollection").watch([],
      {
        fullDocument: 'required', // または 'whenAvailable'
        fullDocumentBeforeChange: 'required' // または 'whenAvailable'
      }
    )
    cursor.next();
    • required: サーバーは pre-image または post-image を返す必要があります。そうでない場合、エラーが報告されます。

    • whenAvailable: サーバーはイメージを返そうとしますが、保証はされません。

結果の検証

  1. データベースレベルで pre-image が有効になっているかどうかを確認します:

    db.adminCommand( { getClusterParameter: "changeStreamOptions" } )

    有効な場合、次の出力が返されます:image

  2. コレクションの構成を確認します:

    db.getCollectionInfos({name: "myCollection"}) // または db.runCommand({listCollections: 1})

    期待される出力: 返されたドキュメントで、"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } } のようなフィールドを探します。

    image

  3. 別の mongosh ウィンドウで、myCollection 内のドキュメントを更新します。

  4. cursor によって返されたイベントを監視します。これには、pre-image である fullDocumentBeforeChange フィールドが含まれている必要があります。

    image

詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。

post-image を有効にする

MongoDB の post-image は、変更が発生した後のドキュメントの完全なスナップショットです。変更後のドキュメントの完全な内容を記録します。

前提条件

MongoDB 3.6 以降 (アップグレードガイド)。

手順

watch コマンドを実行するときに、fullDocument: 'updateLookup' を設定します。

cursor = db.getSiblingDB("test").myCollection.watch([], 
  {
    fullDocument: 'updateLookup'
  }
);
cursor.next();

結果の検証

  1. 別の mongosh ウィンドウで、myCollection にドキュメントを追加または更新します。

  2. cursor によって返されたイベントを監視します。これには、post-image である fullDocument フィールドが含まれている必要があります。image

説明

返された完全なドキュメントは空であるか、ポイントインタイムのスナップショットではない場合があります。例:

  • 同じドキュメントを短時間で複数回更新した場合、最初の更新の変更イベントは、最終的な更新が完了した後のドキュメントの内容を返すことがあります。

  • 更新直後にドキュメントが削除された場合、対応する変更イベントの `fullDocument` フィールドは、ドキュメントがもはや存在しないため空になります。

詳細については、「Lookup Full Document for Update Operations」をご参照ください。

大規模な変更イベント (> 16 MB) の処理

前提条件

MongoDB 7.0 以降 (アップグレードガイド)。

手順

watch() パイプラインに $changeStreamSplitLargeEvent ステージを含めることができます:

myChangeStreamCursor = db.myCollection.watch(
  [ { $changeStreamSplitLargeEvent: {} } ], // 分割ステージを追加
  {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
  }
)

結果の検証

  • 非常に大きな配列を含むドキュメントの更新など、16 MB を超える変更イベントを引き起こす操作を実行します。

  • 返されたイベントストリームを監視します。それは複数の連続した fragment イベントに分割され、最後の fragment イベントで終わります。

pre-image のストレージオーバーヘッドを削減する

デフォルトでは、pre-image は oplog とともに有効期限が切れます。より短い有効期限を設定して、ストレージスペースを節約できます:

警告

expireAfterSeconds を非常に短い期間に設定し、ダウンストリームの消費が遅い場合、pre-image が早期に期限切れになるため、ChangeStreamHistoryLost エラーが発生する可能性があります。詳細については、「Change Streams with Document Pre- and Post-Images」をご参照ください。

db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 } // 単位: 秒
    }
  }
})

change stream のベストプラクティス

  1. pre-image と post-image は注意して使用する:

    • fullDocumentBeforeChange (pre-image) と fullDocument (post-image) を有効にすると、ストレージオーバーヘッド (config.system.preimages コレクション内) とリクエストのレイテンシが増加します。

    • これらのオプションは、サービスが変更前後の完全なドキュメントコンテンツを必要とする場合にのみ有効にしてください。

  2. シャードクラスターデプロイのキーポイント:

    • イベントのグローバルな順序を保証するために、常に mongos インスタンス上に change stream リスナーを作成してください。

    • 高い書き込み負荷の下では、mongos インスタンスが異なるシャードからのイベントをソートしてマージする必要があるため、change stream がボトルネックになる可能性があります。

    • 不適切なシャーディングキーなどが原因でシャード間で書き込みが不均衡になると、change stream のレイテンシが大幅に増加する可能性があります。

  3. updateLookup を避ける:

    • updateLookup オプションは、更新イベントごとに個別の findOne クエリを実行するため、非効率です。

    • シャードクラスターでは、moveChunk 操作が updateLookup によって引き起こされるレイテンシの問題を悪化させる可能性があります。

  4. change stream の中断を防ぐ:

    • ⚠️ 次のシナリオでは、change stream カーソルが無効 (operationType: "invalidate") になったり、エラーがスローされたりする可能性があります。

      • ダウンストリームの消費が遅い: 消費速度がイベント生成速度よりも遅いため、resumeToken が oplog ウィンドウの外に出てしまいます。

      • 無効な resumeToken: 対応するタイムスタンプが oplog にもはや存在しない古い resumeToken で再開しようとする。

      • フェールオーバーの影響: フェールオーバー後、新しいプライマリノードの oplog には元の resumeToken が含まれていない場合があります。

      • メタデータの変更: droprenamedropDatabase などの操作は、invalidate イベントをトリガーする可能性があります。

      • pre-image の有効期限切れ: 短い expireAfterSeconds 設定と遅い消費が組み合わさると、pre-image が失われる可能性があります。

    • ソリューション:

      • change stream のレイテンシを監視します。

      • oplog ウィンドウが十分に大きいことを確認します。

      • 堅牢なエラー処理と回復ロジックを設計します。これには、invalidate イベントをキャッチし、最後に有効だった resumeToken を記録し、リスナーを再作成することが含まれます。

      • expireAfterSeconds に妥当な値を設定します。

  5. スコープ選択戦略:

    • 単一の change stream と複数のコレクションレベルの change stream の比較:

      • 単一ストリーム (データベースまたはインスタンスレベル): このアプローチは、oplog からプルするために単一のスレッドを使用するため、リソースのオーバーヘッドが低くなります。ただし、ダウンストリームシステムはイベントを自身でフィルタリングして配布する必要があります。イベント量が多い場合、mongos インスタンスがボトルネックになる可能性があります。

      • 複数のコレクションレベルのストリーム: このアプローチは、サーバーサイドのフィルタリングを使用してネットワークトラフィックを削減し、より良い並行性を提供します。ただし、ストリームが多すぎると oplog の読み取り競合とリソース消費が増加します。

    • 推奨事項: イベント量やコレクション数など、サービスのワークロードに基づいてテストし、最適なソリューションを選択してください。通常、アクティブなコレクションが少ない場合は個別のストリームを使用します。アクティブでないコレクションが多い場合は、データベースレベルまたはインスタンスレベルのストリームとダウンストリームのフィルタリングを組み合わせて使用します。