如何将 python asyncio 与线程结合起来?

2025-03-20 08:48:00
admin
原创
44
摘要:问题描述:我已经成功使用 Python asyncio 和 aiohttp 构建了一个RESTful 微服务,它可以监听 POST 事件以从各种馈送器收集实时事件。然后,它构建一个内存结构,将过去 24 小时的事件缓存在嵌套的 defaultdict/deque 结构中。现在我想定期将该结构检查点到磁盘,最好...

问题描述:

我已经成功使用 Python asyncio 和 aiohttp 构建了一个RESTful 微服务,它可以监听 POST 事件以从各种馈送器收集实时事件。

然后,它构建一个内存结构,将过去 24 小时的事件缓存在嵌套的 defaultdict/deque 结构中。

现在我想定期将该结构检查点到磁盘,最好使用 pickle。

由于内存结构可能>100MB,因此我希望避免在检查结构点所需的时间内延迟传入事件的处理。

我宁愿创建结构的快照副本(例如深度复制),然后花时间将其写入磁盘并在预设的时间间隔内重复。

我一直在寻找如何组合线程的示例(线程甚至是最佳解决方案吗?)以及用于此目的的 asyncio,但找不到可以对我有帮助的东西。

非常感谢任何可以开始的指点!


解决方案 1:

使用以下方法将方法委托给线程或子进程非常简单BaseEventLoop.run_in_executor

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x) # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)


loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())

至于是否使用 aProcessPoolExecutorThreadPoolExecutor,这有点难说;腌制大型对象肯定会消耗一些 CPU 周期,这最初会让你认为ProcessPoolExecutor这是可行的方法。但是,将 100MB 对象传递给Process池中的 a 需要在您的主进程中腌制实例,通过 IPC 将字节发送到子进程,在子进程中对其进行反腌制,然后再次腌制它以便将其写入磁盘。鉴于此,我猜腌制/反腌制开销会很大,因此您最好使用ThreadPoolExecutor,即使您会因为 GIL 而受到性能影响。

话虽如此,测试这两种方式并确定答案都非常简单,所以你不妨这样做。

解决方案 2:

我也使用过run_in_executor,但我发现这个函数在大多数情况下有点恶心,因为它需要partial()关键字参数,而且我从来没有用除单个执行器和默认事件循环之外的任何东西来调用它。所以我为它制作了一个方便的包装器,具有合理的默认值和自动关键字参数处理。

from time import sleep
import asyncio as aio
loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""
    def __init__(self, loop=loop, nthreads=1):
        from concurrent.futures import ThreadPoolExecutor
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop
    def __call__(self, f, *args, **kw):
        from functools import partial
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())

解决方案 3:

另一种选择是使用loop.call_soon_threadsafe作为asyncio.Queue沟通的中间渠道。

Python 3 的当前文档中还有一节关于使用 asyncio 进行开发 - 并发和多线程:

import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))

    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())

如何将 asyncio 任务发送到其他线程中循环运行也可能对您有所帮助。

如果你需要更“强大”的示例,请查看我的Wrapper,用于从线程代码启动异步任务。它将为你处理线程安全部分(大部分),并让你执行以下操作:

# See https://gist.github.com/Lonami/3f79ed774d2e0100ded5b171a47f2caf for the full example

async def async_main(queue):
    # your async code can go here
    while True:
        command = await queue.get()
        if command.id == 'print':
            print('Hello from async!')
        elif command.id == 'double':
            await queue.put(command.data * 2)

with LaunchAsync(async_main) as queue:
    # your threaded code can go here
    queue.put(Command('print'))
    queue.put(Command('double', 7))
    response = queue.get(timeout=1)
    print('The result of doubling 7 is', response)

解决方案 4:

使用run_in_executor()就足够了,直到复杂的实时更改开始破坏结构状态。由于复制和 pickle 不会立即完成,因此不完整的更改可能会进入转储。在这种情况下,您必须同步对结构的访问以避免这种情况。

在简单的情况下,你只需要这样的东西:

lock = asyncio.Lock()

async with lock:
    await loop.run_in_executor(...)

...

async with lock:
    # do something else in other tasks

但是如果函数耗时太长,那么需要使用结构的其他所有任务都必须等待。即使使用结构已经很安全。您可以考虑仅在正确的位置直接在函数中进行同步,但您不能在asyncio.Lock异步函数之外使用,它也不是线程安全的,而且使用线程会阻​​塞事件循环。

但是,使用 (我是aiologicaiologic的创建者)可以执行不阻塞事件循环的同步。你可以这样做:aiologic.Lock

lock = aiologic.Lock()

with lock:
    structure_copy = copy.deepcopy(structure)

...

async with lock:
    # do something else in async tasks

您甚至不必aiologic.Lock在当前事件循环中创建实例。因此,您可以安全地将实例作为结构的成员,并在异步和同步环境中使用它。这是创建通用线程安全对象的方法。

如果您关心性能,或者您有更复杂的情况,请查看aiologic提供的其他原语。例如,哪些原语可以用作锁。

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用