All Products
Search
Document Center

ApsaraMQ for Kafka:Buat konektor sink Elasticsearch

Last Updated:Mar 11, 2026

Konektor sink Elasticsearch membaca pesan dari sebuah topik di instans ApsaraMQ for Kafka Anda dan menuliskannya ke indeks Elasticsearch. Konektor ini menggunakan Function Compute sebagai perantara: mengonsumsi pesan dari topik sumber, meneruskannya ke fungsi Function Compute, lalu fungsi tersebut menuliskan pesan ke Elasticsearch melalui Bulk API. Setiap pesan menjadi dokumen dalam indeks target, dilengkapi metadata seperti topik, partisi, offset, dan timestamp.

Sebelum memulai

Lengkapi penyiapan berikut sebelum membuat konektor.

ApsaraMQ for Kafka

Function Compute

Elasticsearch

Informasi yang perlu dikumpulkan

Kumpulkan detail berikut sebelum memulai wizard.

InformasiTempat menemukannyaContoh
ID instans ElasticsearchKonsol Elasticsearches-cn-oew1o67x0000****
Titik akhir Elasticsearch (publik atau pribadi)Informasi dasar klusteres-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
Port ElasticsearchInformasi dasar kluster9200 (HTTP/HTTPS) atau 9300 (TCP)
Username dan password ElasticsearchDitetapkan saat pembuatan kluster; atur ulang jika diperlukanelastic / ********
Nama indeks ElasticsearchKonsol Elasticsearchelastic_test
Nama topik sumberKonsol ApsaraMQ for Kafkaelasticsearch-test-input

Batasan

  • Instans ApsaraMQ for Kafka dan kluster Elasticsearch harus berada di wilayah yang sama.

  • ApsaraMQ for Kafka melakukan serialisasi pesan sebagai string UTF-8. Data biner tidak didukung.

  • Jika Anda menentukan titik akhir pribadi kluster Elasticsearch, Function Compute tidak dapat mengaksesnya secara default. Untuk mengaktifkan konektivitas, konfigurasikan layanan Function Compute agar menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch. Lihat Konfigurasikan layanan Function Compute.

  • Untuk batasan konektor tambahan, lihat Batasan.

Penagihan

Konektor menggunakan Function Compute untuk mengekspor data. Function Compute menyediakan tier gratis. Penggunaan di luar tier gratis ditagih sesuai dengan penagihan Function Compute.

Buat dan deploy konektor

  1. Masuk ke Konsol ApsaraMQ for Kafka.

  2. Pada bagian Resource Distribution di halaman Overview, pilih wilayah Anda.

  3. Di panel navigasi sebelah kiri, klik Connectors.

  4. Di halaman Connectors, pilih instans Anda dari daftar drop-down Select Instance dan klik Create Connector.

Langkah 1: Konfigurasikan informasi dasar

Pada langkah Configure Basic Information, tetapkan nama konektor dan tinjau detail instans.

ParameterDeskripsiContoh
NameNama unik dalam instans ApsaraMQ for Kafka. Gunakan 1 hingga 48 karakter: angka, huruf kecil, dan tanda hubung (-). Tidak boleh dimulai dengan tanda hubung. Konektor secara otomatis membuat kelompok konsumen bernama connect-<connector-name>.kafka-elasticsearch-sink
InstanceMenampilkan nama dan ID instans saat ini.demo alikafka_post-cn-st21p8vj****
Penting

Secara default, opsi Authorize to Create Service Linked Role dipilih. ApsaraMQ for Kafka membuat peran terkait layanan jika belum ada.

Klik Next.

Langkah 2: Konfigurasikan layanan sumber

Pada langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber dan konfigurasikan parameter berikut.

ParameterDeskripsiContoh
Data Source TopicTopik tempat data dikonsumsi.elasticsearch-test-input
Consumer Thread ConcurrencyJumlah thread konsumen konkuren. Nilai valid: 1, 2, 3, 6, 12. Default: 6.6
Consumer OffsetPosisi mulai mengonsumsi. Earliest Offset membaca dari awal. Latest Offset hanya membaca pesan baru.Earliest Offset

Klik Configure Runtime Environment untuk memperluas parameter tambahan.

ParameterDeskripsiContoh
VPC IDVPC dari instans sumber. Diisi otomatis; tidak perlu diubah.vpc-bp1xpdnd3l***
vSwitch IDvSwitch dari instans sumber. Harus berada dalam VPC yang sama.vsw-bp1d2jgg81***
Failure Handling PolicyTindakan saat pesan gagal. Continue Subscription mencatat error dan terus mengonsumsi. Stop Subscription mencatat error dan menghentikan partisi. Lihat Kelola konektor untuk detail log dan kode kesalahan untuk troubleshooting.
Catatan
Continue Subscription
Resource Creation MethodAuto membuat topik internal yang diperlukan secara otomatis. Manual memungkinkan Anda membuatnya sendiri.Auto
Connector Consumer GroupKelompok konsumen untuk tugas konektor. Format: connect-<connector-name>.connect-kafka-elasticsearch-sink

Topik internal (hanya untuk pembuatan manual)

Jika Anda menetapkan Resource Creation Method ke Manual, buat topik berikut. Semua topik yang memerlukan Local Storage hanya tersedia pada instans Edisi Profesional.

