All Products
Search
Document Center

ApsaraDB for MongoDB:Gunakan change stream MongoDB untuk menangkap perubahan data secara real time

Last Updated:Mar 12, 2026

Anda dapat berlangganan ke change stream MongoDB untuk merespons perubahan data di database secara real time. Topik ini menjelaskan change stream MongoDB, cara menggunakannya, serta praktik terbaiknya.

Apa itu change stream?

Change stream menyediakan aliran event perubahan secara real time dari sebuah database. Klien dapat berlangganan ke aliran ini untuk menerima notifikasi segera ketika data dimasukkan, diperbarui, atau dihapus. Skenario umum meliputi hal-hal berikut:

  • Sinkronisasi data cross-cluster: Replikasi data inkremental antar kluster MongoDB.

  • Auditing: Lacak operasi berisiko tinggi, seperti menghapus database atau koleksi.

  • Arsitektur berbasis event: Dorong perubahan ke sistem downstream untuk analisis real time, pembaruan cache, atau notifikasi.

Riwayat versi

Versi

Deskripsi pembaruan

MongoDB 3.6

  • Rilis awal.

  • Hanya mendukung langganan di tingkat koleksi.

  • Jenis event terbatas.

  • Mendukung fault recovery.

  • Mendukung post-image.

MongoDB 4.0

  • Mendukung langganan di tingkat database dan kluster.

  • Mendukung event drop, dropDatabase, dan rename.

  • Format resumeToken berubah dari BinData menjadi Hex.

MongoDB 4.2

  • Mendukung lebih banyak operator pipeline, seperti $set dan $unset.

  • Menambahkan opsi startAfter untuk memulai pendengar pada titik waktu tertentu.

  • Memodifikasi bidang _id suatu event menyebabkan change stream melemparkan exception.

  • Menghapus ketergantungan pada {readConcern: majority}.

MongoDB 5.1

  • Meningkatkan efisiensi eksekusi beberapa tahap dalam framework agregasi.

  • Meningkatkan pemanfaatan resource.

MongoDB 5.3

  • Mendukung penyaringan pembaruan dokumen orphan selama migrasi chunk.

MongoDB 6.0

  • Mendukung pre-images.

  • Mendukung statement DDL seperti create, createIndexes, modify, dan shardCollection. Untuk menggunakan fitur ini, Anda harus mengatur showExpandedEvents ke true. Untuk informasi selengkapnya, lihat Change Events.

  • Event perubahan menambahkan bidang wallTime. Timestamp mendukung beberapa operator konversi dan tampilan, seperti $toDate, $tsSeconds, dan $tsIncrement, untuk menyederhanakan konsumsi oleh layanan.

MongoDB 7.0

  • Mendukung event perubahan besar (> 16 MB). Anda dapat menggunakan operator baru $changeStreamSplitLargeEvent untuk membagi event perubahan besar tersebut.

  • Event perubahan mendukung event refineCollectionShardKey dan reshardCollection.

MongoDB 8.0

  • Perintah $queryStats meningkatkan metrik yang terkait dengan change stream.

  • Perintah movePrimary tidak lagi menyebabkan event tidak valid untuk koleksi yang memiliki change stream terbuka. Artinya, change stream kini dapat terus-menerus memproses migrasi data yang disebabkan oleh perintah movePrimary.

Batasan

Instans harus berupa instans set replika atau instans kluster sharded.

Aliran Perubahan Konfigurasi

Dengarkan lebih banyak event DDL

Prasyarat

MongoDB 6.0 atau versi yang lebih baru (Panduan upgrade).

Prosedur

  1. Sambungkan ke database menggunakan mongosh.

  2. Jalankan perintah watch dan atur showExpandedEvents: true:

    // mongo shell atau mongosh v1.x
    cursor = db.getSiblingDB("test").watch([],
      {
        showExpandedEvents: true       // Aktifkan pendengaran untuk lebih banyak event DDL
      }
    );
    cursor.next();

Verifikasi hasil

  1. Pada jendela mongosh baru, jalankan perintah perubahan, seperti db.createCollection("myCollection1").

  2. Anda dapat melihat detail eksekusi SQL di bagian Outputs pada jendela SQL asli.

    image

