This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to submit a Spark Streaming job. It also provides best practices for Spark Streaming job retries.

Prerequisites

  • The serverless Spark engine of DLA is authorized to access your virtual private cloud (VPC). For more information, see Configure the network of data sources.
  • spark_2_4_5-dla_1_2_0 or later is selected from the Version drop-down list on the virtual cluster details page of the DLA console.

Create a Spark Streaming job

This example demonstrates how to use the serverless Spark engine of DLA to access Message Queue for Apache Kafka in your VPC.

  1. Log on to the DLA console.
  2. In the top navigation bar, select the region in which Message Queue for Apache Kafka resides.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create a job Template.
  5. In the Create a job Template dialog box, configure the parameters and click OK to create a Spark Streaming job.
  6. In the Job List navigation tree, click the Spark Streaming job that you created and enter the following job content in the code editor:
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    When you compile and package your code, include the required Spark-Kafka dependencies (a packaging sketch is provided after this procedure). Sample dependencies:
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    Notice If you submit a job as a RAM user, make sure that the RAM user is granted the related permissions. For more information, see Grant permissions to a RAM user (simplified version).
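
The JAR file that is specified by the file parameter must contain the Spark-Kafka dependencies listed above. The following Maven configuration is a minimal sketch of one way to bundle them into a single JAR; the maven-shade-plugin and the plugin version shown here are assumptions and are not required by DLA. Dependencies that the serverless Spark engine already provides, such as spark-core and spark-sql, can typically be declared with the provided scope so that they are not bundled.
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <!-- The plugin version is an assumption; use the version that fits your build. -->
        <version>3.2.4</version>
        <executions>
            <execution>
                <!-- Bundle the Spark-Kafka dependencies into the job JAR during the package phase. -->
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
            </execution>
        </executions>
    </plugin>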

Sample code

The following core code snippet is used to access Message Queue for Apache Kafka.
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()

    // Subscribe to the Kafka topic as a streaming source.
    // bootstrapServers, topicName, and groupId are placeholders that are defined elsewhere in your code.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()

    // Cast the key and value of each message to strings and print them to the console.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
Note The code consumes messages from Message Queue for Apache Kafka and displays the key-value pairs in the console.

Best practices for Spark Streaming job retries

To enable the system to automatically retry a Spark Streaming job after the job fails, you can configure the following parameter in the conf section of the job.
# The maximum number of attempts for a job. The default value is 1, which indicates that a failed job is not retried. The value 2 indicates that a failed job is retried only once.
spark.dla.job.maxAttempts  2 
Note For more information about the configurations of job retries, see Configure a Spark job.
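
For example, you can add the parameter to the conf object of the job configuration shown earlier. The following snippet is a minimal sketch in which the network-related parameters are omitted:
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 5,
        "spark.executor.resourceSpec": "medium",
        "spark.dla.job.maxAttempts": "2"
    }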

In most cases, you want a failed Spark Streaming job to resume from the last checkpoint when you submit the job again. This section describes how to achieve this for Spark Structured Streaming and for Spark Streaming (DStreams).

  • Spark Structured Streaming (Recommended)

    If Spark Structured Streaming is used, you need only to set the checkpointLocation parameter to an Object Storage Service (OSS) directory when you start the streaming query. Sample code:
        import java.util.concurrent.TimeUnit
        import org.apache.spark.sql.streaming.Trigger
        // Required for .as[(String, String)]; assumes sparkSession is your SparkSession instance.
        import sparkSession.implicits._

        val query = df
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]
          .writeStream
          .format("csv")
          .option("path", outputPath)
          .outputMode("append")
          // Store checkpoints in an OSS directory so that a resubmitted job resumes from the last checkpoint.
          .option("checkpointLocation", "oss://path/to/checkpoint/dir")
          .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
          .start()
    Note For more information about the checkpoints of Spark Structured Streaming, see Structured Streaming Programming Guide.
  • Spark Streaming (DStreams)

    If DStreams is used, you must use the StreamingContext.getOrCreate programming pattern to resume a failed job from the last checkpoint. Sample code:
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    Note For more information about the checkpoints of DStreams, see Spark Streaming Programming Guide.
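    The preceding sample is the generic pattern from the Spark documentation. The following code is a minimal sketch of how this pattern might be combined with the spark-streaming-kafka-0-10 dependency listed earlier to consume Message Queue for Apache Kafka. The checkpoint directory, bootstrap servers, group ID, and topic name are placeholders that you must replace with your own values, and the function name createKafkaContext is only illustrative.
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

    // Placeholder checkpoint directory in OSS; replace it with your own path.
    val checkpointDirectory = "oss://path/to/checkpoint/dir"

    // Function to create and set up a new StreamingContext that consumes Kafka messages
    def createKafkaContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("SparkKafkaSubDStreams")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDirectory) // set checkpoint directory

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "{bootstrap-servers}",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "{group-id}",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      // Create a direct stream that subscribes to the Kafka topic.
      val stream = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent,
        Subscribe[String, String](Seq("{topic-name}"), kafkaParams)
      )

      // Print the key-value pairs of the consumed messages.
      stream.map(record => (record.key, record.value)).print()
      ssc
    }

    // Recover the StreamingContext from the checkpoint data or create a new one.
    val context = StreamingContext.getOrCreate(checkpointDirectory, createKafkaContext _)
    context.start()
    context.awaitTermination()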

Monitoring and alerting of Spark Streaming jobs

The serverless Spark engine of DLA automatically enables monitoring and alerting for Spark Streaming jobs.
  • You can monitor the metrics of Spark Streaming jobs, such as the job processing latency and data processing rate. For more information, see View the metrics of the serverless Spark engine.
  • You can configure alert rules to receive alert notifications in real time.