Anda dapat berlangganan ke change stream MongoDB untuk merespons perubahan data di database secara real time. Topik ini menjelaskan change stream MongoDB, cara menggunakannya, serta praktik terbaiknya.
Apa itu change stream?
Change stream menyediakan aliran event perubahan secara real time dari sebuah database. Klien dapat berlangganan ke aliran ini untuk menerima notifikasi segera ketika data dimasukkan, diperbarui, atau dihapus. Skenario umum meliputi hal-hal berikut:
Sinkronisasi data cross-cluster: Replikasi data inkremental antar kluster MongoDB.
Auditing: Lacak operasi berisiko tinggi, seperti menghapus database atau koleksi.
Arsitektur berbasis event: Dorong perubahan ke sistem downstream untuk analisis real time, pembaruan cache, atau notifikasi.
Batasan
Instans harus berupa instans set replika atau instans kluster sharded.
Aliran Perubahan Konfigurasi
Dengarkan lebih banyak event DDL
Prasyarat
MongoDB 6.0 atau versi yang lebih baru (Panduan upgrade).
Prosedur
Sambungkan ke database menggunakan mongosh.
Jalankan perintah
watchdan aturshowExpandedEvents: true:// mongo shell atau mongosh v1.x cursor = db.getSiblingDB("test").watch([], { showExpandedEvents: true // Aktifkan pendengaran untuk lebih banyak event DDL } ); cursor.next();
Verifikasi hasil
Pada jendela mongosh baru, jalankan perintah perubahan, seperti
db.createCollection("myCollection1").Anda dapat melihat detail eksekusi SQL di bagian Outputs pada jendela SQL asli.

Aktifkan pre-images
Pre-image MongoDB adalah snapshot lengkap dokumen sebelum dimodifikasi atau dihapus. Pre-image mencatat nilai asli data sebelum perubahan terjadi.
Prasyarat
MongoDB 6.0 atau versi yang lebih baru (Panduan upgrade).
Prosedur
Aktifkan pre-images di tingkat database:
db.adminCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: "off" } // "off" menunjukkan bahwa periode retensi oplog digunakan } } })Aktifkan pre-images di tingkat koleksi:
CatatanUntuk mengaktifkan pre-images pada semua koleksi dalam sebuah database, Anda harus terlebih dahulu mengaktifkannya di tingkat database. Kemudian, Anda harus mengaktifkan fitur tersebut untuk setiap koleksi secara individual.
Modifikasi koleksi yang sudah ada
db.runCommand({ collMod: "myCollection", changeStreamPreAndPostImages: { enabled: true } })Tentukan saat membuat koleksi baru
db.createCollection("myCollection", { changeStreamPreAndPostImages: { enabled: true }})Buat pendengar change stream dan tentukan opsi pre-image:
// Buat pendengar pada koleksi target cursor = db.getCollection("myCollection").watch([], { fullDocument: 'required', // atau 'whenAvailable' fullDocumentBeforeChange: 'required' // atau 'whenAvailable' } ) cursor.next();required: Server harus mengembalikan pre-image atau post-image. Jika tidak, error akan dilaporkan.whenAvailable: Server berusaha mengembalikan gambar tersebut, tetapi tidak dijamin.
Verifikasi hasil
Periksa apakah sakelar dimensi database diaktifkan:
db.adminCommand( { getClusterParameter: "changeStreamOptions" } )Jika diaktifkan, output berikut dikembalikan:

