Saat aplikasi Anda menghasilkan data streaming ke ApsaraMQ for Kafka, Anda mungkin perlu menyimpan pesan tersebut di Object Storage Service (OSS) untuk pengarsipan, analitik, atau pemrosesan lanjutan. Konektor sink OSS mengotomatiskan alur ini: ApsaraMQ for Kafka mengirimkan pesan ke Function Compute (FC), yang kemudian menuliskannya ke bucket OSS Anda.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Memiliki instans ApsaraMQ for Kafka dengan fitur konektor diaktifkan
Sebuah topik yang dibuat di instans ApsaraMQ for Kafka
Sebuah Bucket OSS yang dibuat di Konsol OSS
Mengaktifkan Function Compute (konektor meneruskan data melalui FC untuk mencapai OSS)
Batasan
Persyaratan wilayah yang sama: Instans ApsaraMQ for Kafka, bucket OSS, dan FC harus berada di wilayah yang sama. Untuk informasi selengkapnya, lihat Batasan.
Hanya UTF-8: ApsaraMQ for Kafka melakukan serialisasi pesan menjadi string berkode UTF-8. Data biner tidak didukung.
Biaya: FC menyediakan kuota sumber daya gratis. Penggunaan melebihi kuota tersebut akan ditagih sesuai dengan aturan penagihan FC. Untuk memantau eksekusi fungsi, konfigurasikan pencatatan log FC.
Buat dan terapkan konektor
Langkah 1: Mulai panduan pembuatan konektor
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.
Di panel navigasi sebelah kiri, klik Connectors.
Di halaman Connectors, pilih instans Anda dari daftar drop-down Select Instance dan klik Create Connector.
Langkah 2: Konfigurasikan informasi dasar
Pada langkah Configure Basic Information, atur parameter berikut lalu klik Next.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Name | Nama unik untuk konektor. Harus terdiri dari 1–48 karakter dan dapat berisi angka, huruf kecil, serta tanda hubung (-). Tidak boleh diawali dengan tanda hubung. Kelompok konsumen bernama connect-<connector-name> akan dibuat secara otomatis. | kafka-oss-sink |
| Instance | Menampilkan nama dan ID instans saat ini. | demo alikafka_post-cn-st21p8vj\*\*\*\* |
Secara default, opsi Authorize to Create Service Linked Role dipilih. ApsaraMQ for Kafka akan membuat peran terkait layanan jika belum tersedia. Peran ini memberikan izin yang diperlukan untuk menyinkronkan data dari ApsaraMQ for Kafka ke OSS.
Langkah 3: Konfigurasikan layanan sumber
Pada langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, atur parameter berikut, lalu klik Next.
Parameter wajib
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Data Source Topic | Topik tempat pesan diekspor. | oss-test-input |
| Consumer Thread Concurrency | Jumlah thread konsumen konkuren. Nilai yang valid: 1, 2, 3, 6, 12. Default: 6. | 6 |
| Consumer Offset | Titik awal konsumsi pesan. Earliest Offset membaca dari awal. Latest Offset hanya membaca pesan baru. | Earliest Offset |
Parameter lingkungan runtime
Klik Configure Runtime Environment untuk memperluas parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| VPC ID | VPC tempat tugas sinkronisasi dijalankan. Secara default menggunakan VPC dari instans sumber. | vpc-bp1xpdnd3l\*\*\* |
| vSwitch ID | vSwitch yang terhubung ke instans sumber. Harus berada dalam VPC yang sama dengan instans. Secara default menggunakan vSwitch dari instans sumber. | vsw-bp1d2jgg81\*\*\* |
| Failure Handling Policy | Cara menangani kegagalan pengiriman pesan. Continue Subscription tetap mengonsumsi dari partisi dan mencatat error. Stop Subscription menghentikan konsumsi dari partisi dan mencatat error. Untuk informasi selengkapnya, lihat Mengelola konektor dan Kode error. | Continue Subscription |
| Resource Creation Method | Apakah topik internal yang diperlukan dibuat secara otomatis (Auto) atau manual (Manual). Pilih Auto kecuali Anda memerlukan pengaturan topik kustom. | Auto |
Topik internal (hanya untuk pembuatan manual)
Jika Anda mengatur Resource Creation Method ke Manual, buat topik internal berikut sebelum melanjutkan. Klik Configure Runtime Environment untuk menampilkan parameter ini.
| Parameter | Awalan penamaan | Partisi | Mesin penyimpanan | cleanup.policy | Contoh |
|---|---|---|---|---|---|
| Connector Consumer Group | connect-cluster | -- | -- | -- | connect-cluster-kafka-oss-sink |
| Task Offset Topic | connect-offset | > 1 | Local Storage | Compact | connect-offset-kafka-oss-sink |
| Task Configuration Topic | connect-config | 1 (tepat) | Local Storage | Compact | connect-config-kafka-oss-sink |
| Task Status Topic | connect-status | 6 (disarankan) | Local Storage | Compact | connect-status-kafka-oss-sink |
| Dead-letter Queue Topic | connect-error | 6 (disarankan) | Local Storage atau Cloud Storage | -- | connect-error-kafka-oss-sink |
| Error Data Topic | connect-error | 6 (disarankan) | Local Storage atau Cloud Storage | -- | connect-error-kafka-oss-sink |
Local Storage hanya tersedia pada instans Edisi Profesional. Untuk menghemat sumber daya topik, Anda dapat menggunakan topik yang sama untuk Dead-letter Queue Topic dan Error Data Topic.
Langkah 4: Konfigurasikan layanan tujuan
Pada langkah Configure Destination Service, pilih Object Storage Service sebagai layanan tujuan, atur parameter berikut, lalu klik Create.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Bucket Name | Nama bucket OSS yang menerima data yang diekspor. | bucket_test |
| AccessKey ID | ID AccessKey dari Akun Alibaba Cloud yang memiliki akses ke bucket tersebut. | yourAccessKeyID |
| AccessKey Secret | Rahasia AccessKey dari Akun Alibaba Cloud. | yourAccessKeySecret |
Berikan akun tersebut izin minimum berikut:
{
"Version": "1",
"Statement": [
{
"Action": [
"oss:GetObject",
"oss:PutObject"
],
"Resource": "*",
"Effect": "Allow"
}
]
}ID AccessKey dan rahasia AccessKey diteruskan ke OSS sebagai variabel lingkungan saat tugas sinkronisasi dibuat. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.
Langkah 5: Terapkan konektor
Setelah konektor dibuat, kembali ke halaman Connectors, temukan konektor tersebut, lalu klik Deploy di kolom Actions.
Kirim pesan uji
Setelah penerapan, kirim pesan uji untuk memverifikasi konektor.
Di halaman Connectors, temukan konektor tersebut dan klik Test di kolom Actions.
Di panel Send Message, pilih metode pengiriman:
Console: Masukkan Message Key (misalnya,
demo) dan Message Content (misalnya,{"key": "test"}). Secara opsional, atur Send to Specified Partition ke Yes dan masukkan Partition ID. Untuk informasi cara menemukan ID partisi, lihat View partition status.Docker: Jalankan perintah Docker yang ditampilkan di bagian Run the Docker container to produce a sample message.
SDK: Pilih SDK untuk bahasa pemrograman atau framework Anda dan ikuti petunjuknya.
Verifikasi hasil
Periksa apakah pesan uji telah tiba di bucket OSS Anda:
Buka Konsol OSS dan buka halaman Files dari bucket target. Untuk informasi selengkapnya, lihat Ikhtisar.
Konfirmasi bahwa objek baru muncul. Munculnya objek baru menandakan bahwa konektor berfungsi.

Setiap objek yang diekspor berisi array JSON dalam format berikut:
[
{
"key": "123",
"offset": 4,
"overflowFlag": true,
"partition": 0,
"timestamp": 1603779578478,
"topic": "Test",
"value": "1",
"valueSize": 272687
}
]| Field | Deskripsi |
|---|---|
key | Kunci pesan. |
offset | Offset konsumen pesan dalam partisinya. |
overflowFlag | Apakah nilai pesan dipotong karena ukurannya terlalu besar. |
partition | Partisi tempat pesan dikonsumsi. |
timestamp | Timestamp Unix (dalam milidetik) saat pesan diproduksi. |
topic | Nama topik sumber. |
value | Isi pesan (atau versi terpotong jika overflowFlag bernilai true). |
valueSize | Ukuran asli nilai pesan dalam byte. |
Konfigurasikan sumber daya Function Compute
Untuk menyesuaikan sumber daya FC yang digunakan oleh konektor:
Di halaman Connectors, temukan konektor tersebut, klik More di kolom Actions, lalu pilih Configure Function.
Konfigurasikan sumber daya di Konsol Function Compute sesuai kebutuhan.