multiprocessing.Pool:map_async 和 imap 有什么区别?
- 2025-03-10 08:52:00
- admin 原创
- 60
问题描述:
我正在尝试学习如何使用 Python 的multiprocessing
包,但我不明白map_async
和之间的区别imap
。我注意到map_async
和都是imap
异步执行的。那么我什么时候应该使用其中一个而不是另一个呢?我应该如何检索返回的结果map_async
?
我应该使用这样的东西吗?
def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()
result=test()
for i in result:
print i
解决方案 1:
imap
/imap_unordered
和map
/之间有两个主要区别map_async
:
它们使用您传递给它们的可迭代对象的方式。
他们将结果返回给您的方式。
map
通过将可迭代对象转换为列表(假设它还不是列表),将其拆分成块,然后将这些块发送到 中的工作进程来使用可迭代Pool
对象。将可迭代对象拆分成块比在进程之间一次传递可迭代对象中的每个项目的性能更好 - 特别是当可迭代对象很大时。但是,将可迭代对象转换为列表以对其进行分块可能会产生非常高的内存成本,因为整个列表都需要保存在内存中。
imap
不会将您提供的可迭代对象转换为列表,也不会将其拆分成块(默认情况下)。它将一次迭代一个元素,并将它们分别发送给工作进程。这意味着您不会因为将整个可迭代对象转换为列表而占用内存,但这也意味着由于缺乏分块,大型可迭代对象的性能会更慢。chunksize
不过,可以通过传递大于默认值 1 的参数来缓解这种情况。
imap
/imap_unordered
和map
/之间的另一个主要区别map_async
是,使用imap
/ imap_unordered
,您可以在工作器准备就绪后立即开始接收它们的结果,而不必等待所有工作器完成。使用map_async
,AsyncResult
会立即返回,但您实际上无法从该对象中检索结果,直到所有结果都已处理完毕,此时它返回与 相同的列表map
(map
实际上在内部实现为map_async(...).get()
)。没有办法获得部分结果;您要么拥有整个结果,要么一无所获。
imap
和imap_unordered
都立即返回可迭代对象。使用imap
,结果将在可迭代对象准备好后立即产生,同时仍保留输入可迭代对象的顺序。使用imap_unordered
,结果将在可迭代对象准备好后立即产生,无论输入可迭代对象的顺序如何。因此,假设您有这个:
import multiprocessing
import time
def func(x):
time.sleep(x)
return x + 2
if __name__ == "__main__":
p = multiprocessing.Pool()
start = time.time()
for x in p.imap(func, [1,5,3]):
print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))
这将输出:
3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
如果你使用p.imap_unordered
而不是p.imap
,你会看到:
3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)
如果你使用p.map
或p.map_async().get()
,你会看到:
3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)
imap
因此,使用/imap_unordered
的主要原因map_async
是:
您的可迭代对象足够大,将其转换为列表会导致耗尽或使用过多的内存。
您希望能够在所有结果完成之前开始处理结果。
解决方案 2:
可接受的答案指出imap_unordered
“结果将在准备好后立即产生”,人们可能推断结果将按完成顺序返回。但我只想澄清,这通常不是真的。文档指出结果是按任意顺序返回的。考虑以下程序,它使用池大小为 4、可迭代大小为 20 和块大小值为 5。工作函数根据其传递的参数休眠可变的时间,这也确保池中没有一个进程抓取所有提交的任务。因此,我希望池中的每个进程都有要20 / 4 = 5
处理的任务:
from multiprocessing import Pool
import time
def worker(x):
print(f'x = {x}', flush=True)
time.sleep(.1 * (20 - x))
# return approximate completion time with passed argument:
return time.time(), x
if __name__ == '__main__':
pool = Pool(4)
results = pool.imap_unordered(worker, range(20), chunksize=5)
for t, x in results:
print('result:', t, x)
印刷:
x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4
您可以清楚地看到,这些结果不是按完成顺序生成的。例如,我已返回,1621512519.7743165 9
然后是1621512515.268784 0
,该结果由工作函数返回,比之前返回的结果早 4 秒以上。但是,如果我将chunksize值更改为 1,则打印输出将变成:
x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16
这是按完成顺序进行的。但是,我犹豫着是否要声明如果将块大小值指定为 1,则imap_unordered
始终会在结果可用时返回结果,尽管根据此实验似乎确实如此,因为文档中没有这样声明。
讨论
当指定块大小为 5 时,20 个任务将放在单个输入队列中,由池中的 4 个进程以大小为 5 的块进行处理。因此,空闲的进程将从队列中取出下一个 5 个任务块,并依次处理每个任务,然后再次空闲。因此,第一个进程将处理x
参数 0 到 4,第二个进程x
处理参数 5 到 9,等等。这就是您看到打印的初始x
值为 0、5、10 和 15 的原因。
但是,虽然参数 0 的结果在参数 9 的x
结果之前完成x
,但看起来结果会作为块一起写出,因此x
参数 0 的结果将不会返回,直到x
在同一块中排队的参数(即 1、2、3 和 4)的结果也可用时为止。
扫码咨询,免费领取项目管理大礼包!