Skip to main content

18.5.3. 任务和协同

18.5.3.1. 协同

asyncio 一起使用的协同程序可以使用 async def 语句或通过使用 发电机 来实现。 async def 类型的协程是在Python 3.5中添加的,如果不需要支持旧的Python版本,则推荐使用。

基于生成器的协程应该使用 @asyncio.coroutine 进行装饰,虽然这不是严格执行的。装饰器实现与 async def 协同程序的兼容性,并且也用作文档。基于生成器的协同程序使用 PEP 380 中引入的 yield from 语法,而不是原始的 yield 语法。

词语“协同程序”,类似于词“生成器”,用于两个不同的(虽然相关的)概念:

  • 定义协程(使用 async def 或用 @asyncio.coroutine 装饰的函数定义)的函数。如果需要消除歧义,我们称之为 协同功能iscoroutinefunction() 返回 True)。

  • 通过调用coroutine函数获得的对象。此对象表示将最终完成的计算或I/O操作(通常是组合)。如果需要消除歧义,我们称之为 协同对象iscoroutine() 返回 True)。

协同程序可以执行的操作:

  • result = await futureresult = yield from future - 挂起协程直到未来完成,然后返回未来的结果,或引发异常,这将被传播。 (如果未来被取消,它将引发一个 CancelledError 异常。)注意任务是期货,关于期货的一切也适用于任务。

  • result = await coroutineresult = yield from coroutine - 等待另一个协程生成结果(或引发异常,将被传播)。 coroutine 表达式必须是 call 到另一个协程。

  • return expression - 使用 awaityield from 向正在等待此协程的协程产生结果。

  • raise exception - 在使用 awaityield from 等待此协程的协程中引发异常。

调用协程不会启动其代码运行 - 调用返回的协程对象在您安排其执行之前不会执行任何操作。有两种基本方法来启动它运行:从另一个协同程序调用 await coroutineyield from coroutine (假设另一个协程已经运行!),或者使用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法调度它的执行。

协程(和任务)只能在事件循环运行时运行。

@asyncio.coroutine

装饰器标记基于生成器的协同程序。这使得生成器使用 yield from 来调用 async def 协同程序,并且还使得能够由 async def 协同程序调用该生成器,例如使用 await 表达式。

没有必要装饰 async def 协程本身。

如果生成器在被销毁之前没有生成,则会记录一条错误消息。见 检测协程从未计划

注解

在本文档中,一些方法被记录为协程,即使它们是返回 Future 的纯Python函数。这是有意在未来调整这些职能的执行的自由。如果需要在回调式代码中使用这样的函数,请用 ensure_future() 包装其结果。

18.5.3.1.1. 示例:Hello World协程

协同程序显示 "Hello World" 的示例:

import asyncio

async def hello_world():
    print("Hello World!")

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
loop.run_until_complete(hello_world())
loop.close()

参见

Hello World with call_soon() 示例使用 AbstractEventLoop.call_soon() 方法计划回调。

18.5.3.1.2. 示例:协程显示当前日期

协同程序示例使用 sleep() 功能在5秒内每秒显示当前日期:

import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

相同的协程使用发生器实现:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

参见

显示当前日期与call_later() 示例使用对 AbstractEventLoop.call_later() 方法的回调。

18.5.3.1.3. 示例:链协同

链接协同程序示例:

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute() 链接到 print_sum()print_sum() 协程等待,直到 compute() 完成,然后返回其结果。

序列图示例:

../../_images/tulip_coro.png

“任务”由 AbstractEventLoop.run_until_complete() 方法创建,当它获得协同对象而不是任务时。

该图显示了控制流,它没有准确描述内部如何工作。例如,睡眠协程创建了一个内部未来,它使用 AbstractEventLoop.call_later() 在1秒钟内唤醒任务。

18.5.3.2. InvalidStateError

exception asyncio.InvalidStateError

在此状态下不允许操作。

18.5.3.3. TimeoutError

exception asyncio.TimeoutError

操作超过了给定的截止日期。

注解

此异常与内置 TimeoutError 异常不同!

18.5.3.4. 未来

class asyncio.Future(*, loop=None)

这个类是 almost 兼容 concurrent.futures.Future

区别:

这个类是 不是线程安全

cancel()

取消未来并计划回调。

如果未来已经完成或取消,返回 False。否则,将未来的状态更改为取消,计划回调并返回 True

cancelled()

如果未来被取消,请返回 True

done()

如果未来完成,返回 True

完成意味着结果/异常可用,或未来被取消。

result()

返回这个未来所代表的结果。

如果未来被取消,提出 CancelledError。如果未来的结果还不可用,提高 InvalidStateError。如果未来完成并且设置了异常,则会引发此异常。

exception()

返回对此未来设置的异常。

