Skip to main content

协同

协同 是在Tornado中编写异步代码的推荐方法。协程使用Python yield 关键字来暂停和恢复执行,而不是一系列回调(像 gevent 这样的框架中看到的协作轻量级线程有时也称为协同程序,但在Tornado中,所有协程都使用显式上下文切换,称为异步函数) 。

协程几乎和同步代码一样简单,但没有线程的代价。他们还通过减少上下文切换可能发生的地方的数量,使并发更容易 推理。

例:

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    # In Python versions prior to 3.3, returning a value from
    # a generator is not allowed and you must use
    #   raise gen.Return(response.body)
    # instead.
    return response.body

Python 3.5:asyncawait

Python 3.5引入了 asyncawait 关键字(使用这些关键字的函数也称为“本地协同程序”)。从Tornado 4.3开始,您可以使用它们代替基于 yield 的协同程序。只需使用 async def foo() 代替具有 @gen.coroutine 装饰器的函数定义,并使用 await 代替yield。本文档的其余部分仍然使用 yield 风格与旧版本的Python兼容,但 asyncawait 在可用时运行速度更快:

async def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = await http_client.fetch(url)
    return response.body

await 关键字比 yield 关键字更不通用。例如,在基于 yield 的协程中,您可以生成 Futures 的列表,而在本地协同程序中,您必须将列表包装在 tornado.gen.multi 中。您还可以使用 tornado.gen.convert_yielded 将任何可以使用 yield 的任何东西转换为将与 await 一起使用的形式。

虽然本地协同程序不可见地绑定到特定的框架(即他们不使用装饰器像 tornado.gen.coroutineasyncio.coroutine),但并非所有协同程序都彼此兼容。有一个 协同程序转轮 由第一个协程选择被调用,然后由直接用 await 调用的所有协同程序共享。Tornado协程转轮被设计为多功能的,并接受来自任何框架的等待对象;其他协程运行可能更有限(例如,asyncio 协程运行程序不接受来自其他框架的协程)。因此,对于组合多个框架的任何应用程序,建议使用Tornado协同程序转轮。要使用已在使用asyncio runner的协程中使用Tornado runner调用协程,请使用 tornado.platform.asyncio.to_asyncio_future 适配器。

怎么运行的

包含 yield 的函数是 发电机。所有发电机都是异步的;当被调用时,它们返回一个生成器对象,而不是运行到完成。 @gen.coroutine 装饰器通过 yield 表达式与生成器通信,并通过返回 Future 与协同程序的调用者通信。

这里是协程装饰器的内循环的简化版本:

# Simplified inner loop of tornado.gen.Runner
def run(self):
    # send(x) makes the current yield return x.
    # It returns when the next yield is reached
    future = self.gen.send(self.next)
    def callback(f):
        self.next = f.result()
        self.run()
    future.add_done_callback(callback)

装饰器从发生器接收 Future,等待(不阻塞)该 Future 完成,然后“展开” Future,并将结果作为 yield 表达的结果发送回生成器。大多数异步代码不会直接触及 Future 类,除非立即将异步函数返回的 Future 传递给 yield 表达式。

如何调用协程

协程不以正常方式引发异常:它们引发的任何异常都将被捕获在 Future 中,直到它被释放。这意味着,以正确的方式调用协程很重要,或者您可能有错误,不被注意:

@gen.coroutine
def divide(x, y):
    return x / y

def bad_call():
    # This should raise a ZeroDivisionError, but it won't because
    # the coroutine is called incorrectly.
    divide(1, 0)

在几乎所有情况下,调用协程的任何函数必须是协程本身,并在调用中使用 yield 关键字。当您覆盖超类中定义的方法时,请查阅文档以查看是否允许协同程序(文档应该说该方法“可能是协程”或“可能返回 Future ”):

@gen.coroutine
def good_call():
    # yield will unwrap the Future returned by divide() and raise
    # the exception.
    yield divide(1, 0)

有时你可能想要“烧和忘记”协程,而不等待其结果。在这种情况下,建议使用 IOLoop.spawn_callback,这使 IOLoop 负责呼叫。如果失败,IOLoop 将记录一个堆栈跟踪:

# The IOLoop will catch the exception and print a stack trace in
# the logs. Note that this doesn't look like a normal call, since
# we pass the function object to be called by the IOLoop.
IOLoop.current().spawn_callback(divide, 1, 0)

最后,在程序的顶层,如果`.Ioloop`还没有运行, 可以启动 IOLoop,运行协同程序,然后使用 IOLoop.run_sync 方法停止 IOLoop。这通常用于启动面向批处理程序的 main 函数:

# run_sync() doesn't take arguments, so we must wrap the
# call in a lambda.
IOLoop.current().run_sync(lambda: divide(1, 0))

协同模式

与回调的交互

要与使用回调而不是 Future 的异步代码交互,请将调用包装在 Task 中。这将为您添加回调参数并返回一个 Future,您可以产生:

@gen.coroutine
def call_task():
    # Note that there are no parens on some_function.
    # This will be translated by Task into
    #   some_function(other_args, callback=callback)
    yield gen.Task(some_function, other_args)

调用阻塞函数

从协程中调用阻塞函数的最简单的方法是使用 ThreadPoolExecutor,它返回与协程兼容的 Futures:

thread_pool = ThreadPoolExecutor(4)

@gen.coroutine
def call_blocking():
    yield thread_pool.submit(blocking_func, args)

并行性

协同程序装饰器识别值为 Futures 的列表和字典,并且并行等待所有这些 Futures

@gen.coroutine
def parallel_fetch(url1, url2):
    resp1, resp2 = yield [http_client.fetch(url1),
                          http_client.fetch(url2)]

@gen.coroutine
def parallel_fetch_many(urls):
    responses = yield [http_client.fetch(url) for url in urls]
    # responses is a list of HTTPResponses in the same order

@gen.coroutine
def parallel_fetch_dict(urls):
    responses = yield {url: http_client.fetch(url)
                        for url in urls}
    # responses is a dict {url: HTTPResponse}

交织

有时,保存 Future 而不是立即生成它是有用的,因此您可以在等待之前启动另一个操作:

@gen.coroutine
def get(self):
    fetch_future = self.fetch_next_chunk()
    while True:
        chunk = yield fetch_future
        if chunk is None: break
        self.write(chunk)
        fetch_future = self.fetch_next_chunk()
        yield self.flush()

循环

循环对于协同程序是棘手的,因为在 forwhile 循环的每次迭代中,没有办法在Python中对 yield 进行捕获,并捕获yield的结果。相反,您需要将循环条件与访问结果分开,如本例中 发动机 所示:

import motor
db = motor.MotorClient().test

@gen.coroutine
def loop_example(collection):
    cursor = db.collection.find()
    while (yield cursor.fetch_next):
        doc = cursor.next_object()

在背景中运行

PeriodicCallback 通常不与协同程序一起使用。相反,协同程序可以包含 while True: 循环并使用 tornado.gen.sleep:

@gen.coroutine
def minute_loop():
    while True:
        yield do_something()
        yield gen.sleep(60)

# Coroutines that loop forever are generally started with
# spawn_callback().
IOLoop.current().spawn_callback(minute_loop)

有时可能需要更复杂的环路。例如,上一个循环运行每个 60+N 秒,其中 Ndo_something() 的运行时间。要正好每60秒运行一次,请使用上面的交织模式:

@gen.coroutine
def minute_loop2():
    while True:
        nxt = gen.sleep(60)   # Start the clock.
        yield do_something()  # Run while the clock is ticking.
        yield nxt             # Wait for the timer to run out.