Untuk membangun sistem pengambilan log, Anda dapat menggunakan Realtime Compute for Apache Flink untuk memproses data log dan menuliskannya ke Elasticsearch guna pencarian. Topik ini menggunakan Alibaba Cloud Simple Log Service (SLS) sebagai contoh untuk menjelaskan proses tersebut.
Prasyarat
Anda telah menyelesaikan tugas-tugas berikut:
Aktifkan Realtime Compute for Apache Flink dan buat proyek.
Buat instans Alibaba Cloud Elasticsearch.
Untuk informasi selengkapnya, lihat Buat instans Alibaba Cloud Elasticsearch.
Aktifkan SLS, lalu buat proyek dan Logstore.
Untuk informasi selengkapnya, lihat Aktifkan Alibaba Cloud Simple Log Service, Kelola proyek, dan Buat Logstore dasar.
Informasi latar belakang
Realtime Compute for Apache Flink adalah layanan berbasis Flink yang didukung resmi oleh Alibaba Cloud. Layanan ini mendukung berbagai sistem sumber dan sink, seperti Kafka dan Elasticsearch. Kombinasi Realtime Compute for Apache Flink dan Elasticsearch memenuhi persyaratan skenario pengambilan log khas.
Log dari sistem seperti Kafka atau Simple Log Service diproses oleh Flink menggunakan komputasi sederhana atau kompleks, lalu dituliskan ke Elasticsearch untuk pencarian. Dengan menggabungkan kemampuan komputasi Flink yang andal dan kemampuan pencarian Elasticsearch yang kuat, Anda dapat menerapkan transformasi data real-time dan kueri untuk bisnis Anda, sehingga memudahkan transisi ke layanan real-time.
Realtime Compute for Apache Flink menyediakan cara sederhana untuk terhubung ke Elasticsearch. Misalnya, jika log atau data bisnis Anda dituliskan ke Simple Log Service dan perlu diproses sebelum dituliskan ke Elasticsearch untuk pencarian, Anda dapat menggunakan pipeline seperti pada gambar berikut.
Prosedur
Buat pekerjaan Realtime Compute for Apache Flink.
Untuk informasi selengkapnya, lihat bagian Job Development > Development dari Blink SQL Development Guide dalam dokumen Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) document.
Tulis Flink SQL.
Buat tabel sumber untuk Layanan Log Sederhana.
create table sls_stream( a int, b int, c VARCHAR ) WITH ( type ='sls', endPoint ='<yourEndpoint>', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '<yourStartTime>', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );Tabel berikut menjelaskan parameter dalam klausa WITH.
Variable
Deskripsi
endPoint
Titik akhir publik Alibaba Cloud Simple Log Service, yaitu URL untuk mengakses proyek yang sesuai dan data log-nya. Untuk informasi selengkapnya, lihat Endpoints.
Misalnya, titik akhir Simple Log Service di wilayah China (Hangzhou) adalah http://cn-hangzhou.log.aliyuncs.com. Titik akhir harus diawali dengan http://.
accessId
ID AccessKey Anda.
accessKey
Rahasia AccessKey Anda.
startTime
Waktu mulai mengonsumsi log. Saat menjalankan pekerjaan Flink, waktu yang dipilih harus lebih baru daripada waktu yang ditetapkan di sini.
project
Nama proyek Layanan Log Sederhana.
logStore
Nama penyimpanan log di proyek.
consumerGroup
Nama kelompok konsumen untuk Simple Log Service.
Buat tabel sink Elasticsearch.
PentingRealtime Compute for Apache Flink versi 3.2.2 dan yang lebih baru mendukung tabel sink Elasticsearch. Saat membuat pekerjaan Flink, pilih versi yang didukung.
Tabel sink Elasticsearch diimplementasikan menggunakan REST API dan kompatibel dengan semua versi Elasticsearch.
CREATE TABLE es_stream_sink( a int, cnt BIGINT, PRIMARY KEY(a) ) WITH( type ='elasticsearch-7', endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>', accessId = '<yourAccessId>', accessKey = '<yourAccessSecret>', index = '<yourIndex>', typeName = '<yourTypeName>' );Tabel berikut menjelaskan parameter dalam klausa WITH.
Parameter
Deskripsi
endPoint
Titik akhir publik instans Alibaba Cloud Elasticsearch Anda, dalam format http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. Anda dapat memperolehnya dari halaman informasi dasar instans. Untuk informasi selengkapnya, lihat Lihat informasi dasar instans.
accessId
Username untuk mengakses instans Alibaba Cloud Elasticsearch. Nilai default-nya adalah elastic.
accessKey
Password untuk pengguna tersebut. Password untuk pengguna elastic ditetapkan saat Anda membuat instans. Jika lupa password, Anda dapat mengatur ulang. Untuk informasi tentang langkah-langkah dan hal yang perlu diperhatikan saat mengatur ulang password, lihat Atur ulang password akses instans.
index
Nama indeks. Jika Anda belum membuat indeks, buat terlebih dahulu. Untuk informasi selengkapnya, lihat Panduan pemula: Dari pembuatan instans hingga pengambilan data. Anda juga dapat mengaktifkan pembuatan indeks otomatis. Untuk informasi selengkapnya, lihat Konfigurasi parameter YML.
typeName
Tipe indeks. Untuk instans Elasticsearch versi 7.0 atau lebih baru, nilai ini harus _doc.
CatatanElasticsearch mendukung pembaruan dokumen menggunakan primary key. Hanya satu field yang dapat ditentukan sebagai
PRIMARY KEY. Setelah Anda menentukanPRIMARY KEY, nilai fieldPRIMARY KEYdigunakan sebagai ID dokumen. JikaPRIMARY KEYtidak ditentukan, sistem akan menghasilkan ID dokumen secara acak. Untuk informasi selengkapnya, lihat Index API.Elasticsearch mendukung beberapa mode pembaruan. Anda dapat menentukan mode tersebut menggunakan parameter updateMode dalam klausa WITH:
Jika
updateMode=full, dokumen baru sepenuhnya menimpa dokumen yang ada.Jika
updateMode=inc, Elasticsearch memperbarui field yang sesuai berdasarkan nilai input field tersebut.
Semua pembaruan di Elasticsearch secara default menggunakan semantik UPSERT, artinya INSERT atau UPDATE.
Tulis logika bisnis untuk memproses dan menyinkronkan data.
INSERT INTO es_stream_sink SELECT a, count(*) as cnt FROM sls_stream GROUP BY a
Publikasikan dan mulai pekerjaan.
Setelah Anda mempublikasikan dan menjalankan pekerjaan, data dari Simple Log Service akan diagregasi lalu dituliskan ke Alibaba Cloud Elasticsearch.
Informasi tambahan
Anda dapat menggunakan Realtime Compute for Apache Flink bersama Elasticsearch untuk dengan cepat membangun pipeline pencarian real-time. Jika Anda memiliki persyaratan yang lebih kompleks untuk menuliskan data ke Elasticsearch, Anda dapat menggunakan fitur sink kustom dari Realtime Compute for Apache Flink.