This topic describes how to use Spark Streaming to consume data from Message Service (MNS) and count the words in each batch.

Use Spark to access MNS

Sample code:
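import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.aliyun.mns.MnsUtils
val conf = new SparkConf().setAppName("Test MNS Streaming")
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
val queuename = "queuename"
val accessKeyId = "<accessKeyId>"
val accessKeySecret = "<accessKeySecret>"
val endpoint = "http://xxx.yyy.zzzz/abc"
// Storage level for received messages; MEMORY_ONLY is an assumption, adjust as needed.
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  StorageLevel.MEMORY_ONLY)
// Decode each message body, split it into words, and print the per-batch counts.
mnsStream.foreachRDD { rdd =>
  rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
}
ssc.start()
ssc.awaitTermination()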
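
The per-batch transformation in the sample (decode each message body, split it on spaces, count each word) can be tried without a cluster. The following is a minimal sketch on plain Scala collections, not part of the EMR SDK. The object name BatchWordCount, the helper countWords, and the sample message bodies are hypothetical, and the sketch assumes UTF-8 message bodies and drops empty tokens, which the streaming sample does not do.

```scala
import java.nio.charset.StandardCharsets

object BatchWordCount {
  // Decode each raw message body, split it on spaces, and count each word.
  // Empty tokens (from repeated spaces) are dropped; this is an addition
  // that the streaming sample does not make.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Hypothetical message bodies standing in for one 10-second batch.
    val batch = Seq("hello mns", "hello spark")
      .map(_.getBytes(StandardCharsets.UTF_8))

    // Print the counts in the same format as the streaming sample.
    countWords(batch).toSeq.sortBy(_._1).foreach { case (word, cnt) =>
      println(s"word: $word, cnt: $cnt")
    }
  }
}
```

In the streaming job, the same steps run on each RDD, and reduceByKey(_ + _) takes the place of groupBy followed by size.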

Use MetaService for access

In the preceding sample code, an AccessKey pair is explicitly passed to MNS. In E-MapReduce (EMR) SDK 1.3.2 and later, Spark Streaming can use MetaService to process MNS data. An AccessKey pair is not required. For more information, see the description of the MnsUtils class in EMR SDK:
MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)
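
As a rough illustration of the MetaService variant, the sketch below calls the four-argument createPullingStreamAsRawBytes signature listed above and reuses the word count from the first sample. The import path for MnsUtils, the StorageLevel.MEMORY_ONLY choice, and the object name MnsMetaServiceWordCount are assumptions; check them against the EMR SDK version you use. The job is expected to run on an EMR cluster where MetaService is available.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Assumed package path for MnsUtils in EMR SDK; verify for your SDK version.
import org.apache.spark.streaming.aliyun.mns.MnsUtils

object MnsMetaServiceWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test MNS Streaming")
    val ssc = new StreamingContext(conf, Seconds(10))
    val queueName = "queuename"
    val endpoint = "http://xxx.yyy.zzzz/abc"

    // No AccessKey pair is passed; credentials are expected to come from
    // MetaService. MEMORY_ONLY is an assumed storage level.
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(
      ssc, queueName, endpoint, StorageLevel.MEMORY_ONLY)

    // Same per-batch word count as in the AccessKey-based sample.
    mnsStream.foreachRDD { rdd =>
      rdd.map(bytes => new String(bytes))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Compared with the first sample, only the stream creation call changes: the accessKeyId and accessKeySecret arguments are dropped.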


For the complete sample code, visit GitHub.