This topic describes how to run Spark jobs in an E-MapReduce (EMR) Hadoop cluster to consume DataHub data, count the number of data records, and print the result.
Use Spark Streaming to consume DataHub data
Make preparations.
Subscribe to a DataHub topic by using the subscription feature of DataHub. For more information, see Create a subscription.
Consume DataHub data.
You can run Spark Streaming jobs to consume DataHub data by using one of the following methods:
Specify the ID of a shard and consume the data of the shard:
val datahubStream = DatahubUtils.createStream(
  ssc,
  project,          // The name of the DataHub project.
  topic,            // The name of the DataHub topic.
  subId,            // The subscription ID of DataHub.
  accessKeyId,
  accessKeySecret,
  endpoint,         // The endpoint of DataHub.
  shardId,          // The ID of the specific shard in the DataHub topic to consume.
  read,             // The function that processes each RecordEntry in the DataHub data.
  StorageLevel.MEMORY_AND_DISK)
datahubStream.foreachRDD(rdd => println(rdd.count()))

// Read data from the first field in the RecordEntry.
def read(record: RecordEntry): String = {
  record.getString(0)
}
Consume data from all shards:
val datahubStream = DatahubUtils.createStream(
  ssc,
  project,          // The name of the DataHub project.
  topic,            // The name of the DataHub topic.
  subId,            // The subscription ID of DataHub.
  accessKeyId,
  accessKeySecret,
  endpoint,         // The endpoint of DataHub.
  read,             // The function that processes each RecordEntry in the DataHub data.
  StorageLevel.MEMORY_AND_DISK)
datahubStream.foreachRDD(rdd => println(rdd.count()))

// Read data from the first field in the RecordEntry.
def read(record: RecordEntry): String = {
  record.getString(0)
}
Note: For the complete sample code, see SparkDatahubDemo.scala.
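Both snippets assume that a StreamingContext named ssc already exists. The following is a minimal sketch of the surrounding driver code, assuming a hypothetical application name and a 5-second batch interval; the project, topic, subId, accessKeyId, accessKeySecret, and endpoint values come from your own DataHub project and subscription:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkDatahubDriver {  // Hypothetical object name for illustration.
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("datahub-streaming-demo") // Hypothetical application name.
    val ssc = new StreamingContext(conf, Seconds(5))                // Assumed 5-second batch interval.

    // Create the DStream as shown in the snippets above, for example:
    // val datahubStream = DatahubUtils.createStream(ssc, project, topic, subId,
    //   accessKeyId, accessKeySecret, endpoint, read, StorageLevel.MEMORY_AND_DISK)
    // datahubStream.foreachRDD(rdd => println(rdd.count()))

    ssc.start()            // Start the streaming computation.
    ssc.awaitTermination() // Block until the job is stopped or fails.
  }
}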
Use Spark Structured Streaming to consume DataHub data
Maven dependency
Spark 2
<dependency>
    <groupId>com.aliyun.emr</groupId>
    <artifactId>emr-datahub_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
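If your project is built with sbt instead of Maven, the same coordinates as in the preceding dependency can be declared as follows (an equivalent sketch, not an additional requirement):
libraryDependencies += "com.aliyun.emr" % "emr-datahub_2.11" % "2.0.0"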
Spark 3
Download emr-datasources_shaded_***.jar from the following cluster directory and use it as the dependency:
/opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/
Note: If the preceding directory does not exist in your cluster, use the /usr/lib/emrsdk-current/ directory instead. Replace emr-datasources_shaded_***.jar with the actual file name in your cluster directory.
Example
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("test datahub")
  .getOrCreate()

// Create a read stream.
val datahubRows = spark
  .readStream
  .format("datahub")
  .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
  .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
  .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
  .option("project", "project_test")
  .option("startingoffsets", "earliest")
  .option("topic", "topic_test")
  .load

// Specify the processing logic for the DataFrame.
datahubRows.printSchema() // In this example, the schema has two fields: key and value.
println("print schema" + datahubRows.schema.toString())
val df = datahubRows.groupBy("key").count()

// Create a write stream to write data.
val query = df
  .writeStream
  .format("console")
  .outputMode("complete")
  .start()

// Stop the streaming job.
query.awaitTermination(100000)
spark.close()
Core process:
Create a read stream to read data from DataHub into a DataFrame.
Specify the processing logic of DataFrame.
Create a write stream to write data.
Note: You must configure environment variables before you run the sample code. For more information, see the Configure environment variables section in this topic.
Core parameters of DataHub
Important: You do not need to configure the subId parameter when you use Spark Structured Streaming to consume DataHub data, because the consumed offsets are managed by Spark instead of DataHub.
Parameter | Description | Required
----------|-------------|---------
access.key.id | The AccessKey ID of your Alibaba Cloud account that is used to create the DataHub project. | Yes
access.key.secret | The AccessKey secret of your Alibaba Cloud account that is used to create the DataHub project. | Yes
endpoint | The endpoint of the DataHub API. You can view the endpoint on the DataHub page. | Yes
project | The name of the DataHub project. | Yes
topic | The name of the DataHub topic. | Yes
decimal.precision | Configure this parameter if the topic contains a field of the DECIMAL type. | No
decimal.scale | Configure this parameter if the topic contains a field of the DECIMAL type. | No
startingoffsets | The offset from which data consumption starts. Valid values: latest (consume from the latest offset), earliest (consume from the earliest offset), or a JSON string that specifies the starting offset of each shard, in the format {"Project name#Topic name": {"shardId": "offset"}}. Example: {"project_test#topic_test": {"0": "100"}}. | No
endingoffsets | The offset at which data consumption ends. Valid values: latest (stop at the latest offset), or a JSON string that specifies the ending offset of each shard, in the format {"Project name#Topic name": {"shardId": "offset"}}. | No
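For reference, the following is a minimal sketch of how a per-shard startingoffsets JSON string might be passed to the reader. It reuses the SparkSession (spark), project, topic, and endpoint from the example above; the variable name startingOffsetsJson and the shard ID and offset values are placeholders:
// Start consumption of shard "0" at offset "100" (placeholder values from the table above).
val startingOffsetsJson = """{"project_test#topic_test": {"0": "100"}}"""

val datahubRows = spark
  .readStream
  .format("datahub")
  .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
  .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
  .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
  .option("project", "project_test")
  .option("topic", "topic_test")
  .option("startingoffsets", startingOffsetsJson) // Per-shard starting offsets as a JSON string.
  .load
This only changes where consumption starts; the rest of the query is the same as in the example above.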