
Data Lake Analytics - Deprecated: Spark Streaming

Last Updated: Feb 07, 2024

This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to submit a Spark Streaming job. This topic also provides best practices for retrying Spark Streaming jobs.

Prerequisites

Before you run a Spark Streaming job in the serverless Spark engine of DLA, ensure that the following prerequisites are met:
  • The serverless Spark engine of DLA is authorized to access your virtual private cloud (VPC). For more information, see Configure the network of data sources.
  • spark_2_4_5-dla_1_2_0 or later is selected from the Version drop-down list on the virtual cluster details page of the DLA console.

Create a Spark Streaming job

The following example shows how to use the serverless Spark engine of DLA to access a Message Queue for Apache Kafka instance in your VPC.

  1. Log on to the DLA console.
  2. In the upper-left corner of the page, select the region in which the Message Queue for Apache Kafka instance is deployed.
  3. In the left-side navigation pane, choose Serverless Spark > Submit job.
  4. On the Parameter Configuration page, click Create a job Template.
  5. In the Create a job Template dialog box, configure the parameters and click OK to create a Spark Streaming job.
  6. In the Job List navigation tree, click the Spark Streaming job that you created and enter the following content in the code editor:
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://path/to/spark-logs",
            "spark.dla.eni.vswitch.id": "{vswitch-id}",
            "spark.dla.eni.security.group.id": "{security-group-id}",
            "spark.dla.eni.enable": "true"
        }
    }
    When you compile and package your code, include the related Spark-Kafka dependencies in the JAR file. The following sample Maven dependencies are provided (a packaging sketch follows these steps):
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.2</version>
    </dependency>
                        
    Important If you submit a job as a RAM user, make sure that the RAM user is granted the related permissions. For more information, see Grant permissions to a RAM user.
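
The sample dependencies can be bundled into the JAR file that you upload to OSS with a fat-JAR plugin. The following maven-shade-plugin configuration is a minimal sketch that assumes your project is built with Maven; it is not part of the original sample, so adapt it to your own build tool if necessary.
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <!-- Bundle the dependencies, including the Kafka connectors, into the job JAR during "mvn package". -->
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>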

Sample code

The following code snippet shows how to connect to Message Queue for Apache Kafka:
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    val sparkConf: SparkConf = new SparkConf()
      .setAppName("SparkKafkaSub")
    val sparkSession = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
    // Subscribe to the Kafka topic as a streaming DataFrame.
    val df = sparkSession
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topicName)
      .option("group.id", groupId)
      .load()
    // Cast the keys and values to strings and print them to the console.
    val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .outputMode("append")
      .format("console")
      .start()
    query.awaitTermination()
Note The code consumes messages from Message Queue for Apache Kafka and prints the key-value pairs to the console.
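
The snippet assumes that bootstrapServers, topicName, and groupId are defined elsewhere in your code. The following is a minimal sketch, not taken from the original sample, of how the snippet could be wrapped into the com.alibabacloud.cwchan.SparkKafkaSub class referenced in the job configuration, assuming that the three values are passed through the args field of the job configuration:
    package com.alibabacloud.cwchan

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    object SparkKafkaSub {
      def main(args: Array[String]): Unit = {
        // Hypothetical argument order: pass these three values through the "args" field of the job configuration.
        val Array(bootstrapServers, topicName, groupId) = args

        val sparkConf: SparkConf = new SparkConf().setAppName("SparkKafkaSub")
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()

        // Subscribe to the Kafka topic as a streaming DataFrame.
        val df = sparkSession
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topicName)
          .option("group.id", groupId)
          .load()

        // Print the key-value pairs to the console until the job is stopped.
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()

        query.awaitTermination()
      }
    }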

Best practices for retrying Spark Streaming jobs

If you want the system to automatically retry a Spark Streaming job after the job fails, you must configure the following parameters in the conf section of the job configuration.
# The maximum allowed number of job attempts. The default value 1 specifies that a failed job is not retried. The value 5 specifies that a failed job can be retried five times.
spark.dla.job.maxAttempts  5

# The time window in which job retries are counted. The value 1h specifies that retries are counted within a one-hour window. When the window expires, the retry count is reset to zero. The default value -1 specifies that retries are counted without a time window limit.
spark.dla.job.attemptFailuresValidityInterval 1h

# With the preceding settings, a failed job can be retried at most five times within one hour.
Note For more information about how to configure job retries, see Parameters related to job retries.
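
For reference, the following is a minimal sketch of how these retry parameters could be added to the conf object of the job configuration shown earlier; the parameter values are examples rather than defaults:
    {
        "file": "oss://path/to/xxx.jar",
        "name": "Kafka",
        "className": "com.alibabacloud.cwchan.SparkKafkaSub",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.maxAttempts": "5",
            "spark.dla.job.attemptFailuresValidityInterval": "1h"
        }
    }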

In most cases, you want a failed Spark Streaming job to resume from the last checkpoint when you submit the job again. This section describes how to achieve this for Spark Structured Streaming and for Spark Streaming (DStreams).

  • Spark Structured Streaming (recommended)

    If Spark Structured Streaming is used, you need only to set the checkpointLocation option to an Object Storage Service (OSS) directory when you write the streaming query. The sample code is provided:
        import java.util.concurrent.TimeUnit
        import org.apache.spark.sql.streaming.Trigger

        val query = df
          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]
          .writeStream
          .format("csv")
          .option("path", outputPath)
          .outputMode("append")
          // Store checkpoints in an OSS directory so that a resubmitted job can resume from the last checkpoint.
          .option("checkpointLocation", "oss://path/to/checkpoint/dir")
          .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
          .start()
    Note For more information about the checkpoints of Spark Structured Streaming, see Structured Streaming Programming Guide.
  • Spark Streaming (DStreams)

    If the DStreams API is used, you must follow a specific programming pattern to resume a failed job from the last checkpoint. The sample code is provided:
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    Note For more information about the checkpoints of DStreams, see Spark Streaming Programming Guide.

Monitoring and alerting of Spark Streaming jobs

The serverless Spark engine of DLA automatically enables monitoring and alerting for Spark Streaming jobs.
  • You can monitor the metrics of Spark Streaming jobs, such as the job processing latency and data processing rate. For more information, see View the metrics of the serverless Spark engine.
  • You can configure alert rules to receive alert notifications in real time. For more information, see Manage alerts.