asyncio:是否有可能取消执行器运行的未来?
- 2025-04-10 09:45:00
- admin 原创
- 23
问题描述:
我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这对我来说似乎不起作用。
以下是代码:
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
def blocking_func(seconds_to_block):
for i in range(seconds_to_block):
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
@asyncio.coroutine
def non_blocking_func(seconds):
for i in range(seconds):
print('yielding {}/{}'.format(i, seconds))
yield from asyncio.sleep(1)
print('done non blocking {}'.format(seconds))
@asyncio.coroutine
def main():
non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
blocking_future = loop.run_in_executor(None, blocking_func, 5)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel()
yield from asyncio.wait(non_blocking_futures)
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()
我希望上面的代码只允许阻塞函数输出:
blocking 0/5
blocking 1/5
然后查看非阻塞函数的输出。但即使在我取消之后,阻塞的未来仍然继续。
有可能吗?还有其他方法吗?
谢谢
编辑:有关使用 asyncio 运行阻塞和非阻塞代码的更多讨论:如何使用 asyncio 连接阻塞和非阻塞代码
解决方案 1:
在这种情况下,一旦它实际开始运行,就无法取消Future
,因为您依赖于的行为concurrent.futures.Future
,并且其文档说明如下:
cancel()
尝试取消调用。如果调用当前正在执行且无法取消,则该方法将返回
False
,否则调用将被取消且该方法将返回True
。
因此,只有在任务仍在 中待处理时,取消才会成功Executor
。现在,您实际上是在将asyncio.Future
包裹在 中concurrent.futures.Future
,实际上,如果您在调用 之后尝试执行 ,则asyncio.Future
返回的loop.run_in_executor()
会引发,即使底层任务实际上已在运行。但是,它实际上不会取消 中任务的执行。CancellationError
`yield fromcancel()
Executor`
如果您确实需要取消任务,则需要使用更常规的方法来中断线程中正在运行的任务。具体如何操作取决于用例。对于您在示例中提出的用例,您可以使用threading.Event
:
def blocking_func(seconds_to_block, event):
for i in range(seconds_to_block):
if event.is_set():
return
print('blocking {}/{}'.format(i, seconds_to_block))
time.sleep(1)
print('done blocking {}'.format(seconds_to_block))
...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)
blocking_future.cancel() # Mark Future as cancelled
event.set() # Actually interrupt blocking_func
解决方案 2:
由于线程与进程共享相同的内存地址空间,因此没有安全的方法来终止正在运行的线程。这就是为什么大多数编程语言不允许终止正在运行的线程的原因(有很多丑陋的黑客绕过这个限制)。
Java 通过艰苦的努力才学会了这一点。
解决方案是在单独的进程而不是线程中运行你的函数并正常终止它。
Pebble库提供了类似于支持取消concurrent.futures
运行的接口。Futures
from pebble import ProcessPool
def function(foo, bar=0):
return foo + bar
with ProcessPool() as pool:
future = pool.schedule(function, args=[1])
# if running, the container process will be terminated
# a new process will be started consuming the next task
future.cancel()
扫码咨询,免费领取项目管理大礼包!