Topik ini menjelaskan cara membuat konektor sink OSS untuk menyinkronkan data dari topik dalam instance ApsaraMQ for Kafka ke Object Storage Service (OSS).
Prasyarat
Pastikan persyaratan berikut telah dipenuhi:
Fitur konektor diaktifkan untuk instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Aktifkan Fitur Konektor.
Sebuah topik dibuat dalam instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.
Bucket OSS dibuat di konsol OSS. Untuk informasi lebih lanjut, lihat Buat Bucket.
Function Compute diaktifkan. Untuk informasi lebih lanjut, lihat Buat Fungsi di Konsol Function Compute.
Peringatan
Ketika Anda menyinkronkan data dari topik dalam instance ApsaraMQ for Kafka ke bucket OSS, pastikan bahwa instance Message Queue for Apache Kafka dan bucket OSS berada di wilayah yang sama serta bahwa Function Compute tersedia di wilayah tersebut. Message Queue for Apache Kafka terlebih dahulu menyinkronkan data Anda ke Function Compute, yang kemudian menyinkronkan data tersebut ke OSS. Untuk informasi lebih lanjut tentang batasan pada konektor, lihat Batasan.
Konektor sink OSS mengekspor data menggunakan Function Compute. Function Compute menyediakan sejumlah sumber daya secara gratis. Jika kuota gratis habis, Anda akan dikenakan biaya sesuai aturan penagihan. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.
Function Compute memungkinkan Anda untuk menanyakan log pemanggilan fungsi. Untuk informasi lebih lanjut, lihat Konfigurasikan Logging.
ApsaraMQ for Kafka mengubah pesan menjadi string yang dikodekan UTF-8 untuk transfer. Message Queue for Apache Kafka tidak mendukung data biner.
Buat dan terapkan konektor sink OSS
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi sisi kiri, klik Connectors.
Di halaman Connectors, pilih instance tempat konektor miliknya dari daftar drop-down Select Instance dan klik Create Connector.
Di wizard Create Connector, lakukan langkah-langkah berikut:
Di langkah Configure Basic Information, konfigurasikan parameter dan klik Next. Tabel berikut menjelaskan parameter.
PentingSecara default, Authorize to Create Service Linked Role dipilih. Ini berarti bahwa ApsaraMQ for Kafka akan membuat peran terhubung layanan berdasarkan aturan berikut:
Jika tidak ada peran terhubung layanan yang tersedia, ApsaraMQ for Kafka secara otomatis membuat peran terhubung layanan agar Anda dapat menggunakan konektor sink OSS untuk menyinkronkan data dari ApsaraMQ for Kafka ke OSS.
Jika peran terhubung layanan tersedia, ApsaraMQ for Kafka tidak membuat peran baru.
Untuk informasi lebih lanjut tentang peran terhubung layanan, lihat Peran Terhubung Layanan.
Parameter
Deskripsi
Contoh
Name
Nama konektor. Tentukan nama konektor berdasarkan konvensi penamaan berikut:
Nama konektor harus memiliki panjang 1 hingga 48 karakter dan dapat berisi angka, huruf kecil, dan tanda hubung (-). Nama tidak boleh dimulai dengan tanda hubung (-).
Nama harus unik dalam sebuah instance ApsaraMQ for Kafka.
Tugas sinkronisasi data konektor harus menggunakan grup konsumen yang dinamai dalam format connect-Nama tugas. Jika Anda belum membuat grup konsumen seperti itu, Message Queue for Apache Kafka secara otomatis membuat satu untuk Anda.
kafka-oss-sink
Instance
Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan.
demo alikafka_post-cn-st21p8vj****
Di langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, konfigurasikan parameter, dan klik Next. Tabel berikut menjelaskan parameter.
Parameter
Deskripsi
Contoh
Data Source Topic
Nama topik dari mana data akan disinkronkan.
oss-test-input
Consumer Thread Concurrency
Jumlah thread konsumen konkuren yang digunakan untuk menyinkronkan data dari topik sumber. Nilai default: 6. Nilai valid:
1
2
3
6
12
6
Consumer Offset
Offset konsumen dari mana Anda ingin konsumsi pesan dimulai. Nilai valid:
Earliest Offset: Konsumsi pesan dimulai dari offset konsumen paling awal.
Latest Offset: Konsumsi pesan dimulai dari offset konsumen paling akhir.
Earliest Offset
VPC ID
ID VPC di mana tugas sinkronisasi data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. Secara default, ID VPC yang Anda tentukan saat menerapkan instance sumber ApsaraMQ for Kafka ditampilkan. Anda tidak perlu mengonfigurasi parameter ini.
vpc-bp1xpdnd3l***
vSwitch ID
ID vSwitch ke mana instance sumber terhubung. Klik Configure Runtime Environment untuk menampilkan parameter. vSwitch harus diterapkan di VPC yang sama dengan instance ApsaraMQ for Kafka sumber. Secara default, ID vSwitch yang Anda tentukan saat menerapkan instance sumber ApsaraMQ for Kafka ditampilkan.
vsw-bp1d2jgg81***
Failure Handling Policy
Menentukan apakah akan mempertahankan langganan ke partisi di mana terjadi kesalahan setelah pesan gagal dikirim. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid:
Continue Subscription: mempertahankan langganan ke partisi di mana terjadi kesalahan. Entri log dibuat untuk kesalahan tersebut.
Stop Subscription: menghentikan langganan ke partisi di mana terjadi kesalahan. Entri log dibuat untuk kesalahan tersebut.
CatatanUntuk informasi lebih lanjut, lihat Kelola konektor.
Untuk informasi tentang cara menyelesaikan kesalahan berdasarkan kode kesalahan, lihat Kode kesalahan.
Continue Subscription
Resource Creation Method
Metode untuk membuat topik dan grup yang diperlukan oleh konektor sink OSS. Klik Configure Runtime Environment untuk menampilkan parameter.
Auto
Manual
Auto
Connector Consumer Group
Nama grup konsumen yang diperlukan oleh konektor sink OSS. Klik Configure Runtime Environment untuk menampilkan parameter. Kami merekomendasikan agar Anda memulai nama dengan connect-cluster.
connect-cluster-kafka-oss-sink
Task Offset Topic
Topik yang digunakan untuk menyimpan offset konsumen. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-offset.
Partisi: Jumlah partisi dalam topik harus lebih besar dari 1.
Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal.
CatatanAnda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal untuk instance Message Queue for Apache Kafka Edisi Profesional saat Anda membuat topik.
cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-offset-kafka-oss-sink
Task Configuration Topic
Topik yang digunakan untuk menyimpan konfigurasi tugas. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-config.
Partisi: Topik hanya dapat berisi satu partisi.
Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal.
CatatanAnda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal untuk instance Message Queue for Apache Kafka Edisi Profesional saat Anda membuat topik.
cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-config-kafka-oss-sink
Task Status Topic
Topik yang digunakan untuk menyimpan status tugas. Klik Configure Runtime Environment untuk menampilkan parameter.
Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-status.
Partisi: Kami merekomendasikan agar Anda menyetel jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal.
CatatanAnda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal untuk instance Message Queue for Apache Kafka Edisi Profesional saat Anda membuat topik.
cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-status-kafka-oss-sink
Dead-letter Queue Topic
Topik yang digunakan untuk menyimpan data kesalahan dari kerangka kerja Kafka Connect. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan.
Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-error.
Partisi: Kami merekomendasikan agar Anda menyetel jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Anda dapat menyetel mesin penyimpanan topik ke Penyimpanan Lokal atau Penyimpanan Cloud.
CatatanAnda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal untuk instance Message Queue for Apache Kafka Edisi Profesional saat Anda membuat topik.
connect-error-kafka-oss-sink
Error Data Topic
Topik yang digunakan untuk menyimpan data kesalahan dari konektor sink OSS. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan.
Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-error.
Partisi: Kami merekomendasikan agar Anda menyetel jumlah partisi dalam topik menjadi 6.
Mesin Penyimpanan: Anda dapat menyetel mesin penyimpanan topik ke Penyimpanan Lokal atau Penyimpanan Cloud.
CatatanAnda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal untuk instance Message Queue for Apache Kafka Edisi Profesional saat Anda membuat topik.
connect-error-kafka-oss-sink
Di langkah Configure Destination Service, pilih Object Storage Service sebagai layanan tujuan, atur parameter, dan klik Create. Tabel berikut menjelaskan parameter.
Parameter
Deskripsi
Contoh
Bucket Name
Nama bucket OSS tempat data akan disinkronkan.
bucket_test
AccessKey ID
ID AccessKey akun Alibaba Cloud Anda.
yourAccessKeyID
AccessKey Secret
Rahasia AccessKey akun Alibaba Cloud Anda.
yourAccessKeySecret
Pastikan akun Alibaba Cloud Anda memiliki izin berikut sesuai prinsip hak istimewa minimal:
{ "Version": "1", "Statement": [ { "Action": [ "oss:GetObject", "oss:PutObject" ], "Resource": "*", "Effect": "Allow" } ] }CatatanID AccessKey dan Rahasia AccessKey dilewatkan ke OSS sebagai variabel lingkungan ketika tugas sinkronisasi data dibuat. Setelah tugas dibuat, ApsaraMQ for Kafka tidak menyimpan ID AccessKey atau Rahasia AccessKey akun Alibaba Cloud Anda.
Setelah konektor dibuat, Anda dapat melihat konektor di halaman Connectors.
Pergi ke halaman Connectors, temukan konektor yang Anda buat, dan klik Deploy di kolom Actions.
Kirim pesan
Setelah menerapkan konektor sink OSS, kirim pesan ke topik sumber dalam instance ApsaraMQ for Kafka untuk menguji apakah pesan dapat disinkronkan ke OSS.
Di halaman Connectors, temukan konektor yang ingin Anda kelola dan klik Test di kolom Actions.
Di panel Send Message, konfigurasikan parameter untuk mengirim pesan uji.
Jika Anda menyetel parameter Sending Method ke Console, lakukan langkah-langkah berikut:
Di bidang Message Key, masukkan kunci pesan. Contoh: demo.
Di bidang Message Content, masukkan isi pesan. Contoh: {"key": "test"}.
Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji ke partisi tertentu.
Jika Anda ingin mengirim pesan uji ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara menanyakan ID partisi, lihat Lihat Status Partisi.
Jika Anda tidak ingin mengirim pesan uji ke partisi tertentu, klik No.
Jika Anda menyetel parameter Sending Method ke Docker, jalankan perintah Docker di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji.
Jika Anda menyetel parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan dan metode akses untuk mengirim dan berlangganan pesan uji.
Verifikasi hasil
Setelah mengirim pesan uji ke topik sumber dalam instance ApsaraMQ for Kafka, Anda dapat memeriksa apakah pesan tersebut disinkronkan ke OSS di halaman File bucket OSS yang ditentukan di konsol OSS. Untuk informasi lebih lanjut, lihat Gambaran Umum.
Jika objek baru dihasilkan di bucket OSS, data telah disinkronkan ke OSS.

Data yang disinkronkan dari ApsaraMQ for Kafka ke OSS berada dalam format berikut:
[
{
"key":"123",
"offset":4,
"overflowFlag":true,
"partition":0,
"timestamp":1603779578478,
"topic":"Test",
"value":"1",
"valueSize":272687
}
]Operasi terkait
Anda dapat mengonfigurasi sumber daya Function Compute yang diperlukan oleh konektor sink OSS berdasarkan kebutuhan Anda.
Di halaman Connectors, temukan konektor yang Anda buat, klik More di kolom Actions, dan pilih .
Anda akan diarahkan ke konsol Function Compute, di mana Anda dapat mengonfigurasi sumber daya sesuai kebutuhan.