全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat konektor sink Function Compute

更新时间:Jul 06, 2025

Topik ini menjelaskan cara membuat konektor sink Function Compute untuk menyinkronkan data dari topik sumber di instance ApsaraMQ for Kafka ke fungsi dalam Function Compute.

Prasyarat

Pastikan persyaratan berikut telah dipenuhi:

  • ApsaraMQ for Kafka

    • Fitur konektor telah diaktifkan untuk instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Aktifkan Fitur Konektor.

    • Sebuah topik telah dibuat di instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.

      Dalam contoh ini, nama topik adalah fc-test-input.

  • Function Compute

    • Sebuah fungsi telah dibuat di Function Compute. Untuk informasi lebih lanjut, lihat Buat Fungsi dengan Cepat.

      Penting

      Fungsi yang Anda buat harus merupakan fungsi event.

      Dalam contoh ini, sebuah fungsi event bernama hello_world dibuat untuk layanan guide-hello_world yang berjalan di lingkungan runtime Python. Kode fungsi:

      # -*- coding: utf-8 -*-
      import logging
      
      # Untuk mengaktifkan fitur inisialisasi
      # harap implementasikan fungsi inisialisasi sebagai berikut:
      # def initializer(context):
      #   logger = logging.getLogger()
      #   logger.info('initializing')
      
      def handler(event, context):
        logger = logging.getLogger()
        logger.info('hello world:' + bytes.decode(event))
        return 'hello world:' + bytes.decode(event)
  • Opsional:EventBridge

    Catatan

    Anda hanya perlu mengaktifkan EventBridge jika instance tempat konektor sink Function Compute miliknya berada di wilayah China (Hangzhou) atau China (Chengdu).

Catatan Penggunaan

  • Data dapat disinkronkan dari topik sumber di instance ApsaraMQ for Kafka ke fungsi di Function Compute hanya dalam wilayah yang sama. Untuk informasi lebih lanjut tentang batasan pada konektor, lihat Batasan.

  • Jika instance tempat konektor sink Function Compute miliknya berada di wilayah China (Hangzhou) atau China (Chengdu), konektor tersebut diterapkan ke EventBridge.

    • Anda dapat menggunakan EventBridge secara gratis. Untuk informasi lebih lanjut, lihat Penagihan.

    • Saat Anda membuat konektor, EventBridge secara otomatis membuat peran layanan terhubung berikut: AliyunServiceRoleForEventBridgeSourceKafka dan AliyunServiceRoleForEventBridgeConnectVPC.

      • Jika peran layanan terhubung ini tidak tersedia, EventBridge secara otomatis membuatnya sehingga EventBridge dapat menggunakan peran ini untuk mengakses ApsaraMQ for Kafka dan Virtual Private Cloud (VPC).

      • Jika peran layanan terhubung ini tersedia, EventBridge tidak akan membuatnya lagi.

      Untuk informasi lebih lanjut tentang peran layanan terhubung, lihat Peran Layanan Terhubung.

    • Anda tidak dapat melihat log operasi tugas yang diterapkan ke EventBridge. Setelah konektor dijalankan, Anda dapat memeriksa kemajuan tugas sinkronisasi dengan melihat detail konsumsi grup konsumen yang berlangganan ke topik sumber. Untuk informasi lebih lanjut, lihat Lihat Detail Konsumen.

Prosedur

