全部产品
Search
文档中心

Data Transmission Service:Sinkronisasi data dari ApsaraDB for MongoDB ke Message Queue for Apache Kafka

更新时间:Nov 11, 2025

Layanan Transmisi Data (DTS) mendukung sinkronisasi data dari MongoDB ke kluster Kafka. Topik ini menjelaskan langkah-langkah untuk melakukan operasi sinkronisasi menggunakan instans MongoDB dengan arsitektur replica set sebagai database sumber dan instans Message Queue for Apache Kafka sebagai database tujuan.

Prasyarat

  • Anda telah membuat instans Message Queue for Apache Kafka tujuan.

    Catatan

    Untuk informasi mengenai versi database sumber dan tujuan yang didukung, lihat Ikhtisar Solusi Sinkronisasi.

  • Anda telah membuat topik untuk menerima data di instans Message Queue for Apache Kafka tujuan.

  • Jika database sumber adalah kluster sharded ApsaraDB for MongoDB, Anda harus mengajukan titik akhir untuk semua node shard. Node-node shard dalam instans kluster sharded harus menggunakan kata sandi akun dan titik akhir yang sama. Untuk informasi lebih lanjut tentang cara mengajukan titik akhir, lihat Ajukan titik akhir untuk shard.

Pertimbangan

Jenis

Deskripsi

Batasan database sumber

  • Persyaratan bandwidth: Server tempat database sumber ditempatkan harus memiliki bandwidth keluar yang cukup. Jika tidak, kecepatan sinkronisasi data akan terpengaruh.

  • Jika Anda ingin memodifikasi koleksi di database tujuan, seperti mengonfigurasi pemetaan nama untuk koleksi, Anda dapat menyinkronkan hingga 1.000 koleksi dalam satu tugas sinkronisasi data. Jika Anda menjalankan tugas untuk menyinkronkan lebih dari 1.000 koleksi, akan terjadi kesalahan permintaan. Dalam kasus ini, kami menyarankan Anda mengonfigurasi beberapa tugas untuk menyinkronkan koleksi atau mengonfigurasi satu tugas untuk menyinkronkan seluruh database.

  • Jika database sumber adalah instans kluster sharded ApsaraDB for MongoDB, bidang _id dalam koleksi yang akan disinkronkan harus unik. Jika tidak, ketidakkonsistenan data dapat terjadi.

  • Jika database sumber adalah instans kluster sharded ApsaraDB for MongoDB, jumlah node Mongos dalam instans tersebut tidak boleh melebihi 10. Anda juga harus memastikan bahwa instans kluster sharded ApsaraDB for MongoDB sumber tidak berisi dokumen yatim (orphaned documents). Jika tidak, ketidakkonsistenan data dapat terjadi dan tugas mungkin gagal. Untuk informasi lebih lanjut, lihat dokumentasi MongoDB dan bagian Bagaimana cara menghapus dokumen yatim dari database MongoDB yang diterapkan dalam arsitektur kluster sharded? dalam topik FAQ.

  • Anda tidak dapat menggunakan instans standalone ApsaraDB for MongoDB, kluster Azure Cosmos DB for MongoDB, atau kluster elastis Amazon DocumentDB sebagai database sumber.

  • Database sumber harus memiliki Oplog diaktifkan, dan Oplog harus disimpan selama minimal tujuh hari. Atau, Anda dapat mengaktifkan change streams dan memastikan bahwa Layanan Transmisi Data (DTS) dapat berlangganan perubahan data dari database sumber dalam tujuh hari terakhir melalui change streams. Jika tidak, tugas mungkin gagal karena tidak dapat memperoleh perubahan data dari database sumber. Dalam kasus ekstrem, hal ini dapat menyebabkan ketidakkonsistenan data atau kehilangan data. Masalah yang disebabkan oleh hal ini tidak dicakup oleh Perjanjian Tingkat Layanan (SLA) DTS.

    Penting
    • Gunakan Oplog untuk memperoleh perubahan data dari database sumber.

    • Hanya MongoDB 4.0 dan versi yang lebih baru yang mendukung memperoleh perubahan data melalui change streams. Sinkronisasi dua arah tidak didukung saat menggunakan change streams untuk memperoleh perubahan data.

    • Jika database sumber adalah Amazon DocumentDB (kluster non-elastis), Anda harus mengaktifkan Change Streams secara manual dan, selama konfigurasi tugas, mengatur Migration Method ke ChangeStream dan Architecture ke Sharded Cluster.

  • Batasan pada operasi yang akan dilakukan pada database sumber:

    • Selama sinkronisasi data penuh, jangan mengubah skema database atau koleksi atau data bertipe ARRAY. Jika tidak, tugas sinkronisasi data akan gagal, atau terjadi ketidakkonsistenan data antara database sumber dan tujuan.

    • Jika instans MongoDB sumber menggunakan arsitektur kluster sharded, jangan menjalankan perintah yang mengubah distribusi data objek yang akan disinkronkan selama tugas sinkronisasi. Perintah-perintah ini mencakup shardCollection, reshardCollection, unshardCollection, moveCollection, dan movePrimary. Jika tidak, ketidakkonsistenan data dapat terjadi.

  • Jika database sumber adalah instans MongoDB yang menggunakan arsitektur kluster sharded dan balancer database sumber melakukan penyeimbangan data, latensi dapat terjadi pada instans tersebut.

  • Menghubungkan ke database MongoDB menggunakan alamat SRV tidak didukung.

