Python 中最简单的 async/await 示例

2025-04-15 09:20:00
admin
原创
27
摘要:问题描述:asyncio我读过很多关于 Python 3.5+ 中//async的例子、博客文章和问答await,很多都很复杂,我发现最简单的大概就是这个了。 它仍然使用了,为了学习 Python 中的异步编程,我希望看到一个更简洁的例子,以及完成一个基本的 async / await 示例所需的最少工具e...

问题描述:

asyncio我读过很多关于 Python 3.5+ 中//async的例子、博客文章和问答await,很多都很复杂,我发现最简单的大概就是这个了。

它仍然使用了,为了学习 Python 中的异步编程,我希望看到一个更简洁的例子,以及完成一个基本的 async / await 示例所需的最少工具ensure_future是什么。

问题:是否可以给出一个简单的示例来展示如何工作asyncawait仅使用这两个关键字+代码来运行异步循环+其他 Python 代码而不使用其他asyncio函数?

例如:像这样:

import asyncio

async def async_foo():
    print("async_foo started")
    await asyncio.sleep(5)
    print("async_foo done")

async def main():
    asyncio.ensure_future(async_foo())  # fire and forget async_foo()
    print('Do some actions 1')
    await asyncio.sleep(5)
    print('Do some actions 2')

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

但没有ensure_future,仍然演示了 await / async 如何工作。


解决方案 1:

为了回答您的问题,我将针对同一问题提供三种不同的解决方案。

情况 1:普通 Python

import time

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

