すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Simple Log Service でのデータの利用

最終更新日:Jan 11, 2025

このトピックでは、Spark Streaming を使用して Simple Log Service のログデータを利用し、ログエントリの数を計算する方法について説明します。

Spark を使用して Simple Log Service にアクセスする

  • 方法 1:レシーバーベースの DStream

    val logServiceProject = args(0)    // Simple Log Service のプロジェクト名。
        val logStoreName = args(1)     // Simple Log Service のログストア名。
        val loghubConsumerGroupName = args(2)  // 同じコンシューマーグループ名を持つジョブは、ログストア内のデータを共同で消費します。
        val loghubEndpoint = args(3)  // Simple Log Service のデータクラス API エンドポイント。
        val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")     // Simple Log Service へのアクセスに使用する AccessKey ID。
        val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET") // Simple Log Service へのアクセスに使用する AccessKey シークレット。
        val numReceivers = args(4).toInt  // ログストアからデータを読み取るために起動されるレシーバーの数。
        val batchInterval = Milliseconds(args(5).toInt * 1000) // Spark Streaming のデータ処理間隔。
        val conf = new SparkConf().setAppName("Test Loghub Streaming")
        val ssc = new StreamingContext(conf, batchInterval)
        val loghubStream = LoghubUtils.createStream(
          ssc,
          logServiceProject,
          logStoreName,
          loghubConsumerGroupName,
          loghubEndpoint,
          numReceivers,
          accessKeyId,
          accessKeySecret,
          StorageLevel.MEMORY_AND_DISK)
        loghubStream.foreachRDD(rdd => println(rdd.count()))
        ssc.start()
        ssc.awaitTermination()
    説明

    サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの[環境変数の設定]セクションをご参照ください。

  • 方法 2:ダイレクト API ベースの DStream

    val logServiceProject = args(0)
        val logStoreName = args(1)
        val loghubConsumerGroupName = args(2)
        val loghubEndpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkConnect = args(7)
        val checkpointPath = args(8)
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub Streaming")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkConnect, "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            logServiceProject,
            logStoreName,
            loghubConsumerGroupName,
            accessKeyId,
            accessKeySecret,
            loghubEndpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
          ssc.checkpoint(checkpointPath)
          val stream = loghubStream.checkpoint(batchInterval)
          stream.foreachRDD(rdd => {
            println(rdd.count())
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc
        }
        val ssc = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()

    E-MapReduce(EMR)SDK V1.4.0 以降では、Spark Streaming はダイレクト API ベースの DStream を使用してデータを処理できます。この方法を使用すると、LogHub のデータは Write-Ahead Logging(WAL)ファイルとして繰り返し保存されません。この方法により、Spark Streaming の WAL 機能を有効にすることなく、少なくとも 1 回はデータを書き込むことができます。この方法はまだ実験段階です。この方法を使用する場合は、次の点に注意してください。

    • DStream アクションを実行する場合は、コミットを行う必要があります。

    • Spark Streaming ジョブでは、ログストアに対して 1 つのアクションのみを実行できます。

    • この方法には、ZooKeeper サービスのサポートが必要です。

MetaService を使用したアクセス

上記のサンプルコードでは、AccessKey ペアがインターフェースに明示的に渡されています。 EMR SDK V1.3.2 以降では、Spark Streaming は MetaService を使用して Simple Log Service のデータを処理できます。 AccessKey ペアは必要ありません。詳細については、EMR SDK の LoghubUtils クラスの説明をご参照ください。

LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
LoghubUtils.createStream(ssc, logServiceProject, logStoreName, loghubConsumerGroupName, numReceivers, storageLevel, cursorPosition, mLoghubCursorStartTime, forceSpecial)
説明
  • EMR SDK は、Simple Log Service に対して BEGIN_CURSOR、END_CURSOR、SPECIAL_TIMER_CURSOR の 3 つの消費モードをサポートしています。デフォルトでは、END_CURSOR が使用されます。

    • BEGIN_CURSOR:ログヘッダーからデータを利用します。チェックポイントレコードが存在する場合は、チェックポイントから利用が開始されます。

    • END_CURSOR:ログの末尾からデータを利用します。チェックポイントレコードが存在する場合は、チェックポイントから利用が開始されます。

    • SPECIAL_TIMER_CURSOR:指定された時点からデータを利用します。単位:秒。チェックポイントレコードが存在する場合は、チェックポイントから利用が開始されます。

    すべての消費モードは、チェックポイントレコードの影響を受けます。チェックポイントレコードが存在する場合は、常にチェックポイントから利用が開始されます。 SPECIAL_TIMER_CURSOR モードでは、指定された時点から強制的にデータの利用を開始できます。このモードを使用するには、LoghubUtils クラスの createStream メソッドで、ビジネス要件に基づいて次のパラメーターを設定する必要があります。

    • cursorPosition:このパラメーターを LogHubCursorPosition.SPECIAL_TIMER_CURSOR に設定します。

    • forceSpecial:このパラメーターを true に設定します。

  • マスターノードを除く EMR クラスタのすべてのノードは、インターネットに接続できません。そのため、Simple Log Service の内部エンドポイントを設定する必要があります。そうしないと、Simple Log Service からデータをリクエストできません。

付録

完全なサンプルコードについては、GitHub にアクセスしてください。