Batasan lainnya

  • Hanya sinkronisasi tingkat koleksi yang didukung.

  • Menyinkronkan data dari database admin, config, dan local tidak didukung.

  • Jika satu bagian data yang akan disinkronkan melebihi 10 MB, tugas akan gagal.

  • Jika database sumber adalah instans MongoDB kluster sharded:

    • Selama sinkronisasi data penuh, Anda harus menonaktifkan balancer database MongoDB sumber hingga setiap subtugas memasuki fase sinkronisasi inkremental. Jika tidak, ketidakkonsistenan data dapat terjadi. Untuk informasi lebih lanjut tentang operasi balancer, lihat Mengelola balancer MongoDB.

    • Jika metode sinkronisasi data inkremental adalah Oplog, DTS tidak dapat menjamin urutan penulisan data dari shard yang berbeda di database sumber ke Kafka tujuan.

  • Informasi transaksi tidak dipertahankan. Saat transaksi disinkronkan ke database tujuan, transaksi tersebut diubah menjadi catatan tunggal.

  • Jika node broker ditambah atau dikurangi pada instans Kafka tujuan selama tugas DTS, Anda harus memulai ulang tugas DTS.

  • Pastikan bahwa DTS dapat terhubung ke instans sumber dan tujuan. Misalnya, pengaturan keamanan instans database dan parameter listeners serta advertised.listeners dalam file server.properties instans Kafka yang dikelola sendiri tidak membatasi akses dari DTS.

  • Selama tugas sinkronisasi data penuh, DTS menggunakan sumber daya baca dan tulis database sumber dan tujuan. Hal ini dapat meningkatkan beban pada server database. Oleh karena itu, kami menyarankan Anda mengevaluasi kinerja database sumber dan tujuan sebelum memulai instans DTS dan menyinkronkan data pada jam-jam tidak sibuk, misalnya ketika beban CPU database sumber dan tujuan kurang dari 30%.

  • DTS mencoba melanjutkan instans yang gagal yang telah berjalan kurang dari tujuh hari. Sebelum Anda memindahkan bisnis Anda ke instans tujuan, akhiri atau lepaskan instans sinkronisasi. Hal ini mencegah instans dilanjutkan secara otomatis, yang dapat menimpa data di database tujuan.

  • DTS menghitung latensi sinkronisasi data inkremental berdasarkan stempel waktu data terbaru yang disinkronkan di database tujuan dan stempel waktu saat ini di database sumber. Jika tidak ada operasi pembaruan yang dilakukan pada database sumber dalam jangka waktu yang lama, latensi sinkronisasi mungkin tidak akurat. Jika latensi tugas sinkronisasi data terlalu tinggi, Anda dapat melakukan operasi pembaruan pada database sumber untuk memperbarui latensi.

  • Jika sebuah instans gagal, tim dukungan DTS akan mencoba memulihkan instans tersebut dalam waktu 8 jam. Selama proses pemulihan, operasi seperti memulai ulang instans dan menyesuaikan parameter mungkin dilakukan.

    Catatan

    Saat parameter disesuaikan, hanya parameter instans DTS yang dimodifikasi. Parameter database tidak dimodifikasi. Parameter yang mungkin dimodifikasi termasuk tetapi tidak terbatas pada yang dijelaskan dalam Memodifikasi parameter instans.

Penagihan

Jenis sinkronisasi

Biaya konfigurasi tugas

Sinkronisasi data penuh

Gratis.

Sinkronisasi data inkremental

Dikenai biaya. Untuk informasi lebih lanjut, lihat Ikhtisar penagihan.

Jenis sinkronisasi

Jenis sinkronisasi

Deskripsi

Sinkronisasi penuh

Menyinkronkan semua data historis objek sinkronisasi sumber di ApsaraDB for MongoDB ke instans Kafka tujuan.

Catatan

Sinkronisasi penuh DATABASE dan KOLEKSI didukung.

Sinkronisasi inkremental

Selain sinkronisasi penuh, Anda dapat menyinkronkan pembaruan inkremental dari ApsaraDB for MongoDB sumber ke instans Kafka tujuan.

Menggunakan Oplog

