All Products
Search
Document Center

ApsaraMQ for Kafka:Buat konektor sink OSS

Last Updated:Mar 12, 2026

Saat aplikasi Anda menghasilkan data streaming ke ApsaraMQ for Kafka, Anda mungkin perlu menyimpan pesan tersebut di Object Storage Service (OSS) untuk pengarsipan, analitik, atau pemrosesan lanjutan. Konektor sink OSS mengotomatiskan alur ini: ApsaraMQ for Kafka mengirimkan pesan ke Function Compute (FC), yang kemudian menuliskannya ke bucket OSS Anda.

Prasyarat

Sebelum memulai, pastikan Anda telah:

Batasan

  • Persyaratan wilayah yang sama: Instans ApsaraMQ for Kafka, bucket OSS, dan FC harus berada di wilayah yang sama. Untuk informasi selengkapnya, lihat Batasan.

  • Hanya UTF-8: ApsaraMQ for Kafka melakukan serialisasi pesan menjadi string berkode UTF-8. Data biner tidak didukung.

  • Biaya: FC menyediakan kuota sumber daya gratis. Penggunaan melebihi kuota tersebut akan ditagih sesuai dengan aturan penagihan FC. Untuk memantau eksekusi fungsi, konfigurasikan pencatatan log FC.

Buat dan terapkan konektor

Langkah 1: Mulai panduan pembuatan konektor

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah tempat instans Anda berada.

  3. Di panel navigasi sebelah kiri, klik Connectors.

  4. Di halaman Connectors, pilih instans Anda dari daftar drop-down Select Instance dan klik Create Connector.

Langkah 2: Konfigurasikan informasi dasar

Pada langkah Configure Basic Information, atur parameter berikut lalu klik Next.

ParameterDeskripsiContoh
NameNama unik untuk konektor. Harus terdiri dari 1–48 karakter dan dapat berisi angka, huruf kecil, serta tanda hubung (-). Tidak boleh diawali dengan tanda hubung. Kelompok konsumen bernama connect-<connector-name> akan dibuat secara otomatis.kafka-oss-sink
InstanceMenampilkan nama dan ID instans saat ini.demo alikafka_post-cn-st21p8vj\*\*\*\*
Penting

Secara default, opsi Authorize to Create Service Linked Role dipilih. ApsaraMQ for Kafka akan membuat peran terkait layanan jika belum tersedia. Peran ini memberikan izin yang diperlukan untuk menyinkronkan data dari ApsaraMQ for Kafka ke OSS.

Langkah 3: Konfigurasikan layanan sumber

Pada langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, atur parameter berikut, lalu klik Next.

Parameter wajib

ParameterDeskripsiContoh
Data Source TopicTopik tempat pesan diekspor.oss-test-input
Consumer Thread ConcurrencyJumlah thread konsumen konkuren. Nilai yang valid: 1, 2, 3, 6, 12. Default: 6.6
Consumer OffsetTitik awal konsumsi pesan. Earliest Offset membaca dari awal. Latest Offset hanya membaca pesan baru.Earliest Offset

Parameter lingkungan runtime

Klik Configure Runtime Environment untuk memperluas parameter berikut.

ParameterDeskripsiContoh
VPC IDVPC tempat tugas sinkronisasi dijalankan. Secara default menggunakan VPC dari instans sumber.vpc-bp1xpdnd3l\*\*\*
vSwitch IDvSwitch yang terhubung ke instans sumber. Harus berada dalam VPC yang sama dengan instans. Secara default menggunakan vSwitch dari instans sumber.vsw-bp1d2jgg81\*\*\*
Failure Handling PolicyCara menangani kegagalan pengiriman pesan. Continue Subscription tetap mengonsumsi dari partisi dan mencatat error. Stop Subscription menghentikan konsumsi dari partisi dan mencatat error. Untuk informasi selengkapnya, lihat Mengelola konektor dan Kode error.Continue Subscription
Resource Creation MethodApakah topik internal yang diperlukan dibuat secara otomatis (Auto) atau manual (Manual). Pilih Auto kecuali Anda memerlukan pengaturan topik kustom.Auto

Topik internal (hanya untuk pembuatan manual)

Jika Anda mengatur Resource Creation Method ke Manual, buat topik internal berikut sebelum melanjutkan. Klik Configure Runtime Environment untuk menampilkan parameter ini.

