All Products
Search
Document Center

AnalyticDB:Sinkronisasi data Kafka ke AnalyticDB for MySQL menggunakan sinkronisasi data (APS)

Last Updated:Dec 02, 2025

AnalyticDB for MySQL menyediakan fitur AnalyticDB Pipeline Service (APS) yang memungkinkan Anda membuat tugas sinkronisasi Kafka untuk ingest data dari Kafka secara real time mulai dari offset tertentu. Topik ini menjelaskan cara menambahkan sumber data Kafka, membuat tugas sinkronisasi Kafka, dan menjalankan tugas tersebut.

Prasyarat

Catatan

  • Hanya data Kafka dalam format JSON yang dapat disinkronkan.

  • Data dalam topik Kafka secara otomatis dihapus setelah periode tertentu. Jika data dalam topik kedaluwarsa dan tugas sinkronisasi data gagal, data yang kedaluwarsa tidak dapat dibaca saat Anda me-restart tugas tersebut, sehingga berpotensi menyebabkan kehilangan data. Untuk mencegah hal ini, tingkatkan siklus hidup data topik dan segera hubungi dukungan teknis jika tugas sinkronisasi data gagal.

  • Jika data sampel Kafka berukuran lebih dari 8 KB, API Kafka akan memotong data tersebut, yang menyebabkan kegagalan saat mengurai data sampel sehingga informasi pemetaan bidang tidak dapat dihasilkan secara otomatis.

  • Jika skema tabel sumber Kafka berubah, perubahan DDL tidak dipicu secara otomatis, artinya perubahan tersebut tidak disinkronkan ke AnalyticDB for MySQL.

Penagihan

Untuk informasi tentang biaya sumber daya elastic AnalyticDB Compute Unit (ACU) untuk kluster AnalyticDB for MySQL, lihat Item penagihan untuk Edisi Data Lakehouse dan Item penagihan untuk Edisi Perusahaan dan Edisi Dasar.

Prosedur

Langkah 1: Buat sumber data

Catatan

Jika Anda telah menambahkan sumber data Kafka, Anda dapat melewati langkah ini dan membuat tautan sinkronisasi baru.

  1. Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola, lalu klik ID kluster tersebut.

  2. Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Source.

  3. Di pojok kanan atas, klik Create Data Source.

  4. Pada halaman Create Data Source, konfigurasikan parameter berikut:

    Parameter

    Deskripsi

    Data Source Type

    Pilih Kafka sebagai tipe sumber data.

    Data Source Name

    Sistem menghasilkan nama berdasarkan tipe sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.

    Data Source Description

    Deskripsi sumber data, seperti skenario aplikasi atau batasan bisnisnya.

    Deployment Mode

    Hanya instans Alibaba Cloud yang didukung.

    Kafka Instance

    ID instans Kafka.

    Masuk ke Konsol ApsaraMQ for Kafka dan lihat ID instans pada halaman Clusters.

    Kafka Topic

    Nama topik yang dibuat di Kafka.

    Masuk ke Konsol ApsaraMQ for Kafka dan lihat nama topik pada halaman Topic Management instans target.

    Message Data Format

    Format data pesan Kafka. Hanya JSON yang didukung.

  5. Setelah mengonfigurasi parameter, klik Create.