Sinkronisasi inkremental tidak mendukung database yang dibuat setelah tugas dimulai. Pembaruan inkremental berikut didukung:

  • BUAT KOLEKSI, INDEKS

  • HAPUS DATABASE, KOLEKSI, INDEKS

  • GANTI NAMA KOLEKSI

  • Operasi untuk menyisipkan, memperbarui, dan menghapus dokumen dalam koleksi.

Menggunakan ChangeStream

Pembaruan inkremental berikut didukung:

  • HAPUS DATABASE, KOLEKSI

  • GANTI NAMA KOLEKSI

  • Operasi untuk menyisipkan, memperbarui, dan menghapus dokumen dalam koleksi.

Izin akun database

Database

Izin yang diperlukan

Pembuatan dan otorisasi akun

Sumber ApsaraDB for MongoDB

Izin baca pada database yang akan disinkronkan, database admin, dan database local.

Manajemen Akun

Prosedur

  1. Gunakan salah satu metode berikut untuk membuka halaman Sinkronisasi Data dan pilih wilayah tempat instans sinkronisasi data berada.

    Konsol DTS

    1. Masuk ke Konsol DTS.

    2. Di panel navigasi sebelah kiri, klik Data Synchronization.

    3. Di pojok kiri atas halaman, pilih wilayah tempat tugas sinkronisasi data berada.

    Konsol DMS

    Catatan

    Operasi aktual dapat berbeda tergantung pada mode dan tata letak konsol DMS. Untuk informasi lebih lanjut, lihat Mode sederhana dan Menyesuaikan tata letak dan gaya konsol DMS.

    1. Masuk ke Konsol DMS.

    2. Di bilah navigasi atas, arahkan kursor ke Data + AI, lalu pilih DTS (DTS) > Data Synchronization.

    3. Dari daftar drop-down di sebelah kanan Data Synchronization Tasks, pilih wilayah tempat instans sinkronisasi data berada.

  2. Klik Create Task untuk membuka halaman konfigurasi tugas.

  3. Konfigurasikan database sumber dan tujuan. Tabel berikut menjelaskan parameter tersebut.

    Kategori

    Konfigurasi

    Deskripsi

    Tidak ada

    Task Name

    Nama tugas DTS. DTS secara otomatis menghasilkan nama tugas. Kami menyarankan Anda menentukan nama deskriptif yang memudahkan identifikasi tugas. Anda tidak perlu menentukan nama tugas yang unik.

    Source Database

    Select Existing Connection

    • Jika Anda menggunakan instans database yang terdaftar di DTS, pilih instans tersebut dari daftar drop-down. DTS secara otomatis mengisi parameter database berikut untuk instans tersebut. Untuk informasi lebih lanjut, lihat Mengelola koneksi database.

      Catatan

      Di konsol DMS, Anda dapat memilih instans database dari daftar drop-down Select a DMS database instance.

    • Jika Anda gagal mendaftarkan instans ke DTS, atau Anda tidak perlu menggunakan instans yang terdaftar di DTS, Anda harus mengonfigurasi informasi database berikut.

    Database Type

    Pilih MongoDB.

    Access Method

    Pilih Alibaba Cloud Instance.

    Instance Region

    Pilih wilayah instans ApsaraDB for MongoDB sumber.

    Replicate Data Across Alibaba Cloud Accounts

    Dalam contoh ini, database dari akun Alibaba Cloud saat ini digunakan. Pilih No.

    Architecture

    Contoh ini memilih Replica Set.

    Catatan

    Jika instans ApsaraDB for MongoDB sumber Anda adalah Sharded Cluster, Anda juga perlu memasukkan Shard account dan Shard password.

    Migration Method

    Metode yang digunakan untuk menyinkronkan data inkremental dari database sumber. Pilih metode berdasarkan kebutuhan bisnis Anda. Nilai yang valid:

    • Oplog (disarankan):

      Opsi ini tersedia jika fitur oplog diaktifkan untuk database sumber.

      Catatan

      Secara default, fitur oplog diaktifkan untuk database MongoDB yang dikelola sendiri dan instans ApsaraDB for MongoDB. Fitur ini memungkinkan Anda menyinkronkan data inkremental dengan latensi rendah karena kecepatan penarikan log yang cepat. Oleh karena itu, kami menyarankan Anda memilih Oplog untuk parameter Metode Migrasi.

    • ChangeStream:

      Opsi ini tersedia jika change streams diaktifkan untuk database sumber. Untuk informasi lebih lanjut, lihat Change Streams.

      Catatan
      • Jika database sumber adalah kluster Amazon DocumentDB non-elastis, Anda hanya dapat mengatur parameter Metode Migrasi ke ChangeStream.

      • Jika Anda memilih Sharded Cluster untuk parameter Architecture, Anda tidak perlu mengonfigurasi parameter Shard account dan Shard password.

    Instance ID

    Pilih ID instans ApsaraDB for MongoDB sumber.

    Authentication Database

    Masukkan nama database untuk akun database di instans ApsaraDB for MongoDB sumber. Jika Anda belum memodifikasinya, nilai default-nya adalah admin.

    Database Account

    Masukkan akun database untuk ApsaraDB for MongoDB sumber. Untuk persyaratan izin, lihat Izin yang diperlukan untuk akun database.

    Database Password

    Kata sandi yang digunakan untuk mengakses database.

    Encryption

    Menentukan apakah akan mengenkripsi koneksi ke database sumber. Anda dapat memilih Non-encrypted, SSL-encrypted, atau Mongo Atlas SSL berdasarkan kebutuhan bisnis Anda. Opsi yang tersedia untuk parameter Encryption ditentukan oleh nilai yang dipilih untuk parameter Access Method dan Architecture. Opsi yang ditampilkan di konsol DTS berlaku.

    Catatan
    • Jika parameter Architecture diatur ke Sharded Cluster, dan parameter Migration Method diatur ke Oplog untuk database ApsaraDB for MongoDB, parameter Enkripsi SSL-encrypted tidak tersedia.

    • Jika database sumber adalah database MongoDB yang dikelola sendiri yang menggunakan arsitektur Replica Set, parameter Access Method tidak diatur ke Alibaba Cloud Instance, dan parameter Enkripsi diatur ke SSL-encrypted, Anda dapat mengunggah sertifikat otoritas sertifikasi (CA) untuk memverifikasi koneksi ke database sumber.

    Destination Database

    Select Existing Connection

    • Jika Anda menggunakan instans database yang terdaftar di DTS, pilih instans tersebut dari daftar drop-down. DTS secara otomatis mengisi parameter database berikut untuk instans tersebut. Untuk informasi lebih lanjut, lihat Mengelola koneksi database.

      Catatan

      Di konsol DMS, Anda dapat memilih instans database dari daftar drop-down Select a DMS database instance.

    • Jika Anda gagal mendaftarkan instans ke DTS, atau Anda tidak perlu menggunakan instans yang terdaftar di DTS, Anda harus mengonfigurasi informasi database berikut.

    Database Type

    Pilih Kafka.

    Access Method

    Pilih Alibaba Cloud Instance.

    Instance Region

    Pilih wilayah tempat instans Kafka tujuan berada.

    Kafka Instance ID

    Pilih ID instans Kafka tujuan.

    Encryption

    Berdasarkan kebutuhan bisnis dan keamanan Anda, pilih Non-encrypted atau SCRAM-SHA-256.

    Topic

    Dari daftar drop-down, pilih topik yang digunakan untuk menerima data.

    Use Kafka Schema Registry

    Kafka Schema Registry adalah lapisan layanan metadata yang menyediakan antarmuka RESTful untuk menyimpan dan mengambil skema Avro.

    • No: Jangan gunakan Kafka Schema Registry.

    • Yes: Gunakan Kafka Schema Registry. Anda harus memasukkan URL atau alamat IP yang terdaftar di Kafka Schema Registry untuk skema Avro Anda.

  4. Klik Test Connectivity and Proceed di bagian bawah halaman.

    Catatan
    • Pastikan bahwa blok CIDR server DTS dapat ditambahkan secara otomatis atau manual ke pengaturan keamanan database sumber dan tujuan untuk mengizinkan akses dari server DTS. Untuk informasi lebih lanjut, lihat Tambahkan alamat IP server DTS ke daftar putih.

    • Jika database sumber atau tujuan adalah database yang dikelola sendiri dan Access Method-nya tidak diatur ke Alibaba Cloud Instance, klik Test Connectivity di kotak dialog CIDR Blocks of DTS Servers.

  5. Konfigurasikan objek yang akan disinkronkan.

    1. Pada langkah Configure Objects, konfigurasikan objek yang ingin Anda sinkronkan.

      Konfigurasi

      Deskripsi

      Synchronization Types

      Secara default, Incremental Data Synchronization dipilih. Anda hanya dapat memilih Full Data Synchronization. Anda tidak dapat memilih Schema Synchronization. Setelah pemeriksaan awal selesai, DTS menyinkronkan data historis objek yang dipilih dari database sumber ke database tujuan. Data historis ini menjadi dasar untuk sinkronisasi inkremental selanjutnya.

      Processing Mode of Conflicting Tables

      • Precheck and Report Errors: memeriksa apakah database tujuan berisi koleksi yang memiliki nama yang sama dengan koleksi di database sumber. Jika database sumber dan tujuan tidak berisi koleksi dengan nama koleksi yang identik, pemeriksaan awal berhasil. Jika tidak, kesalahan akan dikembalikan selama pemeriksaan awal, dan instans sinkronisasi data tidak dapat dimulai.

        Catatan

        Jika database sumber dan tujuan memiliki koleksi dengan nama yang sama dan koleksi di database tujuan tidak dapat dihapus atau diganti namanya, Anda dapat menggunakan fitur pemetaan nama objek untuk mengganti nama koleksi yang disinkronkan ke database tujuan. Untuk informasi lebih lanjut, lihat Ganti nama objek yang akan disinkronkan.

      • Ignore Errors and Proceed: melewati pemeriksaan awal untuk nama koleksi yang identik di database sumber dan tujuan.

        Peringatan

        Jika Anda memilih Ignore Errors and Proceed, ketidakkonsistenan data dapat terjadi dan bisnis Anda mungkin terpapar risiko potensial.

        • Jika catatan data di database tujuan memiliki nilai kunci primer atau nilai kunci unik yang sama dengan catatan data di database sumber, DTS tidak akan menyinkronkan catatan data tersebut ke database tujuan. Catatan data yang ada di database tujuan akan dipertahankan.

        • Data mungkin gagal diinisialisasi, hanya kolom tertentu yang disinkronkan, atau instans sinkronisasi data gagal.

      Data Format in Kafka

      Hanya Canal JSON yang didukung.

      Catatan

      Data yang diterima oleh Kafka dapat dikategorikan menjadi tiga skenario.

      Kafka Data Compression Format

      Format kompresi untuk data Kafka yang dikompresi. Pilih format kompresi berdasarkan kebutuhan bisnis Anda. Nilai yang valid:

      • LZ4 (default): rasio kompresi rendah dan kecepatan kompresi tinggi.

      • GZIP: rasio kompresi tinggi dan kecepatan kompresi rendah.

        Catatan

        Kompresi GZIP mengonsumsi banyak sumber daya CPU.

      • Snappy: rasio kompresi sedang dan kecepatan kompresi sedang.

      Policy for Shipping Data to Kafka Partitions

      Pilih kebijakan berdasarkan kebutuhan Anda.

      Message acknowledgement mechanism

      Pilih mekanisme konfirmasi pesan berdasarkan kebutuhan Anda.

      Topic That Stores DDL Information

      Capitalization of Object Names in Destination Instance

      Kapitalisasi nama database dan nama koleksi di instans tujuan. Secara default, DTS default policy dipilih. Anda dapat memilih opsi lain untuk memastikan kapitalisasi nama objek konsisten dengan kapitalisasi default nama objek di database sumber atau tujuan. Untuk informasi lebih lanjut, lihat Tentukan kapitalisasi nama objek di instans tujuan.

      Source Objects

      Pilih satu atau beberapa objek dari bagian Source Objects dan klik ikon 向右 untuk menambahkan objek ke bagian Selected Objects.

      Catatan

      Anda dapat memilih objek untuk disinkronkan pada tingkat koleksi.

      Selected Objects

      Tidak diperlukan konfigurasi tambahan dalam contoh ini.

      Anda dapat menggunakan fitur pemetaan untuk mengatur informasi pemetaan di instans Kafka tujuan untuk koleksi di database sumber.

    2. Klik Next: Advanced Settings untuk mengonfigurasi pengaturan lanjutan.

      Konfigurasi

      Deskripsi

      Dedicated Cluster for Task Scheduling

      Secara default, DTS menjadwalkan tugas ke kluster bersama jika Anda tidak menentukan kluster khusus. Jika Anda ingin meningkatkan stabilitas instans sinkronisasi data, beli kluster khusus. Untuk informasi lebih lanjut, lihat Apa itu kluster khusus DTS.

      Retry Time for Failed Connections

      Rentang waktu coba ulang untuk koneksi yang gagal. Jika database sumber atau tujuan gagal terhubung setelah tugas sinkronisasi data dimulai, DTS segera mencoba menghubungkan kembali dalam rentang waktu tersebut. Nilai yang valid: 10 hingga 1440. Satuan: menit. Nilai default: 720. Kami menyarankan Anda mengatur parameter ini ke nilai lebih dari 30. Jika DTS berhasil menghubungkan kembali ke database sumber dan tujuan dalam rentang waktu yang ditentukan, DTS melanjutkan tugas sinkronisasi data. Jika tidak, tugas sinkronisasi data gagal.

      Catatan
      • Jika Anda menentukan rentang waktu coba ulang yang berbeda untuk beberapa tugas sinkronisasi data yang memiliki database sumber atau tujuan yang sama, rentang waktu coba ulang terpendek yang berlaku.

      • Saat DTS mencoba menghubungkan kembali, Anda dikenai biaya untuk instans DTS. Kami menyarankan Anda menentukan rentang waktu coba ulang berdasarkan kebutuhan bisnis Anda. Anda juga dapat melepaskan instans DTS sesegera mungkin setelah instans sumber dan tujuan dilepaskan.

      Retry Time for Other Issues

      Rentang waktu coba ulang untuk masalah lain. Misalnya, jika operasi DDL atau DML gagal dilakukan setelah tugas sinkronisasi data dimulai, DTS segera mencoba mengulang operasi tersebut dalam rentang waktu tersebut. Nilai yang valid: 1 hingga 1440. Satuan: menit. Nilai default: 10. Kami menyarankan Anda mengatur parameter ini ke nilai lebih dari 10. Jika operasi yang gagal berhasil dilakukan dalam rentang waktu yang ditentukan, DTS melanjutkan tugas sinkronisasi data. Jika tidak, tugas sinkronisasi data gagal.

      Penting

      Nilai parameter Retry Time for Other Issues harus lebih kecil daripada nilai parameter Retry Time for Failed Connections.

      Obtain the entire document after it is updated.

      Selama sinkronisasi data inkremental, tentukan apakah akan menyinkronkan data lengkap dokumen yang sesuai dengan operasi pembaruan ke tujuan.

      Catatan

      Item konfigurasi ini hanya tersedia ketika Migration Method diatur ke ChangeStream.

      • Yes: Menyinkronkan data lengkap dokumen yang berisi bidang yang diperbarui.

        Penting
        • Fitur ini didasarkan pada kemampuan asli MongoDB dan dapat meningkatkan beban pada database sumber. Hal ini dapat mengurangi kecepatan pengumpulan data inkremental dan menyebabkan latensi pada instans sinkronisasi.

        • Jika DTS gagal memperoleh data lengkap, hanya data bidang yang diperbarui yang disinkronkan.

      • No: Hanya menyinkronkan data bidang yang diperbarui.

      Enable Throttling for Full Data Synchronization

      Selama sinkronisasi data penuh, DTS menggunakan sumber daya baca dan tulis database sumber dan tujuan. Hal ini dapat meningkatkan beban pada server database. Anda dapat mengonfigurasi parameter Queries per second (QPS) to the source database, RPS of Full Data Migration, dan Data migration speed for full migration (MB/s) untuk tugas sinkronisasi data penuh guna mengurangi beban pada server database tujuan.

      Catatan

      Anda hanya dapat mengonfigurasi parameter ini jika Full Data Synchronization dipilih untuk parameter Synchronization Types.

      Only one data type for primary key _id in a table of the data to be synchronized

      Hanya satu tipe data untuk kunci primer _id dalam satu koleksi data yang akan disinkronkan.

      Catatan

      Parameter ini hanya ditampilkan jika Full Data Synchronization dipilih untuk parameter Synchronization Types.

      • Yes: Selama sinkronisasi data penuh, DTS tidak memindai tipe data untuk kunci primer dalam data yang akan disinkronkan dari database sumber.

      • No: Selama sinkronisasi data penuh, DTS memindai tipe data untuk kunci primer dalam data yang akan disinkronkan dari database sumber.

      Enable Throttling for Incremental Data Synchronization

      Tentukan apakah akan mengaktifkan pembatasan untuk sinkronisasi data inkremental. Anda dapat mengaktifkan pembatasan untuk sinkronisasi data inkremental berdasarkan kebutuhan bisnis Anda. Untuk mengonfigurasi pembatasan, Anda harus mengonfigurasi parameter RPS of Incremental Data Synchronization dan Data synchronization speed for incremental synchronization (MB/s). Hal ini mengurangi beban pada server database tujuan.

      Environment Tag

      Anda dapat memilih tag lingkungan untuk mengidentifikasi instans berdasarkan kebutuhan Anda. Dalam contoh ini, Anda tidak perlu memilih tag.

      Configure ETL

      Tentukan apakah akan mengaktifkan fitur ekstrak, transformasi, muat (ETL). Untuk informasi lebih lanjut, lihat Apa itu ETL? Nilai yang valid:

      Monitoring and Alerting

      Tentukan apakah akan mengonfigurasi peringatan untuk instans sinkronisasi data. Jika tugas gagal atau latensi sinkronisasi melebihi ambang batas yang ditentukan, kontak peringatan akan menerima notifikasi. Nilai yang valid:

      • No: tidak mengaktifkan peringatan.

      • Yes: mengonfigurasi peringatan. Dalam hal ini, Anda juga harus mengonfigurasi ambang batas peringatan dan pengaturan notifikasi peringatan. Untuk informasi lebih lanjut, lihat bagian "Konfigurasi pemantauan dan peringatan saat membuat tugas DTS" dalam topik Konfigurasi pemantauan dan peringatan.

  6. Simpan pengaturan tugas dan jalankan pemeriksaan awal.

    • Untuk melihat parameter yang perlu ditentukan saat Anda memanggil operasi API terkait untuk mengonfigurasi tugas DTS, arahkan kursor ke Next: Save Task Settings and Precheck, lalu klik Preview OpenAPI parameters.

    • Jika Anda tidak perlu melihat atau telah melihat parameter, klik Next: Save Task Settings and Precheck di bagian bawah halaman.

    Catatan
    • Sebelum Anda dapat memulai tugas sinkronisasi data, DTS melakukan pemeriksaan awal. Anda hanya dapat memulai tugas sinkronisasi data setelah tugas tersebut lulus pemeriksaan awal.

    • Jika tugas sinkronisasi data gagal dalam pemeriksaan awal, klik View Details di sebelah setiap item yang gagal. Setelah Anda menganalisis penyebab berdasarkan hasil pemeriksaan, atasi masalah tersebut. Kemudian, jalankan kembali pemeriksaan awal.

    • Jika peringatan dipicu untuk suatu item selama pemeriksaan awal:

      • Jika item peringatan tidak dapat diabaikan, klik View Details di sebelah item yang gagal dan atasi masalah tersebut. Kemudian, jalankan pemeriksaan awal lagi.

      • Jika item peringatan dapat diabaikan, klik Confirm Alert Details. Di kotak dialog Lihat Detail, klik Ignore. Di pesan yang muncul, klik OK. Kemudian, klik Precheck Again untuk menjalankan pemeriksaan awal lagi. Jika Anda mengabaikan item peringatan, ketidakkonsistenan data dapat terjadi, dan bisnis Anda mungkin terpapar risiko potensial.

  7. Beli instance tersebut.

    1. Tunggu hingga Success Rate menjadi 100%, lalu klik Next: Purchase Instance.

    2. Di halaman buy, konfigurasikan parameter Metode Penagihan dan Kelas Instans untuk tugas sinkronisasi data. Tabel berikut menjelaskan parameter tersebut.

      Bagian

      Parameter

      Deskripsi

      New Instance Class

      Metode Penagihan

      • Langganan: Anda membayar untuk langganan saat membuat instans sinkronisasi data. Metode penagihan langganan lebih hemat biaya dibandingkan metode penagihan bayar sesuai pemakaian untuk penggunaan jangka panjang.

      • Bayar sesuai pemakaian: Instans bayar sesuai pemakaian ditagih per jam. Metode penagihan bayar sesuai pemakaian cocok untuk penggunaan jangka pendek. Jika Anda tidak lagi memerlukan instans sinkronisasi data bayar sesuai pemakaian, Anda dapat melepaskan instans tersebut untuk mengurangi biaya.

      Pengaturan Grup Sumber Daya

      Grup sumber daya tempat instans sinkronisasi data berada. Nilai default: default resource group. Untuk informasi lebih lanjut, lihat Apa itu Manajemen Sumber Daya?

      Kelas Instans

      DTS menyediakan kelas instans dengan kecepatan sinkronisasi yang berbeda. Anda dapat memilih kelas instans berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut, lihat Kelas instans untuk instans sinkronisasi data.

      Durasi Langganan

      Jika Anda memilih metode penagihan langganan, tentukan durasi langganan dan jumlah instans sinkronisasi data yang ingin Anda buat. Durasi langganan dapat berupa satu hingga sembilan bulan, satu tahun, dua tahun, tiga tahun, atau lima tahun.

      Catatan

      Parameter ini hanya tersedia jika Anda memilih metode penagihan Subscription.

    3. Baca dan pilih Data Transmission Service (Pay-as-you-go) Service Terms.

    4. Klik Buy and Start. Di kotak dialog yang muncul, klik OK.

      Anda dapat melihat progres tugas di daftar tugas.

