このトピックでは、Spark DataFrame API 操作を呼び出して、ログサービスのデータを使用するストリーミングジョブを開発する方法について説明します。
Scala で Spark Structured Streaming を使用して LogHub にアクセスする
サンプルコード
## StructuredLoghubSample.Scala
object StructuredLoghubSample {
def main(args: Array[String]) {
if (args.length < 7) {
System.err.println("Usage: StructuredLoghubSample <logService-project> " +
"<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
"<starting-offsets> <max-offsets-per-trigger>[outputPath] [<checkpoint-location>]")
System.exit(1)
}
val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
val checkpointLocation =
if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession
.builder
.appName("StructuredLoghubSample")
.getOrCreate()
import spark.implicits._
// 入力ストリームを表す DataSet を loghub から作成します
val lines = spark
.readStream
.format("loghub")
.option("sls.project", project)
.option("sls.store", logStore)
.option("access.key.id", accessKeyId)
.option("access.key.secret", accessKeySecret)
.option("endpoint", endpoint)
.option("startingoffsets", startingOffsets)
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(__value__ AS STRING)")
.as[String]
val query = lines.writeStream
.format("parquet")
.option("checkpointLocation", checkpointLocation)
.option("path", outputPath)
.outputMode("append")
.trigger(Trigger.ProcessingTime(30000))
.start()
query.awaitTermination()
}
}
Maven プロジェクト・オブジェクト・モデル(POM)ファイルの詳細については、「aliyun-emapreduce-demo」をご参照ください。
コードをコンパイルして実行する
## コードをコンパイルするコマンドを実行します。
mvn clean package -DskipTests
## コードがコンパイルされると、ジョブの JAR ファイルは target ディレクトリに保存されます。
## ジョブを送信して実行します。
spark-submit --master --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
--num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --class x.x.x.StructuredLoghubSample xxx.jar \
<logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
<starting-offsets><max-offsets-per-trigger> <output-path> <checkpoint-location>
ジョブリソースの構成は、実際のデータサイズとクラスタ規模に基づいて調整する必要があります。 クラスタの仕様が低い場合、上記のコードのコマンドを実行してもジョブを実行できない可能性があります。
以下の情報は、実際の環境に基づいて置き換えてください。
x.x.x.StructuredLoghubSample
:x.x.x
は、使用環境における StructuredLoghubSample クラスのパッケージ名を示します。xxx.jar
: プロジェクトの JAR ファイル。<starting-offsets>
: データの取得を開始する位置。 有効な値: earliest および latest。<max-offsets-per-trigger>
: 各トリガー期間中に取得されるメッセージの最大数、またはデータの最大オフセット。<output-path>
: 出力データが保存されるディレクトリ。 例: /loghub/data/。<checkpoint-location>
: チェックポイントが保存されるディレクトリ。 例: /loghub/checkpoint。--jars
: このパラメーターは必須です。パラメーター値は、LogHub の Spark DataSource の JAR パッケージです。 このパラメーターを指定しないと、Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource
エラーが報告されます。Spark 2 の場合、--jars パラメーターを次の値に設定できます。
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
Spark 3 の場合、--jars パラメーターを次の値に設定できます。
--jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar
説明上記の値のディレクトリがクラスターに存在しない場合は、 /usr/lib/emrsdk-current/ ディレクトリを使用してください。
PySpark Structured Streaming を使用して LogHub にアクセスする
サンプルコード
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("xx") \
.getOrCreate()
# LogHub データソースからデータを読み取ります。
lines = spark \
.readStream \
.format("loghub") \
.option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
.option("access.key.id", "LTAI----") \
.option("access.key.secret", "DTi----") \
.option("sls.project", "emr-test-hz-1") \
.option("sls.store", "test1") \
.option("startingoffsets", "earliest") \
.load()
# 変換ロジックを処理します。
wordCounts = lines.groupBy("__logStore__").count()
# シンクロジックを処理します。
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
パラメータ
パラメータ | 説明 |
endpoint | LogHub にアクセスするために使用されるエンドポイント。 例: cn-hangzhou-intranet.log.aliyuncs.com。 |
access.key.id | Alibaba Cloud アカウントの AccessKey ID。 |
access.key.secret | Alibaba Cloud アカウントの AccessKey シークレット。 |
sls.project | ログストアの名前。 |
sls.store | ログサービスプロジェクトの名前。 |
startingoffsets | データの取得を開始する位置。 有効な値: earliest および latest。 |
Python スクリプトを実行する
Spark 2 の場合、次のコマンドを実行して Python スクリプトを実行できます。
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --master local loghub.py
Spark 3 の場合、次のコマンドを実行して Python スクリプトを実行できます。
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar --master local loghub.py
上記の値のディレクトリがクラスタに存在しない場合は、/usr/lib/emrsdk-current/ ディレクトリを使用してください。