How Mars Executes Distributedly
Mars provides a set of libraries for distributed execution of Tensors. The library is written using the Actor model implemented by mars.actors and contains Scheduler, Worker and Web Services.
What users submit to Mars Web Service is a Graph composed of Tensors. The web service receives these graphs and submits them to a Scheduler. Before submitting jobs to each Worker, Mars Scheduler compiles the Tensor graph into a graph composed of Chunk and Operand, and then analyzes and divides the graph. After that, the Scheduler creates a series of OperandActors that control the execution of a single Operand based on the consistent hash among all Schedulers. Operands are scheduled in a topological order. When all Operands complete execution, the entire graph will be marked as completed, and the client can pull the results from the web. The entire execution process is described in the following figure.
The client submits jobs to the Mars service through the RESTful API. The user writes the code on the Tensor, and then uses session.run(tensor) to convert the Tensor operation into a Graph composed of Tensor and submit it to the Web API. After that, the Web API submits the job to the SessionActor and creates a GraphActor in the cluster for graph analysis and management. The client starts to query the execution state of the graph until the execution ends.
In GraphActor, we first convert the Tensor graph into a graph composed of Operands and Chunks according to the chunks setting. This process allows the graph to be further split and executed in parallel. After that, we perform a series of analyses on the graph to obtain the priority of each Operand and to assign Workers to the initial Operands. For details on this part, please refer to the section Preparing the Execution Graph. An OperandActor is then created for each Operand to control its execution. When an Operand is in the READY state (as described in the Operand state section), the Scheduler selects a target Worker for it, and the job is submitted to that Worker for actual execution.
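The consistent hash used to place OperandActors among Schedulers can be illustrated with a small hash ring. This is a minimal sketch of the general technique, not Mars's implementation: the class HashRing, the replicas parameter and the choice of MD5 are all assumptions made for illustration.

```python
import bisect
import hashlib


def _hash(key):
    """Map a string to a large integer position on the ring."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


class HashRing:
    """Hypothetical consistent-hash ring over scheduler addresses."""

    def __init__(self, nodes, replicas=32):
        # Each node is placed on the ring several times (virtual nodes)
        # so that keys spread more evenly.
        self._ring = sorted(
            (_hash(f"{node}#{i}"), node)
            for node in nodes
            for i in range(replicas)
        )
        self._points = [point for point, _ in self._ring]

    def lookup(self, key):
        """Return the node owning `key`: the next point clockwise."""
        idx = bisect.bisect(self._points, _hash(key)) % len(self._ring)
        return self._ring[idx][1]


ring = HashRing(["scheduler-1", "scheduler-2", "scheduler-3"])
owner = ring.lookup("operand-42")  # always the same scheduler for this key
```

The useful property is that when a Scheduler leaves the ring, only the keys it owned move; keys owned by the remaining Schedulers stay where they were.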
When an Operand is submitted to a Worker, the OperandActor waits for a callback from the Worker. If the Operand succeeds, its successors will be scheduled. If the Operand execution fails, the OperandActor will retry several times; if it still fails, the execution is marked as failed.
The client can cancel a running job using the RESTful API. The cancellation request will be written to the Graph's state store, and the cancellation interface on the GraphActor will be called. If the job is in the prepare phase, it will end as soon as the stop request is detected. Otherwise, the request will be issued to each OperandActor, with the state set to CANCELLING. If an Operand is not running at this time, its state will be directly set to CANCELLED. If an Operand is running, the stop request will be issued to the Worker and cause an ExecutionInterrupted error, which will be returned to the OperandActor, at which point the Operand's state will be marked as CANCELLED.
Preparing the Execution Graph
When a Tensor graph is submitted to the Mars Scheduler, a finer-grained graph consisting of Operands and Chunks will be generated based on the chunks parameter contained in the data source.
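To make the effect of the chunks parameter concrete, the sketch below computes the shapes of the chunks a tensor would be split into. The helper names split_axis and chunk_shapes are hypothetical and only illustrate the idea of tiling; they are not part of the Mars API, and the sketch assumes the same chunk size on every axis.

```python
from itertools import product


def split_axis(length, chunk):
    """Split one axis of size `length` into pieces of at most `chunk`."""
    return [min(chunk, length - start) for start in range(0, length, chunk)]


def chunk_shapes(shape, chunk):
    """Return the shape of every chunk for a tensor of the given shape."""
    per_axis = [split_axis(n, chunk) for n in shape]
    return list(product(*per_axis))


# A 100-element vector with chunks=100 stays in a single chunk,
# while chunks=30 yields four chunks, the last one smaller.
single = chunk_shapes((100,), 100)
several = chunk_shapes((100,), 30)
```

Each resulting chunk becomes a node in the finer-grained graph, so smaller chunk sizes mean more nodes and more opportunities for parallel execution.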
When the Chunk graph is generated, we reduce the size of the graph by merging adjacent nodes. This merging also allows us to take full advantage of acceleration libraries such as numexpr to speed up the calculation process. Currently, Mars only merges Operands that form a single chain. For example, when executing the following code:
import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()
Mars will merge the ADD and SUM Operands into a single FUSE node. The RAND Operands will not be merged, because they do not form a simple chain with ADD and SUM.
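The chain-merging rule can be sketched as follows. This is an illustrative reconstruction, assuming that two nodes are merged only when the first has exactly one successor and the second has exactly one predecessor; the function fuse_chains and the dictionary representation of the graph are hypothetical, not Mars internals.

```python
def fuse_chains(successors):
    """Group the nodes of a DAG into maximal single chains.

    `successors` maps every node to the list of nodes consuming its
    output. A returned chain with more than one node would become a
    single fused node.
    """
    predecessors = {node: [] for node in successors}
    for node, succs in successors.items():
        for succ in succs:
            predecessors[succ].append(node)

    def continues_chain(node):
        # A node extends its predecessor's chain only if the link is 1-to-1.
        preds = predecessors[node]
        return len(preds) == 1 and len(successors[preds[0]]) == 1

    chains = []
    for node in successors:
        if continues_chain(node):
            continue  # not a chain head; collected from its head instead
        chain = [node]
        while (len(successors[chain[-1]]) == 1
               and continues_chain(successors[chain[-1]][0])):
            chain.append(successors[chain[-1]][0])
        chains.append(chain)
    return chains


# The graph produced by c = (a + b).sum() with one chunk per input.
graph = {"RAND_a": ["ADD"], "RAND_b": ["ADD"], "ADD": ["SUM"], "SUM": []}
chains = fuse_chains(graph)
```

Here ADD has two predecessors, so neither RAND node can join its chain, while ADD and SUM are linked one-to-one and end up together.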
Initial worker assignment
Assigning workers to Operand is critical to the performance of graph execution. Randomly assigning initial Operands can result in huge network overhead and potentially lead to unbalanced job distribution among different workers. Because the allocation of non-initial nodes can be easily determined according to the physical distribution of data generated by its predecessors and the idleness of each worker, in the execution graph preparation stage, we only consider the allocation of initial Operands.
There are several guidelines to follow for initial worker assignment. First, the number of Operands allocated to each Worker needs to be kept as balanced as possible. This enables the computing cluster to maintain high utilization throughout execution, which is especially important in the final stage. Second, the initial node allocation needs to keep network transmission as small as possible when subsequent nodes are executed. That is, the initial node allocation needs to follow the locality principle.
It is important to note that the above guidelines may conflict with each other in some cases. An allocation scheme with minimal network traffic can be very skewed. We developed a set of heuristics to obtain the balance of the two objectives, which is described as follows:
1. Select the first initial node and the first machine in the list;
2. Starting from this node, perform a depth-first search on the undirected graph converted from the Operand graph;
3. If another unassigned initial node is visited, assign it to the machine selected in step 1;
4. When the total number of Operands visited is greater than the average number of Operands accepted by each Worker, stop the allocation;
5. If there are still Workers without assigned Operands, go to step 1; otherwise, end.
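The heuristic above can be sketched in a few lines of Python. This is a simplified illustration under stated assumptions, not the Mars implementation: the function assign_initial_workers, the edge-list graph representation and the per-worker visited set are all hypothetical, and initial nodes that remain once every worker has been used are simply left unassigned because the text does not say how they are handled.

```python
def assign_initial_workers(edges, initial_nodes, workers):
    """Assign initial operands to workers with a bounded depth-first search.

    edges: list of (predecessor, successor) pairs of the operand graph.
    Returns a dict mapping each assigned initial node to a worker.
    """
    # Build the undirected version of the operand graph.
    neighbours = {node: [] for node in initial_nodes}
    for u, v in edges:
        neighbours.setdefault(u, []).append(v)
        neighbours.setdefault(v, []).append(u)

    initial = set(initial_nodes)
    quota = len(neighbours) / len(workers)  # average operands per worker
    assignment = {}
    free_workers = list(workers)

    for start in initial_nodes:
        if start in assignment:
            continue
        if not free_workers:
            break  # assumption: leftovers are handled elsewhere
        worker = free_workers.pop(0)
        visited, stack = set(), [start]
        # Stop once more than `quota` operands have been visited.
        while stack and len(visited) <= quota:
            node = stack.pop()
            if node in visited:
                continue
            visited.add(node)
            if node in initial and node not in assignment:
                assignment[node] = worker
            stack.extend(reversed(neighbours[node]))
    return assignment


# Tree reduction: (a1, a2) -> s1, (a3, a4) -> s2, (s1, s2) -> r
edges = [("a1", "s1"), ("a2", "s1"), ("a3", "s2"),
         ("a4", "s2"), ("s1", "r"), ("s2", "r")]
plan = assign_initial_workers(edges, ["a1", "a2", "a3", "a4"], ["w1", "w2"])
```

In this example the two inputs of each partial sum land on the same worker, so the first level of the reduction needs no network transfer, and both workers receive the same number of initial Operands.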
When a Graph composed of Operand is executed, the proper execution order will reduce the total amount of data temporarily stored in the cluster, thereby reducing the possibility of data being spilled to disk. A suitable Worker can reduce the total amount of network transfers during execution.
Operand selection strategy
Proper execution order can significantly reduce the total amount of data staged in the cluster. The following figure shows an example of Tree Reduction. Circles represent Operands and squares represent Chunks. Red marks an Operand being executed, blue marks an Operand that can be executed, green marks a Chunk generated by an Operand that has been stored, and gray marks an Operand whose related data has been released. Suppose we have two workers and the resource usage of each Operand is equal; each picture shows the state after 5 time units of execution under a different strategy. The diagram on the left shows nodes executed level by level according to the hierarchy, while the diagram on the right shows execution in a near-depth-first order. In the picture on the left, 6 chunks of data need to be temporarily stored, whereas in the picture on the right only 2 do.
Since our goal is to reduce the total amount of data stored in the cluster, we set a priority policy for Operand entering the READY state:
Operand with greater depth needs to be executed first;
Operand that is depended on by deeper Operands needs to be executed first;
Nodes with smaller output sizes need to be executed first.
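One way to express these three rules is as a composite sort key. The sketch below is only an illustration: the ReadyOperand record and its fields (depth, max_successor_depth, output_size) are hypothetical names chosen here, and the second rule is interpreted as "the deepest successor decides".

```python
import heapq
from dataclasses import dataclass


@dataclass(frozen=True)
class ReadyOperand:
    name: str
    depth: int                # depth of the operand in the graph
    max_successor_depth: int  # depth of its deepest dependent operand
    output_size: int          # estimated output size in bytes


def priority_key(op):
    # heapq pops the smallest key first, so "larger is better" fields
    # are negated while output size is kept as-is.
    return (-op.depth, -op.max_successor_depth, op.output_size)


def execution_order(ready):
    """Return operand names in the order they would be dispatched."""
    heap = [(priority_key(op), op.name) for op in ready]
    heapq.heapify(heap)
    return [heapq.heappop(heap)[1] for _ in range(len(heap))]


ready = [
    ReadyOperand("A", depth=1, max_successor_depth=2, output_size=10),
    ReadyOperand("B", depth=3, max_successor_depth=4, output_size=50),
    ReadyOperand("C", depth=3, max_successor_depth=5, output_size=80),
    ReadyOperand("D", depth=3, max_successor_depth=5, output_size=20),
]
order = execution_order(ready)
```

Because the key is a tuple, later rules only break ties left by earlier ones, which matches the order in which the rules are listed.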
Worker selection strategy
When the Scheduler is ready to execute the graph, the workers for the initial Operands have already been determined. For subsequent Operands, we assign workers based on where the input data resides. The worker that holds the largest amount of input data is selected for the Operand. If there are multiple such workers, the resource status of each candidate worker plays the decisive role.
Each Operand in Mars is dispatched individually by an OperandActor. The process of execution is a state transition process. In OperandActor, we define a state transition function for entering each state. An initial Operand is initialized in the READY state, and a non-initial Operand is initialized in the UNSCHEDULED state. When the given condition is met, the Operand transitions to another state and performs the corresponding action. The state transitions are shown in the following figure:
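The worker selection rule for non-initial Operands can be sketched as follows. The function select_worker and the use of a free-slot count as the "resource status" are assumptions made for illustration.

```python
def select_worker(input_chunks, free_slots):
    """Pick the worker holding the most input data for an operand.

    input_chunks: list of (worker, size_in_bytes) for each input chunk.
    free_slots:   dict mapping worker to a measure of idle resources,
                  used only to break ties.
    """
    held = {}
    for worker, size in input_chunks:
        held[worker] = held.get(worker, 0) + size
    # Compare by local input size first, then by idle resources.
    return max(held, key=lambda w: (held[w], free_slots.get(w, 0)))


# w2 already stores 300 bytes of input, so the operand goes there
# even though w1 has more idle slots.
target = select_worker(
    [("w1", 100), ("w2", 300), ("w1", 50)],
    {"w1": 4, "w2": 0},
)
```

Running the Operand where most of its input already lives means only the smaller remainder has to cross the network.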
Below we describe what each state means and what Mars performs in those states.
UNSCHEDULED: An Operand is in this state when its upstream data is not ready.
READY: An Operand is in this state when all upstream input data is ready. Upon entering this state, the OperandActor submits jobs to all Workers selected in the AssignerActor. When a worker is ready to run the job, it sends a message to the Scheduler. The Scheduler then sends a message to the other workers to stop the job, and tells that worker to start the job execution.
RUNNING: An Operand is in this state when its execution has started. When entering this state, the OperandActor checks whether the job has been submitted. If it has not yet been submitted, the OperandActor constructs a graph consisting of FetchChunk Operands and the current Operand and submits it to the Worker. After that, the OperandActor registers a callback in the Worker to receive the message that the job execution is complete.
FINISHED: An Operand is in this state when job execution has completed. When Operand enters this state, and Operand has no successors, a message will be sent to the GraphActor to determine if the execution of the entire Graph has ended. At the same time, OperandActor sends execution completion messages to its predecessor and successor. If a predecessor receives this message, it checks to see if all successors have completed execution. If so, the data on the current Operand can be freed. If a successor receives this message, it will check if all predecessors have completed. If so, the successor's state can transition to READY.
FREED: An Operand is in this state when all data on it has been freed.
FATAL: An Operand is in this state when all re-execution attempts have failed. When an Operand enters this state, it passes the same state to its successor nodes.
CANCELLING: An Operand is in this state when it is being canceled. If the previous job was executing, a request to cancel the execution will be sent to the worker.
CANCELLED: An Operand is in this state when the execution has been canceled and stopped running. When an Operand enters this state, the OperandActor will try to set all of its successors to CANCELLING.
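The states above can be modelled as a small state machine. The sketch below is an illustration only: the transition table is inferred from the descriptions rather than taken from the Mars source, the state reached after all retries fail is named FATAL here, and the classes OperandState and Operand are hypothetical.

```python
from enum import Enum


class OperandState(Enum):
    UNSCHEDULED = "unscheduled"
    READY = "ready"
    RUNNING = "running"
    FINISHED = "finished"
    FREED = "freed"
    FATAL = "fatal"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"


S = OperandState

# Assumed transition table, reconstructed from the state descriptions.
ALLOWED = {
    S.UNSCHEDULED: {S.READY, S.FATAL, S.CANCELLING},
    S.READY: {S.RUNNING, S.CANCELLING},
    S.RUNNING: {S.FINISHED, S.FATAL, S.CANCELLING},
    S.FINISHED: {S.FREED},
    S.FREED: set(),
    S.FATAL: set(),
    S.CANCELLING: {S.CANCELLED},
    S.CANCELLED: set(),
}


class Operand:
    def __init__(self, is_initial):
        # Initial operands start READY; all others wait for their inputs.
        self.state = S.READY if is_initial else S.UNSCHEDULED

    def transition(self, new_state):
        if new_state not in ALLOWED[self.state]:
            raise ValueError(
                f"illegal transition {self.state.name} -> {new_state.name}"
            )
        self.state = new_state


op = Operand(is_initial=False)
for step in (S.READY, S.RUNNING, S.FINISHED, S.FREED):
    op.transition(step)
```

Keeping the legal transitions in one table makes it easy to attach an action to each state entry, which is how the text describes the per-state transition functions.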
Execution details in Worker
A Mars Worker contains multiple processes to reduce the impact of the Global Interpreter Lock (GIL) on execution. The specific execution is done in a separate process. To reduce unnecessary memory copies and inter-process communication, Mars Worker uses shared memory to store execution results.
When a job is submitted to a worker, it will first be placed in a queue waiting for memory to be allocated. When memory is allocated, data on other workers, or data on the current worker that has been spilled to disk, will be loaded into memory. At this point, all the data needed for the calculation is in memory, and the actual calculation starts. When the computation is complete, the Worker places the result in the shared storage space. The transitions between these four execution states are shown in the following figure.
Mars Worker controls the execution of all Operand in Worker through ExecutionActor. The actor itself does not participate in the actual operation or data transmission, but only submits tasks to other actors.
The OperandActor in the Scheduler submits jobs to the Worker via the enqueue_graph call on the ExecutionActor. The Worker accepts Operand submissions and places them in the queue. When the job can be executed, the ExecutionActor sends a message to the Scheduler, and the Scheduler determines whether the job will be executed. When the Scheduler decides to execute the Operand on the current Worker, it calls the start_execution method and registers a callback via add_finish_callback. This design allows execution results to be received at multiple locations, which is valuable for failure recovery.
The ExecutionActor uses the mars.promise module to handle multiple Operand execution requests at the same time. The specific execution steps are connected in series through the then method of the Promise class. When the final execution result is stored, the previously registered callback will be fired. If an error occurs in any of the previous execution steps, the error will be propagated to the handler function registered by the last catch method and handled.
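The chaining pattern described above can be illustrated with a toy promise. This is a deliberately simplified, synchronous stand-in written for this example and is not the mars.promise API: the Promise class below, the step functions and the execute helper are all hypothetical.

```python
class Promise:
    """Toy synchronous promise: holds either a value or an error."""

    def __init__(self, value=None, error=None):
        self.value = value
        self.error = error

    def then(self, func):
        if self.error is not None:
            return self  # skip remaining steps once something failed
        try:
            return Promise(value=func(self.value))
        except Exception as exc:
            return Promise(error=exc)

    def catch(self, handler):
        if self.error is None:
            return self
        return Promise(value=handler(self.error))


def execute(operand, compute):
    """Run the execution steps in series and record which ones ran."""
    steps = []

    def allocate_memory(op):
        steps.append("allocate")
        return op

    def load_inputs(op):
        steps.append("load")
        return op

    def store_result(value):
        steps.append("store")
        return ("ok", value)

    def on_error(exc):
        steps.append("error")
        return ("failed", str(exc))

    outcome = (
        Promise(value=operand)
        .then(allocate_memory)
        .then(load_inputs)
        .then(compute)
        .then(store_result)
        .catch(on_error)
    )
    return outcome.value, steps


def failing_compute(op):
    raise RuntimeError("no memory")


good = execute("op-1", lambda op: 42)
bad = execute("op-2", failing_compute)
```

In the failing run the store step is skipped and the error reaches the single handler registered at the end of the chain, which is the behaviour the paragraph describes.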
Operand sorting
All Operands in the READY state are submitted to the Workers selected by the Scheduler. Therefore, for the vast majority of execution time, the number of Operands submitted to a Worker is usually higher than the total number of Operands that a single Worker can handle. The Worker therefore needs to sort the Operands and select a portion of them to execute. This sorting process takes place in the TaskQueueActor, which maintains a priority queue in which information about Operands is stored. At the same time, the TaskQueueActor runs a job allocation task periodically, and allocates execution resources to the Operand at the head of the priority queue until there are no spare resources left to run Operands. This allocation process is also triggered when a new Operand is submitted or when an Operand's execution completes.
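A worker-side priority queue with this behaviour can be sketched as below. The TaskQueue class, its slot-based notion of resources and its method names are assumptions made for illustration; the periodic allocation task is omitted, and allocation is triggered only on submission and completion.

```python
import heapq


class TaskQueue:
    """Hypothetical worker-side queue that starts the best operands first."""

    def __init__(self, total_slots):
        self.free_slots = total_slots
        self._heap = []
        self._counter = 0  # keeps submission order among equal priorities

    def enqueue(self, name, priority):
        # heapq is a min-heap, so the priority is negated.
        heapq.heappush(self._heap, (-priority, self._counter, name))
        self._counter += 1
        return self.allocate()

    def release(self):
        """Called when an operand finishes and frees its slot."""
        self.free_slots += 1
        return self.allocate()

    def allocate(self):
        """Start operands from the head of the queue while slots remain."""
        started = []
        while self._heap and self.free_slots > 0:
            _, _, name = heapq.heappop(self._heap)
            self.free_slots -= 1
            started.append(name)
        return started


queue = TaskQueue(total_slots=1)
first = queue.enqueue("a", priority=1)   # starts immediately
queue.enqueue("b", priority=1)           # waits: no free slot
queue.enqueue("c", priority=5)           # waits, but ahead of "b"
second = queue.release()                 # "a" finished
third = queue.release()                  # "c" finished
```

Each call returns the names of the Operands that were started as a result, which makes it easy to see that a later but higher-priority submission overtakes an earlier one.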
Mars Worker manages two parts of memory. The first part is the private memory space of each Worker process, which is held by each process itself. The second part is the memory space shared by all processes, held by the plasma_store in Apache Arrow.
In order to avoid process memory overflow, we introduce a Worker-level QuotaActor for allocating process memory. When an Operand starts executing, it sends bulk memory requests to the QuotaActor for input and output chunks. If the remaining memory space can satisfy the request, the request will be accepted by the QuotaActor. Otherwise, the request will be queued for free resources. When the associated memory usage is released, the requested resource is released, at which point the QuotaActor can allocate resources for other Operands.
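The request and release cycle can be sketched with a small quota tracker. The MemoryQuota class and the first-in, first-out handling of waiting requests are assumptions for illustration; the text does not specify the order in which queued requests are served.

```python
from collections import deque


class MemoryQuota:
    """Hypothetical process-memory quota with a FIFO waiting queue."""

    def __init__(self, total):
        self.total = total
        self.used = 0
        self._granted = {}
        self._waiting = deque()

    def request(self, key, size):
        """Return True if granted now, False if the request was queued."""
        if size > self.total:
            raise ValueError("request can never be satisfied")
        if not self._waiting and self.used + size <= self.total:
            self._granted[key] = size
            self.used += size
            return True
        self._waiting.append((key, size))
        return False

    def release(self, key):
        """Free a granted request and admit waiting ones that now fit."""
        self.used -= self._granted.pop(key)
        admitted = []
        while self._waiting and self.used + self._waiting[0][1] <= self.total:
            waiting_key, size = self._waiting.popleft()
            self._granted[waiting_key] = size
            self.used += size
            admitted.append(waiting_key)
        return admitted


quota = MemoryQuota(total=100)
got_a = quota.request("a", 60)   # fits
got_b = quota.request("b", 50)   # 60 + 50 > 100, queued
got_c = quota.request("c", 30)   # queued behind "b"
admitted = quota.release("a")    # both waiting requests now fit
```

Queuing "c" behind "b" even though it would fit keeps large requests from being starved by a stream of small ones; a real implementation could choose a different policy.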
Shared memory is managed by plasma_store and usually occupies 50% of the entire memory. Since there is no possibility of overflow, this part of memory is allocated directly through the related methods of plasma_store without going through QuotaActor. When the shared memory is exhausted, Mars Worker will try to spill some unused chunks to disk to make room for new chunks.
Chunk data spilled from shared memory to disk may be reused by future Operands, and reloading it from disk into shared memory can consume considerable IO resources, especially when the shared memory has been exhausted and other Chunks need to be spilled to disk to accommodate the loaded Chunk. Therefore, when data sharing is not required, for example when a Chunk will only be used by one Operand, we load the Chunk directly into process-private memory instead of shared memory, which can significantly reduce the total execution time of the job.
Mars is currently iterating rapidly, and will consider implementing worker-level failover and shuffle support in the near future. Scheduler-level failover is also planned.
Knowledge Base Team