Python 中的异步方法调用?

2025-02-28 08:22:00
admin
原创
79
摘要:问题描述:我想知道Python中是否有用于异步方法调用的库。如果你能做类似的事情就太好了@async def longComputation(): <code> token = longComputation() token.registerCallback(callback_func...

问题描述:

我想知道Python中是否有用于异步方法调用的库。如果你能做类似的事情就太好了

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

或者异步调用非异步例程

def longComputation()
    <code>

token = asynccall(longComputation())

如果语言核心中有一个更精细的策略作为母语,那就太好了。有考虑过这个吗?


解决方案 1:

类似于:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

有关更多详细信息,请参阅https://docs.python.org/library/threading.html上的文档。

解决方案 2:

您可以使用Python 2.6 中添加的多处理模块。您可以使用进程池,然后使用以下命令异步获取结果:

apply_async(func[, args[, kwds[, callback]]])

例如:

import time
from multiprocessing import Pool

def postprocess(result):
    print("finished: %s" % result)

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback=postprocess) # Evaluate "f(10)" asynchronously calling callback when finished.
    print("waiting...")
    time.sleep(1)

这只是一种选择。该模块提供了很多功能来实现您想要的功能。此外,使用它来制作装饰器也非常容易。

解决方案 3:

从 Python 3.5 开始,您可以使用增强的生成器来执行异步函数。

import asyncio
import datetime

增强的生成器语法:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

async/await语法:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

解决方案 4:

虽然它不在语言核心中,但Twisted是一个非常成熟的库,可以满足您的需求。它引入了 Deferred 对象,您可以将回调或错误处理程序(“errbacks”)附加到该对象。Deferred 基本上是一个“承诺”,即函数最终会产生结果。

解决方案 5:

您可以实现装饰器以使您的函数异步,尽管这有点棘手。该multiprocessing模块充满了小怪癖和看似随意的限制 - 但更有理由将其封装在一个友好的界面后面。

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

下面的代码说明了装饰器的用法:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

在现实世界中,我会对装饰器进行更详细的阐述,提供一些方法来关闭它以进行调试(同时保持未来的接口),或者可能提供一种处理异常的工具;但我认为这已经足够说明这个原理了。

解决方案 6:

只是

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()

解决方案 7:

我的解决方案是:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

并且完全按照要求工作:

@Async
def fnc():
    pass

解决方案 8:

您可以使用 eventlet。它允许您编写看似同步的代码,但可以通过网络异步运行。

这是一个超极简爬虫的示例:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)

解决方案 9:

Python 3.7asyncio及更高版本中较新的运行方法是使用而不是创建和调用以及关闭它:asyncio.run()`loop`loop.run_until_complete()

import asyncio
import datetime

async def display_date(delay):
    loop = asyncio.get_running_loop()
    end_time = loop.time() + delay
    while True:
        print("Blocking...", datetime.datetime.now())
        await asyncio.sleep(1)
        if loop.time() > end_time:
            print("Done.")
            break


asyncio.run(display_date(5))

解决方案 10:

类似这样的方法对我有用,然后您可以调用该函数,它会将自身分派到新线程中。

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return

解决方案 11:

您可以使用current.futures(在Python 3.2中添加)。

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')

解决方案 12:

2021 年使用 Python 3.9 进行异步调用的原生 Python 方式也适用于 Jupyter / Ipython Kernel

Camabeh 的答案是 Python 3.3 以来的必由之路。

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

这可以在 Jupyter Notebook/Jupyter Lab 中运行,但会引发错误:

RuntimeError: This event loop is already running

由于 Ipython 使用事件循环,我们需要一种称为嵌套异步循环的东西,而Python 中尚未实现这种东西。幸运的是,有nest_asyncio可以解决这个问题。您需要做的就是:

!pip install nest_asyncio # use ! within Jupyter Notebook, else pip install in shell
import nest_asyncio
nest_asyncio.apply()

(基于此线程)

只有当你调用loop.close()它时才会引发另一个错误,因为它可能指的是 Ipython 的主循环。

RuntimeError: Cannot close a running event loop

一旦有人回答这个 github 问题,我就会更新这个答案。

解决方案 13:

有什么理由不使用线程吗?您可以使用threading类。使用 而不是finished()函数isAlive()result()函数可以join()线程并检索结果。并且,如果可以的话,覆盖run()__init__函数以调用构造函数中指定的函数并将值保存到类的实例的某个位置。

解决方案 14:

你可以使用进程。如果你想让它永远运行,在你的函数中使用 while(像网络一样):

from multiprocessing import Process
def foo():
    while 1:
        # Do something

p = Process(target = foo)
p.start()

如果你只想运行一次,请这样做:

from multiprocessing import Process
def foo():
    # Do something

p = Process(target = foo)
p.start()
p.join()
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2941  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1803  
  PLM(产品生命周期管理)系统在企业的产品研发、生产与管理过程中扮演着至关重要的角色。然而,在实际运行中,资源冲突是经常会遇到的难题。资源冲突可能导致项目进度延迟、成本增加以及产品质量下降等一系列问题,严重影响企业的效益与竞争力。因此,如何有效应对PLM系统中的资源冲突,成为众多企业关注的焦点。接下来,我们将详细探讨5...
plm项目管理系统   31  
  敏捷项目管理与产品生命周期管理(PLM)的融合,正成为企业在复杂多变的市场环境中提升研发效率、增强竞争力的关键举措。随着技术的飞速发展和市场需求的快速更迭,传统的研发流程面临着诸多挑战,而将敏捷项目管理理念融入PLM,有望在2025年实现研发流程的深度优化,为企业创造更大的价值。理解敏捷项目管理与PLM的核心概念敏捷项...
plm项目   31  
  模块化设计在现代产品开发中扮演着至关重要的角色,它能够提升产品开发效率、降低成本、增强产品的可维护性与可扩展性。而产品生命周期管理(PLM)系统作为整合产品全生命周期信息的关键平台,对模块化设计有着强大的支持能力。随着技术的不断发展,到 2025 年,PLM 系统在支持模块化设计方面将有一系列令人瞩目的技术实践。数字化...
plm软件   28  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用