This topic describes how to use Spark Streaming of Data Lake Analytics (DLA) to access
LogHub.
Procedure
- Prepare the following test code to access LogHub, package the test code into a JAR
file, and then upload this file to OSS.
package com.aliyun.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object SparkLogHub {
  def main(args: Array[String]): Unit = {
    if (args.length < 8) {
      System.err.println(
        """Usage: LoghubSample <sls project> <sls logstore> <loghub group name> <sls endpoint>
          |       <access key id> <access key secret> <batch interval seconds> <checkpoint dir>
        """.stripMargin)
      System.exit(1)
    }

    val loghubProject = args(0)
    val logStore = args(1)
    val loghubGroupName = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val checkPointDir = args(7)

    // Creates a new StreamingContext when no checkpoint data exists.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("LoghubSample")
      val ssc = new StreamingContext(conf, batchInterval)
      // Create a DStream that consumes data from the specified Logstore.
      val loghubStream = LoghubUtils.createStream(
        ssc,
        loghubProject,
        logStore,
        loghubGroupName,
        endpoint,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)
      // Checkpoint the stream and print the number of records in each batch.
      loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => println(rdd.count()))
      ssc.checkpoint(checkPointDir) // set checkpoint directory
      ssc
    }

    // Recover the StreamingContext from the checkpoint directory, or create a new one.
    val ssc = StreamingContext.getOrCreate(checkPointDir, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
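In this sample, foreachRDD only prints the number of records in each batch. If you want to process the log data itself, you can decode the records first. The following minimal sketch replaces the foreachRDD line in the sample above; it assumes that each element of the DStream returned by LoghubUtils.createStream is a log record serialized as a UTF-8 encoded JSON byte array.
import java.nio.charset.StandardCharsets

// Minimal sketch (not part of the original sample): decode each record before processing.
loghubStream
  .checkpoint(batchInterval * 2)
  .map(record => new String(record, StandardCharsets.UTF_8)) // bytes -> JSON string
  .foreachRDD { rdd =>
    // Replace println with your own sink, such as writing each batch to OSS.
    rdd.take(10).foreach(println)
  }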
- Log on to the DLA console.
- In the top navigation bar, select the region where LogHub is deployed.
- In the left-side navigation pane, choose Serverless Spark > Submit job.
- On the Parameter Configuration page, click Create Job.
- In the Create Job dialog box, configure the parameters and click OK to create a Spark Streaming job.
- In the Job List navigation tree, click the Spark Streaming job that you created and enter
the following job content in the code editor. Then, save and submit the job.
{
    "args": [
        "<sls project>",             # The name of the Log Service project.
        "<sls logstore>",            # The name of the Log Service Logstore.
        "<loghub group name>",       # The name of the LogHub consumer group.
        "<sls endpoint>",            # The endpoint of Log Service.
        "<access key id>",           # The AccessKey ID that is used to access Log Service.
        "<access key secret>",       # The AccessKey secret that is used to access Log Service.
        "<batch interval seconds>",  # The batch interval of the Spark Streaming job, in seconds.
        "<checkpoint dir>"           # The OSS directory in which checkpoints are saved.
    ],
    "name": "LogHub",
    "className": "com.aliyun.spark.streaming.SparkLogHub",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://<path/to/store/your/spark/log>",  # The OSS directory in which Spark logs are saved.
        "spark.executor.resourceSpec": "medium"
    },
    "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
}
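For reference, the following is a hypothetical example of a filled-in args array. All values are placeholders for illustration only; replace them with your own project, Logstore, consumer group, endpoint, AccessKey pair, batch interval, and OSS checkpoint directory. Note that each argument, including the batch interval, is passed as a string.
"args": [
    "test-project",                      # Hypothetical Log Service project name.
    "test-logstore",                     # Hypothetical Logstore name.
    "test-consumer-group",               # Hypothetical consumer group name.
    "cn-hangzhou.log.aliyuncs.com",      # Example Log Service endpoint for the China (Hangzhou) region.
    "<yourAccessKeyId>",                 # Your AccessKey ID.
    "<yourAccessKeySecret>",             # Your AccessKey secret.
    "10",                                # A 10-second batch interval.
    "oss://test-bucket/spark/checkpoint/"  # Hypothetical OSS checkpoint directory.
]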