Topik ini menjelaskan cara membuat konektor sink AnalyticDB untuk mengekspor data dari topik sumber data ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL menggunakan Alibaba Cloud Function Compute.
Prasyarat
Persyaratan berikut harus dipenuhi:
ApsaraMQ for Kafka
Fitur konektor diaktifkan untuk instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Aktifkan Fitur Konektor.
Topik sumber data dibuat dalam instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.
Function Compute
Function Compute diaktifkan. Untuk informasi lebih lanjut, lihat Aktifkan Function Compute.
AnalyticDB for MySQL dan AnalyticDB for PostgreSQL
Jika Anda ingin mengekspor data ke database AnalyticDB for MySQL, pastikan Anda telah membuat kluster dan akun database, terhubung ke kluster, serta membuat database dalam kluster di Konsol AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Buat Kluster, Buat Akun Database, Hubungkan ke Kluster AnalyticDB for MySQL, dan Buat Database.
Jika Anda ingin mengekspor data ke database AnalyticDB for PostgreSQL, pastikan Anda telah membuat instance dan akun database serta terhubung ke database di Konsol AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Buat Instance, Buat dan Kelola Akun Database, dan Koneksi Klien.
Catatan Penggunaan
Anda hanya dapat mengekspor data dari topik sumber data instance ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL melalui Function Compute dalam wilayah yang sama. Untuk informasi tentang batasan pada konektor, lihat Batasan.
Konektor sink AnalyticDB mengekspor data menggunakan Function Compute. Function Compute menyediakan kuota gratis sumber daya untuk Anda. Jika penggunaan Anda melebihi kuota gratis ini, Anda akan dikenakan biaya untuk kelebihannya berdasarkan aturan penagihan Function Compute. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.
Function Compute memungkinkan Anda untuk menanyakan log pemanggilan fungsi untuk memecahkan masalah. Untuk informasi lebih lanjut, lihat Konfigurasikan Logging.
ApsaraMQ for Kafka mengubah pesan menjadi string yang dikodekan UTF-8 untuk transfer. Message Queue for Apache Kafka tidak mendukung data biner.
Jika Anda menentukan titik akhir pribadi dari database tujuan untuk konektor sink AnalyticDB, Anda harus menentukan VPC dan vSwitch yang sama dengan yang digunakan oleh database tujuan untuk fungsi yang sesuai di Konsol Function Compute. Jika tidak, Function Compute tidak dapat mengakses database tujuan. Untuk informasi lebih lanjut, lihat Perbarui Layanan.
Saat membuat konektor, ApsaraMQ for Kafka membuat peran terkait layanan untuk Anda.
Jika tidak ada peran terkait layanan yang tersedia, ApsaraMQ for Kafka secara otomatis membuat peran terkait layanan untuk Anda guna menggunakan konektor sink AnalyticDB untuk mengekspor data dari ApsaraMQ for Kafka ke AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.
Jika peran terkait layanan tersedia, ApsaraMQ for Kafka tidak membuat yang baru.
Untuk informasi lebih lanjut tentang peran terkait layanan, lihat Peran Terkait Layanan.
Prosedur
Bagian ini menjelaskan cara menggunakan konektor sink AnalyticDB untuk mengekspor data dari topik sumber data instance ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.
Opsional: Buat topik dan grup yang diperlukan oleh konektor sink AnalyticDB.
Jika Anda tidak ingin membuat topik dan grup secara manual, lewati langkah ini dan atur parameter Metode Pembuatan Sumber Daya ke Otomatis di langkah berikutnya.
PentingTopik tertentu yang diperlukan oleh konektor sink AnalyticDB harus menggunakan mesin penyimpanan lokal. Jika versi utama instance ApsaraMQ for Kafka Anda adalah 0.10.2, topik yang menggunakan mesin penyimpanan lokal tidak dapat dibuat secara manual. Dalam versi ini, topik-topik tersebut harus dibuat secara otomatis.
Konfigurasikan Function Compute dan AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.
Verifikasi Hasilnya.
Buat topik yang diperlukan oleh konektor sink AnalyticDB
Di konsol ApsaraMQ for Kafka, Anda dapat membuat lima topik yang diperlukan oleh konektor sink AnalyticDB secara manual. Lima topik tersebut adalah topik offset tugas, topik konfigurasi tugas, topik status tugas, topik antrian pesan gagal, dan topik data kesalahan. Lima topik tersebut berbeda dalam mesin penyimpanan dan jumlah partisi. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.
Masuk ke konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
PentingAnda harus membuat topik di wilayah tempat Instance Elastic Compute Service (ECS) Anda diterapkan. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan di instance ECS yang diterapkan di wilayah Tiongkok (Beijing), topik juga harus dibuat di wilayah Tiongkok (Beijing).
Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sisi kiri, klik Topics.
Pada halaman Topics, klik Create Topic.
Di panel Create Topic, tentukan properti topik dan klik OK.
Parameter
Deskripsi
Contoh
Name
Nama topik.
demo
Description
Deskripsi topik.
demo test
Partitions
Jumlah partisi dalam topik.
12
Storage Engine
CatatanAnda dapat menentukan jenis mesin penyimpanan hanya jika Anda menggunakan instance Edisi Profesional non-serverless. Untuk jenis instance lainnya, Cloud Storage dipilih secara default.
Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan dalam topik.
ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:
Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika dalam mode terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, performa tinggi, durabilitas panjang, dan keandalan tinggi. Jika Anda mengatur parameter Instance Edition ke Standard (High Write) saat Anda membuat instance, Anda hanya dapat mengatur parameter ini ke Cloud Storage.
Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika dalam mode terdistribusi.
Cloud Storage
Message Type
Jenis pesan topik. Nilai valid:
Normal Message: Secara default, pesan dengan kunci yang sama disimpan dalam partisi yang sama dalam urutan pengiriman pesan. Jika broker dalam kluster gagal, urutan pesan yang disimpan dalam partisi mungkin tidak dipertahankan. Jika Anda mengatur parameter Storage Engine ke Cloud Storage, parameter ini secara otomatis diatur ke Normal Message.
Partitionally Ordered Message: Secara default, pesan dengan kunci yang sama disimpan dalam partisi yang sama dalam urutan pengiriman pesan. Jika broker dalam kluster gagal, pesan masih disimpan dalam partisi dalam urutan pengiriman pesan. Pesan dalam beberapa partisi tidak dapat dikirim hingga partisi dipulihkan. Jika Anda mengatur parameter Storage Engine ke Local Storage, parameter ini secara otomatis diatur ke Partitionally Ordered Message.
Normal Message
Log Cleanup Policy
Kebijakan pembersihan log yang digunakan oleh topik.
Jika Anda mengatur parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda dapat mengatur Parameter Penyimpanan hanya ke Penyimpanan Lokal jika Anda menggunakan instance Edisi Profesional ApsaraMQ for Kafka.
ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:
Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang cukup tersedia dalam sistem, pesan disimpan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem menghapus pesan yang disimpan paling awal untuk memastikan ketersediaan layanan.
Compact: kebijakan kompaksi log yang digunakan di Apache Kafka. Kompaksi log memastikan bahwa nilai terbaru dipertahankan untuk pesan dengan kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem dimulai ulang. Misalnya, ketika Anda menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem dalam topik yang dikompaksi log.
PentingAnda hanya dapat menggunakan topik yang dikompaksi log dalam komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.
Compact
Tag
Tag yang ingin Anda lampirkan ke topik.
demo
Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.
Buat grup yang diperlukan oleh konektor sink AnalyticDB
Di konsol ApsaraMQ for Kafka, Anda dapat membuat grup yang diperlukan oleh konektor sink AnalyticDB secara manual. Nama grup harus dalam format connect-Nama Tugas. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.
Masuk ke konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Pada halaman Instances, klik nama instance yang ingin Anda kelola.
Di panel navigasi sisi kiri, klik Groups.
Pada halaman Groups, klik Create Group.
Di panel Create Group, masukkan nama grup di bidang Group ID dan deskripsi grup di bidang Description, lampirkan tag ke grup, lalu klik OK.
Setelah Anda membuat grup konsumen, Anda dapat melihat grup konsumen tersebut di halaman Groups.
Buat dan sebarkan konektor sink AnalyticDB
Masuk ke konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi sisi kiri, klik Connectors.
Pada halaman Connectors, klik Create Connector.
Di wizard Create Connector, lakukan langkah-langkah berikut:
Di langkah Configure Basic Information, atur parameter yang dijelaskan dalam tabel berikut dan klik Next.
Parameter
Deskripsi
Contoh
Name
Nama konektor. Perhatikan aturan berikut saat menentukan nama konektor:
Nama konektor harus memiliki panjang 1 hingga 48 karakter. Dapat berisi angka, huruf kecil, dan tanda hubung (-), tetapi tidak boleh dimulai dengan tanda hubung (-).
Setiap nama konektor harus unik dalam satu instance ApsaraMQ for Kafka.
Nama grup yang digunakan oleh tugas konektor harus dalam format connect-Nama Tugas. Jika Anda belum membuat grup seperti itu, Message Queue for Apache Kafka akan otomatis membuat satu untuk Anda.
kafka-adb-sink
Instance
Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan.
demo alikafka_post-cn-st21p8vj****
Di langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Next.
Tabel 1. Parameter dalam Langkah Konfigurasi Layanan Sumber
Parameter
Deskripsi
Contoh
Data Source Topic
Nama topik sumber data dari mana data akan diekspor.
adb-test-input
Consumer Thread Concurrency
Jumlah thread konsumen konkuren yang digunakan untuk mengekspor data dari topik sumber data. Nilai default: 6. Nilai valid:
1
2
3
6
12
6
Consumer Offset
Offset tempat konsumsi dimulai. Nilai valid:
Earliest Offset: Konsumsi dimulai dari offset paling awal.
Latest Offset: Konsumsi dimulai dari offset terbaru.
Earliest Offset
VPC ID
ID VPC tempat tugas ekspor data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai default adalah ID VPC yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka. Anda tidak perlu mengubah nilai ini.
vpc-bp1xpdnd3l***
vSwitch ID
ID vSwitch tempat tugas ekspor data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. vSwitch harus diterapkan dalam VPC yang sama dengan instance ApsaraMQ for Kafka. Nilai default adalah ID vSwitch yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka.
vsw-bp1d2jgg81***
Failure Handling Policy
Menentukan apakah akan mempertahankan langganan ke partisi tempat kesalahan menyebabkan kegagalan pengiriman pesan. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid:
Continue Subscription: mempertahankan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.
Stop Subscription: menghentikan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.
CatatanUntuk informasi tentang cara melihat log konektor, lihat Kelola konektor.
Untuk informasi lebih lanjut tentang cara memecahkan masalah berdasarkan kode kesalahan, lihat Kode Kesalahan.
Continue Subscription
Resource Creation Method
Metode pembuatan topik dan grup yang diperlukan oleh konektor sink AnalyticDB. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid:
Auto
Manual
Auto
Connector Consumer Group
Grup yang digunakan oleh konektor.Grup Klik Configure Runtime Environment untuk menampilkan parameter. Nama grup harus dalam format connect-Nama Tugas.
connect-kafka-adb-sink
Task Offset Topic
Topik yang digunakan untuk menyimpan offset konsumen. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami sarankan Anda memulai nama topik dengan connect-offset.
Partisi: Jumlah partisi dalam topik harus lebih dari 1.
Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.
cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.
connect-offset-kafka-adb-sink
Task Configuration Topic
Topik yang digunakan untuk menyimpan konfigurasi tugas. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami sarankan Anda memulai nama topik dengan connect-config.
Partisi: Topik hanya dapat berisi satu partisi.
Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.
cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.
connect-config-kafka-adb-sink
Task Status Topic
Topik yang digunakan untuk menyimpan status tugas. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami sarankan Anda memulai nama topik dengan connect-status.
Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.
cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.
connect-status-kafka-adb-sink
Dead-letter Queue Topic
Topik yang digunakan untuk menyimpan data kesalahan kerangka kerja Kafka Connect. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik sebagai topik antrian pesan gagal dan topik data kesalahan.
Topik: Kami sarankan Anda memulai nama topik dengan connect-error.
Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Mesin penyimpanan topik dapat disetel ke Penyimpanan Lokal atau Penyimpanan Cloud.
connect-error-kafka-adb-sink
Error Data Topic
Topik yang digunakan untuk menyimpan data kesalahan konektor Sink. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik sebagai topik antrian pesan gagal dan topik data kesalahan.
Topik: Kami sarankan Anda memulai nama topik dengan connect-error.
Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Mesin penyimpanan topik dapat disetel ke Penyimpanan Lokal atau Penyimpanan Cloud.
connect-error-kafka-adb-sink
Di langkah Configure Destination Service, pilih AnalyticDB sebagai layanan tujuan, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Create.
Parameter
Deskripsi
Contoh
Instance Type
Tipe instance database tujuan. Nilai valid: AnalyticDB for MySQL dan AnalyticDB for PostgreSQL.
AnalyticDB for MySQL
AnalyticDB Instance ID
ID instance AnalyticDB for MySQL atau AnalyticDB for PostgreSQL tujuan.
am-bp139yqk8u1ik****
Database Name
Nama database tujuan.
adb_demo
Table Name
Nama tabel dalam database tujuan tempat data yang diekspor disimpan.
user
Database Username
Nama pengguna yang Anda gunakan untuk masuk ke database tujuan.
adbmysql
Database Password
Kata sandi yang Anda gunakan untuk masuk ke database tujuan. Kata sandi ditentukan saat Anda membuat instance AnalyticDB for MySQL AnalyticDB for PostgreSQL tujuan. Jika Anda lupa kata sandi, Anda dapat menyetel ulangnya.
Jika Anda ingin menyetel ulang kata sandi akun database AnalyticDB for MySQL, lakukan langkah-langkah yang dijelaskan dalam Ubah kata sandi akun database.
Jika Anda ingin menyetel ulang kata sandi akun database AnalyticDB for PostgreSQL, masuk ke Konsol AnalyticDB for PostgreSQL dan klik instance tujuan. Di panel navigasi sisi kiri, klik Account Management, temukan akun database yang ingin Anda setel ulang kata sandinya, dan klik Reset Password di kolom Actions.
********
CatatanNama pengguna dan kata sandi dilewatkan ke fungsi di Function Compute sebagai variabel lingkungan saat ApsaraMQ for Kafka membuat tugas ekspor data. Setelah tugas dibuat, ApsaraMQ for Kafka tidak menyimpan nama pengguna atau kata sandi.
Setelah konektor dibuat, Anda dapat melihatnya di halaman Connectors.
Pergi ke halaman Connectors, temukan konektor yang Anda buat, dan klik Deploy di kolom Actions.
Konfigurasikan layanan Function Compute terkait
Setelah konektor sink AnalyticDB dibuat dan diterapkan di konsol ApsaraMQ for Kafka, Function Compute secara otomatis membuat layanan fungsi dan fungsi untuk konektor tersebut. Layanan fungsi dinamai dalam format kafka-service-<nama_konektor>-<String Acak>, dan fungsi dinamai dalam format fc-adb-<String Acak>.
Di halaman Connectors, temukan konektor yang ingin Anda konfigurasi layanan Function Compute-nya dan klik Configure Function di kolom Actions.
Halaman dialihkan ke Konsol Function Compute.
Di Konsol Function Compute, temukan layanan yang dibuat secara otomatis dan konfigurasikan VPC dan vSwitch untuk layanan tersebut. Untuk informasi lebih lanjut, lihat Perbarui Layanan.
Konfigurasikan AnalyticDB for MySQL atau AnalyticDB for PostgreSQL
Setelah layanan Function Compute diterapkan, Anda harus menambahkan blok CIDR untuk VPC yang Anda tentukan di Konsol Function Compute ke daftar putih untuk instance AnalyticDB for MySQL atau AnalyticDB for PostgreSQL tujuan. Anda dapat melihat blok CIDR di halaman vSwitch dari Konsol VPC. Blok CIDR berada di baris tempat VPC dan vSwitch layanan Function Compute berada.
Anda harus masuk ke Konsol AnalyticDB for MySQL untuk menyetel daftar putih untuk kluster. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
Anda harus masuk ke Konsol AnalyticDB for PostgreSQL untuk menyetel daftar putih untuk kluster. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
Kirim pesan uji
Anda dapat mengirim pesan ke topik sumber data instance ApsaraMQ for Kafka untuk memeriksa apakah data dalam topik dapat diekspor ke AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.
Nilai parameter Konten Pesan harus dalam format JSON dan akan diurai menjadi pasangan kunci-nilai. Kunci adalah nama kolom tabel database tujuan dan nilainya adalah data dalam kolom. Oleh karena itu, pastikan bahwa setiap kunci dari konten pesan memiliki nama kolom yang sesuai dalam tabel database tujuan.ApsaraMQ for Kafka Anda dapat masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL dan hubungkan ke database tujuan untuk memeriksa nama kolom tabel database tujuan.
Di halaman Connectors, temukan konektor yang ingin Anda kelola dan klik Test di kolom Actions.
Di panel Send Message, konfigurasikan parameter untuk mengirim pesan untuk pengujian.
Jika Anda mengatur parameter Sending Method ke Console, lakukan langkah-langkah berikut:
Di bidang Message Key, masukkan kunci pesan. Contoh: demo.
Di bidang Message Content, masukkan konten pesan. Contoh: {"key": "test"}.
Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji ke partisi tertentu.
Jika Anda ingin mengirim pesan uji ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara menanyakan ID partisi, lihat Lihat Status Partisi.
Jika Anda tidak ingin mengirim pesan uji ke partisi tertentu, klik No.
Jika Anda mengatur parameter Sending Method ke Docker, jalankan perintah Docker di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji.
Jika Anda mengatur parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan dan metode akses untuk mengirim dan berlangganan pesan uji.
Verifikasi hasil ekspor data
Setelah Anda mengirim pesan ke topik sumber data instance ApsaraMQ for Kafka, masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL, dan hubungkan ke database tujuan.Pada jendela perintah SQLConsole dari Konsol Data Management Service 5.0, klik tabel tujuan untuk memeriksa apakah data dalam topik sumber data berhasil diekspor.
Gambar berikut menunjukkan hasil tugas ekspor data dari ApsaraMQ for Kafka ke AnalyticDB for MySQL: