多处理:如何在类中定义的函数上使用 Pool.map?
- 2025-03-04 08:24:00
- admin 原创
- 90
问题描述:
当我运行类似这样的操作时:
from multiprocessing import Pool
p = Pool(5)
def f(x):
return x*x
p.map(f, [1,2,3])
它工作正常。但是,将其作为类的函数:
class calculate(object):
def run(self):
def f(x):
return x*x
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
出现以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
self.run()
File "/sw/lib/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
我见过 Alex Martelli 发表的一篇文章,讨论同样的问题,但不够明确。
解决方案 1:
我无法使用迄今为止发布的代码,因为使用“multiprocessing.Pool”的代码不适用于 lambda 表达式,而未使用“multiprocessing.Pool”的代码会产生与工作项一样多的进程。
我修改了代码,使其生成预定义数量的工作程序,并且仅在存在空闲工作程序时才遍历输入列表。我还为工作程序启用了“守护进程”模式,因此 ctrl-c 可以按预期工作。
import multiprocessing
def fun(f, q_in, q_out):
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
if __name__ == '__main__':
print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
解决方案 2:
除非您跳出标准库,否则多处理和 pickling 将会被破坏且受到限制。
multiprocessing
如果使用called的分支pathos.multiprocesssing
,则可以在多处理map
函数中直接使用类和类方法。这是因为dill
使用了 而不是pickle
或cPickle
,并且dill
可以序列化 Python 中的几乎任何内容。
pathos.multiprocessing
还提供了异步映射函数……并且可以map
用多个参数来运行(例如map(math.pow, [1,2,3], [4,5,6])
)
查看讨论:
多处理和 dill 可以一起做什么?
以及:
http ://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
它甚至可以处理您最初编写的代码(无需修改)以及来自解释器的代码。 为什么要做其他更脆弱且特定于单个案例的事情呢?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
... def run(self):
... def f(x):
... return x*x
... p = Pool()
... return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
在此处获取代码:
https://github.com/uqfoundation/pathos
下面,我们来展示一下它还能做些什么:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
解决方案 3:
我还对pool.map可以接受的函数类型的限制感到烦恼。我写了下面的代码来解决这个问题。它似乎有效,即使对于parmap的递归使用也是如此。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe, x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f, X):
pipe = [Pipe() for x in X]
proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
[p.start() for p in proc]
[p.join() for p in proc]
return [p.recv() for (p, c) in pipe]
if __name__ == '__main__':
print parmap(lambda x: x**x, range(1, 5))
解决方案 4:
据我所知,目前您的问题没有解决方案:您提供的函数map()
必须通过导入模块才能访问。这就是 robert 的代码有效的原因:f()
可以通过导入以下代码来获取该函数:
def f(x):
return x*x
class Calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
if __name__ == '__main__':
cl = Calculate()
print cl.run()
我实际上添加了一个“main”部分,因为这遵循了Windows 平台的建议(“确保主模块可以被新的 Python 解释器安全地导入,而不会导致意外的副作用”)。
我还在 前面添加了一个大写字母Calculate
,以符合PEP 8。:)
解决方案 5:
mrule 的解决方案是正确的,但是有一个错误:如果子进程发回大量数据,它可能会填满管道的缓冲区,从而阻塞在子进程的 上pipe.send()
,而父进程则在 上等待子进程退出pipe.join()
。解决方案是在读取子进程之前读取子进程的数据join()
。此外,子进程应关闭父进程的管道末端以防止死锁。以下代码修复了这个问题。还请注意,这parmap
会为 中的每个元素创建一个进程X
。更高级的解决方案是使用multiprocessing.cpu_count()
分成X
多个块,然后在返回之前合并结果。我把这个留给读者作为练习,以免破坏 mrule 的简洁性。;)
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(ppipe, cpipe,x):
ppipe.close()
cpipe.send(f(x))
cpipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
[p.start() for p in proc]
ret = [p.recv() for (p,c) in pipe]
[p.join() for p in proc]
return ret
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
解决方案 6:
我也为此苦苦挣扎。我将函数作为类的数据成员,作为一个简单的示例:
from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# Needed to do something like this (the following line won't work)
return pool.map(self.f,list1,list2)
我需要在同一个类中的 Pool.map() 调用中使用函数 self.f,而 self.f 不接受元组作为参数。由于此函数嵌入在类中,因此我不清楚如何编写其他答案建议的包装器类型。
我通过使用另一个包装器解决了这个问题,该包装器采用元组/列表,其中第一个元素是函数,其余元素是该函数的参数,称为 eval_func_tuple(f_args)。使用这个,有问题的行可以用 return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)) 替换。以下是完整代码:
文件:util.py
def add(a, b): return a+b
def eval_func_tuple(f_args):
"""Takes a tuple of a function and args, evaluates and returns result"""
return f_args[0](*f_args[1:])
文件:main.py
from multiprocessing import Pool
import itertools
import util
pool = Pool()
class Example(object):
def __init__(self, my_add):
self.f = my_add
def add_lists(self, list1, list2):
# The following line will now work
return pool.map(util.eval_func_tuple,
itertools.izip(itertools.repeat(self.f), list1, list2))
if __name__ == '__main__':
myExample = Example(util.add)
list1 = [1, 2, 3]
list2 = [10, 20, 30]
print myExample.add_lists(list1, list2)
运行 main.py 将得到 [11, 22, 33]。您可以随意改进它,例如,eval_func_tuple 也可以修改为接受关键字参数。
另外,在另一个答案中,当进程数多于可用 CPU 数时,函数“parmap”可以变得更高效。我在下面复制了一个编辑后的版本。这是我的第一篇文章,我不确定是否应该直接编辑原始答案。我还重命名了一些变量。
from multiprocessing import Process, Pipe
from itertools import izip
def spawn(f):
def fun(pipe,x):
pipe.send(f(x))
pipe.close()
return fun
def parmap(f,X):
pipe=[Pipe() for x in X]
processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]
numProcesses = len(processes)
processNum = 0
outputList = []
while processNum < numProcesses:
endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)
for proc in processes[processNum:endProcessNum]:
proc.start()
for proc in processes[processNum:endProcessNum]:
proc.join()
for proc,c in pipe[processNum:endProcessNum]:
outputList.append(proc.recv())
processNum = endProcessNum
return outputList
if __name__ == '__main__':
print parmap(lambda x:x**x,range(1,5))
解决方案 7:
我知道这个问题是8年10个月前提出的,但我想向您介绍我的解决方案:
from multiprocessing import Pool
class Test:
def __init__(self):
self.main()
@staticmethod
def methodForMultiprocessing(x):
print(x*x)
def main(self):
if __name__ == "__main__":
p = Pool()
p.map(Test.methodForMultiprocessing, list(range(1, 11)))
p.close()
TestObject = Test()
你只需要将类函数变成静态方法即可。但使用类方法也是可行的:
from multiprocessing import Pool
class Test:
def __init__(self):
self.main()
@classmethod
def methodForMultiprocessing(cls, x):
print(x*x)
def main(self):
if __name__ == "__main__":
p = Pool()
p.map(Test.methodForMultiprocessing, list(range(1, 11)))
p.close()
TestObject = Test()
在 Python 3.7.3 中测试
解决方案 8:
我知道这个问题已经问了 6 年多了,但我只是想添加我的解决方案,因为上面的一些建议看起来非常复杂,但我的解决方案实际上非常简单。
我所要做的就是将pool.map()调用包装到一个辅助函数中。将类对象与方法的参数一起作为元组传递,看起来有点像这样。
def run_in_parallel(args):
return args[0].method(args[1])
myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)
解决方案 9:
我采纳了 klaus se 和 aganders3 的答案,并制作了一个更易读且保存在一个文件中的文档模块。您可以将其添加到您的项目中。它甚至有一个可选的进度条!
"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.
Adapted from http://stackoverflow.com/a/16071616/287297
Example usage:
print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)
Comments:
"It spawns a predefined amount of workers and only iterates through the input list
if there exists an idle worker. I also enabled the "daemon" mode for the workers so
that KeyboardInterupt works as expected."
Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.
Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""
# Modules #
import multiprocessing
from tqdm import tqdm
################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
while not queue_in.empty():
num, obj = queue_in.get()
queue_out.put((num, func_to_apply(obj)))
################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
# Number of processes to use #
if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
# Create queues #
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
# Process list #
new_proc = lambda t,a: multiprocessing.Process(target=t, args=a)
processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
# Put all the items (objects) in the queue #
sent = [q_in.put((i, x)) for i, x in enumerate(items)]
# Start them all #
for proc in processes:
proc.daemon = True
proc.start()
# Display progress bar or not #
if verbose:
results = [q_out.get() for x in tqdm(range(len(sent)))]
else:
results = [q_out.get() for x in range(len(sent))]
# Wait for them to finish #
for proc in processes: proc.join()
# Return results #
return [x for i, x in sorted(results)]
################################################################################
def test():
def slow_square(x):
import time
time.sleep(2)
return x**2
objs = range(20)
squares = prll_map(slow_square, objs, 4, verbose=True)
print "Result: %s" % squares
编辑:添加了@alexander-mcfarlane 建议和测试功能
解决方案 10:
在类中定义的函数(甚至在类中的函数内)实际上不会进行 pickle。但是,这有效:
def f(x):
return x*x
class calculate(object):
def run(self):
p = Pool()
return p.map(f, [1,2,3])
cl = calculate()
print cl.run()
解决方案 11:
我修改了 klaus se 的方法,因为虽然它适用于小列表,但当项目数量达到约 1000 或更多时,它就会挂起。None
我没有使用停止条件一次推送一个作业,而是一次性加载输入队列,让进程不断处理它,直到队列为空。
from multiprocessing import cpu_count, Queue, Process
def apply_func(f, q_in, q_out):
while not q_in.empty():
i, x = q_in.get()
q_out.put((i, f(x)))
# map a function using a pool of processes
def parmap(f, X, nprocs = cpu_count()):
q_in, q_out = Queue(), Queue()
proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[p.start() for p in proc]
res = [q_out.get() for _ in sent]
[p.join() for p in proc]
return [x for i,x in sorted(res)]
编辑:不幸的是现在我在我的系统上遇到了这个错误:多处理队列最大大小限制为 32767,希望那里的解决方法能够有所帮助。
解决方案 12:
这是我的解决方案,我认为它比这里的大多数解决方案都更简洁。它与 nightowl 的答案类似。
someclasses = [MyClass(), MyClass(), MyClass()]
def method_caller(some_object, some_method='the method'):
return getattr(some_object, some_method)()
othermethod = partial(method_caller, some_method='othermethod')
with Pool(6) as pool:
result = pool.map(othermethod, someclasses)
解决方案 13:
Pool
如果您以某种方式手动忽略类中对象列表中的对象,则可以毫无问题地运行代码,因为它无法pickle
像错误所说的那样运行。您可以使用__getstate__
函数(也请查看此处)执行此操作,如下所示。Pool
对象将尝试查找__getstate__
和__setstate__
函数,如果在您运行等时找到它们map
,则执行它们map_async
:
class calculate(object):
def __init__(self):
self.p = Pool()
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['p']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
def f(self, x):
return x*x
def run(self):
return self.p.map(self.f, [1,2,3])
然后执行以下操作:
cl = calculate()
cl.run()
将会给出输出:
[1, 4, 9]
我已经在 Python 3.x 中测试了上述代码并且它可以运行。
解决方案 14:
这可能不是一个很好的解决方案,但就我而言,我是这样解决的。
from multiprocessing import Pool
def foo1(data):
self = data.get('slf')
lst = data.get('lst')
return sum(lst) + self.foo2()
class Foo(object):
def __init__(self, a, b):
self.a = a
self.b = b
def foo2(self):
return self.a**self.b
def foo(self):
p = Pool(5)
lst = [1, 2, 3]
result = p.map(foo1, (dict(slf=self, lst=lst),))
return result
if __name__ == '__main__':
print(Foo(2, 4).foo())
我必须将其传递self
给我的函数,因为我必须通过该函数访问我的类的属性和函数。这对我来说很有效。欢迎提出更正和建议。
解决方案 15:
这是我为在 python3 中使用多处理池编写的样板,具体来说,python3.7.7 用于运行测试。我使用 获得了最快的运行速度imap_unordered
。只需插入您的场景并尝试一下。您可以使用timeit
或 只是time.time()
为了找出哪种最适合您。
import multiprocessing
import time
NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap' # 'imap_unordered' or 'starmap' or 'apply_async'
def process_chunk(a_chunk):
print(f"processig mp chunk {a_chunk}")
return a_chunk
map_jobs = [1, 2, 3, 4]
result_sum = 0
s = time.time()
if MP_FUNCTION == 'imap_unordered':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
for i in pool.imap_unordered(process_chunk, map_jobs):
result_sum += i
elif MP_FUNCTION == 'starmap':
pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
try:
map_jobs = [(i, ) for i in map_jobs]
result_sum = pool.starmap(process_chunk, map_jobs)
result_sum = sum(result_sum)
finally:
pool.close()
pool.join()
elif MP_FUNCTION == 'apply_async':
with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")
上面的场景imap_unordered
对我来说实际上表现最差。尝试一下你的案例并在你计划运行它的机器上进行基准测试。另外阅读一下进程池。干杯!
解决方案 16:
我不确定是否采用了这种方法,但我正在使用的解决方法是:
from multiprocessing import Pool
t = None
def run(n):
return t.f(n)
class Test(object):
def __init__(self, number):
self.number = number
def f(self, x):
print x * self.number
def pool(self):
pool = Pool(2)
pool.map(run, range(10))
if __name__ == '__main__':
t = Test(9)
t.pool()
pool = Pool(2)
pool.map(run, range(10))
输出应该是:
0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
解决方案 17:
class Calculate(object):
# Your instance method to be executed
def f(self, x, y):
return x*y
if __name__ == '__main__':
inp_list = [1,2,3]
y = 2
cal_obj = Calculate()
pool = Pool(2)
results = pool.map(lambda x: cal_obj.f(x, y), inp_list)
您可能希望将此函数应用于类的每个不同实例。那么以下也是解决方案
class Calculate(object):
# Your instance method to be executed
def __init__(self, x):
self.x = x
def f(self, y):
return self.x*y
if __name__ == '__main__':
inp_list = [Calculate(i) for i in range(3)]
y = 2
pool = Pool(2)
results = pool.map(lambda x: x.f(y), inp_list)
解决方案 18:
摘自http://www.rueckstiess.net/research/snippets/show/ca1d7d90和http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html
我们可以创建一个外部函数并使用类自身对象为其植入种子:
from joblib import Parallel, delayed
def unwrap_self(arg, **kwarg):
return square_class.square_int(*arg, **kwarg)
class square_class:
def square_int(self, i):
return i * i
def run(self, num):
results = []
results = Parallel(n_jobs= -1, backend="threading")\n (delayed(unwrap_self)(i) for i in zip([self]*len(num), num))
print(results)
或者不使用 joblib:
from multiprocessing import Pool
import time
def unwrap_self_f(arg, **kwarg):
return C.f(*arg, **kwarg)
class C:
def f(self, name):
print 'hello %s,'%name
time.sleep(5)
print 'nice to meet you.'
def run(self):
pool = Pool(processes=2)
names = ('frank', 'justin', 'osi', 'thomas')
pool.map(unwrap_self_f, zip([self]*len(names), names))
if __name__ == '__main__':
c = C()
c.run()
解决方案 19:
要在 aws lambda 中实现多处理,我们有两种方法。注意:线程池在 aws lambda 中不起作用
使用 aws 团队提供的示例解决方案,请使用此链接https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/
我已经使用这两个解决方案实现了我的 lambda 函数,并且两者都运行良好,无法在这里分享我的代码,但这两个链接肯定会对你有所帮助。
我发现第二种方法更容易实现。
解决方案 20:
还有一些库可以使这更容易,例如autothread
(仅适用于 Python 3.6 及更高版本):
import autothread
class calculate(object):
def run(self):
@autothread.multiprocessed()
def f(x: int):
return x*x
return f([1,2,3])
cl = calculate()
print(cl.run())
您还可以看看lox。
扫码咨询,免费领取项目管理大礼包!