Topik ini menjelaskan cara mengimplementasikan analisis semi-terstruktur berbasis Simple Log Service Processing Language (SPL) di Flink SQL menggunakan contoh-contoh praktis.
Informasi latar belakang
Layanan Log Sederhana adalah platform observabilitas dan analitik berbasis cloud-native yang mendukung pemrosesan log, data deret waktu, dan jejak secara real-time dengan biaya efektif. Platform ini menyederhanakan akses data serta memfasilitasi pengambilan log sistem dan bisnis untuk penyimpanan dan analisis.
Realtime Compute for Apache Flink, dibangun di atas Apache Flink, adalah platform analitik big data yang ideal untuk analisis data real-time dan pemantauan risiko. Platform ini mendukung konektor Layanan Log Sederhana secara native, sehingga layanan tersebut dapat digunakan sebagai tabel sumber atau hasil.
Konektor ini mempermudah penanganan log terstruktur dengan memungkinkan pemetaan langsung field log Layanan Log Sederhana ke field tabel Flink SQL. Untuk log semi-terstruktur yang mencakup semua konten dalam satu field, metode seperti ekspresi reguler dan pembatas diperlukan untuk mengekstraksi data terstruktur. Topik ini membahas solusi menggunakan SPL untuk mengonfigurasi konektor guna membersihkan log dan menormalisasi format.
Data log semi-terstruktur
Contoh log dengan format kompleks mencakup string JSON dan konten campuran. Log tersebut mencakup elemen-elemen berikut:
Payload: String JSON dengan fieldschedulejuga dalam format JSON.requestURL: Path URL standar.error: Dimulai dengan stringCouldNotExecuteQuery, diikuti oleh struktur JSON.__tag__:__path__: Mewakili path file log, di manaservice_amungkin menunjukkan nama layanan.caller: Berisi nama file dan nomor baris.
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}Persyaratan untuk pemrosesan data terstruktur
Untuk mendapatkan wawasan berharga dari log-log ini, pembersihan data sangat penting. Field utama harus diekstraksi terlebih dahulu untuk analisis, yang dilakukan di Flink. Persyaratan spesifik untuk ekstraksi field adalah sebagai berikut:
Ekstrak
httpCode,errorCode,errorMessage, danrequestIDdari fielderror.Ekstrak
service_adari__tag__:__path_sebagaiserviceName.Ekstrak
pool.godaricallersebagaifileName, dan64sebagaifileNo.Ekstrak
projectdari fieldPayload, dan ekstraktypedarischeduledalamPayloadsebagaischeduleType.Ganti nama
__source__menjadiserviceIP.
Daftar akhir field yang diperlukan adalah sebagai berikut:

Solusi
Beberapa solusi tersedia untuk pembersihan data, masing-masing sesuai untuk skenario tertentu.
Solusi Transformasi Data: Buat logstore target di konsol Layanan Log Sederhana dan tugas transformasi data untuk pembersihan.
Solusi Flink: Definisikan
errordanpayloadsebagai field tabel sumber. Gunakan fungsi reguler SQL dan fungsi JSON untuk mengurai field-field ini, masukkan data yang diurai ke dalam tabel sementara, dan lakukan analisis pada tabel tersebut.Solusi SPL: Konfigurasikan pernyataan SPL untuk konektor Layanan Log Sederhana di Realtime Compute for Apache Flink untuk membersihkan data. Definisikan field tabel sumber di Flink sesuai dengan struktur data yang telah dibersihkan.
Di antara opsi-opsi ini, solusi SPL memberikan pendekatan yang lebih efisien untuk pembersihan data. Ini menghilangkan kebutuhan akan logstore perantara atau tabel sementara, terutama untuk data log semi-terstruktur. Dengan melakukan pembersihan lebih dekat ke sumber, platform komputasi dapat fokus pada logika bisnis, menghasilkan pemisahan tanggung jawab yang lebih jelas.
Solusi SPL
1. Siapkan data di Layanan Log Sederhana
Aktifkan Layanan Log Sederhana, dan buat proyek dan logstore.
Gunakan Simple Log Service Java SDK untuk menulis contoh log ke logstore target sebagai data analog sampel. Untuk SDK dalam bahasa lain, lihat referensi SDK yang sesuai.

Di logstore, tulis sintaks pipeline SPL dan pratinjau efeknya.

