Spark + ONS

Last Updated: Mar 27, 2017

Spark access to ONS

The following example shows how a Spark Streaming application consumes messages from ONS and counts the occurrences of each word in every batch.

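  import com.aliyun.openservices.ons.api.Message
  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.aliyun.ons.OnsUtils
  import org.apache.spark.streaming.{Milliseconds, StreamingContext}

  object TestOnsStreaming {
    def main(args: Array[String]): Unit = {
      if (args.length < 5) {
        System.err.println("Usage: TestOnsStreaming <consumerId> <topic> <subExpression> <parallelism> <intervalMs>")
        System.exit(1)
      }
      // ONS consumer ID, topic, tag expression, number of receivers, batch interval in ms
      val Array(cId, topic, subExpression, parallelism, interval) = args.take(5)
      // Replace with your own AccessKey pair
      val accessKeyId = "<accessKeyId>"
      val accessKeySecret = "<accessKeySecret>"
      val numStreams = parallelism.toInt
      val batchInterval = Milliseconds(interval.toInt)

      val conf = new SparkConf().setAppName("Test ONS Streaming")
      val ssc = new StreamingContext(conf, batchInterval)

      // Extract the raw body from each ONS message
      def func: Message => Array[Byte] = msg => msg.getBody

      // Create one receiver-based input stream per unit of parallelism
      val onsStreams = (0 until numStreams).map { i =>
        println(s"starting stream $i")
        OnsUtils.createStream(
          ssc,
          cId,
          topic,
          subExpression,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK_2,
          func)
      }

      // Merge all receivers into one DStream and count the words in each batch
      val unionStreams = ssc.union(onsStreams)
      unionStreams.foreachRDD { rdd =>
        rdd.map(bytes => new String(bytes, "UTF-8"))
          .flatMap(line => line.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _)
          .collect()
          .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
      }

      ssc.start()
      ssc.awaitTermination()
    }
  }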
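As a rough illustration of what each batch computes, the following sketch reproduces the word-count transformation on plain Scala collections instead of an RDD, so it runs without Spark or an ONS instance. The names BatchWordCount and countWords are hypothetical, and the sketch assumes message bodies are UTF-8 text that is split on single spaces, as in the streaming job.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: mirrors the per-batch logic of the streaming job
// using local collections rather than Spark RDDs.
object BatchWordCount {
  /** Decodes each message body as UTF-8, splits on single spaces, and counts words. */
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8)) // like rdd.map(new String(_))
      .flatMap(line => line.split(" "))                         // like flatMap(_.split(" "))
      .map(word => (word, 1))                                   // like map((_, 1))
      .groupBy(_._1)                                            // group pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // like reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    // Two sample message bodies standing in for one batch of ONS messages
    val batch = Seq("hello ons", "hello spark").map(_.getBytes(StandardCharsets.UTF_8))
    countWords(batch).toSeq.sortBy(_._1).foreach { case (w, c) =>
      println(s"word: $w, cnt: $c")
    }
  }
}
```

Here groupBy followed by a sum plays the role of reduceByKey. In Spark, reduceByKey also combines values within each partition before the shuffle, which a single local collection does not need.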

Appendix

For the complete sample code, see:

  • Spark access to ONS: //docs-aliyun.cn-hangzhou.oss.aliyun-inc.com/cn/emr/1.3.7/assets/sample/TestAliyunONS.scala