すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Flink SQL基于SPL实现弱结构化分析

最終更新日:Jun 18, 2026

Flink SQL と Simple Log Service Processing Language (SPL) を使用すると、中間 Logstore や一時テーブルを作成することなく、半構造化ログデータを構造化フィールドに解析して分析できます。

背景情報

Simple Log Service (SLS) は、ログ、メトリクス、トレース向けに大規模、低コスト、リアルタイムのサービスを提供するクラウドネイティブの可観測性・分析プラットフォームです。システムログ、ビジネスログ、その他のデータを SLS に収集して保存・分析できます。Realtime Compute for Apache Flink は、Alibaba Cloud が Apache Flink をベースに構築したビッグデータ分析プラットフォームであり、リアルタイムデータ分析やリスク監視に広く使用されています。Realtime Compute for Apache Flink は SLS コネクタをネイティブでサポートしており、SLS をソーステーブルまたは結果テーブルとして使用できます。

Realtime Compute for Apache Flink の SLS コネクタは、構造化ログを直接処理し、ログフィールドと Flink SQL テーブル列を 1 対 1 でマッピングします。しかし、多くのビジネスログは完全に構造化されていません。たとえば、すべてのログコンテンツが単一のフィールドに書き込まれており、構造化フィールドを抽出するには正規表現や区切り文字による分割が必要になる場合があります。本トピックでは、ログクレンジングと形式の正規化を含め、SLS コネクタで SPL を使用してこのようなデータを構造化する方法を説明します。

半構造化ログデータ

次のサンプルログは、JSON 文字列と他のデータ型が混在した複雑な形式です。このログには次の内容が含まれています。

  • Payload フィールドは JSON 文字列で、ネストされた schedule フィールドも JSON 構造です。

  • requestURL フィールドは標準的な URL パスです。

  • error フィールドは文字列 CouldNotExecuteQuery で始まり、その後に JSON 構造が続きます。

  • __tag__:__path__ フィールドにはログファイルパスが含まれており、service_a がサービス名である可能性があります。

  • caller フィールドにはファイル名と行番号が含まれています。