如果未来完成,则返回异常(如果没有设置异常,则为 None)。如果未来被取消,提高 CancelledError。如果未来还没有完成,提高 InvalidStateError

add_done_callback(fn)

添加一个回调,以便在未来完成时运行。

回调使用单个参数调用 - 未来对象。如果未来在调用时已经完成,则回调将与 call_soon() 一起调度。

使用functools.partial将参数传递给回调。例如,fut.add_done_callback(functools.partial(print, "Future:", flush=True)) 将调用 print("Future:", fut, flush=True)

remove_done_callback(fn)

从“调用完成时”列表中删除回调的所有实例。

返回已删除的回调的数量。

set_result(result)

标记未来完成并设置其结果。

如果调用此方法时未来已经完成,则引发 InvalidStateError

set_exception(exception)

标记未来完成并设置异常。

如果调用此方法时未来已经完成,则引发 InvalidStateError

18.5.3.4.1. 示例:future与run_until_complete()

组合 Future协同功能 的示例:

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()

协程函数负责计算(需要1秒),并将结果存储到未来。 run_until_complete() 方法等待未来的完成。

注解

run_until_complete() 方法在内部使用 add_done_callback() 方法,以便在未来完成时通知。

18.5.3.4.2. 示例:未来与run_forever()

可以使用 Future.add_done_callback() 方法不同地编写前面的示例来明确地描述控制流:

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

在这个例子中,未来用于将 slow_operation() 链接到 got_result():当 slow_operation() 完成时,用结果调用 got_result()

18.5.3.5. 任务

class asyncio.Task(coro, *, loop=None)

安排 协同 的执行:将来包装它。任务是 Future 的子类。

任务负责在事件循环中执行协同程序对象。如果被封装的协程从未来产生,则任务暂停被封装协同程序的执行并等待未来的完成。当未来完成时,包装的协程的执行将重新启动,结果或未来的异常。

事件循环使用协作调度:事件循环每次只运行一个任务。如果其他事件循环在不同线程中运行,则其他任务可以并行运行。当任务等待未来的完成时,事件循环执行新任务。

任务的取消与未来的取消不同。调用 cancel() 将向包装的协程抛出 CancelledError。如果包装的协程没有捕获到 CancelledError 异常,或者引发 CancelledError 异常,cancelled() 只返回 True

如果待处理的任务被销毁,其包装的 协同 的执行没有完成。这可能是一个错误,并记录了一个警告:参见 待处理的任务已销毁

不要直接创建 Task 实例:使用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。

这个类是 不是线程安全

classmethod all_tasks(loop=None)

返回一组事件循环的所有任务。

默认情况下,将返回当前事件循环的所有任务。

classmethod current_task(loop=None)

返回事件循环或 None 中当前正在运行的任务。

默认情况下,返回当前事件循环的当前任务。

当不是在 Task 的上下文中调用时返回 None

cancel()

请求此任务取消自身。

这安排 CancelledError 在下一个循环通过事件循环被抛出到包装的协程中。协程然后有机会清理,甚至拒绝请求使用try/except/finally。

Future.cancel() 不同,这不能保证任务将被取消:异常可能被捕获和执行,延迟任务的取消或完全阻止取消。任务也可能返回值或引发不同的异常。

在调用此方法之后,cancelled() 不会返回 True (除非任务已被取消)。当包装的协程以 CancelledError 异常终止(即使未调用 cancel())时,任务将被标记为已取消。

get_stack(*, limit=None)

返回此任务的协程的堆栈帧的列表。

如果协程没有完成,这将返回它被暂停的堆栈。如果协程已成功完成或被取消,这将返回一个空列表。如果协程由异常终止,则返回回溯帧列表。

帧总是从最旧到最新排序。

可选限制给出了要返回的最大帧数;默认情况下返回所有可用的帧。其含义根据是否返回堆栈或回溯而有所不同:返回堆栈的最新帧,但返回回溯的最早帧。 (这与追溯模块的行为相匹配。)

由于我们无法控制的原因,对于一个挂起的协程,只返回一个栈帧。

print_stack(*, limit=None, file=None)

打印此任务的协程的堆栈或回溯。

这为由get_stack()检索的帧产生与追溯模块类似的输出。 limit参数传递给get_stack()。文件参数是写入输出的I/O流;默认情况下输出写入sys.stderr。

18.5.3.5.1. 示例:并行执行任务

并行执行3个任务(A,B,C)的示例:

import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        yield from asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
tasks = [
    asyncio.ensure_future(factorial("A", 2)),
    asyncio.ensure_future(factorial("B", 3)),
    asyncio.ensure_future(factorial("C", 4))]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

输出:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

任务在创建时自动计划执行。所有任务完成后,事件循环停止。

18.5.3.6. 任务功能

注解

在下面的函数中,可选的 loop 参数允许显式设置基础任务或协程使用的事件循环对象。如果没有提供,则使用默认事件循环。

