Setelah Simple Log Service mengumpulkan data log, Anda dapat menjalankan tugas Spark Streaming untuk mengonsumsi data tersebut.
Batasan
Konsumsi Spark Streaming hanya mendukung Spark 2.x.
Spark SDK yang disediakan oleh Simple Log Service mendukung dua mode konsumsi: mode penerima dan mode langsung. Ketergantungan Maven adalah sebagai berikut:
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-logservice_2.11</artifactId>
<version>1.7.2</version>
</dependency>Mode Penerima
Dalam mode penerima, kelompok konsumen mengonsumsi data log dan menyimpannya di Spark Executor. Setelah tugas Spark Streaming dimulai, ia membaca dan memproses data dari Executor. Setiap bagian data dikembalikan sebagai string JSON. Kelompok konsumen secara otomatis menyimpan checkpoint ke server pada interval reguler. Pembaruan manual checkpoint tidak diperlukan. Untuk informasi lebih lanjut, lihat Mengonsumsi Data Log Menggunakan Kelompok Konsumen.
Parameter
Parameter
Tipe
Deskripsi
project
String
Nama proyek Simple Log Service.
logstore
String
Nama penyimpanan log Simple Log Service.
consumerGroup
String
Nama kelompok konsumen.
endpoint
String
Titik akhir wilayah tempat proyek Simple Log Service berada. Untuk informasi lebih lanjut, lihat Titik Akhir.
accessKeyId
String
ID AccessKey yang digunakan untuk mengakses Simple Log Service.
accessKeySecret
String
Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.
Contoh
CatatanMenggunakan konfigurasi default, mode penerima dapat menyebabkan kehilangan data jika terjadi pengecualian. Untuk mencegah hal ini, Anda dapat mengaktifkan Write-Ahead Logs (WAL). WAL didukung di Spark 1.2 dan versi selanjutnya. Untuk informasi lebih lanjut tentang WAL, lihat dokumentasi Spark.
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.aliyun.logservice.LoghubUtils import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.SparkConf object TestLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Loghub") val ssc = new StreamingContext(conf, batchInterval) val loghubStream = LoghubUtils.createStream( ssc, project, logstore, consumerGroup, endpoint, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK) loghubStream.checkpoint(batchInterval * 2).foreachRDD(rdd => rdd.map(bytes => new String(bytes)).top(10).foreach(println) ) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Mode Langsung
Mode langsung tidak memerlukan kelompok konsumen. Mode ini menggunakan API untuk meminta data langsung dari server pada waktu proses. Mode langsung memiliki keuntungan berikut:
Paralelisme yang disederhanakan: Jumlah partisi Spark sama dengan jumlah total shard Logstore. Untuk meningkatkan tingkat paralelisme suatu tugas, Anda cukup membagi shard.
Efisiensi tinggi: Anda tidak perlu mengaktifkan Write-Ahead Logs untuk mencegah kehilangan data.
Semantik exactly-once: Data diambil dari server sesuai permintaan. Checkpoint hanya dikirimkan setelah tugas berhasil.
Jika tugas tidak selesai seperti yang diharapkan karena Spark keluar secara tak terduga atau alasan lainnya, beberapa data mungkin dikonsumsi berulang kali.
Mode langsung bergantung pada lingkungan ZooKeeper untuk menyimpan status antara secara sementara. Anda juga harus menetapkan direktori checkpoint. Data status antara disimpan di direktori checkpoint yang sesuai di ZooKeeper. Jika Anda ingin mengonsumsi ulang data setelah tugas dimulai ulang, Anda dapat menghapus direktori di ZooKeeper dan mengubah nama kelompok konsumen.
Parameter
Parameter
Tipe
Deskripsi
project
String
Nama proyek Simple Log Service.
logstore
String
Nama penyimpanan log Simple Log Service.
consumerGroup
String
Nama kelompok konsumen. Ini hanya digunakan untuk menyimpan posisi konsumsi.
endpoint
String
Titik akhir wilayah tempat proyek Simple Log Service berada. Untuk informasi lebih lanjut, lihat Titik Akhir.
accessKeyId
String
ID AccessKey yang digunakan untuk mengakses Simple Log Service.
accessKeySecret
String
Rahasia AccessKey yang digunakan untuk mengakses Simple Log Service.
zkAddress
String
Alamat koneksi ZooKeeper.
Konfigurasi pembatasan
Spark Streaming memproses data dalam mikro-batch. Oleh karena itu, ketika konsumsi dimulai, batas setiap batch harus ditentukan. Batas ini menentukan jumlah entri data yang akan diambil.
Model penyimpanan dasar Simple Log Service menggunakan LogGroups. Biasanya, setiap LogGroup sesuai dengan satu permintaan tulis. Misalnya, permintaan tulis dapat berisi ribuan log. Log ini disimpan dan dikonsumsi sebagai satu LogGroup. Namun, ketika Anda menulis log menggunakan pelacakan web, setiap permintaan tulis hanya berisi satu log. Ini berarti bahwa satu LogGroup hanya berisi satu log. Untuk memenuhi persyaratan konsumsi skenario tulis yang berbeda, SDK menyediakan dua parameter berikut untuk pembatasan.
Parameter
Deskripsi
Nilai default
spark.loghub.batchGet.step
Jumlah maksimum LogGroups yang akan diambil dalam satu permintaan konsumsi.
100
spark.streaming.loghub.maxRatePerShard
Jumlah maksimum log yang dikonsumsi dari setiap shard dalam satu batch.
10000
Anda dapat menggunakan spark.streaming.loghub.maxRatePerShard untuk menentukan jumlah maksimum log yang ingin Anda konsumsi dari setiap shard di setiap batch. SDK Spark bekerja dengan mengambil jumlah LogGroups yang ditentukan oleh spark.loghub.batchGet.step dari server dan mengakumulasi jumlah log. Proses ini berlanjut hingga jumlah log yang terakumulasi mencapai atau melebihi nilai spark.streaming.loghub.maxRatePerShard. Oleh karena itu, spark.streaming.loghub.maxRatePerShard bukanlah parameter yang secara tepat mengontrol jumlah log yang dikonsumsi per batch. Jumlah sebenarnya dari log yang dikonsumsi dalam setiap batch bergantung pada spark.loghub.batchGet.step dan jumlah log dalam setiap LogGroup.
Contoh
import com.aliyun.openservices.loghub.client.config.LogHubCursorPosition import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.aliyun.logservice.{CanCommitOffsets, LoghubUtils} object TestDirectLoghub { def main(args: Array[String]): Unit = { if (args.length < 7) { System.err.println( """Usage: TestDirectLoghub <project> <logstore> <loghub group name> <endpoint> | <access key id> <access key secret> <batch interval seconds> <zookeeper host:port=localhost:2181> """.stripMargin) System.exit(1) } val project = args(0) val logstore = args(1) val consumerGroup = args(2) val endpoint = args(3) val accessKeyId = args(4) val accessKeySecret = args(5) val batchInterval = Milliseconds(args(6).toInt * 1000) val zkAddress = if (args.length >= 8) args(7) else "localhost:2181" def functionToCreateContext(): StreamingContext = { val conf = new SparkConf().setAppName("Test Direct Loghub") val ssc = new StreamingContext(conf, batchInterval) val zkParas = Map("zookeeper.connect" -> zkAddress, "enable.auto.commit" -> "false") val loghubStream = LoghubUtils.createDirectStream( ssc, project, logstore, consumerGroup, accessKeyId, accessKeySecret, endpoint, zkParas, LogHubCursorPosition.END_CURSOR) loghubStream.checkpoint(batchInterval).foreachRDD(rdd => { println(s"count by key: ${rdd.map(s => { s.sorted (s.length, s) }).countByKey().size}") loghubStream.asInstanceOf[CanCommitOffsets].commitAsync() }) ssc.checkpoint("hdfs:///tmp/spark/streaming") // set checkpoint directory ssc } val ssc = StreamingContext.getOrCreate("hdfs:///tmp/spark/streaming", functionToCreateContext _) ssc.start() ssc.awaitTermination() } }
Untuk informasi lebih lanjut, lihat proyek di GitHub.