This topic describes how to use AnalyticDB for MySQL Spark to access ApsaraMQ for Kafka over an elastic network interface (ENI).
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition cluster is created.
A job resource group is created for the AnalyticDB for MySQL cluster. For more information, see Create and manage a resource group.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you only need to create a privileged account. For more information, see the "Create a privileged account" section of the Create a database account topic.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user. For more information, see Create a database account and Associate or disassociate a database account with or from a RAM user.
An ApsaraMQ for Kafka instance is created in the same region as the AnalyticDB for MySQL cluster. For more information, see Purchase and deploy an Internet- and VPC-connected instance.
A topic and a consumer group are created in the Kafka instance. For more information, see Create resources.
Object Storage Service (OSS) is activated and a bucket is created in the same region as the AnalyticDB for MySQL cluster. For more information, see Activate OSS and Create buckets.
Preparations
Log on to the ApsaraMQ for Kafka console and go to the Instance Details page to obtain the vSwitch ID of the Kafka instance.
Log on to the Elastic Compute Service (ECS) console and go to the Security Groups page to obtain the ID of the security group to which the Kafka instance is added.
Log on to the ApsaraMQ for Kafka console and go to the Whitelist Management page to check whether the CIDR block of the vSwitch is added to a whitelist of the Kafka instance. If it is not, add the CIDR block to the whitelist.
Procedure
Download the kafka-clients JAR package that matches the version of the Kafka instance and the spark-sql-kafka JAR package that matches the version of the AnalyticDB for MySQL Spark application. For more information, see Kafka-clients and Spark-sql-kafka-0-10.
Add the following dependencies to the pom.xml file:
```xml
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>3.2.2</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.1</version>
</dependency>
```

Write and package a Spark Streaming program. In this example, the generated package is named spark-example.jar. Sample code:

```scala
package com.aliyun.spark.streaming

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkKafka {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      System.err.println(
        """
          |args0: groupId
          |args1: topicName
          |args2: bootstrapServers
          |""".stripMargin)
      System.exit(1)
    }
    val groupId = args(0)
    val topicName = args(1)
    val bootstrapServers = args(2)

    val sparkConf: SparkConf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setAppName("SparkKafkaSub")
    sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_, _]]))

    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    val df = sparkSession
      .readStream
      .format("kafka")
      // The endpoint of the Kafka instance, passed in as the third argument. Example:
      // alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
      .option("kafka.bootstrap.servers", bootstrapServers)
      // The name of the topic that you want the AnalyticDB for MySQL cluster to consume.
      .option("subscribe", topicName)
      // The ID of the consumer group that consumes the topic.
      .option("group.id", groupId)
      .load()

    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```
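(Optional) The sample program only prints consumed records to the console, so the topic needs some data before the job produces visible output. The following producer is not part of the original procedure; it is a minimal sketch that uses the kafka-clients dependency declared above to send a few test messages. The topic name and endpoint are the example values from this topic and must be replaced with your own. Run it from a client that can reach the VPC endpoint of the Kafka instance, for example an ECS instance in the same vSwitch.

```scala
package com.aliyun.spark.streaming

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Minimal test producer. The bootstrap servers and topic name below are
// example placeholders; replace them with the endpoint of your Kafka instance
// and the topic that the Spark job subscribes to.
object TestProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    try {
      (1 to 10).foreach { i =>
        // Send a simple key-value message to the kafka_test topic.
        producer.send(new ProducerRecord[String, String]("kafka_test", s"key-$i", s"value-$i"))
      }
      producer.flush()
    } finally {
      producer.close()
    }
  }
}
```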
Upload the downloaded JAR packages and the Spark Streaming program to OSS. For more information about how to upload files, see Upload objects.
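If you prefer to script the upload instead of using the OSS console, the snippet below is a minimal sketch that assumes the OSS SDK for Java (the aliyun-sdk-oss dependency) is available on the classpath. The endpoint, credentials, bucket name, and local file paths are placeholders, not values taken from this topic.

```scala
import java.io.File
import com.aliyun.oss.OSSClientBuilder

// Sketch: upload the packaged program and dependency JAR files to OSS.
// Endpoint, bucket name, and file paths are placeholders.
object UploadToOss {
  def main(args: Array[String]): Unit = {
    val endpoint = "https://oss-cn-hangzhou.aliyuncs.com"      // OSS endpoint of your region.
    val accessKeyId = sys.env("OSS_ACCESS_KEY_ID")             // Read credentials from environment variables.
    val accessKeySecret = sys.env("OSS_ACCESS_KEY_SECRET")
    val bucketName = "<bucket_name>"

    val ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret)
    try {
      ossClient.putObject(bucketName, "spark-example.jar", new File("target/spark-example.jar"))
      ossClient.putObject(bucketName, "kafka-clients-2.8.1.jar", new File("kafka-clients-2.8.1.jar"))
      ossClient.putObject(bucketName, "spark-sql-kafka-0-10_2.12-3.2.0.jar", new File("spark-sql-kafka-0-10_2.12-3.2.0.jar"))
    } finally {
      ossClient.shutdown()
    }
  }
}
```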
Go to the Spark editor.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
Select a job resource group and a job type for the Spark job. In this example, the batch type is selected.
Run the following code in the Spark editor:
{ "args": [ "kafka_groupId", "kafka_test", "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092" ], "file": "oss://<bucket_name>/spark-example.jar", "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar", "name": "Kafka Example", "className": "com.aliyun.spark.streaming.SparkKafka", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }The following table describes the parameters.
| Parameter | Description |
| --- | --- |
| args | The arguments that are passed to the Spark job. Separate multiple arguments with commas (,). |
| file | The path of the main file of the Spark job. The main file can be a JAR package that contains the entry class or an executable file that serves as the entry point of the Python program. Note: The main file of a Spark job must be stored in OSS. |
| jars | The JAR packages that are required for the Spark job. Separate multiple JAR packages with commas (,). |
| name | The name of the Spark job. |
| className | The entry class of the Java or Scala program. This parameter is not required for a Python program. |
| spark.adb.eni.enabled | Specifies whether to enable ENI. You must enable ENI when you use Enterprise Edition, Basic Edition, or Data Lakehouse Edition Spark to access Kafka. |
| spark.adb.eni.vswitchId | The vSwitch ID of the Kafka instance. |
| spark.adb.eni.securityGroupId | The ID of the security group to which the Kafka instance is added. |
| conf | The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key:value format. Separate multiple parameters with commas (,). For more information about the configuration parameters that are different from those of Apache Spark or the configuration parameters that are specific to AnalyticDB for MySQL, see Spark application configuration parameters. |

Click Run Now.
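The sample job writes the stream to the console, which is convenient for verifying connectivity in the Spark logs but does not persist the data. As a possible follow-up, you could replace the query definition in the SparkKafka sample program with a Parquet sink that writes the stream to OSS. The snippet below is a hedged sketch, not part of the original example; it reuses the df DataFrame from the sample program and assumes that the OSS output path and checkpoint location shown here are replaced with paths in your own bucket.

```scala
// Sketch: replaces the console sink in the SparkKafka sample above.
// <bucket_name> is a placeholder for your own OSS bucket.
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("parquet")                                                      // Write the stream as Parquet files.
  .option("path", "oss://<bucket_name>/kafka-output/")                    // Output directory in OSS.
  .option("checkpointLocation", "oss://<bucket_name>/kafka-checkpoint/")  // Checkpoint directory required by a file sink.
  .start()
query.awaitTermination()
```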