Informasi pemetaan

  1. Di area Selected Objects, arahkan kursor ke nama Topik tujuan pada tingkat koleksi.

  2. Klik Edit di sebelah nama Topik tujuan.

  3. Di kotak dialog Edit Table yang muncul, konfigurasikan informasi pemetaan.

    Konfigurasi

    Deskripsi

    Name of target Topic

    Nama Topik tujuan tempat koleksi sumber disinkronkan, yang secara default adalah Topic yang dipilih di bagian Destination Database pada langkah Configurations for Source and Destination Databases.

    Penting
    • Nama topik yang Anda masukkan harus ada di instans Kafka tujuan. Jika tidak, sinkronisasi data akan gagal.

    • Jika Anda memodifikasi Name of target Topic, data akan ditulis ke Topik yang Anda masukkan.

    Filter Conditions

    Untuk informasi lebih lanjut, lihat Atur kondisi filter.

    Number of Partitions

    Jumlah partisi untuk menulis data ke topik tujuan.

  4. Klik OK.

Skenario pengiriman data

Skenario 1: Sinkronisasi data inkremental menggunakan Oplog

Konfigurasi instans utama

Untuk Migration Method, pilih Oplog.

Contoh pengiriman data

Jenis perubahan inkremental sumber

Pernyataan perubahan inkremental sumber

