All Products
Search
Document Center

Data Transmission Service:Sinkronisasi data dari ApsaraDB for MongoDB ke Message Queue for Apache Kafka

Last Updated:Mar 29, 2026

Layanan Transmisi Data (DTS) memungkinkan Anda mengalirkan data perubahan dari ApsaraDB for MongoDB ke instans Message Queue for Apache Kafka. Langkah-langkah berikut menjelaskan cara membuat tugas sinkronisasi dengan menggunakan replica set MongoDB sebagai sumber dan instans Kafka sebagai tujuan.

Apa yang dicakup oleh tugas ini:

  • Arsitektur yang didukung: Replica set dan kluster sharded

  • Metode sinkronisasi inkremental: Oplog (direkomendasikan) dan Change Streams

  • Format data: Canal JSON yang dikirimkan ke topik Kafka

  • Cakupan sinkronisasi: Tingkat koleksi; sinkronisasi penuh dan inkremental

  • Penagihan: Sinkronisasi data penuh gratis; sinkronisasi data inkremental dikenai biaya

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Message Queue for Apache Kafka Instans

  • Topik yang telah dibuat di instans Kafka tujuan untuk menerima data

  • (Hanya untuk kluster sharded) Titik akhir yang diajukan untuk semua node shard; semua node shard harus menggunakan password akun dan titik akhir yang sama. Lihat Ajukan titik akhir untuk shard

Untuk versi database sumber dan tujuan yang didukung, lihat Ikhtisar solusi sinkronisasi.

Batasan

Tinjau batasan berikut sebelum membuat tugas sinkronisasi.

Batasan database sumber

  • Server sumber harus memiliki bandwidth keluar yang mencukupi. Bandwidth yang tidak mencukupi akan mengurangi kecepatan sinkronisasi.

  • Saat mengonfigurasi pemetaan nama untuk koleksi, satu tugas dapat menyinkronkan hingga 1.000 koleksi. Melebihi batas ini akan menyebabkan error permintaan. Untuk menyinkronkan lebih dari 1.000 koleksi, buat beberapa tugas atau sinkronkan seluruh database tanpa pemetaan nama.

  • Untuk sumber kluster sharded: bidang _id di setiap koleksi yang disinkronkan harus unik, atau ketidakkonsistenan data dapat terjadi.

  • Untuk sumber kluster sharded: jumlah node mongos tidak boleh melebihi 10, dan instans tidak boleh berisi dokumen yatim (orphaned documents). Lihat topik FAQ untuk cara menghapus dokumen yatim.

  • Instans ApsaraDB for MongoDB standalone, kluster Azure Cosmos DB for MongoDB, dan kluster elastis Amazon DocumentDB tidak didukung sebagai sumber.

  • DTS tidak dapat terhubung ke MongoDB melalui titik akhir SRV.

  • Oplog harus diaktifkan dan menyimpan data log selama minimal 7 hari, ATAU change streams harus diaktifkan agar DTS dapat berlangganan perubahan data dari 7 hari terakhir. Jika kedua kondisi ini tidak terpenuhi, DTS mungkin gagal mengambil perubahan sumber, dan kehilangan data atau ketidakkonsistenan dapat terjadi di luar cakupan perjanjian tingkat layanan (SLA).

    Penting

    - Gunakan oplog untuk mencatat perubahan database sumber (direkomendasikan). - Change streams memerlukan MongoDB 4.0 atau lebih baru. Sinkronisasi dua arah tidak didukung saat menggunakan change streams. - Untuk kluster Amazon DocumentDB non-elastis, aktifkan change streams dan atur Migration Method ke ChangeStream serta Architecture ke Sharded Cluster.

  • Selama sinkronisasi data penuh, jangan mengubah skema database atau koleksi, atau mengubah data bertipe ARRAY.

  • Untuk sumber kluster sharded, jangan menjalankan perintah berikut selama sinkronisasi: shardCollection, reshardCollection, unshardCollection, moveCollection, atau movePrimary. Perintah-perintah ini dapat menyebabkan ketidakkonsistenan data.

  • Jika database sumber adalah instans MongoDB yang menggunakan arsitektur kluster sharded dan balancer database sumber melakukan penyeimbangan data, latensi dapat terjadi pada instans tersebut.

