Python 中最简单的 async/await 示例
- 2025-04-15 09:20:00
- admin 原创
- 27
问题描述:
asyncio
我读过很多关于 Python 3.5+ 中//async
的例子、博客文章和问答await
,很多都很复杂,我发现最简单的大概就是这个了。
它仍然使用了,为了学习 Python 中的异步编程,我希望看到一个更简洁的例子,以及完成一个基本的 async / await 示例所需的最少工具ensure_future
是什么。
问题:是否可以给出一个简单的示例来展示如何工作async
,await
仅使用这两个关键字+代码来运行异步循环+其他 Python 代码而不使用其他asyncio
函数?
例如:像这样:
import asyncio
async def async_foo():
print("async_foo started")
await asyncio.sleep(5)
print("async_foo done")
async def main():
asyncio.ensure_future(async_foo()) # fire and forget async_foo()
print('Do some actions 1')
await asyncio.sleep(5)
print('Do some actions 2')
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
但没有ensure_future
,仍然演示了 await / async 如何工作。
解决方案 1:
为了回答您的问题,我将针对同一问题提供三种不同的解决方案。
情况 1:普通 Python
import time
def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
def sum_(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
sleep()
total += number
print(f'Task {name}: Sum = {total}
')
start = time.time()
tasks = [
sum_("A", [1, 2]),
sum_("B", [1, 2, 3]),
]
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 0+1
Time: 2.00
Task B: Computing 1+2
Time: 3.00
Task B: Computing 3+3
Time: 4.00
Task B: Sum = 6
Time: 5.00 sec
案例 2:async/await 执行错误
import asyncio
import time
async def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
async def sum_(name, numbers):
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await sleep()
total += number
print(f'Task {name}: Sum = {total}
')
start = time.time()
loop = asyncio.new_event_loop()
tasks = [
loop.create_task(sum_("A", [1, 2])),
loop.create_task(sum_("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 0+1
Time: 2.00
Task B: Computing 1+2
Time: 3.00
Task B: Computing 3+3
Time: 4.00
Task B: Sum = 6
Time: 5.00 sec
案例 3:正确使用 async/await
与情况 2 相同,但sleep
功能不同:
async def sleep():
print(f'Time: {time.time() - start:.2f}')
await asyncio.sleep(1)
输出:
Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.01
Task B: Computing 1+2
Time: 1.01
Task A: Sum = 3
Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6
Time: 3.02 sec
情况 1 和情况 2 都给出了相同的5 秒,而情况 3 只给出了3 秒。因此,正确执行的 async/await速度更快。
造成差异的原因在于sleep
函数的实现上。
# Case 1
def sleep():
...
time.sleep(1)
# Case 2
async def sleep():
...
time.sleep(1)
# Case 3
async def sleep():
...
await asyncio.sleep(1)
在情况 1 和情况 2 中,它们是“相同的”:它们“休眠”而不允许其他人使用资源。而在情况 3 中,它在休眠时允许访问资源。
在第二种情况下,我们将其添加async
到普通函数中。然而,事件循环会不间断地运行它。为什么?因为我们没有说明循环在哪里可以中断你的函数来运行另一个任务。
在情况 3 中,我们告诉事件循环应该在哪里中断函数并运行另一个任务。具体在哪里?就在这里!
await asyncio.sleep(1)
欲了解更多信息,请阅读此处。
考虑阅读
异步编程指南
Asyncio Futures 和协程
解决方案 2:
是否可以给出一个简单的示例来展示
async
/如何await
工作,仅使用这两个关键字 +asyncio.get_event_loop()
+
run_until_complete
+ 其他 Python 代码而不使用其他asyncio
函数?
这样就可以编写有效的代码:
import asyncio
async def main():
print('done!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
但这样就无法证明为什么需要 asyncio。
顺便问一下,为什么你需要asyncio
,而不是简单的代码?答案是:asyncio
当你并行化 I/O 阻塞操作(比如读写网络)时,它可以让你获得性能优势。为了写一个有用的示例,你需要使用这些操作的异步实现。
请阅读此答案以获得更详细的解释。
更新:
好的,这里有一个用来asyncio.sleep
模拟 I/O 阻塞操作的示例,asyncio.gather
它展示了如何同时运行多个阻塞操作:
import asyncio
async def io_related(name):
print(f'{name} started')
await asyncio.sleep(1)
print(f'{name} finished')
async def main():
await asyncio.gather(
io_related('first'),
io_related('second'),
) # 1s + 1s = over 1s
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
输出:
first started
second started
first finished
second finished
[Finished in 1.2s]
请注意两者io_related
是如何开始的,仅仅一秒钟之后,两者就完成了。
解决方案 3:
Python 3.7+ 现在有一个更简单的 API(在我看来),措辞更简单(比“ensure_future”更容易记住):您可以使用create_task
它返回一个 Task 对象(如果需要,以后可以用来取消任务)。
基本示例 1
import asyncio
async def hello(i):
print(f"hello {i} started")
await asyncio.sleep(4)
print(f"hello {i} done")
async def main():
task1 = asyncio.create_task(hello(1)) # returns immediately, the task is created
await asyncio.sleep(3)
task2 = asyncio.create_task(hello(2))
await task1
await task2
asyncio.run(main()) # main loop
结果:
你好 1 已开始
你好 2 已开始
你好 1 已完成
你好 2 已完成
基本示例 2
如果你需要获取这些异步函数的返回值gather
,那么很有用。以下示例灵感来自文档。
import asyncio
async def factorial(n):
f = 1
for i in range(2, n + 1):
print(f"Computing factorial({n}), currently i={i}...")
await asyncio.sleep(1)
f *= i
return f
async def main():
L = await asyncio.gather(factorial(2), factorial(3), factorial(4))
print(L) # [2, 6, 24]
asyncio.run(main())
预期输出:
计算阶乘(2),当前 i=2...
计算阶乘(3),当前 i=2...
计算阶乘(4),当前 i=2...
计算阶乘(3),当前 i=3...
计算阶乘(4),当前 i=3...
计算阶乘(4),当前 i=4...
[2, 6, 24]
PS:无论您使用asyncio
还是trio
其他库,后者的教程都对我理解 Python 异步编程很有帮助。
解决方案 4:
既然一切都解释得很好,那么让我们运行一些带有事件循环的示例,比较同步代码和异步代码。
同步代码:
import time
def count():
time.sleep(1)
print('1')
time.sleep(1)
print('2')
time.sleep(1)
print('3')
def main():
for i in range(3):
count()
if __name__ == "__main__":
t = time.perf_counter()
main()
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
输出:
1
2
3
1
2
3
1
2
3
Total time elapsed: 9.00 seconds
我们可以看到,每个计数循环都运行完毕后,下一个循环才开始。
异步代码:
import asyncio
import time
async def count():
await asyncio.sleep(1)
print('1')
await asyncio.sleep(1)
print('2')
await asyncio.sleep(1)
print('3')
async def main():
await asyncio.gather(count(), count(), count())
if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
输出:
1
1
1
2
2
2
3
3
3
Total time elapsed: 3.00 seconds
另一方面,异步等效看起来是这样的,运行花了三秒钟,而不是九秒钟。第一个计数周期开始了,一旦它到达await
睡眠状态,Python 就可以自由地做其他工作,例如启动第二个计数周期,然后是第三个计数周期。这就是为什么我们先将所有管数设为 1,然后再将管数设为 3。在输出中,并发编程可能是一种非常有价值的工具。多处理让操作系统完成所有的多任务工作,在 Python 中,它是多核并发的唯一选择,即让你的程序在多个 CPU 核心上执行。如果使用线程,那么操作系统仍然在做所有的多任务工作,在 cpython 中,全局重复锁会阻止异步编程中的多核并发。没有操作系统干预,只有一个进程,有一个线程,所以任务可以在等待期间释放 CPU,以便其他任务可以使用它。
import asyncio
loop = asyncio.get_event_loop()
async def greeter(name):
print(f"Hi, {name} you're in a coroutine.")
try:
print('starting coroutine')
coro = greeter('LP')
print('entering event loop')
loop.run_until_complete(coro)
finally:
print('closing event loop')
loop.close()
输出:
starting coroutine
entering event loop
Hi, LP you're in a coroutine.
closing event loop
异步框架需要一个调度程序,通常称为事件循环。此事件循环跟踪所有正在运行的任务,当某个函数挂起时,它会将控制权返回给事件循环,然后事件循环会寻找另一个函数来启动或恢复,这称为协作式多任务处理。异步 IO 提供了一个以此事件循环为中心的异步框架,它可以高效地处理输入/输出事件。应用程序与事件循环交互时,它会显式地注册要运行的代码,然后让事件循环调度程序在资源可用时对应用程序代码进行必要的调用。因此,如果网络服务器打开套接字,然后注册它们以便在它们发生输入事件时收到通知,则当有新的传入连接或有数据需要读取时,事件循环将提醒服务器代码。如果没有更多数据需要从套接字读取,则服务器将控制权交还给事件循环。
将控制权交还给事件循环的机制依赖于协同程序。协同程序是一种专为并发操作而设计的语言结构。协同程序可以使用 awake 关键字暂停与另一个协同程序的执行。在暂停期间,协同程序的状态保持不变,允许它从中断处恢复执行。一个协同程序可以启动另一个协同程序,然后等待结果。这使得将任务分解为可重用部分变得更加容易。
import asyncio
loop = asyncio.get_event_loop()
async def outer():
print('in outer')
print('waiting for result 1')
result1 = await phase1()
print('waiting for result 2')
result2 = await phase2(result1)
return result1, result2
async def phase1():
print('in phase1')
return 'phase1 result'
async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)
asyncio.run(outer())
输出:
in outer
waiting for result 1
in phase1
waiting for result 2
in phase2
此示例要求两个阶段必须按顺序执行,但可以与其他操作并发运行。awake
使用关键字 而不是将新的协同程序添加到循环中,因为控制流已经位于循环管理的协同程序内部。无需指示循环管理新的协同程序。
解决方案 5:
我不知道为什么,但是关于这个主题的所有解释都太复杂了,或者他们使用了无用的 asyncio.sleep() 的示例......到目前为止,我发现的最好的代码示例是:https: //codeflex.co/python3-async-await-example/
解决方案 6:
似乎每个人都专注于切换time.sleep
到asyncio.sleep
,但在现实世界中,这始终是不可能的。有时你需要进行库调用,而这可能需要 API 调用(例如:向 Google 请求签名 URL)。
以下是仍然可以使用的方法time.sleep
,但是以异步方式:
import asyncio
import time
from concurrent.futures.thread import ThreadPoolExecutor
def sleep():
print(f'Time: {time.time() - start:.2f}')
time.sleep(1)
async def sum(name, numbers):
_executor = ThreadPoolExecutor(2)
total = 0
for number in numbers:
print(f'Task {name}: Computing {total}+{number}')
await loop.run_in_executor(_executor, sleep)
total += number
print(f'Task {name}: Sum = {total}
')
start = time.time()
loop = asyncio.get_event_loop()
tasks = [
loop.create_task(sum("A", [1, 2])),
loop.create_task(sum("B", [1, 2, 3])),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print(f'Time: {end-start:.2f} sec')
输出:
Task A: Computing 0+1
Time: 0.00
Task B: Computing 0+1
Time: 0.00
Task A: Computing 1+2
Time: 1.00
Task B: Computing 1+2
Time: 1.00
Task A: Sum = 3
Task B: Computing 3+3
Time: 2.01
Task B: Sum = 6
Time: 3.01 sec
解决方案 7:
简单..甜蜜..棒极了..✅
import asyncio
import time
import random
async def eat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Eating")
async def sleep():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Sleeping")
async def repeat():
wait = random.randint(0,3)
await asyncio.sleep(wait)
print("Done With Repeating")
async def main():
for x in range(5):
await asyncio.gather(eat(),sleep(),repeat())
time.sleep(2)
print("+","-"*20)
if __name__ == "__main__":
t = time.perf_counter()
asyncio.run(main())
t2 = time.perf_counter()
print(f'Total time elapsed: {t2:0.2f} seconds')
解决方案 8:
import asyncio
import requests
async def fetch_users():
response = requests.get('https://www.testjsonapi.com/users/')
users = response.json()
return users
async def print_users():
# create an asynchronous task to run concurrently
# which wont block executing print statement before it finishes
response = asyncio.create_task(fetch_users())
print("Fetching users ")
# wait to get users data from response before printing users
users = await response
for user in users:
print(f"name : {user['name']} email : {user['email']}")
asyncio.run(print_users())
print("All users printed in console")
输出将如下所示
Fetching users
name : Harjas Malhotra email : harjas@gmail.com
name : Alisha Paul email : alisha@gmail.com
name : Mart Right email : marrk9658@yahoo.com
name : Brad Pitter email : brad@gmail.com
name : Ervin Dugg email : Ervin69@gmail.com
name : Graham Bell email : Graham@bell.biz
name : James Rush email : james369@hotmail.com
name : Deepak Dev email : deepak@gmail.com
name : Ajay Rich email : therichposts@gmail.com
All users printed in console
让我们看看代码是如何工作的。首先,当python调用print_users()
它时,它不会让下面的打印语句执行,直到它完成为止。因此,进入内部后将print_users()
创建一个并发任务,以便它下面的语句可以与此处的任务同时运行fetch_users()
。此任务的运行时间Fetching users
将打印在控制台中。之后,python将等待响应,fetch_users()
因为用户在接收之前不应打印。完成后,fetch_users()
所有用户的姓名和电子邮件都将打印在控制台中。因此,下面的打印语句完成后,print_users()
它将被执行。
解决方案 9:
尽管上面的一些答案有点抽象
from datetime import datetime
import asyncio
async def time_taking(max_val,task_no):
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
await asyncio.sleep(2)
value_list = []
for i in range(0,max_val):
value_list.append(i)
print("****FINSIHING UP TASk NO {} **".format(task_no))
return value_list
async def test2(task_no):
await asyncio.sleep(5)
print("**TASK STARTING TO EXECUTE CONCURRENT TASk NO {} ***".format(task_no))
await asyncio.sleep(5)
print("****FINSIHING UP TASk NO {} **".format(task_no))
async def function(value = None):
tasks = []
start_time = datetime.now()
# CONCURRENT TASKS
tasks.append(asyncio.create_task(time_taking(20,1)))
tasks.append(asyncio.create_task(time_taking(43,2)))
tasks.append(asyncio.create_task(test2(3)))
# concurrent execution
lists = await asyncio.gather(*tasks)
end_time = datetime.now()
time_taken = end_time - start_time
return lists,time_taken
# run inside event loop
res,time_taken = asyncio.run(function())
print(res,time_taken)
解决方案 10:
这里有一个非常简单和流畅的例子:
import asyncio
async def my_task1():
print("Task 1 started")
await asyncio.sleep(1) # some light io task
print("Task 1 completed")
return "Done1"
async def my_task2():
print("Task 2 started")
await asyncio.sleep(2) # some heavy io task
print("Task 2 completed")
return "Done2"
async def main():
# both the functions are independent of each other,
# as tasks gets completes, `.gather` keeps on storing the results
results = await asyncio.gather(my_task1(), my_task2())
print(f"The results are {results}")
# if task1 is dependent on completion of task2, then use this
ret1 = await my_task2()
ret2 = await my_task1()
print(f"The ret1: {ret1} ret2 {ret2}")
asyncio.run(main())
扫码咨询,免费领取项目管理大礼包!