全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat Konektor Sink OSS

更新时间:Jul 02, 2025

Topik ini menjelaskan cara membuat konektor sink Object Storage Service (OSS) untuk menyinkronkan data dari topik di ApsaraMQ for Kafka ke objek di OSS.

Prasyarat

Untuk informasi tentang prasyarat, lihat Prasyarat.

Catatan Penggunaan

  1. Konektor sink OSS mengarahkan data ke OSS berdasarkan waktu pemrosesan acara, bukan waktu pembuatan acara. Jika Anda mengonfigurasi subdirektori berdasarkan waktu saat membuat konektor sink OSS, data yang dihasilkan pada batas waktu mungkin dikirim ke subdirektori berikutnya.

  2. Pemrosesan data kotor: Jika Anda menggunakan sintaks JSONPath untuk mengonfigurasi jalur kustom atau konten objek saat membuat konektor sink OSS, konektor akan mengarahkan data yang tidak memenuhi sintaks JSONPath ke direktori bernama invalidRuleData/ di bucket berdasarkan kebijakan batch yang Anda konfigurasikan. Jika direktori invalidRuleData/ ditampilkan di bucket, periksa apakah sintaks JSONPath benar dan pastikan semua pesan dikonsumsi oleh konsumen.

  3. Latensi berkisar dari beberapa detik hingga menit mungkin terjadi dalam pengalihan.

  4. Jika badan pesan dalam topik sumber ApsaraMQ for Kafka perlu diekstraksi berdasarkan sintaks JSONPath yang digunakan untuk jalur kustom atau konten objek, Anda harus mengkodekan atau mendekode badan pesan ke format JSON di topik sumber ApsaraMQ for Kafka.

  5. Konektor sink OSS menulis data dari aplikasi upstream ke OSS dengan menambahkan data ke objek yang ada secara real-time. Oleh karena itu, dalam jalur tanpa subdirektori, data sedang ditulis ke objek terlihat terbaru. Dalam hal ini, berhati-hatilah saat Anda mengonsumsi pesan.

Aturan Penagihan

Konektor sink OSS berjalan di Alibaba Cloud Function Compute. Saat data dalam konektor sink OSS diproses dan ditransmisikan, Anda dikenakan biaya untuk sumber daya Function Compute yang dikonsumsi. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.

Langkah 1: Buat Sumber Daya OSS

Buat bucket di konsol OSS. Untuk informasi lebih lanjut, lihat Buat Bucket.

Dalam topik ini, sebuah bucket bernama oss-sink-connector-bucket dibuat.

