跳转到正文
莫尔索随笔
返回

Python 协程深度解析:从 yield 到 async/await 的异步编程实践

预计 5 分钟

第一时间捕获有价值的信号

深入理解 Python 协程机制,从 yield、yield from 到 async/await 语法,掌握同步/异步、阻塞/非阻塞概念,提升 Python 异步编程效率。

核心内容

同步比较直观,编码和维护都比较容易,但是效率低;异步效率高,对于 callback 这种形式逻辑容易被代码割裂,代码可读性差,而异步协程的方式既看起来直观,又在效率上有保证。所以想在此谈谈网络请求中的异步表现形式——协程,并通过 python 代码展示各自的优缺点。

基础概念

同步、异步与阻塞、非阻塞

当提到同步与异步,肯定会想到阻塞与非阻塞,网络 I/O 里所讨论的同步与异步,是指对于请求的发起者,是否需要等到请求的结果(同步),还是说请求完毕的时候以某种方式通知请求发起者(异步)。在这个语义环境下,阻塞与非阻塞,是指请求的受理者在处理某个请求的状态,如果在处理这个请求的时候不能做其它事情(请求处理时间不确定),那么称之为阻塞,否则为非阻塞。

简单理解

举个例子,同步的操作如下:浏览器首先发送第一个请求,等待服务器回复后,再发送第二个请求,依次类推,直到所有请求完成。异步的操作如下:浏览器发送第一个请求,可以不用等待服务器返回,可以继续发送第二个请求。

阻塞与非阻塞属于进程的 API 执行动作的方式,例如进行需要 read 数据,阻塞方式操作流程是:如果没有数据,则read会一直等着数据到来,才能进行后续的动作;而非阻塞则是read没有到数据后,则可以进行后续的动作,当有数据的时候再回来读取。通常 Linux 网络API默认都是阻塞的,例如connect、send、recv等。

协程演化

实现了两个同步IO任务taskIO_1()和taskIO_2(),则最后总耗时就是5秒。计算机中CPU的运算速率要远远大于IO速率,如果再要闲置很长时间去等待IO任务完成才能进行下一个任务的CPU计算,这样的任务执行效率很低。

import time
def taskIO_1(t):
    print(f'开始运行{taskIO_1.__name__}')
    time.sleep(t)
    print(f'{taskIO_1.__name__}耗时{t}s')
def taskIO_2(t):
    print(f'开始运行{taskIO_2.__name__}')
    time.sleep(t)
    print(f'{taskIO_2.__name__}耗时{t}s')
if __name__ == "__main__":
    start=time.time()
    taskIO_1(2)
    taskIO_2(3)
    print(f'总耗时{time.time()-start}s')

改进策略很容易想到的:能否在上述IO任务执行前暂且中断当前IO任务,进行另一个任务,当该IO任务完成后再唤醒该任务。在Python中生成器中的关键字yield可以实现中断功能,所以刚开始协程是基于生成器的变形进行实现的。

yield from 和 yield

yield在生成器中有中断的功能,可以传出值,也可以从函数外部接收值,而yield from的实现就是简化了yield操作。

def generator_1(toc: true
titles):
    yield toc: true
titles
def generator_2(toc: true
titles):
    yield from toc: true
titles

toc: true
titles = ['Python','Java','C++']
for toc: true
title in generator_1(toc: true
titles):
    print('生成器1:',toc: true
title)
for toc: true
title in generator_2(toc: true
titles):
    print('生成器2:',toc: true
title)

# 等价于yield from toc: true
titles
for toc: true
title in toc: true
titles:
    yield toc: true
title

yield from省去了很多异常的处理,不再需要我们手动编写,其内部已经实现大部分异常处理。详见PEP 380

  • 子生成器:yield from后的generator_1()生成器函数是子生成器
  • 委托生成器:generator_2()是程序中的委托生成器,它负责委托子生成器完成具体任务。
  • 调用方:main()是程序中的调用方,负责调用委托生成器。
