Skip to main content

18.5.9. 用asyncio开发

异步编程不同于传统的“顺序”编程。此页面列出常见陷阱,并解释如何避免它们。

18.5.9.1. 调试模式的异步

asyncio 的实现是为了性能而编写的。为了简化异步代码的开发,您可能希望启用 调试模式

要为应用程序启用所有调试检查:

示例调试检查:

  • 日志 协程定义但从不“从”

  • 如果 call_soon()call_at() 方法从错误的线程调用,则会引发异常。

  • 记录选择器的执行时间

  • 日志回调需要超过100毫秒才能执行。 AbstractEventLoop.slow_callback_duration 属性是“慢”回调的最小持续时间(以秒为单位)。

  • 当传输和事件循环是 未明确关闭 时,会发出 ResourceWarning 警告。

18.5.9.2. 消除

取消任务在经典编程中不常见。在异步编程中,不仅它是常见的东西,而且你必须准备代码来处理它。

期货和任务可以用他们的 Future.cancel() 方法明确取消。当超时发生时,wait_for() 功能取消等待的任务。还有许多其他情况下,任务可以间接取消。

如果未来被取消,不要调用 Futureset_result()set_exception() 方法:它将失败并出现异常。例如,写:

if not fut.cancelled():
    fut.set_result('done')

不要直接调度 set_result()AbstractEventLoop.call_soon() 未来的 set_exception() 方法:未来可以在调用它的方法之前取消。

如果你等待未来,你应该早点检查未来是否被取消,以避免无用的操作。例:

@coroutine
def slow_operation(fut):
    if fut.cancelled():
        return
    # ... slow computation ...
    yield from fut
    # ...

shield() 功能也可用于忽略取消。

18.5.9.3. 并发和多线程

事件循环在线程中运行,并在同一线程中执行所有回调和任务。当任务在事件循环中运行时,没有其他任务在同一线程中运行。但是当任务使用 yield from 时,任务被挂起,事件循环执行下一个任务。

要计划来自不同线程的回调,应该使用 AbstractEventLoop.call_soon_threadsafe() 方法。例:

loop.call_soon_threadsafe(callback, *args)

大多数asyncio对象不是线程安全的。你只需要担心如果你访问事件循环之外的对象。例如,要取消未来,请不要直接调用其 Future.cancel() 方法,但是:

loop.call_soon_threadsafe(fut.cancel)

为了处理信号和执行子进程,事件循环必须在主线程中运行。

要从不同的线程调度协程对象,应使用 run_coroutine_threadsafe() 函数。它返回一个 concurrent.futures.Future 以访问结果:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
result = future.result(timeout)  # Wait for the result with a timeout

AbstractEventLoop.run_in_executor() 方法可以与线程池执行器一起使用,以在不同线程中执行回调,从而不阻塞事件循环的线程。

参见

同步原语 部分描述了同步任务的方法。

子进程和线程 部分列出了从不同线程运行子进程的异步限制。

18.5.9.4. 正确处理阻塞函数

不应该直接调用阻塞函数。例如,如果功能块1秒钟,其他任务被延迟1秒,这可能对反应性有重要影响。

对于网络和子进程,asyncio 模块提供高级API,如 协议

执行器可用于在不同的线程中或甚至在不同的进程中运行任务,以不阻塞事件循环的线程。请参阅 AbstractEventLoop.run_in_executor() 方法。

参见

延迟呼叫 部分详细说明事件循环如何处理时间。

18.5.9.5. 记录

asyncio 模块在记录器 'asyncio' 中记录与 logging 模块的信息。

asyncio 模块的默认日志级别为 logging.INFO。对于那些不想从 asyncio 这样冗长的日志级别可以更改。例如,要将级别更改为 logging.WARNING

logging.getLogger('asyncio').setLevel(logging.WARNING)

18.5.9.6. 检测协程对象从未计划

当调用协同程序函数并且其结果没有传递给 ensure_future()AbstractEventLoop.create_task() 方法时,协程对象的执行将永远不会被调度,这可能是一个错误。 启用asyncio的调试模式记录警告 来检测它。

示例与错误:

import asyncio

@asyncio.coroutine
def test():
    print("never scheduled")

test()

在调试模式下输出:

Coroutine test() at test.py:3 was never yielded from
Coroutine object created at (most recent call last):
  File "test.py", line 7, in <module>
    test()

修复是使用协同程序对象调用 ensure_future() 函数或 AbstractEventLoop.create_task() 方法。

