Spark + MNS

Last Updated: Mar 27, 2017

Accessing MNS from Spark

The following example shows how a Spark Streaming job pulls messages from an MNS queue and counts the occurrences of each word in every batch.

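  import java.nio.charset.StandardCharsets
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  // MnsUtils comes from the Aliyun E-MapReduce SDK (emr-mns module)
  import org.apache.spark.streaming.aliyun.mns.MnsUtils

  object TestMNS {
    def main(args: Array[String]): Unit = {
      val conf = new SparkConf().setAppName("Test MNS Streaming")
      val batchInterval = Seconds(10)
      val ssc = new StreamingContext(conf, batchInterval)
      // Replace these placeholders with your queue name, AccessKey pair, and MNS endpoint
      val queuename = "queuename"
      val accessKeyId = "<accessKeyId>"
      val accessKeySecret = "<accessKeySecret>"
      val endpoint = "http://xxx.yyy.zzzz/abc"
      // Pull messages from the MNS queue as raw byte arrays
      val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId,
        accessKeySecret, endpoint, StorageLevel.MEMORY_ONLY)
      // For each batch: decode messages (assumed UTF-8), split into words, count, print on the driver
      mnsStream.foreachRDD { rdd =>
        rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _).collect()
          .foreach { case (word, cnt) => println(s"word: $word, cnt: $cnt") }
      }
      ssc.start()
      ssc.awaitTermination()
    }
  }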
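The per-batch transformation above can be checked without an MNS queue or a Spark cluster by expressing the same steps over a plain Scala collection. The sketch below is a hypothetical helper (`BatchWordCount.countWords` is not part of any SDK); it assumes UTF-8 message payloads and uses `groupBy` on a local `Seq` in place of Spark's `reduceByKey`.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper, not part of the E-MapReduce SDK: mirrors the per-batch
// pipeline (decode, split on spaces, count) on a local collection of messages.
object BatchWordCount {
  def countWords(messages: Seq[Array[Byte]]): Map[String, Int] =
    messages
      .map(bytes => new String(bytes, StandardCharsets.UTF_8)) // assumes UTF-8 payloads
      .flatMap(line => line.split(" "))                        // same tokenization as the streaming job
      .groupBy(identity)                                       // local stand-in for reduceByKey
      .map { case (word, occurrences) => (word, occurrences.size) }
}
```

For example, a batch containing the messages "hello world" and "hello mns" yields a count of 2 for "hello" and 1 each for "world" and "mns". Because both versions split on a single space, consecutive spaces produce empty-string tokens that are counted too; splitting on `"\\s+"` instead would avoid that.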

Appendix

For complete sample code, see:

  • [Spark access to MNS](//docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/emr/1.3.7/assets/sample/TestMNS.scala)