{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.XXX.XXX",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

データ構造化の要件

これらのログから有用な情報を抽出するには、主要なフィールドを抽出してデータを変換する必要があります。

  • error フィールドから、httpCodeerrorCodeerrorMessagerequestID を抽出します。

  • __tag__:__path__ フィールドから、service_aserviceName として抽出します。

  • caller フィールドから、pool.gofileName として、64 を fileNo として抽出します。

  • Payload フィールドから project を抽出します。Payload 内のネストされた schedule オブジェクトから、type を抽出し、scheduleType という名前にします。

  • [__source__] フィールドを serviceIP に名前変更します。

その他のフィールドはすべて破棄されます。最終的に必要なフィールドのリストは、httpCode、errorCode、errorMessage、requestID、serviceName、fileName、fileNo、project、scheduleType、serviceIP です。

ソリューション

データを変換する方法はいくつかあります。このセクションでは、SLS と Flink を使用するソリューションを比較します。これらはそれぞれ異なるシナリオに適しています。

  • データ変換ソリューション:SLS コンソールでデータ変換ジョブを作成してデータをクレンジングし、ターゲット Logstore に保存します。

  • Flink ソリューション:ソーステーブルで errorpayload をフィールドとして定義します。Flink SQL の正規表現関数と JSON 関数を使用してこれらのフィールドを解析し、結果を一時テーブルに書き込んでから、そのテーブルで分析を実行します。

  • SPL ソリューション:Flink SLS コネクタで SPL ステートメントを構成してデータを変換します。その後、Flink ソーステーブルは最終的な構造化スキーマで定義されます。

SLS コネクタで SPL を構成することは、より軽量なアプローチです。半構造化ログデータの場合、SPL ソリューションでは中間 Logstore の作成 (データ変換ソリューションで必要) や Flink での一時テーブルの作成 (Flink ソリューションで必要) を回避できます。ソースに近い場所でデータ変換を実行することで、コンピューティングプラットフォームではビジネスロジックに集中でき、責任の明確な分離が可能になります。

Flink での SPL の使用

1. SLS でのデータ準備

  1. Simple Log Service をアクティブ化し、プロジェクトと Logstore を作成していることを確認してください。

  2. SDK を使用して前述のログスニペットをターゲット Logstore に書き込み、サンプルデータをシミュレートします。

    データが書き込まれたら、Simple Log Service コンソールの [生ログ] タブに移動して、シミュレートされたデータを表示します。ログのコンテンツには、TaskType (値は ALERT)、Payload (alertNameprojectaliuid を含む)、エラー情報 (エラーコード LogStoreNotExist、メッセージ logstore k8s-event does not exist、HTTP ステータス 404) などのフィールドが含まれています。

  3. Logstore で、SLS SPL パイプラインステートメントを記述し、結果をプレビューします。

    SPL ステートメントを実行すると、[生ログ] タブに解析されたログレコードが表示されます。これには、errorCodeerrorMessagefileNamefileNohttpCodeprojectrequestIDscheduleTypeserviceNameserviceIP などの構造化フィールドが含まれています。

    SPL クエリステートメントは次のとおりです。SPL パイプライン構文では、パイプ (|) でコマンドを区切ります。一度に 1 つのコマンドを入力して即座に結果を確認し、さらにパイプを追加して最終的なクエリを反復的に構築できます。詳細については、「スキャンクエリ構文」をご参照ください。

    * | project Payload, error, "__tag__:__path__", caller, __source__
     | parse-json Payload 
     | project-away Payload 
     | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
     | parse-json errorJson 
     | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
     | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
     | project-rename serviceIP=__source__
     | extend scheduleType = json_extract_scalar(schedule, '$.type') 
     | project httpCode, errorCode, errorMessage, requestID, serviceName, fileName, fileNo, project, scheduleType, serviceIP

    構文の説明:

    • 1 行目:project コマンドは、Payload、error、__tag__:__path__、caller、__source__ フィールドを保持し、その他すべてのフィールドを破棄します。

    • 2 行目:parse-json コマンドは、Payload 文字列を JSON オブジェクトに展開します。lastNotified、serverUri、jobID などのトップレベルフィールドが結果に追加されます。

    • 3 行目:project-away コマンドは、元の Payload フィールドを削除します。

    • 4 行目:parse-regexp コマンドは、正規表現を使用して error フィールドの JSON 部分を抽出し、errorJson という名前の新しいフィールドに割り当てます。

    • 5 行目:parse-json コマンドは、errorJson フィールドを展開し、httpCode、errorCode、errorMessage を抽出します。

    • 6 行目:parse-regexp コマンドは、正規表現を使用して __tag__:__path__ からファイル名を抽出し、serviceName という名前にします。

    • 7 行目:parse-regexp コマンドは、正規表現を使用して caller フィールドからファイル名と行番号を抽出し、fileName フィールドと fileNo フィールドに割り当てます。

    • 8 行目:project-rename コマンドは、__source__ フィールドを serviceIP に名前変更します。

    • 9 行目:extend コマンドは、json_extract_scalar 関数を使用して schedule オブジェクトから type フィールドを抽出し、scheduleType という名前にします。

    • 10 行目:project コマンドは、Payload から抽出された project フィールドを含め、最終的に必要なフィールドのみを保持します。

2. SQL ジョブの作成

  1. Realtime Compute for Apache Flink コンソールにログインし、対象のワークスペースをクリックします。

  2. 左側のナビゲーションペインで、Data Development > [ETL] を選択します。

  3. 作成 をクリックします。[新規下書き] ダイアログボックスで、[SQL スクリプト] > [空白のストリームジョブの下書き] を選択し、次へ をクリックします。

  4. 下書きエディターで、次のステートメントを入力して一時テーブルを作成します。

    CREATE TEMPORARY TABLE sls_input_complex (
      httpCode STRING,
      errorCode STRING,
      errorMessage STRING,
      requestID STRING,
      serviceName STRING,
      fileName STRING,
      fileNo STRING,
      project STRING,
      scheduleType STRING,
      serviceIP STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
      'accessId' = '${yourAccessKeyID}',
      'accessKey' = '${yourAccessKeySecret}',
      'starttime' = '2024-02-01 10:30:00',
      'project' ='${project}',
      'logstore' ='${logstore}',
      'query' = '* | project Payload, error, "__tag__:__path__", caller, __source__ | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceIP=__source__ | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode, errorMessage, requestID, serviceName, fileName, fileNo, project, scheduleType, serviceIP'
      );

    次の表は、WITH 句のパラメーターについて説明しています。例の値を実際の値に置き換えてください。

    パラメータ

    説明

    connector

    コネクタのタイプです。詳細については、「サポートされているコネクタ」をご参照ください。

    sls

    endpoint

    SLS の内部エンドポイントです。詳細については、「エンドポイント」をご参照ください。

    cn-hangzhou-intranet.log.aliyuncs.com

    accessId

    アクセスキー ID です。詳細については、「アクセスキーを作成」をご参照ください。

    LTAI**************

    accessKey

    アクセスキーシークレットです。詳細については、「アクセスキーを作成」をご参照ください。

    yourAccessKeySecret

    starttime

    ログ消費の開始時刻です。

    2025-02-19 00:00:00

    project

    SLS プロジェクト名です。

    test-project

    logstore

    SLS Logstore 名です。

    clb-access-log

    query

    SPL ステートメントです。Flink SQL では、文字列リテラルを単一引用符 2 つ ('') でエスケープします。

    * | where slbid = ''slb-01''

  5. SQL ステートメントを選択して右クリックし、実行 を選択して Simple Log Service に接続します。

    CREATE TEMPORARY TABLE sls_input_complex (
      httpCode STRING,
      errorCode STRING,
      errorMessage STRING,
      requestID STRING,
      serviceName STRING,
      fileName STRING,
      fileNo STRING,
      project STRING,
      scheduleType STRING,
      serviceIP STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'sls',
      'endpoint' = 'cn-hxxx',
      'accessId' = 'xxx',
      'accessKey' = 'xxx',
      'starttime' = 'xxx',
      'project' = 'xxx',
      'logstore' = 'clb7xxx',
      'query' = '* | prxxx", caller, __source__ | parxxx ... | project httpCode, errorCode, ...'
    );

3. クエリの実行と結果の確認

  1. ジョブエディターで、次のステートメントを入力してデータをクエリします。

    SELECT * FROM sls_input_complex;
  2. 右上の [Debug] をクリックします。ダイアログボックスで、[Session Cluster] ドロップダウンリストから [Create new session cluster] を選択し、次のように設定します。

    [Name]demo-test[Deployment Target]default-queue[Status]RUNNING[Engine Version]vvr-8.0.11-flink-1.17 に設定し、[Create Session Cluster] をクリックします。

  3. デバッグダイアログボックスで、作成したセッションクラスターを選択し、OK をクリックします。

    注:SLS ソーステーブルを使用してデバッグすると、コンシューマーグループのオフセットが進みます。デプロイ済みのジョブは、この新しいオフセットから再開します。

  4. [結果] タブで、テーブルの各列が SPL クエリで処理されたフィールドに対応していることを確認できます。

    クエリを [デバッグ] すると、結果テーブルに解析されたフィールドが表示されます。たとえば、errorCode 列は LogStoreNotExisterrorMessage 列は logstore k8s-event does not existhttpCode 列は 404 です。