E-MapReduce: Use Spark to access MNS

Last Updated: Sep 19, 2023

This topic describes how to use Spark Streaming to consume messages from a Message Service (MNS) queue and count the occurrences of each word in each batch.

Use Spark to access MNS

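The following sample code pulls messages from an MNS queue in 10-second batches and prints the count of each word in every batch: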

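import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
// MnsUtils is provided by the EMR SDK.
import org.apache.spark.streaming.aliyun.mns.MnsUtils

val conf = new SparkConf().setAppName("Test MNS Streaming")
// Pull a new batch from the queue every 10 seconds.
val batchInterval = Seconds(10)
val ssc = new StreamingContext(conf, batchInterval)
// Replace with the name of your MNS queue.
val queuename = "queuename"
// Read the AccessKey pair from environment variables instead of hard-coding it.
val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// In this example, the MNS endpoint of the China (Hangzhou) region is used.
// Replace <account-id> with your Alibaba Cloud account ID and specify the endpoint of your actual region.
val endpoint = "https://<account-id>.mns.cn-hangzhou.aliyuncs.com"
val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
  StorageLevel.MEMORY_ONLY)
// For each batch: decode the message bodies, split them into words, and count each word.
mnsStream.foreachRDD { rdd =>
  rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8)) // Assumes UTF-8 message bodies.
    .flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)
    .collect()
    .foreach { case (word, cnt) => println(s"word: $word, cnt: $cnt") }
}
ssc.start()
ssc.awaitTermination()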
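The per-batch logic inside foreachRDD can be tried out without Spark or an MNS queue. The following is a minimal sketch, assuming UTF-8 message bodies, that applies the same decode, split, and count steps to an in-memory batch by using plain Scala collections. WordCountSketch and countWords are hypothetical names used only for illustration, and groupBy stands in for Spark's reduceByKey.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helper (not part of the EMR SDK): mirrors the per-batch logic
// of the Spark Streaming sample by using plain Scala collections.
object WordCountSketch {
  // Decode each raw message body (assumed UTF-8), split it on single spaces,
  // and count how often each word occurs in the batch.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    // Simulate one batch of raw message bodies pulled from a queue.
    val batch = Seq("hello spark", "hello mns").map(_.getBytes(StandardCharsets.UTF_8))
    // Sort by word so that the output order is stable.
    countWords(batch).toSeq.sorted.foreach { case (word, cnt) =>
      println(s"word: $word, cnt: $cnt")
    }
  }
}
```

Running main prints word: hello, cnt: 2, then word: mns, cnt: 1, then word: spark, cnt: 1. As in the Spark version, splitting on a single space turns consecutive spaces into empty-string "words".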
Note

Before you run the sample code, set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, which the code reads to obtain the AccessKey pair. For more information about how to configure environment variables, see the Configure environment variables section in this topic.
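System.getenv returns null when a variable is not set, so passing its result straight to MnsUtils could produce a less obvious failure later. The following is a minimal sketch of a fail-fast check that you could run before you create the stream. EnvCheckSketch and requireEnv are hypothetical names used only for illustration; they are not part of the EMR SDK.

```scala
// Hypothetical helper (not part of the EMR SDK): reads a required
// environment variable and fails fast if it is missing or empty.
object EnvCheckSketch {
  // The env parameter defaults to the real process environment and can be
  // overridden, for example with a fixed Map in tests.
  def requireEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Environment variable $name is not set"))

  def main(args: Array[String]): Unit = {
    val accessKeyId = requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = requireEnv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // Do not print the credentials themselves.
    println("AccessKey pair loaded from environment variables.")
  }
}
```

The default argument keeps the call site as short as System.getenv while letting the check be exercised with a fixed Map instead of the real environment.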

Use MetaService for access

The preceding sample code passes an AccessKey pair to MNS explicitly. In E-MapReduce (EMR) SDK 1.3.2 and later, Spark Streaming can instead use MetaService to access MNS data, so no AccessKey pair is required. The following MnsUtils methods take no AccessKey parameters. For more information, see the description of the MnsUtils class in the EMR SDK:

MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

Appendix

For the complete sample code, visit GitHub.