等待 Python 中的所有线程完成

2025-02-18 09:24:00
admin
原创
113
摘要:问题描述:我想同时运行多个线程,并等到所有线程都完成后再继续。import subprocess # I want these to happen simultaneously: subprocess.call(scriptA + argumentsA) subprocess.call(scriptA + ...

问题描述:

我想同时运行多个线程,并等到所有线程都完成后再继续。

import subprocess

# I want these to happen simultaneously:
subprocess.call(scriptA + argumentsA)
subprocess.call(scriptA + argumentsB)
subprocess.call(scriptA + argumentsC)

# I want to wait until the above threads are all finished, and then run this:
print("All threads are done.")

我尝试使用类似这里的threading例子:

from threading import Thread
import subprocess

def call_script(args)
    subprocess.call(args)

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

# TODO: Wait for all threads to finish.

print("All threads are done.")

如何在运行最后一行之前等待线程完成?


解决方案 1:

将线程放入列表中,.start()每个线程,然后.join()每个线程:

threads = [
    Thread(...),
    Thread(...),
    Thread(...),
]

# Start all threads.
for t in threads:
    t.start()

# Wait for all threads to finish.
for t in threads:
    t.join()

解决方案 2:

您需要在脚本末尾使用对象的连接方法。Thread

t1 = Thread(target=call_script, args=(scriptA + argumentsA))
t2 = Thread(target=call_script, args=(scriptA + argumentsB))
t3 = Thread(target=call_script, args=(scriptA + argumentsC))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

因此主线程将等待t1t2直到t3完成执行。

解决方案 3:

在 Python3 中,自 Python 3.2 以来,有一种新方法可以达到相同的结果,我个人更喜欢传统的线程创建/启动/加入,包concurrent.futureshttps://docs.python.org/3/library/concurrent.futures.html

使用ThreadPoolExecutor代码如下:

from concurrent.futures.thread import ThreadPoolExecutor
import time
    
def call_script(ordinal, arg):
    print('Thread', ordinal, 'argument:', arg)
    time.sleep(2)
    print('Thread', ordinal, 'Finished')
    
args = ['argumentsA', 'argumentsB', 'argumentsC']
    
with ThreadPoolExecutor(max_workers=2) as executor:
    ordinal = 1
    for arg in args:
        executor.submit(call_script, ordinal, arg)
        ordinal += 1
print('All tasks has been finished')

上述代码的输出如下:

Thread 1 argument: argumentsA
Thread 2 argument: argumentsB
Thread 1 Finished
Thread 2 Finished
Thread 3 argument: argumentsC
Thread 3 Finished
All tasks has been finished

其中一个优点是您可以通过设置最大并发工作者来控制吞吐量。

要使用多处理,您可以使用ProcessPoolExecutor。

解决方案 4:

我更喜欢使用基于输入列表的列表理解:

inputs = [scriptA + argumentsA, scriptA + argumentsB, ...]
threads = [Thread(target=call_script, args=(i)) for i in inputs]
[t.start() for t in threads]
[t.join() for t in threads]

解决方案 5:

您可以拥有类似下面的类,从中您可以添加想要并行执行的'n'个函数或控制台脚本,然后开始执行并等待所有作业完成。

from multiprocessing import Process

class ProcessParallel(object):
    """
    To Process the  functions parallely

    """    
    def __init__(self, *jobs):
        """
        """
        self.jobs = jobs
        self.processes = []

    def fork_processes(self):
        """
        Creates the process objects for given function deligates
        """
        for job in self.jobs:
            proc  = Process(target=job)
            self.processes.append(proc)

    def start_all(self):
        """
        Starts the functions process all together.
        """
        for proc in self.processes:
            proc.start()

    def join_all(self):
        """
        Waits untill all the functions executed.
        """
        for proc in self.processes:
            proc.join()


def two_sum(a=2, b=2):
    return a + b

def multiply(a=2, b=2):
    return a * b


#How to run:
if __name__ == '__main__':
    #note: two_sum, multiply can be replace with any python console scripts which
    #you wanted to run parallel..
    procs =  ProcessParallel(two_sum, multiply)
    #Add all the process in list
    procs.fork_processes()
    #starts  process execution 
    procs.start_all()
    #wait until all the process got executed
    procs.join_all()

解决方案 6:

来自threading 模块文档

有一个“主线程”对象;这对应于 Python 程序中的初始控制线程。它不是守护线程。

有可能创建“虚拟线程对象”。这些是与“外来线程”相对应的线程对象,外来线程是线程模块之外启动的控制线程,例如直接从 C 代码启动。虚拟线程对象功能有限;它们始终被视为活动且处于守护状态,无法被join()删除。它们永远不会被删除,因为无法检测到外来线程的终止。

因此,当您不想保留所创建的线程列表时,要捕获这两种情况:

import threading as thrd


def alter_data(data, index):
    data[index] *= 2


data = [0, 2, 6, 20]

for i, value in enumerate(data):
    thrd.Thread(target=alter_data, args=[data, i]).start()

for thread in thrd.enumerate():
    if thread.daemon:
        continue
    try:
        thread.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err.args[0]:
            # catchs main thread
            continue
        else:
            raise

于是:

>>> print(data)
[0, 4, 12, 40]

解决方案 7:

我刚刚遇到了同样的问题,我需要等待使用 for 循环创建的所有线程。我刚刚尝试了以下一段代码。它可能不是完美的解决方案,但我认为这是一个简单的测试解决方案:

for t in threading.enumerate():
    try:
        t.join()
    except RuntimeError as err:
        if 'cannot join current thread' in err:
            continue
        else:
            raise

解决方案 8:

也许,类似

for t in threading.enumerate():
    if t.daemon:
        t.join()

解决方案 9:

创建一个ThreadPoolExecutor(或ProcessPoolExecutor)。

然后,调用.map所需的函数func和参数列表xs

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=len(xs)) as executor:
    results = list(executor.map(func, xs))

.map返回一个包含每个函数的返回值的迭代器,我们将其收集到一个列表中。


就你的情况而言:

from concurrent.futures import ThreadPoolExecutor

argss = [
    ["python", "scriptA.py", "a"],
    ["python", "scriptA.py", "b"],
    ["python", "scriptA.py", "c"],
]

with ThreadPoolExecutor(max_workers=len(argss)) as executor:
    results = list(executor.map(call_script, argss))

解决方案 10:

仅使用 join 可能会导致与线程的误报交互。如文档中所述:

当 timeout 参数存在且不为 None 时,它​​应为一个浮点数,以秒为单位(或其分数)。由于 join() 始终返回 None,因此必须在 join() 之后调用 isAlive() 来决定是否发生超时 - 如果线程仍处于活动状态,则 join() 调用超时。

以及一段示例代码:

threads = []
for name in some_data:
    new = threading.Thread(
        target=self.some_func,
        args=(name,)
    )
    threads.append(new)
    new.start()
    
over_threads = iter(threads)
curr_th = next(over_threads)
while True:
    curr_th.join()
    if curr_th.is_alive():
        continue
    try:
        curr_th = next(over_threads)
    except StopIteration:
        break
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   3970  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2740  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   79  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   87  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   74  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用