Topik ini menjelaskan cara membuat konektor sink Object Storage Service (OSS) untuk menyinkronkan data dari topik di ApsaraMQ for Kafka ke objek di OSS.
Prasyarat
Untuk informasi tentang prasyarat, lihat Prasyarat.
Catatan Penggunaan
Konektor sink OSS mengarahkan data ke OSS berdasarkan waktu pemrosesan acara, bukan waktu pembuatan acara. Jika Anda mengonfigurasi subdirektori berdasarkan waktu saat membuat konektor sink OSS, data yang dihasilkan pada batas waktu mungkin dikirim ke subdirektori berikutnya.
Pemrosesan data kotor: Jika Anda menggunakan sintaks JSONPath untuk mengonfigurasi jalur kustom atau konten objek saat membuat konektor sink OSS, konektor akan mengarahkan data yang tidak memenuhi sintaks JSONPath ke direktori bernama invalidRuleData/ di bucket berdasarkan kebijakan batch yang Anda konfigurasikan. Jika direktori invalidRuleData/ ditampilkan di bucket, periksa apakah sintaks JSONPath benar dan pastikan semua pesan dikonsumsi oleh konsumen.
Latensi berkisar dari beberapa detik hingga menit mungkin terjadi dalam pengalihan.
Jika badan pesan dalam topik sumber ApsaraMQ for Kafka perlu diekstraksi berdasarkan sintaks JSONPath yang digunakan untuk jalur kustom atau konten objek, Anda harus mengkodekan atau mendekode badan pesan ke format JSON di topik sumber ApsaraMQ for Kafka.
Konektor sink OSS menulis data dari aplikasi upstream ke OSS dengan menambahkan data ke objek yang ada secara real-time. Oleh karena itu, dalam jalur tanpa subdirektori, data sedang ditulis ke objek terlihat terbaru. Dalam hal ini, berhati-hatilah saat Anda mengonsumsi pesan.
Aturan Penagihan
Konektor sink OSS berjalan di Alibaba Cloud Function Compute. Saat data dalam konektor sink OSS diproses dan ditransmisikan, Anda dikenakan biaya untuk sumber daya Function Compute yang dikonsumsi. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.
Langkah 1: Buat Sumber Daya OSS
Buat bucket di konsol OSS. Untuk informasi lebih lanjut, lihat Buat Bucket.
Dalam topik ini, sebuah bucket bernama oss-sink-connector-bucket dibuat.
Langkah 2: Buat dan Mulai Konektor Sink OSS
Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi di sebelah kiri, pilih .
Di halaman Tasks, klik Create Task.
Pembuatan Tugas
Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
Contoh
Region
Wilayah tempat instance sumber ApsaraMQ for Kafka berada.
Tiongkok (Beijing)
ApsaraMQ for Kafka Instance
Instance ApsaraMQ for Kafka di mana pesan yang ingin Anda arahkan diproduksi.
alikafka_post-cn-jte3****
Topic
Topik pada instance ApsaraMQ for Kafka di mana pesan yang ingin Anda arahkan diproduksi.
demo-topic
Group ID
Nama grup konsumen pada instance sumber ApsaraMQ for Kafka.
Quickly Create: Sistem secara otomatis membuat grup konsumen dengan nama dalam format
GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.
Buat Cepat
Consumer Offset
Offset dari mana pesan dikonsumsi. Nilai valid:
Latest Offset
Earliest Offset
Offset Terbaru
Network Configuration
Jenis jaringan tempat Anda ingin mengarahkan pesan. Nilai valid:
Basic Network
Self-managed Internet
Jaringan Dasar
VPC
ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
vpc-bp17fapfdj0dwzjkd****
vSwitch
ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
vsw-bp1gbjhj53hdjdkg****
Security Group
ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.
alikafka_pre-cn-7mz2****
Data Format
Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.
Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.
Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.
Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.
Json
Messages
Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.
100
Interval (Unit: Seconds)
Interval waktu pemanggilan fungsi. Sistem mengirimkan pesan yang terkumpul ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Unit: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah pengumpulan.
3
Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola Acara.
Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan pengalihan data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.
Di langkah Sink, atur parameter Service Type menjadi OSS dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Tabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
Contoh
OSS Bucket
Bucket OSS yang Anda buat.
PentingPastikan bucket yang ditentukan dibuat secara manual dan tidak dihapus saat konektor berjalan.
Saat membuat bucket, atur parameter Kelas Penyimpanan menjadi Standard atau IA. Bucket Arsip tidak didukung oleh konektor sink ApsaraMQ for Kafka.
Setelah Anda membuat konektor sink OSS, jalur file sistem .tmp/ dihasilkan di direktori level-1 bucket OSS. Jangan hapus atau gunakan objek OSS di jalur tersebut.
oss-sink-connector-bucket
Storage Path
Objek tempat pesan yang diarahkan disimpan. Kunci objek OSS terdiri dari jalur dan nama. Sebagai contoh, jika parameter ObjectKey diatur ke
a/b/c/a.txt, jalur objek adalaha/b/c/dan nama objek adalaha.txt. Anda dapat menentukan nilai kustom untuk jalur objek. Nama objek dihasilkan oleh konektor berdasarkan format berikut:{Timestamp UNIX dalam milidetik }_{String acak 8-bit}. Contoh: 1705576353794_elJmxu3v.Jika Anda mengatur parameter ini ke garis miring maju (/), tidak ada subdirektori yang tersedia di bucket. Data disimpan di direktori level-1 bucket.
Variabel waktu {yyyy}, {MM}, {dd}, dan {HH} dapat digunakan dalam nilai parameter ini. Variabel ini peka huruf besar/kecil dan menentukan tahun, bulan, hari, dan jam, masing-masing.
Sintaks JSONPath dapat digunakan dalam nilai parameter ini. Contoh: {$.data.topic} dan {$.data.partition}. Variabel JSONPath harus memenuhi persyaratan ekspresi JSONPath standar. Untuk mencegah penulisan data yang tidak normal, kami merekomendasikan agar nilai yang diekstraksi menggunakan JSONPath bertipe int atau string, berisi karakter yang dikodekan dalam UTF-8, dan tidak berisi spasi, dua titik berturut-turut(.), emoji, garis miring maju (/), atau garis miring balik (\).
Konstanta dapat digunakan dalam nilai parameter ini.
CatatanSubdirektori memungkinkan Anda mengelompokkan data dengan tepat. Ini membantu mencegah masalah yang disebabkan oleh sejumlah besar objek kecil dalam satu subdirektori.
Throughput konektor sink OSS berkorelasi positif dengan jumlah subdirektori. Jika tidak ada subdirektori atau jumlah subdirektori kecil yang dikonfigurasikan, throughput konektor rendah. Ini dapat menyebabkan akumulasi pesan di aplikasi upstream. Sejumlah besar subdirektori dapat menyebabkan masalah seperti data tersebar, jumlah tulisan meningkat, dan bagian berlebihan. Kami merekomendasikan Anda mengonfigurasi subdirektori berdasarkan saran berikut:
Topik ApsaraMQ for Kafka sumber: Anda dapat mengonfigurasi subdirektori berdasarkan waktu dan partisi. Dengan cara ini, Anda dapat meningkatkan jumlah partisi pada instance ApsaraMQ for Kafka untuk meningkatkan throughput konektor. Contoh: prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/.
Grup bisnis: Anda dapat mengonfigurasi subdirektori menggunakan bidang bisnis tertentu dari data. Throughput konektor ditentukan oleh jumlah nilai bidang bisnis. Contoh: prefixV2/{$.data.body.field}/.
Kami merekomendasikan Anda mengonfigurasi awalan konstan yang berbeda untuk konektor sink OSS yang berbeda. Ini mencegah beberapa konektor menulis data ke direktori yang sama.
alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH
Batch Aggregation Object Size
Ukuran objek yang akan diagregasi. Nilai valid: 1 hingga 1024. Unit: MB.
CatatanKonektor sink OSS menulis data ke objek OSS yang sama dalam batch. Ukuran data setiap batch lebih besar dari 0 MB dan sama dengan atau kurang dari 16 MB. Oleh karena itu, ukuran objek OSS mungkin sedikit lebih besar dari nilai yang dikonfigurasikan. Kelebihan ukuran hingga 16 MB.
Dalam skenario lalu lintas tinggi, kami merekomendasikan Anda mengatur parameter Ukuran Objek Agregasi Batch menjadi nilai lebih besar dari 100 MB dan parameter Jendela Waktu Agregasi Batch menjadi nilai lebih besar dari 1 jam. Contoh untuk parameter Ukuran Objek Agregasi Batch: 128 MB dan 512 MB. Contoh untuk parameter Jendela Waktu Agregasi Batch: 60 menit dan 120 menit.
5
Batch Aggregation Time Window
Jendela waktu untuk agregasi. Nilai valid: 1 hingga 1440. Unit: menit.
1
File Compression
Tidak Diperlukan Kompresi: menghasilkan objek tanpa akhiran.
GZIP: menghasilkan objek dengan akhiran .gz.
Snappy: menghasilkan objek dengan akhiran .snappy.
Zstd: menghasilkan objek dengan akhiran .zstd.
Jika objek akan dikompresi, konektor sink OSS menulis data dalam batch berdasarkan ukuran data sebelum kompresi. Akibatnya, ukuran objek yang ditampilkan di OSS lebih kecil dari ukuran batch. Setelah objek didekompresi, ukuran objek mendekati ukuran batch.
Tidak Diperlukan Kompresi
File Content
Data Lengkap: Konektor menggunakan spesifikasi CloudEvents untuk mengemas pesan asli. Jika Anda memilih nilai ini, data yang diarahkan ke OSS mencakup metadata dalam spesifikasi CloudEvents. Contoh kode berikut memberikan contohnya. Dalam contoh kode, bidang data menentukan data dan bidang lainnya menentukan metadata dalam spesifikasi CloudEvents.
{ "specversion": "1.0", "id": "8e215af8-ca18-4249-8645-f96c1026****", "source": "acs:alikafka", "type": "alikafka:Topic:Message", "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-23T02:49:51.589Z", "aliyunaccountid": "182572506381****", "data": { "topic": "****", "partition": 7, "offset": 25, "timestamp": 1655952591589, "headers": { "headers": [], "isReadOnly": false }, "key": "keytest", "value": "hello kafka msg" } }Ekstraksi Data: data yang diekstraksi menggunakan JSONPath diarahkan ke OSS. Sebagai contoh, jika Anda mengonfigurasi ekspresi $.data, hanya nilai dari bidang data yang diarahkan ke OSS.
Jika Anda tidak memerlukan bidang tambahan dalam spesifikasi CloudEvents, kami merekomendasikan Anda memilih Ekstraksi Data dan mengonfigurasi ekspresi $.data untuk mengarahkan pesan asli ke OSS. Ini membantu mengurangi biaya dan meningkatkan efisiensi transmisi.
Ekstraksi Data
$.data
Properti Tugas
Konfigurasikan kebijakan pengulangan yang ingin Anda gunakan saat acara gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan Pengulangan dan Antrian Pesan Gagal.
Kembali ke halaman Tasks, temukan konektor sink OSS yang Anda buat, lalu klik Enable di kolom Actions.
Dalam pesan Note, klik OK.
Konektor sink membutuhkan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuannya di kolom Status pada halaman Tasks.
Langkah 3: Uji Konektor Sink OSS
Di halaman Tasks, temukan konektor sink OSS yang Anda buat dan klik topik sumber di kolom Event Source.
- Di halaman Detail Topik, klik Send Message.
Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

Di halaman Tasks, temukan konektor sink OSS yang Anda buat dan klik bucket tujuan di kolom Event Target.
Di panel navigasi di sebelah kiri halaman yang muncul, pilih .
Direktori /tmp: jalur file sistem tempat konektor bergantung. Jangan hapus atau gunakan objek OSS di jalur ini.
Direktori file data: Subdirektori dihasilkan di direktori ini berdasarkan jalur yang Anda konfigurasikan untuk konektor. Objek data diunggah ke direktori terdalam.

Temukan objek yang ingin Anda kelola dan pilih di kolom Actions.
Buka objek yang diunduh untuk melihat detail pesan.

Gambar di atas memberikan contoh detail pesan. Beberapa pesan dipisahkan dengan baris baru.