Simple Log Service (SLS) supports Spark Streaming for real-time log consumption. Use the Alibaba Cloud Spark SDK to consume log data from SLS in Receiver mode or Direct mode.
Limits
Spark Streaming consumption supports only Spark 2.x.
The Alibaba Cloud Spark SDK supports two consumption modes: Receiver and Direct. Add the following Maven dependency to your project:
<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>
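For projects built with sbt rather than Maven, the same coordinates could be declared as in the sketch below. It assumes that the artifact resolves from your configured repositories and that the project uses a Scala 2.11.x release; because the `_2.11` suffix is already part of the artifact name, `%` is used instead of `%%`.

```scala
// build.sbt (sketch): same coordinates as the Maven dependency above
scalaVersion := "2.11.12" // assumption: any Scala 2.11.x release that matches the _2.11 artifact

libraryDependencies += "com.aliyun.emr" % "emr-logservice_2.11" % "1.7.2"
```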
Consume log data in Receiver mode
In Receiver mode, a consumer group consumes data from SLS and temporarily stores the data in a Spark executor. After a Spark Streaming job starts, the executor reads and processes the data. Each log entry is returned as a JSON string. The consumer group automatically saves checkpoints to SLS. For more information, see Use consumer groups to consume log data.
- Parameters
Parameter | Type | Description
project | String | The name of the project in SLS.
logstore | String | The name of the logstore in SLS.
consumerGroup | String | The name of the consumer group.
endpoint | String | The endpoint of the region where the SLS project resides. For more information, see Endpoints.
accessKeyId | String | The AccessKey ID that is used to access SLS.
accessKeySecret | String | The AccessKey secret that is used to access SLS.
- Example
Note: In Receiver mode, data loss may occur with default configurations. To prevent data loss, enable Write-Ahead Logs (available in Spark 1.2 and later). For more information, see Spark.
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object TestLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
          |         <access key id> <access key secret> <batch interval seconds>
        """.stripMargin)
      System.exit(1)
    }

    val project = args(0)
    val logstore = args(1)
    val consumerGroup = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)

    // Builds a new context; called only when no checkpoint exists yet.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Loghub")
      val ssc = new StreamingContext(conf, batchInterval)

      // Receiver-based stream: the consumer group pulls data into the executors.
      val loghubStream = LoghubUtils.createStream(
        ssc,
        project,
        logstore,
        consumerGroup,
        endpoint,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK)

      // Decode each record and print the top 10 entries of every batch.
      loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
        rdd.map(bytes => new String(bytes)).top(10).foreach(println)
      )
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
      ssc
    }

    // Recover from the checkpoint directory if possible; otherwise create a new context.
    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)

    ssc.start()
    ssc.awaitTermination()
  }
}
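The note before the example recommends Write-Ahead Logs for Receiver mode. In open-source Spark, this is controlled by the `spark.streaming.receiver.writeAheadLog.enable` property. A minimal sketch of setting it follows; the object and method names are hypothetical and only illustrate where the property goes.

```scala
import org.apache.spark.SparkConf

object WalConfSketch {
  // Hypothetical helper: returns a SparkConf with the receiver write-ahead log enabled.
  // Spark writes the log under the directory passed to ssc.checkpoint(...),
  // so a checkpoint directory must also be set, as in the example above.
  def walEnabledConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
}
```

In the Receiver example, this would replace `new SparkConf().setAppName("Test Loghub")` inside `functionToCreateContext`.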
Consume log data in Direct mode
Direct mode does not require a consumer group. Instead, it calls API operations to request data directly from SLS. Direct mode provides the following benefits:
- Simplified concurrency: The number of Spark partitions matches the number of shards in the Logstore. Split shards to increase task concurrency.
- Higher efficiency: Write-Ahead Logs are not required to prevent data loss.
- Exactly-once semantics: Data is read directly from SLS, and checkpoints are committed only after a task succeeds. However, if Spark exits unexpectedly before a checkpoint is committed, some data may be consumed more than once.
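The caveat about repeated consumption can be illustrated with a small in-memory model. This is only a sketch of the commit-after-success idea, not the SDK's implementation; `Run`, `runBatch`, and the vector standing in for a shard are all hypothetical.

```scala
object CommitAfterProcessSketch {
  // Hypothetical result of one batch: what was processed and the checkpoint saved afterwards.
  final case class Run(processed: Vector[Int], committedOffset: Int)

  /** Processes up to `batchSize` entries starting at `committedOffset`.
    * The checkpoint advances only when the run does not crash before committing. */
  def runBatch(shard: Vector[Int],
               committedOffset: Int,
               batchSize: Int,
               crashBeforeCommit: Boolean): Run = {
    val batch = shard.slice(committedOffset, committedOffset + batchSize)
    val newOffset =
      if (crashBeforeCommit) committedOffset      // crash: checkpoint is left unchanged
      else committedOffset + batch.length         // success: checkpoint moves forward
    Run(batch, newOffset)
  }
}
```

If a first run processes entries and crashes before committing, a second run that starts from the unchanged checkpoint processes the same entries again. Downstream writes therefore benefit from being idempotent.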
Direct mode requires a ZooKeeper service to store intermediate state. Set a checkpoint directory in ZooKeeper to persist the intermediate data. To re-consume data after restarting a task, delete the corresponding ZooKeeper directory and change the consumer group name.
- Parameters
Parameter | Type | Description
project | String | The name of the project in SLS.
logstore | String | The name of the logstore in SLS.
consumerGroup | String | The name of the consumer group. This name is used only to save consumption checkpoints.
endpoint | String | The endpoint of the region where the SLS project resides. For more information, see Endpoints.
accessKeyId | String | The AccessKey ID that is used to access SLS.
accessKeySecret | String | The AccessKey secret that is used to access SLS.
zkAddress | String | The connection URL of the ZooKeeper service.
- Rate limiting
Spark Streaming processes data in micro-batches. To limit the consumption rate, specify the maximum number of log entries that each shard contributes to a batch.
In SLS, each write request is stored as a log group. A typical write request contains multiple log entries in one log group. When web tracking is used, each write request contains only one log entry per log group. The following parameters control the amount of data consumed in a single batch.
Parameter | Description | Default
spark.loghub.batchGet.step | The maximum number of log groups fetched per consumption request. | 100
spark.streaming.loghub.maxRatePerShard | The maximum number of log entries consumed per shard per batch. | 10000
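As a sketch, the two parameters could be set on the `SparkConf` that creates the streaming context. This assumes the SDK reads them from the Spark configuration, which the `spark.` prefix suggests but the text does not state; the object name, method name, and values are illustrative only.

```scala
import org.apache.spark.SparkConf

object LoghubRateConfSketch {
  // Hypothetical helper: returns a SparkConf with both rate-limiting parameters set.
  def rateLimitedConf(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // Fetch at most 50 log groups per request (default: 100).
      .set("spark.loghub.batchGet.step", "50")
      // Target at most 5000 log entries per shard per batch (default: 10000).
      .set("spark.streaming.loghub.maxRatePerShard", "5000")
}
```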
The spark.streaming.loghub.maxRatePerShard parameter sets the target maximum log entries per shard per batch. The SDK fetches log groups in increments of spark.loghub.batchGet.step and accumulates the log entry count. When the count reaches or exceeds spark.streaming.loghub.maxRatePerShard, the SDK stops fetching. Because consumption granularity is at the log group level, the spark.streaming.loghub.maxRatePerShard limit is approximate. The actual count per batch depends on spark.loghub.batchGet.step and the number of log entries in each log group.
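The fetch loop described above can be modeled in a few lines of plain Scala. This is a simplified model written for illustration, not the SDK's code: `entriesPerBatch` is a hypothetical function, and each element of `logGroupSizes` stands for the number of log entries in one log group.

```scala
object RateLimitSketch {
  /** Returns how many log entries one shard contributes to one batch
    * under the simplified model: fetch `step` log groups at a time and
    * stop once the running count reaches `maxRatePerShard`. */
  def entriesPerBatch(logGroupSizes: Seq[Int], step: Int, maxRatePerShard: Int): Int = {
    // Each element of `fetches` models the log groups returned by one request.
    val fetches = logGroupSizes.grouped(step)
    var count = 0
    while (count < maxRatePerShard && fetches.hasNext) {
      // Whole log groups are consumed, so the count can overshoot the target.
      count += fetches.next().sum
    }
    count
  }
}
```

In this model, with 30 entries per log group and the default settings (step 100, limit 10000), four fetches are made and 12000 entries are consumed. With one entry per log group, as with web tracking, consumption stops at exactly 10000 entries.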
- Example
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}

object TestDirectLoghub {
  def main(args: Array[String]): Unit = {
    if (args.length < 7) {
      System.err.println(
        """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
          |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
        """.stripMargin)
      System.exit(1)
    }

    val project = args(0)
    val logstore = args(1)
    val consumerGroup = args(2)
    val endpoint = args(3)
    val accessKeyId = args(4)
    val accessKeySecret = args(5)
    val batchInterval = Milliseconds(args(6).toInt * 1000)
    // The ZooKeeper address is optional and defaults to localhost:2181.
    val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"

    // Builds a new context; called only when no checkpoint exists yet.
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("Test Direct Loghub")
      val ssc = new StreamingContext(conf, batchInterval)
      // Auto commit is disabled; offsets are committed explicitly after each batch.
      val zkParas = Map("zookeeper.connect" -> zkAddress,
        "enable.auto.commit" -> "false")

      val loghubStream = LoghubUtils.createDirectStream(
        ssc,
        project,
        logstore,
        consumerGroup,
        accessKeyId,
        accessKeySecret,
        endpoint,
        zkParas,
        LogHubCursorPosition.END_CURSOR)

      loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
        println(s"count by key: ${rdd.map(s => {
          s.sorted
          (s.length, s)
        }).countByKey().size}")
        // Commit the consumption checkpoint once the batch has been processed.
        loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
      })
      ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
      ssc
    }

    // Recover from the checkpoint directory if possible; otherwise create a new context.
    val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)

    ssc.start()
    ssc.awaitTermination()
  }
}
For the complete source code, see GitHub.