Skip to main content

17.7. queue —同步队列类

源代码: Lib/queue.py


queue 模块实现多生产者,多消费者队列。当信息必须在多个线程之间安全交换时,它在线程编程中特别有用。该模块中的 Queue 类实现了所有需要的锁定语义。这取决于Python中线程支持的可用性;请参阅 threading 模块。

模块实现三种类型的队列,它们的区别仅在于条目被检索的顺序。在 FIFO(先进先出) 队列中,添加的第一个任务是第一次检索。在 LIFO(后进先出) 队列中,最近添加的条目是第一次检索(操作类似于堆栈)。使用优先级队列,条目保持排序(使用 heapq 模块),并且首先检索最低值的条目。

在内部,模块使用锁来临时阻止竞争线程;然而,它不是设计为处理线程内的重入。

queue 模块定义了以下类和例外:

class queue.Queue(maxsize=0)

FIFO(先进先出) 队列的构造方法。 maxsize 是一个整数,它设置了可以放入队列中的项目数量的上限。一旦达到此大小,插入就会阻塞,直到队列项被消耗。如果 maxsize 小于或等于零,队列大小是无限的。

class queue.LifoQueue(maxsize=0)

LIFO(后进先出) 队列的构造方法。 maxsize 是一个整数,它设置了可以放入队列中的项目数量的上限。一旦达到此大小,插入就会阻塞,直到队列项被消耗。如果 maxsize 小于或等于零,队列大小是无限的。

class queue.PriorityQueue(maxsize=0)

优先级队列的构造函数。 maxsize 是一个整数,它设置了可以放入队列中的项目数量的上限。一旦达到此大小,插入就会阻塞,直到队列项被消耗。如果 maxsize 小于或等于零,队列大小是无限的。

首先检索最低值条目(最低值条目是 sorted(list(entries))[0] 返回的条目)。条目的典型模式是形式为 (priority_number, data) 的元组。

exception queue.Empty

在非空的 Queue 对象上调用非阻塞 get() (或 get_nowait())时引发的异常。

exception queue.Full

当非阻塞 put() (或 put_nowait())在已满的 Queue 对象上调用时引发异常。

17.7.1. 队列对象

队列对象(QueueLifoQueuePriorityQueue)提供下面描述的公共方法。

Queue.qsize()

返回队列的大致大小。注意,qsize() > 0不保证后面的get()不会阻塞,qsize() < maxsize也不会保证put()不会阻塞。

Queue.empty()

如果队列为空,返回 True,否则返回 False。如果empty()返回 True,它不保证后续调用put()不会阻塞。类似地,如果empty()返回 False,它不保证随后调用get()不会阻塞。

Queue.full()

如果队列已满,返回 True,否则返回 False。如果full()返回 True,它不保证随后调用get()不会阻塞。类似地,如果full()返回 False,它不保证随后的put()调用不会被阻塞。

Queue.put(item, block=True, timeout=None)

item 放入队列。如果可选的参数 block 为真,而 timeoutNone (默认值),则如果有必要,直到空闲插槽可用为止。如果 timeout 是正数,则它最多阻塞 timeout 秒,并且如果在该时间内没有空闲时隙,则提高 Full 异常。否则(block 为假),如果空闲时隙立即可用,则将一个项目放在队列上,否则提出 Full 异常(在这种情况下,timeout 被忽略)。

Queue.put_nowait(item)

相当于 put(item, False)

Queue.get(block=True, timeout=None)

从队列中删除并返回项目。如果可选参数 block 为真,timeoutNone (默认值),则在必要时阻止,直到项目可用。如果 timeout 是一个正数,它最多会阻塞 timeout 秒,并且如果在该时间内没有可用的项目,则会引发 Empty 异常。否则(block 为假),返回一个项,如果一个立即可用,否则提高 Empty 异常(在这种情况下忽略 timeout)。

Queue.get_nowait()

相当于 get(False)

提供了两种方法来支持跟踪入队任务是否已由守护程序消费者线程完全处理。

Queue.task_done()

指示以前入队的任务已完成。由队列消费者线程使用。对于用于获取任务的每个 get(),对 task_done() 的后续调用告诉队列任务上的处理完成。

如果 join() 当前阻塞,则当所有项目都被处理时(意味着对于已经被 put() 进入队列的每个项目接收到 task_done() 呼叫),它将恢复。

如果调用的次数比在队列中放置的项目多,则引发 ValueError

Queue.join()

阻塞,直到队列中的所有项目都被获取和处理。

每当项目添加到队列时,未完成任务的计数就会增加。当消费者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作都完成时,计数下降。当未完成任务的计数下降到零时,join() 解除阻塞。

如何等待入队任务完成的示例:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()
threads = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    threads.append(t)

for item in source():
    q.put(item)

# block until all tasks are done
q.join()

# stop workers
for i in range(num_worker_threads):
    q.put(None)
for t in threads:
    t.join()

参见

multiprocessing.Queue

用于多处理(而不是多线程)上下文的队列类。

collections.deque 是具有快速原子 append()popleft() 操作的无界队列的替代实现,不需要锁定。