Setelah Simple Log Service (SLS) mengumpulkan data log, Anda dapat menjalankan pekerjaan Spark Streaming untuk mengonsumsi data tersebut.
Batasan
Konsumen Spark Streaming hanya mendukung versi Spark 2.x.
SDK Spark yang disediakan oleh Simple Log Service (SLS) mengimplementasikan dua mode konsumsi: mode Receiver dan mode Direct. Tambahkan dependensi Maven berikut:
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-logservice_2.11</artifactId>
<version>1.7.2</version>
</dependency>
Mode Receiver
Pada mode Receiver, sebuah kelompok konsumen mengonsumsi data log dan menyimpannya sementara di executor Spark. Setelah pekerjaan Spark Streaming dimulai, data tersebut dibaca dan diproses dari executor. Setiap log dikembalikan sebagai string JSON. Kelompok konsumen secara otomatis menyimpan checkpoint berkala ke server, sehingga tidak memerlukan pembaruan manual. Untuk informasi selengkapnya, lihat Mengonsumsi data log menggunakan kelompok konsumen.
-
Parameter
Parameter
Jenis
Deskripsi
Project
String
Nama Proyek Simple Log Service.
LogStore
String
Nama LogStore.
consumerGroup
String
Nama kelompok konsumen.
endpoint
String
Titik akhir untuk Wilayah tempat Proyek Simple Log Service berada. Untuk informasi selengkapnya, lihat Endpoints.
accessKeyId
String
ID AccessKey yang digunakan untuk mengakses Simple Log Service.
accessKeySecret
String
Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.
-
Contoh
CatatanSecara default, mode Receiver dapat kehilangan data saat terjadi kegagalan. Aktifkan Write-Ahead Logs untuk mencegah hal ini. Fitur ini tersedia di Spark 1.2 dan versi setelahnya. Untuk detailnya, lihat dokumentasi Spark.
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.SparkConf object TestLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Loghub") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Mode Direct
Mode Direct tidak memerlukan kelompok konsumen. Sebaliknya, mode ini mengambil data langsung dari server pada waktu proses. Mode ini menawarkan beberapa keunggulan:
-
Konkurensi yang disederhanakan: Jumlah partisi Spark sesuai dengan jumlah shard di LogStore. Anda dapat meningkatkan konkurensi pekerjaan dengan melakukan Pemisahan shard.
-
Efisiensi: Mencegah kehilangan data tanpa memerlukan Write-Ahead Logs.
-
Semantik tepat-sekali: Data diambil dari server sesuai permintaan, dan checkpoint hanya dikomit setelah pekerjaan berhasil.
Jika pekerjaan dihentikan secara tak terduga (misalnya karena kegagalan Spark), data dari batch terakhir yang gagal dapat diproses ulang saat restart.
Mode Direct memerlukan kluster ZooKeeper untuk menyimpan status antara. Anda harus menentukan path direktori checkpoint di ZooKeeper untuk data ini. Untuk mengonsumsi ulang data setelah restart pekerjaan, hapus direktori tersebut di ZooKeeper dan ubah nama kelompok konsumen.
-
Parameter
Parameter
Type
Deskripsi
Project
String
Nama Proyek Simple Log Service.
LogStore
String
Nama LogStore.
consumerGroup
String
Nama kelompok konsumen, hanya digunakan untuk menyimpan titik pemeriksaan konsumsi.
endpoint
String
Titik akhir untuk Wilayah tempat Proyek Simple Log Service berada. Untuk informasi selengkapnya, lihat Endpoints.
accessKeyId
String
ID AccessKey yang digunakan untuk mengakses Simple Log Service.
accessKeySecret
String
Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.
zkAddress
String
Alamat koneksi kluster ZooKeeper Anda.
-
Konfigurasi pembatasan laju
Spark Streaming memproses data dalam mikro-batch. Untuk setiap batch, konsumen harus menentukan berapa banyak entri log yang akan diambil.
Model penyimpanan dasar Simple Log Service menggunakan LogGroup sebagai unit. Biasanya, setiap LogGroup sesuai dengan satu permintaan tulis dan dapat berisi ribuan log. Namun, dalam skenario seperti pelacakan web, setiap permintaan tulis mungkin hanya berisi satu log. Untuk menangani skenario yang berbeda ini, SDK menyediakan dua parameter pembatasan laju:
Parameter
Deskripsi
Default
spark.loghub.batchGet.step
Jumlah maksimum LogGroup yang diambil dalam satu permintaan.
100
spark.streaming.loghub.maxRatePerShard
Jumlah maksimum entri log yang dikonsumsi dari satu shard per batch.
10000
Parameter spark.streaming.loghub.maxRatePerShard menetapkan target jumlah maksimum entri log yang diambil dari satu shard dalam setiap batch. SDK mengambil LogGroup secara bertahap sesuai dengan nilai spark.loghub.batchGet.step hingga jumlah total log mencapai atau melebihi target yang ditetapkan oleh spark.streaming.loghub.maxRatePerShard. Karena itu, spark.streaming.loghub.maxRatePerShard berfungsi sebagai batas lunak. Jumlah aktual log yang diambil per batch akan bervariasi tergantung pada spark.loghub.batchGet.step dan jumlah log dalam setiap LogGroup.
-
Contoh
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils} object TestDirectLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkAddress = if (args.length >= 8) args(7) else "localhost:2181" def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Untuk informasi selengkapnya, lihat proyek di GitHub.