如何将 python asyncio 与线程结合起来?
- 2025-03-20 08:48:00
- admin 原创
- 43
问题描述:
我已经成功使用 Python asyncio 和 aiohttp 构建了一个RESTful 微服务,它可以监听 POST 事件以从各种馈送器收集实时事件。
然后,它构建一个内存结构,将过去 24 小时的事件缓存在嵌套的 defaultdict/deque 结构中。
现在我想定期将该结构检查点到磁盘,最好使用 pickle。
由于内存结构可能>100MB,因此我希望避免在检查结构点所需的时间内延迟传入事件的处理。
我宁愿创建结构的快照副本(例如深度复制),然后花时间将其写入磁盘并在预设的时间间隔内重复。
我一直在寻找如何组合线程的示例(线程甚至是最佳解决方案吗?)以及用于此目的的 asyncio,但找不到可以对我有帮助的东西。
非常感谢任何可以开始的指点!
解决方案 1:
使用以下方法将方法委托给线程或子进程非常简单BaseEventLoop.run_in_executor
:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
def cpu_bound_operation(x):
time.sleep(x) # This is some operation that is CPU-bound
@asyncio.coroutine
def main():
# Run cpu_bound_operation in the ProcessPoolExecutor
# This will make your coroutine block, but won't block
# the event loop; other coroutines can run in meantime.
yield from loop.run_in_executor(p, cpu_bound_operation, 5)
loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())
至于是否使用 aProcessPoolExecutor
或ThreadPoolExecutor
,这有点难说;腌制大型对象肯定会消耗一些 CPU 周期,这最初会让你认为ProcessPoolExecutor
这是可行的方法。但是,将 100MB 对象传递给Process
池中的 a 需要在您的主进程中腌制实例,通过 IPC 将字节发送到子进程,在子进程中对其进行反腌制,然后再次腌制它以便将其写入磁盘。鉴于此,我猜腌制/反腌制开销会很大,因此您最好使用ThreadPoolExecutor
,即使您会因为 GIL 而受到性能影响。
话虽如此,测试这两种方式并确定答案都非常简单,所以你不妨这样做。
解决方案 2:
我也使用过run_in_executor
,但我发现这个函数在大多数情况下有点恶心,因为它需要partial()
关键字参数,而且我从来没有用除单个执行器和默认事件循环之外的任何东西来调用它。所以我为它制作了一个方便的包装器,具有合理的默认值和自动关键字参数处理。
from time import sleep
import asyncio as aio
loop = aio.get_event_loop()
class Executor:
"""In most cases, you can just use the 'execute' instance as a
function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
the executor, assign result to y. The defaults can be changed, though,
with your own instantiation of Executor, i.e. execute =
Executor(nthreads=4)"""
def __init__(self, loop=loop, nthreads=1):
from concurrent.futures import ThreadPoolExecutor
self._ex = ThreadPoolExecutor(nthreads)
self._loop = loop
def __call__(self, f, *args, **kw):
from functools import partial
return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()
...
def cpu_bound_operation(t, alpha=30):
sleep(t)
return 20*alpha
async def main():
y = await execute(cpu_bound_operation, 5, alpha=-2)
loop.run_until_complete(main())
解决方案 3:
另一种选择是使用loop.call_soon_threadsafe
作为asyncio.Queue
沟通的中间渠道。
Python 3 的当前文档中还有一节关于使用 asyncio 进行开发 - 并发和多线程:
import asyncio
# This method represents your blocking code
def blocking(loop, queue):
import time
while True:
loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
time.sleep(2)
loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
time.sleep(2)
# This method represents your async code
async def nonblocking(queue):
await asyncio.sleep(1)
while True:
queue.put_nowait('Non-blocking A')
await asyncio.sleep(2)
queue.put_nowait('Non-blocking B')
await asyncio.sleep(2)
# The main sets up the queue as the communication channel and synchronizes them
async def main():
queue = asyncio.Queue()
loop = asyncio.get_running_loop()
blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
nonblocking_task = loop.create_task(nonblocking(queue))
running = True # use whatever exit condition
while running:
# Get messages from both blocking and non-blocking in parallel
message = await queue.get()
# You could send any messages, and do anything you want with them
print(message)
asyncio.run(main())
如何将 asyncio 任务发送到其他线程中循环运行也可能对您有所帮助。
如果你需要更“强大”的示例,请查看我的Wrapper,用于从线程代码启动异步任务。它将为你处理线程安全部分(大部分),并让你执行以下操作:
# See https://gist.github.com/Lonami/3f79ed774d2e0100ded5b171a47f2caf for the full example
async def async_main(queue):
# your async code can go here
while True:
command = await queue.get()
if command.id == 'print':
print('Hello from async!')
elif command.id == 'double':
await queue.put(command.data * 2)
with LaunchAsync(async_main) as queue:
# your threaded code can go here
queue.put(Command('print'))
queue.put(Command('double', 7))
response = queue.get(timeout=1)
print('The result of doubling 7 is', response)
解决方案 4:
使用run_in_executor()
就足够了,直到复杂的实时更改开始破坏结构状态。由于复制和 pickle 不会立即完成,因此不完整的更改可能会进入转储。在这种情况下,您必须同步对结构的访问以避免这种情况。
在简单的情况下,你只需要这样的东西:
lock = asyncio.Lock()
async with lock:
await loop.run_in_executor(...)
...
async with lock:
# do something else in other tasks
但是如果函数耗时太长,那么需要使用结构的其他所有任务都必须等待。即使使用结构已经很安全。您可以考虑仅在正确的位置直接在函数中进行同步,但您不能在asyncio.Lock
异步函数之外使用,它也不是线程安全的,而且使用线程会阻塞事件循环。
但是,使用 (我是aiologicaiologic
的创建者)可以执行不阻塞事件循环的同步。你可以这样做:aiologic.Lock
lock = aiologic.Lock()
with lock:
structure_copy = copy.deepcopy(structure)
...
async with lock:
# do something else in async tasks
您甚至不必aiologic.Lock
在当前事件循环中创建实例。因此,您可以安全地将实例作为结构的成员,并在异步和同步环境中使用它。这是创建通用线程安全对象的方法。
如果您关心性能,或者您有更复杂的情况,请查看aiologic
提供的其他原语。例如,哪些原语可以用作锁。
扫码咨询,免费领取项目管理大礼包!