This topic describes how to use Spark Streaming to consume log data in Log Service and calculate the number of log entries.

Use Spark to access Log Service

  • Method 1: Receiver-based DStream
        val logServiceProject = args(0)    // The name of a project in Log Service.
        val logStoreName = args(1)     // The name of a Logstore in Log Service.
        val loghubConsumerGroupName = args(2)  // Jobs that have the same consumer group name jointly consume the data in the Logstore. 
        val loghubEndpoint = args(3)  // The endpoint of Log Service. 
        val accessKeyId = "<accessKeyId>"     // The AccessKey ID that is used to access Log Service. 
        val accessKeySecret = "<accessKeySecret>" // The AccessKey secret that is used to access Log Service. 
        val numReceivers = args(4).toInt  // The number of receivers to be started to read data from the Logstore. 
        val batchInterval = Milliseconds(args(5).toInt * 1000) // The data processing interval of Spark Streaming. 
        val conf = new SparkConf().setAppName("Test Loghub Streaming")
        val ssc = new StreamingContext(conf, batchInterval)
        val loghubStream = LoghubUtils.createStream(
          ssc,
          logServiceProject,
          logStoreName,
          loghubConsumerGroupName,
          loghubEndpoint,
          numReceivers,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK)
        loghubStream.foreachRDD(rdd => println(rdd.count()))
        ssc.start()
        ssc.awaitTermination()
  • Method 2: Direct API-based DStream
        val logServiceProject = args(0)
        val logStoreName = args(1)
        val loghubConsumerGroupName = args(2)
        val loghubEndpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkConnect = args(7)
        val checkpointPath = args(8)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            logServiceProject,
            logStoreName,
            loghubConsumerGroupName,
            accessKeyId,
            accessKeySecret,
            loghubEndpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
          ssc.checkpoint(checkpointPath)
          val stream = loghubStream.checkpoint(batchInterval)
          stream.foreachRDD(rdd => {
            println(rdd.count())
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
    In E-MapReduce (EMR) SDK V1.4.0 and later, Spark Streaming can consume Log Service data by using Direct API-based DStreams. With this method, data in LogHub does not need to be redundantly stored as write-ahead logs (WALs), and at-least-once semantics are provided without enabling the WAL feature of Spark Streaming. This method is still experimental. When you use it, take note of the following points:
    • Each time you perform an action on the DStream, you must commit the consumed offsets, as shown by the commitAsync() call in the preceding code.
    • In a Spark Streaming job, you can perform only one action on a Logstore.
    • This method requires the ZooKeeper service.
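In both methods, the number of log entries in each batch is obtained by calling rdd.count() inside foreachRDD. If you also need counts over longer periods, the standard windowing operators of Spark Streaming can be applied to the same DStream. The following is a minimal sketch that counts the entries received in the last 5 minutes, assuming the receiver-based stream from Method 1 and a window length that is a multiple of the batch interval (Minutes comes from org.apache.spark.streaming):
// Sketch only: place this before ssc.start().
val windowedCounts = loghubStream
  .window(Minutes(5))   // the log entries received in the last 5 minutes
  .count()              // number of entries in the window, re-evaluated at every batch interval
windowedCounts.print()  // print each windowed count to the driver log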

Use MetaService for access

In the preceding sample code, an AccessKey pair is explicitly passed to the interface. In EMR SDK V1.3.2 and later, Spark Streaming can use MetaService to access data in Log Service without an AccessKey pair. For more information, see the description of the LoghubUtils class in the EMR SDK:
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
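For example, with MetaService enabled, the per-batch count from the preceding examples can be written without an AccessKey pair. The following is a minimal sketch that uses the first overload above and reuses the variable names from the earlier snippets:
// Sketch only: MetaService resolves the credentials, so no AccessKey pair is passed.
val loghubStream = LoghubUtils.createStream(
  ssc,
  logServiceProject,
  logStoreName,
  loghubConsumerGroupName,
  StorageLevel.MEMORY_AND_DISK)
loghubStream.foreachRDD(rdd => println(rdd.count()))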
Note
  • EMR SDK supports three consumption modes for Log Service, which are BEGIN_CURSOR, END_CURSOR, and SPECIAL_TIMER_CURSOR. By default, END_CURSOR is used.
    • BEGIN_CURSOR: consumes data from the beginning of the log data. If a checkpoint record exists, the consumption starts from the checkpoint.
    • END_CURSOR: consumes data from the end of the log data. If a checkpoint record exists, the consumption starts from the checkpoint.
    • SPECIAL_TIMER_CURSOR: consumes data from a specified point in time, expressed as a Unix timestamp in seconds. If a checkpoint record exists, the consumption starts from the checkpoint.
    All of the consumption modes are affected by checkpoint records: if a checkpoint record exists, consumption always starts from the checkpoint. The SPECIAL_TIMER_CURSOR mode allows you to forcibly start data consumption from a specified point in time. To do so, configure the following parameters in the createStream method of the LoghubUtils class (a sketch follows this note):
    • cursorPosition: Set this parameter to LogHubCursorPosition.SPECIAL_TIMER_CURSOR.
    • forceSpecial: Set this parameter to true.
  • Except for the master node, the nodes of an EMR cluster cannot access the Internet. Therefore, you must configure an internal endpoint of Log Service. Otherwise, you cannot request data from Log Service.
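For example, to forcibly start consumption from a specific point in time, the third MetaService-based overload above can be called as follows. This is a sketch that reuses the variable names from the earlier snippets; the timestamp is only an illustrative value:
// Sketch only: start consumption at 2021-01-01 00:00:00 UTC (Unix time in seconds),
// even if a checkpoint record already exists for the consumer group.
val loghubStream = LoghubUtils.createStream(
  ssc,
  logServiceProject,
  logStoreName,
  loghubConsumerGroupName,
  StorageLevel.MEMORY_AND_DISK,
  LogHubCursorPosition.SPECIAL_TIMER_CURSOR,  // cursorPosition
  1609459200,                                 // mLoghubCursorStartTime, in seconds
  true)                                       // forceSpecial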

Appendix

For the complete sample code, visit GitHub.