Untuk menggunakan konektor sink Function Compute untuk menyinkronkan data dari topik sumber di instance ApsaraMQ for Kafka ke fungsi di Function Compute, ikuti langkah-langkah berikut:

  1. Opsional:Izinkan konektor sink Function Compute untuk mengakses Function Compute lintas wilayah.

    Penting

    Jika Anda tidak perlu menggunakan konektor sink Function Compute untuk mengakses Function Compute lintas wilayah, lewati langkah ini.

    Aktifkan Akses Internet untuk Konektor Sink Function Compute

  2. Opsional:Izinkan konektor sink Function Compute untuk mengakses Function Compute lintas akun Alibaba Cloud.

    Penting

    Jika Anda tidak perlu menggunakan konektor sink Function Compute untuk mengakses Function Compute lintas akun Alibaba Cloud, lewati langkah ini.

  3. Opsional:Buat topik dan grup konsumen yang diperlukan oleh konektor sink Function Compute.

    Penting
    • Jika Anda tidak perlu menyesuaikan nama topik dan grup konsumen, lewati langkah ini.

    • Topik tertentu yang diperlukan oleh konektor sink Function Compute harus menggunakan mesin penyimpanan lokal. Jika versi utama instance ApsaraMQ for Kafka Anda adalah V0.10.2, Anda tidak dapat secara manual membuat topik yang menggunakan mesin penyimpanan lokal. Dalam hal ini, topik-topik ini harus dibuat secara otomatis.

    1. Buat Topik yang Diperlukan oleh Konektor Sink Function Compute

    2. Buat Grup Konsumen yang Diperlukan oleh Konektor Sink Function Compute

  4. Buat dan Terapkan Konektor Sink Function Compute

  5. Verifikasi hasilnya.

    1. Kirim Pesan Uji

    2. Lihat Log Fungsi

Aktifkan akses Internet untuk konektor sink Function Compute

Jika Anda perlu menggunakan konektor sink Function Compute untuk mengakses layanan Alibaba Cloud lainnya lintas wilayah, aktifkan akses Internet untuk konektor sink Function Compute. Untuk informasi lebih lanjut, lihat Aktifkan Akses Internet untuk Konektor.

Buat kebijakan kustom.

Buat kebijakan kustom untuk memberikan akses ke Function Compute dalam akun Alibaba Cloud tempat Anda ingin menyinkronkan data.

  1. Masuk ke Konsol RAM.

  2. Di panel navigasi sisi kiri, pilih Permissions > Policies.

  3. Di halaman Policies, klik Create Policy.

    image

  4. Di halaman Create Policy, buat kebijakan kustom.

    1. Klik tab JSON, masukkan skrip kebijakan kustom di editor kode, lalu klik Next Step.

      Contoh skrip:

      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "fc:InvokeFunction",
                      "fc:GetFunction"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
    2. Di bagian Basic Information, masukkan KafkaConnectorFcAccess di bidang Name.

    3. Klik OK.

Buat peran RAM

Buat peran RAM dalam akun Alibaba Cloud tempat Anda ingin menyinkronkan data. Anda tidak dapat memilih ApsaraMQ for Kafka sebagai layanan tepercaya saat membuat peran RAM. Oleh karena itu, pilih layanan yang didukung sebagai layanan tepercaya terlebih dahulu. Kemudian, ubah kebijakan kepercayaan dari peran RAM yang dibuat.

  1. Di panel navigasi sisi kiri, pilih Identities > Roles.

  2. Di halaman Roles, klik Create Role.

    image

  3. Di panel Create Role, buat peran RAM.

    1. Atur parameter Pilih Entitas Tepercaya ke Alibaba Cloud Service dan klik Next.

    2. Atur parameter Role Type ke Service Role. Di bidang RAM Role Name, masukkan AliyunKafkaConnectorRole. Dari daftar drop-down Select Trusted Service, pilih Function Compute. Lalu, klik OK.

  4. Di halaman Roles, temukan dan klik AliyunKafkaConnectorRole.

  5. Di halaman AliyunKafkaConnectorRole, klik tab Trust Policy Management. Di tab ini, klik Edit Trust Policy.

  6. Di panel Edit Trust Policy, ganti fc di skrip dengan alikafka dan klik OK.

    AliyunKafkaConnectorRole

Berikan izin kepada peran RAM

Berikan peran RAM yang dibuat izin untuk mengakses Function Compute dalam akun Alibaba Cloud tempat Anda ingin menyinkronkan data.

  1. Di panel navigasi sisi kiri, pilih Identities > Roles.

  2. Di halaman Roles, temukan AliyunKafkaConnectorRole dan klik Add Permissions di kolom Actions.

  3. Di panel Add Permissions, lampirkan kebijakan KafkaConnectorFcAccess ke peran RAM.

    1. Pada bagian Select Policy, klik Custom Policy.

    2. Di kolom Authorization Policy Name, temukan dan klik KafkaConnectorFcAccess.

    3. Klik OK.

    4. Klik Complete.

