多处理:理解“chunksize”背后的逻辑

2025-02-17 09:25:00
admin
原创
80
摘要:问题描述:chunksize哪些因素决定了诸如 之类的方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎对其默认块大小使用了任意启发式方法(如下所述);是什么促使了这种选择,并且是否存在基于某些特定情况/设置的更周到的方法?例如 - 说我是:传递一个iterable有约...

问题描述:

chunksize哪些因素决定了诸如 之类的方法的最佳参数multiprocessing.Pool.map()?该.map()方法似乎对其默认块大小使用了任意启发式方法(如下所述);是什么促使了这种选择,并且是否存在基于某些特定情况/设置的更周到的方法?

例如 - 说我是:

  • 传递一个iterable有约.map()1500 万个元素的;

  • 在具有 24 个核心的机器上工作并使用默认processes = os.cpu_count()设置multiprocessing.Pool()

我天真地认为,给 24 名工人每人分配一个大小相同的块,即15_000_000 / 24625,000。大块应该可以减少人员流动/管理费用,同时充分利用所有工人。但这似乎忽略了给每个工人分配大批量的一些潜在缺点。这是不完整的图景吗?我遗漏了什么?


我的问题部分源于 if chunksize=None: both .map()and .starmap()call的默认逻辑.map_async(),如下所示:

def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
               error_callback=None):
    # ... (materialize `iterable` to list if it's an iterator)
    if chunksize is None:
        chunksize, extra = divmod(len(iterable), len(self._pool) * 4)  # ????
        if extra:
            chunksize += 1
    if len(iterable) == 0:
        chunksize = 0

背后的逻辑是什么divmod(len(iterable), len(self._pool) * 4)?这意味着块大小将更接近。乘以4 的 15_000_000 / (24 * 4) == 156_250目的是什么?len(self._pool)

这使得最终的块大小比我上面的“简单逻辑”小 4 倍,后者仅包括将可迭代对象的长度除以中的工作者数量pool._pool

最后,Python 文档中的这段代码.imap()也进一步激发了我的好奇心:

chunksize参数与方法使用的参数相同map()
。对于非常长的可迭代对象,使用较大的值chunksize可以使作业完成得比使用默认值 1 快得多。


相关答案很有帮助,但有点太高级了:Python 多处理:为什么大块大小更慢?。


解决方案 1:

简短答案

Pool 的块大小算法是一种启发式算法。它为您试图塞入 Pool 方法的所有可以想象到的问题场景提供了一个简单的解决方案。因此,它无法针对任何特定场景进行优化。

该算法任意将可迭代对象划分为比原始方法多大约四倍的块。更多的块意味着更多的开销,但增加了调度灵活性。这个答案将如何显示,这会导致平均工人利用率更高,但不能保证每种情况下的总计算时间更短。

您可能会想:“知道这点很好,但知道这点对我解决具体的多处理问题有什么帮助呢?”其实没有。更诚实的简短回答是:“没有简短的答案”、“多处理很复杂”和“视情况而定”。即使是在类似的场景中,观察到的症状也可能有不同的根源。

本回答旨在为您提供一些基本概念,帮助您更清楚地了解 Pool 的调度黑匣子。它还试图为您提供一些基本工具,帮助您识别和避免与块大小相关的潜在悬崖。


目录

第一部分

  1. 定义

  2. 并行化目标

  3. 并行化场景

  4. 块大小 > 1 的风险

  5. 池的块大小算法

  6. 量化算法效率

6.1 模型

6.2 并行调度

6.3 效率

6.3.1 绝对分配效率(ADE)

6.3.2 相对分配效率(RDE)

第二部分

  1. 简单算法与 Pool 的块大小算法

  2. 现实检验

  3. 结论

首先有必要澄清一些重要的术语。


1.定义

此处的块是池方法调用中指定的参数的份额iterable。如何计算块大小以及这会产生什么影响是本回答的主题。

任务

下图显示了工作进程中任务的物理数据表示。

图0

该图显示了对 的示例调用pool.map(),显示在取自 函数的一行代码中multiprocessing.pool.worker,其中从 读取的任务inqueue被解包。是池工作进程worker的底层主函数。对于带有 的单调用方法(如 和) ,池方法中指定的参数将仅与函数内的变量匹配。 对于其余带有参数的池方法,处理函数将是一个映射器函数(或)。 此函数将用户指定的参数映射到可迭代传输块的每个元素上(-->“map-tasks”)。 此操作所花费的时间也将任务定义为工作单元MainThread`funcfuncworkerapply_asyncimapchunksize=1chunksizefuncmapstarstarmapstarfunc`

