Skip to main content

18.5.1. 基本事件循环

事件循环是 asyncio 提供的中央执行设备。它提供多种设施,包括:

  • 注册,执行和取消延迟呼叫(超时)。

  • 创建客户端和服务器 运输 以进行各种通信。

  • 启动子进程和相关的 运输 以与外部程序通信。

  • 将代价很高的函数调用委派给线程池。

class asyncio.BaseEventLoop

这个类是一个实现细节。它是 AbstractEventLoop 的子类,可以是在 asyncio 中发现的具体事件循环实现的基类。它不应该直接使用;使用 AbstractEventLoopBaseEventLoop 不应该被第三方代码子类化;内部接口不稳定。

class asyncio.AbstractEventLoop

事件循环的抽象基类。

这个类是 不是线程安全

18.5.1.1. 运行事件循环

AbstractEventLoop.run_forever()

运行直到调用 stop()。如果在调用 run_forever() 之前调用 stop(),则将以超时为零轮询I/O选择器一次,运行为响应I/O事件(以及已调度的那些)而调度的所有回调,然后退出。如果在 run_forever() 运行时调用 stop(),这将运行当前批处理的回调,然后退出。注意,回调调度的回调在这种情况下不会运行;它们将在下次调用 run_forever() 时运行。

在 3.5.1 版更改.

AbstractEventLoop.run_until_complete(future)

运行直到 Future 完成。

如果参数是 协同对象,它被 ensure_future() 包装。

返回未来的结果,或提出它的异常。

AbstractEventLoop.is_running()

返回事件循环的运行状态。

AbstractEventLoop.stop()

停止运行事件循环。

这导致 run_forever() 在下一个适当的机会退出(见有更多的细节)。

在 3.5.1 版更改.

AbstractEventLoop.is_closed()

如果事件循环关闭,则返回 True

3.4.2 新版功能.

AbstractEventLoop.close()

关闭事件循环。循环不能运行。挂起的回调将丢失。

这会清除队列并关闭执行程序,但不会等待执行程序完成。

这是幂等的和不可逆的。在这之后不应该调用其他方法。

coroutine AbstractEventLoop.shutdown_asyncgens()

计划所有当前打开的 asynchronous generator 对象以通过 aclose() 呼叫关闭。调用此方法后,每当迭代一个新的异步生成器时,事件循环将发出警告。应该用于可靠地完成所有调度的异步发电机。例:

try:
    loop.run_forever()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

3.6 新版功能.

18.5.1.2. 呼叫

大多数 asyncio 函数不接受关键字。如果要将关键字传递到回调,请使用 functools.partial()。例如,loop.call_soon(functools.partial(print, "Hello", flush=True)) 将调用 print("Hello", flush=True)

注解

functools.partial() 优于 lambda 函数,因为 asyncio 可以检查 functools.partial() 对象以在调试模式下显示参数,而 lambda 函数具有较差的表示。

AbstractEventLoop.call_soon(callback, *args)

安排一个回调被尽快调用。在 call_soon() 返回后,控制返回到事件循环时调用回调。

此操作作为 FIFO(先进先出) 队列,回调按照它们注册的顺序调用。每个回调将被精确调用一次。

回调之后的任何位置参数将在调用时传递给回调。

返回一个 asyncio.Handle 实例,可用于取消回调。

使用functools.partial将关键字传递给回调

AbstractEventLoop.call_soon_threadsafe(callback, *args)

call_soon(),但线程安全。

请参阅文档的 并发和多线程 部分。

18.5.1.3. 延迟呼叫

事件循环具有用于计算超时的自己的内部时钟。使用哪个时钟取决于(平台特定的)事件循环实现;理想地,它是单调时钟。这通常是与 time.time() 不同的时钟。

注解

超时(相对 delay 或绝对 when)不应超过一天。

AbstractEventLoop.call_later(delay, callback, *args)

安排 callback 在给定的 delay 秒后调用(int或float)。

返回一个 asyncio.Handle 实例,可用于取消回调。