Aktifkan pre-images

Pre-image MongoDB adalah snapshot lengkap dokumen sebelum dimodifikasi atau dihapus. Pre-image mencatat nilai asli data sebelum perubahan terjadi.

Prasyarat

MongoDB 6.0 atau versi yang lebih baru (Panduan upgrade).

Prosedur

  1. Aktifkan pre-images di tingkat database:

    db.adminCommand({
      setClusterParameter: {
        changeStreamOptions: {
          preAndPostImages: { expireAfterSeconds: "off" } // "off" menunjukkan bahwa periode retensi oplog digunakan
        }
      }
    })
  2. Aktifkan pre-images di tingkat koleksi:

    Catatan

    Untuk mengaktifkan pre-images pada semua koleksi dalam sebuah database, Anda harus terlebih dahulu mengaktifkannya di tingkat database. Kemudian, Anda harus mengaktifkan fitur tersebut untuk setiap koleksi secara individual.

    Modifikasi koleksi yang sudah ada

    db.runCommand({
      collMod: "myCollection",
      changeStreamPreAndPostImages: { enabled: true }
    })

    Tentukan saat membuat koleksi baru

    db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})
  3. Buat pendengar change stream dan tentukan opsi pre-image:

    // Buat pendengar pada koleksi target
    cursor = db.getCollection("myCollection").watch([],
      {
        fullDocument: 'required', // atau 'whenAvailable'
        fullDocumentBeforeChange: 'required' // atau 'whenAvailable'
      }
    )
    cursor.next();
    • required: Server harus mengembalikan pre-image atau post-image. Jika tidak, error akan dilaporkan.

    • whenAvailable: Server berusaha mengembalikan gambar tersebut, tetapi tidak dijamin.

Verifikasi hasil

  1. Periksa apakah sakelar dimensi database diaktifkan:

    db.adminCommand( { getClusterParameter: "changeStreamOptions" } )

    Jika diaktifkan, output berikut dikembalikan:image

  2. Periksa konfigurasi koleksi:

    db.getCollectionInfos({name: "myCollection"}) // atau db.runCommand({listCollections: 1})

    Output yang diharapkan: Pada dokumen yang dikembalikan, cari bidang serupa dengan "options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }.

    image

  3. Pada jendela mongosh lain, perbarui dokumen di myCollection.

  4. Amati event yang dikembalikan oleh cursor. Event tersebut harus berisi bidang fullDocumentBeforeChange, yaitu pre-image.

    image

Untuk informasi selengkapnya, lihat Change Streams with Document Pre- and Post-Images.

Aktifkan post-images

Post-image MongoDB adalah snapshot lengkap dokumen setelah perubahan terjadi. Post-image mencatat konten lengkap dokumen setelah perubahan.

Prasyarat

MongoDB 3.6 atau versi yang lebih baru (Panduan upgrade).

Prosedur

Saat menjalankan perintah watch, atur fullDocument: 'updateLookup'.

cursor = db.getSiblingDB("test").myCollection.watch([], 
  {
    fullDocument: 'updateLookup'
  }
);
cursor.next();

Verifikasi hasil

  1. Pada jendela mongosh lain, tambahkan atau perbarui dokumen di myCollection.

  2. Amati event yang dikembalikan oleh cursor. Event tersebut harus berisi bidang fullDocument, yaitu post-image.image

Catatan

Dokumen lengkap yang dikembalikan mungkin kosong atau bukan snapshot point-in-time. Misalnya:

  • Jika Anda memperbarui dokumen yang sama beberapa kali secara berurutan dalam waktu singkat, event perubahan untuk pembaruan pertama mungkin mengembalikan konten dokumen setelah pembaruan terakhir selesai.

  • Jika dokumen dihapus segera setelah pembaruan, event perubahan yang sesuai memiliki bidang `fullDocument` kosong karena dokumen tersebut tidak lagi ada.

Untuk informasi selengkapnya, lihat Lookup Full Document for Update Operations.

Atasi event perubahan besar (> 16 MB)

Prasyarat

MongoDB 7.0 atau versi yang lebih baru (Panduan upgrade).

Prosedur

Anda dapat menyertakan tahap $changeStreamSplitLargeEvent dalam pipeline watch():

