全部产品
Search
文档中心

Simple Log Service:Mengonsumsi data menggunakan Spark Streaming

更新时间:Nov 09, 2025

Setelah Simple Log Service mengumpulkan data log, Anda dapat menjalankan tugas Spark Streaming untuk mengonsumsi data tersebut.

Batasan

Konsumsi Spark Streaming hanya mendukung Spark 2.x.

Spark SDK yang disediakan oleh Simple Log Service mendukung dua mode konsumsi: mode penerima dan mode langsung. Ketergantungan Maven adalah sebagai berikut:

<dependency>
  <groupId>com.aliyun.emr</groupId>
  <artifactId>emr-logservice_2.11</artifactId>
  <version>1.7.2</version>
</dependency>

Mode Penerima

Dalam mode penerima, kelompok konsumen mengonsumsi data log dan menyimpannya di Spark Executor. Setelah tugas Spark Streaming dimulai, ia membaca dan memproses data dari Executor. Setiap bagian data dikembalikan sebagai string JSON. Kelompok konsumen secara otomatis menyimpan checkpoint ke server pada interval reguler. Pembaruan manual checkpoint tidak diperlukan. Untuk informasi lebih lanjut, lihat Mengonsumsi Data Log Menggunakan Kelompok Konsumen.

  • Parameter

    Parameter

    Tipe

    Deskripsi

    project

    String

    Nama proyek Simple Log Service.

    logstore

    String

    Nama penyimpanan log Simple Log Service.

    consumerGroup

    String

    Nama kelompok konsumen.

    endpoint

    String

    Titik akhir wilayah tempat proyek Simple Log Service berada. Untuk informasi lebih lanjut, lihat Titik Akhir.

    accessKeyId

    String

    ID AccessKey yang digunakan untuk mengakses Simple Log Service.

    accessKeySecret

    String

    Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.

  • Contoh

    Catatan

    Menggunakan konfigurasi default, mode penerima dapat menyebabkan kehilangan data jika terjadi pengecualian. Untuk mencegah hal ini, Anda dapat mengaktifkan Write-Ahead Logs (WAL). WAL didukung di Spark 1.2 dan versi selanjutnya. Untuk informasi lebih lanjut tentang WAL, lihat dokumentasi Spark.

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.SparkConf
    
    object TestLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val loghubStream = LoghubUtils.createStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            endpoint,
            accessKeyId,
            accessKeySecret,
            StorageLevel.MEMORY_AND_DISK)
    
          loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd =>
            rdd.map(bytes => new String(bytes)).top(10).foreach(println)
          )
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }

Mode Langsung

Mode langsung tidak memerlukan kelompok konsumen. Mode ini menggunakan API untuk meminta data langsung dari server pada waktu proses. Mode langsung memiliki keuntungan berikut:

  • Paralelisme yang disederhanakan: Jumlah partisi Spark sama dengan jumlah total shard Logstore. Untuk meningkatkan tingkat paralelisme suatu tugas, Anda cukup membagi shard.

  • Efisiensi tinggi: Anda tidak perlu mengaktifkan Write-Ahead Logs untuk mencegah kehilangan data.

  • Semantik exactly-once: Data diambil dari server sesuai permintaan. Checkpoint hanya dikirimkan setelah tugas berhasil.

    Jika tugas tidak selesai seperti yang diharapkan karena Spark keluar secara tak terduga atau alasan lainnya, beberapa data mungkin dikonsumsi berulang kali.