Batasan lainnya

  • Hanya sinkronisasi tingkat koleksi yang didukung.

  • Database admin, config, dan local tidak dapat disinkronkan.

  • Rekaman tunggal yang melebihi 10 MB akan menyebabkan tugas gagal.

  • Transaksi tidak dipertahankan. DTS mengonversi setiap transaksi menjadi rekaman individual di tujuan.

  • Jika node broker ditambahkan atau dihapus di instans Kafka tujuan saat tugas DTS sedang berjalan, restart tugas DTS tersebut.

  • Pastikan DTS dapat terhubung ke instans sumber dan tujuan. Misalnya, pengaturan keamanan instans database dan parameter listeners serta advertised.listeners dalam file server.properties instans Kafka yang dikelola sendiri tidak membatasi akses dari DTS.

  • Jalankan sinkronisasi selama jam sepi ketika pemanfaatan CPU pada database sumber dan tujuan berada di bawah 30%.

  • DTS secara otomatis mencoba ulang instans yang gagal dan telah berjalan kurang dari 7 days. Sebelum mengalihkan trafik ke tujuan, hentikan atau lepas instans sinkronisasi untuk mencegah pemulihan otomatis menimpa data tujuan.

  • DTS menghitung latensi sinkronisasi data inkremental berdasarkan stempel waktu data terbaru yang telah disinkronkan di database tujuan dan stempel waktu saat ini di database sumber. Jika tidak ada operasi pembaruan yang dilakukan pada database sumber dalam jangka waktu lama, latensi sinkronisasi mungkin tidak akurat. Jika latensi tugas sinkronisasi data terlalu tinggi, Anda dapat melakukan operasi pembaruan pada database sumber untuk memperbarui latensi tersebut.

  • Jika tugas DTS gagal dijalankan, dukungan teknis DTS akan mencoba memulihkan tugas tersebut dalam waktu 8 jam. Selama pemulihan, tugas mungkin di-restart, dan parameter tugas mungkin dimodifikasi. Hanya parameter tugas DTS yang mungkin dimodifikasi. Parameter database tidak diubah. Parameter yang mungkin dimodifikasi mencakup tetapi tidak terbatas pada parameter dalam bagian "Modify instance parameters".

  • Untuk sumber kluster sharded dengan Oplog sebagai metode sinkronisasi inkremental, DTS tidak menjamin urutan penulisan lintas shard ke topik Kafka target.

  • Untuk sumber kluster sharded, nonaktifkan balancer MongoDB selama sinkronisasi data penuh. Aktifkan kembali hanya setelah sinkronisasi penuh selesai dan sinkronisasi inkremental dimulai. Lihat Mengelola balancer ApsaraDB for MongoDB.

Penagihan

Jenis sinkronisasiBiaya
Sinkronisasi data penuhGratis
Sinkronisasi data inkrementalDikenai biaya. Lihat Ikhtisar penagihan

Jenis sinkronisasi dan operasi yang didukung

Full Data Synchronization menyalin semua data historis dari koleksi MongoDB yang dipilih ke topik Kafka tujuan. DTS mendukung objek DATABASE dan COLLECTION.

Incremental Data Synchronization terus-menerus mengirimkan event perubahan setelah sinkronisasi penuh selesai. Operasi yang didukung bergantung pada metode sinkronisasi inkremental:

OperasiOplogChange streams
INSERTDidukungDidukung
UPDATEDidukungDidukung
DELETEDidukungDidukung
CREATE COLLECTION / INDEXDidukungTidak didukung
DROP DATABASE / COLLECTION / INDEXDidukungHanya DROP DATABASE dan COLLECTION
RENAME COLLECTIONDidukungDidukung
Sinkronisasi inkremental tidak mengambil database yang dibuat setelah tugas dimulai.