Buat topik yang diperlukan oleh konektor sink Function Compute

Di konsol ApsaraMQ for Kafka, buat topik-topik berikut yang diperlukan oleh konektor sink Function Compute: topik offset tugas, topik konfigurasi tugas, topik status tugas, topik antrian pesan gagal, dan topik data kesalahan. Topik-topik ini berbeda dalam jumlah partisi dan mesin penyimpanan. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

    Penting

    Anda harus membuat topik di wilayah tempat instance Elastic Compute Service (ECS) Anda ditempatkan. Topik tidak dapat digunakan lintas wilayah. Misalnya, jika produsen dan konsumen pesan berjalan di instance ECS yang ditempatkan di wilayah China (Beijing), topik juga harus dibuat di wilayah China (Beijing).

  3. Di halaman Instances, klik nama instance yang ingin Anda kelola.

  4. Di panel navigasi sisi kiri, klik Topics.

  5. Di halaman Topics, klik Create Topic.

  6. Di panel Create Topic, tentukan properti topik dan klik OK.

    Parameter

    Deskripsi

    Contoh

    Name

    Nama topik.

    demo

    Description

    Deskripsi topik.

    demo test

    Partitions

    Jumlah partisi dalam topik.

    12

    Storage Engine

    Catatan

    Anda dapat menentukan jenis mesin penyimpanan hanya jika Anda menggunakan instance Edisi Profesional non-serverless. Untuk jenis instance lainnya, Cloud Storage dipilih secara default.

    Jenis mesin penyimpanan yang digunakan untuk menyimpan pesan di topik.

    ApsaraMQ for Kafka mendukung jenis mesin penyimpanan berikut:

    • Cloud Storage: Jika Anda memilih nilai ini, sistem menggunakan disk Alibaba Cloud untuk topik dan menyimpan data dalam tiga replika dalam mode terdistribusi. Mesin penyimpanan ini memiliki latensi rendah, kinerja tinggi, daya tahan panjang, dan keandalan tinggi. Jika Anda mengatur parameter Instance Edition ke Standard (High Write) saat membuat instance, Anda hanya dapat mengatur parameter ini ke Cloud Storage.

    • Local Storage: Jika Anda memilih nilai ini, sistem menggunakan algoritma in-sync replicas (ISR) dari Apache Kafka open source dan menyimpan data dalam tiga replika dalam mode terdistribusi.

    Cloud Storage

    Message Type

    Jenis pesan dari topik. Nilai yang valid:

    • Normal Message: Secara default, pesan dengan kunci yang sama disimpan di partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, urutan pesan yang disimpan di partisi mungkin tidak dipertahankan. Jika Anda mengatur parameter Storage Engine ke Cloud Storage, parameter ini secara otomatis diatur ke Normal Message.

    • Partitionally Ordered Message: Secara default, pesan dengan kunci yang sama disimpan di partisi yang sama sesuai urutan pengiriman pesan. Jika broker dalam kluster gagal, pesan masih disimpan di partisi sesuai urutan pengiriman pesan. Pesan di beberapa partisi tidak dapat dikirim sampai partisi dipulihkan. Jika Anda mengatur parameter Storage Engine ke Local Storage, parameter ini secara otomatis diatur ke Partitionally Ordered Message.

    Normal Message

    Log Cleanup Policy

    Kebijakan pembersihan log yang digunakan oleh topik.

    Jika Anda mengatur parameter Storage Engine ke Local Storage, Anda harus mengonfigurasi parameter Log Cleanup Policy. Anda hanya dapat mengatur Parameter Mesin Penyimpanan ke Penyimpanan Lokal jika Anda menggunakan instance Edisi Profesional ApsaraMQ for Kafka.

    ApsaraMQ for Kafka menyediakan kebijakan pembersihan log berikut:

    • Delete: kebijakan pembersihan log default. Jika ruang penyimpanan yang cukup tersedia di sistem, pesan disimpan berdasarkan periode retensi maksimum. Setelah penggunaan penyimpanan melebihi 85%, sistem menghapus pesan yang disimpan paling awal untuk memastikan ketersediaan layanan.

    • Compact: kebijakan pemadatan log yang digunakan di Apache Kafka. Pemadatan log memastikan bahwa nilai terbaru dipertahankan untuk pesan dengan kunci yang sama. Kebijakan ini cocok untuk skenario seperti memulihkan sistem yang gagal atau memuat ulang cache setelah sistem di-restart. Misalnya, saat menggunakan Kafka Connect atau Confluent Schema Registry, Anda harus menyimpan informasi tentang status dan konfigurasi sistem di topik yang dipadatkan log-nya.

      Penting

      Anda hanya dapat menggunakan topik yang dipadatkan log-nya di komponen cloud-native tertentu, seperti Kafka Connect dan Confluent Schema Registry. Untuk informasi lebih lanjut, lihat aliware-kafka-demos.

    Compact

    Tag

    Tag yang ingin Anda lampirkan ke topik.

    demo

    Setelah topik dibuat, Anda dapat melihat topik tersebut di halaman Topics.

