Skip to main content

18.5.4. 传输和协议(基于回调的API)

18.5.4.1. 运输

传输是由 asyncio 提供的类,以便抽象各种类型的通信信道。你通常不会自己实例化运输;相反,您将调用一个 AbstractEventLoop 方法,该方法将创建传输并尝试启动基础通信通道,当它成功时回叫您。

一旦建立了通信信道,传输就总是与 协议 实例配对。协议然后可以出于各种目的调用传输的方法。

asyncio 当前实现TCP,UDP,SSL和子进程管道的传输。运输工具上可用的方法取决于运输工具的种类。

传输类是 不是线程安全

在 3.6 版更改: 默认情况下,套接字选项 TCP_NODELAY 已设置。

18.5.4.1.1. BaseTransport

class asyncio.BaseTransport

传输的基类。

close(self)

关闭运输。如果传输具有用于传出数据的缓冲区,则缓冲的数据将被异步刷新。不会再收到更多的数据。在所有缓冲数据被刷新之后,协议的 connection_lost() 方法将以 None 作为其参数被调用。

is_closing(self)

如果运输关闭或关闭,请返回 True

3.5.1 新版功能.

get_extra_info(name, default=None)

返回可选的传输信息。 name 是表示要获取的传输特定信息的字符串,default 是如果信息不存在则返回的值。

这种方法允许传输实现容易地暴露通道特定的信息。

set_protocol(protocol)

设置新协议。切换协议只应在两种协议都支持交换机的情况下进行。

3.5.3 新版功能.

get_protocol()

返回当前协议。

3.5.3 新版功能.

在 3.5.1 版更改: 'ssl_object' 信息已添加到SSL套接字。

18.5.4.1.2. ReadTransport

class asyncio.ReadTransport

只读传输接口。

pause_reading()

暂停传输的接收端。在调用 resume_reading() 之前,不会将数据传递到协议 data_received() 方法。

resume_reading()

恢复接收端。如果一些数据可用于读取,则将再次调用协议的 data_received() 方法。

18.5.4.1.3. WriteTransport

class asyncio.WriteTransport

只写传输接口。

abort()

立即关闭运输,无需等待待完成的操作完成。缓冲数据将丢失。不会再收到更多的数据。协议的 connection_lost() 方法最终将以 None 作为其参数被调用。

can_write_eof()

如果传输支持 write_eof(),则返回 True,否则返回 False

get_write_buffer_size()

返回传输所使用的输出缓冲区的当前大小。

get_write_buffer_limits()

获取写入流控制的 highlow 水限制。返回一个元组 (low, high),其中 lowhigh 是正数字节。

使用 set_write_buffer_limits() 设置限制。

3.4.2 新版功能.

set_write_buffer_limits(high=None, low=None)

设置写入流控制的 high-和 low-水限制。

这两个值控制何时调用协议的 pause_writing()resume_writing() 方法。如果指定,低水限制必须小于或等于高水限制。 highlow 都不能为阴性。

默认值是特定于实现的。如果仅给出高水位限制,则低水位限制默认为小于或等于高水位限制的实施特定值。将 high 设置为零会强制 low 也为零,并且每当缓冲区变为非空时,调用 pause_writing()。将 low 设置为零会导致仅在缓冲区为空时调用 resume_writing()。对于任一限制使用零通常是次优的,因为它减少了同时进行I/O和计算的机会。

使用 get_write_buffer_limits() 获取限制。

write(data)

将一些 data 字节写入传输。

这种方法不阻塞;它缓冲数据并安排它异步发送。

writelines(list_of_data)

将数据字节的列表(或任何可迭代的)写入传输。这在功能上等同于对由迭代器产生的每个元素调用 write(),但是可以更有效地实现。

write_eof()

刷新缓冲数据后关闭传输的写入结束。仍可接收数据。

如果传输(例如SSL)不支持半关闭,此方法可以提高 NotImplementedError

18.5.4.1.4. DatagramTransport

DatagramTransport.sendto(data, addr=None)

data 字节发送到由 addr (传输相关的目标地址)给出的远程对等体。如果 addrNone,则将数据发送到在传输创建时给定的目标地址。

这种方法不阻塞;它缓冲数据并安排它异步发送。

DatagramTransport.abort()

立即关闭运输,无需等待待完成的操作完成。缓冲数据将丢失。不会再收到更多的数据。协议的 connection_lost() 方法最终将以 None 作为其参数被调用。

18.5.4.1.5. BaseSubprocessTransport

class asyncio.BaseSubprocessTransport
get_pid()

以整数形式返回子进程进程ID。

get_pipe_transport(fd)

返回对应于整数文件描述符 fd 的通信管道的传输:

  • 0:标准输入(stdin)的可读流传输,如果子流程不是用 stdin=PIPE 创建的,则为 None

  • 1:标准输出(stdout)的可写流传输,如果子流程不是使用 stdout=PIPE 创建的,则为 None

  • 2:标准错误(stderr)的可写流传输,或者如果子过程不是用 stderr=PIPE 创建的 None

  • 其他 fdNone