def generator_1(): # 子生成器
    total = 0
    while True:
        x = yield
        print('',x)
        if not x:
            break
        total += x
    return total
def generator_2(): # 委托生成器
    while True:
        total = yield from generator_1()
def main(): # 调用方
    g1 = generator_1()
    g1.send(None)
    g1.send(2)
   与93行的代码重复,需要删除。93行代码为```python
def main(): # 调用方
    g1 = generator_1()
    g1.send(None)
    g1.send(2)
    g1.send(3)
    try:
        g1.send(None)
    except StopIteration as e:
        print(e.value)
if __name__ == "__main__":
    main()

yield from 建立调用方和子生成器的通道,在上述代码中main()每一次在调用send(value)时,value不是传递给了委托生成器generator_2(),而是借助yield from传递给了子生成器generator_1()中的yield,同理子生成器中的数据也是通过yield直接发送到调用方main()中。

结合@asyncio.coroutine实现协程

将同步IO任务的代码中修改成协程的用法,在同步IO任务的代码中使用的time.sleep(2)来假设任务执行了2秒,但在协程中yield from后面必须是子生成器函数,而time.sleep()并不是生成器,所以这里需要使用内置模块提供的生成器函数asyncio.sleep()。

import time
import asyncio
@asyncio.coroutine
def taskIO_1(t):
    print(f'开始运行{taskIO_1.__name__}')
    yield from asyncio.sleep(t)
    print(f'{taskIO_1.__name__}耗时{t}s')
    return taskIO_1.__name__

@asyncio.coroutine
def taskIO_2(t):
    print(f'开始运行{taskIO_2.__name__}')
    yield from asyncio.sleep(t)
    print(f'{taskIO_2.__name__}耗时{t}s')
    return taskIO_2.__name__

@asyncio.coroutine
def main():
    # 打包任务
    tasks=[taskIO_1(2),taskIO_2(3)]
    # done表示已完成的任务列表,pending表示未完成的任务列表
    done,pending=yield from asyncio.wait(tasks)
    # 遍历已完成任务并调用result()取出结果
    for work in done:
        print(f'协程无序返回值:{work.result()}')

if __name__ == "__main__":
    start=time.time()
    # get_event_loop()获取了一个标准事件循环loop
    loop=asyncio.get_event_loop()
    try:
        # 运行协程,直到循环事件的所有事件都处理完才能完整结束
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(f'总耗时{time.time()-start}s')

当轮询到某个事件时(如taskIO_1()),直到遇到该任务中的yield from中断,开始处理下一个事件(如taskIO_2())),当yield from后面的子生成器完成任务时,该事件才再次被唤醒

使用async和await实现协程

Python 3.5开始引入了新的语法async和await,并更好地标识异步IO编码

import time
import asyncio

async def taskIO_1(t):
    print(f'开始运行{taskIO_1.__name__}')
    await asyncio.sleep(t)
    print(f'{taskIO_1.__name__}耗时{t}s')
    return taskIO_1.__name__

async def taskIO_2(t):
    print(f'开始运行{taskIO_2.__name__}')
    await asyncio.sleep(t)
    print(f'{taskIO_2.__name__}耗时{t}s')
    return taskIO_2.__name__

async def main():
    # 打包任务
    tasks=[taskIO_1(2),taskIO_2(3)]
    # done表示已完成的任务列表,pending表示未完成的任务列表
    done,pending=await asyncio.wait(tasks)
    # 遍历已完成任务并调用result()取出结果
    for work in done:
        print(f'协程无序返回值:{work.result()}')

if __name__ == "__main__":
    start=time.time()
    # get_event_loop()获取了一个标准事件循环loop
    loop=asyncio.get_event_loop()
    try:
        # 运行协程,直到循环事件的所有事件都处理完才能完整完整结束
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(f'总耗时{time.time()-start}s')

显然,协程让我们可以用同步的方式写异步的代码,代码可读性提高,减少采用线程方式之间的切换消耗,提高了并发效率,在 IO 密集型任务中很有用。

参考链接