このトピックでは、E-MapReduce(EMR)Hadoop クラスターで Spark ジョブを実行して DataHub データを消費し、データレコード数を計算し、データレコードを出力する方法について説明します。
Spark Streaming を使用して DataHub データを消費する
準備を行います。
DataHub のサブスクリプション機能を使用して、DataHub トピックをサブスクライブします。 詳細については、「サブスクリプションを作成する」をご参照ください。
DataHub データを消費します。
次のいずれかの方法を使用して、Spark Streaming ジョブを実行して DataHub データを消費できます。
シャードの ID を指定し、シャードのデータを消費します。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub プロジェクトの名前。 topic, // DataHub トピックの名前。 subId, // DataHub のサブスクリプション ID。 accessKeyId, accessKeySecret, endpoint, // DataHub のエンドポイント。 shardId, // DataHub トピック内の特定のシャードの ID。 read, // DataHub データの RecordEntry を処理します。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // RecordEntry の最初のフィールドからデータを読み取ります。 def read(record: RecordEntry): String = { record.getString(0) }
すべてのシャードからデータを消費します。
datahubStream = DatahubUtils.createStream( ssc, project, // DataHub プロジェクトの名前。 topic, // DataHub トピックの名前。 subId, // DataHub のサブスクリプション ID。 accessKeyId, accessKeySecret, endpoint, // DataHub のエンドポイント。 read, // DataHub データの RecordEntry を処理します。 StorageLevel.MEMORY_AND_DISK) datahubStream.foreachRDD(rdd => println(rdd.count())) // RecordEntry の最初のフィールドからデータを読み取ります。 def read(record: RecordEntry): String = { record.getString(0) }
説明完全なサンプルコードについては、SparkDatahubDemo.scala を参照してください。
Spark Structured Streaming を使用して DataHub データを消費する
Maven 依存関係
Spark 2
<dependency> <groupId>com.aliyun.emr</groupId> <artifactId>emr-datahub_2.11</artifactId> <version>2.0.0</version> </dependency>
Spark 3
クラスターディレクトリ emr-datasources_shaded_***.jar
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/
内の を依存関係としてダウンロードします。説明上記のディレクトリがクラスターに存在しない場合は、
/usr/lib/emrsdk-current/
ディレクトリを使用できます。実際のクラスターディレクトリに基づいて、emr-datasources_shaded_***.jar を置き換える必要があります。
例
val spark = SparkSession .builder() .appName("test datahub") .getOrCreate() // 読み取りストリームを作成します。 val datahubRows = spark .readStream .format("datahub") .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com") .option("project", "project_test") .option("startingoffsets", "earliest") .option("topic", "topic_test") .load // DataFrame の処理ロジックを指定します。 datahubRows.printSchema() // このインスタンスでは、スキーマには key と value の 2 つのフィールドがあります。 println("print schema" + datahubRows.schema.toString()) val df = datahubRows.groupBy("key").count() // データを書き込むための書き込みストリームを作成します。 val query = df .writeStream .format("console") .outputMode("complete") .start() // ストリーミングジョブを停止します。 query.awaitTermination(100000) spark.close()
コアプロセス:
DataHub の DataFrame からデータを読み取る読み取りストリームを作成します。
DataFrame の処理ロジックを指定します。
データ書き込み用の書き込みストリームを作成します。
説明サンプルコードを実行する前に、環境変数を構成する必要があります。 環境変数の構成方法の詳細については、このトピックの 環境変数を構成する セクションを参照してください。
DataHub のコアパラメーター
重要Spark Structured Streaming を使用して DataHub データを消費する場合、subId パラメーターを構成する必要はありません。 これは、DataHub によって消費されるオフセットが DataHub ではなく Spark によって管理されるためです。
パラメーター
説明
必須
access.key.id
DataHub プロジェクトの作成に使用される Alibaba Cloud アカウントの AccessKey ID。
はい
access.key.secret
DataHub プロジェクトの作成に使用される Alibaba Cloud アカウントの AccessKey シークレット。
はい
endpoint
DataHub API のエンドポイント。 エンドポイントは DataHub ページで確認できます。
はい
project
DataHub プロジェクトの名前。
はい
topic
DataHub トピックの名前。
はい
decimal.precision
トピックに DECIMAL 型のフィールドが含まれている場合は、このパラメーターを構成します。
いいえ
decimal.scale
トピックに DECIMAL 型のフィールドが含まれている場合は、このパラメーターを構成します。
いいえ
startingoffsets
データ消費を開始するオフセット。 有効な値:
latest: 最新のオフセットからデータを消費します。
earliest: 最も古いオフセットからデータを消費します。
JSON 文字列: 特定の JSON 文字列からデータを消費します。 JSON 文字列は次の形式である必要があります。
{ "プロジェクト名#トピック名" : { "shardId" : "startingoffsets パラメーターの値" } }
サンプルコード:
{ "project_test#topic_test" : { "0" : "100" } }
いいえ
endingoffsets
データ消費を終了するオフセット。 有効な値:
latest: 最新のオフセットでデータ消費を停止します。
JSON 文字列: 特定の JSON 文字列からのデータ消費を停止します。 JSON 文字列は次の形式である必要があります。
{ "プロジェクト名#トピック名" : { "shardId" : "endingoffsets パラメーターの値" } }
いいえ