全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat Konektor Sink AnalyticDB

更新时间:Jul 06, 2025

Topik ini menjelaskan cara membuat konektor sink AnalyticDB untuk menyinkronkan data dari topik sumber di instance ApsaraMQ for Kafka ke tabel di AnalyticDB.

Prasyarat

Untuk informasi tentang prasyarat, lihat Prasyarat.

Langkah 1: Buat Sumber Daya AnalyticDB

Buat sumber daya AnalyticDB for MySQL atau AnalyticDB for PostgreSQL.

Dalam topik ini, sebuah basis data AnalyticDB for MySQL bernama adb_sink_database dan tabel data bernama adb_sink_table dibuat.

Langkah 2: Buat dan Mulai Konektor Sink AnalyticDB

  1. Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  2. Di panel navigasi di sebelah kiri, pilih Connector Ecosystem Integration > Tasks.

  3. Di halaman Tasks, klik Create Task.

    • Pembuatan Tugas

      1. Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Region

        Wilayah tempat instance sumber ApsaraMQ for Kafka berada.

        Tiongkok (Beijing)

        ApsaraMQ for Kafka Instance

        Instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.

        alikafka_post-cn-jte3****

        Topic

        Topik pada instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.

        demo-topic

        Group ID

        Nama grup konsumen pada instance ApsaraMQ for Kafka sumber.

        • Quickly Create: Sistem secara otomatis membuat grup konsumen dengan format GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.

        • Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.

        Buat Cepat

        Consumer Offset

        Offset dari mana pesan dikonsumsi. Nilai valid:

        • Latest Offset

        • Earliest Offset

        Offset Terbaru

        Network Configuration

        Tipe jaringan tempat Anda ingin merutekan pesan. Nilai valid:

        • Basic Network

        • Self-managed Internet

        Jaringan Dasar

        VPC

        ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.

        vsw-bp1gbjhj53hdjdkg****

        Security Group

        ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration ke Internet Mandiri.

        alikafka_pre-cn-7mz2****

        Data Format

        Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.

        • Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.

        • Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.

        • Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.

        Json

        Messages

        Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.

        100

        Interval (Unit: Seconds)

        Interval waktu saat fungsi dipanggil. Sistem mengirimkan pesan agregat ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Satuan: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah agregasi.

        3

      2. Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola Peristiwa.

      3. Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan perutean data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.

      4. Di langkah Sink, atur parameter Service Type menjadi AnalyticDB dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Tipe Instance

        Tipe instance yang Anda buat. Dalam contoh ini, AnalyticDB for MySQL dipilih. Nilai valid:

        • AnalyticDB for MySQL

        • AnalyticDB for PostgreSQL

        AnalyticDB for MySQL

        ID Instance AnalyticDB

        ID instance AnalyticDB for MySQL yang Anda buat.

        gp-bp10uo5n536wd****

        Nama Basis Data

        Nama basis data yang Anda buat.

        adb_sink_database

        Nama Tabel

        Nama tabel yang Anda buat.

        adb_sink_table

        Data Mapping

        Format data yang diteruskan dari ApsaraMQ for Kafka ke AnalyticDB. Anda dapat menentukan aturan ekstraksi nilai di tabel basis data menggunakan aturan JSONPath. Jika Anda mengatur parameter Data Format menjadi Json di langkah Source, format data yang diteruskan dari ApsaraMQ for Kafka adalah seperti yang ditunjukkan dalam kode berikut:

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"adb-sink-k1",
                "value": {
                    "userid":"xiaoming",
                    "source":"shanghai"
                }
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        Tentukan aturan JSONPath berdasarkan nama kolom tabel. Misalnya, jika nama kolom tabel adalah userid, tentukan $.data.value.userid sebagai aturan ekstraksi nilai.

        Nama Pengguna Basis Data

        Nama pengguna yang digunakan untuk mengakses akun basis data.

        user

        Kata Sandi Basis Data

        Kata sandi yang digunakan untuk mengakses akun basis data.

        ******

        Pengaturan Jaringan

        • VPC: Pesan di ApsaraMQ for Kafka dikirim ke AnalyticDB dalam virtual private cloud (VPC).

        • Internet: Pesan di ApsaraMQ for Kafka dikirim ke AnalyticDB melalui Internet.

        VPC

        VPC

        ID VPC. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        ID vSwitch. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        Penting

        Setelah Anda memilih vSwitch, Anda harus menambahkan blok CIDR tempat vSwitch termasuk ke dalam daftar putih alamat IP instance AnalyticDB for MySQL. Untuk informasi lebih lanjut, lihat Daftar Putih Alamat IP.

        vsw-bp1gbjhj53hdjdkg****

        Grup Keamanan

        ID grup keamanan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        test_group

  4. Kembali ke halaman Tasks, temukan konektor sink OSS yang Anda buat, lalu klik Enable di kolom Actions.

  5. Di pesan Note, klik OK.

    Konektor sink memerlukan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuan di kolom Status pada halaman Tasks.

Langkah 3: Uji Konektor Sink AnalyticDB

  1. Di halaman Tasks, temukan konektor sink AnalyticDB yang Anda buat dan klik nama topik sumber di kolom Event Source.

  2. Di halaman Detail Topik, klik Send Message.
  3. Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

    Catatan

    Dalam contoh ini, isi pesan adalah string JSON yang berisi semua kolom tabel data yang dibuat. Sistem menulis nilai bidang yang memiliki nama sama dengan kolom di tabel data ke kolom yang sesuai.

    image

  4. Di halaman Tasks, temukan konektor sink AnalyticDB yang Anda buat dan klik nama instance tujuan di kolom Event Target.

  5. Di sudut kanan atas halaman Basic Information, klik Log on to Database.

  6. Di konsol Data Management (DMS), jalankan pernyataan berikut untuk menanyakan semua data di tabel:

    SELECT * FROM adb_sink_table;

    Gambar berikut menunjukkan hasil kueri.image