myChangeStreamCursor = db.myCollection.watch(
  [ { $changeStreamSplitLargeEvent: {} } ], // Tambahkan tahap pemisahan
  {
    fullDocument: "required",
    fullDocumentBeforeChange: "required"
  }
)

Verifikasi hasil

  • Jalankan operasi yang menyebabkan event perubahan lebih besar dari 16 MB, seperti memperbarui dokumen yang berisi array sangat besar.

  • Amati aliran event yang dikembalikan. Aliran tersebut dibagi menjadi beberapa event fragment berurutan, diakhiri dengan event fragment terakhir.

Kurangi overhead storage pre-image

Secara default, pre-image kedaluwarsa bersama oplog. Atur waktu kedaluwarsa yang lebih pendek untuk menghemat ruang storage:

Peringatan

Jika Anda mengatur expireAfterSeconds ke durasi yang sangat singkat dan konsumsi downstream lambat, error ChangeStreamHistoryLost dapat terjadi karena pre-image kedaluwarsa terlalu cepat. Untuk informasi selengkapnya, lihat Change Streams with Document Pre- and Post-Images.

db.adminCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: { expireAfterSeconds: 100 } // Satuan: detik
    }
  }
})

Praktik terbaik change stream

  1. Gunakan pre-image dan post-image dengan hati-hati:

    • Mengaktifkan fullDocumentBeforeChange (pre-image) dan fullDocument (post-image) meningkatkan overhead storage (di koleksi config.system.preimages) dan latensi permintaan.

    • Aktifkan opsi ini hanya jika layanan Anda memerlukan konten dokumen lengkap sebelum dan sesudah perubahan.

  2. Poin penting untuk penerapan kluster sharded:

    • Selalu buat pendengar change stream pada instans mongos untuk memastikan urutan global event.

    • Di bawah beban write tinggi, change stream dapat menjadi bottleneck karena instans mongos perlu mengurutkan dan menggabungkan event dari shard yang berbeda.

    • Penulisan yang tidak seimbang di seluruh shard, misalnya karena kunci sharding yang dirancang buruk, dapat meningkatkan latensi change stream secara signifikan.

  3. Hindari updateLookup:

    • Opsi updateLookup mengeksekusi kueri findOne terpisah untuk setiap event pembaruan, yang tidak efisien.

    • Pada kluster sharded, operasi moveChunk dapat memperparah masalah latensi yang disebabkan oleh updateLookup.

  4. Mencegah interupsi aliran perubahan:

    • ⚠️ Skenario berikut dapat menyebabkan cursor change stream menjadi tidak valid (operationType: "invalidate") atau melemparkan error.

      • Konsumsi downstream lambat: Kecepatan konsumsi lebih lambat daripada kecepatan pembuatan event, menyebabkan resumeToken keluar dari jendela oplog.

      • resumeToken tidak valid: Mencoba melanjutkan dengan resumeToken lama yang timestamp-nya tidak lagi ada di oplog.

      • Dampak failover: Setelah failover, oplog node primary baru mungkin tidak berisi resumeToken asli.

      • Perubahan metadata: Operasi seperti drop, rename, dan dropDatabase dapat memicu event invalidate.

      • Kedaluwarsa pre-image: Pengaturan expireAfterSeconds yang pendek dikombinasikan dengan konsumsi lambat dapat menyebabkan pre-image hilang.

    • Solusi:

      • Pantau latensi change stream.

      • Pastikan jendela oplog cukup besar.

      • Rancang logika penanganan error dan pemulihan yang kuat. Ini termasuk menangkap event invalidate, mencatat resumeToken terakhir yang valid, dan membuat ulang pendengar.

      • Atur nilai expireAfterSeconds yang wajar.

  5. Strategi pemilihan cakupan:

    • Single change stream vs. multiple collection-level change streams:

      • Single stream (tingkat database atau instans): Pendekatan ini memiliki overhead resource lebih rendah karena menggunakan satu thread untuk menarik dari oplog. Namun, sistem downstream harus menyaring dan mendistribusikan event sendiri. Di bawah volume event tinggi, instans mongos dapat menjadi bottleneck.

      • Multiple collection-level streams: Pendekatan ini menggunakan penyaringan sisi server untuk mengurangi network traffic dan memberikan konkurensi lebih baik. Namun, terlalu banyak stream meningkatkan contention pembacaan oplog dan konsumsi resource.

    • Rekomendasi: Uji dan pilih solusi optimal berdasarkan workload layanan Anda, termasuk volume event dan jumlah koleksi. Biasanya, gunakan stream terpisah untuk beberapa koleksi yang sangat aktif. Untuk banyak koleksi yang kurang aktif, gunakan stream tingkat database atau instans dikombinasikan dengan penyaringan downstream.

