This topic describes how to submit a Spark Streaming job in the serverless Spark engine of Data Lake Analytics (DLA).

Background information

The following example shows how to use the serverless Spark engine of DLA to access Alibaba Cloud Message Queue for Apache Kafka in a VPC.

Prerequisites

Make sure that the following prerequisites are met before you run a Spark Streaming job in the serverless Spark engine of DLA:
  • The serverless Spark engine of DLA is authorized to access your VPC. For more information, see Configure the network of data sources.
  • In the left-side navigation pane of the DLA console, click Virtual Cluster management. On the Virtual Cluster management page, verify that the version of the virtual cluster (VC) is spark_2_4_5-dla_1_2_0 or later.

Procedure

  1. Log on to the DLA console.
  2. In the top navigation bar, select the region where Message Queue for Apache Kafka resides.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create Job.
  5. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  6. In the Job List navigation tree, click the Spark job that you created and enter the following job content in the code editor. A sketch of the main class that the className parameter references appears at the end of this procedure.
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    Package the Spark-Kafka dependencies together with your code when you compile and package the job. Sample dependencies:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    Notice If you submit the job as a RAM user, you must grant permissions to the RAM user. For more information, see Grant permissions to a RAM user (simplified version).
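    The sample job expects the Kafka connection settings, such as the bootstrap servers, the topic, and the consumer group ID, to be available at run time, for example as job arguments. The following Scala sketch shows one possible structure for the main class that the className parameter references. The argument order and names are assumptions for illustration and are not defined by DLA.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkKafkaSub {
      def main(args: Array[String]): Unit = {
        // Assumed argument order, for illustration only:
        // args(0) = Kafka bootstrap servers, args(1) = topic, args(2) = consumer group ID.
        val Array(bootstrapServers, topicName, groupId) = args

        val sparkConf = new SparkConf().setAppName("SparkKafkaSub")
        val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

        // Place the streaming logic from the "Sample code" section here, and use
        // bootstrapServers, topicName, and groupId for the Kafka options.
      }
    }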

Sample code

The following core code snippet is used to access Message Queue for Apache Kafka:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Build the SparkSession for the streaming job.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Subscribe to the Kafka topic. bootstrapServers, topicName, and groupId
    // are connection settings that you define elsewhere in the job.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()

    // Cast the Kafka key and value to strings and print them to the console.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
Note The preceding code consumes messages from Message Queue for Apache Kafka and displays them as key-value pairs.
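The console sink is suitable for verifying the job. For a longer-running job, you can write the results to a durable sink instead and persist the query state. The following sketch shows one possible variation that continues from the DataFrame df in the preceding sample and writes the messages as Parquet files to OSS. The OSS paths are placeholders that you replace with your own bucket paths.
    // A minimal sketch that reuses the DataFrame `df` from the preceding sample.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("parquet")
      .option("path", "oss://path/to/output")                   // placeholder output path
      .option("checkpointLocation", "oss://path/to/checkpoint") // placeholder checkpoint path
      .start()

    query.awaitTermination()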