塔斯克尔

虽然“任务”一词用于表示一个块的整个处理与 中的代码相匹配multiprocessing.pool,但没有迹象表明应该如何引用对用户指定的 的单个调用func(以块的一个元素作为参数)。为了避免因命名冲突而产生的混淆(想想maxtasksperchildPool 的 -method 的 -parameter __init__),此答案将任务中的单个工作单元称为taskel

任务单元(由任务 + 元素组成)是任务中最小的工作单元。它是使用方法func的参数指定的函数的单次执行,使用从传输的块的单个元素Pool获取的参数进行调用。任务任务单元组成。chunksize

并行化开销 (PO)

PO包括 Python 内部开销和进程间通信 (IPC) 开销。Python 中每个任务的开销包括打包和解包任务及其结果所需的代码。IPC 开销包括线程的必要同步和不同地址空间之间的数据复制(需要两个复制步骤:父级 -> 队列 -> 子级)。IPC 开销的数量取决于操作系统、硬件和数据大小,因此很难概括其影响。


  1. 并行化目标

使用多处理时,我们的总体目标(显然)是尽量减少所有任务的总处理时间。为了实现这一总体目标,我们的技术目标需要优化硬件资源的利用率

实现技术目标的一些重要子目标是:

  • 最小化并行化开销(最著名的,但不是唯一:IPC)

  • 所有 CPU 核心利用率高

  • 限制内存使用量,以防止操作系统过度分页(垃圾)

首先,任务需要足够繁重(密集),才能收回我们为并行化支付的 PO。PO 的相关性随着每个任务组的绝对计算时间的增加而降低。或者,反过来说,对于您的问题,每个任务组的绝对计算时间越大,减少 PO 的需要就越不相关。如果您的计算每个任务组需要数小时,那么 IPC 开销相比之下可以忽略不计。这里的主要问题是防止在分配所有任务后工作进程闲置。保持所有核心满负荷意味着我们正在尽可能地并行化。


  1. 并行化场景

哪些因素决定了 multiprocessing.Pool.map() 等方法的最佳 chunksize 参数

问题的主要因素是单个任务组之间的计算时间差异。具体来说,最佳块大小的选择取决于每个任务组的计算时间变异系数( CV )。

从这种变化的程度来看,有两种极端情况:

  1. 所有任务组都需要完全相同的计算时间。

  2. 一个任务可能需要几秒钟甚至几天才能完成。

为了更好地记忆,我将这些场景称为:

  1. 密集场景

  2. 广泛场景

密集场景

密集场景中,最好一次性分配所有任务组,以将必要的 IPC 和上下文切换保持在最低限度。这意味着我们只想创建尽可能多的块,尽可能多的工作进程。如上所述,PO 的权重随着每个任务组的计算时间缩短而增加。

为了实现最大吞吐量,我们还希望所有工作进程都忙碌,直到所有任务都处理完毕(没有空闲的工作进程)。为了实现此目标,分布式块的大小应相等或接近。

广泛场景

广义场景的主要示例是优化问题,其中结果要么快速收敛,要么计算可能需要数小时甚至数天。通常,在这种情况下,无法预测任务将包含哪些“轻任务”和“重任务”的混合,因此不建议一次在任务批次中分配太多任务。一次分配比可能更少的任务意味着增加调度灵活性。这是实现我们所有核心高利用率的子目标所必需的。

如果Pool方法默认完全针对密集场景进行优化,那么它们将逐渐为靠近广泛场景的每个问题创建次优时间。


  1. 块大小 > 1 的风险

考虑这个Wide Scenario -iterable的简化伪代码示例,我们希望将其传递到池方法中:

good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 84600]

我们假装看到的是所需的计算时间(以秒为单位),而不是实际值,为了简单起见,我们只显示 1 分钟或 1 天。我们假设池中有四个工作进程(在四个核心上),并且chunksize设置为2。由于顺序将保持不变,因此发送给工作进程的块将是这些:

[(60, 60), (86400, 60), (86400, 60), (60, 84600)]

由于我们有足够的工作进程,并且计算时间足够长,我们可以说,每个工作进程首先都会得到一个块来处理。(对于快速完成的任务,情况不一定如此)。此外,我们可以说,整个处理将花费大约 86400+60 秒,因为这是此人工场景中块的最高总计算时间,并且我们仅分发一次块。

现在考虑这个可迭代对象,与前一个可迭代对象相比,它只有一个元素切换了位置:

bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 84600]

...以及相应的块:

[(60, 60), (86400, 86400), (60, 60), (60, 84600)]

只是我们对可迭代对象进行排序时运气不好,导致总处理时间几乎增加了一倍(86400+86400)!获得恶意(86400, 86400)块的工人阻止了其任务中的第二个重任务组分配给已经完成(60, 60)块的空闲工人之一。如果我们设置,我们显然不会冒这种不愉快的结果的风险chunksize=1

这就是大块大小的风险。使用更大的块大小,我们牺牲了调度灵活性,以换取更少的开销,而在上述情况下,这是一笔糟糕的交易。

我们将在第6 章量化算法效率中看到,更大的块大小也会导致密集场景的次优结果。


  1. 池的块大小算法

下面您将在源代码中找到该算法的一个略微修改的版本。如您所见,我截断了下半部分并将其包装成一个用于chunksize在外部计算参数的函数。我还4factor参数替换了它并外包了len()调用。

# mp_utils.py

def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize

为了确保我们都在同一页面上,这里是divmod

divmod(x, y)是一个内置函数,它返回(x//y, x%y)
x // y是向下取整除法,返回 的向下舍入商x / y,而
x % y是取模运算,返回 的余数x / y。因此例如divmod(10, 3)返回(3, 1)

现在,当您查看时chunksize, extra = divmod(len_iterable, n_workers * 4),您会注意到这里是中的n_workers除数,并且乘以,如果不进行后续进一步的调整,则会导致初始块大小至少比其他情况小四倍(对于)。y`x / y4if extra: chunksize +=1`len_iterable >= n_workers * 4

4为了查看乘法对中间块大小结果的影响,请考虑以下函数:

def compare_chunksizes(len_iterable, n_workers=4):
    """Calculate naive chunksize, Pool's stage-1 chunksize and the chunksize
    for Pool's complete algorithm. Return chunksizes and the real factors by
    which naive chunksizes are bigger.
    """
    cs_naive = len_iterable // n_workers or 1  # naive approach
    cs_pool1 = len_iterable // (n_workers * 4) or 1  # incomplete pool algo.
    cs_pool2 = calc_chunksize(n_workers, len_iterable)

    real_factor_pool1 = cs_naive / cs_pool1
    real_factor_pool2 = cs_naive / cs_pool2

    return cs_naive, cs_pool1, cs_pool2, real_factor_pool1, real_factor_pool2

上述函数计算了朴素的块大小 ( cs_naive) 和 Pool 块大小算法的第一步块大小 ( cs_pool1),以及完整 Pool 算法的块大小 ( cs_pool2)。此外,它还计算了实数因子 rf_pool1 = cs_naive / cs_pool1rf_pool2 = cs_naive / cs_pool2,这告诉我们朴素计算的块大小比 Pool 的内部版本大多少倍。

下面您将看到使用此函数的输出创建的两幅图。左图仅显示了n_workers=4直到可迭代长度为的块大小500。右图显示了的值rf_pool1。对于可迭代长度16,实际因子变为>=4(对于len_iterable >= n_workers * 4),其最大值为7可迭代长度28-31。这与算法收敛到的较长可迭代的原始因子有很大偏差4。这里的“更长”是相对的,取决于指定的工作者数量。

图1

请记住,chunksizecs_pool1仍然缺少完整算法中包含extra的余数的调整。divmod`cs_pool2`

该算法继续:

if extra:
    chunksize += 1

现在,在有余数(来自 divmod 操作)的情况下extra,将块大小增加 1 显然不能适用于所有任务。毕竟,如果可以,那么一开始就不会有余数。

如下图所示,"额外处理" 的效果是,现在的真实因子下方rf_pool2收敛,偏差稍微平滑一些。和 的标准偏差从下降到。4 4`n_workers=4len_iterable=5000.5233rf_pool10.4115`rf_pool2

图2

最终,增加chunksize1 的结果是,传输的最后任务的大小仅为len_iterable % chunksize or chunksize

更有趣的是,我们稍后会看到,额外处理对生成的块数( )的影响更为显著n_chunks。对于足够长的可迭代对象,Pool 的完整块大小算法(n_pool2如下图所示)将使块数稳定在。相比之下,随着可迭代对象的长度增加,朴素算法(在初始打嗝之后)会在和n_chunks == n_workers * 4之间不断交替。n_chunks == n_workers`n_chunks == n_workers + 1`

图3

下面您将看到 Pool 和简单块大小算法的两个增强信息函数。下一章将需要这些函数的输出。

# mp_utils.py

from collections import namedtuple


Chunkinfo = namedtuple(
    'Chunkinfo', ['n_workers', 'len_iterable', 'n_chunks',
                  'chunksize', 'last_chunk']
)

def calc_chunksize_info(n_workers, len_iterable, factor=4):
    """Calculate chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    # `+ (len_iterable % chunksize > 0)` exploits that `True == 1`
    n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
    # exploit `0 == False`
    last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

不要被可能意想不到的外观所迷惑calc_naive_chunksize_info。from不用于计算块大小extradivmod

def calc_naive_chunksize_info(n_workers, len_iterable):
    """Calculate naive chunksize numbers."""
    chunksize, extra = divmod(len_iterable, n_workers)
    if chunksize == 0:
        chunksize = 1
        n_chunks = extra
        last_chunk = chunksize
    else:
        n_chunks = len_iterable // chunksize + (len_iterable % chunksize > 0)
        last_chunk = len_iterable % chunksize or chunksize

    return Chunkinfo(
        n_workers, len_iterable, n_chunks, chunksize, last_chunk
    )

6.量化算法效率

现在,我们已经看到了Poolchunksize-algorithm 的输出与朴素算法的输出有何不同......

  • 如何判断 Pool 的方法是否确实改善了某些事情?

  • 那么这个东西到底是什么呢?

如上一章所示,对于较长的可迭代对象(任务组数量较多),Pool 的块大小算法可迭代对象划分为比简单方法四倍的块。较小的块意味着更多的任务,更多的任务意味着更多的并行化开销 (PO),必须权衡这一成本与增加调度灵活性的好处(回想一下“块大小>1 的风险”)。

由于相当明显的原因,Pool 的基本块大小算法无法为我们权衡调度灵活性和PO。IPC开销取决于操作系统、硬件和数据大小。该算法无法知道我们在什么硬件上运行代码,也不知道一个任务组需要多长时间才能完成。它是一种启发式算法,为所有可能的场景提供基本功能。这意味着它无法针对任何特定场景进行优化。如前所述,随着每个任务组的计算时间增加(负相关), PO也变得越来越不重要。

当你回忆起第 2 章中的并行化目标时,其中一个要点是:

  • 所有 CPU 核心利用率高

前面提到的 Pool 的 chunksize 算法可以尝试改进的是最小化空闲工作进程数,也就是最小化CPU 核心的利用率

multiprocessing.Pool有人在 SO 上反复询问有关未使用的核心/空闲工作进程的问题,因为您认为所有工作进程都很忙。虽然这可能有很多原因,但在计算结束时空闲工作进程是我们经常观察到的现象,即使在密集场景(每个任务组的计算时间相等)中,在工作进程数量不是块数量的除数n_chunks % n_workers > 0的情况下也是如此( )。

现在的问题是:

我们如何才能将我们对块大小的理解实际转化为某种东西,以便我们能够解释观察到的工人利用率,甚至比较不同算法在这方面的效率?


6.1 模型

为了在此获得更深入的见解,我们需要一种并行计算的抽象形式,将过于复杂的现实简化到可管理的复杂程度,同时在定义的范围内保留重要性。这种抽象称为模型。如果要收集数据,这种“并行化模型”(PM)的实现会生成工作者映射的元数据(时间戳),就像实际计算一样。模型生成的元数据允许在某些约束条件下预测并行计算的指标。

图4

此处定义的PM中的两个子模型之一是分布模型 (DM)。DM解释了原子工作单元 (taskel) 如何在并行工作者和时间上分布,此时除了相应的块大小算法、工作者数量、输入迭代 (taskel 数量) 及其计算持续时间之外不考虑任何其他因素。这意味着不包括任何形式的开销。

为了获得完整的PMDM扩展了一个开销模型 (OM),表示各种形式的并行化开销 (PO)。这种模型需要针对每个节点单独校准(硬件、操作系统依赖性)。OM 中表示多少种形式的开销尚未确定,因此可以存在具有不同复杂程度的多个OM 。实施的OM需要哪种级别的精度取决于特定计算的PO的总体权重。任务组越短, PO的权重就越高,如果我们试图预测并行化效率 (PE) ,这反过来又需要更精确的OM


6.2 并行计划(PS)

并行调度是并行计算的二维表示,其中 x 轴表示时间,y 轴表示并行工作者池。工作者数量和总计算时间标记矩形的延伸,其中绘制了较小的矩形。这些较小的矩形代表原子工作单元(任务单元)。

下面是使用来自Pool 的 chunksize 算法的DM的数据针对密集场景绘制的PS的可视化效果。

图5

  • x 轴被分成相等的时间单位,每个单位代表任务所需的计算时间。

  • y 轴分为池使用的工作进程数。

  • 这里的任务组显示为最小的青色矩形,放入匿名工作进程的时间线(时间表)中。

  • 任务是工作时间线中的一个或多个任务组,以相同的色调连续突出显示。

  • 空闲时间单位用红色的瓷砖表示。

  • 并行计划分为几个部分。最后一个部分是尾部部分。

组成部件的名称如下图所示。

图6

在包含OM的完整PM中,空闲份额不仅限于尾部,还包括任务之间甚至任务组之间的空间。


6.3 效率

上面介绍的模型可以量化工人利用率。我们可以区分:

  • 分布效率 (DE) – 借助DM (或密集场景的简化方法)计算。

  • 并行化效率 (PE) - 可以通过校准的PM(预测)计算,也可以通过实际计算的元数据计算。

需要注意的是,对于给定的并行化问题,计算出的效率并不会自动与更快的整体计算相关联。在这种情况下,工人利用率仅区分具有已启动但未完成的任务组的工人和没有这种“开放”任务组的工人。这意味着,不会记录任务组时间跨度可能的空闲情况。

所有上述效率基本上都是通过计算Busy Share / Parallel Schedule的商来获得的。DE和PE之间的差异在于,对于开销扩展的PM来说,Busy Share 在整个 Parallel Schedule 中所占的比例较小。

本答案将进一步讨论一种计算密集场景的DE的简单方法。这足以比较不同的块大小算法,因为...

  1. ... DM是PM的一部分,它随着采用的不同块大小算法而变化。

  2. ...每个任务组的计算持续时间相等的密集场景描绘了一种“稳定状态”,对于这种状态,这些时间跨度不再起作用。任何其他场景都只会导致随机结果,因为任务组的顺序很重要。


6.3.1 绝对分配效率(ADE)

通常可以通过将繁忙份额除以并行计划的总潜力来计算基本效率:

绝对分配效率(ADE) =繁忙共享/并行调度

对于密集场景,简化的计算代码如下所示:

# mp_utils.py

def calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Absolute Distribution Efficiency (ADE).

    `len_iterable` is not used, but contained to keep a consistent signature
    with `calc_rde`.
    """
    if n_workers == 1:
        return 1

    potential = (
        ((n_chunks // n_workers + (n_chunks % n_workers > 1)) * chunksize)
        + (n_chunks % n_workers == 1) * last_chunk
    ) * n_workers

    n_full_chunks = n_chunks - (chunksize > last_chunk)
    taskels_in_regular_chunks = n_full_chunks * chunksize
    real = taskels_in_regular_chunks + (chunksize > last_chunk) * last_chunk
    ade = real / potential

    return ade

如果没有空闲份额繁忙份额等于并行计划,因此我们得到的ADE为 100%。在我们的简化模型中,这种情况是所有可用进程在处理所有任务所需的整个时间内都处于繁忙状态。换句话说,整个作业实际上并行化到 100%。

但为什么我在这里一直将PE称为绝对 PE呢?

为了理解这一点,我们必须考虑一个可能的情况,即块大小(cs)可以确保最大的调度灵活性(当然,也可以确保高地人的数量。巧合吗?):

__ ~ 一 ~ __

例如,如果我们有 4 个工作进程和 37 个任务组,那么即使有,也会有闲置的工作进程chunksize=1,只是因为n_workers=4它不是 37 的除数。37/4 的余数是 1。这个剩余的任务组必须由一个工人来处理,而剩下的三个处于闲置状态。

同样,仍然会有一个拥有 39 个任务组的闲置工人,如下图所示。

图7

当您将 的上部并行计划chunksize=1的下部版本进行比较时chunksize=3,您会注意到 的上部并行计划较小,x 轴上的时间线较短。现在应该很明显了,更大的块大小意外地也会导致总体计算时间增加,即使对于密集场景也是如此。

但是为什么不直接使用 x 轴的长度来计算效率呢?

因为该模型不包含开销。对于两种块大小,开销会有所不同,因此 x 轴实际上无法直接比较。开销仍然会导致更长的总计算时间,如下图案例2所示。

图8


6.3.2 相对分配效率(RDE)

如果将 chunksize 设置为 1,则可以更好地分配任务组,那么ADE值不包含该信息。这里的更好仍然意味着更小的空闲份额

为了获得根据最大可能DE调整的DE值,我们必须将考虑的ADE除以我们获得的ADEchunksize=1

相对分配效率 (RDE) = ADE_cs_x / ADE_cs_1

代码如下:

# mp_utils.py

def calc_rde(n_workers, len_iterable, n_chunks, chunksize, last_chunk):
    """Calculate Relative Distribution Efficiency (RDE)."""
    ade_cs1 = calc_ade(
        n_workers, len_iterable, n_chunks=len_iterable,
        chunksize=1, last_chunk=1
    )
    ade = calc_ade(n_workers, len_iterable, n_chunks, chunksize, last_chunk)
    rde = ade / ade_cs1

    return rde

RDE ,在这里如何定义,本质上是关于并行计划尾部的故事。RDE受尾部所含最大有效块大小的影响。(此尾部可以是 x 轴长度chunksizelast_chunk。)结果是,对于各种“尾部外观”, RDE自然会收敛到 100%(甚至),如下图所示。

图9

RDE ...

  • 强烈暗示了优化的潜力。

  • 自然,较长的迭代次数就会减少,因为整个并行计划的相对尾部会缩小。


此答案的第二部分请参见此处。

解决方案 2:

关于这个答案

该答案是上述已接受答案的第二部分。


  1. 简单算法与 Pool 的 Chunksize 算法

在详细介绍之前,请先考虑下面的两个 gif。对于不同iterable长度的范围,它们显示了两种比较的算法如何对传递的数据进行分块iterable(到那时它将是一个序列)以及生成的任务如何分配。工人的顺序是随机的,对于轻型任务组和/或宽场景中的任务组,每个工人实际分配的任务数量可能与此图像不同。如前所述,这里也不包括开销。然而,对于密集场景中传输数据大小可忽略的足够重的任务组,实际计算会绘制出非常相似的图景。

cs_4_50

cs_200_250

如“ 5. 池的块大小算法”一章所示,使用池的块大小算法,n_chunks == n_workers * 4对于足够大的可迭代对象,块数将稳定在,而使用简单方法时,它会在n_chunks == n_workers和之间不断切换。简单算法适用于:因为对于,将创建一个新的部分,其中只雇用一名工人。n_chunks == n_workers + 1`n_chunks % n_workers == 1Truen_chunks == n_workers + 1`

简单的块大小算法:

您可能认为您在相同数量的工人中创建了任务,但这仅适用于没有余数的情况len_iterable / n_workers。 如果有余数,则会有一个新部分,其中只有一个工人的任务。 此时您的计算将不再是并行的。

下面您会看到一个类似于第 5 章中所示的图,但显示的是部分数而不是块数。对于 Pool 的完整块大小算法 ( n_pool2),n_sections将稳定在臭名昭著的硬编码因子4。对于朴素算法,n_sections将在 1 和 2 之间交替。

图10

n_chunks = n_workers * 4对于 Pool 的块大小算法,通过前面提到的额外处理实现稳定,可防止在此处创建新部分,并将空闲份额限制为一个工作器足够长的迭代次数。不仅如此,该算法还会不断缩小空闲份额的相对大小,从而使 RDE 值收敛到 100%。

n_workers=4例如len_iterable=210,对于“足够长”的迭代器,空闲共享将仅限于一个工作者,这一特性最初由于4块大小算法中的乘法而丢失。

图11

简单的块大小算法也收敛到 100%,但速度较慢。收敛效果完全取决于这样一个事实:在有两个部分的情况下,尾部的相对部分会缩小。只有一名受雇工人的尾部长度限制为 x 轴长度n_workers - 1,即 的可能最大余数len_iterable / n_workers

对于简单算法和 Pool 的块大小算法,实际的 RDE 值有何不同?

下面有两个热图,显示了所有可迭代长度(最高 5000)的RDE值,所有工作线程数(从 2 到 100)的 RDE 值。颜色范围从 0.5 到 1(50%-100%)。您会注意到左侧热图中的朴素算法有更多暗区(较低的 RDE 值)。相比之下,右侧 Pool 的块大小算法绘制的画面更加明亮。

图12

左下角暗角与右上角亮角的对角线梯度再次显示了所谓“长可迭代”对工人数量的依赖。

每种算法的糟糕程度有多大?

使用 Pool 的 chunksize 算法, 81.25% 的RDE值是上面指定的工作器范围和可迭代长度的最低值:

图13

使用简单的块大小算法,情况可能会变得更糟。这里计算出的最低RDE为 50.72%。在这种情况下,几乎一半的计算时间只有一个工作程序在运行!所以, Knights Landing的骄傲拥有者们要小心了。;)

图14


  1. 现实检验

在前几章中,我们考虑了一个纯数学分布问题的简化模型,该模型剥离了使多处理成为棘手话题的琐碎细节。为了更好地理解仅靠分布模型 (DM)能够在多大程度上解释实际观察到的工人利用率,我们现在将研究一下由实际计算得出的并行调度。

设置

以下图表均涉及一个简单的、受 CPU 限制的虚拟函数的并行执行,该函数使用各种参数进行调用,因此我们可以观察绘制的并行计划如何根据输入值而变化。此函数中的“工作”仅包括对范围对象的迭代。由于我们传入了大量数字,这已经足以让核心保持忙碌。该函数可选地接受一些 taskel 特有的额外值data,这些值将保持不变。由于每个 taskel 都包含完全相同的工作量,因此我们仍在处理密集场景。

该函数由一个包装器修饰,该包装器采用 ns 分辨率的时间戳(Python 3.7+)。时间戳用于计算任务组的时间跨度,从而可以绘制经验并行计划。

@stamp_taskel
def busy_foo(i, it, data=None):
    """Dummy function for CPU-bound work."""
    for _ in range(int(it)):
        pass
    return i, data


def stamp_taskel(func):
    """Decorator for taking timestamps on start and end of decorated
    function execution.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time_ns()
        result = func(*args, **kwargs)
        end_time = time_ns()
        return (current_process().name, (start_time, end_time)), result
    return wrapper

Pool 的 starmap 方法也经过了修饰,只对 starmap 调用本身进行计时。此调用的“开始”和“结束”决定了生成的并行计划 x 轴上的最小值和最大值。

我们将观察一台机器上四个工作进程中 40 个任务组的计算情况,该机器的配置如下:Python 3.7.1、Ubuntu 18.04.2、Intel® Core™ i7-2600K CPU @ 3.40GHz × 8

将会变化的输入值是 for 循环中的迭代次数(30k、30M、600M)和额外发送的数据大小(每个任务组、numpy-ndarray:0 MiB、50 MiB)。

...
N_WORKERS = 4
LEN_ITERABLE = 40
ITERATIONS = 30e3  # 30e6, 600e6
DATA_MiB = 0  # 50

iterable = [
    # extra created data per taskel
    (i, ITERATIONS, np.arange(int(DATA_MiB * 2**20 / 8)))  # taskel args
    for i in range(LEN_ITERABLE)
]


with Pool(N_WORKERS) as pool:
    results = pool.starmap(busy_foo, iterable)

下面显示的运行是经过精心挑选的,具有相同的块顺序,因此您可以更好地发现与分布模型中的并行计划相比的差异,但不要忘记工作人员获得任务的顺序是不确定的。

DM 预测

重申一下,分布模型“预测”并行计划,就像我们在第 6.2 章中已经看到的那样:

图15

第一次运行:每个任务 30k 次迭代和 0 MiB 数据

图16

我们在这里第一次运行的时间很短,任务组非常“轻”。整个pool.starmap()调用总共只用了 14.5 毫秒。您会注意到,与DM相反,空闲不仅限于尾部,还发生在任务之间,甚至任务组之间。这是因为我们在这里的实际计划自然包括各种开销。这里的空闲意味着任务组之外的一切。任务组期间可能的实际空闲并没有像之前提到的那样被捕获。

此外,你可以看到,并非所有的工人都会同时获得他们的任务。这是因为所有工人都通过共享文件接收数据inqueue,并且一次只有一个工人可以从中读取数据。这同样适用于outqueue。当你传输非边际大小的数据时,这可能会导致更大的混乱(我们稍后会看到)。

此外,您可以看到,尽管每个任务组的工作量相同,但实际测量到的任务组时间跨度差异很大。分配给 worker-3 和 worker-4 的任务组需要的时间比前两个 worker 处理的任务组要多。对于这次运行,我怀疑这是因为worker-3/4 的核心上此时不再有turbo boost可用,因此他们以较低的时钟速率处理任务。

整个计算过程非常轻松,以至于硬件或操作系统引入的混乱因素可能会严重扭曲PS。计算过程就像“风中飘来的树叶”,即使对于理论上合适的场景,DM预测也没有什么意义。

第二次运行:每个任务 3000 万次迭代和 0 MiB 数据

图17

将 for 循环中的迭代次数从 30,000 次增加到 3000 万次,可实现真正的并行调度,它与DM提供的数据所预测的调度几乎完美匹配,太棒了!现在,每个任务组的计算量足够大,可以将开始时和中间的空闲部分边缘化,只显示DM预测的较大空闲份额。

第三次运行:每个任务 3000 万次迭代和 50 MiB 数据

图18

保持 30M 次迭代,但另外来回发送每个任务 50 MiB 又会扭曲图像。在这里,排队效应非常明显。工人 4 需要比工人 1 等待更长的时间才能完成第二项任务。现在想象一下有 70 名工人的时间表!

如果任务组计算量很小,但可以提供大量数据作为有效负载,那么单个共享队列的瓶颈可能会阻碍向池中添加更多工作器的任何额外好处,即使它们由物理核心支持。在这种情况下,甚至在 Worker-40 获得其第一个任务之前,Worker-1 就可以完成其第一个任务并等待新任务。

现在应该很明显了,为什么计算时间Pool并不总是随着工作器的数量线性减少。发送相对较大的数据量可能会导致大部分时间都花在等待数据复制到工作器的地址空间上,并且一次只能为一个工作器提供数据。

第四次运行:每个任务 6 亿次迭代和 50 MiB 数据

图19

这里我们再次发送 50 MiB,但将迭代次数从 30M 提高到 600M,这使总计算时间从 10 秒增加到 152 秒。再次绘制的并行计划与预测的计划几乎完美匹配,通过数据复制产生的开销被最小化。


  1. 结论

讨论的乘法4增加了调度灵活性,但也利用了任务组分布的不均匀性。如果没有这个乘法,空闲份额将仅限于单个工作者,即使对于短迭代器也是如此(对于具有密集场景的DM)。池的块大小算法需要输入迭代器达到一定大小才能恢复该特性。

正如这个答案所希望显示的,与简单方法相比,Pool 的块大小算法平均可实现更好的核心利用率,至少在平均情况下是如此,并且只要不考虑开销即可。此处的简单算法的分配效率 (DE) 可低至约 51%,而 Pool 的块大小算法的分配效率 (DE) 低至约 81%。然而, DE并不像 IPC 那样包含并行化开销 (PO)。第 8 章表明,DE仍然可以对密集场景具有强大的预测能力,并且开销很小。

尽管 Pool 的块大小算法与简单方法相比实现了更高的DE ,但它并不能为每个输入组合提供最佳的任务分配。虽然简单的静态分块算法无法优化(包括开销)并行化效率 (PE),但没有内在原因导致它无法始终提供 100% 的相对分配效率 (RDE),也就是说,与 相同的DEchunksize=1。简单的块大小算法仅由基本数学组成,可以自由地以任何方式“切分蛋糕”。

与 Pool 的“等大小分块”算法实现不同,“等大小分块”算法将为每个/组合提供 100% 的RDE。在 Pool 的源代码中,等大小分块算法的实现会稍微复杂一些,但只需在外部打包任务即可在现有算法的基础上进行调整(如果我发布关于如何做到这一点的问答,我将从这里提供链接)。len_iterable`n_workers`

解决方案 3:

我认为您忽略的部分是,您的天真估计假设每个工作单元花费的时间相同,在这种情况下您的策略将是最好的。但如果某些作业比其他作业完成得早,那么某些核心可能会闲置,等待缓慢的作业完成。

因此,通过将块分成 4 倍以上的部分,如果一个块提前完成,则该核心可以启动下一个块(而其他核心继续处理其较慢的块)。

我不知道他们为什么精确地选择了因子 4,但这需要在最小化地图代码的开销(需要尽可能大的块)和平衡花费不同时间的块(需要尽可能最小的块)之间进行权衡。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2560  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1552  
  IPD(Integrated Product Development)流程作为一种先进的产品开发管理模式,在众多企业中得到了广泛应用。其中,技术评审与决策评审是IPD流程中至关重要的环节,它们既有明显的区别,又存在紧密的协同关系。深入理解这两者的区别与协同,对于企业有效实施IPD流程,提升产品开发效率与质量具有重要意义...
IPD管理流程   1  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、ClickUp、Freshdesk、GanttPRO、Planview、Smartsheet、Asana、Nifty、HubPlanner、Teamwork。在当今快速变化的商业环境中,项目管理软件已成为企业提升效率、优化资源分配和确保项目按时交付的关键工具。然而...
项目管理系统   2  
  建设工程项目质量关乎社会公众的生命财产安全,也影响着企业的声誉和可持续发展。高质量的建设工程不仅能为使用者提供舒适、安全的环境,还能提升城市形象,推动经济的健康发展。在实际的项目操作中,诸多因素会对工程质量产生影响,从规划设计到施工建设,再到后期的验收维护,每一个环节都至关重要。因此,探寻并运用有效的方法来提升建设工程...
工程项目管理制度   3  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用