すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用した SMQ へのアクセス

最終更新日:Jan 11, 2025

このトピックでは、Spark Streaming を使用して Simple Message Queue (formerly MNS) (SMQ) のデータを使用し、各バッチの単語数を計算する方法について説明します。

Spark を使用した SMQ へのアクセス

サンプルコード:

val conf = new SparkConf().setAppName("Test MNS Streaming")
    val batchInterval = Seconds(10)
    val ssc = new StreamingContext(conf, batchInterval)
    val queuename = "queuename"
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // この例では、中国 (杭州) リージョンのエンドポイントを使用しています。実際のエンドポイントを指定してください。
    val endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
    val mnsStream = MnsUtils.createPullingStreamAsRawBytes(ssc, queuename, accessKeyId, accessKeySecret, endpoint,
      StorageLevel.MEMORY_ONLY)
    mnsStream.foreachRDD( rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}"))
    })
    ssc.start()
    ssc.awaitTermination()
説明

サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの環境変数の設定セクションをご参照ください。

MetaService を使用したアクセス

上記のサンプルコードでは、AccessKey ペアが Simple Message Queue (formerly MNS) に明示的に渡されています。 E-MapReduce (EMR) SDK 1.3.2 以降では、Spark Streaming は MetaService を使用して Simple Message Queue (formerly MNS) データを処理できます。 AccessKey ペアは必要ありません。詳細については、EMR SDK の MnsUtils クラスの説明をご参照ください。

MnsUtils.createPullingStreamAsBytes(ssc, queueName, endpoint, storageLevel)
MnsUtils.createPullingStreamAsRawBytes(ssc, queueName, endpoint, storageLevel)

付録

完全なサンプルコードについては、SparkMNSDemo にアクセスしてください。