
Spark + MQ

Last Updated: Jan 05, 2018

Spark access to MQ

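The example below shows how Spark Streaming consumes data from MQ and counts the words in each batch. The job takes five arguments: cId (the consumer ID), topic, subExpression (the subscription expression), parallelism (the number of MQ input streams to create), and interval (the batch interval in milliseconds). Replace <accessKeyId> and <accessKeySecret> with your own AccessKey pair.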

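```scala
import com.aliyun.openservices.ons.api.Message
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.aliyun.ons.OnsUtils

// The object name is illustrative; see the complete sample linked in the Appendix.
object OnsWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length != 5) {
      System.err.println("Usage: OnsWordCount <cId> <topic> <subExpression> <parallelism> <interval>")
      System.exit(1)
    }
    val Array(cId, topic, subExpression, parallelism, interval) = args
    val accessKeyId = "<accessKeyId>"
    val accessKeySecret = "<accessKeySecret>"
    val numStreams = parallelism.toInt                // number of MQ input streams to create
    val batchInterval = Milliseconds(interval.toInt)  // batch interval in milliseconds
    val conf = new SparkConf().setAppName("Test ONS Streaming")
    val ssc = new StreamingContext(conf, batchInterval)
    // Extracts the raw payload from each MQ message.
    def func: Message => Array[Byte] = msg => msg.getBody
    // Create one input stream per unit of parallelism.
    val onsStreams = (0 until numStreams).map { i =>
      println(s"starting stream $i")
      OnsUtils.createStream(
        ssc,
        cId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        func)
    }
    // Merge the streams into one DStream and count the words in each batch.
    val unionStreams = ssc.union(onsStreams)
    unionStreams.foreachRDD { rdd =>
      rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8)) // assumes UTF-8 message bodies
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect() // brings this batch's counts to the driver for printing
        .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
```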
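To check the per-batch counting logic without a cluster or an MQ instance, the following sketch reproduces it on plain Scala collections. BatchWordCount and countWords are hypothetical names, not part of the E-MapReduce SDK. Each inner sequence stands in for the message bodies that one input stream received during a batch: flattening them plays the role of ssc.union, and groupBy stands in for reduceByKey. The sketch assumes UTF-8 message bodies.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper that mirrors the per-batch logic of the streaming job.
// Each inner Seq models the message bodies one input stream received in a batch.
object BatchWordCount {
  def countWords(streamBatches: Seq[Seq[Array[Byte]]]): Map[String, Int] =
    streamBatches.flatten                                     // like ssc.union(onsStreams)
      .map(bytes => new String(bytes, StandardCharsets.UTF_8)) // assumes UTF-8 bodies
      .flatMap(line => line.split(" "))                        // same tokenizer as the job
      .groupBy(identity)                                       // stands in for reduceByKey
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    val streamA = Seq("hello mq", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    val streamB = Seq("spark streaming").map(_.getBytes(StandardCharsets.UTF_8))
    // Sort for stable output, then print in the same format as the streaming job.
    BatchWordCount.countWords(Seq(streamA, streamB)).toSeq.sorted.foreach { case (w, c) =>
      println(s"word: $w, cnt: $c")
    }
  }
}
```

Running main prints "word: hello, cnt: 2", "word: mq, cnt: 1", "word: spark, cnt: 2", and "word: streaming, cnt: 1". Because split(" ") splits on single spaces, consecutive spaces produce empty strings that are counted as words; splitting on "\\s+" instead would avoid this, in the sketch and in the job alike.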

Appendix

For the complete sample code, see:

  • [Spark access to MQ](//docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/emr/1.3.7/assets/sample/TestAliyunONS.scala)