Skip to main content

18.5.5. 流(基于协同的API)

18.5.5.1. 流函数

注解

本模块中的顶层函数仅作为方便包装器;有真正没有什么特别的,如果他们不做你想要的,随意复制他们的代码。

coroutine asyncio.open_connection(host=None, port=None, *, loop=None, limit=None, **kwds)

create_connection() 的包装器返回(读取器,写入器)对。

读者返回的是 StreamReader 实例;作者是 StreamWriter 实例。

这些参数是 AbstractEventLoop.create_connection() 的所有常见参数,除了 protocol_factory;最常见的是位置主机和端口,各种可选的关键字参数如下。

附加的可选关键字参数是 loop (设置要使用的事件循环实例)和 limit (设置传递给 StreamReader 的缓冲区限制)。

此功能是 协同

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, loop=None, limit=None, **kwds)

启动套接字服务器,为每个连接的客户端回调一个。返回值与 create_server() 相同。

使用两个参数来调用 client_connected_cb 参数:client_readerclient_writerclient_readerStreamReader 对象,而 client_writerStreamWriter 对象。 client_connected_cb 参数可以是普通回调函数或 协同功能;如果是协程函数,它将被自动转换为 Task

其余的参数都是 create_server() 的常见参数,除了 protocol_factory;最常见的是位置 hostport,各种可选的关键字参数如下。

附加的可选关键字参数是 loop (设置要使用的事件循环实例)和 limit (设置传递给 StreamReader 的缓冲区限制)。

此功能是 协同

coroutine asyncio.open_unix_connection(path=None, *, loop=None, limit=None, **kwds)

create_unix_connection() 的包装器返回(读取器,写入器)对。

有关返回值和其他详细信息,请参阅 open_connection()

此功能是 协同

可用性:UNIX。

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, loop=None, limit=None, **kwds)

启动UNIX Domain Socket服务器,为每个连接的客户端回调一个。

有关返回值和其他详细信息,请参阅 start_server()

此功能是 协同

可用性:UNIX。

18.5.5.2. StreamReader

class asyncio.StreamReader(limit=None, loop=None)

这个类是 不是线程安全

exception()

获取异常。

feed_eof()

确认EOF。

feed_data(data)

在内部缓冲区中馈送 data 字节。将恢复等待数据的任何操作。

set_exception(exc)

设置异常。

set_transport(transport)

设置传输。

coroutine read(n=-1)

读取到 n 字节。如果未提供 n 或设置为 -1,请读取直到EOF并返回所有读取的字节。

如果接收到EOF并且内部缓冲区为空,则返回一个空的 bytes 对象。

这种方法是 协同

coroutine readline()

读取一行,其中“line”是以 \n 结尾的字节序列。

如果接收到EOF,并且没有找到 \n,则该方法将返回部分读取字节。

如果接收到EOF并且内部缓冲区为空,则返回一个空的 bytes 对象。

这种方法是 协同

coroutine readexactly(n)

正确读取 n 字节。如果在读取 n 之前达到流的结尾,则提升 IncompleteReadError,异常的 IncompleteReadError.partial 属性包含部分读取字节。

这种方法是 协同

coroutine readuntil(separator=b'\n')

从流中读取数据,直到找到 separator

成功时,数据和分隔符将从内部缓冲区(消耗)中删除。返回的数据将包括末尾的分隔符。

配置流限制用于检查结果。 Limit设置可以返回的数据的最大长度,不计算分隔符。

如果发生EOF并且仍未找到完整的分隔符,则会引发 IncompleteReadError 异常,并且内部缓冲区将被重置。 IncompleteReadError.partial 属性可以部分地包含分隔符。

如果由于超限而无法读取数据,则会引发 LimitOverrunError 异常,并且数据将保留在内部缓冲区中,因此可以再次读取。

3.5.2 新版功能.

at_eof()

如果缓冲区为空并且调用 feed_eof(),则返回 True

18.5.5.3. StreamWriter

class asyncio.StreamWriter(transport, protocol, reader, loop)

包装运输。

这暴露了 write()writelines()can_write_eof()write_eof()get_extra_info()close()。它添加 drain(),它返回一个可选的 Future,您可以在其上等待流控制。它还添加了直接引用 Transport 的传输属性。

这个类是 不是线程安全

transport

运输。

can_write_eof()

如果传输支持 write_eof(),则返回 True,否则返回 False。见 WriteTransport.can_write_eof()

close()

关闭运输:见 BaseTransport.close()

coroutine drain()

让底层传输的写缓冲区有机会被刷新。

