Container Service for Kubernetes (ACK) クラスター内で実行される Spark ジョブは、多数の Pod に分散してログを出力するため、ログの集中管理が困難です。Simple Log Service (SLS) と連携することで、以下の機能を利用できます。
すべての Spark ドライバーおよびエグゼキュータ Pod から構造化ログを自動的に収集します。
指定した時間範囲内で Spark ジョブのログをクエリおよび分析します。
アプリケーション名、バージョン、ロール、送信 ID でログをフィルター処理します。
本トピックでは、ログ収集パイプライン全体の構成手順について説明します。具体的には、構造化ロギング対応の Spark コンテナイメージのビルド、Pod ログを収集するための Logtail 構成のデプロイ、および Simple Log Service における結果のクエリ方法です。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
ack-spark-operator コンポーネントがインストール済みです。詳細については、「手順 1: ack-spark-operator コンポーネントのインストール」をご参照ください。
Simple Log Service プロジェクトが作成済みです。詳細については、「プロジェクトの管理」をご参照ください。
Logtail コンポーネントが ACK クラスターにインストール済みです。詳細については、「ACK クラスターに Logtail コンポーネントをインストールする」をご参照ください。
カスタム Spark イメージをプッシュするためのコンテナイメージレジストリが利用可能であること。
仕組み
このパイプラインは、以下の手順で動作します。
カスタム Spark コンテナイメージに Log4j2 JSON テンプレートレイアウトライブラリを追加し、構造化された JSONL 形式の出力を有効にします。
ConfigMap を使用して Log4j2 を構成し、Elastic Common Schema (ECS) テンプレートに基づき、標準出力(stdout)および
/opt/spark/logs/spark.logへのファイル出力の両方で JSONL 形式のログを記録します。Logtail 構成(AliyunLogConfig)により、Spark Operator の Pod ラベルおよびコンテナ名パターンに一致するコンテナからログを収集し、JSONL 形式のフィールドを解析してタイムスタンプを抽出します。
Spark ジョブが実行されると、そのログは指定された Logstore に格納され、クエリおよび分析が可能になります。
ステップ 1:Spark コンテナイメージのビルド
以下の Dockerfile を作成します。この例では Spark 3.5.3 を使用し、Spark のクラスパスに log4j-layout-template-json 依存関係を追加することで、JsonTemplateLayout アペンダーによる JSONL 出力を有効にします。
ARG SPARK_IMAGE=<SPARK_IMAGE> # <SPARK_IMAGE> をご利用の Spark ベースイメージに置き換えます。
FROM ${SPARK_IMAGE}
# log4j-layout-template-json 依存関係の追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jarsイメージをビルドしてレジストリにプッシュします。
docker build -t <your-registry>/<your-image-name>:<tag> .
docker push <your-registry>/<your-image-name>:<tag>プレースホルダーを実際のレジストリおよびイメージ情報に置き換えてください。
ステップ 2:Log4j2 ログの構成
spark-log-conf.yaml という名前のファイルを、以下の内容で作成します。この ConfigMap は、ログレベルを INFO に設定し、コンソールおよびファイルアペンダーの両方を、ECS テンプレートを使用して JSONL フォーマットでログを出力するように構成します。詳細については、「Log4j ログの収集」をご参照ください。
apiVersion: v1
kind: ConfigMap
metadata:
name: spark-log-conf
namespace: default
data:
log4j2.properties: |
# すべてのログをコンソールおよびファイルに出力するよう設定
rootLogger.level = info
rootLogger.appenderRefs = console, file
rootLogger.appenderRef.console.ref = STDOUT
rootLogger.appenderRef.file.ref = FileAppender
appender.console.name = STDOUT
appender.console.type = Console
appender.console.layout.type = JsonTemplateLayout
appender.console.layout.eventTemplateUri = classpath:EcsLayout.json
appender.file.name = FileAppender
appender.file.type = File
appender.file.fileName = /opt/spark/logs/spark.log
appender.file.layout.type = JsonTemplateLayout
appender.file.layout.eventTemplateUri = classpath:EcsLayout.jsonConfigMap を適用します。
kubectl apply -f spark-log-conf.yaml期待される出力:
configmap/spark-log-conf createdステップ 3:Logtail 構成の作成
以下の内容で aliyun-log-config.yaml というファイルを作成します。<SLS_PROJECT> をご利用の Simple Log Service プロジェクト名に、<SLS_LOGSTORE> をご利用の Logstore 名に置き換えてください。Logstore が存在しない場合、Simple Log Service が自動的に作成します。
この構成は、Spark Operator によって起動された Pod を、ラベル sparkoperator.k8s.io/launched-by-spark-operator: "true" を使用してフィルタリングします。このラベルは、Spark Operator がすべてのドライバーおよびエグゼキュータの Pod に自動的に設定します。ログは /opt/spark/logs から収集され、JSON 形式で解析され、@timestamp フィールドがログのタイムスタンプとして抽出されます。AliyunLogConfig フィールドについて詳しくは、「AliyunLogConfig を使用した Logtail 構成の管理」をご参照ください。
apiVersion: log.alibabacloud.com/v1alpha1
kind: AliyunLogConfig
metadata:
name: spark
namespace: default
spec:
# (任意)プロジェクト名。デフォルト値:k8s-log-<Your_Cluster_ID>
project: <SLS_PROJECT>
# Logstore 名。指定した Logstore が存在しない場合、Simple Log Service が自動的に作成します。
logstore: <SLS_LOGSTORE>
# Logtail 構成。
logtailConfig:
# Logtail 構成の名前。
configName: spark
# データソースの種類。値「file」はテキストログを指定します。
inputType: file
# ログ入力の構成。
inputDetail:
# ログファイルの配置ディレクトリ。
logPath: /opt/spark/logs
# ログファイル名。ワイルドカード文字が使用可能です。
filePattern: '*.log'
# ログファイルのエンコーディング。
fileEncoding: utf8
# ログタイプ。
logType: json_log
localStorage: true
key:
- content
logBeginRegex: .*
logTimezone: ''
discardNonUtf8: false
discardUnmatch: true
preserve: true
preserveDepth: 0
regex: (.*)
outputType: LogService
topicFormat: none
adjustTimezone: false
enableRawLog: false
# コンテナからのテキストログの収集。
dockerFile: true
# 高度な構成。
advanced:
# コンテナメタデータのプレビュー。
collect_containers_flag: true
# Kubernetes 上の Logtail 構成。
k8s:
# タグに基づいて Pod をフィルター処理。
IncludeK8sLabel:
sparkoperator.k8s.io/launched-by-spark-operator: "true"
# コンテナ名に基づいてコンテナをフィルター処理。
K8sContainerRegex: "^spark-kubernetes-(driver|executor)$"
# 追加のログタグ構成。
ExternalK8sLabelTag:
spark-app-name: spark-app-name
spark-version: spark-version
spark-role: spark-role
spark-app-selector: spark-app-selector
sparkoperator.k8s.io/submission-id: sparkoperator.k8s.io/submission-id
# ログ処理プラグイン。
plugin:
processors:
# ログの分離。
- type: processor_split_log_string
detail:
SplitKey: content
SplitSep: ''
# JSON フィールドの解析。
- type: processor_json
detail:
ExpandArray: false
ExpandConnector: ''
ExpandDepth: 0
IgnoreFirstConnector: false
SourceKey: content
KeepSource: false
KeepSourceIfParseError: true
NoKeyError: false
UseSourceKeyAsPrefix: false
# ログタイムスタンプの抽出。
- type: processor_strptime
detail:
SourceKey: '@timestamp'
Format: '%Y-%m-%dT%H:%M:%S.%fZ'
KeepSource: false
AdjustUTCOffset: true
UTCOffset: 0
AlarmIfFail: false構成を適用します。
kubectl apply -f aliyun-log-config.yamlLogstore および Logtail 構成が正しく作成されたかを確認するには、以下の手順を実行します。
Simple Log Service コンソール にログインします。
プロジェクト セクションで、ご利用のプロジェクトをクリックします。