每次调用 call_later() 时,callback 将被调用一次。如果两个回调被调度完全相同的时间,它是未定义的,将首先调用。

可选的位置 args 将在调用时传递给回调。如果希望使用某些命名参数调用回调,请使用闭包或 functools.partial()

使用functools.partial将关键字传递给回调

AbstractEventLoop.call_at(when, callback, *args)

安排 callback 在给定的绝对时间戳 when (一个int或float)被调用,使用与 AbstractEventLoop.time() 相同的时间引用。

此方法的行为与 call_later() 相同。

返回一个 asyncio.Handle 实例,可用于取消回调。

使用functools.partial将关键字传递给回调

AbstractEventLoop.time()

根据事件循环的内部时钟返回当前时间,作为 float 值。

参见

asyncio.sleep() 功能。

18.5.1.4. 期货

AbstractEventLoop.create_future()

创建一个附加到循环的 asyncio.Future 对象。

这是在asyncio中创建期货的首选方式,因为事件循环实现可以提供Future类的替代实现(具有更好的性能或工具)。

3.5.2 新版功能.

18.5.1.5. 任务

AbstractEventLoop.create_task(coro)

安排 协同对象 的执行:将来包装它。返回 Task 对象。

第三方事件循环可以使用自己的 Task 子类来实现互操作性。在这种情况下,结果类型是 Task 的子类。

这个方法是在Python 3.4.2中添加的。使用 async() 函数也支持老的Python版本。

3.4.2 新版功能.

AbstractEventLoop.set_task_factory(factory)

设置将由 AbstractEventLoop.create_task() 使用的任务工厂。

如果 factoryNone,则将设置默认任务工厂。

如果 factorycallable,它应该具有匹配 (loop, coro) 的签名,其中 loop 将是对活动事件循环的引用,coro 将是协程对象。 callable必须返回一个 asyncio.Future 兼容的对象。

3.4.4 新版功能.

AbstractEventLoop.get_task_factory()

返回任务工厂或 None (如果使用默认工厂)。

3.4.4 新版功能.

18.5.1.6. 创建连接

coroutine AbstractEventLoop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None)

创建到给定Internet hostport 的流传输连接:根据 host (或 family,如果指定)的套接字系列 AF_INETAF_INET6,套接字类型 SOCK_STREAMprotocol_factory 必须是返回 协议 实例的可调用方。

此方法是一个 协同,它将尝试在后台建立连接。成功时,协程返回一个 (transport, protocol) 对。

基本操作的时间概要如下:

  1. 建立连接,并创建 运输 来表示它。

  2. protocol_factory 无参数调用,必须返回 协议 实例。

  3. 协议实例绑定到传输,并调用其 connection_made() 方法。

  4. 协同程序使用 (transport, protocol) 对成功返回。

创建的传输是一个依赖于实现的双向流。

注解

protocol_factory 可以是任何类型的可调用,不一定是类。例如,如果要使用预先创建的协议实例,可以传递 lambda: my_protocol

更改连接创建方式的选项:

  • ssl:如果给定而不是false,则创建SSL/TLS传输(默认情况下创建纯TCP传输)。如果 sslssl.SSLContext 对象,则此上下文用于创建传输;如果 sslTrue,则使用具有一些未指定的默认设置的上下文。

  • server_hostname,仅与 ssl 一起使用,并设置或覆盖与目标服务器的证书匹配的主机名。默认情况下使用 host 参数的值。如果 host 为空,则没有默认值,您必须传递 server_hostname 的值。如果 server_hostname 是一个空字符串,主机名匹配被禁用(这是一个严重的安全风险,允许中间人攻击)。

  • familyprotoflags 是可选的地址族,协议和标志,通过getaddrinfo()为 host 分辨率。如果给出,这些应该都是来自相应 socket 模块常数的整数。

  • sock,如果给定,应该是一个已经连接的 socket.socket 对象,供传输使用。如果给出 sock,则应当不指定 hostportfamilyprotoflagslocal_addr

  • 如果给出,local_addr 是用于将套接字本地绑定的 (local_host, local_port) 元组。使用getaddrinfo()查找 local_hostlocal_port,类似于 hostport

