All Products
Search
Document Center

ApsaraMQ for Kafka:Buat konektor sink AnalyticDB

Last Updated:Mar 11, 2026

Konektor sink AnalyticDB membaca pesan dari topik ApsaraMQ for Kafka dan menuliskannya ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL. Konektor ini menggunakan Function Compute untuk mentransfer data antar layanan dalam wilayah yang sama.

Cara kerja

Data mengalir melalui tiga komponen:

  1. ApsaraMQ for Kafka membaca pesan dari topik sumber data.

  2. Function Compute menerima pesan tersebut dan menuliskannya ke database tujuan.

  3. AnalyticDB menyimpan data di tabel yang ditentukan.

Secara internal, konektor ini menggunakan lima topik (untuk offset, konfigurasi, status, dead-letter queue, dan data error) serta satu kelompok konsumen. Anda dapat membuat resource ini secara otomatis atau manual.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Kumpulkan informasi berikut untuk konfigurasi konektor:

InformasiDeskripsiContoh
Nama topik sumber dataTopik Kafka tempat mengekspor dataadb-test-input
ID instans AnalyticDBID instans database tujuanam-bp139yqk8u1ik****
Nama databaseDatabase tujuanadb_demo
Nama tabelTabel tujuanuser
Username databaseUsername login databaseadbmysql
Kata sandi databasePassword login database********

Catatan penggunaan

  • Eksportasi data hanya didukung dalam wilayah yang sama. Eksportasi lintas wilayah tidak didukung. Untuk informasi lebih lanjut, lihat Batasan.

  • Function Compute menyediakan kuota gratis. Penggunaan melebihi kuota gratis akan dikenai biaya sesuai dengan harga Function Compute.

  • ApsaraMQ for Kafka melakukan serialisasi pesan sebagai string berkode UTF-8. Data biner tidak didukung.

  • Jika database tujuan menggunakan titik akhir pribadi, konfigurasikan virtual private cloud (VPC) dan vSwitch yang sama untuk layanan Function Compute. Jika tidak, Function Compute tidak dapat mengakses database tersebut. Untuk informasi lebih lanjut, lihat Perbarui layanan.

  • ApsaraMQ for Kafka secara otomatis membuat peran terkait layanan saat Anda membuat konektor, jika belum ada.

  • Untuk memecahkan masalah eksekusi fungsi, gunakan logging Function Compute. Untuk informasi lebih lanjut, lihat Konfigurasi logging.

Siapkan konektor

Untuk menyiapkan konektor:

  1. (Opsional) Buat topik dan kelompok konsumen yang diperlukan

  2. Buat dan terapkan konektor

  3. Konfigurasi jaringan Function Compute

  4. Tambahkan blok CIDR VPC ke daftar putih AnalyticDB

  5. Verifikasi eksportasi data

Langkah 1: (Opsional) Buat topik dan kelompok konsumen yang diperlukan

Agar ApsaraMQ for Kafka membuat resource ini secara otomatis, lewati langkah ini dan atur Resource Creation Method menjadi Auto di Langkah 2.
Penting

Jika instans ApsaraMQ for Kafka Anda menjalankan versi utama 0.10.2, topik yang memerlukan mesin penyimpanan Local Storage harus dibuat secara otomatis. Pembuatan manual topik Local Storage tidak didukung pada versi ini.

Konektor ini memerlukan lima topik internal dan satu kelompok konsumen. Buat secara manual hanya jika Anda membutuhkan konfigurasi khusus.

Topik yang diperlukan

TopikAwalan namaPartisiMesin penyimpananKebijakan pembersihan log
Offset tugasconnect-offsetLebih dari 1Local StorageCompact
Konfigurasi tugasconnect-config1Local StorageCompact
Status tugasconnect-status6 (disarankan)Local StorageCompact
Dead-letter queueconnect-error6 (disarankan)Local Storage atau Cloud StorageApa saja
Data errorconnect-error6 (disarankan)Local Storage atau Cloud StorageApa saja
Tips: Topik dead-letter queue dan topik data error dapat menggunakan topik yang sama untuk menghemat resource.

Buat topik

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.

Penting

Anda harus membuat topik di wilayah tempat instans Elastic Compute Service (ECS) Anda diterapkan. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan di instans ECS yang diterapkan di wilayah China (Beijing), topik juga harus dibuat di wilayah China (Beijing).

  1. Pada halaman Instances, klik nama instans.

  2. Di panel navigasi sebelah kiri, klik Topics.

  3. Pada halaman Topics, klik Create Topic.

  4. Pada panel Create Topic, konfigurasikan parameter berikut lalu klik OK.