Data yang diterima oleh topik tujuan

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973438,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848051984,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147734,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$unset": {
			"salary": true
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208186,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848289798,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

Lihat data (klik untuk memperluas)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 1741847893000000005,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847893760,
	"type": "DDL"
}

Skenario 2: Sinkronisasi data inkremental menggunakan ChangeStream (menyinkronkan data bidang yang diperbarui)

Konfigurasi instans utama

Untuk Migration Method, pilih ChangeStream. Untuk Retrieve Full Document After Update Operation, pilih No.

Contoh pengiriman data

Jenis perubahan inkremental sumber

Pernyataan perubahan inkremental sumber

Data yang diterima oleh topik tujuan

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973803,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$set": {
			"person.age": 20
		}
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052912,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$set": {
			"salary": 100.0
		}
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848148056,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$unset": {
			"salary": 1
		}
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848209142,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290254,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

Lihat data (klik untuk memperluas)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894679,
	"type": "DDL"
}

Skenario 3: Sinkronisasi data inkremental menggunakan ChangeStream (menyinkronkan data lengkap dokumen yang sesuai dengan bidang yang diperbarui)

Konfigurasi instans utama

Atur Migration Method ke ChangeStream, dan atur Retrieve Full Document After Update Operation ke Yes.

Contoh pengiriman data

