全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat Konektor Sink Tablestore

更新时间:Jul 02, 2025

Topik ini menjelaskan cara membuat konektor sink Tablestore untuk menyinkronkan data dari topik sumber pada instance ApsaraMQ for Kafka ke tabel dalam instance Tablestore.

Prasyarat

Langkah 1: Buat tabel Tablestore

Buat tabel Tablestore untuk menyinkronkan data dari ApsaraMQ for Kafka ke Tablestore. Untuk informasi lebih lanjut, lihat Prosedur.

Dalam contoh ini, sebuah instance bernama ots-sink dan tabel data bernama ots_sink_table dibuat. Kunci utama topic, partition, dan offset ditentukan saat tabel data dibuat.image

Langkah 2: Buat dan mulai konektor sink Tablestore

  1. Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  2. Di panel navigasi sisi kiri, pilih Connector Ecosystem Integration > Tasks.

  3. Di halaman Tasks, klik Create Task.

  4. Di halaman Create Task, konfigurasikan parameter Task Name dan Description dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Bagian berikut menjelaskan parameter-parameter tersebut:

    • Pembuatan Tugas

      1. Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Region

        Wilayah tempat instance sumber ApsaraMQ for Kafka berada.

        Tiongkok (Beijing)

        ApsaraMQ for Kafka Instance

        Instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.

        alikafka_post-cn-jte3****

        Topic

        Topik pada instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.

        demo-topic

        Group ID

        Nama grup konsumen pada instance ApsaraMQ for Kafka sumber.

        • Quickly Create: Sistem secara otomatis membuat grup konsumen dengan nama dalam format GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.

        • Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.

        Buat Cepat

        Consumer Offset

        Offset dari mana pesan dikonsumsi. Nilai valid:

        • Latest Offset

        • Earliest Offset

        Offset Terbaru

        Network Configuration

        Jenis jaringan tempat Anda ingin merutekan pesan. Nilai valid:

        • Basic Network

        • Self-managed Internet

        Jaringan Dasar

        VPC

        ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        vsw-bp1gbjhj53hdjdkg****

        Security Group

        ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        alikafka_pre-cn-7mz2****

        Data Format

        Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.

        • Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.

        • Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.

        • Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.

        Json

        Messages

        Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.

        100

        Interval (Unit: Seconds)

        Interval waktu di mana fungsi dipanggil. Sistem mengirimkan pesan yang terkumpul ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Satuan: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah pengumpulan.

        3

      2. Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola acara.

      3. Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan pengarahan data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.

      4. Di langkah Sink, atur parameter Service Type menjadi Tablestore dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Instance Name

        Nama instance Tablestore yang Anda buat.

        ost-sink

        Destination Table

        Tabel data Tablestore yang Anda buat.

        ost_sink_table

        Primary Key

        Metode yang ingin Anda gunakan untuk menghasilkan kunci utama dan kolom atribut di Tablestore. Anda harus mendefinisikan aturan dalam sintaks JSONPath untuk mengekstrak konten setiap kolom atribut. Jika Anda mengatur parameter Data Format menjadi Json di langkah Source, format data yang diteruskan dari ApsaraMQ for Kafka ditampilkan seperti pada kode berikut:

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"ots-sink-k1",
                "value": "ots-sink-v1"
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        Sebagai contoh, Anda dapat menentukan topic sebagai nama kunci utama dan $.data.topic sebagai aturan ekstraksi numerik.

        Attribute Column

        Sebagai contoh, Anda dapat menentukan key sebagai nama kolom atribut dan $.data.key sebagai aturan ekstraksi numerik.

        Operation Mode

        Mode di mana data ditulis ke Tablestore. Nilai valid:

        • put: Jika kunci utama dari dua entri data sama, entri data baru akan menimpa entri data lama.

        • update: Jika kunci utama dari dua entri data sama, entri data baru ditulis ke baris dan entri data lama tetap dipertahankan.

        • delete: Kunci yang ditentukan dihapus.

        put

        Konfigurasi Jaringan

        • VPC: Pesan di ApsaraMQ for Kafka dikirim ke Tablestore dalam virtual private cloud (VPC).

        • Internet: Pesan di ApsaraMQ for Kafka dikirim ke Tablestore melalui Internet.

        VPC

        VPC

        ID VPC. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        ID vSwitch. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.

        vsw-bp1gbjhj53hdjdkg****

        Grup Keamanan

        ID grup keamanan. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.

        test_group

    • Properti Tugas

      Konfigurasikan kebijakan ulang yang ingin Anda gunakan saat acara gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan ulang dan antrian pesan gagal.

  5. Kembali ke halaman Tasks, temukan konektor sink Tablestore yang Anda buat, lalu klik Enable di kolom Actions.

  6. Di pesan Note, klik OK.

    Konektor sink memerlukan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuan di kolom Status pada halaman Tasks.

Langkah 3: Uji konektor sink Tablestore

  1. Di halaman Tasks, temukan konektor sink Tablestore yang Anda buat dan klik nama topik sumber di kolom Event Source.

  2. Di halaman Detail Topik, klik Send Message.
  3. Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

    image

  4. Di halaman Tasks, temukan konektor sink Tablestore yang Anda buat dan klik nama tabel tujuan di kolom Event Target.

  5. Di tab Query Data pada halaman Manage Table, lihat data yang disimpan dalam tabel Tablestore.

    image