After Simple Log Service (SLS) collects log data, you can run Spark Streaming jobs to consume the data.
Limitations
The Spark Streaming consumer supports only Spark 2.x versions.
Simple Log Service provides a Spark SDK that implements two consumption modes: Receiver mode and Direct mode. To use it, add the following Maven dependency:
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-logservice_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
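If your project is built with sbt instead of Maven, the same coordinates can be declared as follows (a direct translation of the Maven dependency above):

```scala
// sbt equivalent of the Maven dependency above (same group, artifact, and version).
libraryDependencies += "com.aliyun.emr" % "emr-logservice_2.11" % "1.7.2"
```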
Receiver mode
In Receiver mode, a consumer group consumes log data and temporarily stores it in a Spark executor. After a Spark Streaming job starts, it reads and processes the data from the executor. Each log is returned as a JSON string. The consumer group automatically saves periodic checkpoints to the server, eliminating the need for manual updates. For more information, see Consume log data by using a consumer group.
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| Project | String | The name of the Simple Log Service project. |
| LogStore | String | The name of the LogStore. |
| consumerGroup | String | The name of the consumer group. |
| endpoint | String | The endpoint for the region where the Simple Log Service project is located. For more information, see Endpoints. |
| accessKeyId | String | The AccessKey ID used to access Simple Log Service. |
| accessKeySecret | String | The AccessKey Secret used to access Simple Log Service. |
Example
Note: By default, Receiver mode can lose data if a failure occurs. To prevent data loss, enable Write-Ahead Logs, which are available in Spark 1.2 and later. For details, see the Spark documentation.
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.SparkConf

object TestLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
          |         <access key id> <access key secret> <batch interval seconds>
        """.stripMargin)
      System.exit(1)
    }

    val project = args(0)
    val logstore = args(1)
    val consumerGroup = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Loghub")
      val ssc = new StreamingContext(conf, batchInterval)
      val loghubStream = LoghubUtils.createStream(
        ssc,
        project,
        logstore,
        consumerGroup,
        endpoint,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
        rdd.map(bytes => new String(bytes)).top(10).foreach(println)
      )
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // Set the checkpoint directory.
      ssc
    }

    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
Direct mode
Direct mode does not require a consumer group. Instead, it fetches data directly from the server at runtime. This mode offers several advantages:
- Simplified concurrency: The number of Spark partitions matches the number of shards in the LogStore. You can increase job concurrency by splitting a shard.
- Efficiency: Direct mode prevents data loss without requiring Write-Ahead Logs.
- Exactly-once semantics: Data is fetched from the server on demand, and a checkpoint is committed only after a batch succeeds.
If a job terminates unexpectedly (for example, due to a Spark failure), data from the last unsuccessful batch may be reprocessed upon restart.
Direct mode requires a ZooKeeper cluster to store intermediate state. You must specify a checkpoint directory path in ZooKeeper for this data. To re-consume data after a job restart, delete this directory in ZooKeeper and change the consumer group name.
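As a concrete illustration of the reset procedure, the following standalone helper recursively deletes a checkpoint directory from ZooKeeper. It is a minimal sketch, not part of the SDK: the ZooKeeper address and the path `/loghub/my-consumer-group` are placeholder values, and you would substitute the directory your own job configured. Remember to also change the consumer group name, as described above.

```scala
import org.apache.zookeeper.ZooKeeper
import scala.collection.JavaConverters._

// Hypothetical helper: recursively deletes the Direct mode checkpoint
// directory in ZooKeeper so that data can be consumed again after a restart.
object ResetLoghubCheckpoint {
  def deleteRecursive(zk: ZooKeeper, path: String): Unit = {
    // Delete all children first, then the node itself.
    zk.getChildren(path, false).asScala.foreach(child => deleteRecursive(zk, s"$path/$child"))
    zk.delete(path, -1) // Version -1 matches any node version.
  }

  def main(args: Array[String]): Unit = {
    // Placeholder address and path; use your own ZooKeeper address and
    // the checkpoint directory that your job configured.
    val zk = new ZooKeeper("localhost:2181", 10000, null)
    try deleteRecursive(zk, "/loghub/my-consumer-group")
    finally zk.close()
  }
}
```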
Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| Project | String | The name of the Simple Log Service project. |
| LogStore | String | The name of the LogStore. |
| consumerGroup | String | The name of the consumer group, used only to save the consumption checkpoint. |
| endpoint | String | The endpoint for the region where the Simple Log Service project is located. For more information, see Endpoints. |
| accessKeyId | String | The AccessKey ID used to access Simple Log Service. |
| accessKeySecret | String | The AccessKey Secret used to access Simple Log Service. |
| zkAddress | String | The connection address of your ZooKeeper cluster. |
Rate limiting configuration
Spark Streaming processes data in micro-batches. For each batch, the consumer must determine how many log entries to fetch.
The underlying storage model of Simple Log Service uses the LogGroup as a unit. Normally, each LogGroup corresponds to a single write request and can contain thousands of logs. However, in scenarios like web tracking, each write request might contain only a single log. To handle these different scenarios, the SDK provides two rate-limiting parameters:
| Parameter | Description | Default |
| --- | --- | --- |
| spark.loghub.batchGet.step | The maximum number of LogGroups to fetch in a single request. | 100 |
| spark.streaming.loghub.maxRatePerShard | The maximum number of log entries to consume from a single shard per batch. | 10000 |
The parameter spark.streaming.loghub.maxRatePerShard sets the target for the maximum number of log entries to fetch from a shard in each batch. The SDK fetches LogGroups in increments defined by spark.loghub.batchGet.step until the total log count meets or exceeds the target set by spark.streaming.loghub.maxRatePerShard. Because of this, spark.streaming.loghub.maxRatePerShard acts as a soft limit. The actual number of logs fetched per batch will vary depending on spark.loghub.batchGet.step and the number of logs within each LogGroup.
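To make the soft-limit behavior concrete, the following standalone sketch models the fetch loop. The function and the numbers are illustrative only, not the SDK's actual internals:

```scala
object RateLimitSketch {
  // Simplified model of the per-shard fetch loop: request LogGroups in
  // increments of `step` until the accumulated log count meets or exceeds
  // `maxRatePerShard`. `groups` stands in for the server's LogGroups,
  // each element being the number of logs in one LogGroup.
  def logsFetched(groups: Seq[Int], step: Int, maxRatePerShard: Int): Int = {
    var total = 0
    var idx = 0
    while (total < maxRatePerShard && idx < groups.length) {
      val batch = groups.slice(idx, idx + step) // one request: up to `step` LogGroups
      total += batch.sum
      idx += step
    }
    total
  }

  def main(args: Array[String]): Unit = {
    // Large LogGroups: 4096 logs each. A single request of 100 LogGroups
    // already returns 409600 logs, far beyond the 10000-log target.
    println(RateLimitSketch.logsFetched(Seq.fill(1000)(4096), 100, 10000))
    // Web tracking case: one log per LogGroup, so each request adds only
    // `step` logs and the target is approached in many small increments.
    println(RateLimitSketch.logsFetched(Seq.fill(1000)(1), 100, 10000))
  }
}
```

This is why spark.streaming.loghub.maxRatePerShard behaves as a soft limit: in the first case above, one request overshoots the target by more than 40x, while in the second the batch ends well under it.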
Example
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}

object TestDirectLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
          |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
        """.stripMargin)
      System.exit(1)
    }

    val project = args(0)
    val logstore = args(1)
    val consumerGroup = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Direct Loghub")
      val ssc = new StreamingContext(conf, batchInterval)
      val zkParas = Map(
        "zookeeper.connect" -> zkAddress,
        "enable.auto.commit" -> "false")
      val loghubStream = LoghubUtils.createDirectStream(
        ssc,
        project,
        logstore,
        consumerGroup,
        accessKeyId,
        accessKeySecret,
        endpoint,
        zkParas,
        LogHubCursorPosition.END_CURSOR)

      loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
        println(s"count by key: ${rdd.map(s => (s.length, s)).countByKey().size}")
        // Commit the checkpoint only after the batch has been processed.
        loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
      })
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // Set the checkpoint directory.
      ssc
    }

    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
For more information, see the project on GitHub.