Coroutine, Event Loop
Python 的 async/await
关键词的本质是协程(Coroutine)。而 Python Coroutine 的实现采用的是事件循环机制(Event Loop)。
Python Coroutine 主要优化的 IO 密集的问题,包括:
- 文件处理
- 网络请求
- 数据库操作
如下所示:
import time
def io_operation():
print("Some time-consuming IO operations...")
time.sleep(2) # 模拟耗时的 IO
def main():
print("Begin operation")
io_operation()
print("End operation")
main()
当进入 io_operation
时会导致大量的时间开销,这个时候其实是 IO 设备在工作,而 CPU 只是在忙等而已(其实也不是,CPython 也可以调度不同线程),所以我们最好让 CPU 去干其他的计算任务(就是最为基础的异步思想)。
事件循环机制就是这样,当我们执行到 io_operation
时,就自动 yield,然后从 event loop 中选择一个其他 event 进行执行,当 io_operation
结束的时候,它就重新把 main
加入到 event loop 中,这样我们就实现了一个协程机制。
async/await
根据上面的分析,我们可以将上面的版本改成异步版本:
import asyncio
# 异步函数需要使用 async 声明
async def io_operation():
print("Some time-consuming IO operations...")
await asyncio.sleep(2) # 模拟耗时的 IO
# 只要使用了 await 就需要声明
async def main():
print("Begin operation")
# yield main, 将 CPU 让给 io_operation
# main 会 wait 到 io_operation 执行完成
await io_operation()
print("End operation")
# async 不能直接调用,而是要用 run 方法
asyncio.run(main())
在上面这段代码中, await
关键字很容易给人两个印象:
yield
并启动一个async
函数- wait 直到
async
函数执行完
这两个印象在这个情景下虽然都对,但是 await
在其他场景中,其实更侧重第二点。启动 async
函数执行不一定需要 await
。
Gather, Tasks
为什么启动 async
函数不一定需要 await
呢,是因为这样就 event loop 中很难超过一个协程,最后的执行过程还是串行的,相当于只是 yield 了。为了解决这个问题,我们需要同时启动多个 async
函数。
gather
函数可以有这种效果:
import asyncio
async def io_operation(id, delay):
await asyncio.sleep(delay)
print(f"IO {id} delayed {delay}s")
async def main():
# gather 接受任意个参数
await asyncio.gather(io_operation(1, 1), io_operation(2, 2))
asyncio.run(main())
这个程序执行了 2 s 左右,也就是说,2 个 task 并发执行。
还有一种办法,就是显式创建 task ,如下所示:
import asyncio
async def io_operation(id, delay):
await asyncio.sleep(delay)
print(f"IO {id} delayed {delay}s")
async def main():
task1 = asyncio.create_task(io_operation(1, 1)) # task1 开始运行
task2 = asyncio.create_task(io_operation(2, 2)) # task2 开始运行
# 此处并没有启动 task1, task2 ,只是等待 task 结束
await task1
await task2
# async 不能直接调用,而是要用 run 方法
asyncio.run(main())
async for/ async with
async for
这个关键字听上去,似乎和 gather
功能是类似的,而实际上依然是顺序遍历。如下所示:
import asyncio
class AsyncIterable:
def __init__(self, max_count):
self.max_count = max_count
async def __aiter__(self):
for i in range(self.max_count):
await asyncio.sleep(1) # 模拟异步操作
yield i
async def main():
async for item in AsyncIterable(5):
print(f"Received: {item}")
asyncio.run(main())
最终 item
会按序输出。那么异步的地方其实是在迭代中。换句话说, async for
允许我们在迭代的时候去干点别的事情,等迭代好了再回来,但是依然是按顺序的。
async with
也是相同的逻辑,如下所示:
import asyncio
class AsyncDBConnection:
async def __aenter__(self):
print("Connecting to database...")
await asyncio.sleep(1) # 模拟异步连接数据库
return self # 返回数据库连接对象
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Disconnecting from database...")
await asyncio.sleep(1) # 模拟异步断开数据库连接
async def query(self, sql):
await asyncio.sleep(1) # 模拟异步数据库查询
return f"Results of query: {sql}"
async def main():
async with AsyncDBConnection() as db:
result = await db.query("SELECT * FROM users")
print(result)
asyncio.run(main())
它的异步过程发生在创建和退出 context manager 的过程中。
GIL
常常有一种说法是,python 是一种单线程语言。但是这句话并不是那么显然的,因为 python 是有 threading
模块的。
应该这么说,“python 是一种单线程语言”的来源是 Global Interpreter Lock(GIL) ,它是 CPython 解释器的全局锁,确保同一时刻仅有一个线程执行 Python 字节码。其设计初衷是保护内存管理(如引用计数)的线程安全。
所以更正确的版本是“ Python 是无法并行执行的 ”。如果希望并行执行,那么用 C 扩展处理关键代码段,要么换用 Jython 或 IronPython(无 GIL)。
所以 python 的多线程只对于 IO 密集型的 workload 具有优化作用(这点和协程类似,一个是 OS 控制,一个是 User 控制的区别),而对于 CPU 密集型的 workload 没有作用了,甚至还有副作用(切换线程也有开销)。
那么在有 GIL 的情况下,是不是就不需要 lock 了呢?显然不是的(这个问题我居然还思考了半天),这是因为 lock 还可以用于构建原子事务,将多个操作打包成一个原子事务,显然是需要 lock 的。