Izin akun database yang diperlukan

DatabaseIzin yang diperlukan
ApsaraDB for MongoDB sumberIzin baca pada database yang akan disinkronkan, database admin, dan database local

Untuk instruksi pembuatan akun, lihat Manajemen akun.

Membuat tugas sinkronisasi

Langkah 1: Buka halaman Sinkronisasi Data

Gunakan Konsol DTS atau Konsol DMS.

DTS console

  1. Login ke Konsol DTS.Konsol DTS

  2. Di panel navigasi kiri, klik Data Synchronization.

  3. Di pojok kiri atas, pilih wilayah tempat tugas sinkronisasi berada.

Konsol DMS

Langkah-langkah dapat berbeda tergantung pada mode Konsol DMS. Lihat Mode simple dan Sesuaikan tata letak dan gaya Konsol DMS.
  1. Login ke Konsol DMS.Konsol DMS

  2. Di bilah navigasi atas, arahkan kursor ke Data + AI lalu pilih DTS (DTS) > Data Synchronization.

  3. Dari daftar drop-down di sebelah kanan Data Synchronization Tasks, pilih wilayah.

Langkah 2: Konfigurasi database sumber dan tujuan

Klik Create Task, lalu konfigurasikan parameter yang dijelaskan dalam tabel berikut.

Umum

ParameterDeskripsi
Task NameNama untuk tugas DTS. DTS menghasilkan nama secara otomatis. Tentukan nama deskriptif untuk mengidentifikasi tugas. Nama tidak perlu unik.

Database sumber

ParameterDeskripsi
Select Existing ConnectionJika instans sumber telah terdaftar di DTS, pilih dari daftar drop-down. DTS akan mengisi kolom lainnya secara otomatis. Jika tidak, konfigurasikan kolom di bawah ini. Di Konsol DMS, pilih dari daftar Select a DMS database instance.
Database TypePilih MongoDB.
Access MethodPilih Alibaba Cloud Instance.
Instance RegionPilih wilayah instans ApsaraDB for MongoDB sumber.
Replicate Data Across Alibaba Cloud AccountsPilih No jika database sumber milik Akun Alibaba Cloud saat ini.
ArchitecturePilih Replica Set untuk contoh ini. Jika sumber adalah Sharded Cluster, masukkan juga Shard account dan Shard password.
Migration MethodMetode untuk menyinkronkan data inkremental. Oplog direkomendasikan jika fitur oplog diaktifkan. ChangeStream tersedia jika change streams diaktifkan pada sumber. Lihat Change Streams untuk detailnya. Catatan: Untuk kluster Amazon DocumentDB non-elastis, hanya ChangeStream yang didukung. Saat Architecture diatur ke Sharded Cluster dan Migration Method diatur ke ChangeStream, kolom Shard account dan Shard password tidak diperlukan.
Instance IDPilih ID instans ApsaraDB for MongoDB sumber.
Authentication DatabaseDatabase yang berisi kredensial akun. Nilai default-nya adalah admin.
Database AccountAkun untuk database sumber. Lihat Izin akun database yang diperlukan.
Database PasswordPassword untuk akun database.
EncryptionMenentukan jenis enkripsi koneksi: Non-encrypted, SSL-encrypted, atau Mongo Atlas SSL. Opsi yang tersedia bergantung pada nilai Access Method dan Architecture. Catatan: SSL-encrypted tidak tersedia saat Architecture adalah Sharded Cluster dan Migration Method adalah Oplog. Untuk MongoDB yang dikelola sendiri dengan arsitektur Replica Set dan opsi SSL-encrypted dipilih, unggah sertifikat CA untuk memverifikasi koneksi.

Database tujuan

