
[Share] Is multi-stage execution serialized in Spark?

Posted time: Oct 12, 2016 13:43
During the last internal training, I said the following:
Stage execution within a single job is serialized, that is, a stage starts only after the previous stage completes.
Sorry, but this statement is clearly not rigorous.
Let's take a look at the following code:

In this case, two inputs (input1 and input2) are built. input2 goes through a reduceByKey, which triggers one Shuffle. The result then goes through a Join, followed by another Shuffle. (Note that the Join operation does not necessarily generate a new stage. Here, the second Shuffle was triggered by forcibly changing the number of partitions after the Join, and the stage boundary was drawn at that Shuffle.)
So, this case includes two Shuffles and generates four stages. The following figure shows the procedure of this case on the Spark UI. From this figure, you can see the execution order of the four stages.

Let's take a look at the screenshot from the Spark UI:

By analyzing the screenshot, you will find that:
Stage0 and Stage1 were submitted simultaneously.
Stage0 has only two items and was set up with two partitions, so both of its tasks can run at the same time and the stage takes only about 3 seconds.
Stage1 has four partitions and six items, and each partition contains up to two items. If all four of its tasks could run concurrently, the stage would take at most 10 seconds. In practice, it took 13 seconds to produce the same result. Why did this happen? Click the description link to see the elapsed time in detail:

From the figure above, we can see that two of Stage1's tasks started 3 seconds later than expected. According to the aforementioned code, four cores are available to Spark. Stage0 was already running two tasks concurrently, so Stage1 could only run two of its four tasks at first. Once all the tasks in Stage0 were complete, Stage1 proceeded with its remaining two tasks.
Similarly, Stage2 started after Stage1 was complete and Stage3 started after Stage2 was complete.
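The 13-second figure can be reproduced with a small back-of-the-envelope model. The sketch below is plain Python, not Spark. It assumes a simple FIFO policy in which each task is handed to the earliest free core, in submission order. The task durations are assumptions inferred from the numbers above: about 3 seconds per Stage0 task, about 5 seconds per item in Stage1, and a hypothetical 1, 2, 1, 2 split of the six items across the four partitions. The function name `simulate` is made up for illustration.

```python
import heapq

def simulate(stages, cores):
    """Simplified FIFO model: give each task, in submission order,
    to the core that becomes free first.

    stages: list of (stage_id, [task_duration_seconds]) in submission order.
    Returns {stage_id: time at which the stage's last task finishes}.
    """
    free_at = [0] * cores          # time at which each core becomes free
    heapq.heapify(free_at)
    finish = {}
    for stage_id, durations in stages:
        for duration in durations:
            start = heapq.heappop(free_at)      # earliest available core
            end = start + duration
            heapq.heappush(free_at, end)
            finish[stage_id] = max(finish.get(stage_id, 0), end)
    return finish

# Assumed workload (not taken from the original code):
# Stage0: 2 partitions, 1 item each, about 3 s per task.
# Stage1: 4 partitions holding 1, 2, 1, 2 items, about 5 s per item.
STAGES = [(0, [3, 3]), (1, [5, 10, 5, 10])]

print(simulate(STAGES, cores=4))   # {0: 3, 1: 13}
print(simulate(STAGES, cores=6))   # {0: 3, 1: 10}
```

With four cores, the last two Stage1 tasks have to wait for Stage0 to release its cores at the 3-second mark, which pushes Stage1 to 13 seconds. With six cores, the same model gives the 10 seconds that was originally expected.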
Then, we come to the following conclusions:

Stages can be executed concurrently.
A stage with dependencies can be executed only after all the stages it depends on are fully executed.
The degree of concurrency between stages depends on the amount of available resources (for example, the number of cores).

Also, you can draw these conclusions from the source code:

If a stage depends on multiple parent stages, Spark traverses those dependencies depth-first until it reaches the root stages, that is, the stages with no unfinished parents. If there are multiple root stages, the tasks of all of them are submitted through submitMissingTasks. However, Spark only attempts to submit your tasks. Whether they actually run concurrently depends on the amount of available resources.
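The traversal described above can be sketched in a few lines. This is a simplified Python model and not Spark's actual source. The function `submit_stage`, its parameters, and the `PARENTS` map are hypothetical names, and appending to the `submitted` list stands in for the call to submitMissingTasks. The dependency graph is also an assumption based on the example: Stage2 is taken to depend on Stage0 and Stage1, and Stage3 on Stage2.

```python
def submit_stage(stage, parents, finished, waiting, submitted):
    """Depth-first submission sketch.

    parents:   {stage: [parent stages]} (assumed dependency graph)
    finished:  set of stages that have already completed
    waiting:   set collecting stages blocked on unfinished parents
    submitted: list collecting stages whose tasks are handed out
               (a stand-in for submitMissingTasks)
    """
    if stage in finished or stage in waiting or stage in submitted:
        return
    missing = sorted(p for p in parents.get(stage, []) if p not in finished)
    if not missing:
        # No unfinished parents: this is a root stage, so submit its tasks.
        submitted.append(stage)
    else:
        # Walk down to the roots first, then park this stage.
        for parent in missing:
            submit_stage(parent, parents, finished, waiting, submitted)
        waiting.add(stage)

# Assumed stage graph for the four-stage example.
PARENTS = {2: [0, 1], 3: [2]}

waiting, submitted = set(), []
submit_stage(3, PARENTS, finished=set(), waiting=waiting, submitted=submitted)
print(submitted, sorted(waiting))   # [0, 1] [2, 3]

# After Stage0 and Stage1 complete, only Stage2 becomes submittable.
waiting2, submitted2 = set(), []
submit_stage(3, PARENTS, finished={0, 1}, waiting=waiting2, submitted=submitted2)
print(submitted2, sorted(waiting2))  # [2] [3]
```

In this model, both root stages are submitted in the same pass, which matches Stage0 and Stage1 appearing together in the Spark UI. The sketch says nothing about how many of their tasks run at once, since that depends on the free cores.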
I’ve provided another diagram below which should help you to understand multiple concepts including partition, shuffle, stage, RDD, transformation, action and source.