This topic describes how to access Message Queue for Apache Kafka by using the serverless Spark engine of Data Lake Analytics (DLA).

Prerequisites

Before you use the serverless Spark engine of DLA to access Message Queue for Apache Kafka, make sure that the network where Message Queue for Apache Kafka is deployed is correctly configured. To do so, perform the following operations:
  • Authorize the serverless Spark engine of DLA to access your VPC. The vSwitch ID and security group ID that you obtain in the following steps are required in the job configuration, as shown after this list.
    1. Obtain the ID of the vSwitch to which your Message Queue for Apache Kafka instance is attached. You can find the vSwitch ID in the Message Queue for Apache Kafka console.
    2. Obtain the ID of the security group to which your Message Queue for Apache Kafka instance belongs. To do so, log on to the Elastic Compute Service (ECS) console, go to the Security Groups page, and search for the instance ID of Message Queue for Apache Kafka in the security group name.
  • In the DLA console, click Virtual Cluster management in the left-side navigation pane. On the Virtual Cluster management page, verify that the version of the virtual cluster (VC) is spark_2_4_5-dla_1_2_0 or later.
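The vSwitch ID and security group ID obtained in the preceding steps are passed to the Spark job through the following configuration items. They appear in context in the job configuration in the Procedure section:
    "spark.dla.eni.vswitch.id": "{vswitch-id}",
    "spark.dla.eni.security.group.id": "{security-group-id}",
    "spark.dla.eni.enable": "true"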

Procedure

  1. Log on to the DLA console.
  2. In the top navigation bar, select the region where Message Queue for Apache Kafka is deployed.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create Job.
  5. In the Create Job dialog box, configure the parameters and click OK to create a Spark job.
  6. In the Job List navigation tree, click the Spark job that you created and enter the following job content in the code editor:
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    When you compile and package your code, include the Spark-Kafka dependencies in the JAR package. Sample dependencies:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
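    The serverless Spark engine reads your code from the single JAR file that the file parameter points to, so the preceding dependencies must be packaged into it. The following is a minimal sketch that uses the maven-shade-plugin to build such a fat JAR; the plugin version shown is an assumption, so use the version that fits your build:
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <!-- The version is an assumption; adjust it to your build environment. -->
        <version>3.2.4</version>
        <executions>
            <execution>
                <!-- Bind the shade goal to the package phase so that
                     mvn package produces a JAR that contains all dependencies. -->
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
            </execution>
        </executions>
    </plugin>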
                        
    Notice If you submit the job as a RAM user, you must grant permissions to the RAM user. For more information, see Grant permissions to a RAM user (simplified version).

Sample code

The following core code snippet accesses Message Queue for Apache Kafka. For the complete code, see alibabacloud-dla-demo.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    // Subscribe to the Kafka topic. bootstrapServers, topicName, and groupId
    // come from your Message Queue for Apache Kafka instance.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()
    // Cast the binary key and value columns to strings and print them to the console.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
Note The preceding code consumes messages from Message Queue for Apache Kafka and displays them as key-value pairs.
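If you want to write the results back to Message Queue for Apache Kafka instead of printing them to the console, you can use the Kafka sink of Structured Streaming. The following is a minimal sketch; the output topic name and the checkpoint path are assumptions that you must replace with your own values:
    // Write the string key-value pairs to another Kafka topic.
    // "output-topic" and the checkpoint path are placeholders.
    val kafkaQuery = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("topic", "output-topic")
      .option("checkpointLocation", "oss://path/to/checkpoint")
      .outputMode("append")
      .start()
    kafkaQuery.awaitTermination()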