This topic describes how to use Spark Streaming to consume data in Message Service (MNS) and calculate the number of words in each batch.
Use Spark to access MNS
Sample code:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// MnsUtils is provided by the E-MapReduce (EMR) SDK.
import org.apache.spark.streaming.aliyun.mns.MnsUtils

val conf = new SparkConf().setAppName("Test MNS Streaming")
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
// Specify the name of your MNS queue.
val queuename = "queuename"
val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// In this example, an MNS endpoint in the China (Hangzhou) region is used. MNS endpoints are account-specific. Specify your actual endpoint.
val endpoint = "https://<your-account-id>.mns.cn-hangzhou.aliyuncs.com"
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  StorageLevel.MEMORY_ONLY)
// Count the words in each batch. collect() pulls the results to the driver, so this is suitable only for small per-batch result sets.
mnsStream.foreachRDD( rdd => {
rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
})
ssc.start()
ssc.awaitTermination()
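The per-batch word-count logic in the foreachRDD block can be verified locally on plain Scala collections, without an MNS queue or a Spark cluster. The following sketch applies the same decode-split-count chain to made-up sample messages, using groupBy in place of reduceByKey:

```scala
// Sample messages standing in for the raw bytes pulled from the MNS queue.
// The message contents are made up for illustration.
val messages: Seq[Array[Byte]] = Seq(
  "hello mns".getBytes("UTF-8"),
  "hello spark streaming".getBytes("UTF-8")
)

// Same chain as the streaming job: decode bytes, split lines into words, count per word.
val counts: Map[String, Int] = messages
  .map(bytes => new String(bytes, "UTF-8"))
  .flatMap(line => line.split(" "))
  .groupBy(identity)
  .map { case (word, occurrences) => (word, occurrences.size) }

counts.foreach { case (word, cnt) => println(s"word: $word, cnt: $cnt") }
```

In the streaming job itself, the counting runs distributed via reduceByKey on the RDD of each batch; this local version only illustrates the transformation.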
Note
You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.
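Because the sample code reads the AccessKey pair with System.getenv, the variables must be set in the shell from which the job is submitted. A minimal sketch, with placeholder values that you must replace with your own credentials:

```shell
# Hypothetical example: export the AccessKey pair before submitting the Spark job.
# Replace the placeholders with your actual credentials.
export ALIBABA_CLOUD_ACCESS_KEY_ID=<yourAccessKeyId>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<yourAccessKeySecret>
```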
Use MetaService for access
In the preceding sample code, an AccessKey pair is explicitly passed to MNS. In E-MapReduce (EMR) SDK 1.3.2 and later, Spark Streaming can use MetaService to process MNS data. An AccessKey pair is not required. For more information, see the description of the MnsUtils class in EMR SDK:
MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)
Appendix
For the complete sample code, visit GitHub.