Konektor sink Elasticsearch membaca pesan dari sebuah topik di instans ApsaraMQ for Kafka Anda dan menuliskannya ke indeks Elasticsearch. Konektor ini menggunakan Function Compute sebagai perantara: mengonsumsi pesan dari topik sumber, meneruskannya ke fungsi Function Compute, lalu fungsi tersebut menuliskan pesan ke Elasticsearch melalui Bulk API. Setiap pesan menjadi dokumen dalam indeks target, dilengkapi metadata seperti topik, partisi, offset, dan timestamp.
Sebelum memulai
Lengkapi penyiapan berikut sebelum membuat konektor.
ApsaraMQ for Kafka
Aktifkan fitur konektor untuk instans Anda.
Buat topik yang akan digunakan sebagai sumber data.
Function Compute
Elasticsearch
Buat kluster dan indeks Elasticsearch di Konsol Elasticsearch. Gunakan versi 7.0 atau lebih baru agar kompatibel dengan klien Function Compute (versi 7.7.0).
Tambahkan Blok CIDR titik akhir Function Compute ke daftar putih Elasticsearch. Untuk pengujian awal, tentukan
0.0.0.0/0untuk mengizinkan semua alamat IP dalam VPC, lalu batasi rentang tersebut setelah konektivitas diverifikasi.
Informasi yang perlu dikumpulkan
Kumpulkan detail berikut sebelum memulai wizard.
| Informasi | Tempat menemukannya | Contoh |
|---|---|---|
| ID instans Elasticsearch | Konsol Elasticsearch | es-cn-oew1o67x0000**** |
| Titik akhir Elasticsearch (publik atau pribadi) | Informasi dasar kluster | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| Port Elasticsearch | Informasi dasar kluster | 9200 (HTTP/HTTPS) atau 9300 (TCP) |
| Username dan password Elasticsearch | Ditetapkan saat pembuatan kluster; atur ulang jika diperlukan | elastic / ******** |
| Nama indeks Elasticsearch | Konsol Elasticsearch | elastic_test |
| Nama topik sumber | Konsol ApsaraMQ for Kafka | elasticsearch-test-input |
Batasan
Instans ApsaraMQ for Kafka dan kluster Elasticsearch harus berada di wilayah yang sama.
ApsaraMQ for Kafka melakukan serialisasi pesan sebagai string UTF-8. Data biner tidak didukung.
Jika Anda menentukan titik akhir pribadi kluster Elasticsearch, Function Compute tidak dapat mengaksesnya secara default. Untuk mengaktifkan konektivitas, konfigurasikan layanan Function Compute agar menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch. Lihat Konfigurasikan layanan Function Compute.
Untuk batasan konektor tambahan, lihat Batasan.
Penagihan
Konektor menggunakan Function Compute untuk mengekspor data. Function Compute menyediakan tier gratis. Penggunaan di luar tier gratis ditagih sesuai dengan penagihan Function Compute.
Buat dan deploy konektor
Masuk ke Konsol ApsaraMQ for Kafka.
Pada bagian Resource Distribution di halaman Overview, pilih wilayah Anda.
Di panel navigasi sebelah kiri, klik Connectors.
Di halaman Connectors, pilih instans Anda dari daftar drop-down Select Instance dan klik Create Connector.
Langkah 1: Konfigurasikan informasi dasar
Pada langkah Configure Basic Information, tetapkan nama konektor dan tinjau detail instans.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Name | Nama unik dalam instans ApsaraMQ for Kafka. Gunakan 1 hingga 48 karakter: angka, huruf kecil, dan tanda hubung (-). Tidak boleh dimulai dengan tanda hubung. Konektor secara otomatis membuat kelompok konsumen bernama connect-<connector-name>. | kafka-elasticsearch-sink |
| Instance | Menampilkan nama dan ID instans saat ini. | demo alikafka_post-cn-st21p8vj**** |
Secara default, opsi Authorize to Create Service Linked Role dipilih. ApsaraMQ for Kafka membuat peran terkait layanan jika belum ada.
Klik Next.
Langkah 2: Konfigurasikan layanan sumber
Pada langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber dan konfigurasikan parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Data Source Topic | Topik tempat data dikonsumsi. | elasticsearch-test-input |
| Consumer Thread Concurrency | Jumlah thread konsumen konkuren. Nilai valid: 1, 2, 3, 6, 12. Default: 6. | 6 |
| Consumer Offset | Posisi mulai mengonsumsi. Earliest Offset membaca dari awal. Latest Offset hanya membaca pesan baru. | Earliest Offset |
Klik Configure Runtime Environment untuk memperluas parameter tambahan.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| VPC ID | VPC dari instans sumber. Diisi otomatis; tidak perlu diubah. | vpc-bp1xpdnd3l*** |
| vSwitch ID | vSwitch dari instans sumber. Harus berada dalam VPC yang sama. | vsw-bp1d2jgg81*** |
| Failure Handling Policy | Tindakan saat pesan gagal. Continue Subscription mencatat error dan terus mengonsumsi. Stop Subscription mencatat error dan menghentikan partisi. Lihat Kelola konektor untuk detail log dan kode kesalahan untuk troubleshooting. Catatan
| Continue Subscription |
| Resource Creation Method | Auto membuat topik internal yang diperlukan secara otomatis. Manual memungkinkan Anda membuatnya sendiri. | Auto |
| Connector Consumer Group | Kelompok konsumen untuk tugas konektor. Format: connect-<connector-name>. | connect-kafka-elasticsearch-sink |
Topik internal (hanya untuk pembuatan manual)
Jika Anda menetapkan Resource Creation Method ke Manual, buat topik berikut. Semua topik yang memerlukan Local Storage hanya tersedia pada instans Edisi Profesional.
| Parameter | Konvensi penamaan | Partisi | Storage engine | cleanup.policy |
|---|---|---|---|---|
| Task Offset Topic | connect-offset-* | Lebih dari 1 | Local Storage | Compact |
| Task Configuration Topic | connect-config-* | Tepat 1 | Local Storage | Compact |
| Task Status Topic | connect-status-* | 6 (disarankan) | Local Storage | Compact |
| Dead-letter Queue Topic | connect-error-* | 6 (disarankan) | Local Storage atau Cloud Storage | -- |
| Error Data Topic | connect-error-* | 6 (disarankan) | Local Storage atau Cloud Storage | -- |
Untuk menghemat resource topik, gunakan topik yang sama untuk dead-letter queue topic dan error data topic.
Klik Next.
Langkah 3: Konfigurasikan layanan tujuan
Pada langkah Configure Destination Service, pilih Elasticsearch sebagai layanan tujuan dan konfigurasikan parameter berikut.
| Parameter | Deskripsi | Contoh |
|---|---|---|
| Elasticsearch Instance ID | ID kluster Elasticsearch. | es-cn-oew1o67x0000**** |
| Endpoint | Titik akhir publik atau pribadi kluster. Lihat Lihat informasi dasar kluster. | es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com |
| Port | 9200 untuk HTTP/HTTPS, atau 9300 untuk TCP. | 9300 |
| Username | Username Elasticsearch. Default: elastic. Sesuaikan melalui X-Pack RBAC jika diperlukan. Akun harus memiliki izin menulis pada indeks target. | elastic |
| Password | Password yang ditetapkan saat pembuatan kluster. Atur ulang password jika lupa. | ******** |
| Index | Nama indeks target Elasticsearch. | elastic_test |
- Username dan password diteruskan ke Function Compute sebagai variabel lingkungan saat tugas konektor dibuat. ApsaraMQ for Kafka tidak menyimpan kredensial ini setelah tugas dibuat.
- Akun harus memiliki izin untuk menulis ke indeks, karena pesan dikirim melalui Elasticsearch Bulk API.
Klik Create.
Deploy konektor
Setelah dibuat, konektor muncul di halaman Connectors. Klik Deploy di kolom Actions untuk menjalankan konektor.
Konfigurasikan layanan Function Compute
Setelah Anda mendeploy konektor, Function Compute secara otomatis membuat layanan bernama kafka-service-<connector-name>-<random-string>. Jika kluster Elasticsearch menggunakan titik akhir pribadi, konfigurasikan layanan Function Compute agar menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch.
Di halaman Connectors, temukan konektor tersebut. Di kolom Actions, pilih More > Configure Function.
Di Konsol Function Compute, temukan layanan yang dibuat otomatis dan perbarui pengaturan VPC dan vSwitch agar sesuai dengan kluster Elasticsearch.
Verifikasi alur data
Kirim pesan uji untuk memastikan data mengalir dari ApsaraMQ for Kafka ke Elasticsearch.
Kirim pesan uji
Di halaman Connectors, temukan konektor dan klik Test di kolom Actions.
Di panel Send Message, atur Method of Sending ke Console.
Di bidang Message Key, masukkan kunci, misalnya
demo.Di bidang Message Content, masukkan isi JSON, misalnya:
{"key": "test"}Untuk Send to Specified Partition, klik Yes dan masukkan Partition ID (misalnya,
0) untuk menargetkan partisi tertentu, atau klik No agar sistem menentukan sendiri. Untuk mencari tahu ID partisi, lihat Lihat status partisi.
Anda juga dapat mengirim pesan uji melalui Docker atau SDK. Pilih opsi yang sesuai di bidang Method of Sending dan ikuti petunjuk di layar.
Periksa indeks Elasticsearch
Jalankan kueri berikut untuk mencari indeks target:
GET /<index_name>/_searchPastikan tanggapan berisi pesan yang Anda kirim. Tanggapan sukses terlihat seperti ini:
{ "took": 8, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 1.0, "hits": [ { "_index": "product_****", "_type": "_doc", "_id": "TX3TZHgBfHNEDGoZ****", "_score": 1.0, "_source": { "msg_body": { "key": "test", "offset": 2, "overflowFlag": false, "partition": 2, "timestamp": 1616599282417, "topic": "dv****", "value": "test1", "valueSize": 8 }, "doc_as_upsert": true } } ] } }
Troubleshooting
| Gejala | Kemungkinan penyebab | Resolusi |
|---|---|---|
| Konektor gagal menulis ke Elasticsearch | Function Compute tidak dapat menjangkau kluster Elasticsearch | Verifikasi bahwa layanan Function Compute menggunakan VPC dan vSwitch yang sama dengan kluster Elasticsearch. Lihat Konfigurasikan layanan Function Compute. |
| Pesan tidak dikonsumsi | Pengaturan offset konsumen salah | Periksa pengaturan Consumer Offset. Gunakan Earliest Offset untuk membaca semua pesan yang ada, atau Latest Offset hanya untuk pesan baru. |
| Error autentikasi | Kredensial tidak valid atau izin tidak mencukupi | Pastikan username dan password benar serta akun memiliki izin menulis pada indeks target. |
Untuk log pemanggilan fungsi Function Compute, lihat Konfigurasikan fitur logging.