This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Message Queue for Apache Kafka.

Prerequisites

Before you use the serverless Spark engine of DLA to access Message Queue for Apache Kafka, make sure that the network where Message Queue for Apache Kafka resides is correctly configured. To configure the network, perform the following operations:

Authorize the serverless Spark engine of DLA to access your virtual private cloud (VPC). For more information, see Configure the network of data sources.
  1. Obtain the vSwitch ID that is used to access your VPC. You can use the ID of the vSwitch to which Message Queue for Apache Kafka belongs. In the Message Queue for Apache Kafka console, you can view the vSwitch ID on the Instance Details page.
  2. Obtain the security group ID that is used to access your VPC. You can use the ID of the security group to which Message Queue for Apache Kafka belongs. On the Security Groups page of the Elastic Compute Service (ECS) console, search by security group name for the ID of the Message Queue for Apache Kafka instance to obtain the security group ID of this instance.
  3. Add the Classless Inter-Domain Routing (CIDR) block that corresponds to the vSwitch ID obtained in Step 1 to a whitelist of the Message Queue for Apache Kafka instance.

Procedure

  1. Log on to the DLA console.
  2. In the top navigation bar, select the region in which Message Queue for Apache Kafka resides.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create a Job Template.
  5. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  6. In the Job List navigation tree, click the Spark job that you created and enter the following job configuration in the code editor. Replace {vswitch-id} and {security-group-id} with the vSwitch ID and security group ID that you obtained in the Prerequisites section.
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
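    The spark.dla.eni.* parameters attach an elastic network interface (ENI) in your VPC to the Spark job so that the job can reach the Message Queue for Apache Kafka endpoint: spark.dla.eni.enable turns on ENI-based VPC access, and the vSwitch ID and security group ID are the ones that you obtained in the Prerequisites section.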
    When you compile and package your code, make sure that the following Spark-Kafka dependencies are included in the package.
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    Notice If you submit the job as a RAM user, you must grant the required permissions to the RAM user. For more information, see Grant permissions to a RAM user (simplified version).

Sample code

The following core code snippet is used to access Message Queue for Apache Kafka. For more information about the complete code, see alibabacloud-dla-demo.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Subscribe to the Kafka topic as a streaming source.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()

    // Cast each message key and value to a string and print the
    // pairs to the console in append mode.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
Note The code consumes messages from Message Queue for Apache Kafka and displays them as key-value pairs.
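
The preceding snippet references bootstrapServers, topicName, and groupId without defining them. The following is a minimal sketch of a complete entry point, assuming that these values are passed to the job as command-line arguments. The argument handling is illustrative, not the demo's actual code; see alibabacloud-dla-demo for the complete implementation.
    package com.alibabacloud.cwchan

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkKafkaSub {
      def main(args: Array[String]): Unit = {
        // Assumption for illustration: the Kafka endpoint, topic, and
        // consumer group are passed as the first three job arguments.
        val Array(bootstrapServers, topicName, groupId) = args.take(3)

        val sparkSession = SparkSession
          .builder()
          .config(new SparkConf().setAppName("SparkKafkaSub"))
          .getOrCreate()

        // Read from Kafka and print key-value pairs to the console,
        // which is written to the Spark logs of the job.
        val query = sparkSession
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topicName)
          .option("group.id", groupId)
          .load()
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }
If you pass the values this way, the job configuration would also carry an args field, for example "args": ["{bootstrap-servers}", "{topic}", "{group-id}"]; the exact arguments that the demo expects may differ.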