Mode langsung bergantung pada lingkungan ZooKeeper untuk menyimpan status antara secara sementara. Anda juga harus menetapkan direktori checkpoint. Data status antara disimpan di direktori checkpoint yang sesuai di ZooKeeper. Jika Anda ingin mengonsumsi ulang data setelah tugas dimulai ulang, Anda dapat menghapus direktori di ZooKeeper dan mengubah nama kelompok konsumen.

  • Parameter

    Parameter

    Tipe

    Deskripsi

    project

    String

    Nama proyek Simple Log Service.

    logstore

    String

    Nama penyimpanan log Simple Log Service.

    consumerGroup

    String

    Nama kelompok konsumen. Ini hanya digunakan untuk menyimpan posisi konsumsi.

    endpoint

    String

    Titik akhir wilayah tempat proyek Simple Log Service berada. Untuk informasi lebih lanjut, lihat Titik Akhir.

    accessKeyId

    String

    ID AccessKey yang digunakan untuk mengakses Simple Log Service.

    accessKeySecret

    String

    Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.

    zkAddress

    String

    Alamat koneksi ZooKeeper.

  • Konfigurasi pembatasan

    Spark Streaming memproses data dalam mikro-batch. Oleh karena itu, ketika konsumsi dimulai, batas setiap batch harus ditentukan. Batas ini menentukan jumlah entri data yang akan diambil.

    Model penyimpanan dasar Simple Log Service menggunakan LogGroups. Biasanya, setiap LogGroup sesuai dengan satu permintaan tulis. Misalnya, permintaan tulis dapat berisi ribuan log. Log ini disimpan dan dikonsumsi sebagai satu LogGroup. Namun, ketika Anda menulis log menggunakan pelacakan web, setiap permintaan tulis hanya berisi satu log. Ini berarti bahwa satu LogGroup hanya berisi satu log. Untuk memenuhi persyaratan konsumsi skenario tulis yang berbeda, SDK menyediakan dua parameter berikut untuk pembatasan.

    Parameter

    Deskripsi

    Nilai default

    spark.loghub.batchGet.step

    Jumlah maksimum LogGroups yang akan diambil dalam satu permintaan konsumsi.

    100

    spark.streaming.loghub.maxRatePerShard

    Jumlah maksimum log yang dikonsumsi dari setiap shard dalam satu batch.

    10000

    Anda dapat menggunakan spark.streaming.loghub.maxRatePerShard untuk menentukan jumlah maksimum log yang ingin Anda konsumsi dari setiap shard di setiap batch. SDK Spark bekerja dengan mengambil jumlah LogGroups yang ditentukan oleh spark.loghub.batchGet.step dari server dan mengakumulasi jumlah log. Proses ini berlanjut hingga jumlah log yang terakumulasi mencapai atau melebihi nilai spark.streaming.loghub.maxRatePerShard. Oleh karena itu, spark.streaming.loghub.maxRatePerShard bukanlah parameter yang secara tepat mengontrol jumlah log yang dikonsumsi per batch. Jumlah sebenarnya dari log yang dikonsumsi dalam setiap batch bergantung pada spark.loghub.batchGet.step dan jumlah log dalam setiap LogGroup.

  • Contoh

    import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils}
    
    object TestDirectLoghub {
      def main(args: Array[String]): Unit = {
        if (args.length < 7) {
          System.err.println(
            """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint>
              |         <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181>
            """.stripMargin)
          System.exit(1)
        }
    
        val project = args(0)
        val logstore = args(1)
        val consumerGroup = args(2)
        val endpoint = args(3)
        val accessKeyId = args(4)
        val accessKeySecret = args(5)
        val batchInterval = Milliseconds(args(6).toInt * 1000)
        val zkAddress = if (args.length >= 8) args(7) else "localhost:2181"
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Direct Loghub")
          val ssc = new StreamingContext(conf, batchInterval)
          val zkParas = Map("zookeeper.connect" -> zkAddress,
            "enable.auto.commit" -> "false")
          val loghubStream = LoghubUtils.createDirectStream(
            ssc,
            project,
            logstore,
            consumerGroup,
            accessKeyId,
            accessKeySecret,
            endpoint,
            zkParas,
            LogHubCursorPosition.END_CURSOR)
    
          loghubStream.checkpoint(batchInterval).foreachRDD(rdd => {
            println(s"count by key: ${rdd.map(s => {
              s.sorted
              (s.length, s)
            }).countByKey().size}")
            loghubStream.asInstanceOf[CanCommitOffsets].commitAsync()
          })
          ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory
          ssc
        }
    
        val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

Untuk informasi lebih lanjut, lihat proyek di GitHub.