ParameterAwalan penamaanPartisiMesin penyimpanancleanup.policyContoh
Connector Consumer Groupconnect-cluster------connect-cluster-kafka-oss-sink
Task Offset Topicconnect-offset> 1Local StorageCompactconnect-offset-kafka-oss-sink
Task Configuration Topicconnect-config1 (tepat)Local StorageCompactconnect-config-kafka-oss-sink
Task Status Topicconnect-status6 (disarankan)Local StorageCompactconnect-status-kafka-oss-sink
Dead-letter Queue Topicconnect-error6 (disarankan)Local Storage atau Cloud Storage--connect-error-kafka-oss-sink
Error Data Topicconnect-error6 (disarankan)Local Storage atau Cloud Storage--connect-error-kafka-oss-sink
Catatan

Local Storage hanya tersedia pada instans Edisi Profesional. Untuk menghemat sumber daya topik, Anda dapat menggunakan topik yang sama untuk Dead-letter Queue Topic dan Error Data Topic.

Langkah 4: Konfigurasikan layanan tujuan

Pada langkah Configure Destination Service, pilih Object Storage Service sebagai layanan tujuan, atur parameter berikut, lalu klik Create.

ParameterDeskripsiContoh
Bucket NameNama bucket OSS yang menerima data yang diekspor.bucket_test
AccessKey IDID AccessKey dari Akun Alibaba Cloud yang memiliki akses ke bucket tersebut.yourAccessKeyID
AccessKey SecretRahasia AccessKey dari Akun Alibaba Cloud.yourAccessKeySecret

Berikan akun tersebut izin minimum berikut:

{
    "Version": "1",
    "Statement": [
        {
            "Action": [
                "oss:GetObject",
                "oss:PutObject"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
Catatan

ID AccessKey dan rahasia AccessKey diteruskan ke OSS sebagai variabel lingkungan saat tugas sinkronisasi dibuat. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.

Langkah 5: Terapkan konektor

Setelah konektor dibuat, kembali ke halaman Connectors, temukan konektor tersebut, lalu klik Deploy di kolom Actions.

Kirim pesan uji

Setelah penerapan, kirim pesan uji untuk memverifikasi konektor.

  1. Di halaman Connectors, temukan konektor tersebut dan klik Test di kolom Actions.

  2. Di panel Send Message, pilih metode pengiriman:

    • Console: Masukkan Message Key (misalnya, demo) dan Message Content (misalnya, {"key": "test"}). Secara opsional, atur Send to Specified Partition ke Yes dan masukkan Partition ID. Untuk informasi cara menemukan ID partisi, lihat View partition status.

    • Docker: Jalankan perintah Docker yang ditampilkan di bagian Run the Docker container to produce a sample message.

    • SDK: Pilih SDK untuk bahasa pemrograman atau framework Anda dan ikuti petunjuknya.

Verifikasi hasil

Periksa apakah pesan uji telah tiba di bucket OSS Anda:

  1. Buka Konsol OSS dan buka halaman Files dari bucket target. Untuk informasi selengkapnya, lihat Ikhtisar.

  2. Konfirmasi bahwa objek baru muncul. Munculnya objek baru menandakan bahwa konektor berfungsi.

files

Setiap objek yang diekspor berisi array JSON dalam format berikut:

[
    {
        "key": "123",
        "offset": 4,
        "overflowFlag": true,
        "partition": 0,
        "timestamp": 1603779578478,
        "topic": "Test",
        "value": "1",
        "valueSize": 272687
    }
]
FieldDeskripsi
keyKunci pesan.
offsetOffset konsumen pesan dalam partisinya.
overflowFlagApakah nilai pesan dipotong karena ukurannya terlalu besar.
partitionPartisi tempat pesan dikonsumsi.
timestampTimestamp Unix (dalam milidetik) saat pesan diproduksi.
topicNama topik sumber.
valueIsi pesan (atau versi terpotong jika overflowFlag bernilai true).
valueSizeUkuran asli nilai pesan dalam byte.

Konfigurasikan sumber daya Function Compute

Untuk menyesuaikan sumber daya FC yang digunakan oleh konektor:

  1. Di halaman Connectors, temukan konektor tersebut, klik More di kolom Actions, lalu pilih Configure Function.

  2. Konfigurasikan sumber daya di Konsol Function Compute sesuai kebutuhan.