All Products
Search
Document Center

Simple Log Service:Analisis Semi-terstruktur Berbasis SPL dalam Flink SQL

Last Updated:Jul 02, 2025

Topik ini menjelaskan cara mengimplementasikan analisis semi-terstruktur berbasis Simple Log Service Processing Language (SPL) di Flink SQL menggunakan contoh-contoh praktis.

Informasi latar belakang

Layanan Log Sederhana adalah platform observabilitas dan analitik berbasis cloud-native yang mendukung pemrosesan log, data deret waktu, dan jejak secara real-time dengan biaya efektif. Platform ini menyederhanakan akses data serta memfasilitasi pengambilan log sistem dan bisnis untuk penyimpanan dan analisis.

Realtime Compute for Apache Flink, dibangun di atas Apache Flink, adalah platform analitik big data yang ideal untuk analisis data real-time dan pemantauan risiko. Platform ini mendukung konektor Layanan Log Sederhana secara native, sehingga layanan tersebut dapat digunakan sebagai tabel sumber atau hasil.

Konektor ini mempermudah penanganan log terstruktur dengan memungkinkan pemetaan langsung field log Layanan Log Sederhana ke field tabel Flink SQL. Untuk log semi-terstruktur yang mencakup semua konten dalam satu field, metode seperti ekspresi reguler dan pembatas diperlukan untuk mengekstraksi data terstruktur. Topik ini membahas solusi menggunakan SPL untuk mengonfigurasi konektor guna membersihkan log dan menormalisasi format.

Data log semi-terstruktur

Contoh log dengan format kompleks mencakup string JSON dan konten campuran. Log tersebut mencakup elemen-elemen berikut:

  • Payload: String JSON dengan field schedule juga dalam format JSON.

  • requestURL: Path URL standar.

  • error: Dimulai dengan string CouldNotExecuteQuery, diikuti oleh struktur JSON.

  • __tag__:__path__: Mewakili path file log, di mana service_a mungkin menunjukkan nama layanan.

  • caller: Berisi nama file dan nomor baris.

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

Persyaratan untuk pemrosesan data terstruktur

Untuk mendapatkan wawasan berharga dari log-log ini, pembersihan data sangat penting. Field utama harus diekstraksi terlebih dahulu untuk analisis, yang dilakukan di Flink. Persyaratan spesifik untuk ekstraksi field adalah sebagai berikut:

  • Ekstrak httpCode, errorCode, errorMessage, dan requestID dari field error.

  • Ekstrak service_a dari __tag__:__path_ sebagai serviceName.

  • Ekstrak pool.go dari caller sebagai fileName, dan 64 sebagai fileNo.

  • Ekstrak project dari field Payload, dan ekstrak type dari schedule dalam Payload sebagai scheduleType.

  • Ganti nama __source__ menjadi serviceIP.

Daftar akhir field yang diperlukan adalah sebagai berikut:

image

Solusi

Beberapa solusi tersedia untuk pembersihan data, masing-masing sesuai untuk skenario tertentu.

  1. Solusi Transformasi Data: Buat logstore target di konsol Layanan Log Sederhana dan tugas transformasi data untuk pembersihan.

  2. Solusi Flink: Definisikan error dan payload sebagai field tabel sumber. Gunakan fungsi reguler SQL dan fungsi JSON untuk mengurai field-field ini, masukkan data yang diurai ke dalam tabel sementara, dan lakukan analisis pada tabel tersebut.

  3. Solusi SPL: Konfigurasikan pernyataan SPL untuk konektor Layanan Log Sederhana di Realtime Compute for Apache Flink untuk membersihkan data. Definisikan field tabel sumber di Flink sesuai dengan struktur data yang telah dibersihkan.

Di antara opsi-opsi ini, solusi SPL memberikan pendekatan yang lebih efisien untuk pembersihan data. Ini menghilangkan kebutuhan akan logstore perantara atau tabel sementara, terutama untuk data log semi-terstruktur. Dengan melakukan pembersihan lebih dekat ke sumber, platform komputasi dapat fokus pada logika bisnis, menghasilkan pemisahan tanggung jawab yang lebih jelas.

Solusi SPL