Buat grup konsumen yang diperlukan oleh konektor sink Function Compute

Di konsol ApsaraMQ for Kafka, buat grup konsumen yang diperlukan oleh konektor sink Function Compute. Nama grup konsumen harus dalam format connect-Nama Tugas. Untuk informasi lebih lanjut, lihat Parameter dalam Langkah Konfigurasi Layanan Sumber.

  1. Masuk ke konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Di halaman Instances, klik nama instance yang ingin Anda kelola.

  4. Di panel navigasi sisi kiri, klik Groups.

  5. Di halaman Groups, klik Create Group.

  6. Di panel Create Group, masukkan nama grup di bidang Group ID dan deskripsi grup di bidang Description, lampirkan tag ke grup, lalu klik OK.

    Setelah membuat grup konsumen, Anda dapat melihat grup tersebut di halaman Groups.

Buat dan terapkan konektor sink Function Compute

Buat dan terapkan konektor sink Function Compute yang menyinkronkan data dari ApsaraMQ for Kafka ke Function Compute.

  1. Masuk ke konsol ApsaraMQ for Kafka.

  2. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  3. Di panel navigasi sisi kiri, klik Connectors.

  4. Di halaman Connectors, pilih instance tempat konektor miliknya dari daftar drop-down Select Instance dan klik Create Connector.

  5. Lengkapi wizard Create Connector.

    1. Di langkah Configure Basic Information, atur parameter yang dijelaskan dalam tabel berikut dan klik Next.

      Parameter

      Deskripsi

      Contoh

      Name

      Nama konektor. Tentukan nama konektor berdasarkan aturan berikut:

      • Nama konektor harus memiliki panjang 1 hingga 48 karakter. Dapat berisi angka, huruf kecil, dan tanda hubung (-), tetapi tidak boleh dimulai dengan tanda hubung (-).

      • Setiap nama konektor harus unik dalam sebuah instance ApsaraMQ for Kafka.

      Tugas sinkronisasi data konektor harus menggunakan grup konsumen bernama dalam format connect-Nama Tugas. Jika Anda belum membuat grup konsumen seperti itu, Message Queue for Apache Kafka secara otomatis membuat satu untuk Anda.

      kafka-fc-sink

      Instance

      Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan.

      demo alikafka_post-cn-st21p8vj****

    2. Di langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Next.

      Catatan

      Jika Anda telah membuat topik dan grup konsumen sebelumnya, atur parameter Metode Pembuatan Sumber Daya ke Manual dan masukkan nama sumber daya yang dibuat di bidang di bawah ini. Jika tidak, atur parameter Metode Pembuatan Sumber Daya ke Auto.

      Tabel 1. Parameter dalam Langkah Konfigurasi Layanan Sumber

      Parameter

      Deskripsi

      Contoh

      Data Source Topic

      Nama topik sumber tempat data akan disinkronkan.

      fc-test-input

      Consumer Thread Concurrency

      Jumlah thread konsumen konkuren yang digunakan untuk menyinkronkan data dari topik sumber. Secara default, enam thread konsumen konkuren digunakan. Nilai yang valid:

      • 1

      • 2

      • 3

      • 6

      • 12

      6

      Consumer Offset

      Offset dari mana konsumsi dimulai. Nilai yang valid:

      • Earliest Offset: Konsumsi dimulai dari offset paling awal.

      • Latest Offset: Konsumsi dimulai dari offset terbaru.

      Earliest Offset

      VPC ID

      ID VPC tempat tugas sinkronisasi data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai default adalah ID VPC yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka. Anda tidak perlu mengubah nilai tersebut.

      vpc-bp1xpdnd3l***

      vSwitch ID

      ID vSwitch tempat tugas sinkronisasi data berjalan. Klik Configure Runtime Environment untuk menampilkan parameter. vSwitch harus ditempatkan di VPC yang sama dengan instance ApsaraMQ for Kafka. Nilai default adalah ID vSwitch yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka.

      vsw-bp1d2jgg81***

      Failure Handling Policy

      Menentukan apakah akan mempertahankan langganan ke partisi tempat kesalahan terjadi setelah pesan relevan gagal dikirim. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai yang valid:

      • Continue Subscription: mempertahankan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.

      • Stop Subscription: menghentikan langganan ke partisi tempat kesalahan terjadi dan mengembalikan log.

      Catatan
      • Untuk informasi lebih lanjut, lihat Kelola konektor.

      • Untuk informasi lebih lanjut tentang cara menyelesaikan kesalahan berdasarkan kode kesalahan, lihat Kode kesalahan.

      Continue Subscription

      Resource Creation Method

      Metode yang digunakan untuk membuat topik dan grup konsumen yang diperlukan oleh konektor sink Function Compute. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai yang valid:

      • Auto

      • Manual

      Auto

      Connector Consumer Group

      Grup konsumen yang digunakan oleh tugas sinkronisasi data konektor. Klik Configure Runtime Environment untuk menampilkan parameter. Nama grup konsumen ini harus dalam format connect-Nama Tugas.

      connect-kafka-fc-sink

      Task Offset Topic

      Topik yang digunakan untuk menyimpan offset konsumen. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-offset.

      • Partisi: Jumlah partisi di topik harus lebih dari 1.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus diatur ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus diatur ke Compact.

      connect-offset-kafka-fc-sink

      Task Configuration Topic

      Topik yang digunakan untuk menyimpan konfigurasi tugas. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-config.

      • Partisi: Topik hanya dapat berisi satu partisi.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus diatur ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus diatur ke Compact.

      connect-config-kafka-fc-sink

      Task Status Topic

      Topik yang digunakan untuk menyimpan status tugas. Klik Configure Runtime Environment untuk menampilkan parameter.

      • Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-status.

      • Partisi: Kami merekomendasikan agar Anda mengatur jumlah partisi di topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik harus diatur ke Penyimpanan Lokal.

      • cleanup.policy: Kebijakan pembersihan log untuk topik harus diatur ke Compact.

      connect-status-kafka-fc-sink

      Dead-letter Queue Topic

      Topik yang digunakan untuk menyimpan data kesalahan kerangka kerja Kafka Connect. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan.

      • Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-error.

      • Partisi: Kami merekomendasikan agar Anda mengatur jumlah partisi di topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik dapat diatur ke Penyimpanan Lokal atau Penyimpanan Cloud.

      connect-error-kafka-fc-sink

      Error Data Topic

      Topik yang digunakan untuk menyimpan data kesalahan konektor. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan.

      • Topik: Kami merekomendasikan agar Anda memulai nama topik dengan connect-error.

      • Partisi: Kami merekomendasikan agar Anda mengatur jumlah partisi di topik menjadi 6.

      • Mesin Penyimpanan: Mesin penyimpanan topik dapat diatur ke Penyimpanan Lokal atau Penyimpanan Cloud.

      connect-error-kafka-fc-sink

    3. Di langkah Configure Destination Service, pilih Function Compute sebagai layanan tujuan, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Create.

      Catatan

      Jika instance tempat konektor sink Function Compute miliknya berada di wilayah China (Hangzhou) atau China (Chengdu), pesan Service Authorization akan muncul saat Anda memilih Function Compute sebagai layanan tujuan. Peran layanan berikut secara otomatis dibuat jika Anda klik OK: AliyunServiceRoleForEventBridgeSourceKafka dan AliyunServiceRoleForEventBridgeSourceKafka. Klik OK di pesan Service Authorization, atur parameter yang dijelaskan dalam tabel berikut, lalu klik Create. Jika peran layanan telah dibuat, pesan Service Authorization tidak akan muncul.

      Parameter

      Deskripsi

      Contoh

      Cross-account/Cross-region

      Menentukan apakah konektor sink Function Compute menyinkronkan data ke Function Compute lintas akun Alibaba Cloud atau wilayah. Secara default, parameter ini diatur ke No. Nilai yang valid:

      • No: Konektor sink Function Compute menyinkronkan data ke Function Compute dalam wilayah yang sama dan akun Alibaba Cloud yang sama.

      • Yes: Konektor sink Function Compute menyinkronkan data ke Function Compute lintas wilayah tetapi dalam akun Alibaba Cloud yang sama, dalam wilayah yang sama tetapi lintas akun Alibaba Cloud, atau lintas wilayah dan akun Alibaba Cloud.

      Tidak

      Region

      Wilayah tempat Function Compute diaktifkan. Secara default, wilayah tempat konektor sink Function Compute berada dipilih. Untuk menyinkronkan data lintas wilayah, aktifkan akses Internet untuk konektor dan pilih wilayah tujuan. Untuk informasi lebih lanjut, lihat Aktifkan akses Internet untuk konektor sink Function Compute.

      Penting

      Parameter Region hanya ditampilkan jika Anda mengatur parameter Cross-account/Cross-region ke Yes.

      cn-hangzhou

      Service Endpoint

      Titik akhir Function Compute. Di konsol Function Compute, Anda dapat melihat titik akhir Function Compute di bagian Common Info halaman Overview.

      • Titik akhir internal: Kami merekomendasikan agar Anda menggunakan titik akhir internal untuk latensi yang lebih rendah. Titik akhir internal dapat digunakan jika instance ApsaraMQ for Kafka dan Function Compute berada di wilayah yang sama.

      • Titik akhir publik: Kami merekomendasikan agar Anda tidak menggunakan titik akhir publik karena latensi yang lebih tinggi. Titik akhir publik dapat digunakan jika instance ApsaraMQ for Kafka dan Function Compute berada di wilayah yang berbeda. Untuk menggunakan titik akhir publik, Anda harus mengaktifkan akses Internet untuk konektor. Untuk informasi lebih lanjut, lihat Aktifkan akses Internet untuk konektor sink Function Compute.

      Penting

      Parameter Service Endpoint hanya ditampilkan jika Anda mengatur parameter Cross-account/Cross-region menjadi Yes.

      http://188***.cn-hangzhou.fc.aliyuncs.com

      Alibaba Cloud Account

      ID akun Alibaba Cloud yang digunakan untuk masuk ke Function Compute. Di konsol Function Compute, Anda dapat melihat ID akun Alibaba Cloud di bagian Common Info halaman Overview.

      Penting

      Parameter Alibaba Cloud Account hanya ditampilkan jika Anda mengatur parameter Cross-account/Cross-region menjadi Yes.

      188***

      RAM Role Name

      Nama peran RAM yang diasumsikan oleh ApsaraMQ for Kafka untuk mengakses Function Compute.

      • Jika Anda tidak perlu menyinkronkan data lintas akun Alibaba Cloud, Anda harus membuat peran RAM dan memberikan izin spesifik kepada peran RAM dalam akun Alibaba Cloud saat ini. Lalu, masukkan nama peran RAM. Untuk informasi lebih lanjut, lihat Buat kebijakan kustom., Buat peran RAM, dan Berikan izin kepada peran RAM.

      • Jika Anda perlu menyinkronkan data lintas akun Alibaba Cloud, Anda harus membuat peran RAM menggunakan akun Alibaba Cloud tempat Anda ingin menyinkronkan data. Lalu, berikan izin spesifik kepada peran RAM dan masukkan nama peran RAM. Untuk informasi lebih lanjut, lihat Buat kebijakan kustom., Buat peran RAM, dan Berikan izin kepada peran RAM.

      Penting

      Parameter RAM Role Name ditampilkan hanya jika Anda mengatur parameter Cross-account/Cross-region ke Yes.

      AliyunKafkaConnectorRole

      Service Name

      Nama layanan di Function Compute.

      guide-hello_world

      Function Name

      Nama fungsi dalam layanan di Function Compute.

      hello_world

      Version or Alias

      Versi atau alias layanan di Function Compute.

      Penting
      • Anda harus mengatur parameter ini ke Specified Version atau Specified Alias jika Anda mengatur parameter Cross-account/Cross-region ke No.

      • Anda harus menentukan versi layanan atau alias jika Anda mengatur parameter Cross-account/Cross-region ke Yes.

      LATEST

      Service Version

      Versi layanan di Function Compute.

      Penting

      Parameter Service Version ditampilkan hanya jika Anda mengatur parameter Cross-account/Cross-region ke No dan parameter Version or Alias ke Specified Version.

      LATEST

      Service Alias

      Alias layanan di Function Compute.

      Penting

      Parameter Service Alias ditampilkan hanya jika Anda mengatur parameter Cross-account/Cross-region ke No dan parameter Version or Alias ke Specified Alias.

      jy

      Transmission Mode

      Mode pengiriman pesan. Nilai yang valid:

      • Asynchronous: direkomendasikan.

      • Synchronous: tidak direkomendasikan. Dalam mode sinkron, jika Function Compute memproses pesan dalam waktu lama, ApsaraMQ for Kafka menunggu Function Compute menyelesaikan pemrosesan. Jika satu batch pesan gagal diproses dalam 5 menit, klien ApsaraMQ for Kafka akan menyeimbangkan ulang lalu lintas.

      Asynchronous

      Data Size

      Jumlah maksimum pesan yang dapat dikirim sekaligus. Nilai default: 20. Konektor menggabungkan pesan untuk dikirim secara bersamaan berdasarkan jumlah maksimum pesan dan ukuran maksimum pesan yang diizinkan dalam satu permintaan. Ukuran pesan agregat tidak boleh melebihi 6 MB dalam mode sinkron dan 128 KB dalam mode asinkron. Sebagai contoh, pesan dikirim dalam mode asinkron, dan hingga 20 pesan dapat dikirim sekaligus. Anda ingin mengirim 18 pesan, di antaranya 17 pesan memiliki total ukuran 127 KB, dan satu pesan berukuran 200 KB. Dalam kasus ini, konektor menggabungkan 17 pesan menjadi satu batch dan mengirim batch ini terlebih dahulu, kemudian mengirim pesan tersisa yang ukurannya lebih dari 128 KB.

      Catatan

      Jika Anda menetapkan parameter key ke null saat mengirim pesan, permintaan tidak akan berisi parameter key. Jika Anda menetapkan parameter value ke null, permintaan tidak akan berisi parameter value.

      • Jika ukuran pesan dalam satu batch tidak melebihi ukuran maksimum pesan yang diizinkan dalam satu permintaan, permintaan berisi semua konten pesan. Contoh permintaan:

        [
            {
                "key":"this is the message's key2",
                "offset":8,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325438,
                "topic":"Test",
                "value":"this is the message's value2",
                "valueSize":28
            },
            {
                "key":"this is the message's key9",
                "offset":9,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325440,
                "topic":"Test",
                "value":"this is the message's value9",
                "valueSize":28
            },
            {
                "key":"this is the message's key12",
                "offset":10,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325442,
                "topic":"Test",
                "value":"this is the message's value12",
                "valueSize":29
            },
            {
                "key":"this is the message's key38",
                "offset":11,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325464,
                "topic":"Test",
                "value":"this is the message's value38",
                "valueSize":29
            }
        ]
      • Jika ukuran satu pesan melebihi ukuran maksimum pesan yang diizinkan dalam satu permintaan, permintaan tidak akan berisi konten pesan tersebut. Contoh permintaan:

        [
            {
                "key":"123",
                "offset":4,
                "overflowFlag":true,
                "partition":0,
                "timestamp":1603779578478,
                "topic":"Test",
                "value":"1",
                "valueSize":272687
            }
        ]
        Catatan

        Untuk mendapatkan konten pesan, Anda harus menarik pesan berdasarkan offset-nya.

      50

      Retries

      Jumlah maksimum percobaan ulang yang diizinkan setelah pesan gagal dikirim. Nilai default: 2. Nilai yang valid: 1 hingga 3. Dalam kasus tertentu ketika pesan gagal dikirim, percobaan ulang tidak didukung. Aturan berikut menjelaskan jenis kode kesalahan dan apakah percobaan ulang didukung. Untuk informasi lebih lanjut, lihat Kode kesalahan.

      • 4XX: Percobaan ulang tidak didukung kecuali jika 429 dikembalikan.

      • 5XX: Percobaan ulang didukung.

      Catatan
      • Konektor memanggil operasi InvokeFunction untuk mengirim pesan ke Function Compute.

      • Jika pesan masih gagal dikirim ke Function Compute setelah jumlah maksimum percobaan ulang, pesan tersebut dikirim ke topik antrian pesan gagal. Pesan dalam topik antrian pesan gagal tidak dapat disinkronkan ke Function Compute menggunakan konektor. Kami merekomendasikan agar Anda mengonfigurasi aturan peringatan untuk topik untuk memantau sumber daya topik secara real-time. Dengan cara ini, Anda dapat menyelesaikan masalah pada kesempatan pertama.

      2

      Setelah konektor dibuat, Anda dapat melihat konektor di halaman Connectors.

  6. Pergi ke halaman Connectors, temukan konektor yang Anda buat, lalu klik Deploy di kolom Actions.

    Untuk mengonfigurasi sumber daya Function Compute, pilih More > Configure Function di kolom Actions untuk pergi ke konsol Function Compute dan menyelesaikan konfigurasi.