在 3.5 版更改: 在具有 ProactorEventLoop 的Windows上,现在支持SSL/TLS。

参见

open_connection() 函数可以用于获取一对(StreamReaderStreamWriter)而不是协议。

coroutine AbstractEventLoop.create_datagram_endpoint(protocol_factory, local_addr=None, remote_addr=None, *, family=0, proto=0, flags=0, reuse_address=None, reuse_port=None, allow_broadcast=None, sock=None)

创建数据报连接:根据 host (或 family,如果指定)的套接字系列 AF_INETAF_INET6,套接字类型 SOCK_DGRAMprotocol_factory 必须是返回 协议 实例的可调用方。

此方法是一个 协同,它将尝试在后台建立连接。成功时,协程返回一个 (transport, protocol) 对。

更改连接创建方式的选项:

  • 如果给出,local_addr 是用于将套接字本地绑定的 (local_host, local_port) 元组。使用 getaddrinfo() 查找 local_hostlocal_port

  • remote_addr (如果给出)是用于将套接字连接到远程地址的 (remote_host, remote_port) 元组。使用 getaddrinfo() 查找 remote_hostremote_port

  • familyprotoflags 是可选的地址族,协议和标志,通过 getaddrinfo() 进行 host 解析。如果给出,这些应该都是来自相应的 socket 模块常量的整数。

  • reuse_address 告诉内核重用TIME_WAIT状态中的本地套接字,而不必等待其自然超时到期。如果未指定,将在UNIX上自动设置为 True

  • reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的相同端口,只要它们在创建时都设置此标志。在Windows和某些UNIX上不支持此选项。如果未定义 SO_REUSEPORT 常数,则不支持此功能。

  • allow_broadcast 告诉内核允许此端点发送消息到广播地址。

  • 可以可选地指定 sock,以便使用先前存在的,已经连接的 socket.socket 对象来由传输使用。如果指定,local_addrremote_addr 应该省略(必须是 None)。

在具有 ProactorEventLoop 的Windows上,不支持此方法。

请参阅 UDP回显客户端协议UDP回显服务器协议 示例。

coroutine AbstractEventLoop.create_unix_connection(protocol_factory, path, *, ssl=None, sock=None, server_hostname=None)

创建UNIX连接:套接字系列 AF_UNIX,套接字类型 SOCK_STREAMAF_UNIX 插座系列用于高效地在同一台机器上的进程之间进行通信。

此方法是一个 协同,它将尝试在后台建立连接。成功时,协程返回一个 (transport, protocol) 对。

path 是UNIX域套接字的名称,是必需的,除非指定了 sock 参数。支持抽象UNIX套接字,strbytes 路径。

请参阅参数的 AbstractEventLoop.create_connection() 方法。

可用性:UNIX。

18.5.1.7. 创建侦听连接

coroutine AbstractEventLoop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None)

创建绑定到 hostport 的TCP服务器(套接字类型 SOCK_STREAM)。

返回一个 Server 对象,它的 sockets 属性包含创建的套接字。使用 Server.close() 方法停止服务器:关闭侦听套接字。

参数:

  • host 参数可以是字符串,在这种情况下,TCP服务器绑定到 hostporthost 参数也可以是字符串序列,在这种情况下,TCP服务器绑定到序列的所有主机。如果 host 是空字符串或 None,则假定所有接口,并且将返回多个套接字的列表(很可能一个用于IPv4,另一个用于IPv6)。

  • family 可以设置为 socket.AF_INETAF_INET6,以强制套接字使用IPv4或IPv6。如果未设置,将从主机确定(默认为 socket.AF_UNSPEC)。

  • flagsgetaddrinfo() 的位掩码。

  • 可以可选地指定 sock 以便使用预先存在的套接字对象。如果指定,应省略 hostport (必须为 None)。

  • backlog 是传递给 listen() 的排队连接的最大数量(默认值为100)。

  • ssl 可以设置为 SSLContext 以在已接受的连接上启用SSL。

  • reuse_address 告诉内核重用TIME_WAIT状态中的本地套接字,而不必等待其自然超时到期。如果未指定,将在UNIX上自动设置为 True

  • reuse_port 告诉内核允许此端点绑定到与其他现有端点绑定的相同端口,只要它们在创建时都设置此标志。 Windows不支持此选项。