18.5.9.7. 检测未使用的异常

Python通常在未处理的异常上调用 sys.displayhook()。如果调用 Future.set_exception(),但该异常从未消耗,则不会调用 sys.displayhook()。相反,发出日志 当未来由垃圾收集器删除,与traceback其中引发异常。

未处理的异常的示例:

import asyncio

@asyncio.coroutine
def bug():
    raise Exception("not consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(bug())
loop.run_forever()
loop.close()

输出:

Task exception was never retrieved
future: <Task finished coro=<coro() done, defined at asyncio/coroutines.py:139> exception=Exception('not consumed',)>
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

启用asyncio的调试模式 来获取创建任务的回溯。在调试模式下输出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3> exception=Exception('not consumed',) created at test.py:8>
source_traceback: Object created at (most recent call last):
  File "test.py", line 8, in <module>
    asyncio.ensure_future(bug())
Traceback (most recent call last):
  File "asyncio/tasks.py", line 237, in _step
    result = next(coro)
  File "asyncio/coroutines.py", line 79, in __next__
    return next(self.gen)
  File "asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "test.py", line 5, in bug
    raise Exception("not consumed")
Exception: not consumed

有不同的选项来解决这个问题。第一个选项是在另一个协程中链接协同程序,并使用经典的try/except:

@asyncio.coroutine
def handle_exception():
    try:
        yield from bug()
    except Exception:
        print("exception consumed")

loop = asyncio.get_event_loop()
asyncio.ensure_future(handle_exception())
loop.run_forever()
loop.close()

另一个选项是使用 AbstractEventLoop.run_until_complete() 功能:

task = asyncio.ensure_future(bug())
try:
    loop.run_until_complete(task)
except Exception:
    print("exception consumed")

参见

Future.exception() 方法。

18.5.9.8. 链协同正确

当协同程序函数调用其他协同程序函数和任务时,它们应该用 yield from 明确链接。否则,不能保证执行是顺序的。

使用 asyncio.sleep() 模拟慢速操作的不同错误的示例:

import asyncio

@asyncio.coroutine
def create():
    yield from asyncio.sleep(3.0)
    print("(1) create file")

@asyncio.coroutine
def write():
    yield from asyncio.sleep(1.0)
    print("(2) write into file")

@asyncio.coroutine
def close():
    print("(3) close file")

@asyncio.coroutine
def test():
    asyncio.ensure_future(create())
    asyncio.ensure_future(write())
    asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

loop = asyncio.get_event_loop()
asyncio.ensure_future(test())
loop.run_forever()
print("Pending tasks at exit: %s" % asyncio.Task.all_tasks(loop))
loop.close()

预期输出:

(1) create file
(2) write into file
(3) close file
Pending tasks at exit: set()

实际输出:

(3) close file
(2) write into file
Pending tasks at exit: {<Task pending create() at test.py:7 wait_for=<Future pending cb=[Task._wakeup()]>>}
Task was destroyed but it is pending!
task: <Task pending create() done at test.py:5 wait_for=<Future pending cb=[Task._wakeup()]>>

循环在 create() 完成之前停止,close() 已经在 write() 之前调用,而协同程序函数按以下顺序调用:create()write()close()

要修复示例,任务必须用 yield from 标记:

@asyncio.coroutine
def test():
    yield from asyncio.ensure_future(create())
    yield from asyncio.ensure_future(write())
    yield from asyncio.ensure_future(close())
    yield from asyncio.sleep(2.0)
    loop.stop()

或没有 asyncio.ensure_future():

@asyncio.coroutine
def test():
    yield from create()
    yield from write()
    yield from close()
    yield from asyncio.sleep(2.0)
    loop.stop()

18.5.9.9. 待处理的任务已销毁

如果待处理的任务被销毁,其包装的 协同 的执行没有完成。这可能是一个错误,因此记录一个警告。

日志示例:

Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>

启用asyncio的调试模式 来获取创建任务的回溯。登录调试模式示例:

Task was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "test.py", line 15, in <module>
    task = asyncio.ensure_future(coro, loop=loop)
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()] created at test.py:7> created at test.py:15>

18.5.9.10. 关闭传输和事件循环

当不再需要传输时,调用其 close() 方法释放资源。事件循环也必须明确地关闭。

如果传输或事件循环没有明确关闭,ResourceWarning 警告将在其析构函数中发出。默认情况下,将忽略 ResourceWarning 警告。 调试模式的异步 部分说明如何显示它们。