ParameterKonvensi penamaanPartisiStorage enginecleanup.policy
Task Offset Topicconnect-offset-*Lebih dari 1Local StorageCompact
Task Configuration Topicconnect-config-*Tepat 1Local StorageCompact
Task Status Topicconnect-status-*6 (disarankan)Local StorageCompact
Dead-letter Queue Topicconnect-error-*6 (disarankan)Local Storage atau Cloud Storage--
Error Data Topicconnect-error-*6 (disarankan)Local Storage atau Cloud Storage--
Catatan

Untuk menghemat resource topik, gunakan topik yang sama untuk dead-letter queue topic dan error data topic.

Klik Next.

Langkah 3: Konfigurasikan layanan tujuan

Pada langkah Configure Destination Service, pilih Elasticsearch sebagai layanan tujuan dan konfigurasikan parameter berikut.

ParameterDeskripsiContoh
Elasticsearch Instance IDID kluster Elasticsearch.es-cn-oew1o67x0000****
EndpointTitik akhir publik atau pribadi kluster. Lihat Lihat informasi dasar kluster.es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com
Port9200 untuk HTTP/HTTPS, atau 9300 untuk TCP.9300
UsernameUsername Elasticsearch. Default: elastic. Sesuaikan melalui X-Pack RBAC jika diperlukan. Akun harus memiliki izin menulis pada indeks target.elastic
PasswordPassword yang ditetapkan saat pembuatan kluster. Atur ulang password jika lupa.********
IndexNama indeks target Elasticsearch.elastic_test
Catatan
  • Username dan password diteruskan ke Function Compute sebagai variabel lingkungan saat tugas konektor dibuat. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.
  • Akun harus memiliki izin untuk menulis ke indeks, karena pesan dikirim melalui Elasticsearch Bulk API.

Klik Create.

Deploy konektor

Setelah dibuat, konektor muncul di halaman Connectors. Klik Deploy di kolom Actions untuk menjalankan konektor.

Konfigurasikan layanan Function Compute

Setelah Anda mendeploy konektor, Function Compute secara otomatis membuat layanan bernama kafka-service-<connector-name>-<random-string>. Jika kluster Elasticsearch menggunakan titik akhir pribadi, konfigurasikan layanan Function Compute agar menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch.

  1. Di halaman Connectors, temukan konektor tersebut. Di kolom Actions, pilih More > Configure Function.

  2. Di Konsol Function Compute, temukan layanan yang dibuat otomatis dan perbarui pengaturan VPC dan vSwitch agar sesuai dengan kluster Elasticsearch.

Verifikasi alur data

Kirim pesan uji untuk memastikan data mengalir dari ApsaraMQ for Kafka ke Elasticsearch.

Kirim pesan uji

  1. Di halaman Connectors, temukan konektor dan klik Test di kolom Actions.

  2. Di panel Send Message, atur Method of Sending ke Console.

  3. Di bidang Message Key, masukkan kunci, misalnya demo.

  4. Di bidang Message Content, masukkan isi JSON, misalnya:

       {"key": "test"}
  5. Untuk Send to Specified Partition, klik Yes dan masukkan Partition ID (misalnya, 0) untuk menargetkan partisi tertentu, atau klik No agar sistem menentukan sendiri. Untuk mencari tahu ID partisi, lihat Lihat status partisi.

Anda juga dapat mengirim pesan uji melalui Docker atau SDK. Pilih opsi yang sesuai di bidang Method of Sending dan ikuti petunjuk di layar.

Periksa indeks Elasticsearch

  1. Masuk ke konsol Kibana.

  2. Jalankan kueri berikut untuk mencari indeks target:

       GET /<index_name>/_search
  3. Pastikan tanggapan berisi pesan yang Anda kirim. Tanggapan sukses terlihat seperti ini:

       {
         "took": 8,
         "timed_out": false,
         "_shards": {
           "total": 5,
           "successful": 5,
           "skipped": 0,
           "failed": 0
         },
         "hits": {
           "total": {
             "value": 1,
             "relation": "eq"
           },
           "max_score": 1.0,
           "hits": [
             {
               "_index": "product_****",
               "_type": "_doc",
               "_id": "TX3TZHgBfHNEDGoZ****",
               "_score": 1.0,
               "_source": {
                 "msg_body": {
                   "key": "test",
                   "offset": 2,
                   "overflowFlag": false,
                   "partition": 2,
                   "timestamp": 1616599282417,
                   "topic": "dv****",
                   "value": "test1",
                   "valueSize": 8
                 },
                 "doc_as_upsert": true
               }
             }
           ]
         }
       }

Troubleshooting

GejalaKemungkinan penyebabResolusi
Konektor gagal menulis ke ElasticsearchFunction Compute tidak dapat menjangkau kluster ElasticsearchVerifikasi bahwa layanan Function Compute menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch. Lihat Konfigurasikan layanan Function Compute.
Pesan tidak dikonsumsiPengaturan offset konsumen salahPeriksa pengaturan Consumer Offset. Gunakan Earliest Offset untuk membaca semua pesan yang ada, atau Latest Offset hanya untuk pesan baru.
Error autentikasiKredensial tidak valid atau izin tidak mencukupiPastikan username dan password benar serta akun memiliki izin menulis pada indeks target.

Untuk log pemanggilan fungsi Function Compute, lihat Konfigurasikan fitur logging.