This topic describes how to submit a Spark Streaming job in DLA Serverless Spark.

Background information

The following example demonstrates how to use DLA Serverless Spark to connect to Alibaba Cloud Message Queue for Apache Kafka over an internal network.

Prerequisites

Before you run a Spark Streaming job in DLA Serverless Spark, make sure that the following prerequisites are met:
  • DLA Serverless Spark is authorized to access your VPC. For more information, see Access a VPC.
  • The VSwitch where Message Queue for Apache Kafka resides is used as the VSwitch for accessing your VPC. To obtain the VSwitch ID, log on to the Message Queue for Apache Kafka console and view it on the Instance Details tab.
  • The security group of Message Queue for Apache Kafka is used as the security group for accessing your VPC. To obtain the security group ID, log on to the Elastic Compute Service (ECS) console, go to the Security Groups page, and search by security group name for the instance ID of Message Queue for Apache Kafka. The VSwitch ID and security group ID are specified later in the spark.dla.eni.vswitch.id and spark.dla.eni.security.group.id parameters of the job configuration.

Procedure

  1. Log on to the Data Lake Analytics console.
  2. In the upper-left corner of the page, select the region where Message Queue for Apache Kafka resides.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create Job.
  5. In the Create Job dialog box, configure parameters as prompted and click OK to create a Spark Streaming job.
  6. In the Job List navigation tree, click the Spark Streaming job that you created. In the code editor, enter the following job content:
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    Notice If you use a RAM user to submit a job, you must set the spark.dla.roleArn parameter. For more information, see Use a RAM user to create Spark Streaming jobs.
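
The JAR file that you specify in the file parameter must contain the Spark Kafka connector in addition to your application code. The following build configuration is a minimal sketch that assumes an sbt build with a fat-JAR plugin such as sbt-assembly; the versions shown are placeholders and must match the Spark and Scala versions of your virtual cluster:
    // build.sbt (sketch): bundle the Kafka connector into the job JAR.
    // The versions below are placeholders; align them with the Spark and
    // Scala versions of your DLA Serverless Spark virtual cluster.
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql" % "2.4.5" % "provided",
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.5"
    )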

Sample code

The following core code snippet is used to connect to Message Queue for Apache Kafka:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // bootstrapServers, topicName, and groupId are assumed to be defined
    // elsewhere in the job, for example parsed from command-line arguments.
    // See the complete entry-point sketch after the Note below.
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Create a streaming DataFrame that subscribes to the Kafka topic.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()

    // Cast the binary key and value columns to strings and print each
    // micro-batch to the console.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
Note The code consumes messages from Message Queue for Apache Kafka and displays the key-value pairs in the console.
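
The snippet references bootstrapServers, topicName, and groupId without defining them. The following is a minimal sketch of a complete entry point, under the assumption (not part of the original sample) that these three values are passed as command-line arguments through the args parameter of the job:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkKafkaSub {
      def main(args: Array[String]): Unit = {
        // Assumption: the Kafka endpoint, topic name, and consumer group
        // are the first three command-line arguments of the job.
        val Array(bootstrapServers, topicName, groupId) = args.take(3)

        val sparkSession = SparkSession
          .builder()
          .config(new SparkConf().setAppName("SparkKafkaSub"))
          .getOrCreate()

        val df = sparkSession
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topicName)
          .option("group.id", groupId)
          .load()

        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()
        query.awaitTermination()
      }
    }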