ログストレージ > Logstore を選択します。対象の Logstore の横にある \> アイコンをクリックし、データインポート > Logtail 構成 を選択します。

Logtail 構成をクリックして、詳細を表示します。
ステップ 4:サンプル Spark ジョブの送信
以下の内容で spark-pi.yaml というファイルを作成します。sparkConfigMap フィールドは、ステップ 2 で作成した ConfigMap を参照し、Log4j2 構成を Spark Pod に挿入します。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE>
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
arguments:
- "5000"
sparkVersion: 3.5.3
sparkConfigMap: spark-log-conf
driver:
cores: 1
memory: 512m
serviceAccount: spark-operator-spark
executor:
instances: 1
cores: 1
memory: 4gジョブを送信します。
kubectl apply -f spark-pi.yamlジョブの実行が完了したら、ドライバーコンテナのログの最後の 10 行を確認し、JSONL 形式の出力が正しく生成されているかを検証します。
kubectl logs --tail=10 spark-pi-driver期待される出力:
{"@timestamp":"2024-11-20T11:45:48.487Z","ecs.version":"1.2.0","log.level":"WARN","message":"Kubernetes client has been closed.","process.thread.name":"-937428334-pool-19-thread-1","log.logger":"org.apache.spark.scheduler.cluster.k8s.ExecutorPodsWatchSnapshotSource"}
{"@timestamp":"2024-11-20T11:45:48.585Z","ecs.version":"1.2.0","log.level":"INFO","message":"MapOutputTrackerMasterEndpoint stopped!","process.thread.name":"dispatcher-event-loop-7","log.logger":"org.apache.spark.MapOutputTrackerMasterEndpoint"}
{"@timestamp":"2024-11-20T11:45:48.592Z","ecs.version":"1.2.0","log.level":"INFO","message":"MemoryStore cleared","process.thread.name":"main","log.logger":"org.apache.spark.storage.memory.MemoryStore"}
{"@timestamp":"2024-11-20T11:45:48.592Z","ecs.version":"1.2.0","log.level":"INFO","message":"BlockManager stopped","process.thread.name":"main","log.logger":"org.apache.spark.storage.BlockManager"}
{"@timestamp":"2024-11-20T11:45:48.596Z","ecs.version":"1.2.0","log.level":"INFO","message":"BlockManagerMaster stopped","process.thread.name":"main","log.logger":"org.apache.spark.storage.BlockManagerMaster"}
{"@timestamp":"2024-11-20T11:45:48.598Z","ecs.version":"1.2.0","log.level":"INFO","message":"OutputCommitCoordinator stopped!","process.thread.name":"dispatcher-event-loop-1","log.logger":"org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint"}
{"@timestamp":"2024-11-20T11:45:48.602Z","ecs.version":"1.2.0","log.level":"INFO","message":"Successfully stopped SparkContext","process.thread.name":"main","log.logger":"org.apache.spark.SparkContext"}
{"@timestamp":"2024-11-20T11:45:48.604Z","ecs.version":"1.2.0","log.level":"INFO","message":"Shutdown hook called","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}
{"@timestamp":"2024-11-20T11:45:48.604Z","ecs.version":"1.2.0","log.level":"INFO","message":"Deleting directory /var/data/spark-f783cf2e-44db-452c-83c9-738f9c894ef9/spark-2caa5814-bd32-431c-a9f9-a32208b34fbb","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}
{"@timestamp":"2024-11-20T11:45:48.606Z","ecs.version":"1.2.0","log.level":"INFO","message":"Deleting directory /tmp/spark-dacdfd95-f166-4b23-9312-af9052730417","process.thread.name":"shutdown-hook-0","log.logger":"org.apache.spark.util.ShutdownHookManager"}各 JSONL ログエントリには、以下のフィールドが含まれます。
| フィールド | 説明 |
|---|---|
@timestamp | ログが生成された時刻 |
ecs.version | ECS(Elastic Common Schema)のバージョン |
log.level | INFO や WARN などのログレベル |
message | ログメッセージ |
process.thread.name | ログを生成したスレッドの名前 |
log.logger | ログを記録したロガーの名前 |
ステップ 5:ログ収集の確認
Spark ジョブの実行が完了したら、分析に使用する前にログが正しく Logstore に到達しているかを確認します。
Simple Log Service コンソール にログインし、ご利用のプロジェクトを開きます。Logstore でジョブ実行期間を含む時間範囲を設定し、クエリを実行します。上記で説明した ECS フィールドを含む JSONL 形式のエントリが表示されるはずです。

ログが表示されない場合は、以下の点を確認してください。
Logtail 構成が有効であり、Logstore 名が AliyunLogConfig 内の
<SLS_LOGSTORE>と一致していること。Spark Pod に
sparkoperator.k8s.io/launched-by-spark-operator: "true"というラベルが付与されていること(Spark Operator が自動的に付与)。ドライバーまたはエグゼキュータ Pod 内の
/opt/spark/logs/spark.logにログファイルが存在すること。
ステップ 6:Spark ログのクエリおよび分析
ログが Simple Log Service に正常に流入している状態では、クエリと分析 機能を活用して、時間範囲、ログレベル、アプリケーション名、または Spark ロール(ドライバー/エグゼキュータ)ごとに Spark ジョブのログをフィルターおよび集計できます。
(任意)ステップ 7:クリーンアップ
テスト後に不要なコストを回避するため、リソースを削除します。
Spark ジョブを削除します。
kubectl delete -f spark-pi.yamlLogtail 構成を削除します。
kubectl delete -f aliyun-log-config.yamlLog4j2 ConfigMap を削除します。
kubectl delete -f spark-log-conf.yaml