すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Spark Streaming を使用したデータの消費

最終更新日:Nov 09, 2025

Simple Log Service がログデータを収集した後、Spark Streaming タスクを実行してデータを消費できます。

制限事項

Spark Streaming の消費は、Spark 2.x のみをサポートします。

Simple Log Service が提供する Spark SDK は、レシーバーモードとダイレクトモードの 2 つの消費モードをサポートしています。Maven の依存関係は次のとおりです。

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

レシーバーモード

レシーバーモードでは、使用者グループがログデータを消費し、Spark Executor にステージングします。Spark Streaming タスクが開始されると、Executor からデータを読み取って処理します。各データは JSON 文字列として返されます。使用者グループは、定期的にチェックポイントをサーバーに自動的に保存します。手動でのチェックポイントの更新は必要ありません。詳細については、「使用者グループを使用したログデータの消費」をご参照ください。

  • パラメーター

    パラメーター

    タイプ

    説明

    project

    String

    Simple Log Service プロジェクトの名前。

    logstore

    String

    Simple Log Service Logstore の名前。

    consumerGroup

    String

    使用者グループの名前。

    endpoint

    String

    Simple Log Service プロジェクトが所在するリージョンのエンドポイント。詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    Simple Log Service へのアクセスに使用される AccessKey ID。

    accessKeySecret

    String

    Simple Log Service へのアクセスに使用される AccessKey Secret。

  • 説明

    デフォルトの構成を使用すると、例外が発生した場合にレシーバーモードでデータが失われる可能性があります。これを防ぐには、先行書き込みログ (WAL) を有効にします。WAL は Spark 1.2 以降でサポートされています。WAL の詳細については、「Spark ドキュメント」をご参照ください。

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // チェックポイントディレクトリを設定
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

ダイレクトモード

ダイレクトモードでは、使用者グループは必要ありません。API を使用して、実行時にサーバーから直接データをリクエストします。ダイレクトモードには、次の利点があります。

  • 簡素化された並列処理: Spark パーティションの数は、Logstore シャードの総数と同じです。タスクの並列処理の次数を増やすには、シャードを分割するだけです。

  • 高効率: データ損失を防ぐために先行書き込みログを有効にする必要はありません。

  • Exactly-once セマンティクス: データはオンデマンドでサーバーから取得されます。チェックポイントは、タスクが成功した後にのみ送信されます。

    Spark が予期せず終了したなどの理由でタスクが期待どおりに終了しない場合、一部のデータが繰り返し消費されることがあります。

ダイレクトモードは、中間状態を一時的に保存するために ZooKeeper 環境に依存します。また、チェックポイントディレクトリを設定する必要があります。中間状態データは、ZooKeeper の対応するチェックポイントディレクトリに保存されます。タスクの再起動後にデータを再消費する場合は、ZooKeeper のディレクトリを削除して、使用者グループ名を変更できます。

  • パラメーター

    パラメーター

    タイプ

    説明

    project

    String

    Simple Log Service プロジェクトの名前。

    logstore

    String

    Simple Log Service Logstore の名前。

    consumerGroup

    String

    使用者グループの名前。これは、消費位置を保存するためにのみ使用されます。

    endpoint

    String

    Simple Log Service プロジェクトが所在するリージョンのエンドポイント。詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    Simple Log Service へのアクセスに使用される AccessKey ID。

    accessKeySecret

    String

    Simple Log Service へのアクセスに使用される AccessKey Secret。

    zkAddress

    String

    ZooKeeper のエンドポイント。

  • スロットリング構成

    Spark Streaming は、マイクロバッチでデータを処理します。したがって、消費が開始されるときに、各バッチの境界を定義する必要があります。この境界は、取得するデータエントリの数を指定します。

    Simple Log Service の基盤となるストレージモデルは LogGroups を使用します。通常、各 LogGroup は 1 つの書き込みリクエストに対応します。たとえば、書き込みリクエストには数千のログが含まれる場合があります。これらのログは、1 つの LogGroup として保存および消費されます。ただし、Web トラッキングを使用してログを書き込む場合、各書き込みリクエストには 1 つのログしか含まれません。これは、1 つの LogGroup に 1 つのログしか含まれないことを意味します。さまざまな書き込みシナリオの消費要件を満たすために、SDK はスロットリング用に次の 2 つのパラメーターを提供します。

    パラメーター

    説明

    デフォルト値

    spark.loghub.batchGet.step

    1 回の消費リクエストで取得する LogGroups の最大数。

    100

    spark.streaming.loghub.maxRatePerShard

    1 つのバッチで各シャードから消費するログの最大数。

    10000

    spark.streaming.loghub.maxRatePerShard を使用して、各バッチで各シャードから消費するログの最大数を指定できます。Spark SDK は、spark.loghub.batchGet.step で指定された数の LogGroups をサーバーから取得し、ログの数を累積することによって機能します。このプロセスは、累積されたログの数が spark.streaming.loghub.maxRatePerShard の値に達するか、それを超えるまで続行されます。したがって、spark.streaming.loghub.maxRatePerShard は、バッチごとに消費されるログの数を正確に制御するパラメーターではありません。各バッチで消費される実際のログの数は、spark.loghub.batchGet.step と各 LogGroup のログの数によって異なります。

  • import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // チェックポイントディレクトリを設定
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

詳細については、GitHub 上のプロジェクトをご参照ください。