Skip to main content

17.4. concurrent.futures —启动并行任务

3.2 新版功能.

源代码: Lib/concurrent/futures/thread.pyLib/concurrent/futures/process.py


concurrent.futures 模块提供了一个用于异步执行可调用对象的高级接口。

异步执行可以使用线程,使用 ThreadPoolExecutor 或单独的进程,使用 ProcessPoolExecutor 执行。两者都实现相同的接口,这是由抽象 Executor 类定义的。

17.4.1. 执行器对象

class concurrent.futures.Executor

一个抽象类,提供了异步执行调用的方法。它不应该直接使用,而是通过其具体的子类。

submit(fn, *args, **kwargs)

将可调用的 fn 计划为作为 fn(*args **kwargs) 执行,并返回表示可调用的执行的 Future 对象。

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

等同于 map(func, *iterables),除非 func 是异步执行的,并且可以同时进行对 func 的多个调用。如果调用 __next__(),则返回的迭代器提出 concurrent.futures.TimeoutError,并且在从 Executor.map() 的原始调用起的 timeout 秒之后结果不可用。 timeout 可以是int或float。如果未指定 timeoutNone,则等待时间没有限制。如果调用引发异常,那么当从迭代器检索其值时,将引发异常。当使用 ProcessPoolExecutor 时,此方法将 iterables 剁成多个块,它作为单独的任务提交到池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。对于非常长的迭代,与默认大小1相比,使用大值 chunksize 可以显着提高性能。使用 ThreadPoolExecutorchunksize 没有效果。

在 3.5 版更改: 添加了 chunksize 参数。

shutdown(wait=True)

告诉执行者它应该释放它正在使用的任何资源,当目前待完成的期货完成执行。在关闭后对 Executor.submit()Executor.map() 的调用将提高 RuntimeError

如果 waitTrue,则此方法将不会返回,直到所有未决期货完成执行并且与执行器相关联的资源已被释放。如果 waitFalse,那么这个方法将立即返回,并且当执行完所有挂起的期货时,与执行器相关联的资源将被释放。无论 wait 的值如何,整个Python程序将不会退出,直到所有待处理的期货完成执行。

