全部产品
Search
文档中心

ApsaraMQ for Kafka:Buat Konektor Sink Elasticsearch

更新时间:Jun 28, 2025

Topik ini menjelaskan cara membuat konektor sink Elasticsearch untuk mengekspor data dari topik pada instance ApsaraMQ for Kafka ke Elasticsearch.

Prasyarat

Untuk informasi tentang prasyarat, lihat Prasyarat.

Langkah 1: Buat Sumber Daya Elasticsearch

Langkah 2: Buat Konektor Sink Elasticsearch

  1. Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.

  2. Di panel navigasi sisi kiri, pilih Connector Ecosystem Integration > Tasks.

  3. Di halaman Tasks, klik Create Task.

  4. Di halaman Create Task, konfigurasikan parameter Task Name dan Description. Kemudian, ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya.

    • Pembuatan Tugas

      1. Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Region

        Wilayah tempat instance ApsaraMQ for Kafka berada.

        Cina (Hangzhou)

        ApsaraMQ for Kafka Instance

        ID instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.

        alikafka_post-cn-9hdsbdhd****

        Topic

        Topik pada instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.

        guide-sink-topic

        Group ID

        ID grup pada instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.

        • Quickly Create: Sistem secara otomatis membuat grup dengan ID dalam format GID_EVENTBRIDGE_xxx.

        • Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.

        Gunakan Grup yang Ada

        Consumer Offset

        • Latest Offset: Pesan dikonsumsi dari offset terbaru.

        • Earliest Offset: Pesan dikonsumsi dari offset paling awal.

        Offset Terbaru

        Network Configuration

        Jika diperlukan transmisi data lintas batas, pilih Self-managed Internet. Dalam kasus lain, pilih Basic Network.

        Jaringan Dasar

        Data Format

        Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.

        • Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.

        • Teks: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.

        • Biner: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.

        Json

        Messages

        Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.

        2.000

        Interval (Unit: Seconds)

        Interval waktu saat fungsi dipanggil. Sistem mengirimkan pesan agregat ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Unit: detik. Nilai 0 menunjukkan bahwa pesan dikirim segera setelah agregasi.

        3

      2. Di langkah Filtering, definisikan pola data untuk menyaring data. Untuk informasi lebih lanjut, lihat Penyaringan Pesan.

      3. Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemrosesan data seperti pemisahan, pemetaan, pengayaan, dan perutean dinamis. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.

      4. Di langkah Sink, atur parameter Service Type menjadi Elasticsearch acs.elasticSearch dan konfigurasikan parameter lainnya. Tabel berikut menjelaskan parameter-parameter tersebut.

        Parameter

        Deskripsi

        Contoh

        Elasticsearch Cluster

        Cluster Alibaba Cloud Elasticsearch yang Anda buat.

        es-cn-pe336j0gj001e****

        Cluster Logon Name

        Nama masuk yang Anda tentukan saat membuat cluster Elasticsearch. Nama masuk default: elastic.

        elastic

        Cluster Logon Password

        Kata sandi yang Anda tentukan saat membuat cluster Elasticsearch.

        ******

        Index Name

        Nama indeks yang Anda buat. Untuk informasi tentang cara membuat indeks, lihat Langkah 3: Buat Indeks. Konstanta string atau variabel yang diekstraksi menggunakan sintaks JSONPath didukung. Contoh: product_info dan $.data.key.

        product_info

        Document Type

        Tipe dokumen. Konstanta string atau variabel yang diekstraksi menggunakan sintaks JSONPath didukung.

        Contoh: _doc dan $.data.key.

        Catatan

        Parameter ini hanya tersedia jika versi instance Elasticsearch lebih awal dari 7. Nilai default parameter ini adalah _doc.

        _doc

        Document

        Tentukan apakah akan mengirimkan semua konten suatu event atau konten spesifik suatu event ke Elasticsearch. Jika Anda memilih Sebagian Event untuk parameter ini, Anda harus menentukan aturan ekstraksi JSONPath.

        Event Lengkap

        Network Settings

        • VPC: Pesan dalam ApsaraMQ for Kafka dikirim ke Elasticsearch dalam VPC.

        • Internet: Pesan dalam ApsaraMQ for Kafka dikirim ke Elasticsearch melalui Internet.

        Internet

        VPC

        VPC tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        vSwitch tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        vsw-bp1gbjhj53hdjdkg****

        Security Group

        Grup keamanan tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.

        test_group

    • Properti Tugas

      Konfigurasikan kebijakan ulang yang ingin Anda gunakan saat event gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan Ulang dan Antrian Pesan Gagal.

  5. Klik Save. Di halaman Tasks, temukan konektor sink Elasticsearch yang Anda buat. Saat status di kolom Status berubah dari Starting menjadi Running, konektor telah dibuat.

Langkah 3: Uji Konektor Sink Elasticsearch

  1. Di halaman Tasks, temukan konektor sink Elasticsearch yang Anda buat dan klik topik sumber di kolom Event Source.

  2. Di halaman Detail Topik, klik Send Message.
  3. Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

    发送消息

  4. Masuk ke Konsol Elasticsearch dan gunakan Kibana untuk mengakses instance. Untuk informasi lebih lanjut, lihat Memulai.

  5. Di konsol Kibana cluster Elasticsearch, jalankan perintah berikut untuk melihat hasil penyisipan data.

    GET /{Nama indeks}/_search

    Gambar berikut menunjukkan hasil penyisipan data.测试结果