Kirim pesan uji

Setelah Anda menerapkan konektor sink Function Compute, Anda dapat mengirim pesan ke topik sumber di instance ApsaraMQ for Kafka untuk menguji apakah pesan tersebut dapat disinkronkan ke Function Compute.

  1. Di halaman Connectors, temukan konektor yang ingin Anda kelola dan klik Test di kolom Actions.

  2. Di panel Send Message, konfigurasikan parameter untuk mengirim pesan uji.

    • Jika Anda mengatur parameter Sending Method ke Console, lakukan langkah-langkah berikut:

      1. Di bidang Message Key, masukkan kunci pesan. Contoh: demo.

      2. Di bidang Message Content, masukkan isi pesan. Contoh: {"key": "test"}.

      3. Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji ke partisi tertentu.

        • Jika Anda ingin mengirim pesan uji ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara memeriksa ID partisi, lihat Lihat Status Partisi.

        • Jika Anda tidak ingin mengirim pesan uji ke partisi tertentu, klik No.

    • Jika Anda mengatur parameter Sending Method ke Docker, jalankan perintah Docker di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji.

    • Jika Anda mengatur parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan dan metode akses untuk mengirim dan berlangganan pesan uji.

Lihat log fungsi

Setelah Anda mengirim pesan ke topik sumber di instance ApsaraMQ for Kafka, Anda dapat melihat log fungsi untuk memeriksa apakah pesan diterima. Untuk informasi lebih lanjut, lihat Konfigurasikan Logging.

Jika pesan uji yang Anda kirim muncul di log seperti yang ditunjukkan pada gambar berikut, tugas sinkronisasi data berhasil.

fc LOG