すべてのプロダクト
Search
ドキュメントセンター

Simple Log Service:Spark Streaming による消費

最終更新日:Mar 26, 2026

Simple Log Service (SLS) がログデータを収集した後、Spark Streaming ジョブを実行してデータを消費できます。

制限事項

Spark Streaming コンシューマーは、Spark 2.x バージョンのみをサポートします。

SLS が提供する Spark SDK は、Receiver モードと Direct モードの 2 つの消費モードを実装しています。次の Maven 依存関係を追加してください:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Receiver モード

Receiver モードでは、コンシューマーグループがログデータを消費し、Spark エグゼキュータに一時的に保存します。Spark Streaming ジョブが開始されると、エグゼキュータからデータを読み取って処理します。各ログは JSON 文字列として返されます。コンシューマーグループは定期的にチェックポイントをサーバーに自動的に保存するため、手動で更新する必要はありません。詳細については、「コンシューマーグループを使用したログデータの消費」をご参照ください。

  • パラメーター

    パラメーター

    タイプ

    説明

    Project

    String

    SLS プロジェクトの名前。

    LogStore

    String

    LogStore の名前。

    consumerGroup

    String

    コンシューマーグループの名前。

    endpoint

    String

    SLS プロジェクトが配置されているリージョンのエンドポイント。詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    SLS へのアクセスに使用する AccessKey ID。

    accessKeySecret

    String

    SLS へのアクセスに使用する AccessKey Secret。

  • 説明

    デフォルトでは、Receiver モードは障害発生時にデータを損失する可能性があります。これを防ぐには、先行書き込みログ (Write-Ahead Logs) を有効にしてください。この機能は Spark 1.2 以降で利用可能です。詳細については、Spark ドキュメントをご参照ください。

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Direct モード

Direct モードでは、コンシューマーグループは不要です。代わりに、ランタイムにサーバーから直接データをフェッチします。このモードにはいくつかの利点があります:

  • 簡素化された同時実行性:Spark パーティションの数は LogStore のシャード数と一致します。シャードを分割することで、ジョブの同時実行性を高めることができます。

  • 効率性:先行書き込みログ (Write-Ahead Logs) を必要とせずにデータ損失を防ぎます。

  • exactly-once セマンティクス:データはオンデマンドでサーバーからフェッチされ、ジョブが成功した後にのみチェックポイントがコミットされます。

    ジョブが予期せず終了した場合 (例えば、Spark の障害による)、再起動時に最後の失敗したバッチのデータが再処理される可能性があります。

Direct モードでは、中間状態を保存するために ZooKeeper クラスターが必要です。このデータのために ZooKeeper にチェックポイントディレクトリのパスを指定する必要があります。ジョブの再起動後にデータを再消費するには、ZooKeeper 内のこのディレクトリを削除し、コンシューマーグループ名を変更してください。

  • パラメーター

    パラメーター

    タイプ

    説明

    Project

    String

    SLS プロジェクトの名前。

    LogStore

    String

    LogStore の名前。

    consumerGroup

    String

    コンシューマーグループの名前。消費チェックポイントの保存にのみ使用されます。

    endpoint

    String

    SLS プロジェクトが配置されているリージョンのエンドポイント。詳細については、「エンドポイント」をご参照ください。

    accessKeyId

    String

    SLS へのアクセスに使用する AccessKey ID。

    accessKeySecret

    String

    SLS へのアクセスに使用する AccessKey Secret。

    zkAddress

    String

    ご利用の ZooKeeper クラスターの接続アドレス。

  • レート制限の設定

    Spark Streaming はマイクロバッチでデータを処理します。各バッチについて、コンシューマーはフェッチするログエントリの数を決定する必要があります。

    SLS の基盤となるストレージモデルは、ユニットとして LogGroup を使用します。通常、各 LogGroup は単一の書き込みリクエストに対応し、数千のログを含むことができます。ただし、ウェブトラッキングなどのシナリオでは、各書き込みリクエストに単一のログしか含まれない場合があります。これらの異なるシナリオに対応するため、SDK は 2 つのレート制限パラメーターを提供します:

    パラメーター

    説明

    デフォルト

    spark.loghub.batchGet.step

    1 回のリクエストでフェッチする LogGroup の最大数。

    100

    spark.streaming.loghub.maxRatePerShard

    バッチごとに単一のシャードから消費するログエントリの最大数。

    10000

    パラメーター spark.streaming.loghub.maxRatePerShard は、各バッチでシャードからフェッチするログエントリの最大数の目標を設定します。SDK は、合計ログ数が spark.streaming.loghub.maxRatePerShard で設定された目標に達するか超えるまで、spark.loghub.batchGet.step で定義された増分で LogGroup をフェッチします。このため、spark.streaming.loghub.maxRatePerShard はソフトリミットとして機能します。バッチごとにフェッチされる実際のログ数は、spark.loghub.batchGet.step と各 LogGroup 内のログ数によって異なります。

  • import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

詳細については、GitHub 上のプロジェクトをご参照ください。