Jenis perubahan inkremental sumber

Pernyataan perubahan inkremental sumber

Data yang diterima oleh topik tujuan

insert

db.kafka_test.insert({"cid":"a","person":{"name":"testName","age":NumberInt(18),"skills":["database","ai"]}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 18
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741847972000,
	"gtid": null,
	"id": 174184797200000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847973128,
	"type": "INSERT"
}

update $set

db.kafka_test.update({"cid":"a"},{$set:{"person.age":NumberInt(20)}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848051000,
	"gtid": null,
	"id": 174184805100000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848052219,
	"type": "UPDATE"
}

update $set new filed

db.kafka_test.update({"cid":"a"},{$set:{"salary":100}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"salary": 100.0,
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848146000,
	"gtid": null,
	"id": 174184814600000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848147327,
	"type": "UPDATE"
}

update $unset remove field

db.kafka_test.update({"cid":"a"},{$unset:{"salary":1}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"person": {
			"skills": ["database", "ai"],
			"name": "testName",
			"age": 20
		},
		"_id": {
			"$oid": "67d27da49591697476e1****"
		},
		"cid": "a"
	}],
	"database": "kafkadb",
	"es": 1741848207000,
	"gtid": null,
	"id": 174184820700000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848208401,
	"type": "UPDATE"
}

delete

db.kafka_test.deleteOne({"cid":"a"})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"_id": {
			"$oid": "67d27da49591697476e1****"
		}
	}],
	"database": "kafkadb",
	"es": 1741848289000,
	"gtid": null,
	"id": 174184828900000****,
	"isDdl": false,
	"mysqlType": null,
	"old": null,
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741848290499,
	"type": "DELETE"
}

