Skip to main content

17.2. multiprocessing —基于过程的并行性

源代码: Lib/multiprocessing/


17.2.1. 介绍

multiprocessing 是一个包,它支持使用类似于 threading 模块的API来生成进程。 multiprocessing 包提供了本地和远程并发,通过使用子进程而不是线程有效地对 Global Interpreter Lock 进行侧移。因此,multiprocessing 模块允许编程人员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。

multiprocessing 模块还引入了在 threading 模块中没有模拟的API。其中一个主要的例子是 Pool 对象,它提供了一种方便的方法来并行化多个输入值的函数执行,跨进程分布输入数据(数据并行性)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本示例使用 Pool

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

将打印到标准输出

[1, 4, 9]

17.2.1.1. Process

multiprocessing 中,通过创建 Process 对象然后调用其 start() 方法来生成进程。 Process 遵循 threading.Thread 的API。多进程程序的一个简单例子是

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

要显示涉及的各个进程ID,这里是一个扩展示例:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

有关为什么需要 if __name__ == '__main__' 部分的说明,请参见 编程指南

17.2.1.2. 上下文和启动方法

根据平台,multiprocessing 支持三种方式启动进程。这些 启动方法

spawn

父进程启动一个新的python解释器过程。子进程将只继承运行过程对象 run() 方法所需的那些资源。特别地,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用 forkforkserver 相比,使用该方法开始过程相当慢。

在Unix和Windows上可用。 Windows上的默认值。

fork

父进程使用 os.fork() 来分叉Python解释器。子进程在开始时,与父进程有效地相同。父进程的所有资源都由子进程继承。请注意,安全地分支多线程进程是有问题的。

仅在Unix上可用。 Unix上的默认值。

forkserver

当程序启动并选择 forkserver 启动方法时,启动服务器进程。从那时起,每当需要一个新进程时,父进程连接到服务器并请求它分叉一个新进程。 fork服务器进程是单线程的,因此它使用 os.fork() 是安全的。没有不必要的资源被继承。

在Unix平台上可用,它支持通过Unix管道传递文件描述符。

在 3.4 版更改: spawn 在所有unix平台上添加,forkserver 为某些unix平台添加。子进程在Windows上不再继承所有父级可继承句柄。

