本文简单介绍如何使用 Spark DataFrame API 开发一个流式作业消费 LogService 数据。
示例代码
## StructuredLoghubWordCount.Scala
object StructuredLoghubSample {
def main(args: Array[String]) {
if (args.length < 7) {
System.err.println("Usage: StructuredLoghubSample <logService-project> " +
"<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
"<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
System.exit(1)
}
val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
val checkpointLocation =
if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString
val spark = SparkSession
.builder
.appName("StructuredLoghubSample")
.master("local[5]")
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from loghub
val lines = spark
.readStream
.format("loghub")
.option("sls.project", project)
.option("sls.store", logStore)
.option("access.key.id", accessKeyId)
.option("access.key.secret", accessKeySecret)
.option("endpoint", endpoint)
.option("startingoffsets", startingOffsets)
.option("zookeeper.connect.address", "localhost:2181")
.option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
.load()
.selectExpr("CAST(content AS STRING)")
.as[String]
val query = lines.writeStream
.format("parquet")
.option("checkpointLocation", checkpointLocation)
.option("path", outputPath)
.outputMode("append")
.trigger(Trigger.ProcessingTime(30000))
.start()
query.awaitTermination()
}
}
说明 Maven pom 文件可以参见aliyun-emapreduce-demo。
编译运行
## 编译命令
mvn clean package -DskipTests
## 编译完后,作业jar包位于target/shaded/下
## 提交执行
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g
--num-executors 2--class x.x.x.StructuredLoghubSample xxx.jar <logService-project>
<logService-store> <access-key-id> <access-key-secret> <endpoint> <starting-offsets>
<max-offsets-per-trigger> <zookeeper-connect-address> <output-path> <checkpoint-location>
注意
- x.x.x.StructuredLoghubSample 和 xxx.jar 需要替换成真实的类路径和包路径。
- 作业资源需要根据实际数据规模和实际集群规模调整,如果集群太小,直接运行以上命令可能无法执行。
注意事项
由于Spark某些版本的不兼容性问题,emr_logservice sdk 也存在默认 EMR 主版本运行时不兼容,具体的版本兼容性列表如下:
emr-logservice sdk 版本 | Spark 版本 | EMR 主版本 |
---|---|---|
1.6.0 | 2.3.1 | EMR-3.18.x 以下 |
1.7.0 | 2.4.3 | EMR-3.19.x 以上 |