This topic describes how to use Spark Streaming of Data Lake Analytics (DLA) to access LogHub.

Prerequisites

Object Storage Service (OSS) is activated. The JAR file of the test code and the Spark logs in this topic are stored in OSS.

Procedure

  1. Prepare the following test code to access LogHub, package the test code into a JAR file, and then upload the JAR file to OSS. A build dependency sketch for packaging the JAR file is provided after the sample code.
    package com.aliyun.spark.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object SparkLogHub {
      def main(args: Array[String]): Unit = {
        if (args.length < 8) {
          System.err.println(
            """Usage: LoghubSample <sls project> <sls logstore> <loghub group name> <sls endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <checkpoint dir>
            """.stripMargin)
          System.exit(1)
        }
    
        val loghubProject = args(0)
        val logStore = args(1)
        val loghubGroupName = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val checkPointDir = args(7)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("LoghubSample")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          // Checkpoint the DStream data every two batch intervals and print the record count of each batch.
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => println(rdd.count()))
          ssc.checkpoint(checkPointDir) // set checkpoint directory
          ssc
        }
    
        // Recover the StreamingContext from checkpoint data if it exists. Otherwise, create a new one.
        val ssc = StreamingContext.getOrCreate(checkPointDir, functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
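
     The test code depends on a connector that provides org.apache.spark.streaming.aliyun.logservice.LoghubUtils. The following build.sbt sketch shows one possible way to declare the dependencies before you build the shaded JAR file. The emr-logservice coordinates and all version numbers are assumptions; replace them with values that match the Spark and Scala versions of your environment.
    // build.sbt (sketch): the artifact coordinates and versions are assumptions. Verify them for your environment.
    name := "spark-loghub-example"
    scalaVersion := "2.11.12"
    libraryDependencies ++= Seq(
      // Spark Streaming is provided by the Spark runtime and is not bundled into the JAR file.
      "org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
      // Connector that provides LoghubUtils. The coordinates and version are assumptions.
      "com.aliyun.emr" %% "emr-logservice" % "1.9.0"
    )
    // Build a shaded (fat) JAR file, for example with the sbt-assembly plugin, and upload it to OSS.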
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where LogHub is deployed.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark Streaming job.
  7. In the Job List navigation tree, click the Spark Streaming job that you created and enter the following job content in the code editor. Then, save and submit the job.
    {
        "args": [
            "<sls project>",  # The name of the project of Log Service.
            "<ls logstore>,   # The name of the Logstore of Log Service.
            "<loghub group name>",  # The name of the LogHub group of Log Service.
            "<sls endpoint>",   # The endpoint of Log Service.
            "<access key id>",  # The AccessKey ID that is used to access Log Service.
            "<access key secret>",  # The AccessKey secret that is used to access Log Service.
            "<batch interval seconds>", # The interval at which Spark Streaming jobs are run in batches.
            "<checkpoint dir>" # The OSS directory where the checkpoint is saved.
        ],
        "name": "LogHub",
        "className": "com.aliyun.spark.streaming.SparkLogHub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  # The directory where Spark logs are saved.
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }
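
After you submit the job, each batch only prints the number of consumed records: the output of println(rdd.count()) appears in the Spark driver log, which is stored in the OSS directory specified by spark.dla.job.log.oss.uri. If you also want to inspect the log content, you can replace the foreachRDD call in the test code with logic similar to the following sketch. The sketch assumes that each record is delivered as UTF-8 encoded JSON bytes; verify the record format of the connector version that you use.
    // Sketch only: assumes that each LogHub record arrives as UTF-8 encoded JSON bytes.
    import java.nio.charset.StandardCharsets

    loghubStream.checkpoint(batchInterval * 2).foreachRDD { rdd =>
      // Print a small sample of decoded records to the driver log for inspection.
      rdd.take(10)
        .map(bytes => new String(bytes, StandardCharsets.UTF_8))
        .foreach(println)
      // Keep the per-batch record count from the original example.
      println(s"Records in this batch: ${rdd.count()}")
    }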