
E-MapReduce: Use Spark to access SMQ

Last Updated: Sep 19, 2024

This topic describes how to use Spark Streaming to consume data from Simple Message Queue (SMQ, formerly MNS) and count the occurrences of each word in each batch.

Use Spark to access SMQ

Sample code:

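import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.aliyun.mns.MnsUtils // ships with the EMR SDK

object TestMnsStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Test MNS Streaming")
    // Each micro-batch covers 10 seconds of pulled messages.
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    // Read the AccessKey pair from environment variables instead of hard-coding it.
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // In this example, the SMQ endpoint of the China (Hangzhou) region is used.
    // Replace <account-id> with your Alibaba Cloud account ID and use your actual region.
    val endpoint = "https://<account-id>.mns.cn-hangzhou.aliyuncs.com"
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    // For each batch: decode message bodies, split them into words, and print each word's count.
    mnsStream.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()
  }
}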

Note

The sample code reads the AccessKey pair from the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables, so you must set both variables before you run it. For more information, see Configure environment variables.
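The computation inside foreachRDD is an ordinary word count. The following Spark-free sketch reproduces that per-batch logic on plain Scala collections, so you can check it without an SMQ queue or a cluster. It is an illustration only and not part of the EMR SDK: WordCountSketch and countWords are hypothetical names, message bodies are assumed to be UTF-8 text (the sample itself uses the platform default charset), and words are split on single spaces, as in the sample.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical, Spark-free sketch of the per-batch word count in the sample.
// Assumption: each message body is UTF-8 text; words are separated by single spaces.
object WordCountSketch {

  // Mirrors map(new String) -> flatMap(split) -> map((_, 1)) -> reduceByKey(_ + _)
  // for the messages of ONE batch.
  def countWords(batch: Seq[Array[Byte]]): Map[String, Int] =
    batch
      .map(bytes => new String(bytes, StandardCharsets.UTF_8))
      .flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .groupBy(_._1) // plays the role of reduceByKey's grouping by key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Two hypothetical batches; each is counted independently, just as each
    // foreachRDD call sees only the RDD produced for its own batch interval.
    val batches = Seq(
      Seq("hello world", "hello spark").map(_.getBytes(StandardCharsets.UTF_8)),
      Seq("hello smq").map(_.getBytes(StandardCharsets.UTF_8))
    )
    batches.foreach { batch =>
      countWords(batch).toSeq.sortBy(_._1).foreach { case (word, cnt) =>
        println(s"word: $word, cnt: $cnt")
      }
    }
  }
}
```

Because each call sees only one batch, the counts are not cumulative across batches. Running totals across batches would require a stateful operator such as updateStateByKey in the streaming job.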

Use MetaService for access

The preceding sample code passes an AccessKey pair to SMQ explicitly. In E-MapReduce (EMR) SDK 1.3.2 and later, Spark Streaming can use MetaService to consume SMQ data without an AccessKey pair. For more information, see the description of the MnsUtils class in EMR SDK:

MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

Appendix

For the complete sample code, visit SparkMNSDemo.