Python 多处理:处理父进程中的子进程错误

2025-03-20 08:47:00
admin
原创
33
摘要:问题描述:我目前正在研究多处理和队列。我编写了一段代码来从 mongoDB 导出数据,将其映射到关系(平面)结构中,将所有值转换为字符串并将它们插入到 mysql 中。每个步骤都作为一个过程提交,并给出导入/导出队列,这对于父级处理的 mongoDB 导出来说是安全的。正如您将在下面看到的,我使用队列,并且子...

问题描述:

我目前正在研究多处理和队列。我编写了一段代码来从 mongoDB 导出数据,将其映射到关系(平面)结构中,将所有值转换为字符串并将它们插入到 mysql 中。

每个步骤都作为一个过程提交,并给出导入/导出队列,这对于父级处理的 mongoDB 导出来说是安全的。

正如您将在下面看到的,我使用队列,并且子进程在从队列中读取“无”时会自行终止。我目前遇到的问题是,如果子进程遇到未处理的异常,父进程无法识别,其余进程将继续运行。我希望发生的是整个过程退出,并最好重新引发子错误。

我有两个问题:

  1. 如何检测父级中的子级错误?

  2. 检测到错误后如何终止子进程(最佳实践)?我意识到将“无”放入队列以终止子进程是相当肮脏的。

我正在使用 python 2.7。

以下是我的代码的基本部分:

# Establish communication queues
mongo_input_result_q = multiprocessing.Queue()
mapper_result_q = multiprocessing.Queue()
converter_result_q = multiprocessing.Queue()

[...]

    # create child processes
    # all processes generated here are subclasses of "multiprocessing.Process"

    # create mapper
    mappers = [mongo_relational_mapper.MongoRelationalMapper(mongo_input_result_q, mapper_result_q, columns, 1000)
               for i in range(10)]

    # create datatype converter, converts everything to str
    converters = [datatype_converter.DatatypeConverter(mapper_result_q, converter_result_q, 'str', 1000)
                  for i in range(10)]

    # create mysql writer
    # I create a list of writers. currently only one, 
    # but I have the option to parallellize it further
    writers = [mysql_inserter.MySqlWriter(mysql_host, mysql_user, mysql_passwd, mysql_schema, converter_result_q
               , columns, 'w_'+mysql_table, 1000) for i in range(1)]

    # starting mapper
    for mapper in mappers:
        mapper.start()
    time.sleep(1)

    # starting converter
    for converter in converters:
        converter.start()

    # starting writer
    for writer in writers:
        writer.start()

[...初始化 mongo db 连接...]

    # put each dataset read to queue for the mapper
    for row in mongo_collection.find({inc_column: {"$gte": start}}):
        mongo_input_result_q.put(row)
        count += 1
        if count % log_counter == 0:
            print 'Mongo Reader' + " " + str(count)
    print "MongoReader done"

    # Processes are terminated when they read "None" object from queue
    # now that reading is finished, put None for each mapper in the queue so they terminate themselves
    # the same for all followup processes
    for mapper in mappers:
        mongo_input_result_q.put(None)
    for mapper in mappers:
        mapper.join()
    for converter in converters:
        mapper_result_q.put(None)
    for converter in converters:
        converter.join()
    for writer in writers:
        converter_result_q.put(None)
    for writer in writers:
        writer.join()

解决方案 1:

为什么不让进程处理它自己的异常呢,就像这样:

from __future__ import print_function
import multiprocessing as mp
import traceback

class Process(mp.Process):
    def __init__(self, *args, **kwargs):
        mp.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = mp.Pipe()
        self._exception = None

    def run(self):
        try:
            mp.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

现在,您已经掌握了错误和回溯:

def target():
    raise ValueError('Something went wrong...')

p = Process(target = target)
p.start()
p.join()

if p.exception:
    error, traceback = p.exception
    print(traceback)

问候,马雷克

解决方案 2:

