Async IO in Python

707次阅读  |  发布于1年以前

Async IO 是一种并发编程模型。本文我们主要学习 Python 中 Async IO 的使用。类比 Golang 中的协程,Python 中协程的调度不再依赖操作系统而是由程序自身完成。Async IO 适用于 IO 密集型的场景,它通过 event loop 在 IO 等待的同时执行其它协程,以充分利用 CPU。CPython 已经内置了 asyncio 包和 async、await 关键字来支持 Async IO。

"event loop" 是协程的调度器,我们可以把它假想为一个无限循环的程序,一直在监听协程,找到空闲的时机,触发协程运行。

Async IO 不同于多进程和多线程

多进程是真正意义上的同时执行。它依赖多核 CPU ,比较适合 CPU 密集型的场景,比如执行各种数学运算。

多进程可以和 Async IO 同时使用。更多介绍:https://www.youtube.com/watch?v=0kXaLh8Fz3k&t=630s。

多线程是一种并发执行的模型,一个进程可以执行多个线程。在 Python 中由于 GIL 的存在,多线程比较复杂。线程的切换需要依赖操作系统来完成。多线程和 Async IO 同样适用于 IO 密集型的场景,但是线程的切换、创建、销毁的成本要比协程高。多线程的程序还要处理线程安全问题。一台机器上通常创建的线程数最多是几万个。

对于多线程和 Async IO ,在下面的场景下更适合 Async IO :

  1. 大量的任务。
  2. 耗时较长的 IO 操作。

CPU 密集型场景的特点是计算机的内核不断努力工作,而 IO 密集型场景的特点是大量等待输入/输出的时间。

使用 asyncio 包 和 async/await

我们先使用同步的方法写一个示例代码:

#!/usr/bin/env python3
import time


def count():
    print("One")
    time.sleep(1)
    print("Two")


def main():
    # 循环三次
    for _ in range(3):
        count()


if __name__ == "__main__":
    s = time.perf_counter()
    main()
    elapsed = time.perf_counter() - s
    # 输出总的执行时间
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输出结果如下:

/demo2.py 
One
Two
One
Two
One
Two
/demo2.py executed in 3.02 seconds.

上面的代码逻辑很简单,循环调用 count() 方法三次,count() 方法每次睡眠一秒钟,三次调用完成,最后计算总的执行时间。一共耗时3.02秒。

如果我们使用 asyncioasync/await来实现,代码如下:

#!/usr/bin/env python3

import asyncio


async def count():
    print("One")
    # 使用 await 等待1秒执行完,使用 asyncio 的 sleep() 方法,而不是 time 包的 sleep().
    await asyncio.sleep(1)
    print("Two")


async def main():
    # 异步执行 count() 任务。
    await asyncio.gather(count(), count(), count())


if __name__ == "__main__":
    import time

    s = time.perf_counter()
    # 启动异步执行
    asyncio.run(main())
    elapsed = time.perf_counter() - s
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

输入结果如下:

/demo1.py 
One
One
One
Two
Two
Two
/demo1.py executed in 1.00 seconds.

和上一个例子不同的地方我们都在代码中添加了注释来说明。通过和上一个例子输出结果比较我们可以看到两点不同:

  1. One 和 Two 的输出顺序不同。
  2. 执行时间只有1.00秒。

输出顺序不同体现了 Async IO 在执行过程中的异步特性,不会一直等待返回结果,而是在某个任务等待时调度其他的任务去执行。因为任务之间没有顺序等待,所以总的执行时间也就约等于耗时最久任务的执行时间。

不得不说,这个 async await 的语法和 JavaScript 中的异步语法一摸一样。

async await 的一些规则说明

async def 引入了原生协程和异步生成器。当然也可以使用async with and async for.

关键字 await 将函数控制权传递回 event loop。

下面是几个语法正确和错误的例子:

async def f(x):
    y = await z(x)  # OK - `await` and `return` allowed in coroutines
    return y

async def g(x):
    yield x  # OK - this is an async generator

async def m(x):
    yield from gen(x)  # No - SyntaxError

# 不使用 async 直接使用 await 会报错
def m(x):
    y = await z(x)  # Still no - SyntaxError (no `async def` here)
    return y

其次,在使用 await f() 时,需要确保 f() 是“可等待的“。可等待对象通常需要实现 __await__() 方法。async f() 就是可等待的。

async 中使用 yield

在 async 方法中使用 yield,会生成一个异步的生成器。可以使用async for迭代返回值。比如:

async def mygen(u: int = 10):
    """Yield powers of 2."""
    i = 0
    while i < u:
        yield 2 ** i
        i += 1
        await asyncio.sleep(0.1)

async def main():
    # This does *not* introduce concurrent execution
    # It is meant to show syntax only
    g = [i async for i in mygen()]
    f = [j async for j in mygen() if not (j // 3 % 5)]
    return g, f

g, f = asyncio.run(main())

# g = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
# f = [1, 2, 16, 32, 256, 512]

asyncio的一些其它API

Queue

asyncio.Queue(maxsize=0) 实现了一个先进先出的队列(FIFO)。如果 maxsize 小于或等于零,则队列大小是无限的。如果它是一个大于 0 的整数,那么当队列达到 maxsize 时 await put() 会阻塞,直到一个任务被 get() 移除。

使用 asyncio.Queue() 实现发布订阅模式:

#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time


async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()


async def randsleep(caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)


async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now - t:0.5f} seconds.")
        q.task_done()


async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()


if __name__ == "__main__":
    import argparse

    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

通过异步的队列,解耦生产者和消费者,加快程序的执行。

create_task

使用 create_task() 创建多个

import asyncio

async def coro(seq) -> list:
    """'IO' wait time is proportional to the max element."""
    await asyncio.sleep(max(seq))
    return list(reversed(seq))

async def main():
    # This is a bit redundant in the case of one task
    # We could use `await coro([3, 2, 1])` on its own
    t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+
    await t
    print(f't: type {type(t)}')
    print(f't done: {t.done()}')

t = asyncio.run(main())

补充

  1. asyncio 包并不是 Async IO 的唯一的实现,还有一些其它比较知名的包,比如:curio、trio。不过使用 asyncio 仍然是不错的选择。
  2. 在 Python 中 ,Async IO 默认通过单个 CPU 的单个进程完成,也可以主动使用多进程。
  3. asyncio.run() 的另一种写法:
# asyncio.run(main()) 等价于下面的写法
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

4 . Python 线程在绑定给 event loop 之前,自身不会执行。

5 . Event loop 是插拔式的。可以使用任何 event loop 的实现,与协程本身的结构没有关系。也可以参照 uvloop(https://github.com/MagicStack/uvloop)例子,实现自己的event loop。asyncio 包自身就有两种协程实现可以选择。

参考

https://realpython.com/async-io-python/(例子都摘自此文)

https://zhuanlan.zhihu.com/p/64991670

https://docs.python.org/3/library/asyncio-queue.html

Copyright© 2013-2020

All Rights Reserved 京ICP备2023019179号-8