这种方法是 协同

在 3.5 版更改: 在具有 ProactorEventLoop 的Windows上,现在支持SSL/TLS。

参见

函数 start_server() 创建一个(StreamReaderStreamWriter)对并调用这个对的函数。

在 3.5.1 版更改: host 参数现在可以是字符串序列。

coroutine AbstractEventLoop.create_unix_server(protocol_factory, path=None, *, sock=None, backlog=100, ssl=None)

类似于 AbstractEventLoop.create_server(),但特定于套接字系列 AF_UNIX

这种方法是 协同

可用性:UNIX。

coroutine BaseEventLoop.connect_accepted_socket(protocol_factory, sock, *, ssl=None)

处理已接受的连接。

这是由接受asyncio之外的连接但使用asyncio来处理它们的服务器使用的。

参数:

  • sock 是从 accept 调用返回的预先存在的套接字对象。

  • ssl 可以设置为 SSLContext 以在已接受的连接上启用SSL。

这种方法是 协同。完成后,协程返回一个 (transport, protocol) 对。

18.5.1.8. 观察文件描述符

在具有 SelectorEventLoop 的Windows上,仅支持套接字句柄(例如:不支持管道文件描述符)。

在具有 ProactorEventLoop 的Windows上,不支持这些方法。

AbstractEventLoop.add_reader(fd, callback, *args)

开始观察文件描述符的读取可用性,然后调用具有指定参数的 callback

使用functools.partial将关键字传递给回调

AbstractEventLoop.remove_reader(fd)

停止观看文件描述符以获取读取可用性。

AbstractEventLoop.add_writer(fd, callback, *args)

开始观察文件描述符的写可用性,然后调用具有指定参数的 callback

使用functools.partial将关键字传递给回调

AbstractEventLoop.remove_writer(fd)

停止观察文件描述符以获取写入可用性。

观察读事件的文件描述符 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。

18.5.1.9. 低级套接字操作

coroutine AbstractEventLoop.sock_recv(sock, nbytes)

从套接字接收数据。阻塞 socket.socket.recv() 方法后建模。

返回值是表示接收到的数据的字节对象。一次性接收的最大数据量由 nbytes 指定。

使用 SelectorEventLoop 事件循环,套接字 sock 必须是非阻塞的。

这种方法是 协同

coroutine AbstractEventLoop.sock_sendall(sock, data)

将数据发送到套接字。阻塞 socket.socket.sendall() 方法后建模。

插座必须连接到远程插座。此方法继续从 data 发送数据,直到所有数据已发送或发生错误。 None 成功返回。出现错误时,会引发异常,并且无法确定连接的接收端成功处理了多少数据(如果有)。

使用 SelectorEventLoop 事件循环,套接字 sock 必须是非阻塞的。

这种方法是 协同

coroutine AbstractEventLoop.sock_connect(sock, address)

连接到 address 上的远程插座。阻塞 socket.socket.connect() 方法后建模。

使用 SelectorEventLoop 事件循环,套接字 sock 必须是非阻塞的。

这种方法是 协同

在 3.5.2 版更改: address 不再需要解决。 sock_connect 将尝试通过调用 socket.inet_pton() 来检查 address 是否已经解决。如果不是,AbstractEventLoop.getaddrinfo() 将用于解决 address

coroutine AbstractEventLoop.sock_accept(sock)

接受连接。阻塞 socket.socket.accept() 后建模。

套接字必须绑定到地址并侦听连接。返回值是一对 (conn, address),其中 conn 是可用于在连接上发送和接收数据的 new 套接字对象,address 是绑定到连接另一端的套接字的地址。

套接字 sock 必须是非阻塞的。

这种方法是 协同