我不知道标准做法是什么,但我发现,为了实现可靠的多处理,我会专门设计方法/类/等来处理多处理。否则,你永远不知道另一边发生了什么(除非我错过了一些机制)。

我具体是这样做的:

  • 子类化multiprocessing.Process或创建专门支持多处理的函数(如有必要,包装您无法控制的函数)

  • multiprocessing.Queue始终从主进程向每个工作进程提供共享错误

  • 将整个运行代码放在 中try: ... except Exception as e。然后当发生意外情况时,发送一个错误包:

    • 已死亡的进程 ID

    • 异常及其原始上下文(点击此处查看)。如果您想在主进程中记录有用的信息,原始上下文非常重要。

  • 当然在工人的正常操作范围内正常处理预期问题

  • (类似于你已经说过的)假设一个长时间运行的过程,用循环包装正在运行的代码(在 try/catch-all 内部)

    • 在类中或为函数定义一个停止标记。

    • 当主进程希望工作进程停止时,只需发送停止令牌。要停止所有人,请为所有进程发送足够的令牌。

    • 包装循环检查输入 q 是否为 token 或者其他你想要的输入

最终结果是工作进程可以存活很长时间,当出现问题时,它们可以让您知道发生了什么。它们会悄无声息地死去,因为您可以在捕获所有异常后处理您需要做的任何事情,并且您还会知道何时需要重新启动工作进程。

再次强调,我刚刚通过反复试验才得出这个模式,所以我不知道它有多标准。这对你的要求有帮助吗?

解决方案 3:

@mrkwjc 的解决方案很简单,因此很容易理解和实施,但是该解决方案有一个缺点。当我们只有几个进程,并且我们想在任何一个进程出现错误时停止所有进程时,我们需要等到所有进程都完成后才能检查是否p.exception。以下是修复此问题的代码(即,当一个子进程出现错误时,我们也会终止另一个子进程):

import multiprocessing
import traceback

from time import sleep


class Process(multiprocessing.Process):
    """
    Class which returns child Exceptions to Parent.
    https://stackoverflow.com/a/33599967/4992248
    """

    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._child_conn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._child_conn.send((e, tb))
            # raise e  # You can still rise this exception if you need to

    @property
    def exception(self):
        if self._parent_conn.poll():
            self._exception = self._parent_conn.recv()
        return self._exception


class Task_1:
    def do_something(self, queue):
        queue.put(dict(users=2))


class Task_2:
    def do_something(self, queue):
        queue.put(dict(users=5))


def main():
    try:
        task_1 = Task_1()
        task_2 = Task_2()

        # Example of multiprocessing which is used:
        # https://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
        task_1_queue = multiprocessing.Queue()
        task_2_queue = multiprocessing.Queue()

        task_1_process = Process(
            target=task_1.do_something,
            kwargs=dict(queue=task_1_queue))

        task_2_process = Process(
            target=task_2.do_something,
            kwargs=dict(queue=task_2_queue))

        task_1_process.start()
        task_2_process.start()

        while task_1_process.is_alive() or task_2_process.is_alive():
            sleep(10)

            if task_1_process.exception:
                error, task_1_traceback = task_1_process.exception

                # Do not wait until task_2 is finished
                task_2_process.terminate()

                raise ChildProcessError(task_1_traceback)

            if task_2_process.exception:
                error, task_2_traceback = task_2_process.exception

                # Do not wait until task_1 is finished
                task_1_process.terminate()

                raise ChildProcessError(task_2_traceback)

        task_1_process.join()
        task_2_process.join()

        task_1_results = task_1_queue.get()
        task_2_results = task_2_queue.get()

        task_1_users = task_1_results['users']
        task_2_users = task_2_results['users']

    except Exception:
        # Here usually I send email notification with error.
        print('traceback:', traceback.format_exc())


if __name__ == "__main__":
    main()

解决方案 4:

感谢 kobejohn,我找到了一个既好又稳定的解决方案。

  1. 我创建了 multiprocessing.Process 的一个子类,它实现了一些函数并重写了run()方法,将新的 saferun 方法包装到 try-catch 块中。此类需要初始化 feedback_queue,用于将信息、调试、错误消息报告回父级。类中的日志方法是包的全局定义的日志函数的包装器:

class EtlStepProcess(multiprocessing.Process):

    def __init__(self, feedback_queue):
        multiprocessing.Process.__init__(self)
        self.feedback_queue = feedback_queue

    def log_info(self, message):
        log_info(self.feedback_queue, message, self.name)

    def log_debug(self, message):
        log_debug(self.feedback_queue, message, self.name)

    def log_error(self, err):
        log_error(self.feedback_queue, err, self.name)

    def saferun(self):
        """Method to be run in sub-process; can be overridden in sub-class"""
        if self._target:
            self._target(*self._args, **self._kwargs)

    def run(self):
        try:
            self.saferun()
        except Exception as e:
            self.log_error(e)
            raise e
        return
  1. 我已经从 EtlStepProcess 中子类化了所有其他流程步骤。要运行的代码是在 saferun() 方法中实现的,而不是在 run 中实现的。这样我就不必在其周围添加 try catch 块,因为这已经由 run() 方法完成了。示例:

class MySqlWriter(EtlStepProcess):

    def __init__(self, mysql_host, mysql_user, mysql_passwd, mysql_schema, mysql_table, columns, commit_count,
                 input_queue, feedback_queue):
        EtlStepProcess.__init__(self, feedback_queue)
        self.mysql_host = mysql_host
        self.mysql_user = mysql_user
        self.mysql_passwd = mysql_passwd
        self.mysql_schema = mysql_schema
        self.mysql_table = mysql_table
        self.columns = columns
        self.commit_count = commit_count
        self.input_queue = input_queue

    def saferun(self):
        self.log_info(self.name + " started")
        #create mysql connection
        engine = sqlalchemy.create_engine('mysql://' + self.mysql_user + ':' + self.mysql_passwd + '@' + self.mysql_host + '/' + self.mysql_schema)
        meta = sqlalchemy.MetaData()
        table = sqlalchemy.Table(self.mysql_table, meta, autoload=True, autoload_with=engine)
        connection = engine.connect()
        try:
            self.log_info("start MySQL insert")
            counter = 0
            row_list = []
            while True:
                next_row = self.input_queue.get()
                if isinstance(next_row, Terminator):
                    if counter % self.commit_count != 0:
                        connection.execute(table.insert(), row_list)
                    # Poison pill means we should exit
                    break
                row_list.append(next_row)
                counter += 1
                if counter % self.commit_count == 0:
                    connection.execute(table.insert(), row_list)
                    del row_list[:]
                    self.log_debug(self.name + ' ' + str(counter))

        finally:
            connection.close()
        return
  1. 在我的主文件中,我提交了一个负责所有工作的进程,并为其提供了一个 feedback_queue。此进程启动所有步骤,然后从 mongoDB 读取并将值放入初始队列。我的主进程监听反馈队列并打印所有日志消息。如果它收到错误日志,它会打印错误并终止其子进程,反过来,这也会终止其所有子进程,然后再终止。

if __name__ == '__main__':
feedback_q = multiprocessing.Queue()
p = multiprocessing.Process(target=mongo_python_export, args=(feedback_q,))
p.start()

while p.is_alive():
    fb = feedback_q.get()
    if fb["type"] == "error":
        p.terminate()
        print "ERROR in " + fb["process"] + "
"
        for child in multiprocessing.active_children():
            child.terminate()
    else:
        print datetime.datetime.fromtimestamp(fb["timestamp"]).strftime('%Y-%m-%d %H:%M:%S') + " " + \n                                              fb["process"] + ": " + fb["message"]

p.join()

我考虑用它制作一个模块并将其放在 github 上,但我必须先做一些清理和评论工作。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用