什么是Python协程?

协程(Coroutine)是Python中实现并发编程的一种方式,它允许你在单个线程内执行多个任务,通过任务切换而非线程切换来实现并发。Python通过asyncio库和async/await语法提供了对协程的原生支持。

注意: 协程与线程不同,它们由事件循环管理,在单个线程中运行,通过任务切换而非线程切换来实现并发。

为什么需要增加任务?

在异步编程中,我们经常需要同时处理多个I/O密集型操作(如网络请求、文件读写等)。通过增加任务,我们可以:

  • 同时发起多个操作而不互相阻塞
  • 更高效地利用系统资源
  • 提高程序的响应速度和吞吐量
  • 简化复杂异步逻辑的管理
任务1
任务2
任务3
任务4

在协程中增加任务的三种方法

1. 使用asyncio.create_task()

这是Python 3.7+推荐的方式,用于将协程包装成任务并加入事件循环。

import asyncio async def my_task(name, delay): print(f"任务 {name} 开始") await asyncio.sleep(delay) print(f"任务 {name} 完成") async def main(): # 创建并添加任务 task1 = asyncio.create_task(my_task("A", 2)) task2 = asyncio.create_task(my_task("B", 1)) # 等待两个任务完成 await task1 await task2 asyncio.run(main())

输出结果:
任务 A 开始
任务 B 开始
任务 B 完成 (1秒后)
任务 A 完成 (2秒后)

2. 使用asyncio.gather()

当需要同时运行多个协程并等待它们全部完成时,asyncio.gather()是更好的选择。

async def main(): # 使用gather同时运行多个任务 await asyncio.gather( my_task("C", 3), my_task("D", 1), my_task("E", 2) ) asyncio.run(main())

输出结果:
任务 C 开始
任务 D 开始
任务 E 开始
任务 D 完成 (1秒后)
任务 E 完成 (2秒后)
任务 C 完成 (3秒后)

3. 使用asyncio.ensure_future() (Python 3.7之前)

在Python 3.7之前,可以使用asyncio.ensure_future()来创建任务。

async def main(): # 兼容旧版本Python的创建任务方式 task1 = asyncio.ensure_future(my_task("F", 2)) task2 = asyncio.ensure_future(my_task("G", 1)) await task1 await task2 # 旧版本的事件循环启动方式 loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()

任务管理的最佳实践

高效任务管理的要点

  • 使用asyncio.create_task()创建任务并立即开始执行
  • 使用asyncio.gather()管理一组相关任务
  • 使用asyncio.wait()处理任务完成的不同状态
  • 为任务设置超时:await asyncio.wait_for(task, timeout=5.0)
  • 使用asyncio.shield()防止任务被取消
  • 合理控制并发任务数量,避免资源耗尽

任务取消与异常处理

正确处理任务取消和异常对于构建健壮的异步应用至关重要。

async def main(): task = asyncio.create_task(my_task("H", 5)) try: # 等待任务完成,但最多3秒 await asyncio.wait_for(task, timeout=3.0) except asyncio.TimeoutError: print("任务超时,正在取消...") task.cancel() try: await task # 等待任务处理取消 except asyncio.CancelledError: print("任务已成功取消")

高级应用:任务队列模式

当需要限制同时运行的协程数量时,可以使用任务队列模式。

async def worker(queue): while True: # 从队列获取任务 task_func = await queue.get() await task_func() # 执行任务 queue.task_done() # 标记任务完成 async def main(): # 创建最大容量为5的队列 queue = asyncio.Queue(maxsize=5) # 启动3个worker协程 workers = [ asyncio.create_task(worker(queue)) for _ in range(3) ] # 添加20个任务到队列 for i in range(20): await queue.put(lambda: my_task(f"Task-{i}", 1)) # 等待所有任务完成 await queue.join() # 取消worker任务 for w in workers: w.cancel() asyncio.run(main())