This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access ApsaraMQ for Kafka.
DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides more features and better performance. For more information about how to use AnalyticDB for MySQL, see Access Message Queue for Apache Kafka.
Before you begin
Before you use the serverless Spark engine of DLA to access ApsaraMQ for Kafka, make sure that the network in which the ApsaraMQ for Kafka instance resides is correctly configured. To configure the network, perform the following operations:
Authorize the serverless Spark engine of DLA to access your virtual private cloud (VPC). For more information, see Configure the network of data sources.
Obtain the ID of the vSwitch that is used to access your VPC. You can use the ID of the vSwitch to which the ApsaraMQ for Kafka instance belongs. In the ApsaraMQ for Kafka console, you can view the vSwitch ID on the details page of the ApsaraMQ for Kafka instance.
Obtain the ID of the security group that is used to access your VPC. You can use the ID of the security group to which the ApsaraMQ for Kafka instance belongs. In the Elastic Compute Service (ECS) console, you can search for the security group to which the ApsaraMQ for Kafka instance belongs by security group name on the Security Group page and obtain the security group ID.
Add the CIDR block of the vSwitch whose ID you obtained in the preceding steps to a whitelist of the ApsaraMQ for Kafka instance. The vSwitch ID and security group ID that you obtained are passed to the Spark job through the spark.dla.eni.vswitch.id and spark.dla.eni.security.group.id parameters, as shown in the Procedure section.
Procedure
Log on to the DLA console.
In the upper-left corner of the page, select the region in which the ApsaraMQ for Kafka instance resides.
In the left-side navigation pane, choose Serverless Spark > Submit job.
On the Parameter Configuration page, click Create a Job Template.
In the Create a Job Template dialog box, configure the parameters as prompted and click OK.
On the Job List tab, click the name of the Spark job that you created and enter the following job content in the code editor.
{ "file": "oss://path/to/xxx.jar", "name": "Kafka", "className": "com.alibabacloud.cwchan.SparkKafkaSub", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs", "spark.dla.eni.vswitch.id": "{vswitch-id}", "spark.dla.eni.security.group.id": "{security-group-id}", "spark.dla.eni.enable": "true" } }
When you compile and package your code, make sure that the required Spark-Kafka dependencies are included in the package:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
</dependency>
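If you build with sbt instead of Maven, the following build.sbt sketch declares the equivalent dependencies. This assumes that scalaVersion is set to a 2.11.x release so that the %% operator resolves the _2.11 artifacts listed above.
// build.sbt: sbt equivalent of the Maven dependencies above
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql-kafka-0-10"       % "2.4.5",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.5",
  "org.apache.kafka" %  "kafka-clients"              % "0.10.2.2"
)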
Important: If you submit the job as a Resource Access Management (RAM) user, you must grant permissions to the RAM user. For more information, see Grant permissions to a RAM user.
Sample code
The following core code snippet is used to access the ApsaraMQ for Kafka instance. For more information about the complete code, see alibabacloud-dla-demo.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Initialize the Spark session.
val sparkConf: SparkConf = new SparkConf()
  .setAppName("SparkKafkaSub")
val sparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

// Subscribe to the Kafka topic as a streaming source.
// bootstrapServers, topicName, and groupId are the endpoint, topic,
// and consumer group ID of your ApsaraMQ for Kafka instance.
val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("group.id", groupId)
  .load()

// Cast the binary key and value columns to strings
// and print each micro-batch to the console.
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

// Block until the streaming query terminates.
query.awaitTermination()
This code consumes messages from the ApsaraMQ for Kafka instance and prints the key-value pairs to the console.
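To generate test messages for the consumer above, you can use the kafka-clients library that is already declared in the dependencies. The following is a minimal sketch; bootstrapServers and topicName are placeholders for the endpoint and topic of your ApsaraMQ for Kafka instance, as in the consumer code.
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Configure a producer that serializes keys and values as strings.
val props = new Properties()
props.put("bootstrap.servers", bootstrapServers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Send one test message, then release the producer's resources.
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String](topicName, "test-key", "test-value"))
producer.flush()
producer.close()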