全部產品
Search
文件中心

E-MapReduce:Spark對接DataHub

更新時間:Jul 01, 2024

本文介紹如何在E-MapReduce的Hadoop叢集,運行Spark作業消費DataHub資料、統計資料個數並列印出來。

Spark Streaming消費DataHub

  • 準備工作

    使用DataHub的訂閱功能訂閱Topic,詳細資料請參見建立訂閱

  • 消費DataHub資料

    運行Spark Streaming作業消費DataHub資料有兩種使用方式:

    • 指定特定的ShardId,消費該ShardId的資料。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的專案名。
                topic, // DataHub的topic名稱。
                subId, // DataHub的訂閱ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                shardId, // DataHub Topic中的一個ShardId。
                read, // 處理DataHub資料的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一個Field的資料。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • 消費所有Shard的資料。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub的專案名。
                topic, // DataHub的topic名稱。
                subId, // DataHub的訂閱ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub endpoint。
                read, // 處理DataHub資料的RecordEntry。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // 取出RecordEntry中第一個Field的資料。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    說明

    完整範例程式碼,請參見SparkDatahubDemo.scala

Spark Structured Streaming消費DataHub

  • Maven依賴

    • Spark2

       <dependency>
              <groupId>com.aliyun.emr</groupId>
              <artifactId>emr-datahub_2.11</artifactId>
              <version>2.0.0</version>
       </dependency>
    • Spark3

      請在叢集/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/目錄,下載emr-datasources_shaded_***.jar作為依賴。

      說明
      • 如果您的叢集中沒有以上目錄,則使用/usr/lib/emrsdk-current/目錄。

      • emr-datasources_shaded_***.jar,請根據您實際叢集目錄下的JAR包來替換。

  • 消費樣本

    val spark =  SparkSession
      .builder()
      .appName("test datahub")
      .getOrCreate()
    
    
    //建立readstream。
    val datahubRows = spark
      .readStream
      .format("datahub")
      .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
      .option("project", "project_test")
      .option("startingoffsets", "earliest")
      .option("topic", "topic_test")
      .load
    
    //DataFrame處理邏輯。
    datahubRows.printSchema() // 當前執行個體中,schema有key和value兩個欄位。
    println("print schema" + datahubRows.schema.toString())
    val df = datahubRows.groupBy("key").count()
    
    
    //建立writestream,輸出資料。
    val query = df
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    
    //結束流任務。
    query.awaitTermination(100000)
    spark.close()

    核心流程如下:

    1. 建立readstream讀取DataHub DataFrame資料。

    2. 自訂資料來源DataFrame處理邏輯。

    3. 建立writestream輸出資料。

    說明

    運行程式碼範例前必須先配置環境變數。關於如何配置環境變數,請參見配置環境變數

  • DataHub核心配置參數

    重要

    Structured Streaming消費DataHub不需要填寫參數subId。主要是因為DataHub的消費offset由Spark管理,不需要DataHub管理,所以不需要提供subId

    參數

    描述

    是否必選

    access.key.id

    建立DataHub的阿里雲AccessKey ID。

    access.key.secret

    建立DataHub的阿里雲AccessKey Secret。

    endpoint

    DataHub API Endpoint。您可以在DataHub頁面查看。

    project

    DataHub的專案名。

    topic

    DataHub的topic名稱。

    decimal.precision

    當topic欄位中包含decimal欄位時,需要指定該參數。

    decimal.scale

    當topic欄位中包含decimal欄位時,需要指定該參數。

    startingoffsets

    開始消費點位。取值如下:

    • latest:表示最晚。

    • earliest:表示最早。

    • json字串:json結構如下所示。

      {
         "project名稱#topic名稱" : {
                 "shardId" : "startingoffsets的值"
            }
      }

      樣本如下。

      {
          "project_test#topic_test" : {
                 "0" : "100"
         }
      }

    endingoffsets

    結束消費點位。取值如下:

    • latest:表示最晚。

    • json字串:json結構如下所示。

      {
         "project名稱#topic名稱" : {
                 "shardId" : "endingoffsets的值"
            }
      }