Async IO 是一种并发编程模型。本文我们主要学习 Python 中 Async IO 的使用。类比 Golang 中的协程,Python 中协程的调度不再依赖操作系统而是由程序自身完成。Async IO 适用于 IO 密集型的场景,它通过 event loop 在 IO 等待的同时执行其它协程,以充分利用 CPU。CPython 已经内置了 asyncio 包和 async、await 关键字来支持 Async IO。
"event loop" 是协程的调度器,我们可以把它假想为一个无限循环的程序,一直在监听协程,找到空闲的时机,触发协程运行。
多进程是真正意义上的同时执行。它依赖多核 CPU ,比较适合 CPU 密集型的场景,比如执行各种数学运算。
多进程可以和 Async IO 同时使用。更多介绍:https://www.youtube.com/watch?v=0kXaLh8Fz3k&t=630s。
多线程是一种并发执行的模型,一个进程可以执行多个线程。在 Python 中由于 GIL 的存在,多线程比较复杂。线程的切换需要依赖操作系统来完成。多线程和 Async IO 同样适用于 IO 密集型的场景,但是线程的切换、创建、销毁的成本要比协程高。多线程的程序还要处理线程安全问题。一台机器上通常创建的线程数最多是几万个。
对于多线程和 Async IO ,在下面的场景下更适合 Async IO :
CPU 密集型场景的特点是计算机的内核不断努力工作,而 IO 密集型场景的特点是大量等待输入/输出的时间。
asyncio
包 和 async
/await
我们先使用同步的方法写一个示例代码:
#!/usr/bin/env python3
import time
def count():
print("One")
time.sleep(1)
print("Two")
def main():
# 循环三次
for _ in range(3):
count()
if __name__ == "__main__":
s = time.perf_counter()
main()
elapsed = time.perf_counter() - s
# 输出总的执行时间
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
输出结果如下:
/demo2.py
One
Two
One
Two
One
Two
/demo2.py executed in 3.02 seconds.
上面的代码逻辑很简单,循环调用 count() 方法三次,count() 方法每次睡眠一秒钟,三次调用完成,最后计算总的执行时间。一共耗时3.02秒。
如果我们使用 asyncio
和async
/await
来实现,代码如下:
#!/usr/bin/env python3
import asyncio
async def count():
print("One")
# 使用 await 等待1秒执行完,使用 asyncio 的 sleep() 方法,而不是 time 包的 sleep().
await asyncio.sleep(1)
print("Two")
async def main():
# 异步执行 count() 任务。
await asyncio.gather(count(), count(), count())
if __name__ == "__main__":
import time
s = time.perf_counter()
# 启动异步执行
asyncio.run(main())
elapsed = time.perf_counter() - s
print(f"{__file__} executed in {elapsed:0.2f} seconds.")
输入结果如下:
/demo1.py
One
One
One
Two
Two
Two
/demo1.py executed in 1.00 seconds.
和上一个例子不同的地方我们都在代码中添加了注释来说明。通过和上一个例子输出结果比较我们可以看到两点不同:
输出顺序不同体现了 Async IO 在执行过程中的异步特性,不会一直等待返回结果,而是在某个任务等待时调度其他的任务去执行。因为任务之间没有顺序等待,所以总的执行时间也就约等于耗时最久任务的执行时间。
不得不说,这个 async await 的语法和 JavaScript 中的异步语法一摸一样。
async def
引入了原生协程和异步生成器。当然也可以使用async with
and async for.
关键字 await
将函数控制权传递回 event loop。
下面是几个语法正确和错误的例子:
async def f(x):
y = await z(x) # OK - `await` and `return` allowed in coroutines
return y
async def g(x):
yield x # OK - this is an async generator
async def m(x):
yield from gen(x) # No - SyntaxError
# 不使用 async 直接使用 await 会报错
def m(x):
y = await z(x) # Still no - SyntaxError (no `async def` here)
return y
其次,在使用 await f() 时,需要确保 f() 是“可等待的“。可等待对象通常需要实现 __await__() 方法。async f() 就是可等待的。
在 async 方法中使用 yield,会生成一个异步的生成器。可以使用async for
迭代返回值。比如:
async def mygen(u: int = 10):
"""Yield powers of 2."""
i = 0
while i < u:
yield 2 ** i
i += 1
await asyncio.sleep(0.1)
async def main():
# This does *not* introduce concurrent execution
# It is meant to show syntax only
g = [i async for i in mygen()]
f = [j async for j in mygen() if not (j // 3 % 5)]
return g, f
g, f = asyncio.run(main())
# g = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
# f = [1, 2, 16, 32, 256, 512]
asyncio
的一些其它APIasyncio.Queue(maxsize=0) 实现了一个先进先出的队列(FIFO)。如果 maxsize 小于或等于零,则队列大小是无限的。如果它是一个大于 0 的整数,那么当队列达到 maxsize 时 await put() 会阻塞,直到一个任务被 get() 移除。
使用 asyncio.Queue() 实现发布订阅模式:
#!/usr/bin/env python3
# asyncq.py
import asyncio
import itertools as it
import os
import random
import time
async def makeitem(size: int = 5) -> str:
return os.urandom(size).hex()
async def randsleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(f"{caller} sleeping for {i} seconds.")
await asyncio.sleep(i)
async def produce(name: int, q: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n): # Synchronous loop for each single producer
await randsleep(caller=f"Producer {name}")
i = await makeitem()
t = time.perf_counter()
await q.put((i, t))
print(f"Producer {name} added <{i}> to queue.")
async def consume(name: int, q: asyncio.Queue) -> None:
while True:
await randsleep(caller=f"Consumer {name}")
i, t = await q.get()
now = time.perf_counter()
print(f"Consumer {name} got element <{i}>"
f" in {now - t:0.5f} seconds.")
q.task_done()
async def main(nprod: int, ncon: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
await asyncio.gather(*producers)
await q.join() # Implicitly awaits consumers, too
for c in consumers:
c.cancel()
if __name__ == "__main__":
import argparse
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--nprod", type=int, default=5)
parser.add_argument("-c", "--ncon", type=int, default=10)
ns = parser.parse_args()
start = time.perf_counter()
asyncio.run(main(**ns.__dict__))
elapsed = time.perf_counter() - start
print(f"Program completed in {elapsed:0.5f} seconds.")
通过异步的队列,解耦生产者和消费者,加快程序的执行。
使用 create_task() 创建多个
import asyncio
async def coro(seq) -> list:
"""'IO' wait time is proportional to the max element."""
await asyncio.sleep(max(seq))
return list(reversed(seq))
async def main():
# This is a bit redundant in the case of one task
# We could use `await coro([3, 2, 1])` on its own
t = asyncio.create_task(coro([3, 2, 1])) # Python 3.7+
await t
print(f't: type {type(t)}')
print(f't done: {t.done()}')
t = asyncio.run(main())
# asyncio.run(main()) 等价于下面的写法
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()
4 . Python 线程在绑定给 event loop 之前,自身不会执行。
5 . Event loop 是插拔式的。可以使用任何 event loop 的实现,与协程本身的结构没有关系。也可以参照 uvloop(https://github.com/MagicStack/uvloop)例子,实现自己的event loop。asyncio 包自身就有两种协程实现可以选择。
https://realpython.com/async-io-python/(例子都摘自此文)
https://zhuanlan.zhihu.com/p/64991670
https://docs.python.org/3/library/asyncio-queue.html
Copyright© 2013-2020
All Rights Reserved 京ICP备2023019179号-8