すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して ApsaraMQ for RocketMQ にアクセスする

最終更新日:Jan 11, 2025

このトピックでは、Spark Streaming を使用して ApsaraMQ for RocketMQ のデータを使用し、各バッチの単語数を計算する方法について説明します。

Spark を使用して ApsaraMQ for RocketMQ にアクセスする

サンプルコード:

val Array(cId, topic, subExpression, parallelism, interval) = args
    val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    val numStreams = parallelism.toInt
    val batchInterval = Milliseconds(interval.toInt)
    val conf = new SparkConf().setAppName("Test ONS Streaming") // ONS Streaming のテスト
    val ssc = new StreamingContext(conf, batchInterval)
    def func: Message => Array[Byte] = msg => msg.getBody
    val onsStreams = (0 until numStreams).map { i =>
      println(s"starting stream $i") // ストリーム i を開始しています
      OnsUtils.createStream(
        ssc,
        cId,
        topic,
        subExpression,
        accessKeyId,
        accessKeySecret,
        StorageLevel.MEMORY_AND_DISK_2,
        func)
    }
    val unionStreams = ssc.union(onsStreams)
    unionStreams.foreachRDD(rdd => {
      rdd.map(bytes => new String(bytes)).flatMap(line => line.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _).collect().foreach(e => println(s"word: ${e._1}, cnt: ${e._2}")) // word: ${e._1}, cnt: ${e._2}
    })
    ssc.start()
    ssc.awaitTermination()
説明

サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの[環境変数の設定] セクションをご参照ください。

付録

完全なサンプルコードについては、GitHub にアクセスしてください。