目的用途是写:

w.write(data)
yield from w.drain()

当传输缓冲区的大小达到高水限值(协议暂停)时,阻塞直到缓冲区的大小下降到低水限值,并且协议恢复。当没有什么要等待,收益率继续立即。

drain() 产生给出循环调度写操作和刷新缓冲器的机会。尤其应当在可能大量的数据写入传输时使用,并且协同程序不会从调用 write() 之间传递。

这种方法是 协同

get_extra_info(name, default=None)

返回可选的运输信息:请参阅 BaseTransport.get_extra_info()

write(data)

将一些 data 字节写入传输:参见 WriteTransport.write()

writelines(data)

将数据字节的列表(或任何可迭代的)写入传输:请参阅 WriteTransport.writelines()

write_eof()

刷新缓冲数据后关闭传输的写入结束:请参阅 WriteTransport.write_eof()

18.5.5.4. StreamReaderProtocol

class asyncio.StreamReaderProtocol(stream_reader, client_connected_cb=None, loop=None)

琐碎辅助类适应 ProtocolStreamReaderProtocol 的子类。

stream_readerStreamReader 实例,client_connected_cb 是在进行连接时用(stream_reader,stream_writer)调用的可选函数,loop 是要使用的事件循环实例。

(这是一个辅助类,而不是使 StreamReader 本身是一个 Protocol 子类,因为 StreamReader 有其他潜在的用途,并防止 StreamReader 的用户意外调用协议的不当方法。)

18.5.5.5. IncompleteReadError

exception asyncio.IncompleteReadError

不完全的读错误,EOFError 的子类。

expected

预期字节数(int)。

partial

在达到流结束之前读取字节字符串(bytes)。

18.5.5.6. LimitOverrunError

exception asyncio.LimitOverrunError

在查找分隔符时达到缓冲区限制。

consumed

要消耗的字节数。

18.5.5.7. 流示例

18.5.5.7.1. TCP回显客户端使用流

TCP回显客户端使用 asyncio.open_connection() 功能:

import asyncio

@asyncio.coroutine
def tcp_echo_client(message, loop):
    reader, writer = yield from asyncio.open_connection('127.0.0.1', 8888,
                                                        loop=loop)

    print('Send: %r' % message)
    writer.write(message.encode())

    data = yield from reader.read(100)
    print('Received: %r' % data.decode())

    print('Close the socket')
    writer.close()

message = 'Hello World!'
loop = asyncio.get_event_loop()
loop.run_until_complete(tcp_echo_client(message, loop))
loop.close()

18.5.5.7.2. TCP回显服务器使用流

TCP回显服务器使用 asyncio.start_server() 功能:

import asyncio

@asyncio.coroutine
def handle_echo(reader, writer):
    data = yield from reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    yield from writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

18.5.5.7.3. 获取HTTP标头

简单示例查询在命令行上传递的URL的HTTP标头:

import asyncio
import urllib.parse
import sys

@asyncio.coroutine
def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        connect = asyncio.open_connection(url.hostname, 443, ssl=True)
    else:
        connect = asyncio.open_connection(url.hostname, 80)
    reader, writer = yield from connect
    query = ('HEAD {path} HTTP/1.0\r\n'
             'Host: {hostname}\r\n'
             '\r\n').format(path=url.path or '/', hostname=url.hostname)
    writer.write(query.encode('latin-1'))
    while True:
        line = yield from reader.readline()
        if not line:
            break
        line = line.decode('latin1').rstrip()
        if line:
            print('HTTP header> %s' % line)

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(print_http_headers(url))
loop.run_until_complete(task)
loop.close()

用法:

python example.py http://example.com/path/page.html

或使用HTTPS:

python example.py https://example.com/path/page.html

18.5.5.7.4. 注册打开的套接字以使用流等待数据

Coroutine等待,直到套接字使用 open_connection() 函数接收数据:

import asyncio
try:
    from socket import socketpair
except ImportError:
    from asyncio.windows_utils import socketpair

@asyncio.coroutine
def wait_for_data(loop):
    # Create a pair of connected sockets
    rsock, wsock = socketpair()

    # Register the open socket to wait for data
    reader, writer = yield from asyncio.open_connection(sock=rsock, loop=loop)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = yield from reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_data(loop))
loop.close()

参见

注册一个打开的套接字以使用协议等待数据 示例使用由 AbstractEventLoop.create_connection() 方法创建的低级协议。

观察读事件的文件描述符 示例使用低级 AbstractEventLoop.add_reader() 方法注册套接字的文件描述符。