This topic describes how to call Spark DataFrame API operations to develop a streaming job that consumes data from Log Service.
Use Spark Structured Streaming in Scala to access LogHub
Sample code
## StructuredLoghubWordCount.scala
import java.util.UUID

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object StructuredLoghubSample {
  def main(args: Array[String]): Unit = {
    if (args.length < 8) {
      System.err.println("Usage: StructuredLoghubSample <logService-project> " +
        "<logService-store> <access-key-id> <access-key-secret> <endpoint> " +
        "<starting-offsets> <max-offsets-per-trigger> <output-path> [<checkpoint-location>]")
      System.exit(1)
    }

    val Array(project, logStore, accessKeyId, accessKeySecret, endpoint, startingOffsets, maxOffsetsPerTrigger, outputPath, _*) = args
    val checkpointLocation =
      if (args.length > 8) args(8) else "/tmp/temporary-" + UUID.randomUUID.toString

    val spark = SparkSession
      .builder
      .appName("StructuredLoghubSample")
      .getOrCreate()

    import spark.implicits._

    // Create a DataSet representing the stream of input lines from LogHub.
    val lines = spark
      .readStream
      .format("loghub")
      .option("sls.project", project)
      .option("sls.store", logStore)
      .option("access.key.id", accessKeyId)
      .option("access.key.secret", accessKeySecret)
      .option("endpoint", endpoint)
      .option("startingoffsets", startingOffsets)
      .option("maxOffsetsPerTrigger", maxOffsetsPerTrigger)
      .load()
      .selectExpr("CAST(__value__ AS STRING)")
      .as[String]

    // Write the stream to Parquet files, triggering a micro-batch every 30 seconds.
    val query = lines.writeStream
      .format("parquet")
      .option("checkpointLocation", checkpointLocation)
      .option("path", outputPath)
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(30000))
      .start()

    query.awaitTermination()
  }
}
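Despite the file name, the sample above only persists the raw `__value__` strings as Parquet; it does not actually count words. The word-count step the file name alludes to would split each payload into words and tally them. A minimal sketch of that tally logic in plain Python (no Spark required; the sample payloads are made up):

```python
from collections import Counter

# Hypothetical log payloads, standing in for the CAST(__value__ AS STRING) rows.
lines = [
    "error timeout on shard 0",
    "info flush ok",
    "error timeout on shard 1",
]

# Split each payload into words and count occurrences across all lines,
# mirroring the classic flatMap + count word-count pattern.
counts = Counter(word for line in lines for word in line.split())

print(counts["error"])    # -> 2
print(counts["timeout"])  # -> 2
```

In the streaming job this would correspond to a `flatMap` over `lines` followed by `groupBy` and `count` before the sink.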
Note For more information about the Maven project object model (POM) file, see aliyun-emapreduce-demo.
Compile and run the code
1. Run the following command to compile the code:
   mvn clean package -DskipTests
   After the code is compiled, the JAR file of the job is stored in the target directory.
2. Submit and run the job:
   spark-submit --master yarn-cluster --executor-cores 2 --executor-memory 1g --driver-memory 1g \
   --num-executors 2 --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar \
   --class x.x.x.StructuredLoghubSample xxx.jar <logService-project> \
   <logService-store> <access-key-id> <access-key-secret> <endpoint> <starting-offsets> \
   <max-offsets-per-trigger> <output-path> <checkpoint-location>
Note You must adjust the job resource configurations based on the actual data size and cluster scale. If the cluster uses low specifications, the job may fail to run with the preceding command.
Replace the following information based on your actual environment:
- x.x.x.StructuredLoghubSample: x.x.x indicates the name of the package that contains the StructuredLoghubSample class in your environment.
- xxx.jar: the JAR file of the project.
- <output-path>: the directory in which the output data is stored. Example: /loghub/data/.
- <checkpoint-location>: the directory in which the checkpoints are stored. Example: /loghub/checkpoint.
- --jars: this parameter is required. Its value is the JAR package of the Spark data source for LogHub. If you do not specify this parameter, the error Caused by: java.lang.ClassNotFoundException: loghub.DefaultSource is reported.
  - For Spark 2, set the --jars parameter to the following value:
    --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar
  - For Spark 3, set the --jars parameter to the following value:
    --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar
  Note If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory instead.
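The fallback described in the note can be scripted when you build the submit command: use the first data-source directory that exists, and otherwise fall back to /usr/lib/emrsdk-current/. A hedged sketch in plain Python (the helper function is made up; the directory names come from the text above):

```python
import os

def pick_emrsdk_dir(candidates, fallback="/usr/lib/emrsdk-current/"):
    """Return the first existing directory from candidates, else the fallback
    directory that the note recommends."""
    for path in candidates:
        if os.path.isdir(path):
            return path
    return fallback

# Candidate location for the Spark 2 data-source JAR on an EMR node.
spark2_dir = pick_emrsdk_dir(
    ["/opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk"]
)
print(spark2_dir)
```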
Use PySpark Structured Streaming to access LogHub
Sample code
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("xx") \
.getOrCreate()
# Read data from the LogHub data source.
lines = spark \
.readStream \
.format("loghub") \
.option("endpoint", "cn-hangzhou-intranet.log.aliyuncs.com") \
.option("access.key.id", "LTAI----") \
.option("access.key.secret", "DTi----") \
.option("sls.project", "emr-test-hz-1") \
.option("sls.store", "test1") \
.option("startingoffsets", "earliest") \
.load()
# Process the transformation logic.
wordCounts = lines.groupBy("__logStore__").count()
# Process the sink logic.
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
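The `groupBy("__logStore__").count()` step above counts the records that arrive from each Logstore. Its semantics can be illustrated without Spark, using plain Python over made-up records:

```python
from collections import Counter

# Made-up records; __logStore__ is the metadata column the query groups on.
records = [
    {"__logStore__": "test1", "__value__": "a"},
    {"__logStore__": "test1", "__value__": "b"},
    {"__logStore__": "test2", "__value__": "c"},
]

# Batch equivalent of groupBy("__logStore__").count().
counts = Counter(r["__logStore__"] for r in records)
print(counts["test1"])  # -> 2
print(counts["test2"])  # -> 1
```

In `complete` output mode, the console sink reprints the full, updated table of per-Logstore counts after every micro-batch.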
Run a Python script
- For Spark 2, run the following command to submit the Python script:
  spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark2-emrsdk/emr-datasources_shaded_2.11-2.3.1.jar <python-script>.py
- For Spark 3, run the following command to submit the Python script:
  spark-submit --jars /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/emr-datasources_shaded_2.12-3.0.1.jar <python-script>.py
Note If the directory in the preceding value does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory.
Parameters
Parameter | Description |
---|---|
endpoint | The endpoint that is used to access LogHub. Example: cn-hangzhou-intranet.log.aliyuncs.com. |
access.key.id | The AccessKey ID of your Alibaba Cloud account. |
access.key.secret | The AccessKey secret of your Alibaba Cloud account. |
sls.project | The name of the Log Service project. |
sls.store | The name of the Logstore. |
startingoffsets | The starting position from which data is consumed. Valid values: earliest and latest. |
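The options in the table map one-to-one to the `.option(...)` calls in the samples above. A plain-Python sketch (the helper function is hypothetical, and the values echo the PySpark sample) that collects them into a dictionary and validates `startingoffsets` against its two documented values:

```python
def build_loghub_options(endpoint, access_key_id, access_key_secret,
                         project, store, startingoffsets="latest"):
    """Collect the LogHub reader options from the parameter table.

    Raises ValueError when startingoffsets is not one of the two
    documented values, 'earliest' or 'latest'.
    """
    if startingoffsets not in ("earliest", "latest"):
        raise ValueError("startingoffsets must be 'earliest' or 'latest'")
    return {
        "endpoint": endpoint,
        "access.key.id": access_key_id,
        "access.key.secret": access_key_secret,
        "sls.project": project,
        "sls.store": store,
        "startingoffsets": startingoffsets,
    }

# Placeholder values from the PySpark sample, not real credentials.
opts = build_loghub_options(
    "cn-hangzhou-intranet.log.aliyuncs.com", "LTAI----", "DTi----",
    "emr-test-hz-1", "test1", "earliest",
)
print(opts["sls.project"])  # -> emr-test-hz-1
```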