このトピックでは、Flink SQL で Simple Log Service 処理言語 (SPL) に基づいて半構造化分析を実装する方法について、例を用いて説明します。
背景情報
Simple Log Service は、可観測性と分析のためのクラウドネイティブプラットフォームであり、ログ、メトリック、トレースのコスト効率の高いリアルタイム処理を可能にします。データアクセスを簡素化し、システムログとビジネスログの取り込み、保存、分析を容易にします。
Apache Flink 上に構築された Realtime Compute for Apache Flink は、リアルタイムのデータ分析とリスク監視に適したビッグデータ分析プラットフォームです。Simple Log Service コネクタをネイティブでサポートしており、サービスをソーステーブルまたは結果テーブルとして使用できます。
このコネクタは、構造化ログの処理を効率化し、Simple Log Service ログフィールドを Flink SQL テーブルフィールドに直接マッピングできるようにします。単一のフィールドにすべてのコンテンツが含まれる半構造化ログの場合、構造化データを抽出するには、正規表現やデリミタなどのメソッドが必要です。このトピックでは、データ構造化のためのコネクタを構成するために SPL を使用するソリューションについて説明し、ログクレンジングとフォーマット正規化に焦点を当てます。
半構造化ログデータ
JSON 文字列と混合コンテンツを含む複雑な形式のログの例を考えてみましょう。ログには、次の要素が含まれています。
Payload: JSON 文字列。scheduleフィールドも JSON 形式です。requestURL: 標準の URL パス。error: 文字列CouldNotExecuteQueryで始まり、JSON 構造が続きます。__tag__:__path__: ログファイルパスを表します。service_aはサービス名を示している可能性があります。caller: ファイル名と行番号が含まれています。
{
"Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
"TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
"TaskType": "ALERT",
"__source__": "11.199.XX.XXX",
"__tag__:__hostname__": "iabcde12345.cloud.abc121",
"__tag__:__path__": "/var/log/service_a.LOG",
"caller": "executor/pool.go:64",
"error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
"requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
"ts": "2024-01-29 22:57:13"
}構造化データ処理の要件構造化データ処理
これらのログから価値のある洞察を得るには、データクレンジングが不可欠です。まず、分析のために主要なフィールドを抽出する必要があります。これは Flink で行われます。フィールド抽出の具体的な要件は次のとおりです。
errorフィールドからhttpCode、errorCode、errorMessage、requestIDを抽出します。__tag__:__path_からservice_aをserviceNameとして抽出します。callerからpool.goをfileNameとして、64をfileNoとして抽出します。Payloadフィールドからprojectを抽出し、Payload内のscheduleからtypeをscheduleTypeとして抽出します。__source__をserviceIPに名前変更します。
必要なフィールドの最終的なリストは次のとおりです。

ソリューション
データクレンジングにはいくつかのソリューションがあり、それぞれ特定のシナリオに適しています。
データ変換ソリューション: Simple Log Service コンソールでターゲットログストアを作成し、クレンジング用のデータ変換タスクを作成します。
Flink ソリューション:
errorとpayloadをソーステーブルフィールドとして定義します。SQL 正規関数と JSON 関数を使用してこれらのフィールドを解析し、解析されたデータを一時テーブルに挿入し、テーブルで分析を実行します。SPL ソリューション: Realtime Compute for Apache Flink で Simple Log Service コネクタの SPL ステートメントを構成してデータをクレンジングします。クレンジングされたデータ構造に従って、Flink でソーステーブルフィールドを定義します。
これらのオプションの中で、SPL ソリューションは、より効率的なデータクレンジングアプローチを提供します。特に半構造化ログデータの場合、中間ログストアや一時テーブルが不要になります。ソースに近い場所でクレンジングを実行することにより、コンピューティングプラットフォームはビジネスロジックに集中できるようになり、責任の明確な分離が実現します。
SPL ソリューション
1. Simple Log Service でデータを準備する
Simple Log Service Java SDK を使用して、ログの例をサンプルのアナログデータとしてターゲットログストアに書き込みます。他の言語の SDK については、対応する SDK リファレンスを参照してください。

ログストアで、SPL パイプライン構文を記述し、効果をプレビューします。

