This topic describes how to call Spark DataFrame API operations to develop a streaming job that consumes data from Log Service.
Use Spark Structured Streaming in Scala to access LogHub
Sample code
## StructuredLoghubSample.scala
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructuredLoghubSample {
  def main(args: Array[String]): Unit = {
    // Eight positional arguments are required; the checkpoint location is optional.
    if (args.length < 8) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> <output-path> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .getOrCreate()

    import spark.implicits._

    // Create a Dataset that represents the stream of input lines from LogHub.
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

    // Write the stream to Parquet files, triggering a micro-batch every 30 seconds.
    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()
  }
}

For more information about the Maven project object model (POM) file, see aliyun-emapreduce-demo.
Compile and run the code
## Run a command to compile the code.
mvn clean package -DskipTests
## After the code is compiled, the JAR file of the job is stored in the target directory.
## Submit and run the job.
spark-submit --master yarn --deploy-mode cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
--num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar \
--class x.x.x.StructuredLoghubSample xxx.jar \
<logService-project> <logService-store> <access-key-id> <access-key-secret> <endpoint> \
<starting-offsets> <max-offsets-per-trigger> <output-path> [<checkpoint-location>]
You must adjust the job resource configurations based on the actual data size and cluster scale. If the cluster uses low specifications, the job may fail to run when it is submitted with the preceding command.
Replace the following information based on your actual environment:
- x.x.x.StructuredLoghubSample: x.x.x indicates the name of the package of the StructuredLoghubSample class in your environment.
- xxx.jar: the JAR file of the project.
- <starting-offsets>: the starting position from which data is consumed. Valid values: earliest and latest.
- <max-offsets-per-trigger>: the maximum number of messages, or the maximum offset of data, consumed during each trigger period.
- <output-path>: the directory in which the output data is stored. Example: /loghub/data/.
- <checkpoint-location>: the directory in which the checkpoints are stored. Example: /loghub/checkpoint.
- --jars: this parameter is required. The parameter value is the JAR package of the Spark DataSource for LogHub. If you do not specify this parameter, the error Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource is reported.
  For Spark 2, you can set the --jars parameter to the following value:
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
  For Spark 3, you can set the --jars parameter to the following value:
  --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar
Note: If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory instead.
Use PySpark Structured Streaming to access LogHub
Sample code
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("xx") \
    .getOrCreate()

# Read data from the LogHub data source.
lines = spark \
    .readStream \
    .format("loghub") \
    .option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
    .option("access.key.id", "LTAI----") \
    .option("access.key.secret", "DTi----") \
    .option("sls.project", "emr-test-hz-1") \
    .option("sls.store", "test1") \
    .option("startingoffsets", "earliest") \
    .load()

# Process the transformation logic: count records per Logstore.
wordCounts = lines.groupBy("__logStore__").count()

# Process the sink logic: print the complete counts to the console.
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
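The console sink in the preceding sample is convenient for debugging. For durable output, the same stream can be written to Parquet files with a checkpoint location, mirroring the Scala job earlier in this topic. The following is a minimal sketch, not part of the original sample: the /loghub/data/ and /loghub/checkpoint paths reuse the example directories mentioned earlier, and the 30-second trigger matches the Scala job. Replace the values with your own.

# A minimal sketch of a durable Parquet sink, assuming the same `lines`
# stream created above. The paths are the example directories used
# earlier in this topic; replace them with your own.
events = lines.selectExpr("CAST(__value__ AS STRING) AS value")

query = events \
    .writeStream \
    .format("parquet") \
    .option("path", "/loghub/data/") \
    .option("checkpointLocation", "/loghub/checkpoint") \
    .outputMode("append") \
    .trigger(processingTime="30 seconds") \
    .start()

query.awaitTermination()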
Parameters
Parameter | Description |
--- | --- |
endpoint | The endpoint that is used to access LogHub. Example: cn-hangzhou-intranet.log.aliyuncs.com. |
access.key.id | The AccessKey ID of your Alibaba Cloud account. |
access.key.secret | The AccessKey secret of your Alibaba Cloud account. |
sls.project | The name of the Log Service project. |
sls.store | The name of the Logstore. |
startingoffsets | The starting position from which data is consumed. Valid values: earliest and latest. |
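Each row in this table corresponds to one option on the stream reader. As an illustration only, the options can be collected in a dictionary and passed in a single options() call; every value below is a placeholder, not a real credential, project, or Logstore name.

# A sketch that groups the options from the preceding table;
# all values are placeholders that you must replace.
conf = {
    "endpoint": "cn-hangzhou-intranet.log.aliyuncs.com",
    "access.key.id": "<your-access-key-id>",
    "access.key.secret": "<your-access-key-secret>",
    "sls.project": "<your-project-name>",
    "sls.store": "<your-logstore-name>",
    "startingoffsets": "earliest",
}

lines = spark \
    .readStream \
    .format("loghub") \
    .options(**conf) \
    .load()

Keeping the connection settings in one dictionary makes it easier to move them into a configuration file instead of hard-coding them in the job.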
Run a Python script
For Spark 2, you can run the following command to run a Python script:
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar --master local loghub.py
For Spark 3, you can run the following command to run a Python script:
spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.2.jar --master local loghub.py
Note: If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory instead.