ParameterDeskripsi
NameNama topik. Gunakan awalan penamaan dari tabel di atas (misalnya, connect-offset-kafka-adb-sink).
PartitionsJumlah partisi. Lihat nilai yang diperlukan pada tabel di atas.
Storage EngineJenis mesin penyimpanan. Hanya tersedia pada instans Edisi Profesional. Instans Edisi Standar menggunakan Cloud Storage secara default. Opsi: Cloud Storage (disk Alibaba Cloud, penyimpanan terdistribusi 3-replika dengan latensi rendah dan keandalan tinggi) atau Local Storage (algoritma Apache Kafka ISR, penyimpanan terdistribusi 3-replika). Instans Standard (High Write) hanya mendukung Cloud Storage.
Message TypeJaminan pengurutan pesan. Normal Message (default untuk Cloud Storage): urutan partisi mungkin tidak dipertahankan selama kegagalan broker. Partitionally Ordered Message (default untuk Local Storage): urutan dipertahankan selama kegagalan broker, tetapi partisi yang terpengaruh tidak tersedia hingga dipulihkan.
Log Cleanup PolicyDiperlukan ketika Storage Engine adalah Local Storage. Delete (default): menyimpan pesan berdasarkan periode retensi; menghapus pesan tertua saat penyimpanan melebihi 85%. Compact: hanya menyimpan nilai terbaru per kunci. Diperlukan untuk topik internal konektor yang menggunakan Kafka Connect.
Penting

Topik dengan log-compacted hanya dapat digunakan pada komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.

DescriptionDeskripsi opsional.
TagTag opsional.
  1. Ulangi untuk membuat semua lima topik yang diperlukan.

Buat kelompok konsumen

Konektor ini memerlukan kelompok konsumen bernama connect-<nama-konektor> (misalnya, connect-kafka-adb-sink).

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.

  3. Pada halaman Instances, klik nama instans.

  4. Di panel navigasi sebelah kiri, klik Groups.

  5. Pada halaman Groups, klik Create Group.

  6. Pada panel Create Group, masukkan nama kelompok di bidang Group ID, tambahkan deskripsi dan tag opsional, lalu klik OK.

Langkah 2: Buat dan terapkan konektor

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah instans Anda.

  3. Pada halaman Instances, klik nama instans.

  4. Di panel navigasi sebelah kiri, klik Connectors.

  5. Pada halaman Connectors, klik Create Connector.

  6. Lengkapi wizard Create Connector:

Konfigurasi informasi dasar

ParameterDeskripsiContoh
NameNama konektor. 1–48 karakter: angka, huruf kecil, dan tanda hubung (-). Tidak boleh dimulai dengan tanda hubung. Harus unik dalam instans.kafka-adb-sink
InstanceInstans ApsaraMQ for Kafka. Menampilkan nama dan ID instans.demo alikafka_post-cn-st21p8vj****

Klik Next.

Konfigurasi layanan sumber

Pilih Message Queue for Apache Kafka sebagai layanan sumber dan konfigurasikan parameter berikut.

ParameterDeskripsiContoh
Data Source TopicTopik tempat mengekspor data.adb-test-input
Consumer Thread ConcurrencyJumlah thread konsumen konkuren. Default: 6. Nilai valid: 1, 2, 3, 6, 12.6
Consumer OffsetTitik mulai konsumsi. Earliest Offset: mulai dari awal. Latest Offset: mulai dari pesan terbaru.Earliest Offset

Klik Configure Runtime Environment untuk memperluas pengaturan lanjutan.

ParameterDeskripsiContoh
VPC IDVPC untuk tugas ekspor. Default ke VPC instans ApsaraMQ for Kafka.vpc-bp1xpdnd3l***
vSwitch IDvSwitch untuk tugas ekspor. Harus berada dalam VPC yang sama dengan instans.vsw-bp1d2jgg81***
Failure Handling PolicyTindakan saat pengiriman pesan gagal. Continue Subscription: terus mengonsumsi dan mencatat error. Stop Subscription: hentikan konsumsi dan catat error. Untuk detail log, lihat Kelola konektor. Untuk kode kesalahan, lihat Kode kesalahan.Continue Subscription
Resource Creation MethodCara membuat topik internal dan kelompok konsumen yang diperlukan. Auto: ApsaraMQ for Kafka membuatnya secara otomatis. Manual: gunakan resource yang dibuat di Langkah 1.Auto
Connector Consumer GroupKelompok konsumen untuk konektor. Format: connect-<nama-konektor>.connect-kafka-adb-sink
Task Offset TopicMenyimpan offset konsumen. Awalan nama: connect-offset. Partisi: lebih dari 1. Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact.connect-offset-kafka-adb-sink
Task Configuration TopicMenyimpan konfigurasi tugas. Awalan nama: connect-config. Partisi: 1. Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact.connect-config-kafka-adb-sink
Task Status TopicMenyimpan status tugas. Awalan nama: connect-status. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage. Kebijakan pembersihan: Compact.connect-status-kafka-adb-sink
Dead-letter Queue TopicMenyimpan data error framework Kafka Connect. Awalan nama: connect-error. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage atau Cloud Storage. Dapat berbagi topik dengan topik data error.connect-error-kafka-adb-sink
Error Data TopicMenyimpan data error konektor sink. Awalan nama: connect-error. Partisi: 6 (disarankan). Mesin penyimpanan: Local Storage atau Cloud Storage. Dapat berbagi topik dengan topik dead-letter queue.connect-error-kafka-adb-sink

