すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flink SQL での SPL ベースの半構造化分析

最終更新日:Apr 10, 2025

このトピックでは、Flink SQL で Simple Log Service 処理言語 (SPL) に基づいて半構造化分析を実装する方法について、例を用いて説明します。

背景情報

Simple Log Service は、可観測性と分析のためのクラウドネイティブプラットフォームであり、ログ、メトリック、トレースのコスト効率の高いリアルタイム処理を可能にします。データアクセスを簡素化し、システムログとビジネスログの取り込み、保存、分析を容易にします。

Apache Flink 上に構築された Realtime Compute for Apache Flink は、リアルタイムのデータ分析とリスク監視に適したビッグデータ分析プラットフォームです。Simple Log Service コネクタをネイティブでサポートしており、サービスをソーステーブルまたは結果テーブルとして使用できます。

このコネクタは、構造化ログの処理を効率化し、Simple Log Service ログフィールドを Flink SQL テーブルフィールドに直接マッピングできるようにします。単一のフィールドにすべてのコンテンツが含まれる半構造化ログの場合、構造化データを抽出するには、正規表現やデリミタなどのメソッドが必要です。このトピックでは、データ構造化のためのコネクタを構成するために SPL を使用するソリューションについて説明し、ログクレンジングとフォーマット正規化に焦点を当てます。

半構造化ログデータ

JSON 文字列と混合コンテンツを含む複雑な形式のログの例を考えてみましょう。ログには、次の要素が含まれています。

  • Payload: JSON 文字列。 schedule フィールドも JSON 形式です。

  • requestURL: 標準の URL パス。

  • error: 文字列 CouldNotExecuteQuery で始まり、JSON 構造が続きます。

  • __tag__:__path__: ログファイルパスを表します。 service_a はサービス名を示している可能性があります。

  • caller: ファイル名と行番号が含まれています。

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

構造化データ処理の要件構造化データ処理

これらのログから価値のある洞察を得るには、データクレンジングが不可欠です。まず、分析のために主要なフィールドを抽出する必要があります。これは Flink で行われます。フィールド抽出の具体的な要件は次のとおりです。

  • error フィールドから httpCodeerrorCodeerrorMessagerequestID を抽出します。

  • __tag__:__path_ から service_aserviceName として抽出します。

  • caller から pool.gofileName として、64fileNo として抽出します。

  • Payload フィールドから project を抽出し、Payload 内の schedule から typescheduleType として抽出します。

  • __source__serviceIP に名前変更します。

必要なフィールドの最終的なリストは次のとおりです。

image

ソリューション

データクレンジングにはいくつかのソリューションがあり、それぞれ特定のシナリオに適しています。

  1. データ変換ソリューション: Simple Log Service コンソールでターゲットログストアを作成し、クレンジング用のデータ変換タスクを作成します。

  2. Flink ソリューション: errorpayload をソーステーブルフィールドとして定義します。SQL 正規関数と JSON 関数を使用してこれらのフィールドを解析し、解析されたデータを一時テーブルに挿入し、テーブルで分析を実行します。

  3. SPL ソリューション: Realtime Compute for Apache Flink で Simple Log Service コネクタの SPL ステートメントを構成してデータをクレンジングします。クレンジングされたデータ構造に従って、Flink でソーステーブルフィールドを定義します。

これらのオプションの中で、SPL ソリューションは、より効率的なデータクレンジングアプローチを提供します。特に半構造化ログデータの場合、中間ログストアや一時テーブルが不要になります。ソースに近い場所でクレンジングを実行することにより、コンピューティングプラットフォームはビジネスロジックに集中できるようになり、責任の明確な分離が実現します。

SPL ソリューション

