Apache Spark source code walkthrough - Spark thesis reading notes and job submission and running

Gordon


Posted: Oct 18, 2016 13:26
Abstract: Before the Spark source code walkthrough, reading the Spark thesis by Matei Zaharia is a good option if you want to get an overall picture of Spark quickly. This article consists of reading notes on the Spark thesis and an explanation of job submission and running.
<I> Spark thesis reading notes
Intro
Reading source code is very easy in one sense and very difficult in another. It is easy because the code is already there and you can see it as soon as you open it. It is difficult because you have to figure out why the author designed the code that way in the first place and what the design was meant to achieve.
Before the Spark source code walkthrough, reading the Spark thesis by Matei Zaharia is a good option if you want to get an overall picture of Spark quickly.
After reading the thesis, and combining it with Introduction to Spark Internals, the talk that the Spark author delivered at the 2012 Developer Meetup, we can get a rough understanding of the internal implementation of Spark.
With these two sources understood, we can move on to reading the source code, and it will be easier to identify the key and difficult points of the analysis.
Basic concepts
RDD - Resilient distributed dataset.
Operation - Operations acting on RDDs, including transformation and action.
Job - A job contains multiple RDDs and various operations acting on the corresponding RDD.
Stage - A job comprises multiple stages.
Partition - Data partition. Data in an RDD can be divided into multiple different partitions.
DAG - Directed acyclic graph, reflecting the dependency relationships between RDDs.
Narrow dependency - Each partition of the child RDD depends on a fixed partition (or a fixed, small set of partitions) in the parent RDD.
Wide dependency - Each partition of the child RDD may depend on all the data partitions in the parent RDD.
Caching management - It manages the caching of intermediate computational results of RDDs to accelerate the overall processing.
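To make the two dependency types concrete, here is a minimal plain-Scala sketch that needs no Spark installation. The names `Dependency`, `OneToOne`, `Shuffle` and `parentsOf` are illustrative assumptions and are not Spark's own classes; the sketch only shows which parent partitions a child partition has to read.

```scala
// Illustrative model only; these names are not Spark classes.
sealed trait Dependency {
  // Parent partitions needed to compute the given child partition.
  def parentsOf(childPartition: Int): Seq[Int]
}

// Narrow: child partition i reads only parent partition i (as with map or filter).
case object OneToOne extends Dependency {
  def parentsOf(childPartition: Int): Seq[Int] = Seq(childPartition)
}

// Wide: every child partition may read from every parent partition (as with a group-by-key).
final case class Shuffle(numParentPartitions: Int) extends Dependency {
  def parentsOf(childPartition: Int): Seq[Int] = 0 until numParentPartitions
}
```

With a narrow dependency, a lost child partition can be rebuilt from a single parent partition; with a wide one, all parent partitions are needed.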
Programming model
An RDD is a collection of read-only data partitions. Note that it is a dataset.
Operations acting on RDDs can be divided into transformations and actions. A transformation does not modify a dataset in place; it derives a new one, so Dataset A is transformed into Dataset B. An action, in contrast, reduces the content of a dataset to a specific value.
Transformations are therefore lazy: only when an action is applied to an RDD are all the operations on that RDD and its parent RDDs submitted to the cluster for actual execution.
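The lazy behaviour can be imitated in a few lines of plain Scala. This is only a sketch under the assumption that a dataset is a recipe (a function) rather than stored data; `LazyDataset` and `fromSeq` are hypothetical names, not part of Spark.

```scala
// Illustrative stand-in for an RDD: it stores a recipe, not data.
final class LazyDataset[A](compute: () => Seq[A]) {
  // Transformations return a new dataset and run nothing.
  def map[B](f: A => B): LazyDataset[B] = new LazyDataset(() => compute().map(f))
  def filter(p: A => Boolean): LazyDataset[A] = new LazyDataset(() => compute().filter(p))

  // Actions run the whole chain and return a plain value.
  def count(): Int = compute().size
  def collect(): Seq[A] = compute()
}

object LazyDataset {
  // Hypothetical helper that wraps an in-memory sequence.
  def fromSeq[A](data: Seq[A]): LazyDataset[A] = new LazyDataset(() => data)
}
```

Calling `filter` on such a dataset only builds a new recipe; the predicate runs for the first time when `count()` or `collect()` is called.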
From code to dynamic running, the components involved are shown in the figure below.


Demo code:
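import org.apache.spark.SparkContext

// home (Spark installation directory) and jars (application jars) are assumed to be defined elsewhere
val sc = new SparkContext("spark://...", "MyJob", home, jars)
val file = sc.textFile("hdfs://...")            // transformation: nothing is read yet
val errors = file.filter(_.contains("ERROR"))   // transformation: still lazy
errors.cache()                                  // mark the result for caching once it is computed
errors.count()                                  // action: triggers the actual job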


Runtime view
Whatever the static model looks like, running it always comes down to processes and threads. In Spark terminology, the static view is called the dataset view, while the dynamic view is called the partition view. Their relations are as follows:


A task in Spark corresponds to a thread, and a worker corresponds to a process; the workers are managed by the driver.
A question then arises: how do RDDs evolve into tasks? The sections below answer this question in detail.
Deployment view
When an action acts on an RDD, this action will be submitted as a job.
During the submission, the DAGScheduler module calculates the dependency relationships between RDDs. These dependency relationships constitute the DAG.
Every job is divided into multiple stages. The main criterion for dividing stages is whether the input of the current computation is determinate. If it is (a narrow dependency), the computation is placed in the same stage to avoid the message passing overhead between stages; a wide dependency starts a new stage.
After a stage is submitted, the TaskScheduler calculates the required tasks for the stage and submits them to the corresponding workers.
Spark supports the following deployment modes: 1) Standalone 2) Mesos 3) YARN. The deployment mode serves as an input parameter for TaskScheduler initialization.
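The stage division described above can be sketched as follows. The sketch assumes the job is a simple linear chain of operations, each tagged as narrow or wide, whereas a real job is a DAG; `Op` and `StageSplitter` are hypothetical names and the code is not taken from DAGScheduler.

```scala
// Illustrative model: a job as a linear chain of operations, each tagged narrow or wide.
final case class Op(name: String, wide: Boolean)

object StageSplitter {
  // Narrow operations are pipelined into the current stage; a wide operation opens a new stage.
  def split(ops: Seq[Op]): Vector[Vector[String]] =
    ops.foldLeft(Vector(Vector.empty[String])) { (stages, op) =>
      if (op.wide && stages.last.nonEmpty) stages :+ Vector(op.name)
      else stages.init :+ (stages.last :+ op.name)
    }
}
```

For a chain such as textFile, filter, reduceByKey, map, where only reduceByKey is wide, this yields two stages: the first holds textFile and filter, the second holds reduceByKey and map.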


RDD interface
RDDs are mainly composed of the following parts:
1. Partitions - The set of data partitions in an RDD.
2. Dependencies - The dependency relationships of the RDD on its parent RDDs.
3. Compute (partition) - The computation required for a given data partition.
4. PreferredLocations - The preferred locations of a data partition.
5. Partitioner - How the computed results are distributed.
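The five parts listed above can be written down as a small Scala trait. This is a simplified sketch, not Spark's actual RDD class: `SimpleRDD`, `ChunkedRDD` and `FilteredRDD` are hypothetical names, partitions are reduced to integer indices, and the partitioner is reduced to a plain function.

```scala
// Illustrative trait mirroring the five parts of the RDD interface; not Spark's RDD class.
trait SimpleRDD[T] {
  def partitions: Seq[Int]                                    // 1. indices of the data partitions
  def dependencies: Seq[SimpleRDD[_]]                         // 2. parent datasets
  def compute(partition: Int): Iterator[T]                    // 3. how to compute one partition
  def preferredLocations(partition: Int): Seq[String] = Nil   // 4. hosts close to the data
  def partitioner: Option[T => Int] = None                    // 5. how results are distributed
}

// A dataset backed by in-memory chunks, one chunk per partition.
final class ChunkedRDD[T](chunks: Vector[Seq[T]]) extends SimpleRDD[T] {
  def partitions: Seq[Int] = chunks.indices
  def dependencies: Seq[SimpleRDD[_]] = Nil
  def compute(partition: Int): Iterator[T] = chunks(partition).iterator
}

// A narrow, per-partition filter over a parent dataset.
final class FilteredRDD[T](parent: SimpleRDD[T], p: T => Boolean) extends SimpleRDD[T] {
  def partitions: Seq[Int] = parent.partitions
  def dependencies: Seq[SimpleRDD[_]] = Seq(parent)
  def compute(partition: Int): Iterator[T] = parent.compute(partition).filter(p)
}
```

Note that `FilteredRDD.compute` only asks its parent for the same partition index, which is what makes the dependency narrow.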
Caching mechanism
The intermediate computational results of an RDD can be cached. Memory is preferred for caching; if memory space is insufficient, the data is written to disk.
Based on the LRU (least recently used) principle, some content is kept in memory and some is kept on disk.
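A rough sketch of this policy in plain Scala is shown below. It assumes a bounded in-memory tier and an unbounded "disk" tier, both simulated with maps; `TwoTierCache` is a hypothetical name and Spark's real block storage is considerably more involved.

```scala
import scala.collection.mutable

// Illustrative two-tier store: a bounded "memory" tier with LRU eviction and a simulated "disk" tier.
final class TwoTierCache[K, V](memoryCapacity: Int) {
  // LinkedHashMap keeps insertion order, so the head is the least recently used entry.
  private val memory = mutable.LinkedHashMap.empty[K, V]
  private val disk = mutable.Map.empty[K, V]

  def put(key: K, value: V): Unit = {
    memory.remove(key)
    disk.remove(key)
    memory.put(key, value)
    if (memory.size > memoryCapacity) {
      val (oldKey, oldValue) = memory.head   // least recently used entry
      memory.remove(oldKey)
      disk.put(oldKey, oldValue)             // spill it instead of dropping it
    }
  }

  def get(key: K): Option[V] = memory.remove(key) match {
    case Some(value) => memory.put(key, value); Some(value)   // re-insert to refresh recency
    case None        => disk.get(key)
  }

  def inMemory: Set[K] = memory.keySet.toSet
  def onDisk: Set[K] = disk.keySet.toSet
}
```

With a capacity of two, putting a and b, reading a, and then putting c leaves a and c in memory and moves b to the disk tier.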
Fault tolerance
A series of processing steps leads from the initial RDD to the last derived RDD. So how are errors in the intermediate steps handled?
Spark's solution is to replay only the computation of the failed data partition, rather than that of the entire dataset, which greatly reduces the cost of recovery.
So how does an RDD know how many data partitions it has? For an HDFS file, the blocks of the file are the main basis for the calculation.
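The recovery idea can be sketched in plain Scala as follows. The sketch assumes one partition per source block and a recorded list of per-partition steps; `LineageDataset`, `readBlock` and `lose` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

// Illustrative model: each partition is rebuilt from its source block by replaying recorded steps.
final class LineageDataset(
    numPartitions: Int,
    readBlock: Int => Seq[String],
    steps: List[Seq[String] => Seq[String]]) {

  private val materialized = mutable.Map.empty[Int, Seq[String]]

  // Replays the recorded steps on a single source block.
  private def recompute(index: Int): Seq[String] =
    steps.foldLeft(readBlock(index))((data, step) => step(data))

  // Returns the partition, recomputing only that partition if it is missing.
  def partition(index: Int): Seq[String] =
    materialized.getOrElseUpdate(index, recompute(index))

  // Simulates losing one partition, for example because a worker died.
  def lose(index: Int): Unit = materialized.remove(index)

  def computeAll(): Unit = (0 until numPartitions).foreach(i => partition(i))
}
```

After `lose(1)`, asking for partition 1 again reads only source block 1; the other partitions are left untouched.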
Cluster management
Tasks run on the cluster. In addition to its own Standalone deployment mode, Spark also supports the YARN and Mesos deployment modes.
YARN is responsible for scheduling and monitoring computing resources. Based on the monitoring results, it restarts failed tasks, or redistributes tasks when new nodes join the cluster.
To learn more about this part, refer to the YARN documentation.
Summary
While reading the source code, keep the following two main threads in mind.
Static view: RDDs, transformations and actions.
Dynamic view: the life of a job. Every job is divided into multiple stages, and every stage contains multiple RDDs and their transformations. How are these stages mapped to tasks and distributed to the cluster?
(reference)
1. Introduction to Spark Internals http://files.meetup.com/3138542/dev-meetup-dec-2012.pptx
2. Lightning-Fast Cluster Computing with Spark and Shark   http://www.meetup.com/TriHUG/events/112474102/
<II> Job submission and running
Overview
This article takes wordcount as an example to describe in detail how a job is created and run in Spark, focusing on the creation of processes and threads.
Tutorial environment establishment
Before continuing to subsequent operations, make sure the following conditions are met.
1. Download Spark binary 0.9.1
2. Install Scala
3. Install SBT
4. Install Java
Start spark-shell
Run it in local mode, that is, on a single machine
Local mode is very simple. You only need to run the following command. Suppose the current directory is $SPARK_HOME.
MASTER=local bin/spark-shell
“MASTER=local” indicates that Spark runs in local (single-machine) mode.
Run it in local cluster mode
The local cluster mode is a pseudo-cluster mode. It simulates a Standalone cluster on a single machine, and the starting order is as follows:
1. Start master node
2. Start worker node
3. Start spark-shell
Master node
$SPARK_HOME/sbin/start-master.sh
 Note: The runtime output log is saved in the $SPARK_HOME/logs directory by default.
The master node mainly runs the org.apache.spark.deploy.master.Master class. Its web UI listens on port 8080, and the log is shown in the figure below.


Modify configurations
1. Enter the $SPARK_HOME/conf directory.
2. Rename spark-env.sh.template as spark-env.sh.
3. Modify spark-env.sh and add the following content.
export SPARK_MASTER_IP=localhost
export SPARK_LOCAL_IP=localhost
Run worker node
bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

Once the worker node has fully started, it connects to the master node. Open the web UI of the master node and you will see the connected worker nodes. The address of the master web UI is http://localhost:8080.
Start spark-shell
MASTER=spark://localhost:7077 bin/spark-shell
If everything goes smoothly, you will see the following prompts.
Created spark context..
Spark context available as sc.
You can view the content below by visiting localhost:4040 in a browser.
1. stages
2. storage
3. environment
4. executors
Wordcount
With the environment ready, we can run a simple example by entering the following code in spark-shell.
scala> sc.textFile("README.md").filter(_.contains("Spark")).count
The code above counts the lines in README.md that contain the word Spark.
Detailed deployment process
The components in the Spark deployment environment are structured as shown in the figure below.


Driver Program: Simply put, the wordcount statements entered in spark-shell correspond to the Driver Program in the above figure.
Cluster Manager: It corresponds to the master mentioned above and is mainly responsible for deployment management.
Worker Node: Relative to the master node, the worker node is the slave node. Executors run on worker nodes, and the tasks inside an executor correspond to threads. An executor handles two basic pieces of logic: one comes from the driver program, and the other is that a submitted job is divided into stages, each of which runs one or more tasks.
Notes: In cluster mode, the Cluster Manager runs in one JVM process and each worker runs in another JVM process. In the local cluster, these JVM processes are all on the same machine. In a real Standalone or Mesos/YARN cluster, the worker and master nodes are distributed across different hosts.
Job generation and running
A simple procedure for generating a job is as follows:
1. First, the application creates a SparkContext instance, for example, sc.
2. Then, the application generates an RDD using the SparkContext instance.
3. Through a series of transformation operations, the original RDD is transformed into an RDD of another type.
4. When an action is applied to a transformed RDD, it calls the runJob method of the SparkContext instance.
5. The sc.runJob call is the starting point of the chain of calls that follows. The crucial transition happens in this process.
The calling path is outlined below.
1. sc.runJob > dagScheduler.runJob > submitJob
2. DAGScheduler::submitJob creates a JobSubmitted event and sends it to the nested eventProcessActor.
3. After receiving the JobSubmitted event, the eventProcessActor calls the processEvent function.
4. The job is converted into stages, producing the finalStage, which is then submitted for running. The key is the call to submitStage.
5. In submitStage, the dependencies between stages are calculated. The dependency relationships consist of wide dependencies and narrow dependencies.
6. During the calculation, if the current stage is found to have no dependencies, or all of its dependencies are ready, its tasks are submitted.
7. The task submission is completed by calling the submitMissingTasks function.
8. The TaskScheduler manages which workers the tasks actually run on; that is, submitMissingTasks calls TaskScheduler::submitTasks.
9. In TaskSchedulerImpl, the backend is created based on the current running mode of Spark. If Spark runs in local mode, the LocalBackend is created.
10. LocalBackend receives the ReviveOffers event passed in by the TaskSchedulerImpl.
11. reviveOffers > executor.launchTask > TaskRunner.run
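The hand-off in steps 2 and 3, where the caller only posts an event and a separate component processes it, can be imitated with a queue and a single consumer thread. This is a sketch under that assumption and does not reproduce Spark's actor code; `MiniScheduler`, `SchedulerEvent`, `Stop` and the latch-based completion signal are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Illustrative events and scheduler; the names echo the steps above but this is not Spark code.
sealed trait SchedulerEvent
final case class JobSubmitted(jobId: Int, done: CountDownLatch) extends SchedulerEvent
case object Stop extends SchedulerEvent

final class MiniScheduler {
  private val events = new LinkedBlockingQueue[SchedulerEvent]()
  @volatile private var processed = Vector.empty[Int]

  // Single consumer thread: plays the role of the event-processing actor.
  private val loop = new Thread(() => {
    var running = true
    while (running) {
      events.take() match {
        case JobSubmitted(id, done) =>
          processed = processed :+ id   // stands in for processEvent
          done.countDown()
        case Stop =>
          running = false
      }
    }
  })
  loop.setDaemon(true)
  loop.start()

  // The caller only enqueues an event; processing happens on the loop thread.
  def submitJob(jobId: Int): CountDownLatch = {
    val done = new CountDownLatch(1)
    events.put(JobSubmitted(jobId, done))
    done
  }

  def processedJobs: Vector[Int] = processed
  def stop(): Unit = events.put(Stop)
}
```

Because there is only one consumer thread, events are handled strictly in the order they were submitted, which keeps the scheduler state free of locking.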
Code snippet: executor.launchTask
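// Wraps the serialized task in a TaskRunner and hands it to the executor's thread pool.
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId, serializedTask)
  runningTasks.put(taskId, tr)   // track the task while it runs
  threadPool.execute(tr)         // run it on a pooled thread
}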
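The same pattern can be tried out without Spark. The sketch below assumes a task is just a function that returns an Int; `MiniExecutor`, `results` and `shutdownAndWait` are hypothetical names, and only the shape of launchTask (wrap, register, execute on a pool) follows the snippet above.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Illustrative executor: each launched task is wrapped in a Runnable and handed to a thread pool.
final class MiniExecutor(threads: Int) {
  private val threadPool = Executors.newFixedThreadPool(threads)
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  val results = new ConcurrentHashMap[Long, Int]()

  def launchTask(taskId: Long, work: () => Int): Unit = {
    val runner: Runnable = () => {
      try results.put(taskId, work())        // the task body runs on a pool thread
      finally runningTasks.remove(taskId)    // stop tracking it once it has finished
    }
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }

  // Stops accepting new tasks and waits for the launched ones to finish.
  def shutdownAndWait(): Boolean = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Each call to `launchTask` returns immediately; the work itself runs on one of the pool's threads, which is why a task can be thought of as a thread.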


The key point of all this is that the actual processing logic finally runs in TaskRunner, inside the executor.
The calculation results are packaged into MapStatus and fed back to the DAGScheduler through a series of internal messages. This message passing path is not very complicated, and you can trace it yourself if you are interested.