Topik ini menjelaskan cara membuat konektor sink Elasticsearch untuk menyinkronkan data dari topik sumber di instance ApsaraMQ for Kafka Anda ke indeks cluster Elasticsearch.
Prasyarat
Sebelum menyinkronkan data, pastikan persyaratan berikut telah terpenuhi:- ApsaraMQ for Kafka
- Fitur konektor diaktifkan untuk instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Aktifkan Fitur Konektor.
- Topik dibuat dalam instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 1: Buat Topik.
- Function Compute
- Function Compute diaktifkan. Untuk informasi lebih lanjut, lihat Aktifkan Function Compute.
- Elasticsearch
- Instance dan indeks dibuat di Konsol Elasticsearch. Untuk informasi lebih lanjut, lihat Memulai.
- Blok CIDR dari titik akhir Function Compute yang digunakan ditambahkan ke daftar putih instance Elasticsearch. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP Publik atau Pribadi untuk Cluster Elasticsearch.
Catatan- Versi klien Elasticsearch yang digunakan oleh Function Compute adalah 7.7.0. Untuk memastikan kompatibilitas, buat cluster Elasticsearch versi 7.0 atau lebih baru.
- Saat mengonfigurasi daftar putih, Anda dapat menentukan 0.0.0.0/0 sebagai blok CIDR, yang menunjukkan bahwa cluster Elasticsearch dapat diakses dari semua alamat IP dalam virtual private cloud (VPC) yang digunakan. Setelah akses berhasil, ubah blok CIDR sesuai kebutuhan.
Catatan penggunaan
- Untuk menyinkronkan data dari ApsaraMQ for Kafka ke Elasticsearch, instance Message Queue for Apache Kafka yang berisi topik sumber dan cluster Elasticsearch harus berada di wilayah yang sama. Message Queue for Apache Kafka pertama-tama menyinkronkan data ke Function Compute, lalu Function Compute menyinkronkan data ke Elasticsearch. Untuk informasi tentang batasan pada konektor, lihat Batasan.
- Konektor sink Elasticsearch mengekspor data menggunakan Function Compute. Function Compute menyediakan sejumlah sumber daya secara gratis. Saat kuota gratis habis, Anda akan dikenakan biaya untuk sumber daya Function Compute yang digunakan berdasarkan aturan penagihan. Untuk informasi lebih lanjut, lihat Ikhtisar Penagihan.
- Function Compute memungkinkan Anda menanyakan log panggilan fungsi untuk memecahkan masalah. Untuk informasi lebih lanjut, lihat Konfigurasikan Logging.
- ApsaraMQ for Kafka mengubah pesan menjadi string yang dikodekan UTF-8 untuk transfer. Message Queue for Apache Kafka tidak mendukung data biner.
- Secara default, jika Anda menentukan titik akhir pribadi cluster Elasticsearch untuk konektor sink Elasticsearch, Function Compute tidak dapat mengakses cluster Elasticsearch. Untuk memastikan konektivitas jaringan, Anda harus menentukan VPC dan vSwitch yang sama dengan cluster Elasticsearch untuk layanan Function Compute terkait di Konsol Function Compute. Untuk informasi lebih lanjut, lihat Perbarui Layanan.
Buat dan terapkan konektor sink Elasticsearch
Masuk ke Konsol ApsaraMQ for Kafka.
Di bagian Resource Distribution halaman Overview, pilih wilayah tempat instance ApsaraMQ for Kafka yang ingin Anda kelola berada.
Di panel navigasi sisi kiri, klik Connectors.
Pada halaman Connectors, pilih instance tempat konektor tersebut milik dari daftar drop-down Select Instance dan klik Create Connector.
- Lakukan operasi berikut untuk menyelesaikan wizard Create Connector.
- Di langkah Configure Basic Information, konfigurasikan parameter dan klik Next. Tabel berikut menjelaskan parameter-parameter tersebut.Penting Secara default, Authorize to Create Service Linked Role dipilih. Ini berarti bahwa ApsaraMQ for Kafka akan membuat peran terkait layanan berdasarkan aturan berikut:
- Jika Anda belum membuat peran terkait layanan, ApsaraMQ for Kafka secara otomatis membuat peran terkait layanan untuk Anda gunakan konektor sink Elasticsearch untuk menyinkronkan data dari ApsaraMQ for Kafka ke Elasticsearch.
- Jika peran terkait layanan tersedia, ApsaraMQ for Kafka tidak membuat peran baru.
Parameter Deskripsi Contoh Name Nama konektor. Tentukan nama konektor berdasarkan konvensi penamaan berikut: - Nama konektor harus memiliki panjang 1 hingga 48 karakter dan dapat berisi angka, huruf kecil, dan tanda hubung (-). Nama tidak boleh dimulai dengan tanda hubung (-).
- Nama harus unik dalam sebuah instance ApsaraMQ for Kafka.
Tugas sinkronisasi data konektor harus menggunakan grup konsumen yang dinamai dalam format connect-Nama tugas. Jika Anda belum membuat grup konsumen seperti itu, Message Queue for Apache Kafka secara otomatis membuat satu untuk Anda.
kafka-elasticsearch-sink Instance Informasi tentang instance Message Queue for Apache Kafka. Secara default, nama dan ID instance ditampilkan. demo alikafka_post-cn-st21p8vj**** - Di langkah Configure Source Service, pilih Message Queue for Apache Kafka sebagai layanan sumber, konfigurasikan parameter, lalu klik Next. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter Deskripsi Contoh Data Source Topic Nama topik dari mana data akan disinkronkan. elasticsearch-test-input Consumer Thread Concurrency Jumlah thread konsumen konkuren yang digunakan untuk menyinkronkan data dari topik sumber. Nilai default: 6. Nilai valid: - 1
- 2
- 3
- 6
- 12
6 Consumer Offset Offset konsumen dari mana Anda ingin konsumsi pesan dimulai. Nilai valid: - Earliest Offset: Konsumsi pesan dimulai dari offset konsumen paling awal.
- Latest Offset: Konsumsi pesan dimulai dari offset konsumen paling akhir.
Earliest Offset VPC ID ID VPC di mana instance sumber diterapkan. Klik Configure Runtime Environment untuk menampilkan parameter. Secara default, ID VPC yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka sumber ditampilkan. Anda tidak perlu mengonfigurasi parameter ini. vpc-bp1xpdnd3l*** vSwitch ID ID vSwitch ke mana instance sumber terhubung. Klik Configure Runtime Environment untuk menampilkan parameter. vSwitch harus diterapkan di VPC yang sama dengan instance ApsaraMQ for Kafka sumber. Nilai default adalah ID vSwitch yang Anda tentukan saat menerapkan instance ApsaraMQ for Kafka. vsw-bp1d2jgg81*** Failure Handling Policy Menentukan apakah akan mempertahankan langganan ke partisi setelah terjadi kesalahan pengiriman pesan di partisi tersebut. Klik Configure Runtime Environment untuk menampilkan parameter. Nilai valid: - Continue Subscription: mempertahankan langganan. Entri log dibuat untuk kesalahan tersebut.
- Stop Subscription: menghentikan langganan. Entri log dibuat untuk kesalahan tersebut.
Catatan- Untuk informasi tentang cara melihat informasi log, lihat Kelola konektor.
- Untuk informasi tentang cara memecahkan masalah berdasarkan kode kesalahan, lihat Kode kesalahan.
Continue Subscription Resource Creation Method Metode untuk membuat topik dan grup yang diperlukan oleh konektor sink Elasticsearch. Klik Configure Runtime Environment untuk menampilkan parameter. - Auto
- Manual
Auto Connector Consumer Group Grup konsumen yang digunakan oleh tugas sinkronisasi data konektor.Grup Klik Configure Runtime Environment untuk menampilkan parameter. Nama grup konsumen ini harus dalam format connect-Nama tugas. connect-kafka-elasticsearch-sink Task Offset Topic Topik yang digunakan untuk menyimpan offset konsumen. Klik Configure Runtime Environment untuk menampilkan parameter. - Topik: Kami sarankan Anda memulai nama topik dengan connect-offset.
- Partisi: Jumlah partisi dalam topik harus lebih besar dari 1.
- Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal. Catatan Anda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal saat membuat topik untuk instance Edisi Profesional Message Queue for Apache Kafka.
- cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-offset-kafka-elasticsearch-sink Task Configuration Topic Topik yang digunakan untuk menyimpan konfigurasi tugas. Klik Configure Runtime Environment untuk menampilkan parameter. - Topik: Kami sarankan Anda memulai nama topik dengan connect-config.
- Partisi: Topik hanya dapat berisi satu partisi.
- Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal. Catatan Anda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal saat membuat topik untuk instance Edisi Profesional Message Queue for Apache Kafka.
- cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-config-kafka-elasticsearch-sink Task Status Topic Topik yang digunakan untuk menyimpan status tugas. Klik Configure Runtime Environment untuk menampilkan parameter. - Topik: Kami sarankan Anda memulai nama topik dengan connect-status.
- Partisi: Kami sarankan Anda menyetel jumlah partisi dalam topik menjadi 6.
- Mesin Penyimpanan: Anda harus menyetel mesin penyimpanan topik ke Penyimpanan Lokal. Catatan Anda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal saat membuat topik untuk instance Edisi Profesional Message Queue for Apache Kafka.
- cleanup.policy: Anda harus menyetel kebijakan pembersihan log untuk topik ke Compact.
connect-status-kafka-elasticsearch-sink Dead-letter Queue Topic Topik yang digunakan untuk menyimpan data kesalahan kerangka kerja Kafka Connect. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan. - Topik: Kami sarankan Anda memulai nama topik dengan connect-error.
- Partisi: Kami sarankan Anda menyetel jumlah partisi dalam topik menjadi 6.
- Mesin Penyimpanan: Anda dapat menyetel mesin penyimpanan topik ke Penyimpanan Lokal atau Penyimpanan Cloud. Catatan Anda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal saat membuat topik untuk instance Edisi Profesional Message Queue for Apache Kafka.
connect-error-kafka-elasticsearch-sink Error Data Topic Topik yang digunakan untuk menyimpan data kesalahan konektor sink. Klik Configure Runtime Environment untuk menampilkan parameter. Untuk menghemat sumber daya topik, Anda dapat membuat topik dan menggunakan topik tersebut sebagai topik antrian pesan gagal dan topik data kesalahan. - Topik: Kami sarankan Anda memulai nama topik dengan connect-error.
- Partisi: Kami sarankan Anda menyetel jumlah partisi dalam topik menjadi 6.
- Mesin Penyimpanan: Anda dapat menyetel mesin penyimpanan topik ke Penyimpanan Lokal atau Penyimpanan Cloud. Catatan Anda hanya dapat menyetel mesin penyimpanan ke Penyimpanan Lokal saat membuat topik untuk instance Edisi Profesional Message Queue for Apache Kafka.
connect-error-kafka-elasticsearch-sink - Di langkah Configure Destination Service, pilih Elasticsearch sebagai layanan tujuan, konfigurasikan parameter, lalu klik Create. Tabel berikut menjelaskan parameter-parameter tersebut.
Parameter Deskripsi Contoh Elasticsearch Instance ID ID cluster Elasticsearch. es-cn-oew1o67x0000**** Endpoint Titik akhir publik atau pribadi cluster Elasticsearch. Untuk informasi lebih lanjut, lihat Lihat informasi dasar cluster. es-cn-oew1o67x0000****.elasticsearch.aliyuncs.com Port Port publik atau pribadi yang digunakan untuk mengakses cluster Elasticsearch. Nilai valid: - 9200: untuk HTTP dan HTTPS
- 9300: untuk TCP
Untuk informasi lebih lanjut, lihat Lihat informasi dasar cluster.
9300 Username Nama pengguna yang digunakan untuk masuk ke konsol Kibana. Nilai default: elastic. Anda juga dapat menyesuaikan nama pengguna. Untuk informasi lebih lanjut, lihat Gunakan mekanisme RBAC yang disediakan oleh Elasticsearch X-Pack untuk mengimplementasikan kontrol akses. elastic Password Kata sandi yang digunakan untuk masuk ke konsol Kibana. Kata sandi pengguna elastic ditentukan saat Anda membuat cluster Elasticsearch. Jika Anda lupa kata sandi, Anda dapat menyetel ulang. Untuk informasi lebih lanjut, lihat Setel ulang kata sandi akses untuk cluster Elasticsearch. ******** Index Nama indeks Elasticsearch. elastic_test Catatan- Nama pengguna dan kata sandi digunakan untuk menginisialisasi objek Elasticsearch. Untuk mengirim pesan menggunakan bulk, pastikan akun memiliki izin untuk menulis indeks.
- Nama pengguna dan kata sandi dilewatkan ke fungsi di Function Compute sebagai variabel lingkungan saat ApsaraMQ for Kafka membuat tugas ekspor data. Setelah tugas dibuat, ApsaraMQ for Kafka tidak menyimpan nama pengguna atau kata sandi.
Setelah konektor dibuat, Anda dapat melihat konektor di halaman Connectors.
- Di langkah Configure Basic Information, konfigurasikan parameter dan klik Next. Tabel berikut menjelaskan parameter-parameter tersebut.
- Pergi ke halaman Connectors, temukan konektor yang Anda buat, lalu klik Deploy di kolom Actions.
Konfigurasikan Layanan Function Compute Terkait
Setelah Anda membuat dan menerapkan konektor sink Elasticsearch di Konsol ApsaraMQ for Kafka, Function Compute secara otomatis membuat layanan Function Compute untuk konektor dan menamai layanan tersebut dalam format kafka-service-<Nama_konektor>-<String_acak>.
- Di halaman Connectors, temukan konektor yang Anda buat. Di kolom Actions konektor, pilih .Anda akan diarahkan ke Konsol Function Compute.
- Di Konsol Function Compute, temukan layanan yang dibuat secara otomatis dan konfigurasikan VPC dan vSwitch untuk layanan tersebut. Pastikan bahwa VPC dan vSwitch sama dengan yang ditentukan untuk cluster Elasticsearch Anda. Untuk informasi lebih lanjut, lihat Perbarui Layanan.
Kirim Pesan
Anda dapat mengirim pesan ke topik sumber di instance ApsaraMQ for Kafka Anda untuk menguji apakah data dapat disinkronkan ke Elasticsearch.
Di halaman Connectors, temukan konektor yang ingin Anda kelola dan klik Test di kolom Actions.
Di panel Send Message, konfigurasikan parameter untuk mengirim pesan untuk pengujian.
Jika Anda menyetel parameter Sending Method ke Console, lakukan langkah-langkah berikut:
Di bidang Message Key, masukkan kunci pesan. Contoh: demo.
Di bidang Message Content, masukkan isi pesan. Contoh: {"key": "test"}.
Konfigurasikan parameter Send to Specified Partition untuk menentukan apakah akan mengirim pesan uji ke partisi tertentu.
Jika Anda ingin mengirim pesan uji ke partisi tertentu, klik Yes dan masukkan ID partisi di bidang Partition ID. Contoh: 0. Untuk informasi tentang cara menanyakan ID partisi, lihat Lihat Status Partisi.
Jika Anda tidak ingin mengirim pesan uji ke partisi tertentu, klik No.
Jika Anda menyetel parameter Sending Method ke Docker, jalankan perintah Docker di bagian Run the Docker container to produce a sample message untuk mengirim pesan uji.
Jika Anda menyetel parameter Sending Method ke SDK, pilih SDK untuk bahasa pemrograman atau framework yang diperlukan dan metode akses untuk mengirim dan berlangganan pesan uji.
Verifikasi Hasil
Setelah Anda mengirim pesan ke topik sumber di instance ApsaraMQ for Kafka Anda, masuk ke Konsol Kibana dan jalankan perintah GET /<nama_indeks>/_search untuk melihat indeks Elasticsearch dan memverifikasi apakah data telah disinkronkan.
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "product_****",
"_type" : "_doc",
"_id" : "TX3TZHgBfHNEDGoZ****",
"_score" : 1.0,
"_source" : {
"msg_body" : {
"key" : "test",
"offset" : 2,
"overflowFlag" : false,
"partition" : 2,
"timestamp" : 1616599282417,
"topic" : "dv****",
"value" : "test1",
"valueSize" : 8
},
"doc_as_upsert" : true
}
}
]
}
}