asyncio.as_completed(fs, *, loop=None, timeout=None)

返回一个迭代器,其值在等待时是 Future 实例。

如果在所有期货完成之前发生超时,则提高 asyncio.TimeoutError

例:

for f in as_completed(fs):
    result = yield from f  # The 'yield from' may raise
    # Use result

注解

期货 f 不一定是fs的成员。

asyncio.ensure_future(coro_or_future, *, loop=None)

安排 协同对象 的执行:将来包装它。返回 Task 对象。

如果参数是 Future,则直接返回。

3.4.4 新版功能.

在 3.5.1 版更改: 该函数接受任何 awaitable 对象。

asyncio.async(coro_or_future, *, loop=None)

ensure_future() 的已弃用别名。

3.4.4 版后已移除.

asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

返回从给定协同对象或期货的未来聚合结果。

所有期货必须共享相同的事件循环。如果所有任务都成功完成,则返回的未来的结果是结果列表(按照原始序列的顺序,不一定是结果到达的顺序)。如果 return_exceptions 为真,则任务中的异常被视为与成功结果相同,并在结果列表中收集;否则,第一个引发的异常将立即传播到返回的未来。

取消:如果外部未来被取消,所有孩子(还没有完成)也被取消。如果任何孩子被取消,这被视为提出 CancelledError - 外部未来在这种情况下取消 not。 (这是为了防止取消一个孩子导致其他孩子被取消。)

asyncio.iscoroutine(obj)

如果 obj协同对象,则返回 True,其可以基于生成器或 async def 协同程序。

asyncio.iscoroutinefunction(func)

如果 func 被确定为 协同功能,则返回 True,其可以是修饰的生成函数或 async def 函数。

asyncio.run_coroutine_threadsafe(coro, loop)

协同对象 提交到给定的事件循环。

返回 concurrent.futures.Future 以访问结果。

此函数旨在从与运行事件循环的线程不同的线程调用。用法:

# Create a coroutine
coro = asyncio.sleep(1, result=3)
# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在协程中引发异常,将通知返回的将来。它也可以用于在事件循环中取消任务:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print('The coroutine raised an exception: {!r}'.format(exc))
else:
    print('The coroutine returned: {!r}'.format(result))

请参阅文档的 并发和多线程 部分。

注解

与模块中的其他函数不同,run_coroutine_threadsafe() 需要明确传递 loop 参数。

3.5.1 新版功能.

coroutine asyncio.sleep(delay, result=None, *, loop=None)

创建在给定时间(以秒为单位)后完成的 协同。如果提供 result,当协程完成时,它将生成给调用者。

睡眠的决定取决于 事件循环的粒度

此功能是 协同

asyncio.shield(arg, *, loop=None)

等待未来,屏蔽它取消。

该声明:

res = yield from shield(something())

完全等价于语句:

res = yield from something()

except,如果包含它的协程被取消,在 something() 中运行的任务不会被取消。从 something() 的角度来看,取消没有发生。但是它的调用者仍然被取消,所以yield-from表达式仍然提高 CancelledError。注意:如果 something() 被其他方式取消,这将取消 shield()

如果要完全忽略取消(不推荐),您可以将 shield() 与try/except子句组合,如下所示:

try:
    res = yield from shield(something())
except CancelledError:
    res = None
coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

等待由序列 futures 给出的Futures和协程对象完成。协程将包裹在任务中。返回两组 Future:(done,pending)。

序列 futures 不能为空。

timeout 可用于控制返回前等待的最大秒数。 timeout 可以是一个int或float。如果未指定 timeoutNone,则等待时间没有限制。

return_when 指示此函数何时返回。它必须是 concurrent.futures 模块的以下常数之一:

不变

描述

FIRST_COMPLETED

当任何未来完成或被取消时,该函数将返回。

FIRST_EXCEPTION

当任何未来通过提出异常完成时,函数将返回。如果没有未来引发异常,那么它等同于 ALL_COMPLETED

ALL_COMPLETED

当所有期货完成或被取消时,函数将返回。

此功能是 协同

用法:

done, pending = yield from asyncio.wait(fs)

注解

这不会提高 asyncio.TimeoutError!在超时发生时未完成的期货在第二组中返回。

coroutine asyncio.wait_for(fut, timeout, *, loop=None)

等待单个 Future协同对象 超时完成。如果 timeoutNone,阻塞直到未来完成。

协程将被包装在 Task 中。

返回Future或协同程序的结果。当发生超时时,它取消任务并产生 asyncio.TimeoutError。要避免任务取消,请将其包装在 shield() 中。

如果等待被取消,未来的 fut 也被取消。

这个功能是 协同,用法:

result = yield from asyncio.wait_for(fut, 60.0)

在 3.4.3 版更改: 如果等待被取消,未来的 fut 现在也被取消。