ParameterDeskripsi
Select Existing ConnectionJika instans Kafka tujuan telah terdaftar di DTS, pilih dari daftar drop-down. Jika tidak, konfigurasikan kolom di bawah ini.
Database TypePilih Kafka.
Access MethodPilih Alibaba Cloud Instance.
Instance RegionPilih wilayah instans Kafka tujuan.
Kafka Instance IDPilih ID instans Kafka tujuan.
EncryptionPilih Non-encrypted atau SCRAM-SHA-256 sesuai kebutuhan keamanan Anda.
TopicPilih topik untuk menerima data yang disinkronkan.
Use Kafka Schema RegistryKafka Schema Registry adalah layanan metadata RESTful untuk menyimpan dan mengambil skema Avro. Pilih No untuk melewati, atau Yespengaturan pemberitahuan peringatan dan berikan URL atau alamat IP Schema Registry.

Langkah 3: Uji konektivitas

Klik Test Connectivity and Proceed di bagian bawah halaman.

DTS secara otomatis menambahkan blok CIDR servernya ke pengaturan keamanan database sumber dan tujuan, jika diizinkan. Untuk pengaturan manual, lihat Tambahkan blok CIDR server DTS.
Untuk database yang dikelola sendiri yang tidak menggunakan Alibaba Cloud Instance sebagai metode akses, klik Test Connectivity di kotak dialog CIDR Blocks of DTS Servers.

Langkah 4: Konfigurasi objek yang akan disinkronkan

Pada langkah Configure Objects, atur parameter berikut.

ParameterDeskripsi
Synchronization TypesIncremental Data Synchronization dipilih secara default. Secara opsional, pilih juga Full Data Synchronization. Schema Synchronization tidak tersedia. Saat sinkronisasi penuh diaktifkan, DTS terlebih dahulu menyalin semua data historis sebelum memulai sinkronisasi inkremental.
Processing Mode of Conflicting TablesPrecheck and Report Errors: pemeriksaan awal gagal jika tujuan berisi koleksi dengan nama yang sama seperti sumber. Gunakan pemetaan nama objek untuk menyelesaikan konflik penamaan. Ignore Errors and Proceed: melewati pemeriksaan konflik nama. Jika suatu rekaman di tujuan memiliki kunci primer atau kunci unik yang sama dengan rekaman sumber, rekaman tujuan dipertahankan.
Peringatan

Opsi ini dapat menyebabkan ketidakkonsistenan data.

Data Format in KafkaHanya Canal JSON yang didukung.
Kafka Data Compression FormatFormat kompresi untuk data yang ditulis ke Kafka. Opsi: LZ4 (default — rasio kompresi rendah, kecepatan tinggi), GZIP (rasio kompresi tinggi, kecepatan rendah, penggunaan CPU tinggi), Snappy (seimbang).
Policy for Shipping Data to Kafka PartitionsPilih kebijakan perutean partisi sesuai kebutuhan Anda.
Message acknowledgement mechanismPilih mekanisme acknowledgment pesan sesuai kebutuhan Anda.
Topic That Stores DDL InformationPilih topik untuk menyimpan event DDL. Jika dibiarkan kosong, event DDL ditulis ke topik yang sama dengan event data.
Capitalization of Object Names in Destination InstanceMengontrol kapitalisasi nama database dan koleksi di tujuan. Nilai default-nya adalah DTS default policy. Lihat Tentukan kapitalisasi nama objek di instans tujuan.
Source ObjectsPilih objek dari bagian Source Objects dan klik ikon panah untuk memindahkannya ke Selected Objects. Hanya pemilihan tingkat koleksi yang didukung.

Langkah 5: Konfigurasi pengaturan lanjutan

Klik Next: Advanced Settings dan konfigurasikan parameter berikut.