Langkah 2: Buat dan Mulai Konektor Sink OSS

  1. Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  2. Di panel navigasi di sebelah kiri, pilih Connector Ecosystem Integration > Tasks.

  3. Di halaman Tasks, klik Create Task.

    • Pembuatan Tugas

      1. Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Region

        Wilayah tempat instance sumber ApsaraMQ for Kafka berada.

        Tiongkok (Beijing)

        ApsaraMQ for Kafka Instance

        Instance ApsaraMQ for Kafka di mana pesan yang ingin Anda arahkan diproduksi.

        alikafka_post-cn-jte3****

        Topic

        Topik pada instance ApsaraMQ for Kafka di mana pesan yang ingin Anda arahkan diproduksi.

        demo-topic

        Group ID

        Nama grup konsumen pada instance sumber ApsaraMQ for Kafka.

        • Quickly Create: Sistem secara otomatis membuat grup konsumen dengan nama dalam format GID_EVENTBRIDGE_xxx. Kami merekomendasikan Anda memilih nilai ini.

        • Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.

        Buat Cepat

        Consumer Offset

        Offset dari mana pesan dikonsumsi. Nilai valid:

        • Latest Offset

        • Earliest Offset

        Offset Terbaru

        Network Configuration

        Jenis jaringan tempat Anda ingin mengarahkan pesan. Nilai valid:

        • Basic Network

        • Self-managed Internet

        Jaringan Dasar

        VPC

        ID virtual private cloud (VPC) tempat instance ApsaraMQ for Kafka diterapkan. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        ID vSwitch tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        vsw-bp1gbjhj53hdjdkg****

        Security Group

        ID grup keamanan tempat instance ApsaraMQ for Kafka termasuk. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Configuration menjadi Internet Mandiri.

        alikafka_pre-cn-7mz2****

        Data Format

        Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.

        • Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.

        • Text: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.

        • Binary: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.

        Json

        Messages

        Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.

        100

        Interval (Unit: Seconds)

        Interval waktu pemanggilan fungsi. Sistem mengirimkan pesan yang terkumpul ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Unit: detik. Nilai 0 menentukan bahwa pesan dikirim segera setelah pengumpulan.

        3

      2. Di langkah Filtering, definisikan pola data di editor kode Pattern Content untuk menyaring data. Untuk informasi lebih lanjut, lihat Pola Acara.

      3. Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemisahan, pemetaan, pengayaan, dan pengalihan data. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.

      4. Di langkah Sink, atur parameter Service Type menjadi OSS dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Save. Tabel berikut menjelaskan parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        OSS Bucket

        Bucket OSS yang Anda buat.

        Penting
        • Pastikan bucket yang ditentukan dibuat secara manual dan tidak dihapus saat konektor berjalan.

        • Saat membuat bucket, atur parameter Kelas Penyimpanan menjadi Standard atau IA. Bucket Arsip tidak didukung oleh konektor sink ApsaraMQ for Kafka.

        • Setelah Anda membuat konektor sink OSS, jalur file sistem .tmp/ dihasilkan di direktori level-1 bucket OSS. Jangan hapus atau gunakan objek OSS di jalur tersebut.

        oss-sink-connector-bucket

        Storage Path

        Objek tempat pesan yang diarahkan disimpan. Kunci objek OSS terdiri dari jalur dan nama. Sebagai contoh, jika parameter ObjectKey diatur ke a/b/c/a.txt, jalur objek adalah a/b/c/ dan nama objek adalah a.txt. Anda dapat menentukan nilai kustom untuk jalur objek. Nama objek dihasilkan oleh konektor berdasarkan format berikut: {Timestamp UNIX dalam milidetik }_{String acak 8-bit}. Contoh: 1705576353794_elJmxu3v.

        • Jika Anda mengatur parameter ini ke garis miring maju (/), tidak ada subdirektori yang tersedia di bucket. Data disimpan di direktori level-1 bucket.

        • Variabel waktu {yyyy}, {MM}, {dd}, dan {HH} dapat digunakan dalam nilai parameter ini. Variabel ini peka huruf besar/kecil dan menentukan tahun, bulan, hari, dan jam, masing-masing.

        • Sintaks JSONPath dapat digunakan dalam nilai parameter ini. Contoh: {$.data.topic} dan {$.data.partition}. Variabel JSONPath harus memenuhi persyaratan ekspresi JSONPath standar. Untuk mencegah penulisan data yang tidak normal, kami merekomendasikan agar nilai yang diekstraksi menggunakan JSONPath bertipe int atau string, berisi karakter yang dikodekan dalam UTF-8, dan tidak berisi spasi, dua titik berturut-turut(.), emoji, garis miring maju (/), atau garis miring balik (\).

        • Konstanta dapat digunakan dalam nilai parameter ini.

          Catatan

          Subdirektori memungkinkan Anda mengelompokkan data dengan tepat. Ini membantu mencegah masalah yang disebabkan oleh sejumlah besar objek kecil dalam satu subdirektori.

          Throughput konektor sink OSS berkorelasi positif dengan jumlah subdirektori. Jika tidak ada subdirektori atau jumlah subdirektori kecil yang dikonfigurasikan, throughput konektor rendah. Ini dapat menyebabkan akumulasi pesan di aplikasi upstream. Sejumlah besar subdirektori dapat menyebabkan masalah seperti data tersebar, jumlah tulisan meningkat, dan bagian berlebihan. Kami merekomendasikan Anda mengonfigurasi subdirektori berdasarkan saran berikut:

          • Topik ApsaraMQ for Kafka sumber: Anda dapat mengonfigurasi subdirektori berdasarkan waktu dan partisi. Dengan cara ini, Anda dapat meningkatkan jumlah partisi pada instance ApsaraMQ for Kafka untuk meningkatkan throughput konektor. Contoh: prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/.

          • Grup bisnis: Anda dapat mengonfigurasi subdirektori menggunakan bidang bisnis tertentu dari data. Throughput konektor ditentukan oleh jumlah nilai bidang bisnis. Contoh: prefixV2/{$.data.body.field}/.

          Kami merekomendasikan Anda mengonfigurasi awalan konstan yang berbeda untuk konektor sink OSS yang berbeda. Ini mencegah beberapa konektor menulis data ke direktori yang sama.

        alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

        Batch Aggregation Object Size

        Ukuran objek yang akan diagregasi. Nilai valid: 1 hingga 1024. Unit: MB.

        Catatan
        • Konektor sink OSS menulis data ke objek OSS yang sama dalam batch. Ukuran data setiap batch lebih besar dari 0 MB dan sama dengan atau kurang dari 16 MB. Oleh karena itu, ukuran objek OSS mungkin sedikit lebih besar dari nilai yang dikonfigurasikan. Kelebihan ukuran hingga 16 MB.

        • Dalam skenario lalu lintas tinggi, kami merekomendasikan Anda mengatur parameter Ukuran Objek Agregasi Batch menjadi nilai lebih besar dari 100 MB dan parameter Jendela Waktu Agregasi Batch menjadi nilai lebih besar dari 1 jam. Contoh untuk parameter Ukuran Objek Agregasi Batch: 128 MB dan 512 MB. Contoh untuk parameter Jendela Waktu Agregasi Batch: 60 menit dan 120 menit.

        5

        Batch Aggregation Time Window

        Jendela waktu untuk agregasi. Nilai valid: 1 hingga 1440. Unit: menit.

        1

        File Compression

        • Tidak Diperlukan Kompresi: menghasilkan objek tanpa akhiran.

        • GZIP: menghasilkan objek dengan akhiran .gz.

        • Snappy: menghasilkan objek dengan akhiran .snappy.

        • Zstd: menghasilkan objek dengan akhiran .zstd.

        Jika objek akan dikompresi, konektor sink OSS menulis data dalam batch berdasarkan ukuran data sebelum kompresi. Akibatnya, ukuran objek yang ditampilkan di OSS lebih kecil dari ukuran batch. Setelah objek didekompresi, ukuran objek mendekati ukuran batch.

        Tidak Diperlukan Kompresi

        File Content

        • Data Lengkap: Konektor menggunakan spesifikasi CloudEvents untuk mengemas pesan asli. Jika Anda memilih nilai ini, data yang diarahkan ke OSS mencakup metadata dalam spesifikasi CloudEvents. Contoh kode berikut memberikan contohnya. Dalam contoh kode, bidang data menentukan data dan bidang lainnya menentukan metadata dalam spesifikasi CloudEvents.

          {
            "specversion": "1.0",
            "id": "8e215af8-ca18-4249-8645-f96c1026****",
            "source": "acs:alikafka",
            "type": "alikafka:Topic:Message",
            "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2022-06-23T02:49:51.589Z",
            "aliyunaccountid": "182572506381****",
            "data": {
              "topic": "****",
              "partition": 7,
              "offset": 25,
              "timestamp": 1655952591589,
              "headers": {
                "headers": [],
                "isReadOnly": false
              },
              "key": "keytest",
              "value": "hello kafka msg"
            }
          }
        • Ekstraksi Data: data yang diekstraksi menggunakan JSONPath diarahkan ke OSS. Sebagai contoh, jika Anda mengonfigurasi ekspresi $.data, hanya nilai dari bidang data yang diarahkan ke OSS.

        Jika Anda tidak memerlukan bidang tambahan dalam spesifikasi CloudEvents, kami merekomendasikan Anda memilih Ekstraksi Data dan mengonfigurasi ekspresi $.data untuk mengarahkan pesan asli ke OSS. Ini membantu mengurangi biaya dan meningkatkan efisiensi transmisi.

        Ekstraksi Data

        $.data
    • Properti Tugas

      Konfigurasikan kebijakan pengulangan yang ingin Anda gunakan saat acara gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan Pengulangan dan Antrian Pesan Gagal.

  4. Kembali ke halaman Tasks, temukan konektor sink OSS yang Anda buat, lalu klik Enable di kolom Actions.

  5. Dalam pesan Note, klik OK.

    Konektor sink membutuhkan waktu 30 hingga 60 detik untuk diaktifkan. Anda dapat melihat kemajuannya di kolom Status pada halaman Tasks.

Langkah 3: Uji Konektor Sink OSS

  1. Di halaman Tasks, temukan konektor sink OSS yang Anda buat dan klik topik sumber di kolom Event Source.

  2. Di halaman Detail Topik, klik Send Message.
  3. Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

    发送消息

  4. Di halaman Tasks, temukan konektor sink OSS yang Anda buat dan klik bucket tujuan di kolom Event Target.

  5. Di panel navigasi di sebelah kiri halaman yang muncul, pilih Object Management > Objects.

    • Direktori /tmp: jalur file sistem tempat konektor bergantung. Jangan hapus atau gunakan objek OSS di jalur ini.

    • Direktori file data: Subdirektori dihasilkan di direktori ini berdasarkan jalur yang Anda konfigurasikan untuk konektor. Objek data diunggah ke direktori terdalam.

    最深层路径

  6. Temukan objek yang ingin Anda kelola dan pilih 图标 > Download di kolom Actions.

  7. Buka objek yang diunduh untuk melihat detail pesan.

    消息

    Gambar di atas memberikan contoh detail pesan. Beberapa pesan dipisahkan dengan baris baru.