DataHub is a data distribution platform designed to process streaming data. You can publish streaming data to DataHub, subscribe to it in your applications, and distribute the data to other platforms. DataHub also allows you to analyze streaming data and build applications based on it. This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access DataHub.

Prerequisites

  • A project is created in DataHub. In the following procedure, DataHub is deployed in the China (Shenzhen) region. The name of the project is spark_test, and the name of the topic is topic01.
    Note The built-in SparkOnDataHub connector supports topics of the TUPLE type.
  • Object Storage Service (OSS) is activated. For more information, see Activate OSS.

Background information

Before the serverless Spark engine of DLA consumes data in DataHub, send locally prepared test data to DataHub so that you can verify the connectivity between the serverless Spark engine of DLA and DataHub. In this topic, assume that you run the following commands to download the test code to a local directory and then run the JAR file to send data to topic01 in the spark_test project.
// Download the test code to a local directory.
wget https://spark-home.oss-cn-shanghai.aliyuncs.com/common_test/common-test-0.0.1-SNAPSHOT-shaded.jar
// Run the JAR file to send data to topic01 in the spark_test project.
java -cp /opt/jars/common-test-0.0.1-SNAPSHOT-shaded.jar com.aliyun.datahub.DatahubWrite_java spark_test topic01 xxx1 xxx2 https://dh-cn-shenzhen.aliyuncs.com
The following table describes the parameters that are used in the preceding command.
Parameter                            Description
spark_test                           The name of the project in DataHub.
topic01                              The name of the topic in the DataHub project.
xxx1                                 The AccessKey ID that is used to call Alibaba Cloud API operations.
xxx2                                 The AccessKey secret that is used to call Alibaba Cloud API operations.
https://dh-cn-shenzhen.aliyuncs.com  The public endpoint that is used to access DataHub in the China (Shenzhen) region.
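The test JAR encapsulates the write logic. As a rough sketch only, sending one TUPLE record with the DataHub Java SDK could look as follows; the class and method names reflect the 2.9.x `com.aliyun.datahub` SDK declared in the pom.xml in this topic, and the credentials, field values, and shard ID are placeholder assumptions that you must verify against the SDK version you use.

```scala
import scala.collection.JavaConverters._

import com.aliyun.datahub.DatahubClient
import com.aliyun.datahub.DatahubConfiguration
import com.aliyun.datahub.auth.AliyunAccount
import com.aliyun.datahub.model.RecordEntry

object DatahubWriteSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder AccessKey pair and public endpoint, corresponding to xxx1, xxx2,
    // and https://dh-cn-shenzhen.aliyuncs.com in the preceding command.
    val conf = new DatahubConfiguration(
      new AliyunAccount("xxx1", "xxx2"),
      "https://dh-cn-shenzhen.aliyuncs.com")
    val client = new DatahubClient(conf)

    // Read the TUPLE schema of topic01 and build one record that matches it.
    val schema = client.getTopic("spark_test", "topic01").getRecordSchema
    val entry = new RecordEntry(schema)
    entry.setString(0, "hello")
    entry.setString(1, "datahub")
    entry.setShardId("0")

    client.putRecords("spark_test", "topic01", List(entry).asJava)
  }
}
```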

Procedure

  1. Write the following test code and the pom.xml file that declares the dependencies required for accessing DataHub. Then, package the test code and dependencies into a JAR file and upload the file to OSS.
    Sample test code:
    package com.aliyun.spark
    
    import com.aliyun.datahub.model.RecordEntry
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.aliyun.datahub.DatahubUtils
    import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    import org.apache.spark.streaming.dstream.DStream
    
    object SparkStreamingOnDataHub {
      def main(args: Array[String]): Unit = {
    
        val endpoint = args(0)
        // The AccessKey ID in Resource Access Management (RAM).
        val accessKeyId = args(1)
        // The AccessKey secret in RAM.
        val accessKeySecret = args(2)
        // The subscription ID of DataHub.
        val subId = args(3)
        // The name of the project in DataHub.
        val project = args(4)
        // The name of the topic in DataHub.
        val topic = args(5)
        val batchInterval = Milliseconds(10 * 1000)
    
        // The default checkpoint directory. Specify a durable directory in production environments.
        var checkpoint = "/tmp/SparkOnDatahubReliable_T001/"
        if (args.length >= 7) {
          checkpoint = args(6)
        }
        var shardId = "0"
        if (args.length >= 8) {
          shardId = args(7).trim
        }
    
        println(s"=====project=${project}===topic=${topic}===batchInterval=${batchInterval.milliseconds / 1000}=====")
    
        def functionToCreateContext(): StreamingContext = {
          val conf = new SparkConf().setAppName("Test Datahub")
          // Enable a reliable data receiver.
          conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
          val ssc = new StreamingContext(conf, batchInterval)
          ssc.checkpoint(checkpoint)
    
          var datahubStream: DStream[String] = null
          if (shardId.nonEmpty) {
            datahubStream = DatahubUtils.createStream(
              ssc,
              project,
              topic,
              subId,
              accessKeyId,
              accessKeySecret,
              endpoint,
              shardId,
              read,
              StorageLevel.MEMORY_AND_DISK_SER_2)
          } else {
            datahubStream = DatahubUtils.createStream(
              ssc,
              project,
              topic,
              subId,
              accessKeyId,
              accessKeySecret,
              endpoint,
              read,
              StorageLevel.MEMORY_AND_DISK_SER_2)
          }
    
          datahubStream.foreachRDD { rdd =>
            // rdd.collect() pulls all records of the RDD to the driver. This is acceptable for the small test data set in this topic, but exercise caution in production environments.
            rdd.collect().foreach(println)
            //        rdd.foreach(println)
          }
          ssc
        }
    
        val ssc = StreamingContext.getActiveOrCreate(checkpoint, functionToCreateContext)
        ssc.start()
        ssc.awaitTermination()
      }
    
      def read(record: RecordEntry): String = {
        s"${record.getString(0)},${record.getString(1)}"
      }
    }
    DataHub dependencies in the pom.xml file:
            <dependency>
                <groupId>com.aliyun.apsaradb</groupId>
                <artifactId>datahub-spark</artifactId>
                <version>2.9.2-public_2.4.3-1.0.4</version>
            </dependency>
            <dependency>
                <groupId>com.aliyun.datahub</groupId>
                <artifactId>aliyun-sdk-datahub</artifactId>
                <version>2.9.2-public</version>
            </dependency>
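Step 1 ends with packaging the code and uploading it to OSS. A minimal sketch of these two actions, assuming a standard Maven project layout and the ossutil CLI, follows; the bucket name spark_test, the local file paths, and the OSS directory are placeholders that must match the file and jars settings of the Spark job in step 7.

```
# Build the JAR file from the project root directory.
mvn clean package

# Upload the test JAR and the dependency JARs to OSS. Paths are placeholders.
ossutil cp target/spark-examples-0.0.1-SNAPSHOT.jar oss://spark_test/jars/datahub/
ossutil cp aliyun-sdk-datahub-2.9.2-public.jar oss://spark_test/jars/datahub/
ossutil cp datahub-spark-2.9.2-public_2.4.3-1.0.4.jar oss://spark_test/jars/datahub/
```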
  2. Log on to the DLA console.
  3. In the top navigation bar, select the region where DataHub resides.
  4. In the left-side navigation pane, choose Serverless Spark > Submit job.
  5. On the Parameter Configuration page, click Create Job.
  6. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  7. In the Job List navigation tree, click the Spark job that you created and enter the following job content in the code editor. Replace the parameter values as described in the comments. Then, click Save and Execute.
    {
        "args": [
            "http://dh-cn-shenzhen-int-vpc.aliyuncs.com", # The public endpoint that is used to access DataHub in the China (Shenzhen) region.
            "xxx1",  # The AccessKey ID that is used to access Alibaba Cloud API operations.
            "xxx2",  # The AccessKey secret that is used to access Alibaba Cloud API operations.
            "xxx 3", # The subscription ID of topic01 in DataHub.
            "spark_test",  # The name of the project in DataHub.
            "topic01"  # The name of the topic in DataHub.
        ],
        "file": "oss://spark_test/jars/datahub/spark-examples-0.0.1-SNAPSHOT.jar",  # The OSS directory where the test code is saved.
        "name": "datahub-test",
        "jars": [
            # The OSS paths of the JAR files that contain the dependencies of the test code.
            "oss://spark_test/jars/datahub/aliyun-sdk-datahub-2.9.2-public.jar",
            "oss://spark_test/jars/datahub/datahub-spark-2.9.2-public_2.4.3-1.0.4.jar"
        ],
        "className": "com.aliyun.spark.SparkStreamingOnDataHub",
        "conf": {
            "spark.driver.resourceSpec": "small",  # The specifications of the Spark driver, which can be small, medium, large, or xlarge.
            "spark.executor.instances": 2,  # The number of Spark executors.
            "spark.executor.resourceSpec": "small"  # The specifications of Spark executors, which can be small, medium, large, or xlarge.
        }
    }
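The test code also accepts two optional trailing arguments: args(6) overrides the default checkpoint directory (/tmp/SparkOnDatahubReliable_T001/) and args(7) specifies the ID of the shard to read, which defaults to 0. To make checkpoints survive driver restarts, you can pass a durable directory explicitly. The following args fragment is a sketch only; the OSS checkpoint path is a hypothetical example, and whether an OSS directory can serve as the checkpoint directory depends on your DLA configuration.

```
"args": [
    "http://dh-cn-shenzhen-int-vpc.aliyuncs.com",
    "xxx1",
    "xxx2",
    "xxx3",
    "spark_test",
    "topic01",
    "oss://spark_test/checkpoint/datahub/",  # args(6): the checkpoint directory (hypothetical path).
    "0"  # args(7): the ID of the shard to read from.
],
```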

Result

After the job succeeds, find the job and click Log in the Operation column to view the logs of the job.