get_returncode()

如果没有返回,则返回子进程returncode作为整数或 None,类似于 subprocess.Popen.returncode 属性。

kill(self)

杀死子进程,如在 subprocess.Popen.kill() 中。

在POSIX系统上,函数将SIGKILL发送到子进程。在Windows上,此方法是 terminate() 的别名。

send_signal(signal)

signal 号发送到子过程,如 subprocess.Popen.send_signal() 中所示。

terminate()

请求子进程停止,如在 subprocess.Popen.terminate() 中。此方法是 close() 方法的别名。

在POSIX系统上,此方法将SIGTERM发送到子过程。在Windows上,调用Windows API函数TerminateProcess()以停止子过程。

close()

如果子进程还没有返回,并关闭所有管道(stdinstdoutstderr)的传输,则通过调用 terminate() 方法请求子进程停止。

18.5.4.2. 协议

asyncio 提供了可以子类化来实现网络协议的基类。这些类与 运输 (见下文)结合使用:协议解析输入数据并要求写入输出数据,而传输负责实际的I/O和缓冲。

当子类化协议类时,建议您覆盖某些方法。这些方法是回调:它们将由传输器在某些事件(例如当接收到一些数据时)调用;你不应该自己打电话给他们,除非你正在实施一个运输。

注解

所有回调都有默认实现,它们是空的。因此,您只需要为感兴趣的事件实现回调。

18.5.4.2.1. 协议类

class asyncio.Protocol

用于实现流协议(用于与例如TCP和SSL传输一起使用)的基类。

class asyncio.DatagramProtocol

用于实现数据报协议(用于与例如UDP传输一起使用)的基类。

class asyncio.SubprocessProtocol

用于实现与子进程(通过一组单向管道)通信的协议的基类。

18.5.4.2.2. 连接回调

这些回调可以在 ProtocolDatagramProtocolSubprocessProtocol 实例上调用:

BaseProtocol.connection_made(transport)

建立连接时调用。

transport 参数是表示连接的传输。如果你需要,你负责将它存储在某个地方(例如作为一个属性)。

BaseProtocol.connection_lost(exc)

在连接丢失或关闭时调用。

参数是异常对象或 None。后者意味着接收到正常的EOF,或者连接被连接的这一侧中止或关闭。

每次成功连接时,connection_made()connection_lost() 被精确调用一次。所有其他回调将在这两种方法之间被调用,这允许在协议实现中更容易的资源管理。

以下回调只能在 SubprocessProtocol 实例上调用:

SubprocessProtocol.pipe_data_received(fd, data)

当子进程将数据写入其stdout或stderr管道时调用。 fd 是管道的整数文件描述符。 data 是包含数据的非空字节对象。

SubprocessProtocol.pipe_connection_lost(fd, exc)

当与子进程通信的其中一个管道关闭时调用。 fd 是关闭的整数文件描述符。

SubprocessProtocol.process_exited()

当子进程退出时调用。

18.5.4.2.3. 流协议

Protocol 实例上调用以下回调:

Protocol.data_received(data)

当接收到一些数据时调用。 data 是包含传入数据的非空字节对象。

注解

数据是否被缓冲,分块或重组取决于传输。一般来说,你不应该依赖于特定的语义,而是让你的解析通用和足够灵活。但是,始终以正确的顺序接收数据。

Protocol.eof_received()

当另一端发出信号时,它将不再发送任何数据(例如通过调用 write_eof(),如果另一端也使用asyncio)。

此方法可能返回false值(包括 None),在这种情况下,传输将关闭自身。相反,如果此方法返回一个真值,关闭传输由协议决定。由于默认实现返回 None,它隐式地关闭连接。

注解

某些传输(如SSL)不支持半关闭连接,在这种情况下,从此方法返回true将不会阻止关闭连接。

data_received() 在连接期间可以被调用任意次数。然而,eof_received() 最多被调用一次,如果被调用,data_received() 将不会被调用。

状态机:

start - > connection_made() [ - > data_received() *] [ - > eof_received()?] - > connection_lost() - > end

18.5.4.2.4. 数据报协议

DatagramProtocol 实例上调用以下回调。

DatagramProtocol.datagram_received(data, addr)

在接收到数据报时调用。 data 是包含传入数据的字节对象。 addr 是发送数据的对等体的地址;确切的格式取决于传输。

DatagramProtocol.error_received(exc)

在先前的发送或接收操作引发 OSError 时调用。 excOSError 实例。

当传输(例如UDP)检测到数据报不能传递到其接收者时,在罕见的条件下调用该方法。在许多情况下,不可传递的数据报将被静默丢弃。

18.5.4.2.5. 流控制回调

这些回调可以在 ProtocolDatagramProtocolSubprocessProtocol 实例上调用:

BaseProtocol.pause_writing()

当传输缓冲区超过高水位标记时调用。

BaseProtocol.resume_writing()

当传输缓冲区低于低水位标记时调用。

pause_writing()resume_writing() 调用是成对的 - 当缓冲区严格地超过高水位标记时(即使后续写入增加了缓冲区大小),pause_writing() 被调用一次,并且最终当缓冲区大小达到低水位时,resume_writing() 被调用一次,水印。

注解

如果缓冲区大小等于高水位标记,则不会调用 pause_writing() - 它必须严格超过。相反,当缓冲区大小等于或低于低水位标记时,将调用 resume_writing()。这些结束条件对于确保当任何一个标记为零时都如预期的那样重要。

注解

在BSD系统(OS X,FreeBSD等)上,DatagramProtocol 不支持流控制,因为写入太多数据包导致的发送失败不能被轻易检测到。套接字总是出现“就绪”,多余的包被丢弃;具有设置为 errno.ENOBUFS 的errno的 OSError 可以或可以不提出;如果提出,将报告给 DatagramProtocol.error_received(),否则忽略。

18.5.4.2.6. 协同程序和协议

协程可以使用 ensure_future() 在协议方法中调度,但不能保证执行顺序。协议不知道协议方法中创建的协程,因此不会等待它们。

要具有可靠的执行顺序,请在具有 yield from 的协同程序中使用 流对象。例如,StreamWriter.drain() 协程可以用于等待直到写缓冲区被刷新。

18.5.4.3. 协议示例

18.5.4.3.1. TCP回显客户端协议

TCP回显客户端使用 AbstractEventLoop.create_connection() 方法,发送数据并等待连接关闭:

import asyncio

class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        print('Stop the event loop')
        self.loop.stop()

loop = asyncio.get_event_loop()
message = 'Hello World!'
coro = loop.create_connection(lambda: EchoClientProtocol(message, loop),
                              '127.0.0.1', 8888)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()

事件循环运行两次。在这个简短的例子中,run_until_complete() 方法是优选的,如果服务器不在侦听,则引发异常,而不必编写一个短的协程来处理异常并停止运行循环。在 run_until_complete() 退出时,循环不再运行,因此在发生错误时不需要停止循环。

18.5.4.3.2. TCP回显服务器协议

TCP回显服务器使用 AbstractEventLoop.create_server() 方法,发送回接收到的数据并关闭连接:

import asyncio

class EchoServerClientProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()

loop = asyncio.get_event_loop()
# Each client connection will create a new protocol instance
coro = loop.create_server(EchoServerClientProtocol, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

Transport.close() 可以在 WriteTransport.write() 后立即调用,即使数据尚未在套接字上发送:两种方法都是异步的。不需要 yield from,因为这些传输方法不是协同程序。

参见

TCP回显服务器使用流 示例使用 asyncio.start_server() 函数。

18.5.4.3.3. UDP回显客户端协议

UDP echo客户端使用 AbstractEventLoop.create_datagram_endpoint() 方法,当我们收到答案时发送数据并关闭传输:

import asyncio

class EchoClientProtocol:
    def __init__(self, message, loop):
        self.message = message
        self.loop = loop
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Socket closed, stop the event loop")
        loop = asyncio.get_event_loop()
        loop.stop()

loop = asyncio.get_event_loop()
message = "Hello World!"
connect = loop.create_datagram_endpoint(
    lambda: EchoClientProtocol(message, loop),
    remote_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(connect)
loop.run_forever()
transport.close()
loop.close()

18.5.4.3.4. UDP回显服务器协议

UDP echo服务器使用 AbstractEventLoop.create_datagram_endpoint() 方法,发送回接收的数据:

import asyncio

class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)

loop = asyncio.get_event_loop()
print("Starting UDP server")
# One protocol instance will be created to serve all client requests
listen = loop.create_datagram_endpoint(
    EchoServerProtocol, local_addr=('127.0.0.1', 9999))
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

transport.close()
loop.close()

18.5.4.3.5. 注册打开的套接字以使用协议等待数据

等待套接字使用带有协议的 AbstractEventLoop.create_connection() 方法接收数据,然后关闭事件循环

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

# Create a pair of connected sockets
rsock, wsock = socketpair()
loop = asyncio.get_event_loop()

class MyProtocol(asyncio.Protocol):
    transport = None

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport (it will call connection_lost())
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed, stop the event loop
        loop.stop()

# Register the socket to wait for data
connect_coro = loop.create_connection(MyProtocol, sock=rsock)
transport, protocol = loop.run_until_complete(connect_coro)

# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())

# Run the event loop
loop.run_forever()

# We are done, close sockets and the event loop
rsock.close()
wsock.close()
loop.close()

参见

观察读事件的文件描述符 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。

注册打开的套接字以使用流等待数据 示例使用由协程中的 open_connection() 函数创建的高级流。