ParameterDeskripsi
Dedicated Cluster for Task SchedulingSecara default, DTS menjadwalkan tugas pada kluster bersama. Beli klaster khusus untuk stabilitas yang lebih tinggi.
Retry Time for Failed ConnectionsDurasi DTS mencoba ulang saat sumber atau tujuan tidak dapat dijangkau. Rentang: 10–1.440 menit. Default: 720 menit. Kami menyarankan Anda mengatur parameter ini ke nilai lebih dari 30. Jika Anda menentukan waktu coba ulang berbeda untuk beberapa tugas yang berbagi database yang sama, nilai terpendek yang berlaku. DTS mengenakan biaya untuk waktu coba ulang.
Retry Time for Other IssuesDurasi DTS mencoba ulang saat operasi DDL atau DML gagal. Rentang: 1–1.440 menit. Default: 10 menit. Atur minimal 10 menit. Harus lebih pendek dari Retry Time for Failed Connections.
Obtain the entire document after it is updatedHanya tersedia saat Migration Method adalah ChangeStream. Yes: menyinkronkan dokumen lengkap untuk setiap pembaruan, yang dapat meningkatkan beban sumber dan menyebabkan latensi. Jika DTS tidak dapat mengambil dokumen lengkap, hanya bidang yang diperbarui yang dikirim. No: hanya menyinkronkan bidang yang berubah.
Enable Throttling for Full Data SynchronizationSaat diaktifkan, konfigurasikan QPS to the source database, RPS of Full Data Migration, dan Data migration speed for full migration (MB/s) untuk mengurangi beban pada tujuan. Hanya tersedia saat Full Data Synchronization dipilih.
Only one data type for primary key _id in a table of the data to be synchronizedHanya tersedia saat Full Data Synchronization dipilih. Yes: DTS melewati pemindaian tipe data bidang _id selama sinkronisasi penuh. No: DTS memindai tipe data bidang _id.
Enable Throttling for Incremental Data SynchronizationSaat diaktifkan, konfigurasikan RPS of Incremental Data Synchronization dan Data synchronization speed for incremental synchronization (MB/s) untuk mengurangi beban pada tujuan.
Environment TagTag opsional untuk mengidentifikasi instans.
Configure ETLAktifkan untuk menerapkan logika ekstrak, transformasi, muat (ETL). Masukkan pernyataan pemrosesan data di editor kode. Lihat Konfigurasi ETL dalam tugas migrasi data atau sinkronisasi data.
Monitoring and AlertingSaat diaktifkan, tentukan ambang batas peringatan dan pengaturan notifikasi. DTS memberi tahu kontak peringatan saat tugas gagal atau latensi sinkronisasi melebihi ambang batas. Lihat Konfigurasi pemantauan dan peringatan.

Langkah 6: Jalankan pemeriksaan awal

Klik Next: Save Task Settings and Precheck.

Untuk melihat pratinjau parameter API untuk konfigurasi ini, arahkan kursor ke tombol tersebut lalu klik Preview OpenAPI parameters sebelum melanjutkan.
Tugas tidak dapat dimulai hingga pemeriksaan awal berhasil.
Jika pemeriksaan awal gagal, klik View Details di sebelah setiap item yang gagal, selesaikan masalahnya, lalu klik Precheck Again.
Jika muncul peringatan untuk suatu item: perbaiki peringatan yang tidak dapat diabaikan sebelum melanjutkan. Untuk peringatan yang dapat diabaikan, klik Confirm Alert Details, lalu klik Ignore di dialog, konfirmasi, dan klik Precheck Again. Mengabaikan peringatan dapat menyebabkan ketidakkonsistenan data.

Langkah 7: Beli dan mulai instans

  1. Tunggu hingga Success Rate mencapai 100%, lalu klik Next: Purchase Instance.

  2. Di halaman buy, konfigurasikan parameter berikut.

    ParameterDeskripsi
    Billing MethodSubscription: bayar di muka; lebih hemat biaya untuk penggunaan jangka panjang. Pay-as-you-go: ditagih per jam; cocok untuk penggunaan jangka pendek. Lepaskan instans saat tidak lagi digunakan untuk menghentikan penagihan.
    Resource Group SettingsKelompok sumber daya untuk instans sinkronisasi. Default: default resource group. Lihat Apa itu Resource Management?
    Instance ClassTingkat kecepatan sinkronisasi. Lihat Kelas instans untuk instansi sinkronisasi data.
    Subscription DurationTersedia untuk penagihan Subscription. Opsi: 1–9 bulan, 1 tahun, 2 tahun, 3 tahun, atau 5 tahun.
  3. Baca dan terima Data Transmission Service (Pay-as-you-go) Service Terms.

  4. Klik Buy and Start, lalu klik OK di dialog konfirmasi.

