
Simple Log Service: Spark Streaming consumption

Last Updated: Mar 25, 2026

After Simple Log Service (SLS) collects log data, you can run Spark Streaming jobs to consume the data.

Limitations

The Spark Streaming consumer supports only Spark 2.x versions.

The Spark SDK provided by Simple Log Service implements two consumption modes: Receiver mode and Direct mode. To use either mode, add the following Maven dependency:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
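
If your project uses sbt rather than Maven, the equivalent dependency declaration (same coordinates as above) would be:

```scala
// sbt equivalent of the Maven dependency above
libraryDependencies += "com.aliyun.emr" % "emr-logservice_2.11" % "1.7.2"
```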

Receiver mode

In Receiver mode, a consumer group consumes log data and temporarily stores it in a Spark executor. After a Spark Streaming job starts, it reads and processes the data from the executor. Each log is returned as a JSON string. The consumer group automatically saves periodic checkpoints to the server, eliminating the need for manual updates. For more information, see Consume log data by using a consumer group.

  • Parameters

    | Parameter | Type | Description |
    | --- | --- | --- |
    | Project | String | The name of the Simple Log Service Project. |
    | LogStore | String | The name of the LogStore. |
    | consumerGroup | String | The name of the consumer group. |
    | endpoint | String | The endpoint for the region where the Simple Log Service Project is located. For more information, see Endpoints. |
    | accessKeyId | String | The AccessKey ID used to access Simple Log Service. |
    | accessKeySecret | String | The AccessKey Secret used to access Simple Log Service. |

  • Example

    Note

    By default, data buffered in a receiver can be lost if a failure occurs before it is processed. To prevent this, enable write-ahead logs (WAL), which are available in Spark 1.2 and later. For details, see the Spark Streaming documentation.
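
    A minimal sketch of enabling write-ahead logs in the driver configuration (standard Spark settings; the app name is just a placeholder):

```scala
import org.apache.spark.SparkConf

// Persist received data durably before processing, so it can be
// replayed after a failure (available in Spark 1.2 and later).
val conf = new SparkConf()
  .setAppName("Test Loghub")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
// Note: ssc.checkpoint(...) must also be configured, because the
// write-ahead log is stored under the streaming checkpoint directory.
```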

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct mode

Direct mode does not require a consumer group. Instead, it fetches data directly from the server at runtime. This mode offers several advantages:

  • Simplified concurrency: The number of Spark partitions matches the number of shards in the LogStore. You can increase job concurrency by splitting a shard.

  • Efficiency: Prevents data loss without requiring Write-Ahead Logs.

  • Exactly-once semantics: Data is fetched from the server on demand, and a checkpoint is committed only after the job succeeds.

    If a job terminates unexpectedly (for example, due to a Spark failure), data from the last unsuccessful batch may be reprocessed upon restart.

Direct mode requires a ZooKeeper cluster to store intermediate state. You must specify a checkpoint directory path in ZooKeeper for this data. To re-consume data after a job restart, delete this directory in ZooKeeper and change the consumer group name.

  • Parameters

    | Parameter | Type | Description |
    | --- | --- | --- |
    | Project | String | The name of the Simple Log Service Project. |
    | LogStore | String | The name of the LogStore. |
    | consumerGroup | String | The name of the consumer group, used only to save the consumption checkpoint. |
    | endpoint | String | The endpoint for the region where the Simple Log Service Project is located. For more information, see Endpoints. |
    | accessKeyId | String | The AccessKey ID used to access Simple Log Service. |
    | accessKeySecret | String | The AccessKey Secret used to access Simple Log Service. |
    | zkAddress | String | The connection address of your ZooKeeper cluster. |

  • Rate limiting configuration

    Spark Streaming processes data in micro-batches. For each batch, the consumer must determine how many log entries to fetch.

    The underlying storage model of Simple Log Service uses the LogGroup as a unit. Normally, each LogGroup corresponds to a single write request and can contain thousands of logs. However, in scenarios like web tracking, each write request might contain only a single log. To handle these different scenarios, the SDK provides two rate-limiting parameters:

    | Parameter | Description | Default |
    | --- | --- | --- |
    | spark.loghub.batchGet.step | The maximum number of LogGroups to fetch in a single request. | 100 |
    | spark.streaming.loghub.maxRatePerShard | The maximum number of log entries to consume from a single shard per batch. | 10000 |

    The parameter spark.streaming.loghub.maxRatePerShard sets the target for the maximum number of log entries to fetch from a shard in each batch. The SDK fetches LogGroups in increments defined by spark.loghub.batchGet.step until the total log count meets or exceeds the target set by spark.streaming.loghub.maxRatePerShard. Because of this, spark.streaming.loghub.maxRatePerShard acts as a soft limit. The actual number of logs fetched per batch will vary depending on spark.loghub.batchGet.step and the number of logs within each LogGroup.
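
    To make the soft-limit behavior concrete, here is a small, self-contained sketch (not part of the SDK) that mimics the fetch loop: requests proceed in increments of `step` LogGroups until the running log count reaches `maxRatePerShard`, so the final count can overshoot the target. The `logsPerGroup` value is an assumed average for illustration.

```scala
// Simulates the soft rate limit: fetch `step` LogGroups per request
// until the accumulated log count reaches the per-shard target.
// `logsPerGroup` is an assumed average; real LogGroups vary in size.
def logsFetchedPerBatch(maxRatePerShard: Int, step: Int, logsPerGroup: Int): Int = {
  var total = 0
  while (total < maxRatePerShard) {
    total += step * logsPerGroup // one batchGet request
  }
  total
}

// With the defaults (target 10000, step 100) and ~50 logs per group,
// each request yields 5000 logs and the batch stops at exactly 10000.
println(logsFetchedPerBatch(10000, 100, 50)) // 10000
// With ~30 logs per group the target is overshot: 4 requests * 3000 = 12000.
println(logsFetchedPerBatch(10000, 100, 30)) // 12000
```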

  • Example

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
      loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
        // Count the records in this batch, keyed by record length.
        println(s"count by key: ${rdd.map(s => (s.length, s)).countByKey().size}")
        // Commit the consumption checkpoint only after the batch has been processed.
        loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
      })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

For more information, see the project on GitHub.