在Unix上使用 spawnforkserver 启动方法还将启动一个 信号量跟踪器 进程,该进程跟踪由程序进程创建的未链接的命名信号量。当所有进程退出后,信号量跟踪器取消链接任何剩余的信号量。通常应该没有,但如果一个过程被一个信号杀死,可能有一些“泄漏”的信号量。 (取消链接命名的信号量是一个严重的问题,因为系统只允许有限的数量,它们将不会自动取消链接,直到下次重新启动。

要选择启动方法,请在主模块的 if __name__ == '__main__' 子句中使用 set_start_method()。例如:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() 不应在程序中使用多次。

或者,您可以使用 get_context() 获取上下文对象。上下文对象具有与多处理模块相同的API,并允许在同一程序中使用多个启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

注意,与一个上下文相关的对象可能与用于不同上下文的进程不兼容。特别地,使用 fork 上下文创建的锁不能传递到使用 spawnforkserver 启动方法启动的进程。

想要使用特定启动方法的库应该使用 get_context(),以避免干扰库用户的选择。

17.2.1.3. 在进程之间交换对象

multiprocessing 支持进程之间的两种类型的通信通道:

队列

Queue 类是 queue.Queue 的近似克隆。例如:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

队列是线程和过程安全的。

管道

Pipe() 函数返回由管道连接的一对连接对象,默认情况下是双向(双向)。例如:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Pipe() 返回的两个连接对象表示管道的两端。每个连接对象都有 send()recv() 方法(等等)。请注意,如果两个进程(或线程)尝试同时从管道的 same 末端读取或写入,则管道中的数据可能会损坏。当然,没有同时使用不同管道末端的进程的腐败风险。

17.2.1.4. 进程之间的同步

multiprocessing 包含来自 threading 的所有同步原语的等同物。例如,可以使用锁定来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

不使用锁定输出从不同的进程很容易得到所有混合。

17.2.1.5. 进程之间的共享状态

如上所述,当进行并发编程时,通常最好避免使用尽可能共享的状态。在使用多个进程时尤其如此。

然而,如果你真的需要使用一些共享数据,那么 multiprocessing 提供了这样做的几种方法。

共享内存

数据可以使用 ValueArray 存储在共享存储器映射中。例如,下面的代码

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

将打印

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

在创建 numarr 时使用的 'd''i' 参数是 array 模块使用的类型类型:'d' 表示双精度浮点数,'i' 表示有符号整数。这些共享对象将是进程和线程安全的。

为了更灵活地使用共享内存,可以使用 multiprocessing.sharedctypes 模块,它支持创建从共享内存分配的任意ctypes对象。

服务器进程

Manager() 返回的管理器对象控制一个服务器进程,该进程保存Python对象,并允许其他进程使用代理来操作它们。

Manager() 返回的管理器将支持类型 listdictNamespaceLockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValueArray。例如,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。

17.2.1.6. 使用工人池

Pool 类表示工作进程池。它具有允许以几种不同的方式将任务卸载到工作进程的方法。

例如:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,池的方法只应该被创建它的进程使用。

注解

此包中的功能要求 __main__ 模块可由孩子导入。这是在 编程指南 涵盖,但值得在这里指出。这意味着一些例子,如 multiprocessing.pool.Pool 示例将不能在交互式解释器中工作。例如:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(如果你尝试这样,它将实际输出以半随机方式交织的三个完整的追踪,然后你可能不得不停止主进程。)

17.2.2. 参考

multiprocessing 包大多复制 threading 模块的API。

17.2.2.1. Process 和异常

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

进程对象表示在单独进程中运行的活动。 Process 类具有 threading.Thread 的所有方法的等同物。

应始终使用关键字参数调用构造函数。 group 应始终为 None;它仅存在于与 threading.Thread 的兼容性。 target 是要由 run() 方法调用的可调用对象。它默认为 None,意味着什么也不调用。 name 是进程名称(有关详细信息,请参见 name)。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数的字典。如果提供,则仅关键字 daemon 参数将进程 daemon 标志设置为 TrueFalse。如果 None (默认),这个标志将继承自创建过程。

默认情况下,没有参数传递给 target

如果子类覆盖了构造函数,它必须确保它在对进程做任何其他事情之前调用基类构造函数(Process.__init__())。

在 3.3 版更改: 添加了 daemon 参数。

run()

表示进程活动的方法。

您可以在子类中覆盖此方法。标准 run() 方法调用传递给对象的构造函数的可调用对象作为目标参数(如果有的话),其中顺序和关键字参数分别取自 argskwargs 参数。

start()

启动进程的活动。

每个进程对象最多必须调用一次。它安排对象的 run() 方法在单独的进程中被调用。

join([timeout])

如果可选参数 timeoutNone (缺省值),则该方法将阻塞,直到调用 join() 方法的进程终止。如果 timeout 为正数,则它最多阻塞 timeout 秒。注意,如果方法终止或方法超时,该方法返回 None。检查进程的 exitcode 以确定它是否终止。

一个过程可以连接多次。

进程不能加入自身,因为这将导致死锁。尝试在进程启动之前加入进程是一个错误。

name

进程的名称。名称是仅用于识别目的的字符串。它没有语义。可以给多个进程指定相同的名称。

初始名称由构造函数设置。如果没有为构造函数提供显式名称,则构造形式为“Process-N 1:N 2:...:N k ”的名称,其中每个N k 是其第N个子父母。

is_alive()

返回进程是否存活。

粗略地说,从 start() 方法返回的时刻直到子进程终止的过程对象是活的。

daemon

进程的守护进程标志,一个布尔值。这必须在调用 start() 之前设置。

初始值从创建过程继承。

当进程退出时,它会尝试终止所有的daemonic子进程。

注意,不允许daemonic进程创建子进程。否则,一个守护进程会使其子进程成为孤立的,如果它的父进程退出时终止。此外,这些是 Unix守护进程或服务,它们是正常进程,如果非守护进程已退出,它们将被终止(而不是加入)。

除了 threading.Thread API之外,Process 对象还支持以下属性和方法:

pid

返回进程ID。在进程诞生之前,这将是 None

exitcode

孩子的退出代码。如果进程尚未终止,这将是 None。负值 -N 表示孩子被信号 N 终止。

authkey

进程的认证密钥(字节字符串)。

multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串。

Process 对象被创建时,它将继承其父进程的认证密钥,虽然这可以通过将 authkey 设置为另一个字节字符串来改变。

验证密钥

sentinel

系统对象的数字句柄,在过程结束时将变为“就绪”。

如果要使用 multiprocessing.connection.wait() 一次等待几个事件,可以使用此值。否则调用 join() 比较简单。

在Windows上,这是可用于 WaitForSingleObjectWaitForMultipleObjects 系列API调用的操作系统句柄。在Unix上,这是一个文件描述符,可与 select 模块的原语一起使用。

3.3 新版功能.

terminate()

终止进程。在Unix上,这是使用 SIGTERM 信号完成的;在Windows上使用 TerminateProcess()。注意,退出处理程序和finally子句等不会被执行。

注意,进程的后代进程将终止 not - 它们将只是成为孤立的。

警告

如果在相关联的进程正在使用管道或队列时使用此方法,则管道或队列容易被损坏,并且可能变得不能由其他进程使用。类似地,如果进程已经获得锁或信号量等,则终止它可能导致其他进程死锁。

请注意,start()join()is_alive()terminate()exitcode 方法只应由创建过程对象的进程调用。

Process 的一些方法的示例用法:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process(Process-1, initial)> False
>>> p.start()
>>> print(p, p.is_alive())
<Process(Process-1, started)> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process(Process-1, stopped[SIGTERM])> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

所有 multiprocessing 异常的基类。

exception multiprocessing.BufferTooShort

当所提供的缓冲区对象对于消息读取而言过小时,Connection.recv_bytes_into() 引发的异常。

如果 eBufferTooShort 的实例,则 e.args[0] 将消息作为字节字符串。

exception multiprocessing.AuthenticationError

在出现身份验证错误时触发。

exception multiprocessing.TimeoutError

在超时到期时由超时引发的方法引发。

17.2.2.2. 管道和队列

当使用多个进程时,通常使用消息传递来用于进程之间的通信,并避免必须使用任何同步原语,如锁。

对于传递消息,可以使用 Pipe() (用于两个进程之间的连接)或队列(允许多个生产者和消费者)。

QueueSimpleQueueJoinableQueue 类型是在标准库中的 queue.Queue 类建模的多生产者,多消费者 FIFO(先进先出) 队列。它们的区别在于 Queue 缺少在Python 2.5的 queue.Queue 类中引入的 task_done()join() 方法。

如果你使用 JoinableQueue,那么你 必须 调用 JoinableQueue.task_done() 从队列中删除每个任务,否则用于计数未完成任务的数量的信号量可能最终溢出,引发异常。

注意,也可以通过使用管理器对象创建共享队列 - 请参阅 经理

注解

multiprocessing 使用通常的 queue.Emptyqueue.Full 异常来发出超时。它们在 multiprocessing 命名空间中不可用,因此您需要从 queue 导入它们。

注解

当一个对象放在一个队列上时,该对象被腌制,后台线程随后将经过腌制的数据清洗到底层管道。这有一些后果是有点令人惊讶,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以改为使用一个 经理 创建的队列。

  1. 将对象放在空队列上之后,在队列的 empty() 方法返回 False 之前可能存在无穷小的延迟,并且 get_nowait() 可以在不提高 queue.Empty 的情况下返回。

  2. 如果多个进程将对象排队,则可能在另一端无序地接收对象。然而,由相同进程排队的对象将始终以相对于彼此的期望顺序。

警告

如果在尝试使用 Queue 时使用 Process.terminate()os.kill() 终止进程,则队列中的数据可能会被损坏。这可能导致任何其他进程在尝试稍后使用队列时获得异常。

警告

如上所述,如果子进程将项目放在队列上(并且它没有使用 JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道。

这意味着,如果您尝试加入该过程,您可能会遇到死锁,除非您确定已经放入队列的所有项目已被占用。类似地,如果子进程是非守护进程,则父进程在试图加入其所有非守护进程的子进程时可能在退出时挂起。

请注意,使用管理器创建的队列没有此问题。见 编程指南

有关进程间通信的队列使用示例,请参阅 例子

multiprocessing.Pipe([duplex])

返回表示管道末端的 Connection 对象的对 (conn1, conn2)

如果 duplexTrue (默认值),则管道是双向的。如果 duplexFalse,则管道是单向的:conn1 只能用于接收消息,conn2 只能用于发送消息。

class multiprocessing.Queue([maxsize])

返回使用管道和几个锁/信号量实现的进程共享队列。当进程首先将项目放在队列上时,启动了将对象从缓冲区传送到管道中的馈线线程。

来自标准库的 queue 模块的通常的 queue.Emptyqueue.Full 异常被引发用于信号超时。

Queue 实现 queue.Queue 的所有方法,除了 task_done()join()

qsize()

返回队列的大致大小。由于多线程/多进程语义,这个数字是不可靠的。

请注意,这可能会在Unix平台上引发 NotImplementedError,例如未实现 sem_getvalue() 的Mac OS X。

empty()

如果队列为空,返回 True,否则返回 False。由于多线程/多进程语义,这是不可靠的。

full()

如果队列已满,返回 True,否则返回 False。由于多线程/多进程语义,这是不可靠的。

put(obj[, block[, timeout]])

将obj放入队列。如果可选参数 blockTrue (默认值),timeoutNone (默认值),则必要时阻塞,直到空闲插槽可用。如果 timeout 是正数,则它最多阻塞 timeout 秒,并且如果在该时间内没有空闲时隙,则提高 queue.Full 异常。否则(blockFalse),如果空闲时隙立即可用,则将一个项目放在队列上,否则提高 queue.Full 异常(在这种情况下忽略 timeout)。

put_nowait(obj)

相当于 put(obj, False)

get([block[, timeout]])

从队列中删除并返回项目。如果可选的参数 blockTrue (默认值),timeoutNone (默认值),如有必要,直到项目可用为止。如果 timeout 是正数,则它最多阻塞 timeout 秒,并且如果在该时间内没有项可用,则提高 queue.Empty 异常。否则(块是 False),返回一个项,如果一个立即可用,否则提出 queue.Empty 异常(在这种情况下,timeout 被忽略)。

get_nowait()

相当于 get(False)

multiprocessing.Queue 有一些在 queue.Queue 中找不到的附加方法。这些方法通常不需要大多数代码:

close()

指示当前进程不会在此队列上放置更多数据。一旦它将所有缓冲的数据刷新到管道,后台线程将退出。当队列被垃圾回收时,这被自动调用。

join_thread()

加入后台线程。这只能在调用 close() 后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。

默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该过程可以调用 cancel_join_thread() 使 join_thread() 什么也不做。

cancel_join_thread()

防止 join_thread() 阻塞。特别地,这防止后台线程在进程退出时自动连接 - 参见 join_thread()

此方法的更好的名称可能是 allow_exit_without_flush()。它可能导致入队的数据丢失,你几乎肯定不会需要使用它。它只是在那里,如果你需要当前的进程立即退出,而不等待刷新排队的数据到底层的管道,你不在乎丢失的数据。

注解

此类的功能需要在主机操作系统上运行共享信号量实现。没有一个,这个类中的功能将被禁用,并且尝试实例化 Queue 将导致 ImportError。有关其他信息,请参阅 issue 3770。这同样适用于下面列出的任何专门的队列类型。

class multiprocessing.SimpleQueue

它是一种简化的 Queue 类型,非常接近锁定的 Pipe

empty()

如果队列为空,返回 True,否则返回 False

get()

从队列中删除并返回项目。

put(item)

item 放入队列。

class multiprocessing.JoinableQueue([maxsize])

JoinableQueueQueue 子类,是另外具有 task_done()join() 方法的队列。

task_done()

指示以前入队的任务已完成。由队列使用者使用。对于用于获取任务的每个 get(),对 task_done() 的后续调用告诉队列任务上的处理完成。

如果 join() 当前阻塞,则当所有项目都被处理时(意味着对于已经被 put() 进入队列的每个项目接收到 task_done() 呼叫),它将恢复。

如果调用的次数比在队列中放置的项目多,则引发 ValueError

join()

阻塞,直到队列中的所有项目都被获取和处理。

每当项目添加到队列时,未完成任务的计数就会增加。当消费者调用 task_done() 以指示项目已检索并且其上的所有工作都完成时,计数下降。当未完成任务的计数下降到零时,join() 解除阻塞。

17.2.2.3. 杂

multiprocessing.active_children()

返回当前进程的所有活的孩子的列表。

调用这会产生“加入”已经完成的任何进程的副作用。

multiprocessing.cpu_count()

返回系统中的CPU数。

此数字不等于当前进程可以使用的CPU数。可以使用 len(os.sched_getaffinity(0)) 获得可用的CPU数

可能提出 NotImplementedError

multiprocessing.current_process()

返回与当前进程对应的 Process 对象。

threading.current_thread() 的类似物。

multiprocessing.freeze_support()

添加对使用 multiprocessing 的程序已冻结以生成Windows可执行文件的支持。 (已经用 py2exePyInstallercx_Freeze 测试。)

需要在主模块的 if __name__ == '__main__' 线之后直接调用此函数。例如:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

如果省略 freeze_support() 行,则试图运行冻结的可执行文件将提高 RuntimeError

在Windows以外的任何操作系统上调用 freeze_support() 时,调用 freeze_support() 无效。此外,如果模块在Windows上由Python解释器正常运行(程序尚未冻结),则 freeze_support() 不起作用。

multiprocessing.get_all_start_methods()

返回支持的开始方法的列表,其中第一个是默认值。可能的启动方法是 'fork''spawn''forkserver'。在Windows上只有 'spawn' 可用。在Unix上 'fork''spawn' 总是被支持,'fork' 是默认值。

3.4 新版功能.

multiprocessing.get_context(method=None)

返回一个与 multiprocessing 模块具有相同属性的上下文对象。

如果 methodNone,则返回默认上下文。否则 method 应为 'fork''spawn''forkserver'。如果指定的启动方法不可用,则引发 ValueError

3.4 新版功能.

multiprocessing.get_start_method(allow_none=False)

返回用于启动进程的start方法的名称。

如果start方法没有被修复,并且 allow_none 为false,那么start方法被固定为默认值,并返回名称。如果start方法未被修复,allow_none 为true,则返回 None

返回值可以是 'fork''spawn''forkserver'None'fork' 是Unix上的默认值,而 'spawn' 是Windows上的默认值。

3.4 新版功能.

multiprocessing.set_executable()

设置Python解释器在启动子进程时要使用的路径。 (默认使用 sys.executable)。 Embedders可能需要做一些事情

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

才能创建子进程。

在 3.4 版更改: 现在在使用 'spawn' 启动方法时支持Unix。

multiprocessing.set_start_method(method)

设置应用于启动子进程的方法。 method 可以是 'fork''spawn''forkserver'

请注意,这应该最多调用一次,它应该在主模块的 if __name__ == '__main__' 子句中保护。

3.4 新版功能.

17.2.2.4. 连接对象

连接对象允许发送和接收可拾取对象或字符串。它们可以被认为是面向消息的连接套接字。

连接对象通常使用 Pipe() 创建 - 另见 监听器和客户端

class multiprocessing.Connection
send(obj)

将对象发送到应使用 recv() 读取的连接的另一端。

对象必须可拾取。非常大的腌汁(大约32 MB +,虽然它取决于操作系统)可能会引发 ValueError 异常。

recv()

返回使用 send() 从连接的另一端发送的对象。阻塞,直到有它的东西要接收。如果没有什么剩下来接收,另一端被关闭,则提高 EOFError

fileno()

返回连接使用的文件描述符或句柄。

close()

关闭连接。

当连接被垃圾回收时,这被自动调用。

poll([timeout])

返回是否有任何数据可供读取。

如果没有指定 timeout,它将立即返回。如果 timeout 是数字,那么它指定阻止的最大时间(以秒为单位)。如果 timeoutNone,则使用无限超时。

注意,可以使用 multiprocessing.connection.wait() 一次轮询多个连接对象。

send_bytes(buffer[, offset[, size]])

bytes-like object 发送字节数据作为完整消息。

如果给出 offset,则从 buffer 中的该位置读取数据。如果给出 size,那么将从缓冲器读取许多字节。非常大的缓冲区(大约32 MB +,虽然它取决于操作系统)可能会引发 ValueError 异常

recv_bytes([maxlength])

返回从连接的另一端发送的字节数据的完整消息作为字符串。阻塞直到有东西要接收。如果没有剩余的接收和另一端已关闭,则提升 EOFError

如果指定了 maxlength 并且消息长于 maxlength,则会引发 OSError,并且连接将不再可读。

在 3.3 版更改: 此函数用于提高 IOError,现在是 OSError 的别名。

recv_bytes_into(buffer[, offset])

buffer 读取从连接的另一端发送的字节数据的完整消息,并返回消息中的字节数。阻塞直到有东西要接收。如果没有什么剩下来接收,另一端被关闭,则提高 EOFError

buffer 必须是可写的 bytes-like object。如果给定 offset,则消息将从该位置写入缓冲器。偏移量必须是小于 buffer 长度的非负整数(以字节为单位)。

如果缓冲区太短,则引发 BufferTooShort 异常,并且完整消息作为 e.args[0] 可用,其中 e 是异常实例。

在 3.3 版更改: 连接对象本身现在可以在使用 Connection.send()Connection.recv() 的进程之间传输。

3.3 新版功能: 连接对象现在支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 返回连接对象,__exit__() 调用 close()

例如:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

警告

Connection.recv() 方法会自动取消接收的数据,这可能是一种安全风险,除非您可以信任发送消息的进程。

因此,除非使用 Pipe() 生成连接对象,否则您只能在执行某种身份验证后才使用 recv()send() 方法。见 验证密钥

警告

如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。

17.2.2.5. 同步原语

通常,在多进程程序中同步原语不像在多线程程序中那样必要。请参阅 threading 模块的文档。

请注意,也可以使用管理器对象创建同步原语 - 请参阅 经理

class multiprocessing.Barrier(parties[, action[, timeout]])

屏障对象:threading.Barrier 的克隆。

3.3 新版功能.

class multiprocessing.BoundedSemaphore([value])

有界信号对象:threading.BoundedSemaphore 的近似模拟。

与其近似模拟存在独立的差异:其 acquire 方法的第一个参数被命名为 block,与 Lock.acquire() 一致。

注解

在Mac OS X上,这与 Semaphore 是不可区分的,因为在该平台上没有实现 sem_getvalue()

class multiprocessing.Condition([lock])

条件变量:threading.Condition 的别名。

如果指定了 lock,则它应该是来自 multiprocessingLockRLock 对象。

在 3.3 版更改: 添加 wait_for() 方法。

class multiprocessing.Event

threading.Event 的克隆。

class multiprocessing.Lock

非递归锁定对象:threading.Lock 的近似模拟。一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。除非另有说明,否则 threading.Lock 的概念和行为适用于线程在 multiprocessing.Lock 中被复制,因为它适用于进程或线程。

注意,Lock 实际上是一个工厂函数,它返回使用默认上下文初始化的 multiprocessing.synchronize.Lock 实例。

Lock 支持 context manager 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁定,阻止或非阻止。

block 参数设置为 True (默认值),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定并返回 True。请注意,此第一个参数的名称与 threading.Lock.acquire() 中的名称不同。

block 参数设置为 False,方法调用不会阻止。如果锁当前处于锁定状态,则返回 False;否则将锁设置为锁定状态并返回 True

当为 timeout 使用正值浮点值调用时,只要不能获取锁定,最多只能阻止 timeout 指定的秒数。对于 timeout 的负值的调用等于 timeout 为零。 timeout 值为 None 的调用(默认值)将超时周期设置为无限。注意,对 timeout 的负或 None 值的处理不同于在 threading.Lock.acquire() 中实现的行为。如果 block 参数设置为 False,则 timeout 参数没有实际意义,因此被忽略。如果已获取锁定则返回 True,如果已超过超时时间,则返回 False

release()

释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。

行为与 threading.Lock.release() 中的相同,除了当在解锁的锁上被调用时,产生 ValueError

class multiprocessing.RLock

递归锁对象:threading.RLock 的一个接近的模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程已经获得递归锁,相同的进程或线程可以再次获取它而不阻塞;该进程或线程必须每次释放它一次它已被获取。

注意,RLock 实际上是一个工厂函数,它返回使用默认上下文初始化的 multiprocessing.synchronize.RLock 实例。

RLock 支持 context manager 协议,因此可以在 with 语句中使用。

acquire(block=True, timeout=None)

获取锁定,阻止或非阻止。

当使用设置为 Trueblock 参数调用时,阻塞直到锁处于未锁定状态(不由任何进程或线程拥有),除非锁已由当前进程或线程拥有。当前进程或线程然后获取锁的所有权(如果它还没有所有权),并且锁中的递归级别增加1,导致 True 的返回值。注意,与 threading.RLock.acquire() 的实现相比,第一个参数的行为有几个不同,从参数本身的名称开始。

当使用设置为 Falseblock 参数调用时,不要阻止。如果锁已经被另一进程或线程获取(并且因此被拥有),则当前进程或线程不取得所有权,并且锁中的递归级别不改变,导致 False 的返回值。如果锁处于解锁状态,则当前进程或线程取得所有权,并递归递归级别,导致返回值 True

timeout 参数的使用和行为与 Lock.acquire() 中的相同。注意,timeout 的这些行为中的一些不同于在 threading.RLock.acquire() 中实现的行为。

release()

释放锁,递减递归级别。如果在递减之后递归级别为零,将锁重置为解锁(不由任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁被解锁,则只允许其中一个进程继续。如果递减后递归级别仍然为非零,则锁保持锁定并由调用进程或线程拥有。

只有在调用进程或线程拥有锁时才调用此方法。如果此方法由除所有者之外的进程或线程调用或如果锁处于未锁定(无主)状态,则会引发 AssertionError。请注意,在这种情况下引发的异常类型与 threading.RLock.release() 中实现的行为不同。

class multiprocessing.Semaphore([value])

信号量对象:threading.Semaphore 的紧密模拟。

与其近似模拟存在独立的差异:其 acquire 方法的第一个参数被命名为 block,与 Lock.acquire() 一致。

注解

在Mac OS X上,sem_timedwait 不受支持,因此使用超时调用 acquire() 将使用睡眠循环来模拟该函数的行为。

注解

如果由 Ctrl-C 产生的SIGINT信号到达,而主线程被对 BoundedSemaphore.acquire()Lock.acquire()RLock.acquire()Semaphore.acquire()Condition.acquire()Condition.wait() 的调用阻塞,则该调用将立即中断,并且将提高 KeyboardInterrupt

这与 threading 的行为不同,其中在等效阻塞调用正在进行时将忽略SIGINT。

注解

此程序包的某些功能需要在主机操作系统上运行共享信号量实现。没有一个,multiprocessing.synchronize 模块将被禁用,并且尝试导入它将导致 ImportError。有关其他信息,请参阅 issue 3770

17.2.2.6. 共享 ctypes 对象

可以使用可以由子进程继承的共享内存创建共享对象。

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回从共享内存分配的 ctypes 对象。默认情况下,返回值实际上是对象的同步包装器。对象本身可以通过 Valuevalue 属性访问。

typecode_or_type 确定返回对象的类型:它是ctypes类型或 array 模块使用的类型的一个字符类型代码。 *args 被传递给类型的构造函数。

如果 lockTrue (默认值),那么将创建一个新的递归锁对象,以同步对该值的访问。如果 lockLockRLock 对象,那么它将用于同步对值的访问。如果 lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“过程安全的”。

+= 这样涉及读写的操作不是原子操作。所以,如果,例如,你想原子地增加一个共享值,这是不够的,只是做

counter.value += 1

假设相关的锁是递归的(默认情况下),你可以改为做

with counter.get_lock():
    counter.value += 1

请注意,lock 是一个仅关键字的参数。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

返回从共享内存分配的ctypes数组。默认情况下,返回值实际上是数组的同步包装器。

typecode_or_type 确定返回数组的元素的类型:它是ctypes类型或 array 模块使用的类型的一个字符类型代码。如果 size_or_initializer 是整数,那么它确定数组的长度,并且该数组将被初始化为零。否则,size_or_initializer 是用于初始化阵列的序列,其长度决定阵列的长度。

如果 lockTrue (默认值),则会创建一个新的锁对象,以同步对该值的访问。如果 lockLockRLock 对象,那么它将用于同步对值的访问。如果 lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,lock 是一个仅关键字的参数。

注意,ctypes.c_char 的数组有 valueraw 属性,它允许使用它来存储和检索字符串。

17.2.2.6.1. multiprocessing.sharedctypes 模块

multiprocessing.sharedctypes 模块提供了从共享内存分配 ctypes 对象的功能,这些对象可以由子进程继承。

注解

虽然可以在共享存储器中存储指针,但请记住这将指向特定进程的地址空间中的位置。然而,指针很可能在第二进程的上下文中是无效的,并且试图从第二进程解引用指针可能导致崩溃。

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

返回从共享内存分配的ctypes数组。

typecode_or_type 确定返回数组的元素的类型:它是ctypes类型或 array 模块使用的类型的一个字符类型代码。如果 size_or_initializer 是一个整数,那么它决定了数组的长度,并且该数组将被初始化为零。否则,size_or_initializer 是用于初始化数组的序列,其长度决定数组的长度。

注意,设置和获取元素可能是非原子的 - 使用 Array() 来确保访问是使用锁自动同步的。

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

返回从共享内存分配的ctypes对象。

typecode_or_type 确定返回对象的类型:它是ctypes类型或 array 模块使用的类型的一个字符类型代码。 *args 被传递给类型的构造函数。

注意,设置和获取值可能是非原子的 - 使用 Value() 来确保访问是使用锁自动同步的。

注意,ctypes.c_char 的数组有 valueraw 属性,允许它们使用它来存储和检索字符串 - 参见 ctypes 文档。

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

RawArray() 相同,只是根据 lock 的值,可能会返回一个进程安全的同步包装,而不是原始的ctypes数组。

如果 lockTrue (默认值),则会创建一个新的锁对象,以同步对该值的访问。如果 lockLockRLock 对象,那么它将用于同步对值的访问。如果 lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,lock 是一个仅关键字的参数。

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

RawValue() 相同,只是根据 lock 的值,可能会返回一个过程安全的同步包装,而不是原始的ctypes对象。

如果 lockTrue (默认值),则会创建一个新的锁对象,以同步对该值的访问。如果 lockLockRLock 对象,那么它将用于同步对值的访问。如果 lockFalse,那么对返回的对象的访问将不会被锁自动保护,因此它不一定是“过程安全的”。

请注意,lock 是一个仅关键字的参数。

multiprocessing.sharedctypes.copy(obj)

返回从共享内存分配的ctypes对象,它是ctypes对象 obj 的副本。

multiprocessing.sharedctypes.synchronized(obj[, lock])

返回一个使用 lock 来同步访问的ctypes对象的进程安全包装器对象。如果 lockNone (默认值),则会自动创建 multiprocessing.RLock 对象。

同步包装器除了它包装的对象外还有两个方法:get_obj() 返回包装的对象,get_lock() 返回用于同步的锁定对象。

注意,通过包装器访问ctypes对象比访问raw ctypes对象要慢得多。

在 3.5 版更改: 同步对象支持 context manager 协议。

下表比较了使用正常ctypes语法从共享内存创建共享ctypes对象的语法。 (在表 MyStructctypes.Structure 的一些子类)

ctypes

使用类型的共享类型

使用类型代码的共享类型

c_double(2.4)

RawValue(c_double,2.4)

RawValue(’d’,2.4)

MyStruct(4,6)

RawValue(MyStruct,4,6)

 

(c_short * 7)()

RawArray(c_short,7)

RawArray(’h’,7)

(c_int * 3)(9,2,8)

RawArray(c_int,(9,2,8))

RawArray(’i’,(9,2,8))

下面是一个例子,其中许多ctypes对象被子进程修改:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

打印的结果是

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

17.2.2.7. 经理

管理器提供一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制管理 共享对象 的服务器进程。其他进程可以通过使用代理访问共享对象。

multiprocessing.Manager()

返回已启动的 SyncManager 对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并具有将创建共享对象并返回相应代理的方法。

管理器进程将在垃圾收集或其父进程退出后立即关闭。管理器类在 multiprocessing.managers 模块中定义:

class multiprocessing.managers.BaseManager([address[, authkey]])

创建一个BaseManager对象。

一旦创建,应调用 start()get_server().serve_forever() 以确保管理器对象引用已启动的管理器进程。

address 是管理器进程侦听新连接的地址。如果 addressNone,则选择任意一个。

authkey 是将用于检查到服务器进程的传入连接的有效性的认证密钥。如果 authkeyNone,则使用 current_process().authkey。否则使用 authkey,它必须是字节字符串。

start([initializer[, initargs]])

启动子过程以启动管理器。如果 initializer 不是 None,则子进程在启动时将调用 initializer(*initargs)

get_server()

返回一个 Server 对象,它表示在Manager控制下的实际服务器。 Server 对象支持 serve_forever() 方法:

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server 另外具有 address 属性。

connect()

将本地管理器对象连接到远程管理器进程:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc')
>>> m.connect()
shutdown()

停止经理使用的进程。这仅在 start() 已用于启动服务器进程时可用。

这可以多次调用。

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

可以用于向管理器类注册类型或可调用的类方法。

typeid 是用于标识特定类型的共享对象的“类型标识符”。这必须是字符串。

callable 是用于为此类型标识符创建对象的可调用项。如果一个管理器实例将使用 connect() 方法连接到服务器,或者如果 create_method 参数是 False,那么这可以留作 None

proxytypeBaseProxy 的子类,用于使用此 typeid 创建共享对象的代理。如果 None,则自动创建代理类。

exposed 用于指定应该允许使用 BaseProxy._callmethod() 访问此类型代理的方法名称序列。 (如果 exposedNone,则使用 proxytype._exposed_,如果存在)。在没有指定公开列表的情况下,共享对象的所有“公共方法”将是可访问的。 (这里的“公共方法”是指具有 __call__() 方法并且其名称不以 '_' 开头的任何属性。)

method_to_typeid 是一个映射,用于指定应该返回代理的那些公开方法的返回类型。它将方法名映射到typeid字符串。 (如果 method_to_typeidNone,则使用 proxytype._method_to_typeid_,如果存在)。如果方法的名称不是此映射的键,或者映射是 None,则方法返回的对象将按值进行复制。

create_method 确定是否应该使用名称 typeid 创建一个方法,该方法可以用于告诉服务器进程创建一个新的共享对象并为其返回一个代理。默认为 True

BaseManager 实例也有一个只读属性:

address

经理使用的地址。

在 3.3 版更改: Manager对象支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 启动服务器进程(如果它尚未启动),然后返回管理器对象。 __exit__() 调用 shutdown()

在以前的版本中,如果尚未启动管理器的服务器进程,则 __enter__() 没有启动它。

class multiprocessing.managers.SyncManager

BaseManager 的子类,可用于进程的同步。这种类型的对象由 multiprocessing.Manager() 返回。

它的方法创建和返回 代理对象,用于许多常用的数据类型,以跨进程同步。这尤其包括共享列表和字典。

Barrier(parties[, action[, timeout]])

创建一个共享的 threading.Barrier 对象并为其返回一个代理。

3.3 新版功能.

BoundedSemaphore([value])

创建一个共享的 threading.BoundedSemaphore 对象并为其返回一个代理。

Condition([lock])

创建一个共享的 threading.Condition 对象并为其返回一个代理。

如果提供了 lock,那么它应该是 threading.Lockthreading.RLock 对象的代理。

在 3.3 版更改: 添加 wait_for() 方法。

Event()

创建一个共享的 threading.Event 对象并为其返回一个代理。

Lock()

创建一个共享的 threading.Lock 对象并为其返回一个代理。

Namespace()

创建一个共享的 Namespace 对象并为其返回一个代理。

Queue([maxsize])

创建一个共享的 queue.Queue 对象并为其返回一个代理。

RLock()

创建一个共享的 threading.RLock 对象并为其返回一个代理。

Semaphore([value])

创建一个共享的 threading.Semaphore 对象并为其返回一个代理。

Array(typecode, sequence)

创建一个数组并返回一个代理。

Value(typecode, value)

创建一个具有可写 value 属性的对象,并为其返回一个代理。

dict()
dict(mapping)
dict(sequence)

创建一个共享的 dict 对象并为其返回一个代理。

list()
list(sequence)

创建一个共享的 list 对象并为其返回一个代理。

在 3.6 版更改: 共享对象能够嵌套。例如,诸如共享列表的共享容器对象可以包含将由 SyncManager 管理和同步的其他共享对象。

class multiprocessing.managers.Namespace

可以在 SyncManager 注册的类型。

命名空间对象没有公共方法,但有可写属性。其表示显示其属性的值。

但是,当为名称空间对象使用代理时,以 '_' 开头的属性将是代理的属性,而不是引用对象的属性:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

17.2.2.7.1. 定制经理

要创建自己的管理器,创建一个 BaseManager 的子类,并使用 register() 类方法向管理器类注册新类型或可调用。例如:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

17.2.2.7.2. 使用远程管理器

可以在一台计算机上运行管理器服务器,并让客户端从其他计算机使用它(假设涉及的防火墙允许它)。

运行以下命令为远程客户端可以访问的单个共享队列创建一个服务器:

>>> from multiprocessing.managers import BaseManager
>>> import queue
>>> queue = queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

一个客户端可以访问服务器,如下所示:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

另一个客户端也可以使用它:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

本地进程也可以访问该队列,使用上面的代码在客户端上远程访问它:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super(Worker, self).__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

17.2.2.8. 代理对象

代理是 refers 到共享对象的对象,它(在大概)存在于不同的进程中。共享对象被称为代理的 referent。多个代理对象可以具有相同的指示。

代理对象具有调用其指示对象的相应方法的方法(尽管不是指示对象的每个方法都必须通过代理可用)。这样,可以使用代理,就像它的指示可以:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

注意,将 str() 应用于代理将返回指示对象的表示,而应用 repr() 将返回代理的表示。

代理对象的一个重要特征是它们是可拾取的,因此它们可以在进程之间传递。因此,指示物可以含有 代理对象。这允许嵌套这些管理列表,dicts和其他 代理对象

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

类似地,dict和list代理可以嵌套在一起:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

如果标准(非代理) listdict 对象包含在引用中,那么对这些可变值的修改不会通过管理器传播,因为代理无法知道何时修改其中包含的值。然而,在容器代理(触发代理对象上的 __setitem__)中存储值通过管理器传播,因此有效地修改这样的项目,可以将修改的值重新分配给容器代理:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

与大多数使用情况下使用嵌套 代理对象 相比,这种方法可能不太方便,但也显示了对同步的一个级别的控制。

注解

multiprocessing 中的代理类型不支持按值进行比较。因此,例如,我们有:

>>> manager.list([1,2,3]) == [1,2,3]
False

在进行比较时,应该只使用指示物的副本。

class multiprocessing.managers.BaseProxy

代理对象是 BaseProxy 的子类的实例。

_callmethod(methodname[, args[, kwds]])

调用并返回代理引用对象的方法的结果。

如果 proxy 是指向是 obj 的代理,则表达式

proxy._callmethod(methodname, args, kwds)

将评估表达式

getattr(obj, methodname)(*args, **kwds)

在经理的过程中。

返回值将是调用结果的副本或新的共享对象的代理 - 请参阅 BaseManager.register()method_to_typeid 参数的文档。

如果调用引发异常,则由 _callmethod() 重新生成异常。如果在管理器的进程中引发一些其他异常,则这被转换为 RemoteError 异常并且由 _callmethod() 产生。

特别注意,如果 methodname 没有 exposed,将会引发异常。

_callmethod() 的使用示例:

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

返回指示对象的副本。

如果指示符不可取消,那么这将引发异常。

__repr__()

返回代理对象的表示。

__str__()

返回指示对象的表示。

17.2.2.8.1. 清理

代理对象使用一个weakref回调,所以当它被垃圾收集时,它从拥有其指示对象的管理器中注销它自己。

当不再有任何代理引用它时,共享对象将从管理器进程中删除。

17.2.2.9. 过程池

可以创建一个进程池,它将执行与 Pool 类提交的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,它控制可以提交作业的工作进程池。它支持具有超时和回调的异步结果,并具有并行映射实现。

processes 是要使用的工作进程的数量。如果 processesNone,则使用 os.cpu_count() 返回的数字。

如果 initializer 不是 None,则每个工作进程在启动时将调用 initializer(*initargs)

maxtasksperchild 是工作进程可以完成的任务数,在它将退出并被新的工作进程替换之前,可以释放未使用的资源。默认的 maxtasksperchildNone,这意味着工作进程将与池一样长。

context 可以用于指定用于启动工作进程的上下文。通常使用上下文对象的函数 multiprocessing.Pool()Pool() 方法创建池。在这两种情况下,context 都被适当设置。

请注意,池对象的方法只应该由创建池的进程调用。

3.2 新版功能: maxtasksperchild

3.4 新版功能: context

注解

Pool 中的工作进程通常在池的工作队列的整个持续时间内生存。在其他系统(例如Apache,mod_wsgi等)中发现的用于释放由工作者拥有的资源的频繁模式是允许池中的工作者在退出,被清理和产生新进程之前完成一定量的工作以取代旧的。 Poolmaxtasksperchild 参数向最终用户公开此功能。

apply(func[, args[, kwds]])

使用参数 args 和关键字参数 kwds 调用 func。它阻塞,直到结果准备好。给定这个块,apply_async() 更好地适于并行执行工作。此外,func 仅在池的工人之一中执行。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

返回结果对象的 apply() 方法的变体。

如果指定 callback 那么它应该是一个可调用它接受一个参数。当结果变为就绪时,callback 应用于它,这是除非调用失败,在这种情况下应用 error_callback

如果指定 error_callback 那么它应该是一个可调用它接受一个参数。如果目标函数失败,则使用异常实例调用 error_callback

回调应该立即完成,否则处理结果的线程将被阻塞。

map(func, iterable[, chunksize])

map() 内置函数的并行等效(它只支持一个 iterable 参数)。它阻塞,直到结果准备好。

此方法将iterable切成多个块,将其作为单独的任务提交到进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

返回结果对象的 map() 方法的变体。

如果指定 callback 那么它应该是一个可调用它接受一个参数。当结果变为就绪时,callback 应用于它,这是除非调用失败,在这种情况下应用 error_callback

如果指定 error_callback 那么它应该是一个可调用它接受一个参数。如果目标函数失败,则使用异常实例调用 error_callback

回调应该立即完成,否则处理结果的线程将被阻塞。

imap(func, iterable[, chunksize])

map() 的lazier版本。

chunksize 参数与 map() 方法使用的参数相同。对于非常长的迭代,对 chunksize 使用大的值可以使作业完成 许多 比使用默认值 1 更快。

此外,如果 chunksize1,则 imap() 方法返回的迭代器的 next() 方法具有可选的 timeout 参数:如果结果在 timeout 秒内不能返回,则 next(timeout) 将提高 multiprocessing.TimeoutError

imap_unordered(func, iterable[, chunksize])

imap() 相同,除了返回的迭代器的结果的顺序应该被认为是任意的。 (只有当只有一个工作进程时,该命令保证是“正确的”。)

starmap(func, iterable[, chunksize])

map(),除了 iterable 的元素被期望是被解包作为参数的迭代。

因此,[(1,2), (3, 4)]iterable 导致 [func(1,2), func(3,4)]

3.3 新版功能.

starmap_async(func, iterable[, chunksize[, callback[, error_back]]])

starmap()map_async() 的组合,它迭代可迭代的 iterable,并调用 func 与可解迭的iterable。返回结果对象。

3.3 新版功能.

close()

防止任何其他任务提交到池。一旦所有任务完成,工作进程将退出。

terminate()

立即停止工作进程,而不完成未完成的工作。当池对象被垃圾回收时,terminate() 将被立即调用。

join()

等待工作进程退出。在使用 join() 之前必须调用 close()terminate()

3.3 新版功能: 池对象现在支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 返回池对象,__exit__() 调用 terminate()

class multiprocessing.pool.AsyncResult

Pool.apply_async()Pool.map_async() 返回的结果的类。

get([timeout])

返回结果到达时。如果 timeout 不是 None 并且结果在 timeout 秒内没有到达,则 multiprocessing.TimeoutError 被引发。如果远程调用引发了异常,那么该异常将由 get() 重新审核。

wait([timeout])

等待结果可用或直到 timeout 秒过去。

ready()

返回呼叫是否已完成。

successful()

返回调用是否完成,而不引发异常。如果结果没有准备就会提高 AssertionError

以下示例演示了池的使用:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

17.2.2.10. 监听器和客户端

通常,进程之间的消息传递使用队列或通过使用 Pipe() 返回的 Connection 对象来完成。

然而,multiprocessing.connection 模块允许一些额外的灵活性。它基本上提供了一个高级的面向消息的API来处理套接字或Windows命名管道。它还支持 摘要认证 使用 hmac 模块,并且同时轮询多个连接。

multiprocessing.connection.deliver_challenge(connection, authkey)

将随机生成的消息发送到连接的另一端,并等待回复。

如果回复与使用 authkey 作为键的消息摘要匹配,则将欢迎消息发送到连接的另一端。否则引发 AuthenticationError

multiprocessing.connection.answer_challenge(connection, authkey)

接收消息,使用 authkey 作为关键字计算消息的摘要,然后发送摘要。

如果没有接收到欢迎消息,则 AuthenticationError 被引发。

multiprocessing.connection.Client(address[, family[, authenticate[, authkey]]])

尝试设置到使用地址 address 的侦听器的连接,返回 Connection

连接的类型由 family 参数确定,但这通常可以省略,因为它通常可以从 address 的格式推断。 (见 地址格式

如果 authenticateTrueauthkey 是字节字符串,则使用摘要认证。如果 authkeyNone,用于认证的密钥将是 authkeycurrent_process().authkey。如果认证失败,则提高 AuthenticationError。见 验证密钥

class multiprocessing.connection.Listener([address[, family[, backlog[, authenticate[, authkey]]]]])

绑定套接字或Windows命名管道的包装器,正在侦听连接。

address 是要由侦听器对象的绑定套接字或命名管道使用的地址。

注解

如果使用地址“0.0.0.0”,则该地址将不是Windows上的可连接终点。如果需要可连接的端点,则应使用“127.0.0.1”。

family 是要使用的套接字(或命名管道)的类型。这可以是字符串 'AF_INET' (对于TCP套接字),'AF_UNIX' (对于Unix域套接字)或 'AF_PIPE' (对于Windows命名管道)之一。其中只有第一个保证可用。如果 familyNone,则该族从 address 的格式推断。如果 address 也是 None,则选择默认值。此默认值是假定为最快可用的系列。见 地址格式。注意,如果 family'AF_UNIX',地址是 None,那么套接字将在使用 tempfile.mkstemp() 创建的私有临时目录中创建。

如果侦听器对象使用套接字,则 backlog (默认为1)一旦被绑定就传递给套接字的 listen() 方法。

如果 authenticateTrue (默认为 False)或 authkey 不是 None,则使用摘要认证。

如果 authkey 是字节字符串,则它将被用作认证密钥;否则必须是 None

如果 authkeyNoneauthenticateTrue,则使用 current_process().authkey 作为认证密钥。如果 authkeyNone 并且 authenticateFalse,则不进行认证。如果认证失败,则 AuthenticationError 升高。见 验证密钥

accept()

在侦听器对象的绑定套接字或命名管道上接受连接并返回 Connection 对象。如果尝试认证并且失败,则引发 AuthenticationError

close()

关闭侦听器对象的绑定套接字或命名管道。当监听器被垃圾回收时,这被自动调用。但是建议明确调用它。

侦听器对象具有以下只读属性:

address

Listener对象正在使用的地址。

last_accepted

最后接受的连接来自的地址。如果这不可用,那么它是 None

3.3 新版功能: 监听器对象现在支持上下文管理协议 - 请参阅 上下文管理器类型__enter__() 返回侦听器对象,__exit__() 调用 close()

multiprocessing.connection.wait(object_list, timeout=None)

等待 object_list 中的对象准备就绪。返回 object_list 中已准备好的对象的列表。如果 timeout 是一个浮点数,那么调用最多会阻塞这么多秒。如果 timeoutNone,那么它将阻塞无限期。负超时相当于零超时。

对于Unix和Windows,如果是 object_list,对象可以出现在 object_list

当有可用于从其读取数据或另一端已关闭时,连接或套接字对象就绪就绪。

Unixwait(object_list, timeout) 几乎相当的 select.select(object_list, [], [], timeout)。区别在于,如果 select.select() 被信号中断,则它可以以 EINTR 的错误号提高 OSError,而 wait() 不会。

视窗object_list 中的项必须是可等待的整数句柄(根据Win32函数 WaitForMultipleObjects() 的文档所使用的定义),或者它可以是带有 fileno() 方法的对象,该方法返回套接字句柄或管道句柄。 (注意,管道手柄和插座手柄是 等待手柄。)

3.3 新版功能.

例子

以下服务器代码创建使用 'secret password' 作为认证密钥的侦听器。然后它等待连接并向客户端发送一些数据:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

以下代码连接到服务器并从服务器接收一些数据:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

以下代码使用 wait() 一次等待来自多个进程的消息:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

17.2.2.10.1. 地址格式

  • 'AF_INET' 地址是 (hostname, port) 形式的元组,其中 hostname 是字符串,port 是整数。

  • 'AF_UNIX' 地址是表示文件系统上的文件名的字符串。

  • 'AF_PIPE' 地址是一个形式的字符串

    r'\\.\pipe\PipeName'。要使用 Client() 连接到名为 ServerName 的远程计算机上的命名管道,应该使用形式为 r'\\ServerName\pipe\PipeName' 的地址。

请注意,任何以两个反斜杠开头的字符串默认为 'AF_PIPE' 地址,而不是 'AF_UNIX' 地址。

17.2.2.11. 验证密钥

当使用 Connection.recv 时,接收的数据将自动取消。不幸的是,从不受信任的源解压数据是一种安全风险。因此 ListenerClient() 使用 hmac 模块提供摘要认证。

认证密钥是可以被认为是密码的字节字符串:一旦建立连接,两端将要求证明另一方知道认证密钥。 (演示两端都使用相同的密钥, 涉及通过连接发送密钥。)

如果请求认证但未指定认证密钥,则使用 current_process().authkey 的返回值(请参阅 Process)。此值将由当前进程创建的任何 Process 对象自动继承。这意味着(默认地)多进程程序的所有进程将共享单个认证密钥,当在它们之间建立连接时可以使用该认证密钥。

还可以通过使用 os.urandom() 来生成合适的认证密钥。

17.2.2.12. 记录

一些支持日志记录可用。然而,请注意,logging 包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。

multiprocessing.get_logger()

返回 multiprocessing 使用的记录器。如果需要,将创建一个新的。

当第一次创建记录器具有级别 logging.NOTSET 和没有默认处理程序。发送到此记录器的消息将不会默认传播到根记录器。

请注意,在Windows子进程将只继承父进程记录器的级别 - 记录器的任何其他定制将不会继承。

multiprocessing.log_to_stderr()

此函数执行对 get_logger() 的调用,但除了返回由get_logger创建的记录器之外,它还添加了一个处理程序,该处理程序使用格式 '[%(levelname)s/%(processName)s] %(message)s' 将输出发送到 sys.stderr

以下是启用日志记录的示例会话:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

有关日志记录级别的完整表,请参阅 logging 模块。

17.2.2.13. multiprocessing.dummy 模块

multiprocessing.dummy 复制 multiprocessing 的API,但只不过是 threading 模块的一个包装器。

17.2.3. 编程指南

使用 multiprocessing 时应遵守一定的准则和惯用语。

17.2.3.1. 所有星级方法

以下适用于所有启动方法。

避免共享状态

尽可能地,应该尽量避免在进程之间转移大量的数据。

最好坚持使用队列或管道进行进程之间的通信,而不是使用较低级别的同步原语。

可取性

确保代理方法的参数是可拾取的。

线程安全代理

不要使用来自多个线程的代理对象,除非您使用锁来保护它。

(使用 same 代理的不同进程从不会有问题。)

加入僵尸进程

在Unix上,当一个进程完成但没有加入时,它变成一个僵尸。应该永远不会有很多,因为每次一个新的进程开始(或 active_children() 被调用)所有尚未加入的完成的进程将被加入。也调用完成的进程的 Process.is_alive 将加入进程。即使这样,显然加入您开始的所有进程也许是一个好习惯。

更好地继承比pickle/unpickle

当使用 spawnforkserver 启动方法时,multiprocessing 中的许多类型都需要可选,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它。

避免终止进程

使用 Process.terminate 方法停止进程可能导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或对其他进程不可用。

因此,最好只考虑在不使用任何共享资源的进程上使用 Process.terminate

加入使用队列的进程

请记住,将项目置于队列中的进程将在终止之前等待,直到所有缓冲的项目由“馈线”线程馈送到基础管道。 (子进程可以调用队列的 Queue.cancel_join_thread 方法,以避免此行为。)

这意味着,无论何时使用队列,您都需要确保已经放在队列中的所有项目最终都会在加入流程之前被删除。否则,您不能确定已将项目放入队列的进程将终止。还要记住,非守护进程将自动加入。

将死锁的示例如下:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

这里的修复将是交换最后两行(或简单地删除 p.join() 行)。

将资源显式传递给子进程

在Unix上使用 fork 启动方法,子进程可以使用在父进程中使用全局资源创建的共享资源。但是,最好将该对象作为参数传递给子进程的构造函数。

除了使代码(潜在地)兼容Windows和其他启动方法,这还确保只要子进程仍然活着,对象不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾回收时释放某些资源,这可能很重要。

例如

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

应重写为

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

注意用“文件类对象”替换 sys.stdin

multiprocessing 最初无条件调用:

os.close(sys.stdin.fileno())

multiprocessing.Process._bootstrap() 方法中—这导致了进程中的进程的问题。这已更改为:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

这解决了进程彼此冲突的根本问题,导致错误的文件描述符错误,但是对于用具有输出缓冲的“文件状对象”替换 sys.stdin() 的应用程序引入潜在的危险。这个危险是,如果多个进程在这个类文件对象上调用 close(),它可能导致相同的数据被多次刷新到对象,导致损坏。

如果你写一个类文件对象并实现你自己的缓存,你可以通过存储pid每当你追加到缓存,并在pid更改时丢弃缓存,使它叉安全。例如:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

有关更多信息,请参阅 issue 5155issue 5313issue 5331

17.2.3.2. spawnforkserver 启动方法

有一些额外的限制不适用于 fork 启动方法。

更多picklability

确保 Process.__init__() 的所有参数都是可拾取的。此外,如果你子类化 Process,那么确保在调用 Process.start 方法时实例将是可拾取的。

全局变量

请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与调用 Process.start 时父进程中的值不同。

但是,只是模块级常量的全局变量不会导致问题。

安全导入主模块

确保主模块可以通过新的Python解释器安全导入,而不会导致意外的副作用(如启动一个新进程)。

例如,使用 spawnforkserver 启动方法运行以下模块将失败,并使用 RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

相反,应该通过使用 if __name__ == '__main__': 如下保护程序的“入口点”:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(如果程序将正常运行而不是冻结,则可以省略 freeze_support() 行。)

这允许新生成的Python解释器安全地导入模块,然后运行模块的 foo() 函数。

如果在主模块中创建了池或管理器,则应用类似的限制。

17.2.4. 例子

演示如何创建和使用自定义管理器和代理:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

使用 Pool

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

显示如何使用队列将任务提供给工作进程集合并收集结果的示例:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()