17.2. multiprocessing
—基于过程的并行性¶
源代码: Lib/multiprocessing/
17.2.1. 介绍¶
multiprocessing
是一个包,它支持使用类似于 threading
模块的API来生成进程。 multiprocessing
包提供了本地和远程并发,通过使用子进程而不是线程有效地对 Global Interpreter Lock 进行侧移。因此,multiprocessing
模块允许编程人员充分利用给定机器上的多个处理器。它可以在Unix和Windows上运行。
multiprocessing
模块还引入了在 threading
模块中没有模拟的API。其中一个主要的例子是 Pool
对象,它提供了一种方便的方法来并行化多个输入值的函数执行,跨进程分布输入数据(数据并行性)。以下示例演示了在模块中定义此类函数的常见做法,以便子进程可以成功导入该模块。这个数据并行的基本示例使用 Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将打印到标准输出
[1, 4, 9]
17.2.1.1. Process
类¶
在 multiprocessing
中,通过创建 Process
对象然后调用其 start()
方法来生成进程。 Process
遵循 threading.Thread
的API。多进程程序的一个简单例子是
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要显示涉及的各个进程ID,这里是一个扩展示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
有关为什么需要 if __name__ == '__main__'
部分的说明,请参见 编程指南。
17.2.1.2. 上下文和启动方法¶
根据平台,multiprocessing
支持三种方式启动进程。这些 启动方法 是
- spawn
父进程启动一个新的python解释器过程。子进程将只继承运行过程对象
run()
方法所需的那些资源。特别地,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用 fork 或 forkserver 相比,使用该方法开始过程相当慢。在Unix和Windows上可用。 Windows上的默认值。
- fork
父进程使用
os.fork()
来分叉Python解释器。子进程在开始时,与父进程有效地相同。父进程的所有资源都由子进程继承。请注意,安全地分支多线程进程是有问题的。仅在Unix上可用。 Unix上的默认值。
- forkserver
当程序启动并选择 forkserver 启动方法时,启动服务器进程。从那时起,每当需要一个新进程时,父进程连接到服务器并请求它分叉一个新进程。 fork服务器进程是单线程的,因此它使用
os.fork()
是安全的。没有不必要的资源被继承。在Unix平台上可用,它支持通过Unix管道传递文件描述符。
在 3.4 版更改: spawn 在所有unix平台上添加,forkserver 为某些unix平台添加。子进程在Windows上不再继承所有父级可继承句柄。
在Unix上使用 spawn 或 forkserver 启动方法还将启动一个 信号量跟踪器 进程,该进程跟踪由程序进程创建的未链接的命名信号量。当所有进程退出后,信号量跟踪器取消链接任何剩余的信号量。通常应该没有,但如果一个过程被一个信号杀死,可能有一些“泄漏”的信号量。 (取消链接命名的信号量是一个严重的问题,因为系统只允许有限的数量,它们将不会自动取消链接,直到下次重新启动。
要选择启动方法,请在主模块的 if __name__ == '__main__'
子句中使用 set_start_method()
。例如:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
不应在程序中使用多次。
或者,您可以使用 get_context()
获取上下文对象。上下文对象具有与多处理模块相同的API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
注意,与一个上下文相关的对象可能与用于不同上下文的进程不兼容。特别地,使用 fork 上下文创建的锁不能传递到使用 spawn 或 forkserver 启动方法启动的进程。
想要使用特定启动方法的库应该使用 get_context()
,以避免干扰库用户的选择。
17.2.1.3. 在进程之间交换对象¶
multiprocessing
支持进程之间的两种类型的通信通道:
队列
Queue
类是queue.Queue
的近似克隆。例如:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()队列是线程和过程安全的。
管道
Pipe()
函数返回由管道连接的一对连接对象,默认情况下是双向(双向)。例如:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()
Pipe()
返回的两个连接对象表示管道的两端。每个连接对象都有send()
和recv()
方法(等等)。请注意,如果两个进程(或线程)尝试同时从管道的 same 末端读取或写入,则管道中的数据可能会损坏。当然,没有同时使用不同管道末端的进程的腐败风险。
17.2.1.4. 进程之间的同步¶
multiprocessing
包含来自 threading
的所有同步原语的等同物。例如,可以使用锁定来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
不使用锁定输出从不同的进程很容易得到所有混合。
17.2.1.5. 进程之间的共享状态¶
如上所述,当进行并发编程时,通常最好避免使用尽可能共享的状态。在使用多个进程时尤其如此。
然而,如果你真的需要使用一些共享数据,那么 multiprocessing
提供了这样做的几种方法。
共享内存
数据可以使用
Value
或Array
存储在共享存储器映射中。例如,下面的代码from multiprocessing import Process, Value, Array def f(n, a): n.value = 3.1415927 for i in range(len(a)): a[i] = -a[i] if __name__ == '__main__': num = Value('d', 0.0) arr = Array('i', range(10)) p = Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:])将打印
3.1415927 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]在创建
num
和arr
时使用的'd'
和'i'
参数是array
模块使用的类型类型:'d'
表示双精度浮点数,'i'
表示有符号整数。这些共享对象将是进程和线程安全的。为了更灵活地使用共享内存,可以使用
multiprocessing.sharedctypes
模块,它支持创建从共享内存分配的任意ctypes对象。
服务器进程
Manager()
返回的管理器对象控制一个服务器进程,该进程保存Python对象,并允许其他进程使用代理来操作它们。由
Manager()
返回的管理器将支持类型list
,dict
,Namespace
,Lock
,RLock
,Semaphore
,BoundedSemaphore
,Condition
,Event
,Barrier
,Queue
,Value
和Array
。例如,from multiprocessing import Process, Manager def f(d, l): d[1] = '1' d['2'] = 2 d[0.25] = None l.reverse() if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list(range(10)) p = Process(target=f, args=(d, l)) p.start() p.join() print(d) print(l)将打印
{0.25: None, 1: '1', '2': 2} [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由网络上不同计算机上的进程共享。但是,它们比使用共享内存慢。
17.2.1.6. 使用工人池¶
Pool
类表示工作进程池。它具有允许以几种不同的方式将任务卸载到工作进程的方法。
例如:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,池的方法只应该被创建它的进程使用。
注解
此包中的功能要求 __main__
模块可由孩子导入。这是在 编程指南 涵盖,但值得在这里指出。这意味着一些例子,如 multiprocessing.pool.Pool
示例将不能在交互式解释器中工作。例如:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果你尝试这样,它将实际输出以半随机方式交织的三个完整的追踪,然后你可能不得不停止主进程。)
17.2.2. 参考¶
multiprocessing
包大多复制 threading
模块的API。
17.2.2.1. Process
和异常¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ 进程对象表示在单独进程中运行的活动。
Process
类具有threading.Thread
的所有方法的等同物。应始终使用关键字参数调用构造函数。 group 应始终为
None
;它仅存在于与threading.Thread
的兼容性。 target 是要由run()
方法调用的可调用对象。它默认为None
,意味着什么也不调用。 name 是进程名称(有关详细信息,请参见name
)。 args 是目标调用的参数元组。 kwargs 是目标调用的关键字参数的字典。如果提供,则仅关键字 daemon 参数将进程daemon
标志设置为True
或False
。如果None
(默认),这个标志将继承自创建过程。默认情况下,没有参数传递给 target。
如果子类覆盖了构造函数,它必须确保它在对进程做任何其他事情之前调用基类构造函数(
Process.__init__()
)。在 3.3 版更改: 添加了 daemon 参数。
-
run
()¶ 表示进程活动的方法。
您可以在子类中覆盖此方法。标准
run()
方法调用传递给对象的构造函数的可调用对象作为目标参数(如果有的话),其中顺序和关键字参数分别取自 args 和 kwargs 参数。
-
join
([timeout])¶ 如果可选参数 timeout 是
None
(缺省值),则该方法将阻塞,直到调用join()
方法的进程终止。如果 timeout 为正数,则它最多阻塞 timeout 秒。注意,如果方法终止或方法超时,该方法返回None
。检查进程的exitcode
以确定它是否终止。一个过程可以连接多次。
进程不能加入自身,因为这将导致死锁。尝试在进程启动之前加入进程是一个错误。
-
name
¶ 进程的名称。名称是仅用于识别目的的字符串。它没有语义。可以给多个进程指定相同的名称。
初始名称由构造函数设置。如果没有为构造函数提供显式名称,则构造形式为“Process-N 1:N 2:...:N k ”的名称,其中每个N k 是其第N个子父母。
-
daemon
¶ 进程的守护进程标志,一个布尔值。这必须在调用
start()
之前设置。初始值从创建过程继承。
当进程退出时,它会尝试终止所有的daemonic子进程。
注意,不允许daemonic进程创建子进程。否则,一个守护进程会使其子进程成为孤立的,如果它的父进程退出时终止。此外,这些是 不 Unix守护进程或服务,它们是正常进程,如果非守护进程已退出,它们将被终止(而不是加入)。
除了
threading.Thread
API之外,Process
对象还支持以下属性和方法:-
pid
¶ 返回进程ID。在进程诞生之前,这将是
None
。
-
exitcode
¶ 孩子的退出代码。如果进程尚未终止,这将是
None
。负值 -N 表示孩子被信号 N 终止。
-
authkey
¶ 进程的认证密钥(字节字符串)。
当
multiprocessing
初始化时,主进程使用os.urandom()
分配一个随机字符串。当
Process
对象被创建时,它将继承其父进程的认证密钥,虽然这可以通过将authkey
设置为另一个字节字符串来改变。见 验证密钥。
-
sentinel
¶ 系统对象的数字句柄,在过程结束时将变为“就绪”。
如果要使用
multiprocessing.connection.wait()
一次等待几个事件,可以使用此值。否则调用join()
比较简单。在Windows上,这是可用于
WaitForSingleObject
和WaitForMultipleObjects
系列API调用的操作系统句柄。在Unix上,这是一个文件描述符,可与select
模块的原语一起使用。3.3 新版功能.
-
terminate
()¶ 终止进程。在Unix上,这是使用
SIGTERM
信号完成的;在Windows上使用TerminateProcess()
。注意,退出处理程序和finally子句等不会被执行。注意,进程的后代进程将终止 not - 它们将只是成为孤立的。
警告
如果在相关联的进程正在使用管道或队列时使用此方法,则管道或队列容易被损坏,并且可能变得不能由其他进程使用。类似地,如果进程已经获得锁或信号量等,则终止它可能导致其他进程死锁。
请注意,
start()
,join()
,is_alive()
,terminate()
和exitcode
方法只应由创建过程对象的进程调用。Process
的一些方法的示例用法:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process(Process-1, initial)> False >>> p.start() >>> print(p, p.is_alive()) <Process(Process-1, started)> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process(Process-1, stopped[SIGTERM])> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ 所有
multiprocessing
异常的基类。
-
exception
multiprocessing.
BufferTooShort
¶ 当所提供的缓冲区对象对于消息读取而言过小时,
Connection.recv_bytes_into()
引发的异常。如果
e
是BufferTooShort
的实例,则e.args[0]
将消息作为字节字符串。
-
exception
multiprocessing.
AuthenticationError
¶ 在出现身份验证错误时触发。
-
exception
multiprocessing.
TimeoutError
¶ 在超时到期时由超时引发的方法引发。
17.2.2.2. 管道和队列¶
当使用多个进程时,通常使用消息传递来用于进程之间的通信,并避免必须使用任何同步原语,如锁。
对于传递消息,可以使用 Pipe()
(用于两个进程之间的连接)或队列(允许多个生产者和消费者)。
Queue
,SimpleQueue
和 JoinableQueue
类型是在标准库中的 queue.Queue
类建模的多生产者,多消费者 FIFO(先进先出) 队列。它们的区别在于 Queue
缺少在Python 2.5的 queue.Queue
类中引入的 task_done()
和 join()
方法。
如果你使用 JoinableQueue
,那么你 必须 调用 JoinableQueue.task_done()
从队列中删除每个任务,否则用于计数未完成任务的数量的信号量可能最终溢出,引发异常。
注意,也可以通过使用管理器对象创建共享队列 - 请参阅 经理。
注解
multiprocessing
使用通常的 queue.Empty
和 queue.Full
异常来发出超时。它们在 multiprocessing
命名空间中不可用,因此您需要从 queue
导入它们。
注解
当一个对象放在一个队列上时,该对象被腌制,后台线程随后将经过腌制的数据清洗到底层管道。这有一些后果是有点令人惊讶,但不应该导致任何实际困难 - 如果他们真的打扰你,那么你可以改为使用一个 经理 创建的队列。
将对象放在空队列上之后,在队列的
empty()
方法返回False
之前可能存在无穷小的延迟,并且get_nowait()
可以在不提高queue.Empty
的情况下返回。如果多个进程将对象排队,则可能在另一端无序地接收对象。然而,由相同进程排队的对象将始终以相对于彼此的期望顺序。
警告
如果在尝试使用 Queue
时使用 Process.terminate()
或 os.kill()
终止进程,则队列中的数据可能会被损坏。这可能导致任何其他进程在尝试稍后使用队列时获得异常。
警告
如上所述,如果子进程将项目放在队列上(并且它没有使用 JoinableQueue.cancel_join_thread
),那么该进程将不会终止,直到所有缓冲的项目都被刷新到管道。
这意味着,如果您尝试加入该过程,您可能会遇到死锁,除非您确定已经放入队列的所有项目已被占用。类似地,如果子进程是非守护进程,则父进程在试图加入其所有非守护进程的子进程时可能在退出时挂起。
请注意,使用管理器创建的队列没有此问题。见 编程指南。
有关进程间通信的队列使用示例,请参阅 例子。
-
multiprocessing.
Pipe
([duplex])¶ 返回表示管道末端的
Connection
对象的对(conn1, conn2)
。如果 duplex 是
True
(默认值),则管道是双向的。如果 duplex 是False
,则管道是单向的:conn1
只能用于接收消息,conn2
只能用于发送消息。
-
class
multiprocessing.
Queue
([maxsize])¶ 返回使用管道和几个锁/信号量实现的进程共享队列。当进程首先将项目放在队列上时,启动了将对象从缓冲区传送到管道中的馈线线程。
来自标准库的
queue
模块的通常的queue.Empty
和queue.Full
异常被引发用于信号超时。Queue
实现queue.Queue
的所有方法,除了task_done()
和join()
。-
qsize
()¶ 返回队列的大致大小。由于多线程/多进程语义,这个数字是不可靠的。
请注意,这可能会在Unix平台上引发
NotImplementedError
,例如未实现sem_getvalue()
的Mac OS X。
-
empty
()¶ 如果队列为空,返回
True
,否则返回False
。由于多线程/多进程语义,这是不可靠的。
-
full
()¶ 如果队列已满,返回
True
,否则返回False
。由于多线程/多进程语义,这是不可靠的。
-
put
(obj[, block[, timeout]])¶ 将obj放入队列。如果可选参数 block 是
True
(默认值),timeout 是None
(默认值),则必要时阻塞,直到空闲插槽可用。如果 timeout 是正数,则它最多阻塞 timeout 秒,并且如果在该时间内没有空闲时隙,则提高queue.Full
异常。否则(block 是False
),如果空闲时隙立即可用,则将一个项目放在队列上,否则提高queue.Full
异常(在这种情况下忽略 timeout)。
-
put_nowait
(obj)¶ 相当于
put(obj, False)
。
-
get
([block[, timeout]])¶ 从队列中删除并返回项目。如果可选的参数 block 是
True
(默认值),timeout 是None
(默认值),如有必要,直到项目可用为止。如果 timeout 是正数,则它最多阻塞 timeout 秒,并且如果在该时间内没有项可用,则提高queue.Empty
异常。否则(块是False
),返回一个项,如果一个立即可用,否则提出queue.Empty
异常(在这种情况下,timeout 被忽略)。
-
get_nowait
()¶ 相当于
get(False)
。
multiprocessing.Queue
有一些在queue.Queue
中找不到的附加方法。这些方法通常不需要大多数代码:-
close
()¶ 指示当前进程不会在此队列上放置更多数据。一旦它将所有缓冲的数据刷新到管道,后台线程将退出。当队列被垃圾回收时,这被自动调用。
-
join_thread
()¶ 加入后台线程。这只能在调用
close()
后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道。默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。该过程可以调用
cancel_join_thread()
使join_thread()
什么也不做。
-
cancel_join_thread
()¶ 防止
join_thread()
阻塞。特别地,这防止后台线程在进程退出时自动连接 - 参见join_thread()
。此方法的更好的名称可能是
allow_exit_without_flush()
。它可能导致入队的数据丢失,你几乎肯定不会需要使用它。它只是在那里,如果你需要当前的进程立即退出,而不等待刷新排队的数据到底层的管道,你不在乎丢失的数据。
注解
此类的功能需要在主机操作系统上运行共享信号量实现。没有一个,这个类中的功能将被禁用,并且尝试实例化
Queue
将导致ImportError
。有关其他信息,请参阅 issue 3770。这同样适用于下面列出的任何专门的队列类型。-
-
class
multiprocessing.
SimpleQueue
¶ 它是一种简化的
Queue
类型,非常接近锁定的Pipe
。-
empty
()¶ 如果队列为空,返回
True
,否则返回False
。
-
get
()¶ 从队列中删除并返回项目。
-
put
(item)¶ 将 item 放入队列。
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
,Queue
子类,是另外具有task_done()
和join()
方法的队列。-
task_done
()¶ 指示以前入队的任务已完成。由队列使用者使用。对于用于获取任务的每个
get()
,对task_done()
的后续调用告诉队列任务上的处理完成。如果
join()
当前阻塞,则当所有项目都被处理时(意味着对于已经被put()
进入队列的每个项目接收到task_done()
呼叫),它将恢复。如果调用的次数比在队列中放置的项目多,则引发
ValueError
。
-
join
()¶ 阻塞,直到队列中的所有项目都被获取和处理。
每当项目添加到队列时,未完成任务的计数就会增加。当消费者调用
task_done()
以指示项目已检索并且其上的所有工作都完成时,计数下降。当未完成任务的计数下降到零时,join()
解除阻塞。
-
17.2.2.3. 杂¶
-
multiprocessing.
active_children
()¶ 返回当前进程的所有活的孩子的列表。
调用这会产生“加入”已经完成的任何进程的副作用。
-
multiprocessing.
cpu_count
()¶ 返回系统中的CPU数。
此数字不等于当前进程可以使用的CPU数。可以使用
len(os.sched_getaffinity(0))
获得可用的CPU数可能提出
NotImplementedError
。
-
multiprocessing.
freeze_support
()¶ 添加对使用
multiprocessing
的程序已冻结以生成Windows可执行文件的支持。 (已经用 py2exe,PyInstaller 和 cx_Freeze 测试。)需要在主模块的
if __name__ == '__main__'
线之后直接调用此函数。例如:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果省略
freeze_support()
行,则试图运行冻结的可执行文件将提高RuntimeError
。在Windows以外的任何操作系统上调用
freeze_support()
时,调用freeze_support()
无效。此外,如果模块在Windows上由Python解释器正常运行(程序尚未冻结),则freeze_support()
不起作用。
-
multiprocessing.
get_all_start_methods
()¶ 返回支持的开始方法的列表,其中第一个是默认值。可能的启动方法是
'fork'
,'spawn'
和'forkserver'
。在Windows上只有'spawn'
可用。在Unix上'fork'
和'spawn'
总是被支持,'fork'
是默认值。3.4 新版功能.
-
multiprocessing.
get_context
(method=None)¶ 返回一个与
multiprocessing
模块具有相同属性的上下文对象。如果 method 是
None
,则返回默认上下文。否则 method 应为'fork'
,'spawn'
,'forkserver'
。如果指定的启动方法不可用,则引发ValueError
。3.4 新版功能.
-
multiprocessing.
get_start_method
(allow_none=False)¶ 返回用于启动进程的start方法的名称。
如果start方法没有被修复,并且 allow_none 为false,那么start方法被固定为默认值,并返回名称。如果start方法未被修复,allow_none 为true,则返回
None
。返回值可以是
'fork'
,'spawn'
,'forkserver'
或None
。'fork'
是Unix上的默认值,而'spawn'
是Windows上的默认值。3.4 新版功能.
-
multiprocessing.
set_executable
()¶ 设置Python解释器在启动子进程时要使用的路径。 (默认使用
sys.executable
)。 Embedders可能需要做一些事情set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
才能创建子进程。
在 3.4 版更改: 现在在使用
'spawn'
启动方法时支持Unix。
-
multiprocessing.
set_start_method
(method)¶ 设置应用于启动子进程的方法。 method 可以是
'fork'
,'spawn'
或'forkserver'
。请注意,这应该最多调用一次,它应该在主模块的
if __name__ == '__main__'
子句中保护。3.4 新版功能.
17.2.2.4. 连接对象¶
连接对象允许发送和接收可拾取对象或字符串。它们可以被认为是面向消息的连接套接字。
连接对象通常使用 Pipe()
创建 - 另见 监听器和客户端。
-
class
multiprocessing.
Connection
¶ -
send
(obj)¶ 将对象发送到应使用
recv()
读取的连接的另一端。对象必须可拾取。非常大的腌汁(大约32 MB +,虽然它取决于操作系统)可能会引发
ValueError
异常。
-
fileno
()¶ 返回连接使用的文件描述符或句柄。
-
close
()¶ 关闭连接。
当连接被垃圾回收时,这被自动调用。
-
poll
([timeout])¶ 返回是否有任何数据可供读取。
如果没有指定 timeout,它将立即返回。如果 timeout 是数字,那么它指定阻止的最大时间(以秒为单位)。如果 timeout 是
None
,则使用无限超时。注意,可以使用
multiprocessing.connection.wait()
一次轮询多个连接对象。
-
send_bytes
(buffer[, offset[, size]])¶ 从 bytes-like object 发送字节数据作为完整消息。
如果给出 offset,则从 buffer 中的该位置读取数据。如果给出 size,那么将从缓冲器读取许多字节。非常大的缓冲区(大约32 MB +,虽然它取决于操作系统)可能会引发
ValueError
异常
-
recv_bytes
([maxlength])¶ 返回从连接的另一端发送的字节数据的完整消息作为字符串。阻塞直到有东西要接收。如果没有剩余的接收和另一端已关闭,则提升
EOFError
。如果指定了 maxlength 并且消息长于 maxlength,则会引发
OSError
,并且连接将不再可读。
-
recv_bytes_into
(buffer[, offset])¶ 从 buffer 读取从连接的另一端发送的字节数据的完整消息,并返回消息中的字节数。阻塞直到有东西要接收。如果没有什么剩下来接收,另一端被关闭,则提高
EOFError
。buffer 必须是可写的 bytes-like object。如果给定 offset,则消息将从该位置写入缓冲器。偏移量必须是小于 buffer 长度的非负整数(以字节为单位)。
如果缓冲区太短,则引发
BufferTooShort
异常,并且完整消息作为e.args[0]
可用,其中e
是异常实例。
在 3.3 版更改: 连接对象本身现在可以在使用
Connection.send()
和Connection.recv()
的进程之间传输。3.3 新版功能: 连接对象现在支持上下文管理协议 - 请参阅 上下文管理器类型。
__enter__()
返回连接对象,__exit__()
调用close()
。-
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
Connection.recv()
方法会自动取消接收的数据,这可能是一种安全风险,除非您可以信任发送消息的进程。
因此,除非使用 Pipe()
生成连接对象,否则您只能在执行某种身份验证后才使用 recv()
和 send()
方法。见 验证密钥。
警告
如果进程在尝试读取或写入管道时被杀死,则管道中的数据可能会被破坏,因为可能无法确定消息边界位于何处。
17.2.2.5. 同步原语¶
通常,在多进程程序中同步原语不像在多线程程序中那样必要。请参阅 threading
模块的文档。
请注意,也可以使用管理器对象创建同步原语 - 请参阅 经理。
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ 屏障对象:
threading.Barrier
的克隆。3.3 新版功能.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ 有界信号对象:
threading.BoundedSemaphore
的近似模拟。与其近似模拟存在独立的差异:其
acquire
方法的第一个参数被命名为 block,与Lock.acquire()
一致。注解
在Mac OS X上,这与
Semaphore
是不可区分的,因为在该平台上没有实现sem_getvalue()
。
-
class
multiprocessing.
Condition
([lock])¶ 条件变量:
threading.Condition
的别名。如果指定了 lock,则它应该是来自
multiprocessing
的Lock
或RLock
对象。在 3.3 版更改: 添加
wait_for()
方法。
-
class
multiprocessing.
Event
¶ threading.Event
的克隆。
-
class
multiprocessing.
Lock
¶ 非递归锁定对象:
threading.Lock
的近似模拟。一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。除非另有说明,否则threading.Lock
的概念和行为适用于线程在multiprocessing.Lock
中被复制,因为它适用于进程或线程。注意,
Lock
实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.Lock
实例。Lock
支持 context manager 协议,因此可以在with
语句中使用。-
acquire
(block=True, timeout=None)¶ 获取锁定,阻止或非阻止。
将 block 参数设置为
True
(默认值),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为锁定并返回True
。请注意,此第一个参数的名称与threading.Lock.acquire()
中的名称不同。将 block 参数设置为
False
,方法调用不会阻止。如果锁当前处于锁定状态,则返回False
;否则将锁设置为锁定状态并返回True
。当为 timeout 使用正值浮点值调用时,只要不能获取锁定,最多只能阻止 timeout 指定的秒数。对于 timeout 的负值的调用等于 timeout 为零。 timeout 值为
None
的调用(默认值)将超时周期设置为无限。注意,对 timeout 的负或None
值的处理不同于在threading.Lock.acquire()
中实现的行为。如果 block 参数设置为False
,则 timeout 参数没有实际意义,因此被忽略。如果已获取锁定则返回True
,如果已超过超时时间,则返回False
。
-
release
()¶ 释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。
行为与
threading.Lock.release()
中的相同,除了当在解锁的锁上被调用时,产生ValueError
。
-
-
class
multiprocessing.
RLock
¶ 递归锁对象:
threading.RLock
的一个接近的模拟。递归锁必须由获取它的进程或线程释放。一旦进程或线程已经获得递归锁,相同的进程或线程可以再次获取它而不阻塞;该进程或线程必须每次释放它一次它已被获取。注意,
RLock
实际上是一个工厂函数,它返回使用默认上下文初始化的multiprocessing.synchronize.RLock
实例。RLock
支持 context manager 协议,因此可以在with
语句中使用。-
acquire
(block=True, timeout=None)¶ 获取锁定,阻止或非阻止。
当使用设置为
True
的 block 参数调用时,阻塞直到锁处于未锁定状态(不由任何进程或线程拥有),除非锁已由当前进程或线程拥有。当前进程或线程然后获取锁的所有权(如果它还没有所有权),并且锁中的递归级别增加1,导致True
的返回值。注意,与threading.RLock.acquire()
的实现相比,第一个参数的行为有几个不同,从参数本身的名称开始。当使用设置为
False
的 block 参数调用时,不要阻止。如果锁已经被另一进程或线程获取(并且因此被拥有),则当前进程或线程不取得所有权,并且锁中的递归级别不改变,导致False
的返回值。如果锁处于解锁状态,则当前进程或线程取得所有权,并递归递归级别,导致返回值True
。timeout 参数的使用和行为与
Lock.acquire()
中的相同。注意,timeout 的这些行为中的一些不同于在threading.RLock.acquire()
中实现的行为。
-
release
()¶ 释放锁,递减递归级别。如果在递减之后递归级别为零,将锁重置为解锁(不由任何进程或线程拥有),并且如果任何其他进程或线程被阻塞等待锁被解锁,则只允许其中一个进程继续。如果递减后递归级别仍然为非零,则锁保持锁定并由调用进程或线程拥有。
只有在调用进程或线程拥有锁时才调用此方法。如果此方法由除所有者之外的进程或线程调用或如果锁处于未锁定(无主)状态,则会引发
AssertionError
。请注意,在这种情况下引发的异常类型与threading.RLock.release()
中实现的行为不同。
-
-
class
multiprocessing.
Semaphore
([value])¶ 信号量对象:
threading.Semaphore
的紧密模拟。与其近似模拟存在独立的差异:其
acquire
方法的第一个参数被命名为 block,与Lock.acquire()
一致。
注解
在Mac OS X上,sem_timedwait
不受支持,因此使用超时调用 acquire()
将使用睡眠循环来模拟该函数的行为。
注解
如果由 Ctrl-C
产生的SIGINT信号到达,而主线程被对 BoundedSemaphore.acquire()
,Lock.acquire()
,RLock.acquire()
,Semaphore.acquire()
,Condition.acquire()
或 Condition.wait()
的调用阻塞,则该调用将立即中断,并且将提高 KeyboardInterrupt
。
这与 threading
的行为不同,其中在等效阻塞调用正在进行时将忽略SIGINT。
注解
此程序包的某些功能需要在主机操作系统上运行共享信号量实现。没有一个,multiprocessing.synchronize
模块将被禁用,并且尝试导入它将导致 ImportError
。有关其他信息,请参阅 issue 3770。
17.2.2.7. 经理¶
管理器提供一种创建可在不同进程之间共享的数据的方法,包括在不同机器上运行的进程之间通过网络共享。管理器对象控制管理 共享对象 的服务器进程。其他进程可以通过使用代理访问共享对象。
返回已启动的
SyncManager
对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并具有将创建共享对象并返回相应代理的方法。
管理器进程将在垃圾收集或其父进程退出后立即关闭。管理器类在 multiprocessing.managers
模块中定义:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ 创建一个BaseManager对象。
一旦创建,应调用
start()
或get_server().serve_forever()
以确保管理器对象引用已启动的管理器进程。address 是管理器进程侦听新连接的地址。如果 address 是
None
,则选择任意一个。authkey 是将用于检查到服务器进程的传入连接的有效性的认证密钥。如果 authkey 是
None
,则使用current_process().authkey
。否则使用 authkey,它必须是字节字符串。-
start
([initializer[, initargs]])¶ 启动子过程以启动管理器。如果 initializer 不是
None
,则子进程在启动时将调用initializer(*initargs)
。
-
get_server
()¶ 返回一个
Server
对象,它表示在Manager控制下的实际服务器。Server
对象支持serve_forever()
方法:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
另外具有address
属性。
-
connect
()¶ 将本地管理器对象连接到远程管理器进程:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 5000), authkey=b'abc') >>> m.connect()
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ 可以用于向管理器类注册类型或可调用的类方法。
typeid 是用于标识特定类型的共享对象的“类型标识符”。这必须是字符串。
callable 是用于为此类型标识符创建对象的可调用项。如果一个管理器实例将使用
connect()
方法连接到服务器,或者如果 create_method 参数是False
,那么这可以留作None
。proxytype 是
BaseProxy
的子类,用于使用此 typeid 创建共享对象的代理。如果None
,则自动创建代理类。exposed 用于指定应该允许使用
BaseProxy._callmethod()
访问此类型代理的方法名称序列。 (如果 exposed 是None
,则使用proxytype._exposed_
,如果存在)。在没有指定公开列表的情况下,共享对象的所有“公共方法”将是可访问的。 (这里的“公共方法”是指具有__call__()
方法并且其名称不以'_'
开头的任何属性。)method_to_typeid 是一个映射,用于指定应该返回代理的那些公开方法的返回类型。它将方法名映射到typeid字符串。 (如果 method_to_typeid 是
None
,则使用proxytype._method_to_typeid_
,如果存在)。如果方法的名称不是此映射的键,或者映射是None
,则方法返回的对象将按值进行复制。create_method 确定是否应该使用名称 typeid 创建一个方法,该方法可以用于告诉服务器进程创建一个新的共享对象并为其返回一个代理。默认为
True
。
BaseManager
实例也有一个只读属性:-
address
¶ 经理使用的地址。
在 3.3 版更改: Manager对象支持上下文管理协议 - 请参阅 上下文管理器类型。
__enter__()
启动服务器进程(如果它尚未启动),然后返回管理器对象。__exit__()
调用shutdown()
。在以前的版本中,如果尚未启动管理器的服务器进程,则
__enter__()
没有启动它。-
-
class
multiprocessing.managers.
SyncManager
¶ BaseManager
的子类,可用于进程的同步。这种类型的对象由multiprocessing.Manager()
返回。它的方法创建和返回 代理对象,用于许多常用的数据类型,以跨进程同步。这尤其包括共享列表和字典。
-
Barrier
(parties[, action[, timeout]])¶ 创建一个共享的
threading.Barrier
对象并为其返回一个代理。3.3 新版功能.
-
BoundedSemaphore
([value])¶ 创建一个共享的
threading.BoundedSemaphore
对象并为其返回一个代理。
-
Condition
([lock])¶ 创建一个共享的
threading.Condition
对象并为其返回一个代理。如果提供了 lock,那么它应该是
threading.Lock
或threading.RLock
对象的代理。在 3.3 版更改: 添加
wait_for()
方法。
-
Event
()¶ 创建一个共享的
threading.Event
对象并为其返回一个代理。
-
Lock
()¶ 创建一个共享的
threading.Lock
对象并为其返回一个代理。
-
Queue
([maxsize])¶ 创建一个共享的
queue.Queue
对象并为其返回一个代理。
-
RLock
()¶ 创建一个共享的
threading.RLock
对象并为其返回一个代理。
-
Semaphore
([value])¶ 创建一个共享的
threading.Semaphore
对象并为其返回一个代理。
-
Array
(typecode, sequence)¶ 创建一个数组并返回一个代理。
-
Value
(typecode, value)¶ 创建一个具有可写
value
属性的对象,并为其返回一个代理。
在 3.6 版更改: 共享对象能够嵌套。例如,诸如共享列表的共享容器对象可以包含将由
SyncManager
管理和同步的其他共享对象。-
-
class
multiprocessing.managers.
Namespace
¶ 可以在
SyncManager
注册的类型。命名空间对象没有公共方法,但有可写属性。其表示显示其属性的值。
但是,当为名称空间对象使用代理时,以
'_'
开头的属性将是代理的属性,而不是引用对象的属性:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
17.2.2.7.1. 定制经理¶
要创建自己的管理器,创建一个 BaseManager
的子类,并使用 register()
类方法向管理器类注册新类型或可调用。例如:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
17.2.2.7.2. 使用远程管理器¶
可以在一台计算机上运行管理器服务器,并让客户端从其他计算机使用它(假设涉及的防火墙允许它)。
运行以下命令为远程客户端可以访问的单个共享队列创建一个服务器:
>>> from multiprocessing.managers import BaseManager
>>> import queue
>>> queue = queue.Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个客户端可以访问服务器,如下所示:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个客户端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问该队列,使用上面的代码在客户端上远程访问它:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
17.2.2.8. 代理对象¶
代理是 refers 到共享对象的对象,它(在大概)存在于不同的进程中。共享对象被称为代理的 referent。多个代理对象可以具有相同的指示。
代理对象具有调用其指示对象的相应方法的方法(尽管不是指示对象的每个方法都必须通过代理可用)。这样,可以使用代理,就像它的指示可以:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,将 str()
应用于代理将返回指示对象的表示,而应用 repr()
将返回代理的表示。
代理对象的一个重要特征是它们是可拾取的,因此它们可以在进程之间传递。因此,指示物可以含有 代理对象。这允许嵌套这些管理列表,dicts和其他 代理对象:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
类似地,dict和list代理可以嵌套在一起:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果标准(非代理) list
或 dict
对象包含在引用中,那么对这些可变值的修改不会通过管理器传播,因为代理无法知道何时修改其中包含的值。然而,在容器代理(触发代理对象上的 __setitem__
)中存储值通过管理器传播,因此有效地修改这样的项目,可以将修改的值重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
与大多数使用情况下使用嵌套 代理对象 相比,这种方法可能不太方便,但也显示了对同步的一个级别的控制。
注解
multiprocessing
中的代理类型不支持按值进行比较。因此,例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
在进行比较时,应该只使用指示物的副本。
-
class
multiprocessing.managers.
BaseProxy
¶ 代理对象是
BaseProxy
的子类的实例。-
_callmethod
(methodname[, args[, kwds]])¶ 调用并返回代理引用对象的方法的结果。
如果
proxy
是指向是obj
的代理,则表达式proxy._callmethod(methodname, args, kwds)
将评估表达式
getattr(obj, methodname)(*args, **kwds)
在经理的过程中。
返回值将是调用结果的副本或新的共享对象的代理 - 请参阅
BaseManager.register()
的 method_to_typeid 参数的文档。如果调用引发异常,则由
_callmethod()
重新生成异常。如果在管理器的进程中引发一些其他异常,则这被转换为RemoteError
异常并且由_callmethod()
产生。特别注意,如果 methodname 没有 exposed,将会引发异常。
_callmethod()
的使用示例:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ 返回指示对象的副本。
如果指示符不可取消,那么这将引发异常。
-
__repr__
()¶ 返回代理对象的表示。
-
__str__
()¶ 返回指示对象的表示。
-
17.2.2.9. 过程池¶
可以创建一个进程池,它将执行与 Pool
类提交的任务。
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ 一个进程池对象,它控制可以提交作业的工作进程池。它支持具有超时和回调的异步结果,并具有并行映射实现。
processes 是要使用的工作进程的数量。如果 processes 是
None
,则使用os.cpu_count()
返回的数字。如果 initializer 不是
None
,则每个工作进程在启动时将调用initializer(*initargs)
。maxtasksperchild 是工作进程可以完成的任务数,在它将退出并被新的工作进程替换之前,可以释放未使用的资源。默认的 maxtasksperchild 是
None
,这意味着工作进程将与池一样长。context 可以用于指定用于启动工作进程的上下文。通常使用上下文对象的函数
multiprocessing.Pool()
或Pool()
方法创建池。在这两种情况下,context 都被适当设置。请注意,池对象的方法只应该由创建池的进程调用。
3.2 新版功能: maxtasksperchild
3.4 新版功能: context
注解
Pool
中的工作进程通常在池的工作队列的整个持续时间内生存。在其他系统(例如Apache,mod_wsgi等)中发现的用于释放由工作者拥有的资源的频繁模式是允许池中的工作者在退出,被清理和产生新进程之前完成一定量的工作以取代旧的。Pool
的 maxtasksperchild 参数向最终用户公开此功能。-
apply
(func[, args[, kwds]])¶ 使用参数 args 和关键字参数 kwds 调用 func。它阻塞,直到结果准备好。给定这个块,
apply_async()
更好地适于并行执行工作。此外,func 仅在池的工人之一中执行。
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ 返回结果对象的
apply()
方法的变体。如果指定 callback 那么它应该是一个可调用它接受一个参数。当结果变为就绪时,callback 应用于它,这是除非调用失败,在这种情况下应用 error_callback。
如果指定 error_callback 那么它应该是一个可调用它接受一个参数。如果目标函数失败,则使用异常实例调用 error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
-
map
(func, iterable[, chunksize])¶ map()
内置函数的并行等效(它只支持一个 iterable 参数)。它阻塞,直到结果准备好。此方法将iterable切成多个块,将其作为单独的任务提交到进程池。这些块的(近似)大小可以通过将 chunksize 设置为正整数来指定。
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ 返回结果对象的
map()
方法的变体。如果指定 callback 那么它应该是一个可调用它接受一个参数。当结果变为就绪时,callback 应用于它,这是除非调用失败,在这种情况下应用 error_callback。
如果指定 error_callback 那么它应该是一个可调用它接受一个参数。如果目标函数失败,则使用异常实例调用 error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
-
imap
(func, iterable[, chunksize])¶ map()
的lazier版本。chunksize 参数与
map()
方法使用的参数相同。对于非常长的迭代,对 chunksize 使用大的值可以使作业完成 许多 比使用默认值1
更快。此外,如果 chunksize 是
1
,则imap()
方法返回的迭代器的next()
方法具有可选的 timeout 参数:如果结果在 timeout 秒内不能返回,则next(timeout)
将提高multiprocessing.TimeoutError
。
-
imap_unordered
(func, iterable[, chunksize])¶ 与
imap()
相同,除了返回的迭代器的结果的顺序应该被认为是任意的。 (只有当只有一个工作进程时,该命令保证是“正确的”。)
-
starmap
(func, iterable[, chunksize])¶ 像
map()
,除了 iterable 的元素被期望是被解包作为参数的迭代。因此,
[(1,2), (3, 4)]
的 iterable 导致[func(1,2), func(3,4)]
。3.3 新版功能.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_back]]])¶ starmap()
和map_async()
的组合,它迭代可迭代的 iterable,并调用 func 与可解迭的iterable。返回结果对象。3.3 新版功能.
-
close
()¶ 防止任何其他任务提交到池。一旦所有任务完成,工作进程将退出。
-
terminate
()¶ 立即停止工作进程,而不完成未完成的工作。当池对象被垃圾回收时,
terminate()
将被立即调用。
-
join
()¶ 等待工作进程退出。在使用
join()
之前必须调用close()
或terminate()
。
3.3 新版功能: 池对象现在支持上下文管理协议 - 请参阅 上下文管理器类型。
__enter__()
返回池对象,__exit__()
调用terminate()
。-
-
class
multiprocessing.pool.
AsyncResult
¶ Pool.apply_async()
和Pool.map_async()
返回的结果的类。-
get
([timeout])¶ 返回结果到达时。如果 timeout 不是
None
并且结果在 timeout 秒内没有到达,则multiprocessing.TimeoutError
被引发。如果远程调用引发了异常,那么该异常将由get()
重新审核。
-
wait
([timeout])¶ 等待结果可用或直到 timeout 秒过去。
-
ready
()¶ 返回呼叫是否已完成。
-
successful
()¶ 返回调用是否完成,而不引发异常。如果结果没有准备就会提高
AssertionError
。
-
以下示例演示了池的使用:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
17.2.2.10. 监听器和客户端¶
通常,进程之间的消息传递使用队列或通过使用 Pipe()
返回的 Connection
对象来完成。
然而,multiprocessing.connection
模块允许一些额外的灵活性。它基本上提供了一个高级的面向消息的API来处理套接字或Windows命名管道。它还支持 摘要认证 使用 hmac
模块,并且同时轮询多个连接。
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ 将随机生成的消息发送到连接的另一端,并等待回复。
如果回复与使用 authkey 作为键的消息摘要匹配,则将欢迎消息发送到连接的另一端。否则引发
AuthenticationError
。
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ 接收消息,使用 authkey 作为关键字计算消息的摘要,然后发送摘要。
如果没有接收到欢迎消息,则
AuthenticationError
被引发。
-
multiprocessing.connection.
Client
(address[, family[, authenticate[, authkey]]])¶ 尝试设置到使用地址 address 的侦听器的连接,返回
Connection
。连接的类型由 family 参数确定,但这通常可以省略,因为它通常可以从 address 的格式推断。 (见 地址格式)
如果 authenticate 是
True
或 authkey 是字节字符串,则使用摘要认证。如果 authkey 是None
,用于认证的密钥将是 authkey 或current_process().authkey
。如果认证失败,则提高AuthenticationError
。见 验证密钥。
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authenticate[, authkey]]]]])¶ 绑定套接字或Windows命名管道的包装器,正在侦听连接。
address 是要由侦听器对象的绑定套接字或命名管道使用的地址。
注解
如果使用地址“0.0.0.0”,则该地址将不是Windows上的可连接终点。如果需要可连接的端点,则应使用“127.0.0.1”。
family 是要使用的套接字(或命名管道)的类型。这可以是字符串
'AF_INET'
(对于TCP套接字),'AF_UNIX'
(对于Unix域套接字)或'AF_PIPE'
(对于Windows命名管道)之一。其中只有第一个保证可用。如果 family 是None
,则该族从 address 的格式推断。如果 address 也是None
,则选择默认值。此默认值是假定为最快可用的系列。见 地址格式。注意,如果 family 是'AF_UNIX'
,地址是None
,那么套接字将在使用tempfile.mkstemp()
创建的私有临时目录中创建。如果侦听器对象使用套接字,则 backlog (默认为1)一旦被绑定就传递给套接字的
listen()
方法。如果 authenticate 是
True
(默认为False
)或 authkey 不是None
,则使用摘要认证。如果 authkey 是字节字符串,则它将被用作认证密钥;否则必须是
None
。如果 authkey 是
None
,authenticate 是True
,则使用current_process().authkey
作为认证密钥。如果 authkey 是None
并且 authenticate 是False
,则不进行认证。如果认证失败,则AuthenticationError
升高。见 验证密钥。-
accept
()¶ 在侦听器对象的绑定套接字或命名管道上接受连接并返回
Connection
对象。如果尝试认证并且失败,则引发AuthenticationError
。
-
close
()¶ 关闭侦听器对象的绑定套接字或命名管道。当监听器被垃圾回收时,这被自动调用。但是建议明确调用它。
侦听器对象具有以下只读属性:
-
address
¶ Listener对象正在使用的地址。
-
last_accepted
¶ 最后接受的连接来自的地址。如果这不可用,那么它是
None
。
3.3 新版功能: 监听器对象现在支持上下文管理协议 - 请参阅 上下文管理器类型。
__enter__()
返回侦听器对象,__exit__()
调用close()
。-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ 等待 object_list 中的对象准备就绪。返回 object_list 中已准备好的对象的列表。如果 timeout 是一个浮点数,那么调用最多会阻塞这么多秒。如果 timeout 是
None
,那么它将阻塞无限期。负超时相当于零超时。对于Unix和Windows,如果是 object_list,对象可以出现在 object_list 中
可读的
Connection
对象;连接且可读的
socket.socket
对象;要么
当有可用于从其读取数据或另一端已关闭时,连接或套接字对象就绪就绪。
Unix:
wait(object_list, timeout)
几乎相当的select.select(object_list, [], [], timeout)
。区别在于,如果select.select()
被信号中断,则它可以以EINTR
的错误号提高OSError
,而wait()
不会。视窗:object_list 中的项必须是可等待的整数句柄(根据Win32函数
WaitForMultipleObjects()
的文档所使用的定义),或者它可以是带有fileno()
方法的对象,该方法返回套接字句柄或管道句柄。 (注意,管道手柄和插座手柄是 不 等待手柄。)3.3 新版功能.
例子
以下服务器代码创建使用 'secret password'
作为认证密钥的侦听器。然后它等待连接并向客户端发送一些数据:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下代码使用 wait()
一次等待来自多个进程的消息:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
17.2.2.11. 验证密钥¶
当使用 Connection.recv
时,接收的数据将自动取消。不幸的是,从不受信任的源解压数据是一种安全风险。因此 Listener
和 Client()
使用 hmac
模块提供摘要认证。
认证密钥是可以被认为是密码的字节字符串:一旦建立连接,两端将要求证明另一方知道认证密钥。 (演示两端都使用相同的密钥,不 涉及通过连接发送密钥。)
如果请求认证但未指定认证密钥,则使用 current_process().authkey
的返回值(请参阅 Process
)。此值将由当前进程创建的任何 Process
对象自动继承。这意味着(默认地)多进程程序的所有进程将共享单个认证密钥,当在它们之间建立连接时可以使用该认证密钥。
还可以通过使用 os.urandom()
来生成合适的认证密钥。
17.2.2.12. 记录¶
一些支持日志记录可用。然而,请注意,logging
包不使用进程共享锁,因此可能(取决于处理程序类型)来自不同进程的消息混淆。
-
multiprocessing.
get_logger
()¶ 返回
multiprocessing
使用的记录器。如果需要,将创建一个新的。当第一次创建记录器具有级别
logging.NOTSET
和没有默认处理程序。发送到此记录器的消息将不会默认传播到根记录器。请注意,在Windows子进程将只继承父进程记录器的级别 - 记录器的任何其他定制将不会继承。
-
multiprocessing.
log_to_stderr
()¶ 此函数执行对
get_logger()
的调用,但除了返回由get_logger创建的记录器之外,它还添加了一个处理程序,该处理程序使用格式'[%(levelname)s/%(processName)s] %(message)s'
将输出发送到sys.stderr
。
以下是启用日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有关日志记录级别的完整表,请参阅 logging
模块。
17.2.2.13. multiprocessing.dummy
模块¶
multiprocessing.dummy
复制 multiprocessing
的API,但只不过是 threading
模块的一个包装器。
17.2.3. 编程指南¶
使用 multiprocessing
时应遵守一定的准则和惯用语。
17.2.3.1. 所有星级方法¶
以下适用于所有启动方法。
避免共享状态
尽可能地,应该尽量避免在进程之间转移大量的数据。
最好坚持使用队列或管道进行进程之间的通信,而不是使用较低级别的同步原语。
可取性
确保代理方法的参数是可拾取的。
线程安全代理
不要使用来自多个线程的代理对象,除非您使用锁来保护它。
(使用 same 代理的不同进程从不会有问题。)
加入僵尸进程
在Unix上,当一个进程完成但没有加入时,它变成一个僵尸。应该永远不会有很多,因为每次一个新的进程开始(或
active_children()
被调用)所有尚未加入的完成的进程将被加入。也调用完成的进程的Process.is_alive
将加入进程。即使这样,显然加入您开始的所有进程也许是一个好习惯。
更好地继承比pickle/unpickle
当使用 spawn 或 forkserver 启动方法时,
multiprocessing
中的许多类型都需要可选,以便子进程可以使用它们。但是,通常应避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问其他地方创建的共享资源的进程可以从祖先进程继承它。
避免终止进程
使用
Process.terminate
方法停止进程可能导致进程当前正在使用的任何共享资源(例如锁,信号量,管道和队列)被破坏或对其他进程不可用。因此,最好只考虑在不使用任何共享资源的进程上使用
Process.terminate
。
加入使用队列的进程
请记住,将项目置于队列中的进程将在终止之前等待,直到所有缓冲的项目由“馈线”线程馈送到基础管道。 (子进程可以调用队列的
Queue.cancel_join_thread
方法,以避免此行为。)这意味着,无论何时使用队列,您都需要确保已经放在队列中的所有项目最终都会在加入流程之前被删除。否则,您不能确定已将项目放入队列的进程将终止。还要记住,非守护进程将自动加入。
将死锁的示例如下:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()这里的修复将是交换最后两行(或简单地删除
p.join()
行)。
将资源显式传递给子进程
在Unix上使用 fork 启动方法,子进程可以使用在父进程中使用全局资源创建的共享资源。但是,最好将该对象作为参数传递给子进程的构造函数。
除了使代码(潜在地)兼容Windows和其他启动方法,这还确保只要子进程仍然活着,对象不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾回收时释放某些资源,这可能很重要。
例如
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()应重写为
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
注意用“文件类对象”替换 sys.stdin
,
multiprocessing
最初无条件调用:os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()
方法中—这导致了进程中的进程的问题。这已更改为:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)这解决了进程彼此冲突的根本问题,导致错误的文件描述符错误,但是对于用具有输出缓冲的“文件状对象”替换
sys.stdin()
的应用程序引入潜在的危险。这个危险是,如果多个进程在这个类文件对象上调用close()
,它可能导致相同的数据被多次刷新到对象,导致损坏。如果你写一个类文件对象并实现你自己的缓存,你可以通过存储pid每当你追加到缓存,并在pid更改时丢弃缓存,使它叉安全。例如:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache有关更多信息,请参阅 issue 5155,issue 5313 和 issue 5331
17.2.3.2. spawn 和 forkserver 启动方法¶
有一些额外的限制不适用于 fork 启动方法。
更多picklability
确保
Process.__init__()
的所有参数都是可拾取的。此外,如果你子类化Process
,那么确保在调用Process.start
方法时实例将是可拾取的。
全局变量
请记住,如果在子进程中运行的代码尝试访问全局变量,那么它看到的值(如果有)可能与调用
Process.start
时父进程中的值不同。但是,只是模块级常量的全局变量不会导致问题。
安全导入主模块
确保主模块可以通过新的Python解释器安全导入,而不会导致意外的副作用(如启动一个新进程)。
例如,使用 spawn 或 forkserver 启动方法运行以下模块将失败,并使用
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()相反,应该通过使用
if __name__ == '__main__':
如下保护程序的“入口点”:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(如果程序将正常运行而不是冻结,则可以省略
freeze_support()
行。)这允许新生成的Python解释器安全地导入模块,然后运行模块的
foo()
函数。如果在主模块中创建了池或管理器,则应用类似的限制。
17.2.4. 例子¶
演示如何创建和使用自定义管理器和代理:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
显示如何使用队列将任务提供给工作进程集合并收集结果的示例:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()