Tugas akan muncul di daftar tugas. Pantau perkembangannya di sana.

Konfigurasi pemetaan koleksi ke topik

Secara default, semua koleksi ditulis ke topik yang dipilih dalam konfigurasi database tujuan. Untuk mengarahkan koleksi tertentu ke topik berbeda:

  1. Di area Selected Objects, arahkan kursor ke nama topik tujuan pada tingkat koleksi.

  2. Klik Edit di sebelah nama topik.

  3. Di dialog Edit Table, konfigurasikan pengaturan berikut.

    ParameterDeskripsi
    Name of target TopicTopik tujuan untuk koleksi ini. Topik tersebut harus ada di instans Kafka. Jika diubah, data akan ditulis ke topik baru. Default: topik yang dipilih selama konfigurasi database tujuan.
    Filter ConditionsFilter baris opsional. Lihat Atur kondisi filter.
    Number of PartitionsJumlah partisi untuk menulis data ke topik.
  4. Klik OK.

Contoh pengiriman data

Setiap perubahan inkremental dari MongoDB diserialisasi sebagai pesan Canal JSON dan dikirimkan ke topik Kafka yang dikonfigurasi. Struktur pesan bervariasi tergantung pada metode sinkronisasi inkremental dan pengaturan pembaruan.

Pilih skenario Anda

TujuanKonfigurasi
Sinkronisasi latensi rendah dengan dukungan DDL lengkapAtur Migration Method ke Oplog (Skenario 1)
Change Streams dengan pembaruan dokumen parsialAtur Migration Method ke ChangeStream dan Obtain the entire document after it is updated ke No (Skenario 2)
Change Streams dengan dokumen lengkap pada setiap pembaruanAtur Migration Method ke ChangeStream dan Obtain the entire document after it is updated ke Yes (Skenario 3)

Bidang pesan Canal JSON

Ketiga skenario menggunakan envelope Canal JSON tingkat atas yang sama. Bidang berikut muncul di setiap pesan.

BidangTipeDeskripsi
databasestringNama database sumber
tablestringNama koleksi sumber
typestringJenis operasi: INSERT, UPDATE, DELETE, atau DDL
isDdlbooleantrue untuk event DDL (penghapusan koleksi, penggantian nama); false untuk event DML
esnumberStempel waktu event di database sumber (milidetik Unix)
tsnumberStempel waktu saat DTS memproses event (milidetik Unix)
idnumberID event internal DTS
pkNamesarrayNama bidang kunci primer (biasanya ["_id"])
dataarrayData dokumen setelah operasi. Untuk pembaruan parsial (Oplog atau ChangeStream tanpa pengambilan dokumen lengkap), hanya berisi bidang yang berubah menggunakan operator pembaruan MongoDB ($set, $unset). null untuk event DDL.
oldarrayStatus dokumen sebelum pembaruan: hanya bidang _id yang disertakan. null untuk event INSERT dan DDL.
sqlobject atau nullDetail pernyataan DDL (untuk isDdl: true). null untuk event DML.
gtidnullTidak berlaku untuk MongoDB (dipesan untuk kompatibilitas MySQL).
mysqlTypenullTidak berlaku untuk MongoDB.
serverIdnullTidak berlaku untuk MongoDB.
sqlTypenullTidak berlaku untuk MongoDB.

Skenario 1: Oplog

Atur Migration Method ke Oplog.