FAQ

1. Mengapa log kueri lambat change stream selalu menampilkan COLLSCAN?

Ini adalah perilaku normal dan diharapkan. Tidak diperlukan optimasi. Cursor change stream pada akhirnya dibuat pada local.oplog.rs, yaitu satu-satunya koleksi yang mencatat semua modifikasi instans. Koleksi ini tidak memiliki indeks. Oleh karena itu, operasi selalu berupa COLLSCAN, yang tidak dapat dihindari atau dioptimalkan.

Jika Anda menggunakan COLLSCAN sebagai kata kunci untuk menyaring log saat mencari kueri yang perlu dioptimalkan, saring juga kata kunci $changeStream.

Fokuslah pada log kueri lambat terkait change stream hanya jika Anda mengalami masalah performa, seperti peningkatan latensi dalam konsumsi downstream.

2. Pada kluster sharded, mengapa cursor change stream diamati pada shard selain shard utama ketika pendengar dibuat untuk koleksi unsharded?

Hal ini mengantisipasi potensi operasi sharding (shardCollection). Anda dapat menjalankan operasi shardCollection kapan saja pada koleksi unsharded yang memiliki pendengar yang ada untuk mendistribusikan datanya ke seluruh shard (mirip dengan movePrimary). Membuat cursor change stream pada shard lain sebelumnya mempersiapkan skenario ini. Umumnya, cursor ini pada shard lain tidak mengembalikan event perubahan apa pun dan memiliki overhead performa minimal.

Demikian pula, ketika pendengar dibuat pada kluster sharded, mongos membuat cursor terkait pada Config Server untuk menangani potensi operasi addShard atau removeShard.

3. Mengapa log kueri lambat untuk cursor change stream muncul di node primary, meskipun readPreference:secondary ditentukan saat pembuatan cursor?

Setelah cursor dibuat, cursor tersebut "terikat" ke node tertentu dan tidak secara otomatis bermigrasi jika peran node berubah. Oleh karena itu, begitu cursor change stream dibuat, cursor tersebut tetap berada di node mongod tertentu (primary atau secondary) dan terus-menerus mengonsumsi event menggunakan getMore. Jika terjadi alih bencana primary/secondary, perubahan konfigurasi, migrasi, atau event lain yang dapat memicu alih bencana, cursor yang awalnya berada di node secondary mungkin berpindah ke node primary.

Jika Anda tidak ingin beban ini terus mengonsumsi resource di node primary, gunakan killCursors untuk menutup cursor change stream terkait. Logika konsumsi downstream akan menggunakan resumeToken dan readPreference untuk membuat ulang cursor di node secondary. Operasi ini tidak menyebabkan kehilangan atau gangguan event perubahan.

db.runCommand(
   {
     killCursors: <collection>,
     cursors: [ <cursor id1>, ... ], comment: <any>
   }
)

db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 

4. Mengapa saya melihat banyak log kueri lambat 1000 ms untuk change stream di mongos?

2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000ms

Ini adalah perilaku normal di versi MongoDB sebelum 6.0. Ini menunjukkan timeout tunggu, bukan bottleneck performa. Dalam arsitektur kluster sharded, cursor change stream yang dibuat dari mongos ke shard menentukan tailable:true, awaitData:true, dan maxTimeMS:1000 untuk mengembalikan sebanyak mungkin event perubahan dalam 1000 ms.

Pada versi utama sebelum 6.0, mongos mencatat log kueri lambat 1000 ms ini, yang bisa menyesatkan. Perilaku ini telah dioptimalkan. Untuk detailnya, lihat SERVER-50559.