1. Siapkan data di Layanan Log Sederhana

  1. Aktifkan Layanan Log Sederhana, dan buat proyek dan logstore.

  2. Gunakan Simple Log Service Java SDK untuk menulis contoh log ke logstore target sebagai data analog sampel. Untuk SDK dalam bahasa lain, lihat referensi SDK yang sesuai.

    image

  3. Di logstore, tulis sintaks pipeline SPL dan pratinjau efeknya.

    image

    Pernyataan kueri adalah sebagai berikut. Sintaks pipeline SPL menggunakan pemisah | untuk memisahkan instruksi. Anda dapat segera melihat hasil setelah memasukkan setiap instruksi, kemudian secara progresif menambahkan pipeline untuk mencapai hasil akhir secara iteratif. Untuk informasi lebih lanjut, lihat Sintaks Kueri Berbasis Scan.

    * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceHost="__tag__:__hostname__" 
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project

    Sintaks dijelaskan sebagai berikut:

    • project: Pertahankan field Payload, error, tag:path, dan caller dari hasil asli, membuang yang lain untuk memfasilitasi penguraian selanjutnya.

    • parse-json: Ubah string Payload menjadi JSON, menghasilkan field tingkat pertama seperti lastNotified, serviceUri, dan jobID.

    • project-away: Hapus field Payload asli.

    • parse-regexp: Ekstrak sebagian konten JSON dari field error dan simpan di errorJson.

    • parse-json: Perluas field errorJson untuk mengambil field seperti httpCode, errorCode, dan errorMessage.

    • parse-regexp: Gunakan ekspresi reguler untuk mengekstrak nama file dari __tag__:__path__ dan tetapkan ke serviceName.

    • parse-regexp: Ekstrak nama file dan nomor baris dari caller, menempatkannya di field fileName dan fileNo, masing-masing.

    • project-rename: Ganti nama field __tag__:__hostname__ menjadi serviceHost.

    • extend: Gunakan fungsi json_extract_scalar untuk mengekstrak field type dari schedule, menamainya scheduleType.

    • project: Pertahankan daftar field yang diperlukan, termasuk field project dari Payload.

2. Buat pekerjaan SQL

  1. Masuk ke Konsol Realtime Compute for Apache Flink dan klik ruang kerja target.

    Penting

    Ruang kerja target dan proyek Layanan Log Sederhana harus berada di wilayah yang sama.

  2. Di panel navigasi di sebelah kiri, pilih Development > ETL.

  3. Klik New. Di kotak dialog New Draft, pilih SQL Scripts > Blank Stream Draft dan klik Next.

    image

  4. Masukkan nama dan klik Create. Salin SQL berikut untuk membuat tabel sementara di draf.

    CREATE TEMPORARY TABLE sls_input_complex (
      errorCode STRING,
      errorMessage STRING,
      fileName STRING,
      fileNo STRING,
      httpCode STRING,
      requestID STRING,
      scheduleType STRING,
      serviceHost STRING,
      project STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='ap-southeast-1-intranet.log.aliyuncs.com',
      'accessId' = '${yourAccessKeyID}',
      'accessKey' = '${yourAccessKeySecret}',
      'starttime' = '2024-02-01 10:30:00',
      'project' ='${project}',
      'logstore' ='${logtore}',
      'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
      );

    Parameter dalam pernyataan SQL dijelaskan di bawah ini. Ganti sesuai kebutuhan.

    Parameter

    Deskripsi

    Contoh

    connector

    Lihat Konektor yang didukung.

    sls

    endpoint

    Titik akhir internal yang digunakan untuk mengakses proyek Layanan Log Sederhana Anda. Untuk informasi tentang cara mendapatkannya, lihat Lihat titik akhir.

    ap-southeast-1-intranet.log.aliyuncs.com

    accessId

    ID AccessKey yang digunakan untuk mengidentifikasi pengguna. Untuk informasi lebih lanjut, lihat Buat pasangan AccessKey.

    LTAI****************

    accessKey

    Rahasia AccessKey yang digunakan untuk memverifikasi identitas pengguna. Untuk informasi lebih lanjut, lihat Buat pasangan AccessKey.

    yourAccessKeySecret

    starttime

    Waktu mulai untuk kueri log.

    2025-02-19 00:00:00

    project

    Nama proyek Layanan Log Sederhana.

    test-project

    logstore

    Nama logstore Layanan Log Sederhana.

    clb-access-log

    query

    Masukkan pernyataan SPL. Perhatikan bahwa string harus di-escape menggunakan tanda kutip tunggal.

    * | where slbid = ''slb-01''

    Catatan

    Di sini, '' mewakili tanda kutip tunggal yang disematkan dalam string.

  5. Pilih SQL, klik kanan, dan pilih Run untuk terhubung ke Layanan Log Sederhana.

    image

3. Lakukan kueri dan lihat hasilnya

  1. Salin pernyataan analisis berikut ke dalam draf untuk melakukan kueri agregat berdasarkan slbid.

    SELECT * FROM sls_input_complex;
  2. Klik Debug di pojok kanan atas. Di kotak dialog debug, pilih Create new session cluster dari daftar drop-down Session Cluster. Lihat gambar di bawah untuk membuat kluster debug baru.

    image

  3. Di kotak dialog debug, pilih kluster debug yang telah dibuat, lalu klik OK.

    image

  4. Di area Results, lihat nilai kolom dari tabel, yang mencerminkan hasil yang diproses oleh SPL. Daftar akhir field yang dihasilkan oleh SPL sesuai dengan yang ada di tabel.

    image