1. Simple Log Service でデータを準備する

  1. Simple Log Service をアクティブ化し、プロジェクトとログストアを作成します。

  2. Simple Log Service Java SDK を使用して、ログの例をサンプルのアナログデータとしてターゲットログストアに書き込みます。他の言語の SDK については、対応する SDK リファレンスを参照してください。

    image

  3. ログストアで、SPL パイプライン構文を記述し、効果をプレビューします。

    image

    クエリ文は次のとおりです。SPL パイプライン構文では、| 区切り文字を使用して命令を区切ります。各命令を入力した直後に結果を確認し、パイプラインを段階的に追加して最終結果を反復的に達成できます。詳細については、「スキャンベースクエリの構文」をご参照ください。

    * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceHost="__tag__:__hostname__" 
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project

    構文について以下に説明します。

    • project: 元の結果から Payloaderrortag:pathcaller フィールドを保持し、その他は破棄して後続の解析を容易にします。

    • parse-json: Payload 文字列を JSON に変換し、lastNotifiedserviceUrijobID などの第 1 レベルのフィールドを生成します。

    • project-away: 元の Payload フィールドを削除します。

    • parse-regexp: error フィールドから JSON コンテンツの一部を抽出し、errorJson に格納します。

    • parse-json: errorJson フィールドを展開して、httpCodeerrorCodeerrorMessage などのフィールドを取得します。

    • parse-regexp: 正規表現を使用して __tag__:__path__ からファイル名を抽出し、serviceName に割り当てます。

    • parse-regexp: caller からファイル名と行番号を抽出し、それぞれ fileName フィールドと fileNo フィールドに配置します。

    • project-rename: __tag__:__hostname__ フィールドの名前を serviceHost に変更します。

    • extend: json_extract_scalar 関数を使用して schedule から type フィールドを抽出し、scheduleType という名前を付けます。

    • project: Payloadproject フィールドを含む、必要なフィールドリストを保持します。

2. SQL ジョブを作成する

  1. Realtime Compute for Apache Flink コンソール にログインし、ターゲットワークスペースをクリックします。

    重要

    ターゲットワークスペースと Simple Log Service プロジェクトは同じリージョンにある必要があります。

  2. 左側のナビゲーションウィンドウで、[開発] > [ETL] を選択します。

  3. [新規] をクリックします。 [新規ドラフト] ダイアログボックスで、[SQL スクリプト] > [空のストリームドラフト] を選択し、[次へ] をクリックします。

    image

  4. 名前を入力し、[作成] をクリックします。次の SQL をコピーして、ドラフトに一時テーブルを作成します。

    CREATE TEMPORARY TABLE sls_input_complex (
      errorCode STRING,
      errorMessage STRING,
      fileName STRING,
      fileNo STRING,
      httpCode STRING,
      requestID STRING,
      scheduleType STRING,
      serviceHost STRING,
      project STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='ap-southeast-1-intranet.log.aliyuncs.com',
      'accessId' = '${yourAccessKeyID}',
      'accessKey' = '${yourAccessKeySecret}',
      'starttime' = '2024-02-01 10:30:00',
      'project' ='${project}',
      'logstore' ='${logtore}',
      'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
      );

    SQL ステートメントのパラメーターについて以下に説明します。必要に応じて置き換えてください。

    パラメーター

    説明

    connector

    サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    Simple Log Service プロジェクトへのアクセスに使用する内部エンドポイント。取得方法については、「エンドポイントを表示する」をご参照ください。

    ap-southeast-1-intranet.log.aliyuncs.com

    accessId

    ユーザーを識別するために使用される AccessKey ID。詳細については、「AccessKey ペアを作成する」をご参照ください。

    LTAI****************

    accessKey

    ユーザーの ID を検証するために使用される AccessKey シークレット。詳細については、「AccessKey ペアを作成する」をご参照ください。

    yourAccessKeySecret

    starttime

    ログのクエリを開始する時刻。

    2025-02-19 00:00:00

    project

    Simple Log Service プロジェクトの名前。

    test-project

    logstore

    Simple Log Service ログストアの名前。

    clb-access-log

    query

    SPL ステートメントを入力します。文字列は単一引用符を使用してエスケープする必要があることに注意してください。

    * | where slbid = ''slb-01''

    説明

    ここで、'' は文字列内に埋め込まれた単一引用符を表します。

  5. SQL を選択し、右クリックして、[実行] を選択して Simple Log Service に接続します。

    image

3. クエリを実行して結果を表示する

  1. 次の分析ステートメントをドラフトにコピーして、slbid による集計クエリを実行します。

    SELECT * FROM sls_input_complex;
  2. 右上隅にある [デバッグ] をクリックします。デバッグダイアログボックスで、[新しいセッションクラスタを作成] ドロップダウンリストから [セッションクラスタ] を選択します。次の図を参考に、新しいデバッグクラスタを作成します。

    image

  3. デバッグダイアログボックスで、作成したデバッグクラスタを選択し、[OK] をクリックします。

    image

  4. [結果] 領域で、テーブルの列の値を表示します。これは、SPL によって処理された結果を反映しています。SPL によって生成されたフィールドの最終的なリストは、テーブルのフィールドと一致します。

    image