Pernyataan kueri adalah sebagai berikut. Sintaks pipeline SPL menggunakan pemisah
|untuk memisahkan instruksi. Anda dapat segera melihat hasil setelah memasukkan setiap instruksi, kemudian secara progresif menambahkan pipeline untuk mencapai hasil akhir secara iteratif. Untuk informasi lebih lanjut, lihat Sintaks Kueri Berbasis Scan.* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, '$.type') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, projectSintaks dijelaskan sebagai berikut:
project: Pertahankan fieldPayload,error,tag:path, dancallerdari hasil asli, membuang yang lain untuk memfasilitasi penguraian selanjutnya.parse-json: Ubah stringPayloadmenjadi JSON, menghasilkan field tingkat pertama sepertilastNotified,serviceUri, danjobID.project-away: Hapus fieldPayloadasli.parse-regexp: Ekstrak sebagian konten JSON dari fielderrordan simpan dierrorJson.parse-json: Perluas fielderrorJsonuntuk mengambil field sepertihttpCode,errorCode, danerrorMessage.parse-regexp: Gunakan ekspresi reguler untuk mengekstrak nama file dari__tag__:__path__dan tetapkan keserviceName.parse-regexp: Ekstrak nama file dan nomor baris daricaller, menempatkannya di fieldfileNamedanfileNo, masing-masing.project-rename: Ganti nama field__tag__:__hostname__menjadi serviceHost.extend: Gunakan fungsijson_extract_scalaruntuk mengekstrak fieldtypedarischedule, menamainyascheduleType.project: Pertahankan daftar field yang diperlukan, termasuk fieldprojectdariPayload.
2. Buat pekerjaan SQL
Masuk ke Konsol Realtime Compute for Apache Flink dan klik ruang kerja target.
PentingRuang kerja target dan proyek Layanan Log Sederhana harus berada di wilayah yang sama.
Di panel navigasi di sebelah kiri, pilih .
Klik New. Di kotak dialog New Draft, pilih dan klik Next.

Masukkan nama dan klik Create. Salin SQL berikut untuk membuat tabel sementara di draf.
CREATE TEMPORARY TABLE sls_input_complex ( errorCode STRING, errorMessage STRING, fileName STRING, fileNo STRING, httpCode STRING, requestID STRING, scheduleType STRING, serviceHost STRING, project STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='ap-southeast-1-intranet.log.aliyuncs.com', 'accessId' = '${yourAccessKeyID}', 'accessKey' = '${yourAccessKeySecret}', 'starttime' = '2024-02-01 10:30:00', 'project' ='${project}', 'logstore' ='${logtore}', 'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project' );Parameter dalam pernyataan SQL dijelaskan di bawah ini. Ganti sesuai kebutuhan.
Parameter
Deskripsi
Contoh
connector
Lihat Konektor yang didukung.
sls
endpoint
Titik akhir internal yang digunakan untuk mengakses proyek Layanan Log Sederhana Anda. Untuk informasi tentang cara mendapatkannya, lihat Lihat titik akhir.
ap-southeast-1-intranet.log.aliyuncs.com
accessId
ID AccessKey yang digunakan untuk mengidentifikasi pengguna. Untuk informasi lebih lanjut, lihat Buat pasangan AccessKey.
LTAI****************
accessKey
Rahasia AccessKey yang digunakan untuk memverifikasi identitas pengguna. Untuk informasi lebih lanjut, lihat Buat pasangan AccessKey.
yourAccessKeySecret
starttime
Waktu mulai untuk kueri log.
2025-02-19 00:00:00
project
Nama proyek Layanan Log Sederhana.
test-project
logstore
Nama logstore Layanan Log Sederhana.
clb-access-log
query
Masukkan pernyataan SPL. Perhatikan bahwa string harus di-escape menggunakan tanda kutip tunggal.
* | where slbid = ''slb-01''
CatatanDi sini,
''mewakili tanda kutip tunggal yang disematkan dalam string.Pilih SQL, klik kanan, dan pilih Run untuk terhubung ke Layanan Log Sederhana.

3. Lakukan kueri dan lihat hasilnya
Salin pernyataan analisis berikut ke dalam draf untuk melakukan kueri agregat berdasarkan
slbid.SELECT * FROM sls_input_complex;Klik Debug di pojok kanan atas. Di kotak dialog debug, pilih Create new session cluster dari daftar drop-down Session Cluster. Lihat gambar di bawah untuk membuat kluster debug baru.

Di kotak dialog debug, pilih kluster debug yang telah dibuat, lalu klik OK.

Di area Results, lihat nilai kolom dari tabel, yang mencerminkan hasil yang diproses oleh SPL. Daftar akhir field yang dihasilkan oleh SPL sesuai dengan yang ada di tabel.
