Python | Use of process pool
Use of the process pool
When the number of child processes to create is small, you can use the Process class in multiprocessing directly to spawn them dynamically. However, if there are hundreds or even thousands of targets, manually creating that many processes is a huge amount of work. In that case, you can use the Pool class provided by the multiprocessing module.
Pool
Starting too many processes does not improve efficiency; it reduces it. Suppose there are 500 tasks and 500 processes are started at once. Not only can those 500 processes not actually run simultaneously (the CPU does not have that many cores), but the operating system also has to keep scheduling them across 4 or 8 cores, which costs a lot of memory and context-switching overhead.
If you want to start a large number of child processes, you can use a process pool to create them in batches:
import time
from multiprocessing import Pool

def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))

if __name__ == '__main__':
    p = Pool(8)  # Create a process pool; the argument is the number of worker processes (defaults to the number of CPU cores)
    for i in range(1, 11):
        # p.apply(task, args=(i,))  # Executes tasks synchronously, one by one, with no concurrency
        p.apply_async(task, args=(i,))  # Executes tasks asynchronously, achieving concurrency
    p.close()
    p.join()
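To see the difference between apply and apply_async concretely, here is a small timing sketch (not from the original; the timings are approximate and assume each task sleeps for one second):

import time
from multiprocessing import Pool

def task(n):
    time.sleep(1)
    return n

if __name__ == '__main__':
    p = Pool(8)

    start = time.time()
    for i in range(10):
        p.apply(task, args=(i,))  # blocks on every call: roughly 10 s in total
    print('apply took %.1f s' % (time.time() - start))

    start = time.time()
    handles = [p.apply_async(task, args=(i,)) for i in range(10)]
    for h in handles:
        h.get()  # the tasks ran concurrently on 8 workers: roughly 2 s in total
    print('apply_async took %.1f s' % (time.time() - start))

    p.close()
    p.join()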
Getting a task's result from the process pool:
import time
from multiprocessing import Pool

def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2

if __name__ == '__main__':
    p = Pool(4)
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))  # res is an AsyncResult handle for the task
        print(res.get())  # Calling get() right away blocks, so the tasks degenerate into running one after another
    p.close()
    # p.join() is not needed here, because res.get() itself already blocks
Getting the execution results asynchronously:
import time
from multiprocessing import Pool

def task(n):
    print('{}----->start'.format(n))
    time.sleep(1)
    print('{}------>end'.format(n))
    return n ** 2

if __name__ == '__main__':
    p = Pool(4)
    res_list = []
    for i in range(1, 11):
        res = p.apply_async(task, args=(i,))
        res_list.append(res)  # Save the AsyncResult handles first...
    for res in res_list:
        print(res.get())  # ...then collect them, so the tasks run concurrently
    p.close()
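The submit-then-collect pattern above is common enough that Pool also provides map(), which submits every item, blocks until all results are ready, and returns them in input order. A minimal equivalent of the example above:

import time
from multiprocessing import Pool

def task(n):
    time.sleep(1)
    return n ** 2

if __name__ == '__main__':
    # The pool can also be used as a context manager (Python 3.3+)
    with Pool(4) as p:
        # map blocks until all results are ready and preserves input order
        print(p.map(task, range(1, 11)))  # [1, 4, 9, ..., 100]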
When initializing the Pool, you can specify a maximum number of processes. When a new request is submitted to the pool, if the pool is not yet full, a new process is created to execute it; but if the pool has already reached the specified maximum, the request waits until a process in the pool finishes, and that existing process is then reused for the new task. See the following example:
from multiprocessing import Pool
import os, time, random

def worker(msg):
    t_start = time.time()
    print("%s starts executing, process number is %d" % (msg, os.getpid()))
    # random.random() generates a random float between 0 and 1
    time.sleep(random.random() * 2)
    t_stop = time.time()
    print(msg, "finished, took %0.2f" % (t_stop - t_start))

if __name__ == '__main__':
    po = Pool(3)  # Define a process pool with a maximum of 3 processes
    for i in range(0, 10):
        # apply_async(callable to invoke, (argument tuple passed to the callable,))
        # Each loop iteration hands the task to an idle worker process
        po.apply_async(worker, (i,))
    print("----start----")
    po.close()  # Close the pool; po accepts no new tasks after closing
    po.join()   # Wait for all worker processes to finish; must come after close()
    print("-----end-----")
Running result:

----start----
0 starts executing, process number is 21466
1 starts executing, process number is 21468
2 starts executing, process number is 21467
0 finished, took 1.01
3 starts executing, process number is 21466
2 finished, took 1.24
4 starts executing, process number is 21467
3 finished, took 0.56
5 starts executing, process number is 21466
1 finished, took 1.68
6 starts executing, process number is 21468
4 finished, took 0.67
7 starts executing, process number is 21467
5 finished, took 0.83
8 starts executing, process number is 21466
6 finished, took 0.75
9 starts executing, process number is 21468
7 finished, took 1.03
8 finished, took 1.05
9 finished, took 1.69
-----end-----
Common methods of multiprocessing.Pool:
apply_async(func[, args[, kwds]]): calls func in a non-blocking way, so tasks can run in parallel (by contrast, the blocking apply waits for the previous task to finish before starting the next one); args is the argument tuple passed to func, and kwds is a dict of keyword arguments passed to func;
close(): closes the Pool so that it accepts no new tasks;
terminate(): terminates the worker processes immediately, whether or not their tasks are finished;
join(): blocks the main process until the worker processes exit; must be called after close() or terminate();
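The difference between close() and terminate() is easy to demonstrate with a minimal sketch (an added illustration, not from the original):

import time
from multiprocessing import Pool

def task(n):
    time.sleep(10)
    return n

if __name__ == '__main__':
    p = Pool(2)
    p.apply_async(task, args=(1,))
    time.sleep(0.5)
    p.terminate()  # workers are stopped immediately; the running task never finishes
    p.join()       # join() is still required after terminate(), just as after close()
    print('pool terminated')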
Queue in the process pool
If you want the processes in a Pool to communicate through a queue, you need to use the Queue() from multiprocessing.Manager() rather than multiprocessing.Queue(); otherwise you will get the following error:
RuntimeError: Queue objects should only be shared between processes through inheritance.
The following example demonstrates how processes in a process pool communicate:
# Import Manager and use its Queue instead of multiprocessing.Queue
from multiprocessing import Manager, Pool
import os, time

def reader(q):
    print("reader starts (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader got a message from the Queue: %s" % q.get(True))

def writer(q):
    print("writer starts (%s), parent process is (%s)" % (os.getpid(), os.getppid()))
    for i in "helloworld":
        q.put(i)

if __name__ == "__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # Use the Queue from Manager
    po = Pool()
    po.apply_async(writer, (q,))
    time.sleep(1)  # Let the writer fill the Queue first, then let the reader start fetching from it
    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())
Running result:

(4171) start
writer starts (4173), parent process is (4171)
reader starts (4174), parent process is (4171)
reader got a message from the Queue: h
reader got a message from the Queue: e
reader got a message from the Queue: l
reader got a message from the Queue: l
reader got a message from the Queue: o
reader got a message from the Queue: w
reader got a message from the Queue: o
reader got a message from the Queue: r
reader got a message from the Queue: l
reader got a message from the Queue: d
(4171) End
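The time.sleep(1) and qsize() in the example above are fragile: if the writer happens to be slow, the reader may see an empty queue. A common alternative (an illustrative sketch, not from the original) is to have the writer send a sentinel value and let the reader block on get() until it appears:

from multiprocessing import Manager, Pool

SENTINEL = None  # hypothetical end-of-stream marker

def writer(q):
    for ch in "helloworld":
        q.put(ch)
    q.put(SENTINEL)  # signal that no more data is coming

def reader(q):
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        print("reader got: %s" % item)

if __name__ == "__main__":
    q = Manager().Queue()
    po = Pool(2)
    po.apply_async(writer, (q,))
    po.apply_async(reader, (q,))  # no sleep needed: get() blocks until data arrives
    po.close()
    po.join()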
Use of the join method
# Both threads and processes have a join method
import threading
import time

x = 10

def test(a, b):
    time.sleep(1)
    global x
    x = a + b

# test(1, 1)
# print(x)  # 2

t = threading.Thread(target=test, args=(1, 1))
t.start()
t.join()  # Make the main thread wait for t to finish
print(x)  # 2: without join() this would print 10, because test has not yet run
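For contrast, here is the same experiment with a process instead of a thread (an added sketch): join() behaves the same way, but because each process gets its own copy of x, the assignment in the child is not visible to the parent.

import time
from multiprocessing import Process

x = 10

def test(a, b):
    global x
    time.sleep(1)
    x = a + b  # modifies the child process's own copy of x

if __name__ == '__main__':
    p = Process(target=test, args=(1, 1))
    p.start()
    p.join()   # the main process waits, just like Thread.join()
    print(x)   # still 10: processes do not share memory, unlike threads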