Waiting for all threads to finish in Python
Problem description:
I want to run multiple threads simultaneously, and wait until all of them are done before continuing.
import subprocess
# I want these to happen simultaneously:
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)
# I want to wait until the above threads are all finished, and then run this:
print("All threads are done.")
I tried to use threading, like the example here:
from threading import Thread
import subprocess

def call_script(args):
    subprocess.call(args)

t1 = Thread(target=call_script, args=(scriptA + argumentsA,))
t2 = Thread(target=call_script, args=(scriptA + argumentsB,))
t3 = Thread(target=call_script, args=(scriptA + argumentsC,))
t1.start()
t2.start()
t3.start()
# TODO: Wait for all threads to finish.
print("All threads are done.")
How do I wait for the threads to finish before running the last line?
Solution 1:
Put the threads in a list, .start() each thread, and then .join() each thread:
threads = [
    Thread(...),
    Thread(...),
    Thread(...),
]

# Start all threads.
for t in threads:
    t.start()

# Wait for all threads to finish.
for t in threads:
    t.join()
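For completeness, here is a minimal runnable sketch of this pattern applied to the question's subprocess calls; the script name and argument lists are hypothetical stand-ins:
import subprocess
from threading import Thread

def call_script(args):
    subprocess.call(args)

# Hypothetical values standing in for scriptA + argumentsA, etc.
scriptA = ["python", "scriptA.py"]
all_arguments = [["a"], ["b"], ["c"]]

threads = [Thread(target=call_script, args=(scriptA + arguments,))
           for arguments in all_arguments]

# Start all threads, then wait for each one to finish.
for t in threads:
    t.start()
for t in threads:
    t.join()

print("All threads are done.")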
Solution 2:
You need to use the join method of the Thread objects at the end of the script.
t1 = Thread(target=call_script, args=(scriptA + argumentsA,))
t2 = Thread(target=call_script, args=(scriptA + argumentsB,))
t3 = Thread(target=call_script, args=(scriptA + argumentsC,))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()
The main thread will then wait until t1, t2, and t3 finish executing.
Solution 3:
In Python 3, since version 3.2, there is a new approach that reaches the same result, and one that I personally prefer over traditional thread creation/start/join: the concurrent.futures package: https://docs.python.org/3/library/concurrent.futures.html
Using ThreadPoolExecutor, the code looks like this:
from concurrent.futures.thread import ThreadPoolExecutor
import time

def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')

args = ['argumentsA', 'argumentsB', 'argumentsC']

# Leaving the with block waits for every submitted task to finish.
with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks have finished')
上述代码的输出如下:
Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks have finished
One of the advantages is that you can control throughput by setting the maximum number of concurrent workers.
For multiprocessing, you can use ProcessPoolExecutor instead, as sketched below.
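A minimal sketch of the same pattern with ProcessPoolExecutor; note that worker processes may re-import the module on some platforms, so the submission should be guarded by if __name__ == '__main__':
from concurrent.futures import ProcessPoolExecutor
import time

def call_script(ordinal, arg):
    print('Process', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Process', ordinal, 'Finished')

if __name__ == '__main__':
    args = ['argumentsA', 'argumentsB', 'argumentsC']
    # As with ThreadPoolExecutor, leaving the with block waits
    # for all submitted tasks to finish.
    with ProcessPoolExecutor(max_workers=2) as executor:
        for ordinal, arg in enumerate(args, start=1):
            executor.submit(call_script, ordinal, arg)
    print('All tasks have finished')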
Solution 4:
I prefer using list comprehensions, based on a list of inputs:
inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i,)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]
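A brief note on this design choice: a list comprehension used only for its side effects still builds a throwaway list of None values. Plain for loops express the same intent without that allocation; a sketch reusing the same inputs and call_script from above:
from threading import Thread

threads = [Thread(target=call_script, args=(i,)) for i in inputs]
for t in threads:
    t.start()
for t in threads:
    t.join()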
Solution 5:
You can use a class like the one below, add any number of functions or console scripts you want to run in parallel, start them, and then wait for all the jobs to finish.
from multiprocessing import Process

class ProcessParallel(object):
    """
    Runs the given functions in parallel processes.
    """
    def __init__(self, *jobs):
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates a Process object for each given function delegate.
        """
        for job in self.jobs:
            proc = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts all the function processes together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits until all the functions have finished executing.
        """
        for proc in self.processes:
            proc.join()

def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b

# How to run:
if __name__ == '__main__':
    # Note: two_sum and multiply can be replaced with any Python console
    # scripts that you want to run in parallel.
    procs = ProcessParallel(two_sum, multiply)
    # Create a Process for each job.
    procs.fork_processes()
    # Start process execution.
    procs.start_all()
    # Wait until all the processes have finished.
    procs.join_all()
Solution 6:
From the threading module documentation:
There is a "main thread" object; this corresponds to the initial thread of control in the Python program. It is not a daemon thread.
There is the possibility that "dummy thread objects" are created. These are thread objects corresponding to "alien threads", which are threads of control started outside the threading module, such as directly from C code. Dummy thread objects have limited functionality; they are always considered alive and daemonic, and cannot be join()ed. They are never deleted, since it is impossible to detect the termination of alien threads.
So, to catch those two cases when you are not interested in keeping a list of the threads you create:
import threading as thrd

def alter_data(data, index):
    data[index] *= 2

data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catches the main thread
            continue
        else:
            raise
Then:
>>> print(data)
[0, 4, 12, 40]
Solution 7:
I just ran into the same problem, where I needed to wait for all threads created with a for loop. I tried the following piece of code. It may not be the perfect solution, but I thought it would be simple to test:
for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in str(err):
            continue
        else:
            raise
Solution 8:
Perhaps something like:
for t in threading.enumerate():
    if t.daemon:
        t.join()
Solution 9:
Create a ThreadPoolExecutor (or ProcessPoolExecutor).
Then call .map with the desired function func and the list of arguments xs:
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=len(xs)) as executor:
    results = list(executor.map(func, xs))
.map returns an iterator containing the return value of each function call, which we collect into a list.
In your case:
from concurrent.futures import ThreadPoolExecutor

argss = [
    ["python", "scriptA.py", "a"],
    ["python", "scriptA.py", "b"],
    ["python", "scriptA.py", "c"],
]

with ThreadPoolExecutor(max_workers=len(argss)) as executor:
    results = list(executor.map(call_script, argss))
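One caveat worth knowing: exceptions raised inside the workers are not lost; executor.map re-raises them when the corresponding result is consumed from the iterator. A small sketch, with a made-up failing function for illustration:
from concurrent.futures import ThreadPoolExecutor

def might_fail(x):
    # Hypothetical worker that fails on one input.
    if x == 2:
        raise ValueError("bad input")
    return x * 10

with ThreadPoolExecutor(max_workers=3) as executor:
    try:
        # list() consumes the iterator, re-raising any worker exception.
        results = list(executor.map(might_fail, [1, 2, 3]))
    except ValueError as err:
        print("A worker failed:", err)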
Solution 10:
Using join alone can result in a false-positive interaction with a thread. As stated in the docs:
When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened: if the thread is still alive, the join() call timed out.
And a sample piece of code:
threads = []
for name in some_data:
    new = threading.Thread(
        target=self.some_func,
        args=(name,)
    )
    threads.append(new)
    new.start()

over_threads = iter(threads)
curr_th = next(over_threads)

while True:
    curr_th.join()
    if curr_th.is_alive():
        continue
    try:
        curr_th = next(over_threads)
    except StopIteration:
        break
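A minimal sketch of the timeout pattern the docs describe: pass a timeout to join() and then check is_alive() to see whether the wait actually succeeded:
import threading
import time

def worker():
    time.sleep(5)

t = threading.Thread(target=worker)
t.start()

t.join(timeout=1.0)  # join() returns None whether or not it timed out
if t.is_alive():
    print("join() timed out; the thread is still running")
else:
    print("The thread finished within the timeout")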