Topik ini menjelaskan cara membuat konektor sink Tablestore untuk menyinkronkan data dari topik sumber pada instance ApsaraMQ for Kafka ke tabel dalam instance Tablestore.
Prasyarat
Tablestore telah diaktifkan dan sebuah instance telah dibuat. Untuk informasi lebih lanjut, lihat Aktifkan Tablestore dan buat instance.
Sebuah instance ApsaraMQ for Kafka telah dibeli dan diterapkan, serta sebuah topik telah dibuat pada instance tersebut. Untuk informasi lebih lanjut, lihat Langkah 2: Beli dan terapkan instance dan Langkah 3: Buat sumber daya.
Saat Anda membuat konektor sink Tablestore, peran yang ditautkan ke layanan akan dibuat secara otomatis. Anda harus menambahkan kebijakan
AliyunOTSFullAccesssecara manual ke peran tersebut. Kebijakan ini digunakan untuk memberikan izin kepada peran yang ditautkan ke layanan untuk mengakses Tablestore. Untuk informasi lebih lanjut, lihat Metode 1: Berikan izin kepada peran RAM dengan mengklik Izinkan pada halaman Peran.
Langkah 1: Buat tabel Tablestore
Buat tabel Tablestore untuk menyinkronkan data dari ApsaraMQ for Kafka ke Tablestore. Untuk informasi lebih lanjut, lihat Prosedur.
Dalam contoh ini, sebuah instance bernama ots-sink dan tabel data bernama ots_sink_table dibuat. Kunci utama topic, partition, dan offset ditentukan saat tabel data dibuat.
Langkah 2: Buat dan mulai konektor sink Tablestore
Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution pada halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi sisi kiri, pilih .
Di halaman Tasks, klik Create Task.
Di halaman Create Task, konfigurasikan parameter Task Name dan Description dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Bagian berikut menjelaskan parameter-parameter tersebut:
Pembuatan Tugas
Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Region
Wilayah tempat instance sumber ApsaraMQ for Kafka berada.
Tiongkok (Beijing)
ApsaraMQ for Kafka Instance
Instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.
alikafka_post-cn-jte3****
Topic
Topik pada instance ApsaraMQ for Kafka tempat pesan yang ingin Anda rutekan diproduksi.
demo-topic
Group ID
Nama grup konsumen pada instance ApsaraMQ for Kafka sumber.
Quickly Create: Sistem secara otomatis membuat grup konsumen dengan nama dalam format
GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.
Buat Cepat
Consumer Offset
Offset dari mana pesan dikonsumsi. Nilai valid:
Latest Offset
Earliest Offset
Offset Terbaru
Network Configuration
Jenis jaringan tempat Anda ingin merutekan pesan. Nilai valid:
Basic Network
Self-managed Internet
Jaringan Dasar
VPC
ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
vpc-bp17fapfdj0dwzjkd****
vSwitch
ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
vsw-bp1gbjhj53hdjdkg****
Security Group
ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
alikafka_pre-cn-7mz2****
Data Format
Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.
Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.
Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.
Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.
Json
Messages
Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.
100
Interval (Unit: Seconds)
Interval waktu di mana fungsi dipanggil. Sistem mengirimkan pesan yang terkumpul ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Satuan: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah pengumpulan.
3
Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola acara.
Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan pengarahan data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.
Di langkah Sink, atur parameter Service Type menjadi Tablestore dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Instance Name
Nama instance Tablestore yang Anda buat.
ost-sink
Destination Table
Tabel data Tablestore yang Anda buat.
ost_sink_table
Primary Key
Metode yang ingin Anda gunakan untuk menghasilkan kunci utama dan kolom atribut di Tablestore. Anda harus mendefinisikan aturan dalam sintaks JSONPath untuk mengekstrak konten setiap kolom atribut. Jika Anda mengatur parameter Data Format menjadi Json di langkah Source, format data yang diteruskan dari ApsaraMQ for Kafka ditampilkan seperti pada kode berikut:
{ "data": { "topic": "demo-topic", "partition": 0, "offset": 2, "timestamp": 1739756629123, "headers": { "headers": [], "isReadOnly": false }, "key":"ots-sink-k1", "value": "ots-sink-v1" }, "id": "7702ca16-f944-4b08-***-***-0-2", "source": "acs:alikafka", "specversion": "1.0", "type": "alikafka:Topic:Message", "datacontenttype": "application/json; charset=utf-8", "time": "2025-02-17T01:43:49.123Z", "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic", "aliyunaccountid": "1******6789" }Sebagai contoh, Anda dapat menentukan topic sebagai nama kunci utama dan
$.data.topicsebagai aturan ekstraksi numerik.Attribute Column
Sebagai contoh, Anda dapat menentukan key sebagai nama kolom atribut dan
$.data.keysebagai aturan ekstraksi numerik.Operation Mode
Mode di mana data ditulis ke Tablestore. Nilai valid:
put: Jika kunci utama dari dua entri data sama, entri data baru akan menimpa entri data lama.
update: Jika kunci utama dari dua entri data sama, entri data baru ditulis ke baris dan entri data lama tetap dipertahankan.
delete: Kunci yang ditentukan dihapus.
put
Konfigurasi Jaringan
VPC: Pesan di ApsaraMQ for Kafka dikirim ke Tablestore dalam virtual private cloud (VPC).
Internet: Pesan di ApsaraMQ for Kafka dikirim ke Tablestore melalui Internet.
VPC
VPC
ID VPC. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.
vpc-bp17fapfdj0dwzjkd****
vSwitch
ID vSwitch. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.
vsw-bp1gbjhj53hdjdkg****
Grup Keamanan
ID grup keamanan. Parameter ini hanya diperlukan jika Anda mengatur parameter Konfigurasi Jaringan menjadi VPC.
test_group
Properti Tugas
Konfigurasikan kebijakan ulang yang ingin Anda gunakan saat acara gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan ulang dan antrian pesan gagal.
Kembali ke halaman Tasks, temukan konektor sink Tablestore yang Anda buat, lalu klik Enable di kolom Actions.
Di pesan Note, klik OK.
Konektor sink memerlukan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuan di kolom Status pada halaman Tasks.
Langkah 3: Uji konektor sink Tablestore
Di halaman Tasks, temukan konektor sink Tablestore yang Anda buat dan klik nama topik sumber di kolom Event Source.
- Di halaman Detail Topik, klik Send Message.
Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

Di halaman Tasks, temukan konektor sink Tablestore yang Anda buat dan klik nama tabel tujuan di kolom Event Target.
Di tab Query Data pada halaman Manage Table, lihat data yang disimpan dalam tabel Tablestore.
