本ページでは、Spark DataFrame API 操作を呼び出して、ログサービスデータを使用するストリーミングジョブを開発する方法について説明します。
サンプルコード
## StructuredLoghubWordCount.Scala
object StructuredLoghubSample {
def main(args: Array[String]) {
if (args.length < 7) {
System.err.println("Usage: StructuredLoghubSample <logService-project> " +
"<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
"<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
System.exit(1)
}
val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
val checkpointLocation =
if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession
.builder
.appName("StructuredLoghubSample")
.master("local[5]")
.getOrCreate()
import spark.implicits. _
// Create a dataset to represent the stream of input lines from LogHub.
val lines = spark
.readStream
.format("loghub")
.option("sls.project", project)
.option("sls.store", logStore)
.option("access.key.id", accessKeyId)
.option( "access.key.secret"、accessKeySecret)
.option("endpoint", endpoint)
.option("startingoffsets", startingOffsets)
.option("zookeeper.connect.address", "localhost:2181")
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(content AS STRING)")
.as[String]
val query = lines.writeStream
.format("parquet")
.option("checkpointLocation", checkpointLocation)
.option("path", outputPath)
.outputMode("append")
.trigger(Trigger.ProcessingTime(30000))
.start()
ssc.awaitTermination()
}
}
注 Maven プロジェクトオブジェクトモデル (POM) ファイルの詳細については、「aliyun-emapreduce-demo」をご参照ください。
コンパイルと実行
## Compile and run a command.
mvn clean package -DskipTests
## After the compiled command is run, the JAR file of the job is stored in the target/shaded/ directory.。
## Submit and run the job.
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g
--num-executors 2--class x.x.x.StructuredLoghubSample xxx.jar <logService-project>
<logService-store> <access-key-id> <access-key-secret> <endpoint> <starting-offsets>
<max-offsets-per-trigger> <zookeeper-connect-address> <output-path> <checkpoint-location>
重要
- 実際の状況に基づいて、クラスパスとパッケージパスを x.x.x.StructuredLoghubSample と xxx.jar の形式で指定する必要があります。
- 実際のデータサイズとクラスタースケールに基づいてジョブリソースを調整する必要があります。 クラスターが小さすぎると、ジョブの実行 に失敗する可能性があります。
注記
一部の Spark バージョンは、E-MapReduce (EMR) と互換性がありません。 一部の EMR バージョンは、emr-logservice SDK と互換性がありません。
次のテーブルに、互換性のある Spark、EMR、および emr-logservice SDK バージョンを示します。
emr-logservice SDK バージョン | Spark バージョン | EMR バージョン |
---|---|---|
1.6.0 | 2.3.1 | EMR 3.18.x 以前 |
1.7.0 | 2.4.3 | EMR 3.19.x 以降 |