AnalyticDB for MySQL menyediakan fitur sinkronisasi data AnalyticDB Pipeline Service (APS). Anda dapat menggunakan fitur ini untuk membuat tautan data Kafka guna mengingesti data dari Kafka secara real time mulai dari offset tertentu. Fitur ini mendukung output data near real-time, pengarsipan data historis lengkap, dan analitik elastis. Topik ini menjelaskan cara menambahkan sumber data Kafka, membuat dan memulai tautan data Kafka, lalu menganalisis serta mengelola sumber data tersebut.
Prasyarat
Kluster AnalyticDB for MySQL Edisi Perusahaan, Edisi Dasar, atau Edisi Danau Data Terpadu telah dibuat.
Akun database telah dibuat untuk kluster AnalyticDB for MySQL.
Jika Anda menggunakan akun Alibaba Cloud, Anda hanya perlu membuat akun istimewa.
Jika Anda menggunakan pengguna Resource Access Management (RAM), Anda harus membuat akun istimewa dan akun standar serta mengaitkan akun standar dengan pengguna RAM.
Instans ApsaraMQ for Kafka (Kafka) telah dibuat. Instans tersebut ditempatkan di wilayah yang sama dengan kluster AnalyticDB for MySQL.
Topik Kafka telah dibuat dan pesan telah dikirim ke dalamnya. Untuk informasi selengkapnya, lihat Panduan Cepat ApsaraMQ for Kafka.
Catatan
Hanya data Kafka dalam format JSON yang dapat disinkronkan.
Data dalam topik Kafka secara otomatis dihapus setelah periode tertentu. Jika tugas sinkronisasi data gagal dan data topik telah kedaluwarsa, data yang dihapus tidak dapat dipulihkan saat Anda memulai ulang tugas tersebut. Hal ini dapat menyebabkan kehilangan data. Untuk mencegahnya, tingkatkan siklus hidup data topik tersebut. Jika tugas sinkronisasi gagal, segera hubungi dukungan teknis.
Jika data sampel Kafka berukuran lebih dari 8 KB, API Kafka akan memotong data tersebut. Hal ini menyebabkan sistem gagal mengurai data sampel menjadi format JSON, sehingga informasi pemetaan bidang tidak dapat dihasilkan secara otomatis.
Perubahan pada skema tabel Kafka sumber tidak secara otomatis memicu perubahan DDL di AnalyticDB for MySQL.
Setelah data diingesti, operasi Commit harus dieksekusi agar data yang ditulis menjadi terlihat. Untuk mencegah interval operasi Commit yang terlalu pendek memengaruhi stabilitas pekerjaan serta performa baca-tulis, fitur sinkronisasi data AnalyticDB for MySQL memiliki interval operasi Commit default selama 5 menit. Oleh karena itu, saat pertama kali membuat dan memulai pekerjaan sinkronisasi data, Anda harus menunggu setidaknya 5 menit untuk melihat batch pertama data yang ditulis.
Penagihan
Penggunaan fitur sinkronisasi data AnalyticDB for MySQL dikenakan biaya sebagai berikut.
Biaya sumber daya ACU elastis untuk AnalyticDB for MySQL. Untuk informasi selengkapnya, lihat Penagihan untuk Edisi Danau Data Terpadu dan Penagihan untuk Edisi Perusahaan dan Edisi Dasar.
Biaya penyimpanan, permintaan GET, serta permintaan PUT dan lainnya untuk OSS. Untuk informasi selengkapnya, lihat Ikhtisar penagihan.
Prosedur
Langkah 1: Buat sumber data.
Langkah 2: Buat tautan data.
Langkah 3: Mulai tugas sinkronisasi data.
Langkah 4: Analisis data.
Langkah 5 (Opsional): Kelola sumber data.
Buat sumber data
Jika Anda telah menambahkan sumber data Kafka, lewati langkah ini dan lanjutkan ke Buat tautan data.
Masuk ke Konsol AnalyticDB for MySQL. Di pojok kiri atas konsol, pilih wilayah. Di panel navigasi sebelah kiri, klik Clusters. Temukan kluster yang ingin Anda kelola dan klik ID kluster tersebut.
Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Sources.
Di pojok kiri atas, klik Create Data Source.
Pada halaman Create Data Source, konfigurasikan parameter-parameter berikut. Tabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
Data Source Type
Pilih Kafka.
Data Source Name
Sistem secara otomatis menghasilkan nama berdasarkan jenis sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.
Data Source Description
Masukkan deskripsi untuk sumber data, misalnya skenario danau data terpadu atau batasan bisnis.
Deployment Mode
Hanya Alibaba Cloud Instance yang didukung.
Kafka Instance
ID instans Kafka.
Masuk ke Konsol ApsaraMQ for Kafka dan lihat ID instans pada halaman Instances.
Kafka Topic
Nama topik yang dibuat di Kafka.
Masuk ke Konsol ApsaraMQ for Kafka dan lihat nama topik pada halaman Topics dari instans tujuan.
Message Data Format
Format data pesan Kafka. Hanya JSON yang didukung.
Setelah mengonfigurasi parameter, klik Create.
Buat tautan data
Di panel navigasi sebelah kiri, klik Simple Log Service/Kafka Data Synchronization.
Di pojok kiri atas, klik Create Synchronization Job.
Pada halaman Create Synchronization Job, konfigurasikan bagian Source and Destination Settings, Destination Database and Table Settings, dan Synchronization Settings.
Tabel berikut menjelaskan parameter untuk Source and Destination Settings.
Parameter
Deskripsi
Job Name
Nama tautan data. Sistem secara otomatis menghasilkan nama berdasarkan jenis sumber data dan waktu saat ini. Anda dapat mengubah nama sesuai kebutuhan.
Data Source
Pilih sumber data Kafka yang sudah ada atau buat yang baru.
Destination Type
Nilai yang valid:
Data Lake - User OSS.
Data Lake - AnalyticDB Lake Storage (Direkomendasikan).
PentingJika Anda mengatur Destination type ke Data Lake - AnalyticDB Lake Storage, Anda harus mengaktifkan fitur penyimpanan danau.
ADB Lake Storage
Nama penyimpanan danau tempat data danau AnalyticDB for MySQL berada.
Pilih penyimpanan danau tujuan dari daftar drop-down. Jika belum ada penyimpanan danau yang dibuat, klik Automatically Created dalam daftar drop-down untuk membuatnya secara otomatis.
PentingParameter ini wajib diisi ketika Anda mengatur Destination Type ke Data Lake - AnalyticDB Lake Storage.
OSS Path
Jalur penyimpanan di OSS untuk data danau AnalyticDB for MySQL.
PentingParameter ini wajib diisi ketika Anda mengatur Destination Type ke Data Lake - User OSS.
Bucket yang ditampilkan adalah semua bucket di wilayah yang sama dengan kluster AnalyticDB for MySQL. Anda dapat memilih salah satunya. Rencanakan jalur penyimpanan dengan cermat. Anda tidak dapat mengubahnya setelah pembuatan.
Pilih folder kosong. Jalur OSS tidak boleh memiliki hubungan awalan (prefix) dengan jalur OSS tugas lainnya untuk mencegah data tertimpa. Misalnya, jika jalur OSS untuk dua tugas sinkronisasi data adalah
oss://testBucketName/test/sls1/danoss://testBucketName/test/, keduanya memiliki hubungan awalan, yang akan menyebabkan data tertimpa selama sinkronisasi data.
Storage Format
Format penyimpanan data. Nilai yang valid:
PAIMON.
PentingFormat ini hanya didukung ketika Destination Type diatur ke Data Lake - User OSS.
ICEBERG.
Tabel berikut menjelaskan parameter untuk Destination Database and Table Settings.
Parameter
Deskripsi
Database Name
Nama database tujuan di AnalyticDB for MySQL. Jika database dengan nama yang sama belum ada, database baru akan dibuat. Jika database dengan nama yang sama sudah ada, data akan disinkronkan ke database yang sudah ada. Untuk informasi selengkapnya tentang konvensi penamaan database, lihat Batasan.
PentingPada bagian Source and Destination Settings, jika Anda mengatur Storage Format ke PAIMON, database yang sudah ada harus memenuhi kondisi berikut. Jika tidak, tugas sinkronisasi data akan gagal.
Harus merupakan database eksternal. Pernyataan untuk membuat database harus berupa
CREATE EXTERNAL DATABASE<database_name>.Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti
catalog, dan nilaicatalogharuspaimon.Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti
adb.paimon.warehouse. Contoh:adb.paimon.warehouse=oss://testBucketName/aps/data.Parameter `DBPROPERTIES` dalam pernyataan `CREATE DATABASE` harus mencakup properti
LOCATION, dan Anda harus menambahkan.dbsetelah nama database. Jika tidak, kueri XIHE akan gagal. Contoh:LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/.Direktori bucket dalam jalur OSS yang dikonfigurasi untuk
LOCATIONharus sudah ada. Jika tidak, pembuatan database akan gagal.
Table Name
Nama tabel tujuan di AnalyticDB for MySQL. Jika tabel dengan nama yang sama belum ada di database, tabel baru akan dibuat. Jika tabel dengan nama yang sama sudah ada, sinkronisasi data akan gagal. Untuk informasi selengkapnya tentang konvensi penamaan tabel, lihat Batasan.
Sample Data
Data terbaru secara otomatis diambil dari topik Kafka dan digunakan sebagai data sampel.
CatatanData dalam topik Kafka harus dalam format JSON. Jika format data lainnya ada, akan terjadi error selama sinkronisasi data.
Parsed JSON Layers
Atur jumlah level bersarang yang akan diurai dalam data JSON. Nilai yang valid:
0: Tidak ada penguraian.
1 (Default): Uraikan satu level.
2: Uraikan dua level.
3: Uraikan tiga level.
4: Uraikan empat level.
Untuk informasi selengkapnya tentang kebijakan penguraian bersarang JSON, lihat Contoh level penguraian JSON dan inferensi skema.
Schema Field Mapping
Menampilkan informasi skema data sampel setelah penguraian JSON. Anda dapat menyesuaikan nama dan tipe bidang tujuan, atau menambah/menghapus bidang sesuai kebutuhan.
Partition Key Settings
Atur kunci partisi untuk tabel tujuan. Kami merekomendasikan mengonfigurasi partisi berdasarkan waktu log atau logika bisnis untuk memastikan performa ingesti dan kueri data. Jika Anda tidak mengatur kunci partisi, tabel tujuan tidak akan memiliki partisi secara default.
Anda dapat memformat kunci partisi tujuan menggunakan format waktu atau dengan menentukan bidang partisi.
Untuk mempartisi berdasarkan tanggal dan waktu, pilih bidang bertipe tanggal-waktu sebagai nama bidang partisi. Untuk metode penanganan format, pilih Time Formatting, lalu pilih format bidang sumber dan format partisi tujuan. AnalyticDB for MySQL mengidentifikasi nilai bidang partisi berdasarkan format bidang sumber dan mengonversinya ke format partisi tujuan untuk partisi. Misalnya, jika bidang sumber adalah gmt_created dengan nilai 1711358834, format bidang sumber adalah timestamp presisi tingkat detik, dan format partisi tujuan adalah yyyyMMdd, data akan dipartisi berdasarkan 20240325.
Untuk mempartisi berdasarkan nilai bidang, pilih Specify Partition Field sebagai metode penanganan format.
Tabel berikut menjelaskan parameter untuk Synchronization Settings.
Parameter
Deskripsi
Starting Consumer Offset for Incremental Synchronization
Saat tugas sinkronisasi dimulai, sistem mulai mengonsumsi data Kafka dari titik waktu yang dipilih. Nilai yang valid:
Earliest offset (begin_cursor): Secara otomatis mengonsumsi data dari titik waktu paling awal dalam data Kafka.
Latest offset (end_cursor): Secara otomatis mengonsumsi data dari titik waktu paling akhir dalam data Kafka.
Custom offset: Anda dapat memilih titik waktu apa pun. Sistem akan mulai mengonsumsi dari data pertama di Kafka yang berada pada atau setelah waktu ini.
Job Resource Group
Tentukan kelompok sumber daya pekerjaan tempat tugas akan dijalankan.
ACUs for Incremental Synchronization
Tentukan jumlah ACU untuk kelompok sumber daya pekerjaan. Jumlah minimum ACU adalah 2, dan maksimum adalah sumber daya komputasi maksimum yang tersedia di kelompok sumber daya pekerjaan. Kami merekomendasikan menentukan jumlah ACU yang lebih tinggi untuk meningkatkan performa ingesti data dan stabilitas tugas.
CatatanSaat Anda membuat tugas sinkronisasi data, tugas tersebut menggunakan sumber daya elastis dari kelompok sumber daya pekerjaan. Tugas sinkronisasi data menempati sumber daya dalam jangka waktu lama, sehingga sistem mengurangi sumber daya yang digunakan oleh tugas tersebut dari kelompok sumber daya. Misalnya, jika kelompok sumber daya pekerjaan memiliki maksimum 48 ACU dan Anda telah membuat tugas sinkronisasi yang menggunakan 8 ACU, jumlah maksimum ACU yang dapat Anda pilih untuk tugas sinkronisasi lain dalam kelompok sumber daya ini adalah 40.
Advanced Settings
Konfigurasi lanjutan memungkinkan Anda menyesuaikan tugas sinkronisasi. Untuk melakukan konfigurasi khusus, hubungi dukungan teknis.
Setelah mengonfigurasi parameter, klik Submit.
Mulai tugas sinkronisasi data
Pada halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi data yang telah Anda buat dan klik Start di kolom Actions.
Di pojok kiri atas, klik Search. Tugas berhasil dimulai ketika statusnya berubah menjadi Running.
Analitik data
Setelah data disinkronkan, Anda dapat menggunakan fitur pengembangan Spark Jar untuk menganalisis data di AnalyticDB for MySQL. Untuk informasi selengkapnya tentang pengembangan Spark, lihat Editor pengembangan Spark dan Pengembangan aplikasi Spark offline.
Di panel navigasi sebelah kiri, pilih .
Masukkan pernyataan contoh dalam templat default dan klik Run Now.
-- Here is just an example of SparkSQL. Modify the content and run your spark program. conf spark.driver.resourceSpec=medium; conf spark.executor.instances=2; conf spark.executor.resourceSpec=medium; conf spark.app.name=Spark SQL Test; conf spark.adb.connectors=oss; -- Here are your sql statements show tables from lakehouse20220413156_adbTest;(Opsional) Pada tab Applications, klik Logs di kolom Actions untuk melihat log eksekusi pekerjaan Spark SQL.
Kelola sumber data
Di panel navigasi sebelah kiri, pilih Data Ingestion > Data Sources. Anda dapat melakukan operasi berikut di kolom Actions.
Operasi | Deskripsi |
Create Job | Langsung menuju halaman pembuatan tugas sinkronisasi data atau migrasi data untuk sumber data ini. |
View | Lihat konfigurasi detail sumber data. |
Edit | Edit properti sumber data, seperti nama dan deskripsinya. |
Delete | Hapus sumber data saat ini. Catatan Jika terdapat tugas sinkronisasi data atau migrasi data untuk sumber data tersebut, Anda tidak dapat langsung menghapus sumber data. Anda harus terlebih dahulu menuju halaman Simple Log Service/Kafka Data Synchronization, temukan tugas sinkronisasi target, lalu klik Delete di kolom Actions untuk menghapus tugas sinkronisasi atau migrasi data tersebut. |
Contoh level penguraian JSON dan inferensi skema
Level penguraian menentukan jumlah level bersarang yang akan diurai dalam data JSON. Misalnya, pengguna mengirim data JSON berikut ke Kafka.
{
"name" : "zhangle",
"age" : 18,
"device" : {
"os" : {
"test":lag,
"member":{
"fa":zhangsan,
"mo":limei
}
},
"brand" : "none",
"version" : "11.4.2"
}
}Bagian berikut menunjukkan hasil penguraian untuk level 0 hingga 4.
Penguraian Level 0
Data tidak diurai. Data JSON asli langsung dioutput.
JSON field | Value | Destination field name |
__value__ | { "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }} | __value__ |
Penguraian Level 1
Level pertama data JSON diurai.
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device | { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" } | device |
Penguraian Level 2
Level kedua data JSON diurai. Jika suatu bidang tidak bersarang, bidang tersebut langsung dioutput. Misalnya, bidang name dan age langsung dioutput. Jika suatu bidang bersarang, sub-bidangnya dioutput. Misalnya, bidang device bersarang, sehingga sub-bidangnya device.os, device.brand, dan device.version dioutput.
Karena nama bidang tujuan tidak boleh mengandung titik (.), titik secara otomatis diganti dengan garis bawah (_).
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os | { "test":lag,"member":{ "fa":zhangsan,"mo":limei } | device_os |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Penguraian Level 3
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member | { "fa":zhangsan,"mo":limei } | device_os_member |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Penguraian Level 4
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member.fa | zhangsan | device_os_member_fa |
device.os.member.mo | lime | device_os_member_mo |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |