全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat konektor sink AnalyticDB

更新时间:Jul 06, 2025

Topik ini menjelaskan cara membuat konektor sink AnalyticDB untuk mengekspor data dari topik sumber data ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL menggunakan Alibaba Cloud Function Compute.

Prasyarat

Persyaratan berikut harus dipenuhi:

Catatan Penggunaan

  • Anda hanya dapat mengekspor data dari topik sumber data instance ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL melalui Function Compute dalam wilayah yang sama. Untuk informasi tentang batasan pada konektor, lihat Batasan.

  • Konektor sink AnalyticDB mengekspor data menggunakan Function Compute. Function Compute menyediakan kuota gratis sumber daya untuk Anda. Jika penggunaan Anda melebihi kuota gratis ini, Anda akan dikenakan biaya untuk kelebihannya berdasarkan aturan penagihan Function Compute. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.

  • Function Compute memungkinkan Anda untuk menanyakan log pemanggilan fungsi untuk memecahkan masalah. Untuk informasi lebih lanjut, lihat Konfigurasikan Logging.

  • ApsaraMQ for Kafka mengubah pesan menjadi string yang dikodekan UTF-8 untuk transfer. Message Queue for Apache Kafka tidak mendukung data biner.

  • Jika Anda menentukan titik akhir pribadi dari database tujuan untuk konektor sink AnalyticDB, Anda harus menentukan VPC dan vSwitch yang sama dengan yang digunakan oleh database tujuan untuk fungsi yang sesuai di Konsol Function Compute. Jika tidak, Function Compute tidak dapat mengakses database tujuan. Untuk informasi lebih lanjut, lihat Perbarui Layanan.

  • Saat membuat konektor, ApsaraMQ for Kafka membuat peran terkait layanan untuk Anda.

    • Jika tidak ada peran terkait layanan yang tersedia, ApsaraMQ for Kafka secara otomatis membuat peran terkait layanan untuk Anda guna menggunakan konektor sink AnalyticDB untuk mengekspor data dari ApsaraMQ for Kafka ke AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.

    • Jika peran terkait layanan tersedia, ApsaraMQ for Kafka tidak membuat yang baru.

    Untuk informasi lebih lanjut tentang peran terkait layanan, lihat Peran Terkait Layanan.

Prosedur

Bagian ini menjelaskan cara menggunakan konektor sink AnalyticDB untuk mengekspor data dari topik sumber data instance ApsaraMQ for Kafka ke database AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.

  1. Opsional: Buat topik dan grup yang diperlukan oleh konektor sink AnalyticDB.

    Jika Anda tidak ingin membuat topik dan grup secara manual, lewati langkah ini dan atur parameter Metode Pembuatan Sumber Daya ke Otomatis di langkah berikutnya.

    Penting

    Topik tertentu yang diperlukan oleh konektor sink AnalyticDB harus menggunakan mesin penyimpanan lokal. Jika versi utama instance ApsaraMQ for Kafka Anda adalah 0.10.2, topik yang menggunakan mesin penyimpanan lokal tidak dapat dibuat secara manual. Dalam versi ini, topik-topik tersebut harus dibuat secara otomatis.

    1. Buat Topik yang Diperlukan oleh Konektor Sink AnalyticDB

    2. Buat Grup yang Diperlukan oleh Konektor Sink AnalyticDB

  2. Buat dan Sebarkan Konektor Sink AnalyticDB

  3. Konfigurasikan Function Compute dan AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.

    1. Konfigurasikan Layanan Function Compute Terkait

    2. Konfigurasikan AnalyticDB for MySQL atau AnalyticDB for PostgreSQL

  4. Verifikasi Hasilnya.

    1. Kirim Pesan Uji

    2. Verifikasi Hasil Ekspor Data

Buat topik yang diperlukan oleh konektor sink AnalyticDB