ddl drop

db.kafka_test.drop()

Lihat data (klik untuk memperluas)

{
	"data": null,
	"database": "kafkadb",
	"es": 1741847893000,
	"gtid": null,
	"id": 174184789300000****,
	"isDdl": true,
	"mysqlType": null,
	"old": null,
	"pkNames": null,
	"serverId": null,
	"sql": {
		"drop": "kafka_test"
	},
	"sqlType": null,
	"table": "kafka_test",
	"ts": 1741847894045,
	"type": "DDL"
}

Kasus khusus

Perhatian

Ketika bidang fullDocument dari event pembaruan hilang, hasil pengiriman data sama dengan saat Anda menyinkronkan data inkremental menggunakan Oplog.

Contoh

Data dasar sumber

Pernyataan perubahan inkremental sumber

Data yang diterima oleh topik tujuan

use admin
db.runCommand({ enablesharding:"dts_test" }) 
use dts_test
sh.shardCollection("dts_test.cstest",{"name":"hashed"})
db.cstest.insert({"_id":1,"name":"a"})
db.cstest.updateOne({"_id":1,"name":"a"},{$set:{"name":"b"}})

Lihat data (klik untuk memperluas)

{
	"data": [{
		"$set": {
			"name": "b"
		}
	}],
	"database": "dts_test",
	"es": 1740720994000,
	"gtid": null,
	"id": 174072099400000****,
	"isDdl": false,
	"mysqlType": null,
	"old": [{
		"name": "a",
		"_id": 1.0
	}],
	"pkNames": ["_id"],
	"serverId": null,
	"sql": null,
	"sqlType": null,
	"table": "cstest",
	"ts": 1740721007099,
	"type": "UPDATE"
}

FAQ

  • Apakah saya dapat memodifikasi Kafka Data Compression Format?

    Ya. Untuk informasi lebih lanjut, lihat Modify the objects to be synchronized.

  • Apakah saya dapat memodifikasi Message acknowledgement mechanism?

    Ya. Untuk informasi lebih lanjut, lihat Modify the objects to be synchronized.