Konektor sink AnalyticDB membaca pesan dari topik ApsaraMQ for Kafka dan menuliskannya ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL. Konektor ini menggunakan Function Compute untuk mentransfer data antar layanan dalam wilayah yang sama.
Cara kerja
Data mengalir melalui tiga komponen:
ApsaraMQ for Kafka membaca pesan dari topik sumber data.
Function Compute menerima pesan tersebut dan menuliskannya ke database tujuan.
AnalyticDB menyimpan data di tabel yang ditentukan.
Secara internal, konektor ini menggunakan lima topik (untuk offset, konfigurasi, status, dead-letter queue, dan data error) serta satu kelompok konsumen. Anda dapat membuat resource ini secara otomatis atau manual.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Instans ApsaraMQ for Kafka dengan fitur konektor diaktifkan
Topik sumber data di instans ApsaraMQ for Kafka
Database tujuan AnalyticDB:
AnalyticDB for MySQL: Sebuah kluster, sebuah akun database, sebuah klien koneksi, dan sebuah database
AnalyticDB for PostgreSQL: Sebuah instans, sebuah akun database, dan sebuah klien koneksi
Kumpulkan informasi berikut untuk konfigurasi konektor:
| Informasi | Deskripsi | Contoh |
|---|---|---|
| Nama topik sumber data | Topik Kafka tempat mengekspor data | adb-test-input |
| ID instans AnalyticDB | ID instans database tujuan | am-bp139yqk8u1ik**** |
| Nama database | Database tujuan | adb_demo |
| Nama tabel | Tabel tujuan | user |
| Username database | Username login database | adbmysql |
| Kata sandi database | Password login database | ******** |
Catatan penggunaan
Eksportasi data hanya didukung dalam wilayah yang sama. Eksportasi lintas wilayah tidak didukung. Untuk informasi lebih lanjut, lihat Batasan.
Function Compute menyediakan kuota gratis. Penggunaan melebihi kuota gratis akan dikenai biaya sesuai dengan harga Function Compute.
ApsaraMQ for Kafka melakukan serialisasi pesan sebagai string berkode UTF-8. Data biner tidak didukung.
Jika database tujuan menggunakan titik akhir pribadi, konfigurasikan virtual private cloud (VPC) dan vSwitch yang sama untuk layanan Function Compute. Jika tidak, Function Compute tidak dapat mengakses database tersebut. Untuk informasi lebih lanjut, lihat Perbarui layanan.
ApsaraMQ for Kafka secara otomatis membuat peran terkait layanan saat Anda membuat konektor, jika belum ada.
Untuk memecahkan masalah eksekusi fungsi, gunakan logging Function Compute. Untuk informasi lebih lanjut, lihat Konfigurasi logging.
Siapkan konektor
Untuk menyiapkan konektor:
Langkah 1: (Opsional) Buat topik dan kelompok konsumen yang diperlukan
Agar ApsaraMQ for Kafka membuat resource ini secara otomatis, lewati langkah ini dan atur Resource Creation Method menjadi Auto di Langkah 2.
Jika instans ApsaraMQ for Kafka Anda menjalankan versi utama 0.10.2, topik yang memerlukan mesin penyimpanan Local Storage harus dibuat secara otomatis. Pembuatan manual topik Local Storage tidak didukung pada versi ini.
Konektor ini memerlukan lima topik internal dan satu kelompok konsumen. Buat secara manual hanya jika Anda membutuhkan konfigurasi khusus.
Topik yang diperlukan
| Topik | Awalan nama | Partisi | Mesin penyimpanan | Kebijakan pembersihan log |
|---|---|---|---|---|
| Offset tugas | connect-offset | Lebih dari 1 | Local Storage | Compact |
| Konfigurasi tugas | connect-config | 1 | Local Storage | Compact |
| Status tugas | connect-status | 6 (disarankan) | Local Storage | Compact |
| Dead-letter queue | connect-error | 6 (disarankan) | Local Storage atau Cloud Storage | Apa saja |
| Data error | connect-error | 6 (disarankan) | Local Storage atau Cloud Storage | Apa saja |
Tips: Topik dead-letter queue dan topik data error dapat menggunakan topik yang sama untuk menghemat resource.
Buat topik
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.
Anda harus membuat topik di wilayah tempat instans Elastic Compute Service (ECS) Anda diterapkan. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan di instans ECS yang diterapkan di wilayah China (Beijing), topik juga harus dibuat di wilayah China (Beijing).
Pada halaman Instances, klik nama instans.
Di panel navigasi sebelah kiri, klik Topics.
Pada halaman Topics, klik Create Topic.
Pada panel Create Topic, konfigurasikan parameter berikut lalu klik OK.
| Parameter | Deskripsi |
|---|---|
| Name | Nama topik. Gunakan awalan penamaan dari tabel di atas (misalnya, connect-offset-kafka-adb-sink). |
| Partitions | Jumlah partisi. Lihat nilai yang diperlukan pada tabel di atas. |
| Storage Engine | Jenis mesin penyimpanan. Hanya tersedia pada instans Edisi Profesional. Instans Edisi Standar menggunakan Cloud Storage secara default. Opsi: Cloud Storage (disk Alibaba Cloud, penyimpanan terdistribusi 3-replika dengan latensi rendah dan keandalan tinggi) atau Local Storage (algoritma Apache Kafka ISR, penyimpanan terdistribusi 3-replika). Instans Standard (High Write) hanya mendukung Cloud Storage. |
| Message Type | Jaminan pengurutan pesan. Normal Message (default untuk Cloud Storage): urutan partisi mungkin tidak dipertahankan selama kegagalan broker. Partitionally Ordered Message (default untuk Local Storage): urutan dipertahankan selama kegagalan broker, tetapi partisi yang terpengaruh tidak tersedia hingga dipulihkan. |
| Log Cleanup Policy | Diperlukan ketika Storage Engine adalah Local Storage. Delete (default): menyimpan pesan berdasarkan periode retensi; menghapus pesan tertua saat penyimpanan melebihi 85%. Compact: hanya menyimpan nilai terbaru per kunci. Diperlukan untuk topik internal konektor yang menggunakan Kafka Connect. Penting Topik dengan log-compacted hanya dapat digunakan pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos. |
| Description | Deskripsi opsional. |
| Tag | Tag opsional. |
Ulangi untuk membuat semua lima topik yang diperlukan.
Buat kelompok konsumen
Konektor ini memerlukan kelompok konsumen bernama connect-<nama-konektor> (misalnya, connect-kafka-adb-sink).
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.
Pada halaman Instances, klik nama instans.
Di panel navigasi sebelah kiri, klik Groups.
Pada halaman Groups, klik Create Group.
Pada panel Create Group, masukkan nama kelompok di bidang Group ID, tambahkan deskripsi dan tag opsional, lalu klik OK.
Langkah 2: Buat dan terapkan konektor
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.
Pada halaman Instances, klik nama instans.
Di panel navigasi sebelah kiri, klik Connectors.
Pada halaman Connectors, klik Create Connector.
Lengkapi wizard Create Connector:
Konfigurasi informasi dasar
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Name | Nama konektor. 1–48 karakter: angka, huruf kecil, dan tanda hubung (-). Tidak boleh dimulai dengan tanda hubung. Harus unik dalam instans. | kafka-adb-sink |
| Instance | Instans ApsaraMQ for Kafka. Menampilkan nama dan ID instans. | demo alikafka_post-cn-st21p8vj**** |
Klik Next.
Konfigurasi layanan sumber
Pilih Message Queue for Apache Kafka sebagai layanan sumber dan konfigurasikan parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Data Source Topic | Topik tempat mengekspor data. | adb-test-input |
| Consumer Thread Concurrency | Jumlah thread konsumen konkuren. Default: 6. Nilai valid: 1, 2, 3, 6, 12. | 6 |
| Consumer Offset | Titik mulai konsumsi. Earliest Offset: mulai dari awal. Latest Offset: mulai dari pesan terbaru. | Earliest Offset |
Klik Configure Runtime Environment untuk memperluas pengaturan lanjutan.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| VPC ID | VPC untuk tugas ekspor. Default ke VPC instans ApsaraMQ for Kafka. | vpc-bp1xpdnd3l*** |
| vSwitch ID | vSwitch untuk tugas ekspor. Harus berada dalam VPC yang sama dengan instans. | vsw-bp1d2jgg81*** |
| Failure Handling Policy | Tindakan saat pengiriman pesan gagal. Continue Subscription: terus mengonsumsi dan mencatat error. Stop Subscription: hentikan konsumsi dan catat error. Untuk detail log, lihat Kelola konektor. Untuk kode kesalahan, lihat Kode kesalahan. | Continue Subscription |
| Resource Creation Method | Cara membuat topik internal dan kelompok konsumen yang diperlukan. Auto: ApsaraMQ for Kafka membuatnya secara otomatis. Manual: gunakan resource yang dibuat di Langkah 1. | Auto |
| Connector Consumer Group | Kelompok konsumen untuk konektor. Format: connect-<nama-konektor>. | connect-kafka-adb-sink |
| Task Offset Topic | Menyimpan offset konsumen. Awalan nama: connect-offset. Partisi: lebih dari 1. Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact. | connect-offset-kafka-adb-sink |
| Task Configuration Topic | Menyimpan konfigurasi tugas. Awalan nama: connect-config. Partisi: 1. Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact. | connect-config-kafka-adb-sink |
| Task Status Topic | Menyimpan status tugas. Awalan nama: connect-status. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact. | connect-status-kafka-adb-sink |
| Dead-letter Queue Topic | Menyimpan data error framework Kafka Connect. Awalan nama: connect-error. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage atau Cloud Storage. Dapat berbagi topik dengan topik data error. | connect-error-kafka-adb-sink |
| Error Data Topic | Menyimpan data error konektor sink. Awalan nama: connect-error. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage atau Cloud Storage. Dapat berbagi topik dengan topik dead-letter queue. | connect-error-kafka-adb-sink |
Klik Next.
Konfigurasi layanan tujuan
Pilih AnalyticDB sebagai layanan tujuan dan konfigurasikan parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Instance Type | Jenis database tujuan: AnalyticDB for MySQL atau AnalyticDB for PostgreSQL. | AnalyticDB for MySQL |
| AnalyticDB Instance ID | ID instans tujuan. | am-bp139yqk8u1ik**** |
| Database Name | Database tujuan. | adb_demo |
| Table Name | Tabel tujuan untuk data yang diekspor. | user |
| Database Username | Username login database. | adbmysql |
| Database Password | Password login database. Untuk mengatur ulang password yang lupa: untuk AnalyticDB for MySQL, atur ulang di konsol AnalyticDB for MySQL. Untuk AnalyticDB for PostgreSQL, buka Account Management lalu klik Reset Password. | ******** |
Username dan password database diteruskan ke Function Compute sebagai variabel lingkungan saat ApsaraMQ for Kafka membuat tugas ekspor. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.
Klik Create.
Pada halaman Connectors, temukan konektor lalu klik Deploy di kolom Actions.
Langkah 3: Konfigurasi jaringan Function Compute
Setelah penerapan, Function Compute secara otomatis membuat layanan (bernama kafka-service-<nama_konektor>-<string_acak>) dan fungsi (bernama fc-adb-<string_acak>) untuk konektor tersebut.
Pada halaman Connectors, temukan konektor lalu klik Configure Function di kolom Actions. Ini akan membuka konsol Function Compute.
Di Konsol Function Compute, temukan layanan yang dibuat secara otomatis dan konfigurasikan VPC serta vSwitch agar sesuai dengan database tujuan. Untuk informasi lebih lanjut, lihat Perbarui layanan.
Langkah 4: Tambahkan blok CIDR VPC ke daftar putih AnalyticDB
Tambahkan blok CIDR VPC yang dikonfigurasi di Function Compute ke daftar putih AnalyticDB. Temukan blok CIDR di halaman vSwitch di Konsol VPC — terletak di baris yang sesuai dengan VPC dan vSwitch layanan Function Compute.
AnalyticDB for MySQL: Konfigurasikan daftar putih di Konsol AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Konfigurasi daftar putih alamat IP.
AnalyticDB for PostgreSQL: Konfigurasikan daftar putih di Konsol AnalyticDB for PostgreSQL. Untuk informasi lebih lanjut, lihat Konfigurasi daftar putih alamat IP.
Langkah 5: Verifikasi eksportasi data
Kirim pesan uji
Konten pesan harus berupa JSON yang valid. Setiap kunci JSON dipetakan ke nama kolom di tabel tujuan, dan setiap nilai dipetakan ke data kolom. Pastikan kunci JSON sesuai dengan nama kolom di tabel tujuan sebelum mengirim pesan uji.
Pada halaman Connectors, temukan konektor lalu klik Test di kolom Actions.
Pada panel Send Message, pilih Sending Method:
Console:
Di bidang Message Key, masukkan kunci (misalnya,
demo).Di bidang Message Content, masukkan konten JSON (misalnya,
{"key": "test"}).Untuk Send to Specified Partition, pilih Yes dan masukkan Partition ID (misalnya,
0) untuk menargetkan partisi tertentu, atau pilih No agar ApsaraMQ for Kafka menetapkan partisi. Untuk informasi tentang ID partisi, lihat Lihat status partisi.
Docker: Jalankan perintah Docker yang ditampilkan di bagian Run the Docker container to produce a sample message.
SDK: Pilih SDK untuk bahasa pemrograman Anda dan metode akses untuk mengirim pesan uji.
Verifikasi hasil
Setelah mengirim pesan, periksa tabel tujuan untuk memastikan data telah diekspor:
Masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL.
Sambungkan ke database tujuan.
Di SQLConsole di Konsol Data Management Service 5.0, buka tabel tujuan dan pastikan data yang diekspor telah ada.
Langkah selanjutnya
Kelola konektor: Lihat status konektor, jeda, lanjutkan, atau hapus konektor.
Konfigurasi logging: Siapkan log Function Compute untuk memecahkan masalah eksportasi data.