Di konsol ApsaraMQ for Kafka, Anda dapat membuat lima topik yang diperlukan oleh konektor sink AnalyticDB secara manual. Lima topik tersebut adalah topik offset tugas, topik konfigurasi tugas, topik status tugas, topik antrian pesan gagal, dan topik data kesalahan. Lima topik tersebut berbeda dalam mesin penyimpanan dan jumlah partisi. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.

  1. Masuk ke konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

    Penting

    Anda harus membuat topik di wilayah tempat Instance Elastic Compute Service (ECS) Anda diterapkan. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan di instance ECS yang diterapkan di wilayah Tiongkok (Beijing), topik juga harus dibuat di wilayah Tiongkok (Beijing).

  3. Pada halaman Instances, klik nama instance yang ingin Anda kelola.

  4. Di panel navigasi sisi kiri, klik Topics.

  5. Pada halaman Topics, klik Create Topic.

  6. Di panel Create Topic, tentukan properti topik dan klik OK.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama topik.

    demo

    Description

    Deskripsi topik.

    demo test

    Partitions

    Jumlah partisi dalam topik.

    12

    Storage Engine

    Catatan

    Anda dapat menentukan jenis mesin penyimpanan hanya jika Anda menggunakan instance Edisi Profesional non-serverless. Untuk jenis instance lainnya, Cloud Storage dipilih secara default.

    Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan dalam topik.

    ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:

    • Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika dalam mode terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, performa tinggi, durabilitas panjang, dan keandalan tinggi. Jika Anda mengatur parameter Instance Edition ke Standard (High Write) saat Anda membuat instance, Anda hanya dapat mengatur parameter ini ke Cloud Storage.

    • Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika dalam mode terdistribusi.

    Cloud Storage

    Message Type

    Jenis pesan topik. Nilai valid:

    • Normal Message: Secara default, pesan dengan kunci yang sama disimpan dalam partisi yang sama dalam urutan pengiriman pesan. Jika broker dalam kluster gagal, urutan pesan yang disimpan dalam partisi mungkin tidak dipertahankan. Jika Anda mengatur parameter Storage Engine ke Cloud Storage, parameter ini secara otomatis diatur ke Normal Message.

    • Partitionally Ordered Message: Secara default, pesan dengan kunci yang sama disimpan dalam partisi yang sama dalam urutan pengiriman pesan. Jika broker dalam kluster gagal, pesan masih disimpan dalam partisi dalam urutan pengiriman pesan. Pesan dalam beberapa partisi tidak dapat dikirim hingga partisi dipulihkan. Jika Anda mengatur parameter Storage Engine ke Local Storage, parameter ini secara otomatis diatur ke Partitionally Ordered Message.

    Normal Message

    Log Cleanup Policy

    Kebijakan pembersihan log yang digunakan oleh topik.

    Jika Anda mengatur parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda dapat mengatur Parameter Penyimpanan hanya ke Penyimpanan Lokal jika Anda menggunakan instance Edisi Profesional ApsaraMQ for Kafka.

    ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:

    • Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang cukup tersedia dalam sistem, pesan disimpan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem menghapus pesan yang disimpan paling awal untuk memastikan ketersediaan layanan.

    • Compact: kebijakan kompaksi log yang digunakan di Apache Kafka. Kompaksi log memastikan bahwa nilai terbaru dipertahankan untuk pesan dengan kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem dimulai ulang. Misalnya, ketika Anda menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem dalam topik yang dikompaksi log.

      Penting

      Anda hanya dapat menggunakan topik yang dikompaksi log dalam komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.

    Compact

    Tag

    Tag yang ingin Anda lampirkan ke topik.

    demo

    Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.

Buat grup yang diperlukan oleh konektor sink AnalyticDB

Di konsol ApsaraMQ for Kafka, Anda dapat membuat grup yang diperlukan oleh konektor sink AnalyticDB secara manual. Nama grup harus dalam format connect-Nama Tugas. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.

  1. Masuk ke konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Pada halaman Instances, klik nama instance yang ingin Anda kelola.

  4. Di panel navigasi sisi kiri, klik Groups.

  5. Pada halaman Groups, klik Create Group.

  6. Di panel Create Group, masukkan nama grup di bidang Group ID dan deskripsi grup di bidang Description, lampirkan tag ke grup, lalu klik OK.

    Setelah Anda membuat grup konsumen, Anda dapat melihat grup konsumen tersebut di halaman Groups.

