すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して DataHub にアクセスする

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)Hadoop クラスターで Spark ジョブを実行して DataHub データを消費し、データレコード数を計算し、データレコードを出力する方法について説明します。

Spark Streaming を使用して DataHub データを消費する

  • 準備を行います。

    DataHub のサブスクリプション機能を使用して、DataHub トピックをサブスクライブします。 詳細については、「サブスクリプションを作成する」をご参照ください。

  • DataHub データを消費します。

    次のいずれかの方法を使用して、Spark Streaming ジョブを実行して DataHub データを消費できます。

    • シャードの ID を指定し、シャードのデータを消費します。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub プロジェクトの名前。
                topic, // DataHub トピックの名前。
                subId, // DataHub のサブスクリプション ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub のエンドポイント。
                shardId, // DataHub トピック内の特定のシャードの ID。
                read, // DataHub データの RecordEntry を処理します。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // RecordEntry の最初のフィールドからデータを読み取ります。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • すべてのシャードからデータを消費します。

      datahubStream = DatahubUtils.createStream(
                ssc,
                project, // DataHub プロジェクトの名前。
                topic, // DataHub トピックの名前。
                subId, // DataHub のサブスクリプション ID。
                accessKeyId,
                accessKeySecret,
                endpoint, // DataHub のエンドポイント。
                read, // DataHub データの RecordEntry を処理します。
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // RecordEntry の最初のフィールドからデータを読み取ります。
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    説明

    完全なサンプルコードについては、SparkDatahubDemo.scala を参照してください。

Spark Structured Streaming を使用して DataHub データを消費する

  • Maven 依存関係

    • Spark 2

       <dependency>
              <groupId>com.aliyun.emr</groupId>
              <artifactId>emr-datahub_2.11</artifactId>
              <version>2.0.0</version>
       </dependency>
    • Spark 3

      クラスターディレクトリ emr-datasources_shaded_***.jar/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/ 内の を依存関係としてダウンロードします。

      説明
      • 上記のディレクトリがクラスターに存在しない場合は、/usr/lib/emrsdk-current/ ディレクトリを使用できます。

      • 実際のクラスターディレクトリに基づいて、emr-datasources_shaded_***.jar を置き換える必要があります。

  • val spark =  SparkSession
      .builder()
      .appName("test datahub")
      .getOrCreate()
    
    
    // 読み取りストリームを作成します。
    val datahubRows = spark
      .readStream
      .format("datahub")
      .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
      .option("project", "project_test")
      .option("startingoffsets", "earliest")
      .option("topic", "topic_test")
      .load
    
    // DataFrame の処理ロジックを指定します。
    datahubRows.printSchema() // このインスタンスでは、スキーマには key と value の 2 つのフィールドがあります。
    println("print schema" + datahubRows.schema.toString())
    val df = datahubRows.groupBy("key").count()
    
    
    // データを書き込むための書き込みストリームを作成します。
    val query = df
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    
    // ストリーミングジョブを停止します。
    query.awaitTermination(100000)
    spark.close()

    コアプロセス:

    1. DataHub の DataFrame からデータを読み取る読み取りストリームを作成します。

    2. DataFrame の処理ロジックを指定します。

    3. データ書き込み用の書き込みストリームを作成します。

    説明

    サンプルコードを実行する前に、環境変数を構成する必要があります。 環境変数の構成方法の詳細については、このトピックの 環境変数を構成する セクションを参照してください。

  • DataHub のコアパラメーター

    重要

    Spark Structured Streaming を使用して DataHub データを消費する場合、subId パラメーターを構成する必要はありません。 これは、DataHub によって消費されるオフセットが DataHub ではなく Spark によって管理されるためです。

    パラメーター

    説明

    必須

    access.key.id

    DataHub プロジェクトの作成に使用される Alibaba Cloud アカウントの AccessKey ID。

    はい

    access.key.secret

    DataHub プロジェクトの作成に使用される Alibaba Cloud アカウントの AccessKey シークレット。

    はい

    endpoint

    DataHub API のエンドポイント。 エンドポイントは DataHub ページで確認できます。

    はい

    project

    DataHub プロジェクトの名前。

    はい

    topic

    DataHub トピックの名前。

    はい

    decimal.precision

    トピックに DECIMAL 型のフィールドが含まれている場合は、このパラメーターを構成します。

    いいえ

    decimal.scale

    トピックに DECIMAL 型のフィールドが含まれている場合は、このパラメーターを構成します。

    いいえ

    startingoffsets

    データ消費を開始するオフセット。 有効な値:

    • latest: 最新のオフセットからデータを消費します。

    • earliest: 最も古いオフセットからデータを消費します。

    • JSON 文字列: 特定の JSON 文字列からデータを消費します。 JSON 文字列は次の形式である必要があります。

      {
         "プロジェクト名#トピック名" : {
                 "shardId" : "startingoffsets パラメーターの値"
            }
      }

      サンプルコード:

      {
          "project_test#topic_test" : {
                 "0" : "100"
         }
      }

    いいえ

    endingoffsets

    データ消費を終了するオフセット。 有効な値:

    • latest: 最新のオフセットでデータ消費を停止します。

    • JSON 文字列: 特定の JSON 文字列からのデータ消費を停止します。 JSON 文字列は次の形式である必要があります。

      {
         "プロジェクト名#トピック名" : {
                 "shardId" : "endingoffsets パラメーターの値"
            }
      }

    いいえ