edit-icon download-icon

Spark + Log Service

Last Updated: Aug 06, 2018

Spark access to Log Service

This following two methods demonstrate how Spark Streaming consumes the log data in Log Service and calculates the number of logs.

Method 1: Receiver Based DStream

  1. val logserviceProject = args(0) // Project name in Log Service
  2. val logStoreName = args(1) // Logstore name in Log Service
  3. val loghubGroupName = args(2) // Jobs with the same loghubGroupName will consume data in the logstore together.
  4. val loghubEndpoint = args(3) // The data class API endpoint of Alibaba Cloud log service
  5. val accessKeyId = “<accessKeyId>” // The AccessKeyId for accessing the log service
  6. val accessKeySecret = “<accessKeySecret>” // The AccessKeySecret for accessing the log service
  7. val numReceivers = args(4).toInt // How many receivers have started to read data in the logstore
  8. val batchInterval = Milliseconds(args(5).toInt * 1000) // The interval between processing batches in Spark Streaming [A1]
  9. val conf = new SparkConf().setAppName("Test Loghub Streaming")
  10. val ssc = new StreamingContext(conf, batchInterval)
  11. val loghubStream = LoghubUtils.createStream(
  12. ssc,
  13. loghubProject,
  14. logStream,
  15. loghubGroupName,
  16. endpoint,
  17. numReceivers,
  18. accessKeyId,
  19. accessKeySecret,
  20. StorageLevel.MEMORY_AND_DISK)
  21. loghubStream.foreachRDD(rdd => println(rdd.count()))
  22. ssc.start()
  23. ssc.awaitTermination()

Method 2: Direct API Based DStream

  1. val logServiceProject = args(0)
  2. val logStoreName = args(1)
  3. val loghubConsumerGroupName = args(2)
  4. val loghubEndpoint = args(3)
  5. val accessKeyId = args(4)
  6. val accessKeySecret = args(5)
  7. val batchInterval = Milliseconds(args(6).toInt * 1000)
  8. val zkConnect = args(7)
  9. val checkpointPath = args(8)
  10. def functionToCreateContext(): StreamingContext = {
  11. val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
  12. val ssc = new StreamingContext(conf, batchInterval)
  13. val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
  14. val loghubStream = LoghubUtils.createDirectStream(
  15. ssc,
  16. logServiceProject,
  17. logStoreName,
  18. loghubConsumerGroupName,
  19. accessKeyId,
  20. accessKeySecret,
  21. loghubEndpoint,
  22. zkParas,
  23. LogHubCursorPosition.END_CURSOR)
  24. ssc.checkpoint(checkpointPath)
  25. val stream = loghubStream.checkpoint(batchInterval)
  26. stream.foreachRDD(rdd => {
  27. println(rdd.count())
  28. loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
  29. })
  30. ssc
  31. }
  32. val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
  33. ssc.start()
  34. ssc.awaitTermination()

From the 1.4.0 version of E-MapReduce SDK , we provide method based Direct API. This method can avoid the repeated storage of Loghub data into the Write Ahead Log, without enabling the WAL feature of the Spark Streaming to realize the at least once of data. At present, the Direct API implementation is in experimental state.


  • In the actions of DStream, commit operation must be done.
  • In a Spark Streaming, multiple actions on logstore data sources at the same time are not supported.
  • The Direct API method requires zookeeper service.

Support MetaService

In the preceding examples, we send AccessKey into API explicitly. From the 1.3.2 version of E-MapReduce SDK, Spark Streaming can process LogService data without entering AccessKey based on MetaService. For more information, see the LoghubUtils class in E-MapReduce SDK:

  1. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
  2. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
  3. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
  4. LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)


  • E-MapReduce SDK supports three consumption modes of Log Service: BEGIN_CURSOR, END_CURSOR and SPECIAL_TIMER_CURSOR. The default mode is END_CURSOR.
    • BEGIN_CURSOR: Consumes the log from the beginning. If there are checkpoint records, then Spark Streaming consumes the log from the checkpoint.
    • END_CURSOR: Consumes the log from the ending. If there are checkpoint records, then Spark Streaming consumes the log from the checkpoint.
    • SPECIAL_TIMER_CURSOR: Consumes the log from a specified time. If there are checkpoint records, then Spark Streaming consumes the log from the checkpoint. The unit is second.

      Note: The preceding three consumption modes are all affected by the checkpoint record. If there is a checkpoint record, the consumption begins at the checkpoint regardless of the consumption mode. E-MapReduce SDK enables you to consume the log from a specified time compulsively in SPECIAL_TIMER_CURSOR mode. In the LoghubUtils#createStream interface, the following parameters need to be used in combination.

      • cursorPosition:LogHubCursorPosition.SPECIAL_TIMER_CURSOR
      • forceSpecial:true
  • E-MapReduce nodes cannot connect to the Internet except the master node. when you configure LogService endpoint, be noted that you should use intranet endpoints provided by Log Service. Otherwise the requests to the Log Service will fail.


For a complete sample code, see Spark access to LogService.

Thank you! We've received your feedback.