Periksa konfigurasi koleksi:
db.getCollectionInfos({name: "myCollection"}) // atau db.runCommand({listCollections: 1})Output yang diharapkan: Pada dokumen yang dikembalikan, cari bidang serupa dengan
"options" : { "changeStreamPreAndPostImages" : { "enabled" : true } }.
Pada jendela mongosh lain, perbarui dokumen di
myCollection.Amati event yang dikembalikan oleh
cursor. Event tersebut harus berisi bidangfullDocumentBeforeChange, yaitu pre-image.
Untuk informasi selengkapnya, lihat Change Streams with Document Pre- and Post-Images.
Aktifkan post-images
Post-image MongoDB adalah snapshot lengkap dokumen setelah perubahan terjadi. Post-image mencatat konten lengkap dokumen setelah perubahan.
Prasyarat
MongoDB 3.6 atau versi yang lebih baru (Panduan upgrade).
Prosedur
Saat menjalankan perintah watch, atur fullDocument: 'updateLookup'.
cursor = db.getSiblingDB("test").myCollection.watch([],
{
fullDocument: 'updateLookup'
}
);
cursor.next();
Verifikasi hasil
Pada jendela mongosh lain, tambahkan atau perbarui dokumen di
myCollection.Amati event yang dikembalikan oleh
cursor. Event tersebut harus berisi bidangfullDocument, yaitu post-image.
Dokumen lengkap yang dikembalikan mungkin kosong atau bukan snapshot point-in-time. Misalnya:
Jika Anda memperbarui dokumen yang sama beberapa kali secara berurutan dalam waktu singkat, event perubahan untuk pembaruan pertama mungkin mengembalikan konten dokumen setelah pembaruan terakhir selesai.
Jika dokumen dihapus segera setelah pembaruan, event perubahan yang sesuai memiliki bidang `fullDocument` kosong karena dokumen tersebut tidak lagi ada.
Untuk informasi selengkapnya, lihat Lookup Full Document for Update Operations.
Atasi event perubahan besar (> 16 MB)
Prasyarat
MongoDB 7.0 atau versi yang lebih baru (Panduan upgrade).
Prosedur
Anda dapat menyertakan tahap $changeStreamSplitLargeEvent dalam pipeline watch():
myChangeStreamCursor = db.myCollection.watch(
[ { $changeStreamSplitLargeEvent: {} } ], // Tambahkan tahap pemisahan
{
fullDocument: "required",
fullDocumentBeforeChange: "required"
}
)Verifikasi hasil
Jalankan operasi yang menyebabkan event perubahan lebih besar dari 16 MB, seperti memperbarui dokumen yang berisi array sangat besar.
Amati aliran event yang dikembalikan. Aliran tersebut dibagi menjadi beberapa event
fragmentberurutan, diakhiri dengan eventfragmentterakhir.
Kurangi overhead storage pre-image
Secara default, pre-image kedaluwarsa bersama oplog. Atur waktu kedaluwarsa yang lebih pendek untuk menghemat ruang storage:
Jika Anda mengatur expireAfterSeconds ke durasi yang sangat singkat dan konsumsi downstream lambat, error ChangeStreamHistoryLost dapat terjadi karena pre-image kedaluwarsa terlalu cepat. Untuk informasi selengkapnya, lihat Change Streams with Document Pre- and Post-Images.
db.adminCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: { expireAfterSeconds: 100 } // Satuan: detik
}
}
})Praktik terbaik change stream
Gunakan pre-image dan post-image dengan hati-hati:
Mengaktifkan
fullDocumentBeforeChange(pre-image) danfullDocument(post-image) meningkatkan overhead storage (di koleksiconfig.system.preimages) dan latensi permintaan.Aktifkan opsi ini hanya jika layanan Anda memerlukan konten dokumen lengkap sebelum dan sesudah perubahan.
Poin penting untuk penerapan kluster sharded:
Selalu buat pendengar change stream pada instans
mongosuntuk memastikan urutan global event.Di bawah beban write tinggi, change stream dapat menjadi bottleneck karena instans
mongosperlu mengurutkan dan menggabungkan event dari shard yang berbeda.Penulisan yang tidak seimbang di seluruh shard, misalnya karena kunci sharding yang dirancang buruk, dapat meningkatkan latensi change stream secara signifikan.
Hindari
updateLookup:Opsi
updateLookupmengeksekusi kuerifindOneterpisah untuk setiap event pembaruan, yang tidak efisien.Pada kluster sharded, operasi
moveChunkdapat memperparah masalah latensi yang disebabkan olehupdateLookup.
Mencegah interupsi aliran perubahan:
⚠️ Skenario berikut dapat menyebabkan cursor change stream menjadi tidak valid (
operationType: "invalidate") atau melemparkan error.Konsumsi downstream lambat: Kecepatan konsumsi lebih lambat daripada kecepatan pembuatan event, menyebabkan
resumeTokenkeluar dari jendela oplog.resumeTokentidak valid: Mencoba melanjutkan denganresumeTokenlama yang timestamp-nya tidak lagi ada di oplog.Dampak failover: Setelah failover, oplog node primary baru mungkin tidak berisi
resumeTokenasli.Perubahan metadata: Operasi seperti
drop,rename, dandropDatabasedapat memicu eventinvalidate.Kedaluwarsa pre-image: Pengaturan
expireAfterSecondsyang pendek dikombinasikan dengan konsumsi lambat dapat menyebabkan pre-image hilang.
Solusi:
Pantau latensi change stream.
Pastikan jendela oplog cukup besar.
Rancang logika penanganan error dan pemulihan yang kuat. Ini termasuk menangkap event
invalidate, mencatatresumeTokenterakhir yang valid, dan membuat ulang pendengar.Atur nilai
expireAfterSecondsyang wajar.
Strategi pemilihan cakupan:
Single change stream vs. multiple collection-level change streams:
Single stream (tingkat database atau instans): Pendekatan ini memiliki overhead resource lebih rendah karena menggunakan satu thread untuk menarik dari oplog. Namun, sistem downstream harus menyaring dan mendistribusikan event sendiri. Di bawah volume event tinggi, instans
mongosdapat menjadi bottleneck.Multiple collection-level streams: Pendekatan ini menggunakan penyaringan sisi server untuk mengurangi network traffic dan memberikan konkurensi lebih baik. Namun, terlalu banyak stream meningkatkan contention pembacaan
oplogdan konsumsi resource.
Rekomendasi: Uji dan pilih solusi optimal berdasarkan workload layanan Anda, termasuk volume event dan jumlah koleksi. Biasanya, gunakan stream terpisah untuk beberapa koleksi yang sangat aktif. Untuk banyak koleksi yang kurang aktif, gunakan stream tingkat database atau instans dikombinasikan dengan penyaringan downstream.
FAQ
1. Mengapa log kueri lambat change stream selalu menampilkan COLLSCAN?
Ini adalah perilaku normal dan diharapkan. Tidak diperlukan optimasi. Cursor change stream pada akhirnya dibuat pada local.oplog.rs, yaitu satu-satunya koleksi yang mencatat semua modifikasi instans. Koleksi ini tidak memiliki indeks. Oleh karena itu, operasi selalu berupa COLLSCAN, yang tidak dapat dihindari atau dioptimalkan.
Jika Anda menggunakan COLLSCAN sebagai kata kunci untuk menyaring log saat mencari kueri yang perlu dioptimalkan, saring juga kata kunci $changeStream.
Fokuslah pada log kueri lambat terkait change stream hanya jika Anda mengalami masalah performa, seperti peningkatan latensi dalam konsumsi downstream.
2. Pada kluster sharded, mengapa cursor change stream diamati pada shard selain shard utama ketika pendengar dibuat untuk koleksi unsharded?
Hal ini mengantisipasi potensi operasi sharding (shardCollection). Anda dapat menjalankan operasi shardCollection kapan saja pada koleksi unsharded yang memiliki pendengar yang ada untuk mendistribusikan datanya ke seluruh shard (mirip dengan movePrimary). Membuat cursor change stream pada shard lain sebelumnya mempersiapkan skenario ini. Umumnya, cursor ini pada shard lain tidak mengembalikan event perubahan apa pun dan memiliki overhead performa minimal.
Demikian pula, ketika pendengar dibuat pada kluster sharded, mongos membuat cursor terkait pada Config Server untuk menangani potensi operasi addShard atau removeShard.
3. Mengapa log kueri lambat untuk cursor change stream muncul di node primary, meskipun readPreference:secondary ditentukan saat pembuatan cursor?
Setelah cursor dibuat, cursor tersebut "terikat" ke node tertentu dan tidak secara otomatis bermigrasi jika peran node berubah. Oleh karena itu, begitu cursor change stream dibuat, cursor tersebut tetap berada di node mongod tertentu (primary atau secondary) dan terus-menerus mengonsumsi event menggunakan getMore. Jika terjadi alih bencana primary/secondary, perubahan konfigurasi, migrasi, atau event lain yang dapat memicu alih bencana, cursor yang awalnya berada di node secondary mungkin berpindah ke node primary.
Jika Anda tidak ingin beban ini terus mengonsumsi resource di node primary, gunakan killCursors untuk menutup cursor change stream terkait. Logika konsumsi downstream akan menggunakan resumeToken dan readPreference untuk membuat ulang cursor di node secondary. Operasi ini tidak menyebabkan kehilangan atau gangguan event perubahan.
db.runCommand(
{
killCursors: <collection>,
cursors: [ <cursor id1>, ... ], comment: <any>
}
)
db.getSiblingDB("<testDB>").runCommand( { killCursors: "<testColl>", cursors: [NumberLong("2452840976689696187") ] } ) 4. Mengapa saya melihat banyak log kueri lambat 1000 ms untuk change stream di mongos?
2020-08-26T04:34:45.045+0000 I COMMAND [conn21283] command altconfig-b2b-perf.oplog command: getMore \{ getMore: 3513599116181216748, collection: "oplog", $db: "altconfig-b2b-perf", $clusterTime: { clusterTime: Timestamp(1598416483, 1), signature: { hash: BinData(0, EC3841EB1FB7A34F897688BB5983E32E2ADF6763), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } originatingCommand: \{ find: "oplog", filter: { timestamp: { $gte: new Date(1597895390009) } }, tailable: true, awaitData: true, $db: "altconfig-b2b-perf", $clusterTime: \{ clusterTime: Timestamp(1597895487, 1), signature: { hash: BinData(0, AAAE33D5688F935C70469CBDB8EB1F6882749A40), keyId: 6855385090000683010 } }, lsid: \{ id: UUID("72864d11-c0f8-48bb-823e-de5f18a9c409") } } nShards:1 cursorid:3513599116181216748 numYields:0 nreturned:0 reslen:237 protocol:op_msg 1000msIni adalah perilaku normal di versi MongoDB sebelum 6.0. Ini menunjukkan timeout tunggu, bukan bottleneck performa. Dalam arsitektur kluster sharded, cursor change stream yang dibuat dari mongos ke shard menentukan tailable:true, awaitData:true, dan maxTimeMS:1000 untuk mengembalikan sebanyak mungkin event perubahan dalam 1000 ms.
Pada versi utama sebelum 6.0, mongos mencatat log kueri lambat 1000 ms ini, yang bisa menyesatkan. Perilaku ini telah dioptimalkan. Untuk detailnya, lihat SERVER-50559.