多处理:如何在多个进程之间共享一个字典?

2024-12-26 08:43:00
admin
原创
153
摘要:问题描述:一个程序,它创建多个在可连接队列上工作的进程Q,并最终可能操纵全局字典D来存储结果。(因此每个子进程都可以用来D存储其结果,也可以查看其他子进程正在产生的结果)如果我在子进程中打印字典 D,我会看到对它所做的修改(即对 D 所做的修改)。但是在主进程加入 Q 之后,如果我打印 D,它就是一个空字典!...

问题描述:

一个程序,它创建多个在可连接队列上工作的进程Q,并最终可能操纵全局字典D来存储结果。(因此每个子进程都可以用来D存储其结果,也可以查看其他子进程正在产生的结果)

如果我在子进程中打印字典 D,我会看到对它所做的修改(即对 D 所做的修改)。但是在主进程加入 Q 之后,如果我打印 D,它就是一个空字典!

我理解这是同步/锁定问题。有人能告诉我这里发生了什么吗,以及如何同步对 D 的访问?


解决方案 1:

一般答案涉及使用Manager对象。改编自文档:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

输出:

$ python mul.py 
{1: '111', '2': 6}

解决方案 2:

除了这里的@senderle 之外,有些人可能还想知道如何使用 的功能multiprocessing.Pool

令人高兴的是,.Pool()实例中有一种方法manager可以模仿顶层的所有熟悉的 API multiprocessing

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = f"Hi, I was written by process {pid:d}"

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

输出:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

这是一个略有不同的例子,其中每个进程仅将其进程 ID 记录到全局DictProxy对象中d

解决方案 3:

多处理与线程不同。每个子进程都会获得主进程内存的副本。通常,状态通过通信(管道/套接字)、信号或共享内存共享。

多处理为您的用例提供了一些抽象 - 通过使用代理或共享内存被视为本地的共享状态:http ://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

相关章节:

解决方案 4:

我想分享我自己的工作,它比 Manager 的 dict 更快,比 pyshmht 库更简单、更稳定,后者占用大量内存,并且不适用于 Mac OS。尽管我的 dict 仅适用于纯字符串,并且目前是不可变的。我使用线性探测实现,并将键和值对存储在表后的单独内存块中。

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

在我的笔记本电脑上,性能结果是:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

简单使用示例:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')

解决方案 5:

也许您可以尝试pyshmht,基于共享内存的 Python 哈希表扩展。

注意

  1. 尚未经过全面测试,仅供参考。

  2. 它目前缺少用于多处理的锁/sem 机制。

解决方案 6:

就我而言,我没有获得一致的输出,例如,值__total_count__并不总是 20。

from itertools import repeat
import multiprocessing as mp
import os
import pprint
from functools import partial
import numpy as np
import time

def counter(value, d: dict) -> None:
    if value not in d:
        d["__unique_count__"] += 1
        d[value] = 1
    else:
        d[value] += 1

    d["__total_count__"] += 1


if __name__ == '__main__':
    mp.freeze_support()
    with mp.Manager() as manager:
        d = manager.dict()
        d["__unique_count__"] = 0 
        d["__total_count__"] = 0
        numbers = np.random.randint(0,5,size=100)
        print(len(numbers))
        with manager.Pool() as pool:
            pool.map(partial(counter, d=d), numbers)
        # `d` is a DictProxy object that can be converted to dict
        final_d = dict(d)
        pprint.pprint(final_d)
        print(final_d["__unique_count__"], final_d["__total_count__"])

输出1

100
{0: 26,
 1: 16,
 2: 26,
 3: 14,
 4: 18,
 '__total_count__': 92,
 '__unique_count__': 5}
5 92

输出2

100
{0: 10,
 1: 21,
 2: 28,
 3: 22,
 4: 19,
 '__total_count__': 95,
 '__unique_count__': 5}
5 95
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2500  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1541  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   16  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   23  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   26  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用