This topic describes how to use Spark Streaming of Data Lake Analytics (DLA) to access LogHub of Log Service.

Prerequisites

Procedure

  1. Compile the following test code that is used to access LogHub. Then, package the test code into a JAR file and upload the file to OSS. A sample build definition that you can adapt is provided after the code.
    package com.aliyun.spark.streaming
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    object SparkLogHub {
      def main(args: Array[String]): Unit = {
        if (args.length < 8) {
          System.err.println(
            """Usage: LoghubSample <sls project> <sls logstore> <loghub group name> <sls endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <checkpoint dir>
            """.stripMargin)
          System.exit(1)
        }
    
        val loghubProject = args(0)
        val logStore = args(1)
        val loghubGroupName = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val checkPointDir = args(7)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("LoghubSample")
          val ssc = new StreamingContext(conf, batchInterval)
          // Create a DStream that consumes data from the specified LogHub Logstore.
          val loghubStream = LoghubUtils.createStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          // Checkpoint the stream every two batches and print the record count of each batch.
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => println(rdd.count()))
          ssc.checkpoint(checkPointDir) // set checkpoint directory
          ssc
        }
    
        // Recover the StreamingContext from the checkpoint directory, or create a new one.
        val ssc = StreamingContext.getOrCreate(checkPointDir, functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
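    To package the JAR file, the build must include the Spark Streaming dependency and the Aliyun LogHub connector that provides LoghubUtils, and the connector must be bundled into the JAR (for example, with sbt-assembly or the Maven Shade plugin). The following build.sbt is a minimal sketch; the connector coordinates and all version numbers are assumptions and must be matched to your Spark and Scala versions.

      // build.sbt: minimal sketch. The artifact coordinates and versions below are
      // assumptions for illustration; verify them against the connector documentation.
      name := "spark-loghub-example"
      scalaVersion := "2.11.12"

      libraryDependencies ++= Seq(
        // Spark itself is provided by the serverless Spark runtime of DLA.
        "org.apache.spark" %% "spark-core"      % "2.4.5" % "provided",
        "org.apache.spark" %% "spark-streaming" % "2.4.5" % "provided",
        // Aliyun E-MapReduce LogHub connector that provides LoghubUtils (assumed coordinates).
        "com.aliyun.emr"   %% "emr-logservice"  % "2.0.0"
      )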
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where LogHub resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark Streaming job.
  7. In the Job List navigation tree, click the Spark Streaming job that you created and enter the following job content in the code editor. Then, click Save and Execute. An example of the args array with filled-in placeholder values is provided after the code.
    {
        "args": [
            "<sls project>",  # The name of the project of Log Service.
            "<ls logstore>,   # The name of the Logstore in Log Service.
            "<loghub group name>",  # The name of the consumer group of LogHub.
            "<sls endpoint>",   # The endpoint of Log Service.
            "<access key id>",  # The AccessKey ID that is used to access Log Service.
            "<access key secret>",  # The AccessKey secret that is used to access Log Service.
            "<batch interval seconds>", # The interval at which Spark Streaming jobs run at the same time.
            "<checkpoint dir>" # The OSS directory where checkpoints are saved.
        ],
        "name": "LogHub",
        "className": "com.aliyun.spark.streaming.SparkLogHub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.dla.connectors": "oss",
            "spark.executor.instances": 1,
            "spark.dla.job.log.oss.uri": "oss://</path/to/store/your/spark/log>",  # The OSS directory where Spark logs are saved.
            "spark.executor.resourceSpec": "medium"
        },
        "file": "oss://path/to/spark-examples-0.0.1-SNAPSHOT-shaded.jar"
    }
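    The values in the args array are passed positionally to the main method of SparkLogHub in the preceding code. The following fragment shows the array with hypothetical placeholder values for illustration; replace them with your own project, Logstore, consumer group, endpoint, credentials, batch interval, and OSS checkpoint directory.

        "args": [
            "my-sls-project",                     # args(0): Log Service project
            "my-logstore",                        # args(1): Logstore
            "my-consumer-group",                  # args(2): LogHub consumer group
            "cn-hangzhou.log.aliyuncs.com",       # args(3): Log Service endpoint
            "<yourAccessKeyId>",                  # args(4): AccessKey ID
            "<yourAccessKeySecret>",              # args(5): AccessKey secret
            "10",                                 # args(6): batch interval, in seconds
            "oss://my-bucket/loghub-checkpoint/"  # args(7): OSS checkpoint directory
        ]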