一、共享队列
1.1、简单队列
from queue import Queue
from threading import Thread
def producer(out_q):
    """Feed ``out_q`` with the same greeting forever.

    Args:
        out_q: Queue-like object; only its ``put`` method is used.

    This function never returns on its own.
    """
    message = 'hello world!'
    while True:
        out_q.put(message)
def consumer(in_q):
    """Print every item taken from ``in_q``.

    Args:
        in_q: Queue-like object; only its ``get`` method is used.

    This function never returns on its own.
    """
    while True:
        # get() is evaluated first, then formatted, then printed.
        print(f'get data is: {in_q.get()}')
# One shared queue connects the two worker threads.
q = Queue()

# Build the consumer thread first, then the producer thread; both receive the same queue.
t1, t2 = (Thread(target=worker, args=(q,)) for worker in (consumer, producer))

# Start the consumer before the producer.
t1.start()
t2.start()
1.2、协调者队列
from queue import Queue
from threading import Thread
# Unique marker object that tells consumers no more data will follow.
_sentinel = object()


def producer(out_q, count=5):
    """Queue ``count`` greetings, then the shutdown sentinel, then stop.

    Args:
        out_q: Queue-like object; only its ``put`` method is used.
        count: Number of 'hello world!' messages to queue before the
            sentinel. Defaults to 5.

    The function returns once the sentinel has been queued, so nothing is
    ever produced behind the shutdown marker.
    """
    for _ in range(count):
        out_q.put('hello world!')
    out_q.put(_sentinel)
def consumer(in_q):
    """Print items taken from ``in_q`` until the shutdown sentinel arrives.

    Args:
        in_q: Queue-like object; its ``get`` and ``put`` methods are used.

    The sentinel is checked before printing, so the marker object itself is
    never reported as data. On shutdown the sentinel is put back on the
    queue before returning.
    """
    while True:
        data = in_q.get()
        if data is _sentinel:
            # Re-queue the marker so it is still on the queue after this consumer exits.
            in_q.put(_sentinel)
            break
        print(f'get data is: {data}')
# Shared channel between the two threads.
q = Queue()

# t1 runs the consumer and t2 runs the producer; both are handed the same queue.
t1, t2 = (Thread(target=worker, args=(q,)) for worker in (consumer, producer))

# The consumer is started first, then the producer.
t1.start()
t2.start()
1.3、优先级队列
import heapq
import threading
class PriorityQueue:
    """Priority queue guarded by a condition variable.

    Items with a larger ``priority`` are returned first. Items with equal
    priority are returned in the order they were added. ``get`` blocks while
    the queue is empty.
    """

    def __init__(self):
        self._queue = []                  # heap of (-priority, sequence, item)
        self._count = 0                   # insertion counter used as tie-breaker
        self._cv = threading.Condition()  # protects the heap, wakes waiting getters

    def put(self, item, priority):
        """Add ``item`` with the given ``priority`` and wake one waiting getter."""
        with self._cv:
            # Negating the priority turns heapq's min-heap into a max-heap.
            # The counter keeps equal-priority entries ordered without
            # ever comparing the items themselves.
            entry = (-priority, self._count, item)
            heapq.heappush(self._queue, entry)
            self._count += 1
            self._cv.notify()

    def get(self):
        """Remove and return the highest-priority item, waiting if none exists."""
        with self._cv:
            # Sleeps until the heap is non-empty, re-checking after every wake-up.
            self._cv.wait_for(lambda: self._queue)
            _, _, item = heapq.heappop(self._queue)
            return item
1.4、队列异常
import queue
# Unbounded FIFO queue; it starts out empty.
q = queue.Queue()

# Non-blocking read: raises queue.Empty immediately when nothing is waiting.
try:
    data = q.get_nowait()
except queue.Empty:
    ...

# Non-blocking write: raises queue.Full immediately when no free slot exists.
item = ''
try:
    q.put_nowait(item)
except queue.Full:
    ...