クエリ文は次のとおりです。SPL パイプライン構文では、
|区切り文字を使用して命令を区切ります。各命令を入力した直後に結果を確認し、パイプラインを段階的に追加して最終結果を反復的に達成できます。詳細については、「スキャンベースクエリの構文」をご参照ください。* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, '$.type') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project構文について以下に説明します。
project: 元の結果からPayload、error、tag:path、callerフィールドを保持し、その他は破棄して後続の解析を容易にします。parse-json:Payload文字列を JSON に変換し、lastNotified、serviceUri、jobIDなどの第 1 レベルのフィールドを生成します。project-away: 元のPayloadフィールドを削除します。parse-regexp:errorフィールドから JSON コンテンツの一部を抽出し、errorJsonに格納します。parse-json:errorJsonフィールドを展開して、httpCode、errorCode、errorMessageなどのフィールドを取得します。parse-regexp: 正規表現を使用して__tag__:__path__からファイル名を抽出し、serviceNameに割り当てます。parse-regexp:callerからファイル名と行番号を抽出し、それぞれfileNameフィールドとfileNoフィールドに配置します。project-rename:__tag__:__hostname__フィールドの名前を serviceHost に変更します。extend:json_extract_scalar関数を使用してscheduleからtypeフィールドを抽出し、scheduleTypeという名前を付けます。project:Payloadのprojectフィールドを含む、必要なフィールドリストを保持します。
2. SQL ジョブを作成する
Realtime Compute for Apache Flink コンソール にログインし、ターゲットワークスペースをクリックします。
重要ターゲットワークスペースと Simple Log Service プロジェクトは同じリージョンにある必要があります。
左側のナビゲーションウィンドウで、 を選択します。
[新規] をクリックします。 [新規ドラフト] ダイアログボックスで、 を選択し、[次へ] をクリックします。

名前を入力し、[作成] をクリックします。次の SQL をコピーして、ドラフトに一時テーブルを作成します。
CREATE TEMPORARY TABLE sls_input_complex ( errorCode STRING, errorMessage STRING, fileName STRING, fileNo STRING, httpCode STRING, requestID STRING, scheduleType STRING, serviceHost STRING, project STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'sls', 'endpoint' ='ap-southeast-1-intranet.log.aliyuncs.com', 'accessId' = '${yourAccessKeyID}', 'accessKey' = '${yourAccessKeySecret}', 'starttime' = '2024-02-01 10:30:00', 'project' ='${project}', 'logstore' ='${logtore}', 'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project' );SQL ステートメントのパラメーターについて以下に説明します。必要に応じて置き換えてください。
パラメーター
説明
例
connector
「サポートされているコネクタ」をご参照ください。
sls
endpoint
Simple Log Service プロジェクトへのアクセスに使用する内部エンドポイント。取得方法については、「エンドポイントを表示する」をご参照ください。
ap-southeast-1-intranet.log.aliyuncs.com
accessId
ユーザーを識別するために使用される AccessKey ID。詳細については、「AccessKey ペアを作成する」をご参照ください。
LTAI****************
accessKey
ユーザーの ID を検証するために使用される AccessKey シークレット。詳細については、「AccessKey ペアを作成する」をご参照ください。
yourAccessKeySecret
starttime
ログのクエリを開始する時刻。
2025-02-19 00:00:00
project
Simple Log Service プロジェクトの名前。
test-project
logstore
Simple Log Service ログストアの名前。
clb-access-log
query
SPL ステートメントを入力します。文字列は単一引用符を使用してエスケープする必要があることに注意してください。
* | where slbid = ''slb-01''
説明ここで、
''は文字列内に埋め込まれた単一引用符を表します。SQL を選択し、右クリックして、[実行] を選択して Simple Log Service に接続します。

3. クエリを実行して結果を表示する
次の分析ステートメントをドラフトにコピーして、
slbidによる集計クエリを実行します。SELECT * FROM sls_input_complex;右上隅にある [デバッグ] をクリックします。デバッグダイアログボックスで、[新しいセッションクラスタを作成] ドロップダウンリストから [セッションクラスタ] を選択します。次の図を参考に、新しいデバッグクラスタを作成します。

デバッグダイアログボックスで、作成したデバッグクラスタを選択し、[OK] をクリックします。

[結果] 領域で、テーブルの列の値を表示します。これは、SPL によって処理された結果を反映しています。SPL によって生成されたフィールドの最終的なリストは、テーブルのフィールドと一致します。
