This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to submit a Spark Streaming job. It also provides best practices for Spark Streaming job retries.
Prerequisites
- The serverless Spark engine of DLA is authorized to access your virtual private cloud (VPC). For more information, see Configure the network of data sources.
- spark_2_4_5-dla_1_2_0 or later is selected from the Version drop-down list on the virtual cluster details page of the DLA console.
Create a Spark Streaming job
This example demonstrates how to use the serverless Spark engine of DLA to access Message Queue for Apache Kafka in your VPC.
- Log on to the DLA console.
- In the top navigation bar, select the region in which Message Queue for Apache Kafka resides.
- In the left-side navigation pane, choose .
- On the Parameter Configuration page, click Create a job Template.
- In the Create a job Template dialog box, configure the parameters and click OK to create a Spark Streaming job.
- In the Job List navigation tree, click the Spark Streaming job that you created and enter the following job content in the code editor:
{
    "file": "oss://path/to/xxx.jar",
    "name": "Kafka",
    "className": "com.alibabacloud.cwchan.SparkKafkaSub",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 5,
        "spark.executor.resourceSpec": "medium",
        "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
        "spark.dla.eni.vswitch.id": "{vswitch-id}",
        "spark.dla.eni.security.group.id": "{security-group-id}",
        "spark.dla.eni.enable": "true"
    }
}
Package the related dependencies of Spark-Kafka when you compile and package code. Sample dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.5</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.2</version>
</dependency>
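The JAR file that you upload to OSS must contain these Kafka dependencies. The following build snippet is a minimal sketch, assuming that you use the maven-shade-plugin (not part of the original sample) to bundle the dependencies into a single JAR:
<!-- Hypothetical build section: bundles the Kafka dependencies into one JAR during mvn package. -->
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <!-- Run the shade goal in the package phase. -->
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Any other build setup that produces a fat JAR with the Kafka dependencies included works as well.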
Notice If you submit a job as a RAM user, make sure that the RAM user is granted the related permissions. For more information, see Grant permissions to a RAM user (simplified version).
Sample code
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val sparkConf: SparkConf = new SparkConf()
  .setAppName("SparkKafkaSub")

val sparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()

// Subscribe to the Kafka topic. bootstrapServers, topicName, and groupId are placeholders
// for your Message Queue for Apache Kafka endpoint, topic, and consumer group.
val df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("group.id", groupId)
  .load()

// Print the key-value pairs of the consumed messages to the console.
val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
Best practices for Spark Streaming job retries
You can use the spark.dla.job.maxAttempts parameter to control whether a failed job is automatically retried:
# The maximum number of attempts allowed for a job. The default value is 1, which indicates that a failed job is not retried. The value 2 indicates that a failed job is retried only once.
spark.dla.job.maxAttempts 2
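For example, this parameter can be passed in the conf section of the job configuration, alongside the other spark.dla.* parameters of the sample job above. The following is a minimal sketch; the remaining parameters from the earlier configuration are omitted for brevity:
{
    "file": "oss://path/to/xxx.jar",
    "name": "Kafka",
    "className": "com.alibabacloud.cwchan.SparkKafkaSub",
    "conf": {
        "spark.dla.job.maxAttempts": "2",
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 5,
        "spark.executor.resourceSpec": "medium"
    }
}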
In most cases, you want a failed Spark Streaming job to resume from the last checkpoint when you submit it again. This section describes how to do so for both Spark Structured Streaming and DStreams.
- Spark Structured Streaming (Recommended)
If Spark Structured Streaming is used, you need only to set the checkpointLocation parameter to an Object Storage Service (OSS) directory when you query data. Sample code:
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger
import sparkSession.implicits._ // provides the encoders required by .as[(String, String)]

val query = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
  .writeStream
  .format("csv")
  .option("path", outputPath)
  .outputMode("Append")
  .option("checkpointLocation", "oss://path/to/checkpoint/dir")
  .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
  .start()
Note For more information about the checkpoints of Spark Structured Streaming, see Structured Streaming Programming Guide.
- Spark Streaming (DStreams)
If DStreams is used, you must use a specific programming pattern to resume a failed job from the last checkpoint, as the following sample shows (a fuller, self-contained sketch follows after this list). Sample code:
// Function to create and set up a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()
Note For more information about the checkpoints of DStreams, see Spark Streaming Programming Guide.
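The snippet above elides several details. The following is a minimal, self-contained sketch of the same pattern; the word-count logic, the socket host and port, and the OSS checkpoint path are hypothetical placeholders rather than part of the original sample, and in DLA the checkpoint directory would typically be an OSS path that the job can access.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedWordCount {
  // Hypothetical OSS checkpoint directory; replace with your own path.
  val checkpointDirectory = "oss://path/to/checkpoint/dir"

  // Creates and configures a new StreamingContext; only called when no checkpoint exists.
  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical source and logic: read lines from a TCP socket and count words.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.checkpoint(checkpointDirectory) // set checkpoint directory
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore from the checkpoint if it exists; otherwise create a new context.
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()
  }
}
When the job is resubmitted, StreamingContext.getOrCreate restores the context from the checkpoint directory if one exists, so processing continues from where the previous attempt stopped.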
Monitoring and alerting of Spark Streaming jobs
- You can monitor the metrics of Spark Streaming jobs, such as the job processing latency and data processing rate. For more information, see View the metrics of the serverless Spark engine.
- You can configure alert rules to receive alert notifications in real time.