This topic describes how to use Spark Streaming to consume log data. After logs are collected to Log Service, you can use the Spark SDK provided by Log Service to process log data in Spark Streaming.

The Spark SDK supports two consumption modes: Receiver and Direct.

The Maven dependency is as follows:
<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
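
If your project is built with sbt instead of Maven, the equivalent dependency (using the same _2.11 artifact and version) would look like this:

libraryDependencies += "com.aliyun.emr" % "emr-logservice_2.11" % "1.7.2"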

Receiver mode

In the Receiver mode, a consumer group consumes data from Log Service and temporarily stores the data on Spark executors. After the Spark Streaming job is started, it reads and processes the data from the executors. For more information, see Use a consumer group to consume logs. Each data record is returned as a JSON string. The consumer group periodically saves checkpoints to the server; you do not need to update checkpoints manually.
  • Example
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val loghubProject = args(0)
        val logStore = args(1)
        val loghubGroupName = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • Parameter description
    Parameter | Type | Description
    loghubProject | String | The project in Log Service
    logStore | String | The Logstore in Log Service
    loghubGroupName | String | The name of the consumer group
    endpoint | String | The endpoint of the region to which the project belongs
    accessKeyId | String | The AccessKey ID used to access Log Service
    accessKeySecret | String | The AccessKey secret used to access Log Service
  • Additional considerations

    In the Receiver mode, data loss may occur in some cases. To avoid data loss, enable the write-ahead log (WAL), which is supported in Spark 1.2 and later, as sketched below. For more information, see the Spark Streaming Programming Guide.
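
    The write-ahead log is controlled by a standard Spark configuration property. The following minimal sketch shows how you would enable it in functionToCreateContext of the preceding example; the WAL data is stored under the checkpoint directory that the example already sets:

      val conf = new SparkConf()
        .setAppName("Test Loghub")
        // Persist received records to a write-ahead log so that they can be
        // recovered if the driver fails.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, batchInterval)
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // the WAL is written under this directory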

Direct mode

In the Direct mode, no consumer group is required. The job calls API operations to request data directly from the server. Compared with the Receiver mode, the Direct mode has the following benefits:
  • Simplified parallelism. The number of Spark partitions is the same as the number of Logstore shards. You can split shards to improve the parallelism of tasks.
  • Increased efficiency. You no longer need to enable the write-ahead log to prevent data loss.
  • Exactly-once semantics. Data is obtained directly from the server, and checkpoints are committed only after a batch is processed successfully. If Spark exits unexpectedly before a checkpoint is committed, some data may be consumed repeatedly.

The Direct mode requires a ZooKeeper environment, and you must set a checkpoint directory in ZooKeeper where intermediate state data is temporarily stored. If you want to consume data again after restarting a task, delete the checkpoint directory from ZooKeeper, for example as sketched below, and change the name of the consumer group.
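
For illustration, the following sketch deletes a checkpoint directory from ZooKeeper recursively by using the plain ZooKeeper client. The ZooKeeper address and the path /loghub/your-consumer-group are placeholder assumptions, not the connector's documented layout; replace them with the address and directory that your job actually uses.

  import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
  import scala.collection.JavaConverters._

  object ResetLoghubCheckpoint {
    // Delete a znode and all of its children.
    def deleteRecursively(zk: ZooKeeper, path: String): Unit = {
      zk.getChildren(path, false).asScala.foreach(child => deleteRecursively(zk, s"$path/$child"))
      zk.delete(path, -1) // -1 matches any node version
    }

    def main(args: Array[String]): Unit = {
      val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
        override def process(event: WatchedEvent): Unit = ()
      })
      try {
        deleteRecursively(zk, "/loghub/your-consumer-group") // placeholder path
      } finally {
        zk.close()
      }
    }
  }

Remember to also change the name of the consumer group, as described above, before you restart the task.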

  • Example
    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val loghubProject = args(0)
        val logStore = args(1)
        val loghubGroupName = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            loghubProject,
            logStore,
            loghubGroupName,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            // Sample processing: group records by length and count the distinct keys.
            println(s"count by key: ${rdd.map(s => (s.length, s)).countByKey().size}")
            // Commit the consumed offsets to the server after the batch has been processed.
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }
  • Parameter description
    Parameter | Type | Description
    loghubProject | String | The project in Log Service
    logStore | String | The Logstore in Log Service
    loghubGroupName | String | The name of the consumer group (only used to store consumer offsets)
    endpoint | String | The endpoint of the region to which the project belongs
    accessKeyId | String | The AccessKey ID used to access Log Service
    accessKeySecret | String | The AccessKey secret used to access Log Service
    zkAddress | String | The endpoint of ZooKeeper
  • Parameter settings
    In the Direct mode, you must specify the number of data rows that each shard consumes in each batch. Otherwise, the data reading process cannot end. You can throttle the transmission rate of a single batch by setting the two parameters listed in the following table.
    Parameter | Description | Default
    spark.loghub.batchGet.step | The number of log groups returned for a single request | 100
    spark.streaming.loghub.maxRatePerShard | The number of log entries that a shard needs to process per second | 10000
    The number of log entries processed in each batch is calculated as follows: number of shards x max(spark.loghub.batchGet.step x n, spark.streaming.loghub.maxRatePerShard x duration), where:
    • n: the number of requests required for the number of returned rows to reach spark.streaming.loghub.maxRatePerShard x duration.
    • duration: the interval between batches, in seconds.

    If you need to combine multiple DStreams, the number of shards refers to the total number of shards in all Logstores.

    • Example
      For example, the number of shards is 100. Each log group contains 50 log entries on average. Batches are processed at an interval of two seconds. If you want to process 20,000 log entries in each batch, use the following configurations:
      • spark.loghub.batchGet.step: 4
      • spark.streaming.loghub.maxRatePerShard: 200

      If each log group contains 60 log entries instead, the preceding settings process 24,000 log entries in each batch (60 x 4 x 100 = 24,000), which exceeds the expected 20,000.

    • Accurate transmission rate throttling

      A smaller spark.loghub.batchGet.step value makes throttling more accurate but increases the number of requests. We recommend that you measure the average number of log entries contained in each write request (a log group) and then set the two preceding parameters accordingly, as shown in the sketch below.
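
      The following is a minimal sketch of where these throttling settings would go in the Direct mode example above, using the values from the sizing example; adjust them to your own log-group size and batch interval:

        val conf = new SparkConf()
          .setAppName("Test Direct Loghub")
          // Fetch 4 log groups per request (about 200 entries per request when a log group holds ~50 entries).
          .set("spark.loghub.batchGet.step", "4")
          // Target number of log entries processed per shard per second.
          .set("spark.streaming.loghub.maxRatePerShard", "200")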

To download the source code, visit Spark SDK.

For more information about the difference between the Receiver and Direct modes, visit Spark Streaming + Kafka Integration Guide.