AnalyticDB: Access ApsaraMQ for Kafka

Last Updated: Mar 27, 2025

This topic describes how to use AnalyticDB for MySQL Spark to access ApsaraMQ for Kafka over an elastic network interface (ENI).

Prerequisites

Preparations

  1. Log on to the ApsaraMQ for Kafka console and go to the Instance Details page to obtain the vSwitch ID of the Kafka instance.

  2. Log on to the Elastic Compute Service (ECS) console and go to the Security Groups page to obtain the ID of the security group to which the Kafka instance is added.

  3. Log on to the ApsaraMQ for Kafka console and go to the Whitelist Management page to check whether the CIDR block of the vSwitch is added to a whitelist of the Kafka instance.

Procedure

  1. Download the JAR packages whose versions match the Kafka instance and the AnalyticDB for MySQL Spark version. For more information, see Kafka-clients and Spark-sql-kafka-0-10.

  2. Add the following dependencies to the pom.xml file:

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.2</version>
        <scope>test</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
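
    If your project uses sbt instead of Maven, a minimal build.sbt equivalent might look like the following sketch. The versions mirror the Maven example above and must still match your Kafka instance and Spark version.

    // build.sbt (sketch). Assumes scalaVersion is set to a 2.12.x release to match the _2.12 artifacts above.
    libraryDependencies ++= Seq(
      // Same scope as in the Maven example: the Spark-Kafka connector is passed to the job separately at run time (see the jars parameter in step 7).
      "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.2" % "test",
      "org.apache.kafka" % "kafka-clients" % "2.8.1"
    )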
  3. Write a Spark Structured Streaming program and package it into a JAR file. In this example, the generated package is named spark-example.jar. Sample code:

    package com.aliyun.spark.streaming
    
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object SparkKafka {
      def main(args: Array[String]): Unit = {
        if(args.length < 3){
          System.err.println(
            """
              |args0: groupId
              |args1: topicName
              |args2: bootstrapServers
              |""".stripMargin)
          System.exit(1)
        }
        val groupId = args(0)
        val topicName = args(1)
        val bootstrapServers = args(2)
    
    
        val sparkConf: SparkConf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setAppName("SparkKafkaSub")
        sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]]))
    
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        val df = sparkSession
          .readStream
          .format("kafka")
          // The endpoint of the Kafka instance, passed in as args(2). Example: alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
          .option("kafka.bootstrap.servers", bootstrapServers)
          // The name of the topic that you want the AnalyticDB for MySQL cluster to consume, passed in as args(1). Example: kafka_test
          .option("subscribe", topicName)
          // The ID of the consumer group that consumes the topic, passed in as args(0). Example: kafka_groupId
          .option("group.id", groupId)
          .load()
    
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()
        query.awaitTermination()
    
      }
    }
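
    To verify the streaming job end to end, the topic needs some data to consume. The following is a minimal sketch of a test producer that uses the kafka-clients dependency declared above. The object name KafkaTestProducer is illustrative; the topic name and bootstrap servers are passed as arguments in the same way as in the consumer program, and the producer must run from an environment that can reach the VPC endpoint of the Kafka instance, for example, an ECS instance in the same VPC.

    package com.aliyun.spark.streaming

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    object KafkaTestProducer {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println(
            """
              |args0: topicName
              |args1: bootstrapServers
              |""".stripMargin)
          System.exit(1)
        }
        val topicName = args(0)
        val bootstrapServers = args(1)

        // Configure a simple String-keyed, String-valued producer.
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

        val producer = new KafkaProducer[String, String](props)
        try {
          // Send a few test records so that the streaming job has data to print.
          (1 to 10).foreach { i =>
            producer.send(new ProducerRecord[String, String](topicName, s"key-$i", s"value-$i"))
          }
          producer.flush()
        } finally {
          producer.close()
        }
      }
    }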
  4. Upload the downloaded JAR packages and the packaged program (spark-example.jar) to OSS. For more information about how to upload files, see Upload objects.

  5. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  6. Select a job resource group and a job type for the Spark job. In this example, the batch type is selected.

  7. Run the following code in the Spark editor:

    {
        "args": [
          "kafka_groupId",
          "kafka_test",
          "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092"
        ],
        "file": "oss://<bucket_name>/spark-example.jar",
        "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar",
        "name": "Kafka Example",
        "className": "com.aliyun.spark.streaming.SparkKafka",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****"
        }
    }

    The parameters are described as follows.

    args: The arguments that are passed to the Spark job. Separate multiple arguments with commas (,).

    file: The path of the main file of the Spark job. The main file can be a JAR package that contains the entry class or an executable file that serves as the entry point of a Python program.
    Note: The main file of a Spark job must be stored in OSS.

    jars: The JAR packages that are required for the Spark job. Separate multiple JAR packages with commas (,).

    name: The name of the Spark job.

    className: The entry class of the Java or Scala program. This parameter is not required for a Python program.

    spark.adb.eni.enabled: Specifies whether to enable ENI. You must enable ENI when you use Enterprise Edition, Basic Edition, or Data Lakehouse Edition Spark to access Kafka.

    spark.adb.eni.vswitchId: The vSwitch ID of the Kafka instance.

    spark.adb.eni.securityGroupId: The ID of the security group to which the Kafka instance is added.

    conf: The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key:value format. Separate multiple parameters with commas (,). For more information about the configuration parameters that differ from those of Apache Spark or that are specific to AnalyticDB for MySQL, see Spark application configuration parameters.

  8. Click Run Now.