18.5.1.10. 解析主机名

coroutine AbstractEventLoop.getaddrinfo(host, port, *, family=0, type=0, proto=0, flags=0)

这种方法是一个 协同,类似于 socket.getaddrinfo() 功能,但是非阻塞。

coroutine AbstractEventLoop.getnameinfo(sockaddr, flags=0)

这种方法是一个 协同,类似于 socket.getnameinfo() 功能,但是非阻塞。

18.5.1.11. 连接管道

在具有 SelectorEventLoop 的Windows上,不支持这些方法。使用 ProactorEventLoop 支持Windows上的管道。

coroutine AbstractEventLoop.connect_read_pipe(protocol_factory, pipe)

在事件循环中注册读取管道。

protocol_factory 应该使用 Protocol 接口实例化对象。 pipe文件状对象。返回对 (transport, protocol),其中 transport 支持 ReadTransport 接口。

使用 SelectorEventLoop 事件循环,pipe 设置为非阻塞模式。

这种方法是 协同

coroutine AbstractEventLoop.connect_write_pipe(protocol_factory, pipe)

在eventloop中注册写入管道。

protocol_factory 应该使用 BaseProtocol 接口实例化对象。 pipe文件状对象。返回对 (transport, protocol),其中 transport 支持 WriteTransport 接口。

使用 SelectorEventLoop 事件循环,pipe 设置为非阻塞模式。

这种方法是 协同

18.5.1.12. UNIX信号

可用性:仅UNIX。

AbstractEventLoop.add_signal_handler(signum, callback, *args)

为信号添加处理程序。

如果信号号无效或不可复位,则提高 ValueError。如果设置处理程序时出现问题,请提高 RuntimeError

使用functools.partial将关键字传递给回调

AbstractEventLoop.remove_signal_handler(sig)

删除信号的处理程序。

如果信号处理程序被删除,返回 True,否则返回 False

参见

signal 模块。

18.5.1.13. 执行者

调用 Executor 中的函数(线程池或进程池)。默认情况下,事件循环使用线程池执行器(ThreadPoolExecutor)。

coroutine AbstractEventLoop.run_in_executor(executor, func, *args)

安排在指定的执行器中调用 func

executor 参数应为 Executor 实例。如果 executorNone,则使用默认执行器。

使用functools.partial将关键字传递给 *func*

这种方法是 协同

在 3.5.3 版更改: BaseEventLoop.run_in_executor() 不再配置它创建的线程池执行器的 max_workers,而是让它由线程池执行器(ThreadPoolExecutor)设置为默认值。

AbstractEventLoop.set_default_executor(executor)

设置 run_in_executor() 使用的默认执行程序。

18.5.1.14. 处理API时出错

允许定制在事件循环中如何处理异常。

AbstractEventLoop.set_exception_handler(handler)

handler 设置为新的事件循环异常处理程序。

如果 handlerNone,则将设置缺省异常处理程序。

如果 handler 是可调用对象,它应该具有与 (loop, context) 匹配的签名,其中 loop 将是对活动事件循环的引用,context 将是 dict 对象(关于上下文的细节,参见 call_exception_handler() 文档)。

AbstractEventLoop.get_exception_handler()

返回异常处理程序或 None (如果使用默认的处理程序)。

3.5.2 新版功能.

AbstractEventLoop.default_exception_handler(context)

默认异常处理程序。

当异常发生并且没有异常处理程序被设置时,这被调用,并且可以由想要推迟到默认行为的自定义异常处理程序调用。

context 参数与 call_exception_handler() 中的含义相同。

AbstractEventLoop.call_exception_handler(context)

调用当前事件循环异常处理程序。

context 是包含以下键的 dict 对象(以后可能会引入新键):

  • ‘message’:错误消息;

  • ‘exception’(可选):异常对象;

  • ‘future’(可选):asyncio.Future 实例;

  • ‘handle’(可选):asyncio.Handle 实例;

  • ‘protocol’(可选):协议 实例;

  • ‘transport’(可选):运输 实例;

  • ‘socket’(可选):socket.socket 实例。