Klik Next.

Konfigurasi layanan tujuan

Pilih AnalyticDB sebagai layanan tujuan dan konfigurasikan parameter berikut.

ParameterDeskripsiContoh
Instance TypeJenis database tujuan: AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.AnalyticDB for MySQL
AnalyticDB Instance IDID instans tujuan.am-bp139yqk8u1ik****
Database NameDatabase tujuan.adb_demo
Table NameTabel tujuan untuk data yang diekspor.user
Database UsernameUsername login database.adbmysql
Database PasswordPassword login database. Untuk mengatur ulang password yang lupa: untuk AnalyticDB for MySQL, atur ulang di konsol AnalyticDB for MySQL. Untuk AnalyticDB for PostgreSQL, buka Account Management lalu klik Reset Password.********
Username dan password database diteruskan ke Function Compute sebagai variabel lingkungan saat ApsaraMQ for Kafka membuat tugas ekspor. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.

Klik Create.

  1. Pada halaman Connectors, temukan konektor lalu klik Deploy di kolom Actions.

Langkah 3: Konfigurasi jaringan Function Compute

Setelah penerapan, Function Compute secara otomatis membuat layanan (bernama kafka-service-<nama_konektor>-<string_acak>) dan fungsi (bernama fc-adb-<string_acak>) untuk konektor tersebut.

  1. Pada halaman Connectors, temukan konektor lalu klik Configure Function di kolom Actions. Ini akan membuka konsol Function Compute.

  2. Di Konsol Function Compute, temukan layanan yang dibuat secara otomatis dan konfigurasikan VPC serta vSwitch agar sesuai dengan database tujuan. Untuk informasi lebih lanjut, lihat Perbarui layanan.

Langkah 4: Tambahkan blok CIDR VPC ke daftar putih AnalyticDB

Tambahkan blok CIDR VPC yang dikonfigurasi di Function Compute ke daftar putih AnalyticDB. Temukan blok CIDR di halaman vSwitch di Konsol VPC — terletak di baris yang sesuai dengan VPC dan vSwitch layanan Function Compute.

Langkah 5: Verifikasi eksportasi data

Kirim pesan uji

Penting

Konten pesan harus berupa JSON yang valid. Setiap kunci JSON dipetakan ke nama kolom di tabel tujuan, dan setiap nilai dipetakan ke data kolom. Pastikan kunci JSON sesuai dengan nama kolom di tabel tujuan sebelum mengirim pesan uji.

  1. Pada halaman Connectors, temukan konektor lalu klik Test di kolom Actions.

  2. Pada panel Send Message, pilih Sending Method:

    • Console:

      1. Di bidang Message Key, masukkan kunci (misalnya, demo).

      2. Di bidang Message Content, masukkan konten JSON (misalnya, {"key": "test"}).

      3. Untuk Send to Specified Partition, pilih Yes dan masukkan Partition ID (misalnya, 0) untuk menargetkan partisi tertentu, atau pilih No agar ApsaraMQ for Kafka menetapkan partisi. Untuk informasi tentang ID partisi, lihat Lihat status partisi.

    • Docker: Jalankan perintah Docker yang ditampilkan di bagian Run the Docker container to produce a sample message.

    • SDK: Pilih SDK untuk bahasa pemrograman Anda dan metode akses untuk mengirim pesan uji.

Verifikasi hasil

Setelah mengirim pesan, periksa tabel tujuan untuk memastikan data telah diekspor:

  1. Masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL.

  2. Sambungkan ke database tujuan.

  3. Di SQLConsole di Konsol Data Management Service 5.0, buka tabel tujuan dan pastikan data yang diekspor telah ada.

Langkah selanjutnya