如果使用 with 语句,可以避免必须显式调用此方法,这将关闭 Executor (等待 Executor.shutdown() 被调用,wait 设置为 True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

17.4.2. ThreadPoolExecutor

ThreadPoolExecutor 是一个 Executor 子类,它使用一个线程池来异步执行调用。

当与 Future 相关联的可调用等待另一个 Future 的结果时,可能发生死锁。例如:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

和:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')

一个 Executor 子类,它使用一个最多 max_workers 线程的池来异步地执行调用。

在 3.5 版更改: 如果 max_workersNone 或未给出,它将默认为机器上的处理器数乘以 5,假设 ThreadPoolExecutor 经常用于重叠I/O而不是CPU工作,并且工作程序的数量应该高于数量的 ProcessPoolExecutor 工人。

3.6 新版功能: 添加了 thread_name_prefix 参数,以允许用户控制由池创建的工作线程的threading.Thread名称,以便于调试。

17.4.2.1. ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

17.4.3. ProcessPoolExecutor

ProcessPoolExecutor 类是一个 Executor 子类,它使用进程池来异步执行调用。 ProcessPoolExecutor 使用 multiprocessing 模块,它允许它侧面 Global Interpreter Lock,但也意味着只有可拾取对象可以被执行和返回。

__main__ 模块必须可以由worker子进程导入。这意味着 ProcessPoolExecutor 将不在交互式解释器中工作。

从提交到 ProcessPoolExecutor 的可调用方调用 ExecutorFuture 方法将导致死锁。

class concurrent.futures.ProcessPoolExecutor(max_workers=None)

使用最多 max_workers 进程的池异步执行调用的 Executor 子类。如果 max_workersNone 或未给出,它将默认为机器上的处理器数。如果 max_workers 低于或等于 0,则将产生 ValueError

在 3.3 版更改: 当其中一个工作进程突然终止时,现在会出现 BrokenProcessPool 错误。以前,行为是未定义的,但对执行器或其未来的操作通常会冻结或死锁。

17.4.3.1. ProcessPoolExecutor示例

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

17.4.4. 未来的对象

Future 类封装了一个可调用的异步执行。 Future 实例由 Executor.submit() 创建。

class concurrent.futures.Future

封装一个可调用的异步执行。 Future 实例由 Executor.submit() 创建,除了测试之外不应直接创建。

cancel()

尝试取消呼叫。如果呼叫当前正在执行并且不能被取消,则该方法将返回 False,否则呼叫将被取消,并且该方法将返回 True

cancelled()

如果呼叫成功取消,则返回 True

running()

如果呼叫当前正在执行并且无法取消,则返回 True

done()

如果呼叫成功取消或完成运行,则返回 True

result(timeout=None)

返回调用返回的值。如果呼叫尚未完成,则此方法将等待到 timeout 秒。如果呼叫在 timeout 秒内没有完成,则将产生 concurrent.futures.TimeoutErrortimeout 可以是int或float。如果未指定 timeoutNone,则等待时间没有限制。

如果未来在完成之前被取消,则 CancelledError 将被提出。

如果呼叫提出,这种方法将引发相同的异常。

exception(timeout=None)

返回调用引发的异常。如果呼叫尚未完成,则此方法将等待到 timeout 秒。如果呼叫在 timeout 秒内没有完成,则将产生 concurrent.futures.TimeoutErrortimeout 可以是int或float。如果未指定 timeoutNone,则等待时间没有限制。

如果未来在完成之前被取消,则 CancelledError 将被提出。

如果呼叫在未提高的情况下完成,则返回 None

add_done_callback(fn)

将可调用 fn 附加到未来。 fn 将被调用,将来作为其唯一的参数,当未来被取消或完成运行时。

添加的callables按它们被添加的顺序被调用,并且总是在属于添加它们的进程的线程中被调用。如果可调用引发了 Exception 子类,它将被记录并被忽略。如果可调用引发了 BaseException 子类,则该行为是未定义的。

如果未来已经完成或被取消,fn 将立即被调用。

以下 Future 方法用于单元测试和 Executor 实现。

set_running_or_notify_cancel()

在执行与 Future 相关的工作和单元测试之前,该方法应该仅由 Executor 实现调用。

如果该方法返回 False,则 Future 被取消,即 Future.cancel() 被调用并返回 True。等待 Future 完成(即通过 as_completed()wait())的任何线程将被唤醒。

如果该方法返回 True,则 Future 未被取消并且已经被置于运行状态,即对 Future.running() 的调用将返回 True

此方法只能调用一次,并且在调用 Future.set_result()Future.set_exception() 后无法调用。

set_result(result)

将与 Future 关联的工作的结果设置为 result

此方法应仅由 Executor 实现和单元测试使用。

set_exception(exception)

将与 Future 关联的工作的结果设置为 Exception exception

此方法应仅由 Executor 实现和单元测试使用。

17.4.5. 模块功能

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

等待由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)完成。返回一个命名的2元组的集合。第一个集合,名为 done,包含在等待完成之前完成(完成或被取消)的期货。第二个集合,名为 not_done,包含未完成的期货。

timeout 可用于控制返回前等待的最大秒数。 timeout 可以是一个int或float。如果未指定 timeoutNone,则等待时间没有限制。

return_when 指示此函数何时返回。它必须是以下常量之一:

不变

描述

FIRST_COMPLETED

当任何未来完成或被取消时,该函数将返回。

FIRST_EXCEPTION

当任何未来通过提出异常完成时,函数将返回。如果没有未来引发异常,那么它等同于 ALL_COMPLETED

ALL_COMPLETED

当所有期货完成或被取消时,函数将返回。

concurrent.futures.as_completed(fs, timeout=None)

返回一个由 fs 给出的 Future 实例(可能由不同的 Executor 实例创建)中的迭代器,它在完成(完成或被取消)时产生期货。 fs 给出的任何重复的期货将被退回一次。任何在 as_completed() 之前完成的期货将被首先收回。如果调用 __next__() 并且结果在从原始调用 as_completed()timeout 秒后不可用,则返回的迭代器产生 concurrent.futures.TimeoutErrortimeout 可以是int或float。如果未指定 timeoutNone,则等待时间没有限制。

参见

PEP 3148 - futures - 异步执行计算

描述此功能以包含在Python标准库中的提案。

17.4.6. 异常类

exception concurrent.futures.CancelledError

在未来取消时引发。

exception concurrent.futures.TimeoutError

在未来操作超过给定超时时触发。

exception concurrent.futures.process.BrokenProcessPool

RuntimeError 派生,当 ProcessPoolExecutor 的一个工人以非干净方式终止(例如,如果它从外部被杀死)时,引发此异常类。

3.3 新版功能.