AnalyticDB for MySQL menyediakan fitur AnalyticDB Pipeline Service (APS) yang memungkinkan Anda membuat tugas sinkronisasi Kafka untuk ingest data dari Kafka secara real time mulai dari offset tertentu. Topik ini menjelaskan cara menambahkan sumber data Kafka, membuat tugas sinkronisasi Kafka, dan menjalankan tugas tersebut.
Prasyarat
Kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Data Lakehouse telah dibuat.
Akun database telah dibuat untuk kluster AnalyticDB for MySQL.
Jika Anda menggunakan Akun Alibaba Cloud, Anda hanya perlu membuat akun istimewa.
Jika Anda menggunakan pengguna Resource Access Management (RAM), Anda harus membuat akun istimewa dan akun standar, serta mengaitkan akun standar dengan pengguna RAM.
Anda telah membuat instans ApsaraMQ for Kafka (Kafka) dan menyebarluaskannya di VPC yang sama dengan kluster AnalyticDB for MySQL.
Anda telah membuat topik Kafka dan mengirim pesan. Untuk informasi selengkapnya, lihat Panduan Cepat Message Queue for Apache Kafka.
Catatan
Hanya data Kafka dalam format JSON yang dapat disinkronkan.
Data dalam topik Kafka secara otomatis dihapus setelah periode tertentu. Jika data dalam topik kedaluwarsa dan tugas sinkronisasi data gagal, data yang kedaluwarsa tidak dapat dibaca saat Anda me-restart tugas tersebut, sehingga berpotensi menyebabkan kehilangan data. Untuk mencegah hal ini, tingkatkan siklus hidup data topik dan segera hubungi dukungan teknis jika tugas sinkronisasi data gagal.
Jika data sampel Kafka berukuran lebih dari 8 KB, API Kafka akan memotong data tersebut, yang menyebabkan kegagalan saat mengurai data sampel sehingga informasi pemetaan bidang tidak dapat dihasilkan secara otomatis.
Jika skema tabel sumber Kafka berubah, perubahan DDL tidak dipicu secara otomatis, artinya perubahan tersebut tidak disinkronkan ke AnalyticDB for MySQL.
Penagihan
Untuk informasi tentang biaya sumber daya elastic AnalyticDB Compute Unit (ACU) untuk kluster AnalyticDB for MySQL, lihat Item penagihan untuk Edisi Data Lakehouse dan Item penagihan untuk Edisi Perusahaan dan Edisi Dasar.
Prosedur
Langkah 1: Buat sumber data
Jika Anda telah menambahkan sumber data Kafka, Anda dapat melewati langkah ini dan membuat tautan sinkronisasi baru.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola, lalu klik ID kluster tersebut.
Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Source.
Di pojok kanan atas, klik Create Data Source.
Pada halaman Create Data Source, konfigurasikan parameter berikut:
Parameter
Deskripsi
Data Source Type
Pilih Kafka sebagai tipe sumber data.
Data Source Name
Sistem menghasilkan nama berdasarkan tipe sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.
Data Source Description
Deskripsi sumber data, seperti skenario aplikasi atau batasan bisnisnya.
Deployment Mode
Hanya instans Alibaba Cloud yang didukung.
Kafka Instance
ID instans Kafka.
Masuk ke Konsol ApsaraMQ for Kafka dan lihat ID instans pada halaman Clusters.
Kafka Topic
Nama topik yang dibuat di Kafka.
Masuk ke Konsol ApsaraMQ for Kafka dan lihat nama topik pada halaman Topic Management instans target.
Message Data Format
Format data pesan Kafka. Hanya JSON yang didukung.
Setelah mengonfigurasi parameter, klik Create.
Langkah 2: Buat tautan sinkronisasi
Di panel navigasi sebelah kiri, klik Simple Log Service/Kafka Data Synchronization.
Di pojok kiri atas, klik Create Synchronization Job, lalu klik tab Kafka Data Source.
Pada halaman Create Synchronization Job, konfigurasikan Source and Destination Settings, Destination Database and Table Settings, dan Synchronization Settings.
Tabel berikut menjelaskan parameter pada bagian Source and Destination Settings.
Parameter
Deskripsi
Job Name
Nama tautan data. Sistem menghasilkan nama berdasarkan tipe sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.
Data Source
Pilih sumber data Kafka yang sudah ada atau buat yang baru.
Data Source Format
Hanya JSON yang didukung.
Destination Type
Pilih: Data Warehouse - ADB Storage.
AnalyticDB for MySQL Account
Akun database untuk kluster AnalyticDB for MySQL.
AnalyticDB for MySQL Password
Kata sandi akun database kluster AnalyticDB for MySQL.
Tabel berikut menjelaskan parameter pada bagian Destination Database and Table Settings.
Parameter
Deskripsi
Database Name
Nama database kluster AnalyticDB for MySQL.
Table Name
Nama tabel kluster AnalyticDB for MySQL.
Sample Data
Data terbaru diambil secara otomatis dari topik Kafka dan digunakan sebagai data sampel.
CatatanData dalam topik Kafka harus dalam format JSON. Jika terdapat data dalam format lain, terjadi error selama sinkronisasi data.
Parsed JSON Layers
Jumlah lapisan bersarang yang akan diurai untuk data JSON. Nilai yang valid:
0: Tidak ada penguraian.
1 (Default): Mengurai satu lapisan.
2: Mengurai dua lapisan.
3: Mengurai tiga lapisan.
4: Mengurai empat lapisan.
Untuk informasi selengkapnya tentang kebijakan penguraian bersarang JSON, lihat Gunakan fitur sinkronisasi data (APS) untuk menyinkronkan data Kafka (Direkomendasikan).
Schema Field Mapping
Menampilkan informasi skema data sampel setelah penguraian JSON. Anda dapat mengubah nama dan tipe bidang tujuan, atau menambahkan/menghapus bidang sesuai kebutuhan.
Tabel berikut menjelaskan parameter pada bagian Synchronization Settings.
Parameter
Deskripsi
Delivery Start Point
Saat tugas sinkronisasi dimulai, sistem mengonsumsi data Kafka dari titik waktu yang dipilih. Anda dapat memilih titik waktu apa pun, dan sistem mulai mengonsumsi dari catatan data pertama di Kafka yang berada pada atau setelah waktu tersebut.
Dirty Data Processing Mode
Selama sinkronisasi data, jika tipe data suatu bidang di tabel tujuan tidak sesuai dengan tipe data aktual data sumber Kafka, sinkronisasi gagal. Misalnya, jika data sumber adalah
abcdan tipe bidang di tabel tujuan adalahint, terjadi error konversi yang menyebabkan sinkronisasi menjadi abnormal.Mode pemrosesan data dirty dapat diatur ke salah satu nilai berikut:
Stop Synchronization (Default): Sinkronisasi data berhenti. Anda harus mengubah tipe bidang di tabel tujuan atau mengganti mode pemrosesan data dirty, lalu me-restart tugas sinkronisasi.
Process as NULL: Bidang yang berisi data dirty ditulis ke tabel tujuan sebagai nilai NULL.
Contohnya, jika satu baris data Kafka memiliki tiga bidang (col1, col2, dan col3) dan bidang col2 berisi data dirty, data untuk bidang col2 dikonversi menjadi NULL dan ditulis ke tabel. Data untuk bidang col1 dan col3 ditulis secara normal.
Job Resource Group
Tentukan kelompok sumber daya khusus Pekerjaan tempat tugas dijalankan.
ACUs for Incremental Synchronization
Jumlah ACU untuk kelompok sumber daya khusus Pekerjaan tempat tugas dijalankan. Nilai minimum adalah 2 ACU. Nilai maksimum adalah sumber daya komputasi maksimum yang tersedia di kelompok sumber daya khusus Pekerjaan tersebut. Kami menyarankan Anda menambah jumlah ACU untuk meningkatkan performa ingest data dan stabilitas tugas.
CatatanSaat Anda membuat tugas sinkronisasi data, tugas tersebut menggunakan sumber daya elastis dari kelompok sumber daya khusus Pekerjaan. Tugas sinkronisasi data menempati sumber daya dalam jangka waktu lama. Oleh karena itu, sistem mengurangi sumber daya yang ditempati oleh tugas tersebut dari kelompok sumber daya. Misalnya, jika sumber daya komputasi maksimum kelompok sumber daya khusus Pekerjaan adalah 48 ACU dan Anda telah membuat tugas sinkronisasi yang menggunakan 8 ACU, jumlah maksimum ACU yang dapat Anda pilih untuk tugas sinkronisasi lain dalam kelompok sumber daya ini adalah 40.
Add To Whitelist
Anda perlu menambahkan Blok CIDR vSwitch Kafka ke daftar putih kluster AnalyticDB for MySQL untuk membangun koneksi jaringan guna sinkronisasi data.
Setelah mengonfigurasi parameter, klik Submit.
Langkah 3: Jalankan tugas sinkronisasi data
Pada halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi data yang telah Anda buat, lalu di kolom Actions, klik Start.
Di pojok kiri atas, klik Search. Status tugas berubah menjadi Running, yang menunjukkan bahwa tugas sinkronisasi data telah berhasil dijalankan.