注解

注意:此方法不应在子类事件循环中重载。对于任何自定义异常处理,请使用 set_exception_handler() 方法。

18.5.1.15. 调试模式

AbstractEventLoop.get_debug()

获取事件循环的调试模式(bool)。

如果环境变量 PYTHONASYNCIODEBUG 设置为非空字符串,则缺省值为 True,否则为 False

3.4.2 新版功能.

AbstractEventLoop.set_debug(enabled: bool)

设置事件循环的调试模式。

3.4.2 新版功能.

参见

异步调试模式

18.5.1.16. 服务器

class asyncio.Server

服务器侦听套接字。

AbstractEventLoop.create_server() 方法和 start_server() 函数创建的对象。不要直接实例化类。

close()

停止服务:关闭侦听套接字并将 sockets 属性设置为 None

表示现有传入客户端连接的套接字保持打开。

服务器异步关闭,使用 wait_closed() 协程等待,直到服务器关闭。

coroutine wait_closed()

等待 close() 方法完成。

这种方法是 协同

sockets

服务器正在侦听的 socket.socket 对象列表,如果服务器关闭,则为 None

18.5.1.17. 处理

class asyncio.Handle

AbstractEventLoop.call_soon()AbstractEventLoop.call_soon_threadsafe()AbstractEventLoop.call_later()AbstractEventLoop.call_at() 返回的回调包装器对象。

cancel()

取消呼叫。如果回调已经取消或执行,则此方法不起作用。

18.5.1.18. 事件循环示例

18.5.1.18.1. Hello World with call_soon()

使用 AbstractEventLoop.call_soon() 方法调度回调的示例。回调显示 "Hello World",然后停止事件循环:

import asyncio

def hello_world(loop):
    print('Hello World')
    loop.stop()

loop = asyncio.get_event_loop()

# Schedule a call to hello_world()
loop.call_soon(hello_world, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

参见

Hello World协程 示例使用 协同

18.5.1.18.2. 显示当前日期与call_later()

回调示例每秒显示当前日期。回调使用 AbstractEventLoop.call_later() 方法在5秒内重新计划自身,然后停止事件循环:

import asyncio
import datetime

def display_date(end_time, loop):
    print(datetime.datetime.now())
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, display_date, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

# Schedule the first call to display_date()
end_time = loop.time() + 5.0
loop.call_soon(display_date, end_time, loop)

# Blocking call interrupted by loop.stop()
loop.run_forever()
loop.close()

参见

协同程序显示当前日期 示例使用 协同

18.5.1.18.3. 观察读事件的文件描述符

等待文件描述符使用 AbstractEventLoop.add_reader() 方法接收到一些数据,然后关闭事件循环:

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected file descriptors
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

def reader():
    data = rsock.recv(100)
    print("Received:", data.decode())
    # We are done: unregister the file descriptor
    loop.remove_reader(rsock)
    # Stop the event loop
    loop.stop()

# Register the file descriptor for read event
loop.add_reader(rsock, reader)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

参见

注册一个打开的套接字以使用协议等待数据 示例使用由 AbstractEventLoop.create_connection() 方法创建的低级协议。

注册打开的套接字以使用流等待数据 示例使用由协程中的 open_connection() 函数创建的高级流。

18.5.1.18.4. 设置SIGINT和SIGTERM的信号处理程序

使用 AbstractEventLoop.add_signal_handler() 方法的信号 SIGINTSIGTERM 的寄存器处理程序:

import asyncio
import functools
import os
import signal

def ask_exit(signame):
    print("got signal %s: exit" % signame)
    loop.stop()

loop = asyncio.get_event_loop()
for signame in ('SIGINT', 'SIGTERM'):
    loop.add_signal_handler(getattr(signal, signame),
                            functools.partial(ask_exit, signame))

print("Event loop running forever, press Ctrl+C to interrupt.")
print("pid %s: send SIGINT or SIGTERM to exit." % os.getpid())
try:
    loop.run_forever()
finally:
    loop.close()

此示例仅适用于UNIX。