Buat dan sebarkan konektor sink AnalyticDB

  1. Masuk ke konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Di panel navigasi sisi kiri, klik Connectors.

  4. Pada halaman Connectors, klik Create Connector.

  5. Di wizard Create Connector, lakukan langkah-langkah berikut:

    1. Di langkah Configure Basic Information, atur parameter yang dijelaskan dalam tabel berikut dan klik Next.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama konektor. Perhatikan aturan berikut saat menentukan nama konektor:

      • Nama konektor harus memiliki panjang 1 hingga 48 karakter. Dapat berisi angka, huruf kecil, dan tanda hubung (-), tetapi tidak boleh dimulai dengan tanda hubung (-).

      • Setiap nama konektor harus unik dalam satu instance ApsaraMQ for Kafka.

      Nama grup yang digunakan oleh tugas konektor harus dalam format connect-Nama Tugas. Jika Anda belum membuat grup seperti itu, Message Queue for Apache Kafka akan otomatis membuat satu untuk Anda.

      kafka-adb-sink

      Instance

      Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan.

      demo alikafka_post-cn-st21p8vj****

    2. Di langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Next.

      Tabel 1. Parameter dalam Langkah Konfigurasi Layanan Sumber

      Parameter

      Deskripsi

      Contoh

      Data Source Topic

      Nama topik sumber data dari mana data akan diekspor.

      adb-test-input

      Consumer Thread Concurrency

      Jumlah thread konsumen konkuren yang digunakan untuk mengekspor data dari topik sumber data. Nilai default: 6. Nilai valid:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      Consumer Offset

      Offset tempat konsumsi dimulai. Nilai valid:

      • Earliest Offset: Konsumsi dimulai dari offset paling awal.

      • Latest Offset: Konsumsi dimulai dari offset terbaru.

      Earliest Offset

      VPC ID

      ID VPC tempat tugas ekspor data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai default adalah ID VPC yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka. Anda tidak perlu mengubah nilai ini.

      vpc-bp1xpdnd3l***

      vSwitch ID

      ID vSwitch tempat tugas ekspor data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. vSwitch harus diterapkan dalam VPC yang sama dengan instance ApsaraMQ for Kafka. Nilai default adalah ID vSwitch yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka.

      vsw-bp1d2jgg81***

      Failure Handling Policy

      Menentukan apakah akan mempertahankan langganan ke partisi tempat kesalahan menyebabkan kegagalan pengiriman pesan. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid:

      • Continue Subscription: mempertahankan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.

      • Stop Subscription: menghentikan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.

      Catatan
      • Untuk informasi tentang cara melihat log konektor, lihat Kelola konektor.

      • Untuk informasi lebih lanjut tentang cara memecahkan masalah berdasarkan kode kesalahan, lihat Kode Kesalahan.

      Continue Subscription

      Resource Creation Method

      Metode pembuatan topik dan grup yang diperlukan oleh konektor sink AnalyticDB. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid:

      • Auto

      • Manual

      Auto

      Connector Consumer Group

      Grup yang digunakan oleh konektor.Grup Klik Configure Runtime Environment untuk menampilkan parameter. Nama grup harus dalam format connect-Nama Tugas.

      connect-kafka-adb-sink

      Task Offset Topic

      Topik yang digunakan untuk menyimpan offset konsumen. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami sarankan Anda memulai nama topik dengan connect-offset.

      • Partisi: Jumlah partisi dalam topik harus lebih dari 1.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.

      connect-offset-kafka-adb-sink

      Task Configuration Topic

      Topik yang digunakan untuk menyimpan konfigurasi tugas. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami sarankan Anda memulai nama topik dengan connect-config.

      • Partisi: Topik hanya dapat berisi satu partisi.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.

      connect-config-kafka-adb-sink

      Task Status Topic

      Topik yang digunakan untuk menyimpan status tugas. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami sarankan Anda memulai nama topik dengan connect-status.

      • Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus disetel ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus disetel ke Padatkan.

      connect-status-kafka-adb-sink

      Dead-letter Queue Topic

      Topik yang digunakan untuk menyimpan data kesalahan kerangka kerja Kafka Connect. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik sebagai topik antrian pesan gagal dan topik data kesalahan.

      • Topik: Kami sarankan Anda memulai nama topik dengan connect-error.

      • Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik dapat disetel ke Penyimpanan Lokal atau Penyimpanan Cloud.

      connect-error-kafka-adb-sink

      Error Data Topic

      Topik yang digunakan untuk menyimpan data kesalahan konektor Sink. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik sebagai topik antrian pesan gagal dan topik data kesalahan.

      • Topik: Kami sarankan Anda memulai nama topik dengan connect-error.

      • Partisi: Kami sarankan Anda mengatur jumlah partisi dalam topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik dapat disetel ke Penyimpanan Lokal atau Penyimpanan Cloud.

      connect-error-kafka-adb-sink

    3. Di langkah Configure Destination Service, pilih AnalyticDB sebagai layanan tujuan, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Create.

      Parameter

      Deskripsi

      Contoh

      Instance Type

      Tipe instance database tujuan. Nilai valid: AnalyticDB for MySQL dan AnalyticDB for PostgreSQL.

      AnalyticDB for MySQL

      AnalyticDB Instance ID

      ID instance AnalyticDB for MySQL atau AnalyticDB for PostgreSQL tujuan.

      am-bp139yqk8u1ik****

      Database Name

      Nama database tujuan.

      adb_demo

      Table Name

      Nama tabel dalam database tujuan tempat data yang diekspor disimpan.

      user

      Database Username

      Nama pengguna yang Anda gunakan untuk masuk ke database tujuan.

      adbmysql

      Database Password

      Kata sandi yang Anda gunakan untuk masuk ke database tujuan. Kata sandi ditentukan saat Anda membuat instance AnalyticDB for MySQL AnalyticDB for PostgreSQL tujuan. Jika Anda lupa kata sandi, Anda dapat menyetel ulangnya.

      • Jika Anda ingin menyetel ulang kata sandi akun database AnalyticDB for MySQL, lakukan langkah-langkah yang dijelaskan dalam Ubah kata sandi akun database.

      • Jika Anda ingin menyetel ulang kata sandi akun database AnalyticDB for PostgreSQL, masuk ke Konsol AnalyticDB for PostgreSQL dan klik instance tujuan. Di panel navigasi sisi kiri, klik Account Management, temukan akun database yang ingin Anda setel ulang kata sandinya, dan klik Reset Password di kolom Actions.

      ********

      Catatan

      Nama pengguna dan kata sandi dilewatkan ke fungsi di Function Compute sebagai variabel lingkungan saat ApsaraMQ for Kafka membuat tugas ekspor data. Setelah tugas dibuat, ApsaraMQ for Kafka tidak menyimpan nama pengguna atau kata sandi.

      Setelah konektor dibuat, Anda dapat melihatnya di halaman Connectors.

  6. Pergi ke halaman Connectors, temukan konektor yang Anda buat, dan klik Deploy di kolom Actions.

