Simple Log Service: Use Spark Streaming to consume log data

Last Updated: Apr 09, 2026

Simple Log Service (SLS) supports Spark Streaming for real-time log consumption. Use the Alibaba Cloud Spark SDK to consume log data from SLS in Receiver mode or Direct mode.

Limits

Spark Streaming consumption supports only Spark 2.x.

The Alibaba Cloud Spark SDK supports two consumption modes: Receiver and Direct. Add the following Maven dependency to your project:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
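If your project builds with sbt instead of Maven, the same dependency can be declared as in the following sketch. It assumes the project compiles with Scala 2.11, because the `_2.11` suffix in the artifact ID is the Scala binary version the artifact was built for. The exact `scalaVersion` value shown is an assumption.

```scala
// build.sbt: sbt equivalent of the Maven dependency above (sketch).
// The _2.11 suffix is the artifact's Scala binary version, so the project
// is assumed to build with a Scala 2.11.x compiler.
scalaVersion := "2.11.12"

// Explicit artifact name. With scalaVersion 2.11.x, the form
// "com.aliyun.emr" %% "emr-logservice" % "1.7.2" resolves to the same artifact.
libraryDependencies += "com.aliyun.emr" % "emr-logservice_2.11" % "1.7.2"
```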

Consume log data in Receiver mode

In Receiver mode, a consumer group consumes data from SLS and temporarily stores the data in a Spark executor. After a Spark Streaming job starts, the executor reads and processes the data. Each log entry is returned as a JSON string, delivered to the job as a byte array. The consumer group automatically saves checkpoints to SLS. For more information, see Use consumer groups to consume log data.
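In the Receiver-mode example in this topic, stream elements are byte arrays that the job decodes with `new String`. The following minimal sketch decodes them with an explicit charset instead of the platform default. It assumes each record is the UTF-8 encoding of one JSON log entry. `ReceiverRecords` and `decode` are hypothetical names, not part of the SDK, and the sample payload field is made up.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper (not part of the SDK). Assumes each Receiver-mode record
// is the UTF-8 encoding of one log entry serialized as a JSON string.
object ReceiverRecords {
  /** Decodes one partition of raw records; intended for rdd.mapPartitions(decode). */
  def decode(records: Iterator[Array[Byte]]): Iterator[String] =
    records.map(bytes => new String(bytes, StandardCharsets.UTF_8))

  def main(args: Array[String]): Unit = {
    // Sample payload with a made-up field, for illustration only.
    val raw = Iterator("""{"level":"INFO"}""".getBytes(StandardCharsets.UTF_8))
    decode(raw).foreach(println) // prints {"level":"INFO"}
  }
}
```

Passing an explicit charset keeps the result independent of the JVM's default encoding on each executor.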

  • Parameters

    Parameter         Type     Description
    project           String   The name of the project in SLS.
    logstore          String   The name of the logstore in SLS.
    consumerGroup     String   The name of the consumer group.
    endpoint          String   The endpoint of the region where the SLS project resides. For more information, see Endpoints.
    accessKeyId       String   The AccessKey ID that is used to access SLS.
    accessKeySecret   String   The AccessKey secret that is used to access SLS.

  • Example

    Note

    In Receiver mode, data loss may occur with default configurations. To prevent data loss, enable Write-Ahead Logs (available in Spark 1.2 and later). For more information, see Spark.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{ Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
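          loghubStream.checkpoint(batchInterval * 2).foreachRDD { rdd =>
            // Decode each record as UTF-8 and print the 10 lexicographically largest entries on the driver.
            rdd.map(bytes => new String(bytes, "UTF-8")).top(10).foreach(println)
          }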
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
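The Note above recommends enabling Write-Ahead Logs. The following sketch shows the settings involved. It relies on the standard Spark Streaming property `spark.streaming.receiver.writeAheadLog.enable`, which this topic does not name. `WalSettings` is a hypothetical name. Spark writes the logs under the directory passed to `ssc.checkpoint(...)`, which the example above already sets.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

// Sketch: settings for enabling the receiver write-ahead log (WAL).
object WalSettings {
  // Standard Spark Streaming property. WAL files are written under the
  // directory passed to ssc.checkpoint(...), so a checkpoint directory is required.
  def conf(): SparkConf = new SparkConf()
    .setAppName("Test Loghub")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")

  // With the WAL enabled, in-memory replication is redundant; the Spark Streaming
  // guide suggests a serialized, non-replicated level such as this one.
  val storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER
}
```

In `TestLoghub`, this would mean building `conf` from `WalSettings.conf()` and passing `WalSettings.storageLevel` to `LoghubUtils.createStream` instead of `StorageLevel.MEMORY_AND_DISK`.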

Consume log data in Direct mode

Direct mode does not use a consumer group to pull data. Instead, it calls API operations to request data directly from SLS. A consumer group name is still required, but only to save consumption checkpoints. Direct mode provides the following benefits:

  • Simplified concurrency: The number of Spark partitions matches the number of shards in the Logstore. Split shards to increase task concurrency.

  • Higher efficiency: Write-Ahead Logs are not required to prevent data loss.

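  • Exactly-once semantics: Data is read directly from SLS, and checkpoints are committed only after a task succeeds.

    These semantics are not strict. If Spark exits unexpectedly after a batch is processed but before its checkpoint is committed, that batch is read again after the restart, so some data may be consumed more than once.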
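The replay behavior described in the last bullet can be illustrated with a small in-memory model. This is a hypothetical sketch, not the SDK's implementation: `CommitAfterProcessDemo`, `consume`, and its parameters are invented for illustration. It processes a shard in batches and advances the committed offset only after a batch succeeds. A simulated crash between processing and committing makes the restarted run process that batch again.

```scala
// Hypothetical, in-memory model of "commit the checkpoint only after the batch
// succeeds". It is not the SLS SDK; it only shows why a crash between
// processing and committing leads to replay.
object CommitAfterProcessDemo {
  final case class Run(processed: Vector[Int], committed: Int)

  /** Consumes `shard` in batches of `batchSize`, starting at offset `from`.
    * If `crashAfterBatch` is Some(n), the run stops after processing its
    * n-th batch (0-based) but before committing that batch. */
  def consume(shard: Vector[Int], batchSize: Int, from: Int, crashAfterBatch: Option[Int]): Run = {
    var committed = from
    var processed = Vector.empty[Int]
    var batchNo = 0
    var crashed = false
    while (!crashed && committed < shard.length) {
      val batch = shard.slice(committed, committed + batchSize)
      processed ++= batch                                // 1. process the batch
      if (crashAfterBatch.contains(batchNo)) crashed = true
      else committed += batch.length                     // 2. commit only after success
      batchNo += 1
    }
    Run(processed, committed)
  }

  def main(args: Array[String]): Unit = {
    val shard = (1 to 6).toVector
    // First run crashes after processing its second batch (records 3 and 4).
    val first = consume(shard, batchSize = 2, from = 0, crashAfterBatch = Some(1))
    // The restart resumes from the last committed offset.
    val second = consume(shard, batchSize = 2, from = first.committed, crashAfterBatch = None)
    println(first.processed ++ second.processed) // prints Vector(1, 2, 3, 4, 3, 4, 5, 6)
  }
}
```

Records 3 and 4 are processed twice, which is why downstream writes in Direct mode benefit from being safe to repeat.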

Direct mode requires a ZooKeeper service to store intermediate state. Set a checkpoint directory in ZooKeeper to persist the intermediate data. To re-consume data after restarting a task, delete the corresponding ZooKeeper directory and change the consumer group name.

  • Parameters

    Parameter         Type     Description
    project           String   The name of the project in SLS.
    logstore          String   The name of the logstore in SLS.
    consumerGroup     String   The name of the consumer group. This name is used only to save consumption checkpoints.
    endpoint          String   The endpoint of the region where the SLS project resides. For more information, see Endpoints.
    accessKeyId       String   The AccessKey ID that is used to access SLS.
    accessKeySecret   String   The AccessKey secret that is used to access SLS.
    zkAddress         String   The address of the ZooKeeper service in host:port format, for example localhost:2181.

  • Rate limiting

    Spark Streaming processes data in micro-batches, so you can cap the number of log entries that each batch consumes from each shard.

    In SLS, each write request is stored as one log group. A write request typically contains multiple log entries, all stored in that log group. With web tracking, however, each write request contains only one log entry, so each log group holds a single log entry. The following parameters control the amount of data consumed in a single batch.

    Parameter                               Default   Description
    spark.loghub.batchGet.step              100       The maximum number of log groups fetched per consumption request.
    spark.streaming.loghub.maxRatePerShard  10000     The maximum number of log entries consumed per shard per batch.

    The spark.streaming.loghub.maxRatePerShard parameter sets the target maximum log entries per shard per batch. The SDK fetches log groups in increments of spark.loghub.batchGet.step and accumulates the log entry count. When the count reaches or exceeds spark.streaming.loghub.maxRatePerShard, the SDK stops fetching. Because consumption granularity is at the log group level, the spark.streaming.loghub.maxRatePerShard limit is approximate. The actual count per batch depends on spark.loghub.batchGet.step and the number of log entries in each log group.
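The fetch-and-accumulate rule above can be modeled in a few lines. The following sketch is a hypothetical in-memory model, not the SDK's code: `RateLimitDemo` and `entriesInOneBatch` are invented names, and each pending log group is represented only by its log entry count. It shows why the limit is approximate. With 30-entry log groups and the default values, three requests yield 9,000 entries, so a fourth request is made and the batch ends at 12,000 entries. With web tracking (one entry per log group), the limit is hit exactly.

```scala
// Hypothetical model of the per-shard rate limit described above (not SDK code).
// Log groups are fetched in requests of up to `step` groups; fetching stops once
// the accumulated log entry count reaches or exceeds `maxRatePerShard`.
object RateLimitDemo {
  /** @param groupSizes number of log entries in each pending log group of one shard
    * @return (log groups consumed, log entries consumed) in one batch */
  def entriesInOneBatch(groupSizes: Seq[Int], step: Int, maxRatePerShard: Int): (Int, Int) = {
    var groups = 0
    var entries = 0
    while (entries < maxRatePerShard && groups < groupSizes.length) {
      val fetched = groupSizes.slice(groups, groups + step) // one consumption request
      groups += fetched.length
      entries += fetched.sum
    }
    (groups, entries)
  }

  def main(args: Array[String]): Unit = {
    // Default values: step = 100, maxRatePerShard = 10000.
    println(entriesInOneBatch(Seq.fill(1000)(30), step = 100, maxRatePerShard = 10000))  // prints (400,12000)
    // Web tracking: one log entry per log group, so the limit is reached exactly.
    println(entriesInOneBatch(Seq.fill(20000)(1), step = 100, maxRatePerShard = 10000)) // prints (10000,10000)
  }
}
```

In a real job, both values are presumably set as Spark configuration properties (for example with `SparkConf.set`), since they use the `spark.` prefix; this is an assumption rather than something this topic confirms.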

  • Example

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{ Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{ CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> [<zookeeper host:port> (default: localhost:2181)]
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
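          loghubStream.checkpoint(batchInterval).foreachRDD { rdd =>
            // Process the batch: group entries by string length and count the distinct lengths.
            val countsByLength = rdd.map(s => (s.length, s)).countByKey()
            println(s"count by key: ${countsByLength.size}")
            // Commit checkpoints only after the batch has been processed.
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          }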
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

For the complete source code, see GitHub.