本ページでは、Spark DataFrame API 操作を呼び出して、ログサービスデータを使用するストリーミングジョブを開発する方法について説明します。

サンプルコード

## StructuredLoghubWordCount.Scala

object StructuredLoghubSample {
  def main(args: Array[String]) {
    if (args.length < 7) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .master("local[5]")
      .getOrCreate()

    import spark.implicits. _

    // Create a dataset to represent the stream of input lines from LogHub.
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option( "access.key.secret"、accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("zookeeper.connect.address", "localhost:2181")
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(content AS STRING)")
      .as[String]

    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    ssc.awaitTermination()
  }
}
Maven プロジェクトオブジェクトモデル (POM) ファイルの詳細については、「aliyun-emapreduce-demo」をご参照ください。

コンパイルと実行

## Compile and run a command.
mvn clean package -DskipTests

## After the compiled command is run, the JAR file of the job is stored in the target/shaded/ directory.。

## Submit and run the job.

spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g 
--num-executors 2--class x.x.x.StructuredLoghubSample xxx.jar <logService-project> 
<logService-store> <access-key-id> <access-key-secret> <endpoint> <starting-offsets> 
<max-offsets-per-trigger> <zookeeper-connect-address> <output-path> <checkpoint-location>
重要
  • 実際の状況に基づいて、クラスパスとパッケージパスを x.x.x.StructuredLoghubSample と xxx.jar の形式で指定する必要があります。
  • 実際のデータサイズとクラスタースケールに基づいてジョブリソースを調整する必要があります。 クラスターが小さすぎると、ジョブの実行 に失敗する可能性があります。

注記

一部の Spark バージョンは、E-MapReduce (EMR) と互換性がありません。 一部の EMR バージョンは、emr-logservice SDK と互換性がありません。 次のテーブルに、互換性のある Spark、EMR、および emr-logservice SDK バージョンを示します。
emr-logservice SDK バージョン Spark バージョン EMR バージョン
1.6.0 2.3.1 EMR 3.18.x 以前
1.7.0 2.4.3 EMR 3.19.x 以降