Langkah 2: Buat tautan sinkronisasi

  1. Di panel navigasi sebelah kiri, klik Simple Log Service/Kafka Data Synchronization.

  2. Di pojok kiri atas, klik Create Synchronization Job, lalu klik tab Kafka Data Source.

  3. Pada halaman Create Synchronization Job, konfigurasikan Source and Destination Settings, Destination Database and Table Settings, dan Synchronization Settings.

    • Tabel berikut menjelaskan parameter pada bagian Source and Destination Settings.

      Parameter

      Deskripsi

      Job Name

      Nama tautan data. Sistem menghasilkan nama berdasarkan tipe sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.

      Data Source

      Pilih sumber data Kafka yang sudah ada atau buat yang baru.

      Data Source Format

      Hanya JSON yang didukung.

      Destination Type

      Pilih: Data Warehouse - ADB Storage.

      AnalyticDB for MySQL Account

      Akun database untuk kluster AnalyticDB for MySQL.

      AnalyticDB for MySQL Password

      Kata sandi akun database kluster AnalyticDB for MySQL.

    • Tabel berikut menjelaskan parameter pada bagian Destination Database and Table Settings.

      Parameter

      Deskripsi

      Database Name

      Nama database kluster AnalyticDB for MySQL.

      Table Name

      Nama tabel kluster AnalyticDB for MySQL.

      Sample Data

      Data terbaru diambil secara otomatis dari topik Kafka dan digunakan sebagai data sampel.

      Catatan

      Data dalam topik Kafka harus dalam format JSON. Jika terdapat data dalam format lain, terjadi error selama sinkronisasi data.

      Parsed JSON Layers

      Jumlah lapisan bersarang yang akan diurai untuk data JSON. Nilai yang valid:

      • 0: Tidak ada penguraian.

      • 1 (Default): Mengurai satu lapisan.

      • 2: Mengurai dua lapisan.

      • 3: Mengurai tiga lapisan.

      • 4: Mengurai empat lapisan.

      Untuk informasi selengkapnya tentang kebijakan penguraian bersarang JSON, lihat Gunakan fitur sinkronisasi data (APS) untuk menyinkronkan data Kafka (Direkomendasikan).

      Schema Field Mapping

      Menampilkan informasi skema data sampel setelah penguraian JSON. Anda dapat mengubah nama dan tipe bidang tujuan, atau menambahkan/menghapus bidang sesuai kebutuhan.

    • Tabel berikut menjelaskan parameter pada bagian Synchronization Settings.

      Parameter

      Deskripsi

      Delivery Start Point

      Saat tugas sinkronisasi dimulai, sistem mengonsumsi data Kafka dari titik waktu yang dipilih. Anda dapat memilih titik waktu apa pun, dan sistem mulai mengonsumsi dari catatan data pertama di Kafka yang berada pada atau setelah waktu tersebut.

      Dirty Data Processing Mode

      Selama sinkronisasi data, jika tipe data suatu bidang di tabel tujuan tidak sesuai dengan tipe data aktual data sumber Kafka, sinkronisasi gagal. Misalnya, jika data sumber adalah abc dan tipe bidang di tabel tujuan adalah int, terjadi error konversi yang menyebabkan sinkronisasi menjadi abnormal.

      Mode pemrosesan data dirty dapat diatur ke salah satu nilai berikut:

      • Stop Synchronization (Default): Sinkronisasi data berhenti. Anda harus mengubah tipe bidang di tabel tujuan atau mengganti mode pemrosesan data dirty, lalu me-restart tugas sinkronisasi.

      • Process as NULL: Bidang yang berisi data dirty ditulis ke tabel tujuan sebagai nilai NULL.

      Contohnya, jika satu baris data Kafka memiliki tiga bidang (col1, col2, dan col3) dan bidang col2 berisi data dirty, data untuk bidang col2 dikonversi menjadi NULL dan ditulis ke tabel. Data untuk bidang col1 dan col3 ditulis secara normal.

      Job Resource Group

      Tentukan kelompok sumber daya khusus Pekerjaan tempat tugas dijalankan.

      ACUs for Incremental Synchronization

      Jumlah ACU untuk kelompok sumber daya khusus Pekerjaan tempat tugas dijalankan. Nilai minimum adalah 2 ACU. Nilai maksimum adalah sumber daya komputasi maksimum yang tersedia di kelompok sumber daya khusus Pekerjaan tersebut. Kami menyarankan Anda menambah jumlah ACU untuk meningkatkan performa ingest data dan stabilitas tugas.

      Catatan

      Saat Anda membuat tugas sinkronisasi data, tugas tersebut menggunakan sumber daya elastis dari kelompok sumber daya khusus Pekerjaan. Tugas sinkronisasi data menempati sumber daya dalam jangka waktu lama. Oleh karena itu, sistem mengurangi sumber daya yang ditempati oleh tugas tersebut dari kelompok sumber daya. Misalnya, jika sumber daya komputasi maksimum kelompok sumber daya khusus Pekerjaan adalah 48 ACU dan Anda telah membuat tugas sinkronisasi yang menggunakan 8 ACU, jumlah maksimum ACU yang dapat Anda pilih untuk tugas sinkronisasi lain dalam kelompok sumber daya ini adalah 40.

      Add To Whitelist

      Anda perlu menambahkan Blok CIDR vSwitch Kafka ke daftar putih kluster AnalyticDB for MySQL untuk membangun koneksi jaringan guna sinkronisasi data.

  4. Setelah mengonfigurasi parameter, klik Submit.

Langkah 3: Jalankan tugas sinkronisasi data

  1. Pada halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi data yang telah Anda buat, lalu di kolom Actions, klik Start.

  2. Di pojok kiri atas, klik Search. Status tugas berubah menjadi Running, yang menunjukkan bahwa tugas sinkronisasi data telah berhasil dijalankan.