E-MapReduce: Use Spark to access DataHub

Last Updated: Aug 15, 2023

This topic describes how to run Spark jobs in E-MapReduce (EMR) Hadoop clusters to consume DataHub data, count the data records, and print the results.

Use Spark Streaming to consume DataHub data

  • Make preparations.

    Subscribe to a DataHub topic by using the subscription feature of DataHub. For more information, see Create a subscription.

  • Consume DataHub data.

    You can run Spark Streaming jobs to consume DataHub data by using one of the following methods. For a minimal setup sketch that shows how ssc and the other variables in these snippets can be defined, see the example after the note below.

    • Specify the ID of a shard and consume the data of the shard:

      val datahubStream = DatahubUtils.createStream(
                ssc,
                project, // The name of the DataHub project. 
                topic, // The name of the DataHub topic. 
                subId, // The subscription ID of DataHub. 
                accessKeyId,
                accessKeySecret,
                endpoint, // The endpoint of DataHub. 
                shardId, // The ID of a specific shard in the DataHub topic. 
                read, // Process RecordEntry in DataHub data. 
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // Read data from the first field in RecordEntry. 
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    • Consume data from all shards:

      val datahubStream = DatahubUtils.createStream(
                ssc,
                project, // The name of the DataHub project. 
                topic, // The name of the DataHub topic. 
                subId, // The subscription ID of DataHub. 
                accessKeyId,
                accessKeySecret,
                endpoint, // The endpoint of DataHub. 
                read, // Process RecordEntry in DataHub data. 
                StorageLevel.MEMORY_AND_DISK)
      datahubStream.foreachRDD(rdd => println(rdd.count()))
      
      // Read data from the first field in RecordEntry. 
      def read(record: RecordEntry): String = {
        record.getString(0)
      }
    Note

    For the complete sample code, see SparkDatahubDemo.scala.
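
    The preceding snippets assume that a StreamingContext (ssc) and the connection variables already exist. The following is a minimal sketch of that setup. The import paths, batch interval, and placeholder values are assumptions; verify them against SparkDatahubDemo.scala and your own project settings.

      // A minimal setup sketch. The import paths below are assumptions based on the
      // EMR DataHub connector; verify them against SparkDatahubDemo.scala.
      import com.aliyun.datahub.model.RecordEntry
      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      import org.apache.spark.streaming.aliyun.datahub.DatahubUtils

      object SparkDatahubSketch {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("datahub-streaming-sketch")
          // Batch interval of 5 seconds; tune this value for your workload.
          val ssc = new StreamingContext(conf, Seconds(5))

          // Placeholder values: replace them with your own project settings.
          val project = "project_test"
          val topic = "topic_test"
          val subId = "<your-subscription-id>"
          val accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
          val accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
          val endpoint = "https://dh-cn-hangzhou.aliyuncs.com"

          // Read the first field of each RecordEntry, as in the snippets above.
          def read(record: RecordEntry): String = record.getString(0)

          // Consume data from all shards and print the record count of each batch.
          val datahubStream = DatahubUtils.createStream(
            ssc, project, topic, subId, accessKeyId, accessKeySecret, endpoint,
            read, StorageLevel.MEMORY_AND_DISK)
          datahubStream.foreachRDD(rdd => println(rdd.count()))

          ssc.start()
          ssc.awaitTermination()
        }
      }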

Use Spark Structured Streaming to consume DataHub data

  • Maven dependency

    • Spark 2

       <dependency>
              <groupId>com.aliyun.emr</groupId>
              <artifactId>emr-datahub_2.11</artifactId>
              <version>2.0.0</version>
       </dependency>
    • Spark 3

      Use the emr-datasources_shaded_***.jar package in the /opt/apps/SPARK-EXTENSION/spark-extension-current/spark3-emrsdk/ directory of your cluster as the dependency.

      Note
      • If the preceding directory does not exist in your cluster, you can use the /usr/lib/emrsdk-current/ directory.

      • Replace emr-datasources_shaded_***.jar with the actual name of the JAR file in your cluster directory.

  • Example

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("test datahub")
      .getOrCreate()
    
    
    // Create a read stream. 
    val datahubRows = spark
      .readStream
      .format("datahub")
      .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
      .option("project", "project_test")
      .option("startingoffsets", "earliest")
      .option("topic", "topic_test")
      .load
    
    // Specify the processing logic of DataFrame. 
    datahubRows.printSchema() // In the current instance, the schema has two fields: key and value. 
    println("print schema" + datahubRows.schema.toString())
    val df = datahubRows.groupBy("key").count()
    
    
    // Create a write stream to write data. 
    val query = df
      .writeStream
      .format("console")
      .outputMode("complete")
      .start()
    
    // Wait up to 100,000 milliseconds for the streaming query to terminate, and then release resources. 
    query.awaitTermination(100000)
    spark.close()

    Core process:

    1. Create a read stream to read data from DataHub into a DataFrame.

    2. Specify the processing logic of DataFrame.

    3. Create a write stream to write data.

    Note

    You must configure environment variables before you can run the sample code. For more information about how to configure environment variables, see the Configure environment variables section in this topic.

  • Core parameters of DataHub

    Important

    You do not need to configure the subId parameter when you use Spark Structured Streaming to consume DataHub data, because the consumption offsets are managed by Spark instead of DataHub.
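
    Because the consumed offsets are tracked by Spark, you can persist them across job restarts with a standard Structured Streaming checkpoint. The following is a hedged sketch that adds the generic checkpointLocation option to the write stream from the example above; df is the aggregated DataFrame from that example, the checkpoint path is a placeholder, and the connector's exact recovery behavior should be verified in your environment.

      // A sketch: persist the offsets that Spark tracks for this query by adding a
      // checkpoint directory to the write stream from the example above.
      // The checkpoint path below is a placeholder.
      val query = df
        .writeStream
        .format("console")
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/checkpoints/datahub_test")
        .start()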

    The following list describes the parameters and whether each one is required.

    • access.key.id (required): The AccessKey ID of the Alibaba Cloud account that is used to create the DataHub project.

    • access.key.secret (required): The AccessKey secret of the Alibaba Cloud account that is used to create the DataHub project.

    • endpoint (required): The endpoint of the DataHub API. You can view the endpoint on the DataHub page.

    • project (required): The name of the DataHub project.

    • topic (required): The name of the DataHub topic.

    • decimal.precision (optional): Configure this parameter if the topic contains a field of the DECIMAL type.

    • decimal.scale (optional): Configure this parameter if the topic contains a field of the DECIMAL type.

    • startingoffsets (optional): The offset from which data consumption starts. Valid values:

      • latest: consumes data from the latest offset.

      • earliest: consumes data from the earliest offset.

      • JSON string: consumes data from the offsets specified in a JSON string. The JSON string must be in the following format (for how to pass this value in code, see the sketch after this list):

        {
           "Project name#Topic name" : {
               "shardId" : "Value of the startingoffsets parameter"
           }
        }

        Sample code:

        {
           "project_test#topic_test" : {
               "0" : "100"
           }
        }

    • endingoffsets (optional): The offset at which data consumption ends. Valid values:

      • latest: stops data consumption at the latest offset.

      • JSON string: stops data consumption at the offsets specified in a JSON string. The JSON string must be in the following format:

        {
           "Project name#Topic name" : {
               "shardId" : "Value of the endingoffsets parameter"
           }
        }
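
    The JSON-formatted offsets are passed as plain string values of the options listed above. The following is a hedged sketch of how a read stream might be started from a specific offset; the endpoint, project, topic, and offsets are placeholders taken from the examples in this topic.

      import org.apache.spark.sql.SparkSession

      // A sketch only: the endpoint, project, topic, and offsets below are placeholders.
      val spark = SparkSession.builder().appName("datahub-offsets-sketch").getOrCreate()

      // Start consuming shard 0 of project_test#topic_test at offset 100.
      // "earliest" or "latest" can be passed instead of the JSON string, and
      // endingoffsets takes a JSON string in the same format.
      val startingOffsetsJson = """{"project_test#topic_test" : { "0" : "100" }}"""

      val datahubRows = spark
        .readStream
        .format("datahub")
        .option("access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
        .option("access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
        .option("endpoint", "https://dh-cn-hangzhou.aliyuncs.com")
        .option("project", "project_test")
        .option("topic", "topic_test")
        .option("startingoffsets", startingOffsetsJson)
        .load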