Python | Queue usage & inter-process communication
Queue usage
import multiprocessing
import queue

# q1 = multiprocessing.Queue()  # communication between processes
# q2 = queue.Queue()            # communication between threads

# A maximum length can be given when creating a queue.
# The default is 0, which means no explicit limit.
q = multiprocessing.Queue(5)
q.put('hello')
q.put('good')
q.put('yes')
q.put('ok')
q.put('hi')
# print(q.full())  # True
# q.put('how')     # the queue is full, so this call would block indefinitely

# block=True (the default): if the queue is full, wait for a free slot.
# timeout: how many seconds to wait before raising queue.Full.
# q.put('how', block=True, timeout=5)
# q.put_nowait('how')  # equivalent to q.put('how', block=False)

print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get())  # the queue is empty, so this call would block indefinitely
# q.get(block=True, timeout=10)  # waits up to 10 seconds, then raises queue.Empty
try:
    q.get_nowait()  # equivalent to q.get(block=False)
except queue.Empty:
    print('queue is empty')
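The comments above mention queue.Queue as the thread counterpart of multiprocessing.Queue. As a minimal sketch of that case, the following passes numbers to one worker thread and collects their squares. The names worker, inbox and outbox, and the use of None as an end marker, are assumptions made for this illustration.

```python
import queue
import threading


def worker(inbox, outbox):
    """Read numbers from inbox until None arrives; put their squares on outbox."""
    while True:
        item = inbox.get()      # blocks until a message is available
        if item is None:        # assumed end marker: no more work
            break
        outbox.put(item * item)


inbox = queue.Queue(maxsize=5)  # bounded, like multiprocessing.Queue(5)
outbox = queue.Queue()          # unbounded

t = threading.Thread(target=worker, args=(inbox, outbox))
t.start()
for n in range(1, 4):
    inbox.put(n)
inbox.put(None)                 # tell the worker to stop
t.join()

# The worker has finished, so all three results are already queued.
squares = [outbox.get_nowait() for _ in range(3)]
print(squares)  # [1, 4, 9]
```

The put/get interface is the same in both modules; queue.Queue only works between threads of one process, while multiprocessing.Queue pickles each message and sends it through a pipe.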
Inter-process communication
Inter-process communication - Queue
import queue
from multiprocessing import Queue

q = Queue(3)  # create a Queue that holds at most three messages
q.put("message 1")
q.put("message 2")
print(q.full())  # False
q.put("message 3")
print(q.full())  # True

# The queue is full, so both attempts below raise queue.Full:
# the first waits 2 seconds before raising, the second raises immediately.
try:
    q.put("message 4", True, 2)
except queue.Full:
    print("The message queue is full, number of messages: %s" % q.qsize())

try:
    q.put_nowait("message 4")
except queue.Full:
    print("The message queue is full, number of messages: %s" % q.qsize())

# Checking first avoids the exception in a single process. With several
# processes the queue can change between the check and the call, so the
# try/except form above is still needed there.
if not q.full():
    q.put_nowait("message 4")

# Likewise, check that the queue is not empty before reading.
if not q.empty():
    for i in range(q.qsize()):
        print(q.get_nowait())
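Because full(), empty() and qsize() are only snapshots, a common alternative is to attempt the operation and handle queue.Full or queue.Empty. The sketch below shows one way this might look. try_put and drain are hypothetical helper names invented for this example, and the 0.5 second timeout is an arbitrary choice.

```python
import queue
from multiprocessing import Queue


def try_put(q, item):
    """Hypothetical helper: return True if item was queued, False if the queue was full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False


def drain(q, timeout=0.5):
    """Hypothetical helper: collect messages until none arrives within `timeout` seconds."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


q = Queue(3)
results = [try_put(q, "message %d" % i) for i in range(1, 5)]
print(results)  # [True, True, True, False]

messages = drain(q)
print(messages)  # ['message 1', 'message 2', 'message 3']
```

drain uses a short timeout instead of get_nowait() because a multiprocessing.Queue hands messages to a background thread, so a message that was just put may take a moment to become readable.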
Notes:
When creating a Queue (for example, q = Queue()), if the maximum number of messages is omitted, zero, or negative, the queue has no explicit upper limit on the number of messages it accepts (it is bounded only by system resources);
Queue.qsize(): returns the approximate number of messages currently in the queue; on platforms such as macOS, where sem_getvalue() is not implemented, it raises NotImplementedError;
Queue.empty(): returns True if the queue is empty, otherwise False; with several processes the answer may already be out of date when it is returned;
Queue.full(): returns True if the queue is full, otherwise False; the same caveat applies;
Queue.get([block[, timeout]]): removes a message from the queue and returns it; block defaults to True;
1) If block is True and timeout is not set, the call blocks on an empty queue until a message arrives. If timeout (in seconds) is set, the call waits at most that long and raises queue.Empty (from the standard queue module) if no message arrives in time;
2) If block is False, the call raises queue.Empty immediately when the queue is empty;
Queue.get_nowait(): equivalent to Queue.get(False);
Queue.put(item[, block[, timeout]]): writes item to the queue; block defaults to True;
1) If block is True and timeout is not set, the call blocks on a full queue until a slot becomes free. If timeout (in seconds) is set, the call waits at most that long and raises queue.Full if no slot becomes free in time;
2) If block is False, the call raises queue.Full immediately when the queue is full;
Queue.put_nowait(item): equivalent to Queue.put(item, False);
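The blocking rules above can be observed by timing the calls. This is a minimal sketch that assumes a one-slot queue and a 1 second timeout, both chosen only to keep the run short. The variable names are illustrative.

```python
import queue
import time
from multiprocessing import Queue

q = Queue(1)
q.put("only slot")  # the queue is now full

# Blocking put with a timeout: waits about 1 second, then raises queue.Full.
start = time.monotonic()
try:
    q.put("extra", block=True, timeout=1)
    put_outcome = "stored"
except queue.Full:
    put_outcome = "queue.Full"
put_waited = time.monotonic() - start
print(put_outcome, "after about", round(put_waited, 1), "seconds")

# Blocking get with a timeout: returns as soon as the message is available.
first = q.get(timeout=1)
print(first)  # only slot

# Non-blocking get on the now empty queue: raises queue.Empty immediately.
try:
    q.get(block=False)
    get_outcome = "received"
except queue.Empty:
    get_outcome = "queue.Empty"
print(get_outcome)  # queue.Empty
```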
Example:
import multiprocessing
import os
import time


def producer(x):
    # Each producer puts 10 messages on the queue, one every 0.5 seconds.
    for i in range(10):
        time.sleep(0.5)
        print('produced +++++++pid{} {}'.format(os.getpid(), i))
        x.put('pid{} {}'.format(os.getpid(), i))


def consumer(x):
    # Three producers send 10 messages each, so read 30 in total.
    for i in range(30):
        time.sleep(0.3)
        print('consumed -------{}'.format(x.get()))


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=producer, args=(q,))
    p3 = multiprocessing.Process(target=producer, args=(q,))
    p1.start()
    p2.start()
    p3.start()
    c2 = multiprocessing.Process(target=consumer, args=(q,))
    c2.start()
    # Wait for the consumer to read everything, then for the producers to exit.
    for p in (c2, p1, p2, p3):
        p.join()
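The example above relies on the consumer knowing in advance how many messages to expect. One alternative, sketched here under stated assumptions, is for each producer to send an end marker when it finishes, so the consumer stops after seeing one marker per producer. SENTINEL, run_demo, the results queue and the parameter names are all hypothetical and introduced only for this illustration. The sleeps and prints are left out to keep it short.

```python
import multiprocessing

SENTINEL = None  # assumed end marker; any value never sent as real data would work


def producer(q, name, count):
    """Put `count` labelled messages on the queue, then one end marker."""
    for i in range(count):
        q.put('{} {}'.format(name, i))
    q.put(SENTINEL)


def consumer(q, producer_count, results):
    """Read until one end marker per producer has arrived, then report the total."""
    finished = 0
    received = 0
    while finished < producer_count:
        item = q.get()
        if item is SENTINEL:
            finished += 1
        else:
            received += 1
    results.put(received)


def run_demo(producer_count=3, per_producer=10):
    q = multiprocessing.Queue()
    results = multiprocessing.Queue()
    producers = [
        multiprocessing.Process(target=producer,
                                args=(q, 'p{}'.format(n), per_producer))
        for n in range(producer_count)
    ]
    c = multiprocessing.Process(target=consumer,
                                args=(q, producer_count, results))
    for p in producers:
        p.start()
    c.start()
    total = results.get()  # read the result before joining the processes
    for p in producers:
        p.join()
    c.join()
    return total


if __name__ == '__main__':
    print(run_demo())  # 30
```

Reading the result before calling join() follows the multiprocessing guidance that a process which has put items on a queue should have them consumed before it is joined.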