本文介紹如何在E-MapReduce的Hadoop叢集,運行Spark作業消費DataHub資料、統計資料個數並列印出來。
Spark Streaming消費DataHub
準備工作
使用DataHub的訂閱功能訂閱Topic,詳細資料請參見建立訂閱。
消費DataHub資料
運行Spark Streaming作業消費DataHub資料有兩種使用方式:
指定特定的ShardId,消費該ShardId的資料。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的專案名。 topic, // DataHub的topic名稱。 subId, // DataHub的訂閱ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 shardId, // DataHub Topic中的一個ShardId。 read, // 處理DataHub資料的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一個Field的資料。 def read(record: RecordEntry): String = { record.getString(0) }消費所有Shard的資料。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub的專案名。 topic, // DataHub的topic名稱。 subId, // DataHub的訂閱ID。 accessKeyId, accessKeySecret, endpoint, // DataHub endpoint。 read, // 處理DataHub資料的RecordEntry。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // 取出RecordEntry中第一個Field的資料。 def read(record: RecordEntry): String = { record.getString(0) }
說明完整範例程式碼,請參見SparkDatahubDemo.scala。
Spark Structured Streaming消費DataHub
Maven依賴
Spark2
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datahub_2.11</artifactId> <version>2.0.0</version> </dependency>Spark3
請在叢集
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/目錄,下載emr-datasources_shaded_***.jar作為依賴。說明如果您的叢集中沒有以上目錄,則使用
/usr/lib/emrsdk-current/目錄。emr-datasources_shaded_***.jar,請根據您實際叢集目錄下的JAR包來替換。
消費樣本
val spark = SparkSession .builder() .appName("test datahub") .getOrCreate() //建立readstream。 val datahubRows = spark .readStream .format("datahub") .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com") .option("project", "project_test") .option("startingoffsets", "earliest") .option("topic", "topic_test") .load //DataFrame處理邏輯。 datahubRows.printSchema() // 當前執行個體中,schema有key和value兩個欄位。 println("print schema" + datahubRows.schema.toString()) val df = datahubRows.groupBy("key").count() //建立writestream,輸出資料。 val query = df .writeStream .format("console") .outputMode("complete") .start() //結束流任務。 query.awaitTermination(100000) spark.close()核心流程如下:
建立readstream讀取DataHub DataFrame資料。
自訂資料來源DataFrame處理邏輯。
建立writestream輸出資料。
說明運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數。
DataHub核心配置參數
重要Structured Streaming消費DataHub不需要填寫參數subId。主要是因為DataHub的消費offset由Spark管理,不需要DataHub管理,所以不需要提供subId。
參數
描述
是否必選
access.key.id
建立DataHub的阿里雲AccessKey ID。
是
access.key.secret
建立DataHub的阿里雲AccessKey Secret。
是
endpoint
DataHub API Endpoint。您可以在DataHub頁面查看。
是
project
DataHub的專案名。
是
topic
DataHub的topic名稱。
是
decimal.precision
當topic欄位中包含decimal欄位時,需要指定該參數。
否
decimal.scale
當topic欄位中包含decimal欄位時,需要指定該參數。
否
startingoffsets
開始消費點位。取值如下:
latest:表示最晚。
earliest:表示最早。
json字串:json結構如下所示。
{ "project名稱#topic名稱" : { "shardId" : "startingoffsets的值" } }樣本如下。
{ "project_test#topic_test" : { "0" : "100" } }
否
endingoffsets
結束消費點位。取值如下:
latest:表示最晚。
json字串:json結構如下所示。
{ "project名稱#topic名稱" : { "shardId" : "endingoffsets的值" } }
否