Jenis perubahan sumberPernyataan sumberData yang diterima oleh topik tujuan
insertdb.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})Lihat contoh di bawah
update $setdb.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})Lihat contoh di bawah
update $set new fielddb.kafka_test.update({"cid":"a"},{$set:{"salary":100}})Lihat contoh di bawah
update $unset (remove field)db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})Lihat contoh di bawah
deletedb.kafka_test.deleteOne({"cid":"a"})Lihat contoh di bawah
ddl dropdb.kafka_test.drop()Lihat contoh di bawah

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 18
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741847972000,
    "gtid": null,
    "id": 174184797200000****,
    "isDdl": false,
    "mysqlType": null,
    "old": null,
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741847973438,
    "type": "INSERT"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$set": {
            "person.age": 20
        }
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848051984,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$set": {
            "salary": 100.0
        }
    }],
    "database": "kafkadb",
    "es": 1741848146000,
    "gtid": null,
    "id": 174184814600000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848147734,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$unset": {
            "salary": true
        }
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848208186,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "database": "kafkadb",
    "es": 1741848289000,
    "gtid": null,
    "id": 174184828900000****,
    "isDdl": false,
    "mysqlType": null,
    "old": null,
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848289798,
    "type": "DELETE"
}

Lihat data (klik untuk membentangkan)

{
    "data": null,
    "database": "kafkadb",
    "es": 1741847893000,
    "gtid": null,
    "id": 1741847893000000005,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "serverId": null,
    "sql": {
        "drop": "kafka_test"
    },
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741847893760,
    "type": "DDL"
}

Skenario 2: ChangeStream — hanya bidang yang diperbarui

Atur Migration Method ke ChangeStream. Atur Obtain the entire document after it is updated ke No.

Pesan insert dan delete identik dengan Skenario 1. Pesan update hanya berisi bidang yang berubah.

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$set": {
            "person.age": 20
        }
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848052912,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$unset": {
            "salary": 1
        }
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848209142,
    "type": "UPDATE"
}

Skenario 3: ChangeStream — dokumen lengkap saat pembaruan

Atur Migration Method ke ChangeStream. Atur Obtain the entire document after it is updated ke Yes.

Event update mengirimkan dokumen lengkap setelah perubahan, bukan hanya bidang yang berubah.

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848051000,
    "gtid": null,
    "id": 174184805100000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848052219,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "salary": 100.0,
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848146000,
    "gtid": null,
    "id": 174184814600000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848147327,
    "type": "UPDATE"
}

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "person": {
            "skills": ["database", "ai"],
            "name": "testName",
            "age": 20
        },
        "_id": {
            "$oid": "67d27da49591697476e1****"
        },
        "cid": "a"
    }],
    "database": "kafkadb",
    "es": 1741848207000,
    "gtid": null,
    "id": 174184820700000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "_id": {
            "$oid": "67d27da49591697476e1****"
        }
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "kafka_test",
    "ts": 1741848208401,
    "type": "UPDATE"
}

Kasus khusus: missing fullDocument di ChangeStream

Saat bidang fullDocument pada event pembaruan ChangeStream tidak tersedia — misalnya, saat dokumen berpindah antar shard dalam koleksi sharded — pesan yang dikirimkan kembali ke perilaku Oplog (pembaruan parsial dengan operator $set atau $unset).

Contoh: pembaruan koleksi sharded dengan fullDocument tidak tersedia

Data dasar sumber:

use admin
db.runCommand({ enablesharding:"dts_test" })

Perubahan inkremental sumber:

use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

Lihat data (klik untuk membentangkan)

{
    "data": [{
        "$set": {
            "name": "b"
        }
    }],
    "database": "dts_test",
    "es": 1740720994000,
    "gtid": null,
    "id": 174072099400000****,
    "isDdl": false,
    "mysqlType": null,
    "old": [{
        "name": "a",
        "_id": 1.0
    }],
    "pkNames": ["_id"],
    "serverId": null,
    "sql": null,
    "sqlType": null,
    "table": "cstest",
    "ts": 1740721007099,
    "type": "UPDATE"
}

FAQ

Apakah saya dapat mengubah Kafka Data Compression Format setelah tugas dibuat?

Ya. Lihat Modify the objects to be synchronized.

Apakah saya dapat mengubah mekanisme pengakuan pesan setelah Tugas dibuat?

Ya. Lihat Modify the objects to be synchronized.

Langkah selanjutnya