本文简单介绍如何使用Spark DataFrame API开发一个流式作业消费LogService数据。

注意事项

由于Spark某些版本的不兼容性问题,emr_logservice sdk也存在默认EMR主版本运行时不兼容,具体的版本兼容性列表如下。
emr-logservice sdk版本 Spark版本 EMR主版本
1.6.0 2.3.1 EMR-3.18.x以下
1.7.0 2.4.3 EMR-3.19.x以上

Spark Structured Streaming Scala访问LogHub

代码示例

## StructuredLoghubWordCount.Scala

object StructuredLoghubSample {
  def main(args: Array[String]) {
    if (args.length < 7) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .master("local[5]")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from loghub
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("zookeeper.connect.address", "localhost:2181")
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(content AS STRING)")
      .as[String]

    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()
  }
}
说明 Maven pom文件可以参见aliyun-emapreduce-demo

编译运行

注意
  • x.x.x.StructuredLoghubSample和xxx.jar需要替换成真实的类路径和包路径。
  • 作业资源需要根据实际数据规模和实际集群规模调整,如果集群太小,直接运行以上命令可能无法执行。
## 编译命令
mvn clean package -DskipTests

## 编译完后,作业JAR包位于target/shaded/目录下。

## 提交执行

spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g 
--num-executors 2--class x.x.x.StructuredLoghubSample xxx.jar <logService-project> 
<logService-store> <access-key-id> <access-key-secret> <endpoint> <starting-offsets> 
<max-offsets-per-trigger> <zookeeper-connect-address> <output-path> <checkpoint-location>

PySpark Structured Streaming访问LogHub

代码示例

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("xx") \
    .getOrCreate()

# 读取LogHub数据源。
lines = spark \
    .readStream \
    .format("loghub") \
    .option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
    .option("access.key.id", "LTAI----") \
    .option("access.key.secret", "DTi----") \
    .option("sls.project", "emr-test-hz-1") \
    .option("sls.store", "test1") \
    .option("startingoffsets", "earliest") \
    .load()


# 处理transform逻辑。
wordCounts = lines.groupBy("__logStore__").count()

# 处理Sink逻辑。
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

执行Python脚本

注意 /usr/lib/emrsdk-current/emr-datasources_shaded_2.12-3.0.1.jar需要替换为您实际集群中/usr/lib/emrsdk-current/目录下的JAR包。
spark-submit --jars usr/lib/emrsdk-current/emr-datasources_shaded_2.12-3.0.1.jar --master local loghub.py

配置参数说明

参数 描述
endpoint LogHub的endpoint。例如,cn-hangzhou-intranet.log.aliyuncs.com。
access.key.id 您阿里云账号的AccessKey ID。
access.key.secret 您阿里云账号的AccessKey Secret。
sls.project LogStore名。
sls.store LogService项目名。
startingoffsets 开启offset位置,取值为earliest和lastest。