The Use of Process Pools | Teach You to Get Started With Python, Part 109

When the number of subprocesses to create is small, you can use Process from multiprocessing directly to spawn them dynamically. But when there are hundreds or even thousands of targets, creating processes by hand becomes an enormous amount of work, and you can turn instead to the Pool class provided by the multiprocessing module.

Use of the process pool


Opening too many processes does not improve efficiency; it reduces it. Suppose there are 500 tasks and you start 500 processes at once. Not only can those 500 processes not all run at the same time (the CPU does not have that many cores), but scheduling 500 processes across 4 or 8 cores also costs the operating system considerable overhead.
If you want to start a large number of subprocesses, you can create subprocesses in batches in a process pool:
import time
from multiprocessing import Pool


def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))


if __name__ == '__main__':
    p = Pool(8)  # Create a process pool and specify the number of worker processes; the default is the number of CPU cores
    for i in range(1, 11):
        # p.apply(task, args=(i,))  # Execute tasks synchronously, one by one, without concurrency
        p.apply_async(task, args=(i,))  # Execute tasks asynchronously, achieving concurrency
    p.close()
    p.join()
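For this kind of batch, Pool.map is a common alternative to the apply_async loop. A minimal sketch (the short sleep just simulates work): it blocks until every task is done and returns the results in input order:

```python
import time
from multiprocessing import Pool


def task(n):
    time.sleep(0.1)  # simulate some work
    return n ** 2


if __name__ == '__main__':
    # map distributes the iterable across the workers and blocks
    # until all tasks finish; results come back in input order
    with Pool(4) as p:
        results = p.map(task, range(1, 11))
    print(results)  # [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```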
Getting the execution result of a task from the process pool:
import time
from multiprocessing import Pool


def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2


if __name__ == '__main__':
    p = Pool(4)
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))  # res is the execution result of the task
        print(res.get())  # The drawback of fetching the result immediately is that the tasks become synchronous again
    p.close()
    # p.join() is no longer needed, because res.get() is itself a blocking call
Getting the execution results of the processes asynchronously:
import time
from multiprocessing.pool import Pool


def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2


if __name__ == '__main__':
    p = Pool(4)
    res_list = []
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))
        res_list.append(res)  # Use a list to save the process execution results
    for res in res_list:
        print(res.get())
    p.close()
When initializing a Pool, you can specify a maximum number of processes. When a new request is submitted to the Pool, if the pool is not yet full, a new process is created to execute it; if the number of processes in the pool has already reached the specified maximum, the request waits until a process in the pool finishes, and that process is then reused to perform the new task. See the following example:
from multiprocessing import Pool
import os, time, random


def worker(msg):
    t_start = time.time()
    print("%s starts executing, process ID is %d" % (msg, os.getpid()))
    # random.random() generates a floating point number between 0 and 1
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "Execution completed, time %0.2f" % (t_stop - t_start))


if __name__ == '__main__':
    po = Pool(3)  # Define a process pool with a maximum of 3 processes
    for i in range(0, 10):
        # Pool().apply_async(target to call, (parameter tuple passed to the target,))
        # Each loop uses an idle child process to call the target
        po.apply_async(worker, (i,))

    print("----start----")
    po.close()  # Close the process pool; po no longer accepts new requests after closing
    po.join()   # Wait for all child processes in po to finish; must be placed after close
    print("-----end-----")
Running result:
----start----
0 starts executing, process ID is 21466
1 starts executing, process ID is 21468
2 starts executing, process ID is 21467
0 Execution completed, time 1.01
3 starts executing, process ID is 21466
2 Execution completed, time 1.24
4 starts executing, process ID is 21467
3 Execution completed, time 0.56
5 starts executing, process ID is 21466
1 Execution completed, time 1.68
6 starts executing, process ID is 21468
4 Execution completed, time 0.67
7 starts executing, process ID is 21467
5 Execution completed, time 0.83
8 starts executing, process ID is 21466
6 Execution completed, time 0.75
9 starts executing, process ID is 21468
7 Execution completed, time 1.03
8 Execution completed, time 1.05
9 Execution completed, time 1.69
-----end-----
Analysis of common functions of multiprocessing.Pool:
•apply_async(func[, args[, kwds]]): calls func in a non-blocking way, so tasks can run in parallel (by contrast, the blocking apply must wait for the previous task to finish before starting the next); args is the tuple of positional arguments passed to func, and kwds is a dict of keyword arguments passed to func;
•close(): Close the Pool so that it no longer accepts new tasks;
•terminate(): terminates immediately regardless of whether the task is completed;
•join(): the main process blocks, waiting for the child processes to exit; must be called after close or terminate;
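The difference between close() and terminate() can be seen in a short sketch (slow_task here is a made-up long-running job): close() followed by join() would wait for it to finish, while terminate() discards it immediately:

```python
import time
from multiprocessing import Pool


def slow_task(n):
    time.sleep(30)  # a deliberately long job
    return n


if __name__ == '__main__':
    po = Pool(2)
    po.apply_async(slow_task, (1,))
    time.sleep(0.5)   # give the worker time to pick the task up
    po.terminate()    # kill the workers at once; the pending task is discarded
    po.join()         # join is still required, after close or terminate
    print("pool terminated without waiting for slow_task")
```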
Queue in the process pool
If you want processes created by a Pool to share a queue, you must use the Queue() from multiprocessing.Manager() instead of multiprocessing.Queue(); otherwise you will get an error like the following:
RuntimeError: Queue objects should only be shared between processes through inheritance.
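A minimal sketch (assuming Python 3) that reproduces this error: passing a plain multiprocessing.Queue() to a pool task fails when the queue is pickled for the worker, and the exception surfaces when the result is fetched:

```python
import multiprocessing


def use_queue(q):
    q.put('hi')


if __name__ == '__main__':
    po = multiprocessing.Pool(2)
    q = multiprocessing.Queue()  # plain Queue, not Manager().Queue()
    res = po.apply_async(use_queue, (q,))
    try:
        res.get(timeout=5)
    except RuntimeError as e:
        print('RuntimeError:', e)  # Queue objects should only be shared between processes through inheritance
    po.close()
    po.join()
```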
The following example demonstrates how processes in a process pool communicate:
# Change the Queue import to Manager
from multiprocessing import Manager, Pool
import os, time, random


def reader(q):
    print("reader started (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader got message from Queue: %s" % q.get(True))


def writer(q):
    print("writer started (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in "helloworld":
        q.put(i)


if __name__ == "__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # Use the Queue from Manager
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # Let the first task store data in the Queue before the second task starts fetching it

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())
Running result:
(4171) start
writer started (4173), parent process is (4171)
reader started (4174), parent process is (4171)
reader got message from Queue: h
reader got message from Queue: e
reader got message from Queue: l
reader got message from Queue: l
reader got message from Queue: o
reader got message from Queue: w
reader got message from Queue: o
reader got message from Queue: r
reader got message from Queue: l
reader got message from Queue: d
(4171) End
Use of join method
# Both threads and processes have a join method
import threading
import time

x = 10


def test(a, b):
    time.sleep(1)
    global x
    x = a + b


# test(1, 1)
# print(x)  # 2

t = threading.Thread(target=test, args=(1, 1))
t.start()
t.join()  # Make the main thread wait until t has finished

print(x)  # 2: join guarantees that test has already updated x
