This topic describes how to call Spark DataFrame API operations to develop a streaming job to consume data of Log Service.

Use Spark Structured Streaming in Scala to access LogHub

Sample code

## StructuredLoghubWordCount.scala

import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructuredLoghubSample {
  def main(args: Array[String]) {
    if (args.length < 8) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> <output-path> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .getOrCreate()

    import spark.implicits._

    // Create DataSet representing the stream of input lines from loghub
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()
  }
}
Note For more information about the Maven project object model (POM) file, see aliyun-emapreduce-demo.

Compile and run the code

## Run a command to compile the code.
mvn clean package -DskipTests

## After the code is compiled, the JAR file of the job is stored in the target directory. 

## Submit and run the job.
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
  --num-executors 2 \
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar \
  --class x.x.x.StructuredLoghubSample xxx.jar \
  <logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
  <starting-offsets> <max-offsets-per-trigger> <output-path> <checkpoint-location>
Note You must adjust the job resource configurations based on the actual data size and cluster scale. If the cluster uses low specifications, the job may fail to run with the settings shown in the preceding command.
Replace the following information based on your actual environment:
  • x.x.x.StructuredLoghubSample: x.x.x indicates the package name of the StructuredLoghubSample class in your environment.
  • xxx.jar: The JAR file of the project.
  • <output-path>: The directory in which the output data is stored. Example: /loghub/data/.
  • <checkpoint-location>: The directory in which the checkpoints are stored. Example: /loghub/checkpoint.
  • --jars: This parameter is required. Its value is the path of the JAR file that provides the LogHub data source for Spark. If you do not specify this parameter, the error Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource is reported.
    • For Spark 2, you can set the --jars parameter to the following value:
      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
    • For Spark 3, you can set the --jars parameter to the following value:
      --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar
    Note If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory.
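For reference, the following command shows a Spark 2 submission with all placeholders filled in. Every value in it (package name, JAR name, project, Logstore, AccessKey pair, endpoint, offsets, and paths) is a hypothetical example that you must replace with your own settings:
spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
  --num-executors 2 \
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar \
  --class com.example.StructuredLoghubSample loghub-demo-1.0.jar \
  emr-test-hz-1 test1 LTAI---- DTi---- cn-hangzhou-intranet.log.aliyuncs.com \
  earliest 1000 /loghub/data/ /loghub/checkpoint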

Use PySpark Structured Streaming to access LogHub

Sample code

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("xx") \
    .getOrCreate()

# Read data from the LogHub data source. 
lines = spark \
    .readStream \
    .format("loghub") \
    .option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
    .option("access.key.id", "LTAI----") \
    .option("access.key.secret", "DTi----") \
    .option("sls.project", "emr-test-hz-1") \
    .option("sls.store", "test1") \
    .option("startingoffsets", "earliest") \
    .load()


# Process the transformation logic. 
wordCounts = lines.groupBy("__logStore__").count()

# Process the sink logic. 
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
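If you want to work with the log content itself instead of only counting records per Logstore, you can cast the __value__ field to a string in the same way as the Scala sample. The following sketch replaces the transformation logic in the preceding script with a word count; it assumes the log content is whitespace-separated text, which may not match your data:

from pyspark.sql.functions import explode, split

# Cast the raw log payload to a string, as in the Scala sample.
words = lines \
    .selectExpr("CAST(__value__ AS STRING) AS value") \
    .select(explode(split("value", " ")).alias("word"))

# Count each word across the stream; reuse the console sink from the sample above.
wordCounts = words.groupBy("word").count()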

Run a Python script

  • For Spark 2, you can run the following command to run a Python script:
    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar <python-file>.py
  • For Spark 3, you can run the following command to run a Python script:
    spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar <python-file>.py
In the preceding commands, <python-file>.py indicates the path of the file in which your Python script, such as the preceding sample code, is saved.
Note If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory.

Parameters

| Parameter | Description |
| --- | --- |
| endpoint | The endpoint that is used to access LogHub. Example: cn-hangzhou-intranet.log.aliyuncs.com. |
| access.key.id | The AccessKey ID of your Alibaba Cloud account. |
| access.key.secret | The AccessKey secret of your Alibaba Cloud account. |
| sls.project | The name of the Log Service project. |
| sls.store | The name of the Logstore. |
| startingoffsets | The starting position from which data is consumed. Valid values: earliest and latest. |
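If you prefer to keep these parameters in one place, you can pass them to the data source reader as a single dictionary instead of chaining .option() calls. The following is a minimal PySpark sketch; all values are placeholders taken from the examples above and must be replaced with your own settings:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("loghub-options-example").getOrCreate()

# LogHub data source parameters collected in one place.
# All values are placeholders; replace them with your own settings.
loghub_conf = {
    "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com",
    "access.key.id": "LTAI----",
    "access.key.secret": "DTi----",
    "sls.project": "emr-test-hz-1",
    "sls.store": "test1",
    "startingoffsets": "earliest",
}

# Pass all options at once to the LogHub reader.
lines = spark.readStream.format("loghub").options(**loghub_conf).load()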