Topik ini menjelaskan cara membuat konektor sink Elasticsearch untuk mengekspor data dari topik pada instance ApsaraMQ for Kafka ke Elasticsearch.
Prasyarat
Untuk informasi tentang prasyarat, lihat Prasyarat.
Langkah 1: Buat Sumber Daya Elasticsearch
Buat instance dan indeks di Konsol Elasticsearch. Untuk informasi lebih lanjut, lihat Memulai.
Tambahkan blok CIDR dari titik akhir yang digunakan untuk mengakses Function Compute ke daftar putih alamat IP cluster Elasticsearch. Operasi ini hanya diperlukan jika Anda mengakses Elasticsearch dalam virtual private cloud (VPC). Untuk informasi tentang cara mengonfigurasi daftar putih, lihat Konfigurasikan Daftar Putih Alamat IP Publik atau Pribadi untuk Cluster Elasticsearch.
Langkah 2: Buat Konektor Sink Elasticsearch
Masuk ke Konsol ApsaraMQ for Kafka. Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi sisi kiri, pilih .
Di halaman Tasks, klik Create Task.
Di halaman Create Task, konfigurasikan parameter Task Name dan Description. Kemudian, ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya.
Pembuatan Tugas
Di langkah Source, atur parameter Data Provider menjadi ApsaraMQ for Kafka dan ikuti petunjuk di layar untuk mengonfigurasi parameter lainnya. Lalu, klik Next Step. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Region
Wilayah tempat instance ApsaraMQ for Kafka berada.
Cina (Hangzhou)
ApsaraMQ for Kafka Instance
ID instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.
alikafka_post-cn-9hdsbdhd****
Topic
Topik pada instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.
guide-sink-topic
Group ID
ID grup pada instance ApsaraMQ for Kafka tempat data yang ingin Anda rutekan diproduksi.
Quickly Create: Sistem secara otomatis membuat grup dengan ID dalam format GID_EVENTBRIDGE_xxx.
Use Existing Group: Pilih ID grup yang ada yang tidak sedang digunakan. Jika Anda memilih grup yang ada yang sedang digunakan, publikasi dan langganan pesan yang ada akan terpengaruh.
Gunakan Grup yang Ada
Consumer Offset
Latest Offset: Pesan dikonsumsi dari offset terbaru.
Earliest Offset: Pesan dikonsumsi dari offset paling awal.
Offset Terbaru
Network Configuration
Jika diperlukan transmisi data lintas batas, pilih Self-managed Internet. Dalam kasus lain, pilih Basic Network.
Jaringan Dasar
Data Format
Fitur format data digunakan untuk mengkodekan data biner yang dikirim dari sumber ke dalam format data tertentu. Beberapa format data didukung. Jika Anda tidak memiliki persyaratan khusus untuk pengkodean, tentukan Json sebagai nilainya.
Json: Data biner dikodekan menjadi data berformat JSON berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload. Ini adalah nilai default.
Teks: Data biner dikodekan menjadi string berdasarkan pengkodean UTF-8 dan kemudian dimasukkan ke dalam payload.
Biner: Data biner dikodekan menjadi string berdasarkan pengkodean Base64 dan kemudian dimasukkan ke dalam payload.
Json
Messages
Jumlah maksimum pesan yang dapat dikirim dalam setiap pemanggilan fungsi. Permintaan hanya dikirim ketika jumlah pesan dalam backlog mencapai nilai yang ditentukan. Nilai valid: 1 hingga 10000.
2.000
Interval (Unit: Seconds)
Interval waktu saat fungsi dipanggil. Sistem mengirimkan pesan agregat ke Function Compute pada interval waktu yang ditentukan. Nilai valid: 0 hingga 15. Unit: detik. Nilai 0 menunjukkan bahwa pesan dikirim segera setelah agregasi.
3
Di langkah Filtering, definisikan pola data untuk menyaring data. Untuk informasi lebih lanjut, lihat Penyaringan Pesan.
Di langkah Transformation, tentukan metode pembersihan data untuk mengimplementasikan kemampuan pemrosesan data seperti pemisahan, pemetaan, pengayaan, dan perutean dinamis. Untuk informasi lebih lanjut, lihat Gunakan Function Compute untuk melakukan pembersihan pesan.
Di langkah Sink, atur parameter Service Type menjadi Elasticsearch acs.elasticSearch dan konfigurasikan parameter lainnya. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter
Deskripsi
Contoh
Elasticsearch Cluster
Cluster Alibaba Cloud Elasticsearch yang Anda buat.
es-cn-pe336j0gj001e****
Cluster Logon Name
Nama masuk yang Anda tentukan saat membuat cluster Elasticsearch. Nama masuk default: elastic.
elastic
Cluster Logon Password
Kata sandi yang Anda tentukan saat membuat cluster Elasticsearch.
******
Index Name
Nama indeks yang Anda buat. Untuk informasi tentang cara membuat indeks, lihat Langkah 3: Buat Indeks. Konstanta string atau variabel yang diekstraksi menggunakan sintaks JSONPath didukung. Contoh: product_info dan $.data.key.
product_info
Document Type
Tipe dokumen. Konstanta string atau variabel yang diekstraksi menggunakan sintaks JSONPath didukung.
Contoh: _doc dan $.data.key.
CatatanParameter ini hanya tersedia jika versi instance Elasticsearch lebih awal dari 7. Nilai default parameter ini adalah _doc.
_doc
Document
Tentukan apakah akan mengirimkan semua konten suatu event atau konten spesifik suatu event ke Elasticsearch. Jika Anda memilih Sebagian Event untuk parameter ini, Anda harus menentukan aturan ekstraksi JSONPath.
Event Lengkap
Network Settings
VPC: Pesan dalam ApsaraMQ for Kafka dikirim ke Elasticsearch dalam VPC.
Internet: Pesan dalam ApsaraMQ for Kafka dikirim ke Elasticsearch melalui Internet.
Internet
VPC
VPC tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
vpc-bp17fapfdj0dwzjkd****
vSwitch
vSwitch tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
vsw-bp1gbjhj53hdjdkg****
Security Group
Grup keamanan tempat instance Elasticsearch berada. Parameter ini hanya diperlukan jika Anda mengatur parameter Network Settings menjadi VPC.
test_group
Properti Tugas
Konfigurasikan kebijakan ulang yang ingin Anda gunakan saat event gagal didorong dan metode yang ingin Anda gunakan untuk menangani titik kegagalan. Untuk informasi lebih lanjut, lihat Kebijakan Ulang dan Antrian Pesan Gagal.
Klik Save. Di halaman Tasks, temukan konektor sink Elasticsearch yang Anda buat. Saat status di kolom Status berubah dari Starting menjadi Running, konektor telah dibuat.
Langkah 3: Uji Konektor Sink Elasticsearch
Di halaman Tasks, temukan konektor sink Elasticsearch yang Anda buat dan klik topik sumber di kolom Event Source.
- Di halaman Detail Topik, klik Send Message.
Di panel Start to Send and Consume Message, konfigurasikan parameter berdasarkan gambar berikut dan klik OK.

Masuk ke Konsol Elasticsearch dan gunakan Kibana untuk mengakses instance. Untuk informasi lebih lanjut, lihat Memulai.
Di konsol Kibana cluster Elasticsearch, jalankan perintah berikut untuk melihat hasil penyisipan data.
GET /{Nama indeks}/_searchGambar berikut menunjukkan hasil penyisipan data.
