
E-MapReduce: Use Spark to access ApsaraMQ for RocketMQ

Last Updated: Aug 15, 2023

This topic describes how to use Spark Streaming to consume messages from ApsaraMQ for RocketMQ and count the occurrences of each word in every batch.

Use Spark to access ApsaraMQ for RocketMQ

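The following Scala sample code works in four steps:

- It reads five arguments: the consumer ID, topic, subscription expression, number of input streams, and batch interval in milliseconds.
- It creates one input stream per unit of parallelism.
- It merges the streams.
- It prints the word counts of each batch.

Sample code: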

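import java.nio.charset.StandardCharsets

import com.aliyun.openservices.ons.api.Message
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
// OnsUtils comes from the E-MapReduce SDK; its package path may differ between SDK versions.
import org.apache.spark.streaming.aliyun.ons.OnsUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// OnsWordCount is a hypothetical wrapper name used here to make the snippet self-contained.
object OnsWordCount {
  def main(args: Array[String]): Unit = {
    // Arguments: consumer ID, topic, subscription expression, number of streams, batch interval in ms.
    val Array(cId, topic, subExpression, parallelism, interval) = args
    // Read the AccessKey pair from environment variables instead of hard-coding it.
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    val numStreams = parallelism.toInt
    val batchInterval = Milliseconds(interval.toInt)
    val conf = new SparkConf().setAppName("Test ONS Streaming")
    val ssc = new StreamingContext(conf, batchInterval)
    // Pass only the raw body of each message to Spark.
    def func: Message => Array[Byte] = msg => msg.getBody
    // Create one receiver-based input stream per unit of parallelism.
    val onsStreams = (0 until numStreams).map { i =>
      println(s"starting stream $i")
      OnsUtils.createStream(
        ssc,
        cId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        func)
    }
    // Merge all input streams into a single DStream.
    val unionStreams = ssc.union(onsStreams)
    // Per batch: decode bodies as UTF-8, split on spaces, count each word, and print on the driver.
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8))
        .flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collect()
        .foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
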
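You can check the per-batch logic inside foreachRDD without a cluster. The following sketch uses a hypothetical BatchWordCount object, which is not part of the sample or the E-MapReduce SDK. It applies the same steps to an in-memory sequence of message bodies with plain Scala collections, assuming the bodies are UTF-8 text: decode each body, split on single spaces, and count each word. Here groupBy stands in for reduceByKey, because a local collection needs no shuffle.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper: a local, Spark-free equivalent of the per-batch word count.
object BatchWordCount {
  // Decodes each body as UTF-8, splits it on single spaces,
  // and returns the number of occurrences of each word.
  def count(bodies: Seq[Array[Byte]]): Map[String, Int] =
    bodies
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
}
```

For example, the call `BatchWordCount.count(Seq("hello world".getBytes(StandardCharsets.UTF_8), "hello spark".getBytes(StandardCharsets.UTF_8)))` returns `Map("hello" -> 2, "world" -> 1, "spark" -> 1)`. As in the Spark job, splitting on a single space produces an empty-string token when a body contains consecutive spaces.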
Note

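Before you run the sample code, you must set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, because the code reads the AccessKey pair from them. For more information about how to configure environment variables, see the Configure environment variables section in this topic.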
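System.getenv returns null for an unset variable. If a variable is missing, the sample therefore passes a null credential to the stream and fails only at run time. The following sketch uses a hypothetical Credentials object and load function, which are not part of the E-MapReduce SDK. It checks both variables up front and reports which ones are missing. The lookup parameter defaults to the process environment, so you can also test the check with an ordinary map.

```scala
// Hypothetical helper, not part of the E-MapReduce SDK: validates the AccessKey
// environment variables before the streaming job starts.
object Credentials {
  val IdVar = "ALIBABA_CLOUD_ACCESS_KEY_ID"
  val SecretVar = "ALIBABA_CLOUD_ACCESS_KEY_SECRET"

  // `lookup` defaults to the process environment; callers may pass any other function.
  def load(lookup: String => Option[String] = sys.env.get): Either[String, (String, String)] = {
    // A variable counts as missing if it is unset or set to an empty string.
    val missing = Seq(IdVar, SecretVar).filter(name => lookup(name).forall(_.isEmpty))
    if (missing.nonEmpty) Left(s"Missing environment variables: ${missing.mkString(", ")}")
    else Right((lookup(IdVar).get, lookup(SecretVar).get))
  }
}
```

In main, calling Credentials.load() and stopping with sys.error on a Left result is one way to replace the two System.getenv calls. The job then fails immediately with a clear message instead of starting streams with null credentials.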

Appendix

For the complete sample code, visit GitHub.