This topic describes how to use Spark Streaming to consume messages from ApsaraMQ for RocketMQ and count the number of words in each batch.
Use Spark to access ApsaraMQ for RocketMQ
Sample code:
// Imports assume the Spark core and streaming APIs, the aliyun-emapreduce SDK
// (OnsUtils), and the ONS client SDK (Message).
import com.aliyun.openservices.ons.api.Message
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.aliyun.ons.OnsUtils

// Parse the command-line arguments: consumer group ID, topic, tag expression,
// number of parallel receivers, and batch interval in milliseconds.
val Array(cId, topic, subExpression, parallelism, interval) = args
// Read the AccessKey pair from environment variables instead of hardcoding it.
val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
val numStreams = parallelism.toInt
val batchInterval = Milliseconds(interval.toInt)
val conf = new SparkConf().setAppName("Test ONS Streaming")
val ssc = new StreamingContext(conf, batchInterval)
// Extract the raw body from each ApsaraMQ for RocketMQ message.
def func: Message => Array[Byte] = msg => msg.getBody
// Create one receiver stream per degree of parallelism.
val onsStreams = (0 until numStreams).map { i =>
  println(s"starting stream $i")
  OnsUtils.createStream(
    ssc,
    cId,
    topic,
    subExpression,
    accessKeyId,
    accessKeySecret,
    StorageLevel.MEMORY_AND_DISK_2,
    func)
}
// Union all receiver streams and count the words in each batch.
val unionStreams = ssc.union(onsStreams)
unionStreams.foreachRDD { rdd =>
  rdd.map(bytes => new String(bytes))
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .collect()
    .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
}
ssc.start()
ssc.awaitTermination()
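The word count performed inside foreachRDD can be illustrated without a cluster. The sketch below is a hypothetical standalone helper, not part of the sample job: it applies the same split, pair, and per-word sum steps (the reduceByKey step) to a plain Scala collection, assuming each message body decodes to a line of space-separated words.

```scala
object WordCountSketch {
  // Mirrors the RDD pipeline: split each line into words, pair each
  // word with 1, then sum the 1s per word (the reduceByKey step).
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit =
    countWords(Seq("hello ons", "hello spark"))
      .foreach { case (w, c) => println(s"word: $w, cnt: $c") }
}
```

For the input above, countWords returns a map in which "hello" has a count of 2 and "ons" and "spark" each have a count of 1, which matches what reduceByKey(_ + _) produces for one batch.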
Note
You must configure environment variables before you can run the sample code. For more information, see the Configure environment variables section of this topic.
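For example, the two credentials can be exported in the shell before the job is submitted. The values below are placeholders, not real credentials:

```shell
# Placeholder values; replace them with your own AccessKey pair.
export ALIBABA_CLOUD_ACCESS_KEY_ID="yourAccessKeyId"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="yourAccessKeySecret"

# Verify that both variables are set before you submit the Spark job.
if [ -n "$ALIBABA_CLOUD_ACCESS_KEY_ID" ] && [ -n "$ALIBABA_CLOUD_ACCESS_KEY_SECRET" ]; then
  echo "credentials configured"
fi
```

Reading credentials from environment variables keeps the AccessKey pair out of the source code and out of version control.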
Appendix
For the complete sample code, visit GitHub.