Konfigurasikan layanan Function Compute terkait

Setelah konektor sink AnalyticDB dibuat dan diterapkan di konsol ApsaraMQ for Kafka, Function Compute secara otomatis membuat layanan fungsi dan fungsi untuk konektor tersebut. Layanan fungsi dinamai dalam format kafka-service-<nama_konektor>-<String Acak>, dan fungsi dinamai dalam format fc-adb-<String Acak>.

  1. Di halaman Connectors, temukan konektor yang ingin Anda konfigurasi layanan Function Compute-nya dan klik Configure Function di kolom Actions.

    Halaman dialihkan ke Konsol Function Compute.

  2. Di Konsol Function Compute, temukan layanan yang dibuat secara otomatis dan konfigurasikan VPC dan vSwitch untuk layanan tersebut. Untuk informasi lebih lanjut, lihat Perbarui Layanan.

Konfigurasikan AnalyticDB for MySQL atau AnalyticDB for PostgreSQL

Setelah layanan Function Compute diterapkan, Anda harus menambahkan blok CIDR untuk VPC yang Anda tentukan di Konsol Function Compute ke daftar putih untuk instance AnalyticDB for MySQL atau AnalyticDB for PostgreSQL tujuan. Anda dapat melihat blok CIDR di halaman vSwitch dari Konsol VPC. Blok CIDR berada di baris tempat VPC dan vSwitch layanan Function Compute berada.

Kirim pesan uji

Anda dapat mengirim pesan ke topik sumber data instance ApsaraMQ for Kafka untuk memeriksa apakah data dalam topik dapat diekspor ke AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.

Catatan

Nilai parameter Konten Pesan harus dalam format JSON dan akan diurai menjadi pasangan kunci-nilai. Kunci adalah nama kolom tabel database tujuan dan nilainya adalah data dalam kolom. Oleh karena itu, pastikan bahwa setiap kunci dari konten pesan memiliki nama kolom yang sesuai dalam tabel database tujuan.ApsaraMQ for Kafka Anda dapat masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL dan hubungkan ke database tujuan untuk memeriksa nama kolom tabel database tujuan.

  1. Di halaman Connectors, temukan konektor yang ingin Anda kelola dan klik Test di kolom Actions.

  2. Di panel Send Message, konfigurasikan parameter untuk mengirim pesan untuk pengujian.

    • Jika Anda mengatur parameter Sending Method ke Console, lakukan langkah-langkah berikut:

      1. Di bidang Message Key, masukkan kunci pesan. Contoh: demo.

      2. Di bidang Message Content, masukkan konten pesan. Contoh: {"key": "test"}.

      3. Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji ke partisi tertentu.

        • Jika Anda ingin mengirim pesan uji ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara menanyakan ID partisi, lihat Lihat Status Partisi.

        • Jika Anda tidak ingin mengirim pesan uji ke partisi tertentu, klik No.

    • Jika Anda mengatur parameter Sending Method ke Docker, jalankan perintah Docker di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji.

    • Jika Anda mengatur parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan dan metode akses untuk mengirim dan berlangganan pesan uji.

Verifikasi hasil ekspor data

Setelah Anda mengirim pesan ke topik sumber data instance ApsaraMQ for Kafka, masuk ke Konsol AnalyticDB for MySQL atau Konsol AnalyticDB for PostgreSQL, dan hubungkan ke database tujuan.Pada jendela perintah SQLConsole dari Konsol Data Management Service 5.0, klik tabel tujuan untuk memeriksa apakah data dalam topik sumber data berhasil diekspor.

Gambar berikut menunjukkan hasil tugas ekspor data dari ApsaraMQ for Kafka ke AnalyticDB for MySQL:ADB-Connector-Result