asyncio:是否有可能取消执行器运行的未来?

2025-04-10 09:45:00
admin
原创
22
摘要:问题描述:我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这对我来说似乎不起作用。以下是代码:import asyncio import time from concurrent.futures import Thread...

问题描述:

我想使用 asyncio 调用 loop.run_in_executor 在 Executor 中启动一个阻塞函数,然后稍后取消它,但这对我来说似乎不起作用。

以下是代码:

import asyncio
import time

from concurrent.futures import ThreadPoolExecutor


def blocking_func(seconds_to_block):
    for i in range(seconds_to_block):
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


@asyncio.coroutine
def non_blocking_func(seconds):
    for i in range(seconds):
        print('yielding {}/{}'.format(i, seconds))
        yield from asyncio.sleep(1)

    print('done non blocking {}'.format(seconds))


@asyncio.coroutine
def main():
    non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)]
    blocking_future = loop.run_in_executor(None, blocking_func, 5)
    print('wait a few seconds!')
    yield from asyncio.sleep(1.5)

    blocking_future.cancel()
    yield from asyncio.wait(non_blocking_futures)



loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=1)
loop.set_default_executor(executor)
asyncio.async(main())
loop.run_forever()

我希望上面的代码只允许阻塞函数输出:

blocking 0/5
blocking 1/5

然后查看非阻塞函数的输出。但即使在我取消之后,阻塞的未来仍然继续。

有可能吗?还有其他方法吗?

谢谢

编辑:有关使用 asyncio 运行阻塞和非阻塞代码的更多讨论:如何使用 asyncio 连接阻塞和非阻塞代码


解决方案 1:

在这种情况下,一旦它实际开始运行,就无法取消Future,因为您依赖于的行为concurrent.futures.Future,并且其文档说明如下:

cancel()

尝试取消调用。如果调用当前正在执行且无法取消,则该方法将返回False,否则调用将被取消且该方法将返回True

因此,只有在任务仍在 中待处理时,取消才会成功Executor。现在,您实际上是在将asyncio.Future包裹在 中concurrent.futures.Future,实际上,如果您在调用 之后尝试执行 ,则asyncio.Future返回的loop.run_in_executor()会引发,即使底层任务实际上已在运行。但是,它实际上不会取消 中任务的执行。CancellationError`yield fromcancel()Executor`

如果您确实需要取消任务,则需要使用更常规的方法来中断线程中正在运行的任务。具体如何操作取决于用例。对于您在示例中提出的用例,您可以使用threading.Event

def blocking_func(seconds_to_block, event):
    for i in range(seconds_to_block):
        if event.is_set():
            return
        print('blocking {}/{}'.format(i, seconds_to_block))
        time.sleep(1)

    print('done blocking {}'.format(seconds_to_block))


...
event = threading.Event()
blocking_future = loop.run_in_executor(None, blocking_func, 5, event)
print('wait a few seconds!')
yield from asyncio.sleep(1.5)

blocking_future.cancel()  # Mark Future as cancelled
event.set() # Actually interrupt blocking_func

解决方案 2:

由于线程与进程共享相同的内存地址空间,因此没有安全的方法来终止正在运行的线程。这就是为什么大多数编程语言不允许终止正在运行的线程的原因(有很多丑陋的黑客绕过这个限制)。

Java 通过艰苦的努力才学会了这一点。

解决方案是在单独的进程而不是线程中运行你的函数并正常终止它。

Pebble库提供了类似于支持取消concurrent.futures运行的接口。Futures

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()  
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用