# Blocking read with a deadline: raises queue.Empty if nothing arrives in 5 seconds.
try:
    data = q.get(timeout=5.0)
except queue.Empty:
    ...
二、JOIN方法的使用
from queue import Queue
from threading import Thread
# Unique marker object that tells consumers no more data will follow.
_sentinel = object()


def producer(out_q, count=5):
    """Queue ``count`` greetings, then the shutdown sentinel, then stop.

    Args:
        out_q: Queue-like object; only its ``put`` method is used.
        count: Number of 'hello world!' messages to queue before the
            sentinel. Defaults to 5.

    The function returns once the sentinel has been queued. A producer that
    kept adding items would keep the queue's unfinished-task count above
    zero, and a ``join()`` on that queue could then never return.
    """
    for _ in range(count):
        out_q.put('hello world!')
    out_q.put(_sentinel)
# A thread that consumes data
def consumer(in_q):
    """Print items from ``in_q`` until the sentinel, marking each one done.

    Args:
        in_q: Queue-like object; its ``get`` and ``task_done`` methods are
            used.

    Every item taken from the queue, including the sentinel, is acknowledged
    with ``task_done()`` so that ``in_q.join()`` can return once everything
    queued has been handled.

    The sentinel is deliberately not put back on the queue. A re-queued
    sentinel would count as a new unfinished task that nobody completes, and
    ``join()`` would block forever. When several consumers share one queue,
    queue one sentinel per consumer instead.
    """
    while True:
        data = in_q.get()
        try:
            if data is _sentinel:
                break
            print(f'get data is: {data}')
        finally:
            # Runs on the break path too, so the sentinel is also accounted for.
            in_q.task_done()
# Shared channel between the two threads.
q = Queue()

# t1 runs the consumer and t2 runs the producer; both are handed the same queue.
t1, t2 = (Thread(target=worker, args=(q,)) for worker in (consumer, producer))

# The consumer is started first, then the producer.
t1.start()
t2.start()

# Block until every item put on the queue has been acknowledged via task_done().
q.join()
三、线程监听
from queue import Queue
from threading import Thread, Event
# A thread that produces data
def producer(out_q):
    """Hand items to a consumer one at a time, waiting for each acknowledgement.

    Args:
        out_q: Queue-like object; only its ``put`` method is used.

    Each item is queued as a ``(data, event)`` tuple. The loop does not
    continue until that event has been set. This function never returns on
    its own.
    """
    while True:
        # Produce some data (placeholder payload).
        ...
        payload = ''
        # A fresh event per item lets the receiver acknowledge exactly this item.
        done = Event()
        out_q.put((payload, done))
        ...
        # Block until the receiver signals that this item has been processed.
        done.wait()
def consumer(in_q):
    """Take ``(data, event)`` pairs from ``in_q`` and set each event.

    Args:
        in_q: Queue-like object; only its ``get`` method is used. Every
            item must unpack into exactly two values, the second of which
            has a ``set`` method.

    This function never returns on its own.
    """
    while True:
        message = in_q.get()
        payload, done = message
        # Process the payload (placeholder).
        ...
        # Signal completion to whoever is waiting on this item's event.
        done.set()
四、线程间复制
from queue import Queue
from threading import Thread
import copy
# A thread that produces data
def producer(out_q):
    """Queue a deep copy of each produced item, forever.

    Args:
        out_q: Queue-like object; only its ``put`` method is used.

    The queue receives ``copy.deepcopy(data)`` rather than ``data`` itself.
    This function never returns on its own.
    """
    while True:
        # Produce some data (placeholder payload).
        ...
        payload = ''
        snapshot = copy.deepcopy(payload)
        out_q.put(snapshot)
# A thread that consumes data
def consumer(in_q):
    """Take items from ``in_q`` forever.

    Args:
        in_q: Queue-like object; only its ``get`` method is used.

    Processing is left as a placeholder. This function never returns on its
    own.
    """
    while True:
        # Get the next item from the queue.
        received = in_q.get()
        # Process the received item (placeholder).
        ...