Python异步编程避坑指南:从‘协程未等待’警告到asyncio.gather的正确用法
Python异步编程避坑指南:从协程陷阱到高效并发实战
当你第一次在Python中看到async/await语法时,可能会觉得这不过是另一种写回调的方式。但真正开始使用后,各种奇怪的报错和不符合预期的行为会让你意识到:异步编程完全是另一个世界。本文不会重复基础教程,而是聚焦于那些让开发者掉头发的问题场景,通过真实代码示例带你避开常见陷阱。
1. 那些年我们忘记加的await
新手最容易犯的错误就是忘记在协程调用前加await。看看这个典型例子:
import asyncio async def fetch_data(): print("开始获取数据") await asyncio.sleep(1) return "数据结果" async def main(): result = fetch_data() # 这里忘记加await print(f"获取结果: {result}") asyncio.run(main())运行后会看到两个问题:
- 控制台输出
获取结果: <coroutine object fetch_data at 0x...> - 同时收到警告
RuntimeWarning: coroutine 'fetch_data' was never awaited
原理剖析:
async def定义的函数被调用时返回的是协程对象,而非直接执行- 只有通过
await或交给事件循环调度,协程才会真正执行 - 未等待的协程会在垃圾回收时触发警告,可能导致资源未正确释放
提示:现代IDE如PyCharm会对此类问题给出警告,建议开启所有代码检查选项
正确的写法应该是在所有协程调用前明确使用await:
async def proper_main(): result = await fetch_data() # 正确添加await print(f"获取结果: {result}")2. asyncio.run的隐藏限制
asyncio.run()是Python 3.7+推荐的入口点,但它有几个容易被忽略的限制:
- 不能嵌套调用:尝试在已有事件循环中再次调用会抛出
RuntimeError - 每次调用都会创建新事件循环:不适合需要复用循环的场景
- 会取消所有剩余任务:可能导致未完成的任务被意外终止
考虑这个需要多次运行异步代码的场景:
async def task_runner(task_name): print(f"开始任务 {task_name}") await asyncio.sleep(1) print(f"完成任务 {task_name}") def run_multiple_tasks(): # 错误示范:多次调用asyncio.run for i in range(3): asyncio.run(task_runner(f"任务-{i}"))解决方案有两种:
- 合并为单个入口点(推荐):
async def proper_runner(): await asyncio.gather( task_runner("任务-A"), task_runner("任务-B"), task_runner("任务-C") )- 手动管理事件循环(高级用法):
def manual_loop_management(): loop = asyncio.new_event_loop() try: for i in range(3): loop.run_until_complete(task_runner(f"任务-{i}")) finally: loop.close()3. 任务创建与调度的艺术
asyncio.create_task()是将协程转为可调度任务的常用方法,但何时创建、如何等待任务大有讲究。
3.1 过早创建任务的陷阱
看看这个看似合理的代码:
async def processor(item): await asyncio.sleep(0.5) return f"处理结果:{item}" async def premature_tasks(): tasks = [asyncio.create_task(processor(i)) for i in range(10)] # 过早创建 await asyncio.sleep(2) # 模拟其他操作 results = await asyncio.gather(*tasks) print(results)问题在于:
- 所有任务在创建后立即开始执行
- 如果后续操作耗时较长,可能造成资源浪费
- 无法根据中间结果决定是否继续执行某些任务
改进方案:按需创建任务
async def lazy_tasks(): items = range(10) # 先准备参数,不立即创建任务 process_coros = (processor(i) for i in items) # 需要时才批量创建 tasks = [asyncio.create_task(coro) for coro in process_coros] results = await asyncio.gather(*tasks) print(results)3.2 gather vs wait:选择正确的并发工具
| 特性 | asyncio.gather | asyncio.wait |
|---|---|---|
| 返回值顺序 | 保持输入顺序 | 按完成顺序 |
| 异常处理 | return_exceptions参数控制 | 需要手动处理 |
| 使用场景 | 需要有序结果时 | 需要完成状态检查时 |
| 超时处理 | 整体超时 | 可设置多种完成条件 |
典型gather用法:
async def reliable_gather(): tasks = [ fetch_user_data(), fetch_product_list(), get_inventory_status() ] try: user, products, inventory = await asyncio.gather(*tasks) except Exception as e: print(f"某个任务失败: {e}") raise带异常处理的wait示例:
async def flexible_wait(): pending = { asyncio.create_task(fetch_data("A")), asyncio.create_task(fetch_data("B")) } while pending: done, pending = await asyncio.wait( pending, timeout=1.5, return_when=asyncio.FIRST_EXCEPTION ) for task in done: if task.exception(): print(f"任务出错: {task.exception()}") else: print(f"得到结果: {task.result()}")4. 异常处理的深层逻辑
异步代码的异常处理比同步代码更复杂,因为错误可能发生在任何await点。
4.1 捕获特定协程的异常
async def may_fail(task_id): await asyncio.sleep(0.2) if task_id % 3 == 0: raise ValueError(f"故意失败的任务 {task_id}") return f"成功 {task_id}" async def handle_individual_errors(): tasks = [asyncio.create_task(may_fail(i)) for i in range(5)] results = [] for task in tasks: try: results.append(await task) except ValueError as e: print(f"捕获到错误: {e}") results.append(f"替代值 {task.get_name()}") print(results)4.2 gather的return_exceptions参数
这个布尔参数决定了异常是立即抛出还是作为结果返回:
async def gather_with_exceptions(): tasks = [may_fail(i) for i in range(5)] # 异常作为正常结果返回 results = await asyncio.gather(*tasks, return_exceptions=True) for i, res in enumerate(results): if isinstance(res, Exception): print(f"任务{i}失败: {res}") else: print(f"任务{i}成功: {res}")4.3 取消任务的正确姿势
取消正在运行的任务需要特别注意资源清理:
async def cancellable_work(): try: await asyncio.sleep(10) except asyncio.CancelledError: print("收到取消信号,执行清理...") await asyncio.sleep(0.5) # 模拟清理操作 raise # 必须重新抛出 async def proper_cancellation(): task = asyncio.create_task(cancellable_work()) await asyncio.sleep(0.1) task.cancel() try: await task except asyncio.CancelledError: print("任务已取消")5. 性能优化实战技巧
5.1 限制并发数量
使用信号量控制最大并发:
async def bounded_fetch(url, semaphore): async with semaphore: print(f"开始获取 {url}") await asyncio.sleep(1) # 模拟网络请求 return f"{url} 的内容" async def run_with_limit(): sem = asyncio.Semaphore(3) # 最大并发3 tasks = [ bounded_fetch(f"url-{i}", sem) for i in range(10) ] return await asyncio.gather(*tasks)5.2 超时处理的三种模式
- 整体超时:
async def overall_timeout(): try: async with asyncio.timeout(1.5): await asyncio.sleep(2) except TimeoutError: print("整体操作超时")- 单个任务超时:
async def single_task_timeout(): try: await asyncio.wait_for(asyncio.sleep(2), timeout=1) except asyncio.TimeoutError: print("单个任务超时")- 弹性超时:
async def flexible_timeout(): tasks = [asyncio.sleep(i) for i in range(1, 4)] done, pending = await asyncio.wait( tasks, timeout=2.5, return_when=asyncio.ALL_COMPLETED ) print(f"完成{len(done)}个,剩余{len(pending)}个")5.3 上下文管理器的妙用
异步上下文管理器能优雅处理资源获取/释放:
class AsyncConnection: async def __aenter__(self): print("建立连接") await asyncio.sleep(0.2) return self async def __aexit__(self, exc_type, exc, tb): print("关闭连接") await asyncio.sleep(0.1) async def query(self): await asyncio.sleep(0.3) return "查询结果" async def use_context(): async with AsyncConnection() as conn: result = await conn.query() print(result)6. 调试与测试策略
6.1 启用调试模式
async def debug_coroutine(): await asyncio.sleep(0.1) undefined_var += 1 # 故意制造错误 def run_with_debug(): asyncio.run(debug_coroutine(), debug=True)调试模式会提供:
- 更详细的协程创建/销毁日志
- 未等待协程的堆栈跟踪
- 慢回调警告(默认超过100ms)
6.2 模拟时间的测试技巧
使用asyncio.test_utils进行时间相关测试:
from unittest import IsolatedAsyncioTestCase class TestAsync(IsolatedAsyncioTestCase): async def test_timeout(self): with self.assertRaises(asyncio.TimeoutError): await asyncio.wait_for(asyncio.sleep(1), timeout=0.1)6.3 记录协程执行流程
自定义事件循环策略记录任务生命周期:
class TracingEventLoopPolicy(asyncio.DefaultEventLoopPolicy): def new_event_loop(self): loop = super().new_event_loop() def tracing_callback(context): print(f"事件循环执行: {context}") loop.set_debug(True) loop.set_task_factory( lambda loop, coro: loop.create_task(coro).add_done_callback( lambda t: print(f"任务完成: {t}") ) ) return loop async def traced_execution(): asyncio.set_event_loop_policy(TracingEventLoopPolicy()) await asyncio.sleep(0.1) print("执行完成")