Skip to main content

tornado.queues - 协同队列

4.2 新版功能.

队列

class tornado.queues.Queue(maxsize=0)[源代码]

协调生产者和消费者协同。

如果maxsize为0(默认值),则队列大小为无界。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

@gen.coroutine
def producer():
    for item in range(5):
        yield q.put(item)
        print('Put %s' % item)

@gen.coroutine
def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    yield producer()     # Wait for producer to put all tasks.
    yield q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在Python 3.5中,Queue 实现了异步迭代器协议,因此 consumer() 可以重写为:

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

在 4.3 版更改: 在Python 3.5中添加了 async for 支持。

maxsize

队列中允许的项目数。

qsize()[源代码]

队列中的项目数。

put(item, timeout=None)[源代码]

把一个项目放入队列,或许等到有空间。

返回一个未来,它在超时后引发 tornado.gen.TimeoutError

put_nowait(item)[源代码]

将项目放入队列而不阻塞。

如果没有空闲时隙立即可用,则升高 QueueFull

get(timeout=None)[源代码]

从队列中删除并返回项目。

返回一个未来,当一个项目可用时解析,或在超时后提高 tornado.gen.TimeoutError

get_nowait()[源代码]

从队列中删除并返回项目,但不阻塞。

返回一个项目,如果一个立即可用,否则提出 QueueEmpty

task_done()[源代码]

指示以前入队的任务已完成。

由队列使用者使用。对于用于获取任务的每个 get,对 task_done 的后续调用告诉队列任务上的处理完成。

如果 join 正在阻塞,则在所有项目都已处理时恢复;即,当每个 puttask_done 匹配时。

如果调用的次数多于 put,则提高 ValueError

join(timeout=None)[源代码]

阻止,直到队列中的所有项目都被处理。

返回一个未来,它在超时后引发 tornado.gen.TimeoutError

PriorityQueue

class tornado.queues.PriorityQueue(maxsize=0)[源代码]

Queue 以优先级顺序检索条目,最低优先级。

条目通常是像 (priority number, data) 的元组。

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

LifoQueue

class tornado.queues.LifoQueue(maxsize=0)[源代码]

Queue 首先检索最近放置的项目。

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
1
2
3

例外

QueueEmpty

exception tornado.queues.QueueEmpty[源代码]

当队列没有项目时由 Queue.get_nowait 引发。

QueueFull

exception tornado.queues.QueueFull[源代码]

当队列达到其最大大小时由 Queue.put_nowait 引发。