def sum_(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        sleep()
        total += number
    print(f'Task {name}: Sum = {total}
')

start = time.time()
tasks = [
    sum_("A", [1, 2]),
    sum_("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.00
Task B: Computing 1+2
Time: 3.00
Task B: Computing 3+3
Time: 4.00
Task B: Sum = 6

Time: 5.00 sec

案例 2:async/await 执行错误

import asyncio
import time

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum_(name, numbers):
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await sleep()
        total += number
    print(f'Task {name}: Sum = {total}
')

start = time.time()

loop = asyncio.new_event_loop()
tasks = [
    loop.create_task(sum_("A", [1, 2])),
    loop.create_task(sum_("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 0+1
Time: 2.00
Task B: Computing 1+2
Time: 3.00
Task B: Computing 3+3
Time: 4.00
Task B: Sum = 6

Time: 5.00 sec

案例 3:正确使用 async/await

与情况 2 相同,但sleep功能不同:

async def sleep():
    print(f'Time: {time.time() - start:.2f}')
    await asyncio.sleep(1)

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.01
Task B: Computing 1+2
Time: 1.01
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6

Time: 3.02 sec

情况 1 和情况 2 都给出了相同的5 秒,而情况 3 只给出了3 秒。因此,正确执行的 async/await速度更快。

造成差异的原因在于sleep函数的实现上。

# Case 1
def sleep():
    ...
    time.sleep(1)

# Case 2
async def sleep():
    ...
    time.sleep(1)

# Case 3
async def sleep():
    ...
    await asyncio.sleep(1)

在情况 1 和情况 2 中,它们是“相同的”:它们“休眠”而不允许其他人使用资源。而在情况 3 中,它在休眠时允许访问资源。

在第二种情况下,我们将其添加async到普通函数中。然而,事件循环会不间断地运行它。为什么?因为我们没有说明循环在哪里可以中断你的函数来运行另一个任务。

在情况 3 中,我们告诉事件循环应该在哪里中断函数并运行另一个任务。具体在哪里?就在这里!

await asyncio.sleep(1)

欲了解更多信息,请阅读此处。

考虑阅读

  • 异步编程指南

  • Asyncio Futures 和协程

解决方案 2:

是否可以给出一个简单的示例来展示async/如何await
工作,仅使用这两个关键字 + asyncio.get_event_loop()+
run_until_complete+ 其他 Python 代码而不使用其他asyncio函数?

这样就可以编写有效的代码:

import asyncio


async def main():
    print('done!')


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

但这样就无法证明为什么需要 asyncio。

顺便问一下,为什么你需要asyncio,而不是简单的代码?答案是:asyncio当你并行化 I/O 阻塞操作(比如读写网络)时,它可以让你获得性能优势。为了写一个有用的示例,你需要使用这些操作的异步实现。

请阅读此答案以获得更详细的解释。

更新:

好的,这里有一个用来asyncio.sleep模拟 I/O 阻塞操作的示例,asyncio.gather它展示了如何同时运行多个阻塞操作:

import asyncio


async def io_related(name):
    print(f'{name} started')
    await asyncio.sleep(1)
    print(f'{name} finished')


async def main():
    await asyncio.gather(
        io_related('first'),
        io_related('second'),
    )  # 1s + 1s = over 1s


if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

first started
second started
first finished
second finished
[Finished in 1.2s]

请注意两者io_related是如何开始的,仅仅一秒钟之后,两者就完成了。

解决方案 3:

Python 3.7+ 现在有一个更简单的 API(在我看来),措辞更简单(比“ensure_future”更容易记住):您可以使用create_task它返回一个 Task 对象(如果需要,以后可以用来取消任务)。

基本示例 1

import asyncio

async def hello(i):
    print(f"hello {i} started")
    await asyncio.sleep(4)
    print(f"hello {i} done")

async def main():
    task1 = asyncio.create_task(hello(1))  # returns immediately, the task is created
    await asyncio.sleep(3)
    task2 = asyncio.create_task(hello(2))
    await task1
    await task2

asyncio.run(main())  # main loop

结果:

你好 1 已开始

你好 2 已开始

你好 1 已完成

你好 2 已完成


基本示例 2

如果你需要获取这些异步函数的返回值gather,那么很有用。以下示例灵感来自文档。

import asyncio

async def factorial(n):
    f = 1
    for i in range(2, n + 1):
        print(f"Computing factorial({n}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    return f

async def main():
    L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
    print(L)  # [2, 6, 24]

asyncio.run(main())

预期输出:

计算阶乘(2),当前 i=2...

计算阶乘(3),当前 i=2...

计算阶乘(4),当前 i=2...

计算阶乘(3),当前 i=3...

计算阶乘(4),当前 i=3...

计算阶乘(4),当前 i=4...

[2, 6, 24]


PS:无论您使用asyncio还是trio其他库,后者的教程都对我理解 Python 异步编程很有帮助。

解决方案 4:

既然一切都解释得很好,那么让我们运行一些带有事件循环的示例,比较同步代码和异步代码。

同步代码:

import time

def count():
    time.sleep(1)
    print('1')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('3')

def main():
    for i in range(3):
        count()

if __name__ == "__main__":
    t = time.perf_counter()
    main()
    t2 = time.perf_counter()
    
    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds

我们可以看到,每个计数循环都运行完毕后,下一个循环才开始。

异步代码:

import asyncio
import time

async def count():
    await asyncio.sleep(1)
    print('1')
    await asyncio.sleep(1)
    print('2')
    await asyncio.sleep(1)
    print('3')

async def main():
    await asyncio.gather(count(), count(), count())

if __name__ == "__main__":
    t = time.perf_counter()
    asyncio.run(main())
    t2 = time.perf_counter()

    print(f'Total time elapsed: {t2:0.2f} seconds')

输出:

1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds

另一方面,异步等效看起来是这样的,运行花了三秒钟,而不是九秒钟。第一个计数周期开始了,一旦它到达await睡眠状态,Python 就可以自由地做其他工作,例如启动第二个计数周期,然后是第三个计数周期。这就是为什么我们先将所有管数设为 1,然后再将管数设为 3。在输出中,并发编程可能是一种非常有价值的工具。多处理让操作系统完成所有的多任务工作,在 Python 中,它是多核并发的唯一选择,即让你的程序在多个 CPU 核心上执行。如果使用线程,那么操作系统仍然在做所有的多任务工作,在 cpython 中,全局重复锁会阻止异步编程中的多核并发。没有操作系统干预,只有一个进程,有一个线程,所以任务可以在等待期间释放 CPU,以便其他任务可以使用它。

import asyncio

loop = asyncio.get_event_loop()


async def greeter(name):
    print(f"Hi, {name} you're in a coroutine.")

try:
    print('starting coroutine')
    coro = greeter('LP')
    print('entering event loop')
    loop.run_until_complete(coro)
finally:
    print('closing event loop')
    loop.close()

输出:

starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop

异步框架需要一个调度程序,通常称为事件循环。此事件循环跟踪所有正在运行的任务,当某个函数挂起时,它会将控制权返回给事件循环,然后事件循环会寻找另一个函数来启动或恢复,这称为协作式多任务处理。异步 IO 提供了一个以此事件循环为中心的异步框架,它可以高效地处理输入/输出事件。应用程序与事件循环交互时,它会显式地注册要运行的代码,然后让事件循环调度程序在资源可用时对应用程序代码进行必要的调用。因此,如果网络服务器打开套接字,然后注册它们以便在它们发生输入事件时收到通知,则当有新的传入连接或有数据需要读取时,事件循环将提醒服务器代码。如果没有更多数据需要从套接字读取,则服务器将控制权交还给事件循环。

将控制权交还给事件循环的机制依赖于协同程序。协同程序是一种专为并发操作而设计的语言结构。协同程序可以使用 awake 关键字暂停与另一个协同程序的执行。在暂停期间,协同程序的状态保持不变,允许它从中断处恢复执行。一个协同程序可以启动另一个协同程序,然后等待结果。这使得将任务分解为可重用部分变得更加容易。

import asyncio

loop = asyncio.get_event_loop()

async def outer():
    print('in outer')
    print('waiting for result 1')
    result1 = await phase1()
    print('waiting for result 2')
    result2 = await phase2(result1)
    return result1, result2


async def phase1():
    print('in phase1')
    return 'phase1 result'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)

asyncio.run(outer())

输出:

in outer
waiting for result 1
in phase1
waiting for result 2
in phase2

此示例要求两个阶段必须按顺序执行,但可以与其他操作并发运行。awake使用关键字 而不是将新的协同程序添加到循环中,因为控制流已经位于循环管理的协同程序内部。无需指示循环管理新的协同程序。

解决方案 5:

我不知道为什么,但是关于这个主题的所有解释都太复杂了,或者他们使用了无用的 asyncio.sleep() 的示例......到目前为止,我发现的最好的代码示例是:https: //codeflex.co/python3-async-await-example/

解决方案 6:

似乎每个人都专注于切换time.sleepasyncio.sleep,但在现实世界中,这始终是不可能的。有时你需要进行库调用,而这可能需要 API 调用(例如:向 Google 请求签名 URL)。

以下是仍然可以使用的方法time.sleep,但是以异步方式:

import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor

def sleep():
    print(f'Time: {time.time() - start:.2f}')
    time.sleep(1)

async def sum(name, numbers):
    _executor = ThreadPoolExecutor(2)
    total = 0
    for number in numbers:
        print(f'Task {name}: Computing {total}+{number}')
        await loop.run_in_executor(_executor, sleep)
        total += number
    print(f'Task {name}: Sum = {total}
')

start = time.time()

loop = asyncio.get_event_loop()
tasks = [
    loop.create_task(sum("A", [1, 2])),
    loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

end = time.time()
print(f'Time: {end-start:.2f} sec')

输出:

Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3

Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6

Time: 3.01 sec

解决方案 7:

简单..甜蜜..棒极了..✅

  import asyncio
  import time
  import random

  async def eat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Eating")

  async def sleep():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Sleeping")

  async def repeat():
     wait = random.randint(0,3)
     await asyncio.sleep(wait)
     print("Done With Repeating")

  async def main():
     for x in range(5):
        await asyncio.gather(eat(),sleep(),repeat())
        time.sleep(2)
        print("+","-"*20)

  if __name__ == "__main__":
     t = time.perf_counter()
     asyncio.run(main())
     t2 = time.perf_counter()

     print(f'Total time elapsed: {t2:0.2f} seconds')

解决方案 8:

import asyncio
import requests

async def fetch_users():
    response = requests.get('https://www.testjsonapi.com/users/')
    users = response.json()
    return users

async def print_users():
    # create an asynchronous task to run concurrently 
    # which wont block executing print statement before it finishes
    response = asyncio.create_task(fetch_users())
    print("Fetching users ")
    # wait to get users data from response before printing users
    users = await response

    for user in users:
        print(f"name : {user['name']} email : {user['email']}")

asyncio.run(print_users())
print("All users printed in console")

输出将如下所示

Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com 
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console

让我们看看代码是如何工作的。首先,当python调用print_users()它时,它不会让下面的打印语句执行,直到它完成为止。因此,进入内部后将print_users()创建一个并发任务,以便它下面的语句可以与此处的任务同时运行fetch_users()。此任务的运行时间Fetching users将打印在控制台中。之后,python将等待响应,fetch_users()因为用户在接收之前不应打印。完成后,fetch_users()所有用户的姓名和电子邮件都将打印在控制台中。因此,下面的打印语句完成后,print_users()它将被执行。

解决方案 9:

尽管上面的一些答案有点抽象

from datetime import datetime
import asyncio




async def time_taking(max_val,task_no):
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))

    await asyncio.sleep(2)
    value_list = []
    for i in range(0,max_val):
        value_list.append(i)

    print("****FINSIHING UP TASk NO {}  **".format(task_no))
    return value_list



async def test2(task_no):
    await asyncio.sleep(5)
    print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
    await asyncio.sleep(5)
    print("****FINSIHING UP  TASk NO {}  **".format(task_no))

async def function(value = None):
    tasks = []
    start_time = datetime.now()
    
    # CONCURRENT TASKS
    tasks.append(asyncio.create_task(time_taking(20,1)))
    tasks.append(asyncio.create_task(time_taking(43,2)))
    tasks.append(asyncio.create_task(test2(3)))
    
    # concurrent execution
    lists = await asyncio.gather(*tasks)
    end_time = datetime.now()
    
    time_taken = end_time - start_time
    return lists,time_taken


# run inside event loop 
res,time_taken = asyncio.run(function())

print(res,time_taken)

解决方案 10:

这里有一个非常简单和流畅的例子:

import asyncio

async def my_task1():
    print("Task 1 started")
    await asyncio.sleep(1)  # some light io task
    print("Task 1 completed")
    return "Done1"

async def my_task2():
    print("Task 2 started")
    await asyncio.sleep(2)  # some heavy io task
    print("Task 2 completed")
    return "Done2"

async def main():
    # both the functions are independent of each other, 
    # as tasks gets completes, `.gather` keeps on storing the results
    results = await asyncio.gather(my_task1(), my_task2())
    print(f"The results are {results}")
    
    # if task1 is dependent on completion of task2, then use this
    ret1 = await my_task2()
    ret2 = await my_task1()
    print(f"The ret1: {ret1} ret2 {ret2}")

asyncio.run(main())
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用