Topik ini menjelaskan cara membuat konektor sink AnalyticDB untuk menyinkronkan data dari topik sumber di instance ApsaraMQ for Kafka ke tabel di AnalyticDB.
Prasyarat
Untuk informasi tentang prasyarat, lihat Prasyarat.
Langkah 1: Buat Sumber Daya AnalyticDB
Buat sumber daya AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.
Jika Anda ingin mengekspor data ke tabel AnalyticDB for MySQL, buat kluster, akun basis data, dan basis data, lalu hubungkan ke kluster di AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Buat Kluster, Buat Akun Basis Data, Hubungkan ke Kluster AnalyticDB for MySQL, dan Buat Basis Data.
Jika Anda ingin mengekspor data ke tabel AnalyticDB for PostgreSQL, buat instance dan akun basis data, lalu hubungkan ke basis data di Konsol AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Buat Instance, Buat dan Kelola Akun Basis Data, dan Koneksi Klien.
Dalam topik ini, sebuah basis data AnalyticDB for MySQL bernama adb_sink_database dan tabel data bernama adb_sink_table dibuat.
Langkah 2: Buat dan Mulai Konektor Sink AnalyticDB
Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi di sebelah kiri, pilih .
Di halaman Tasks, klik Create Task.
Pembuatan Tugas
Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Region
Wilayah tempat instance sumber ApsaraMQ for Kafka berada.
Tiongkok (Beijing)
ApsaraMQ for Kafka Instance
Instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.
alikafka_post-cn-jte3****
Topic
Topik pada instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.
demo-topic
Group ID
Nama grup konsumen pada instance ApsaraMQ for Kafka sumber.
Quickly Create: Sistem secara otomatis membuat grup konsumen dengan format
GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.
Buat Cepat
Consumer Offset
Offset dari mana pesan dikonsumsi. Nilai valid:
Latest Offset
Earliest Offset
Offset Terbaru
Network Configuration
Tipe jaringan tempat Anda ingin merutekan pesan. Nilai valid:
Basic Network
Self-managed Internet
Jaringan Dasar
VPC
ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.
vpc-bp17fapfdj0dwzjkd****
vSwitch
ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.
vsw-bp1gbjhj53hdjdkg****
Security Group
ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.
alikafka_pre-cn-7mz2****
Data Format
Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.
Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.
Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.
Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.
Json
Messages
Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.
100
Interval (Unit: Seconds)
Interval waktu saat fungsi dipanggil. Sistem mengirimkan pesan agregat ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Satuan: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah agregasi.
3
Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola Peristiwa.
Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan perutean data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.
Di langkah Sink, atur parameter Service Type menjadi AnalyticDB dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Tipe Instance
Tipe instance yang Anda buat. Dalam contoh ini, AnalyticDB for MySQL dipilih. Nilai valid:
AnalyticDB for MySQL
AnalyticDB for PostgreSQL
AnalyticDB for MySQL
ID Instance AnalyticDB
ID instance AnalyticDB for MySQL yang Anda buat.
gp-bp10uo5n536wd****
Nama Basis Data
Nama basis data yang Anda buat.
adb_sink_database
Nama Tabel
Nama tabel yang Anda buat.
adb_sink_table
Data Mapping
Format data yang diteruskan dari ApsaraMQ for Kafka ke AnalyticDB. Anda dapat menentukan aturan ekstraksi nilai di tabel basis data menggunakan aturan JSONPath. Jika Anda mengatur parameter Data Format menjadi Json di langkah Source, format data yang diteruskan dari ApsaraMQ for Kafka adalah seperti yang ditunjukkan dalam kode berikut:
{ "data": { "topic": "demo-topic", "partition": 0, "offset": 2, "timestamp": 1739756629123, "headers": { "headers": [], "isReadOnly": false }, "key":"adb-sink-k1", "value": { "userid":"xiaoming", "source":"shanghai" } }, "id": "7702ca16-f944-4b08-***-***-0-2", "source": "acs:alikafka", "specversion": "1.0", "type": "alikafka:Topic:Message", "datacontenttype": "application/json; charset=utf-8", "time": "2025-02-17T01:43:49.123Z", "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic", "aliyunaccountid": "1******6789" }Tentukan aturan JSONPath berdasarkan nama kolom tabel. Misalnya, jika nama kolom tabel adalah userid, tentukan
$.data.value.useridsebagai aturan ekstraksi nilai.Nama Pengguna Basis Data
Nama pengguna yang digunakan untuk mengakses akun basis data.
user
Kata Sandi Basis Data
Kata sandi yang digunakan untuk mengakses akun basis data.
******
Pengaturan Jaringan
VPC: Pesan di ApsaraMQ for Kafka dikirim ke AnalyticDB dalam virtual private cloud (VPC).
Internet: Pesan di ApsaraMQ for Kafka dikirim ke AnalyticDB melalui Internet.
VPC
VPC
ID VPC. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
vpc-bp17fapfdj0dwzjkd****
vSwitch
ID vSwitch. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
PentingSetelah Anda memilih vSwitch, Anda harus menambahkan blok CIDR tempat vSwitch termasuk ke dalam daftar putih alamat IP instance AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Daftar Putih Alamat IP.
vsw-bp1gbjhj53hdjdkg****
Grup Keamanan
ID grup keamanan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
test_group
Kembali ke halaman Tasks, temukan konektor sink OSS yang Anda buat, lalu klik Enable di kolom Actions.
Di pesan Note, klik OK.
Konektor sink memerlukan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuan di kolom Status pada halaman Tasks.
Langkah 3: Uji Konektor Sink AnalyticDB
Di halaman Tasks, temukan konektor sink AnalyticDB yang Anda buat dan klik nama topik sumber di kolom Event Source.
- Di halaman Detail Topik, klik Send Message.
Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.
CatatanDalam contoh ini, isi pesan adalah string JSON yang berisi semua kolom tabel data yang dibuat. Sistem menulis nilai bidang yang memiliki nama sama dengan kolom di tabel data ke kolom yang sesuai.

Di halaman Tasks, temukan konektor sink AnalyticDB yang Anda buat dan klik nama instance tujuan di kolom Event Target.
Di sudut kanan atas halaman Basic Information, klik Log on to Database.
Di konsol Data Management (DMS), jalankan pernyataan berikut untuk menanyakan semua data di tabel:
SELECT * FROM adb_sink_table;Gambar berikut menunjukkan hasil kueri.
