当前位置: 首页 > news >正文

Python asyncio深入解析:从事件循环到协程调度

引言异步编程是现代Python后端开发的核心技能之一。作为从Python转向Rust的开发者我深刻理解asyncio的设计理念和性能优势。本文将深入解析asyncio的内部机制从事件循环到协程调度帮助你全面掌握Python异步编程。一、asyncio基础架构1.1 事件循环Event Loop事件循环是asyncio的核心组件负责调度协程、处理IO事件和管理任务import asyncio async def main(): print(Hello, asyncio!) # 获取事件循环 loop asyncio.get_event_loop() loop.run_until_complete(main()) # Python 3.7 简化写法 asyncio.run(main())1.2 协程Coroutine协程是轻量级的并发原语可以暂停和恢复执行async def fetch_data(url): print(fFetching {url}) await asyncio.sleep(1) # 模拟IO操作 return fData from {url} async def main(): result await fetch_data(https://api.example.com) print(result) asyncio.run(main())1.3 任务Task任务是协程的包装器可以并发执行async def task1(): await asyncio.sleep(1) return Task 1 completed async def task2(): await asyncio.sleep(2) return Task 2 completed async def main(): # 创建任务 t1 asyncio.create_task(task1()) t2 asyncio.create_task(task2()) # 等待所有任务完成 results await asyncio.gather(t1, t2) print(results) # [Task 1 completed, Task 2 completed] asyncio.run(main())二、事件循环深入2.1 事件循环的工作原理事件循环使用Reactor模式处理IO事件import asyncio import socket async def handle_client(reader, writer): data await reader.read(100) message data.decode() addr writer.get_extra_info(peername) print(fReceived {message!r} from {addr!r}) writer.close() await writer.wait_closed() async def main(): server await asyncio.start_server( handle_client, 127.0.0.1, 8888 ) async with server: await server.serve_forever() asyncio.run(main())2.2 事件循环实现对比实现平台特点SelectorEventLoop跨平台使用select/poll/epollProactorEventLoopWindows使用IOCPuvloopLinux/macOS高性能第三方实现2.3 使用uvloop提升性能import asyncio import uvloop # 使用uvloop替换默认事件循环 uvloop.install() async def main(): print(Running on uvloop) asyncio.run(main())三、协程调度机制3.1 协程状态机协程有四种状态PENDING初始状态RUNNING正在执行FINISHED执行完成CANCELLED被取消import asyncio async def my_coroutine(): try: while True: await asyncio.sleep(1) print(Running...) except asyncio.CancelledError: print(Cancelled!) raise async def main(): task asyncio.create_task(my_coroutine()) await asyncio.sleep(3) task.cancel() # 取消任务 try: await task except asyncio.CancelledError: print(Task was cancelled) asyncio.run(main())3.2 任务调度策略asyncio使用协作式调度协程必须主动让出控制权async def compute(): # 计算密集型任务会阻塞事件循环 result 0 for i in range(10**7): result i return result async def io_task(): print(Starting IO task) await asyncio.sleep(0.1) print(IO task completed) async def main(): # 计算任务会阻塞IO任务 compute_task asyncio.create_task(compute()) io_task asyncio.create_task(io_task()) await compute_task await io_task asyncio.run(main())3.3 解决计算密集型任务阻塞问题import asyncio from concurrent.futures import ThreadPoolExecutor async def compute(): loop asyncio.get_event_loop() with ThreadPoolExecutor() as pool: # 在线程池中执行计算密集型任务 result await loop.run_in_executor(pool, heavy_compute) return result def heavy_compute(): result 0 for i in range(10**7): result i return result async def main(): result await compute() print(fResult: {result}) asyncio.run(main())四、并发模式4.1 并行执行多个任务async def fetch(url): print(fFetching {url}) await asyncio.sleep(1) return fResult from {url} async def main(): urls [ https://api.example.com/1, https://api.example.com/2, https://api.example.com/3 ] # 使用gather并发执行 results await asyncio.gather(*[fetch(url) for url in urls]) print(results) asyncio.run(main())4.2 带超时的任务执行async def long_running_task(): await asyncio.sleep(5) return Done async def main(): try: result await asyncio.wait_for(long_running_task(), timeout2) print(result) except asyncio.TimeoutError: print(Task timed out) asyncio.run(main())4.3 任务优先级控制async def high_priority(): print(High priority task started) await asyncio.sleep(0.1) print(High priority task completed) async def low_priority(): print(Low priority task started) await asyncio.sleep(1) print(Low priority task completed) async def main(): # 先启动低优先级任务 low asyncio.create_task(low_priority()) await asyncio.sleep(0.01) # 让低优先级任务开始 # 启动高优先级任务 high asyncio.create_task(high_priority()) await high await low asyncio.run(main())五、异步IO操作5.1 文件操作async def read_file(path): loop asyncio.get_event_loop() with open(path, r) as f: # 使用线程池执行阻塞IO content await loop.run_in_executor(None, f.read) return content async def main(): content await read_file(example.txt) print(content) asyncio.run(main())5.2 网络操作import asyncio import aiohttp async def fetch_with_aiohttp(url): async with aiohttp.ClientSession() as session: async with session.get(url) as response: return await response.text() async def main(): content await fetch_with_aiohttp(https://www.python.org) print(content[:500]) asyncio.run(main())5.3 数据库操作import asyncio import asyncpg async def query_database(): conn await asyncpg.connect(postgresql://user:passlocalhost/db) result await conn.fetch(SELECT * FROM users LIMIT 10) await conn.close() return result async def main(): users await query_database() print(users) asyncio.run(main())六、实战构建异步Web服务6.1 使用FastAPIfrom fastapi import FastAPI import asyncio app FastAPI() async def heavy_computation(): await asyncio.sleep(1) return {result: 42} app.get(/) async def read_root(): return {message: Hello World} app.get(/compute) async def compute(): return await heavy_computation()6.2 WebSocket支持from fastapi import FastAPI, WebSocket import asyncio app FastAPI() app.websocket(/ws) async def websocket_endpoint(websocket: WebSocket): await websocket.accept() while True: data await websocket.receive_text() await websocket.send_text(fMessage received: {data}) await asyncio.sleep(0.1)6.3 中间件实现import asyncio from fastapi import FastAPI, Request app FastAPI() app.middleware(http) async def log_requests(request: Request, call_next): start_time asyncio.get_event_loop().time() response await call_next(request) duration asyncio.get_event_loop().time() - start_time print(fRequest took {duration:.2f} seconds) return response七、性能优化技巧7.1 避免阻塞调用# 错误阻塞事件循环 async def bad_example(): import time time.sleep(1) # 阻塞 # 正确使用异步版本 async def good_example(): await asyncio.sleep(1) # 非阻塞7.2 批量操作优化async def process_items(items): # 分批处理 batch_size 100 for i in range(0, len(items), batch_size): batch items[i:ibatch_size] tasks [process_item(item) for item in batch] await asyncio.gather(*tasks)7.3 连接池管理import asyncpg from typing import List class DatabasePool: def __init__(self, dsn: str, min_size: int 5, max_size: int 20): self.dsn dsn self.min_size min_size self.max_size max_size self.pool None async def init(self): self.pool await asyncpg.create_pool( dsnself.dsn, min_sizeself.min_size, max_sizeself.max_size ) async def query(self, sql: str, *args): async with self.pool.acquire() as conn: return await conn.fetch(sql, *args)八、从Python到Rust的异步迁移8.1 asyncio vs Tokio对比特性Python asyncioRust Tokio运行时单线程事件循环多线程工作窃取并发模型协程协程 线程池性能较好接近原生内存安全运行时检查编译时保证8.2 代码对比Python版本import asyncio async def fetch(url): print(fFetching {url}) await asyncio.sleep(1) return fData from {url} async def main(): results await asyncio.gather( fetch(url1), fetch(url2), fetch(url3) ) print(results) asyncio.run(main())Rust版本use tokio; async fn fetch(url: str) - String { println!(Fetching {}, url); tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; format!(Data from {}, url) } #[tokio::main] async fn main() { let results tokio::join!( fetch(url1), fetch(url2), fetch(url3) ); println!({:?}, results); }九、常见问题与解决方案9.1 事件循环阻塞# 问题计算密集型任务阻塞事件循环 async def blocking_task(): result 0 for i in range(10**8): result i # 解决方案使用线程池 async def non_blocking_task(): loop asyncio.get_event_loop() with ThreadPoolExecutor() as pool: result await loop.run_in_executor(pool, heavy_compute) return result9.2 协程泄漏# 问题忘记await任务 async def main(): task asyncio.create_task(long_running()) # task被丢弃可能导致资源泄漏 # 解决方案确保await所有任务 async def main(): task asyncio.create_task(long_running()) await task9.3 递归深度限制# 问题协程递归过深 async def recursive(n): if n 0: return 0 return n await recursive(n - 1) # 解决方案使用循环替代递归 async def iterative(n): result 0 for i in range(n 1): result i return result十、总结asyncio是Python异步编程的核心库通过事件循环、协程和任务实现高效的并发编程。关键要点包括事件循环负责调度协程和处理IO事件协程轻量级的并发原语任务协程的并发包装器并发模式gather、wait_for、join等性能优化避免阻塞、使用线程池、批量操作通过掌握asyncio你可以构建高性能的异步应用为后续学习Rust异步编程打下坚实基础。参考资料asyncio官方文档https://docs.python.org/3/library/asyncio.htmlFastAPI文档https://fastapi.tiangolo.com/uvloop文档https://uvloop.readthedocs.io/
http://www.gsyq.cn/news/1382230.html

相关文章:

  • 3分钟上手Harepacker-resurrected:MapleStory游戏资源编辑完全指南
  • 仅限首批200家ISV开放的DeepSeek边缘编译器DSL规范(v1.8 beta),5大算子融合规则首次披露
  • skill-sample-nodejs-fact测试与认证:如何通过Alexa技能商店审核
  • 当所有低代码都在卷画布时,我们押注了源代码本身
  • 如何快速掌握JavaScript异步编程:Async-JavaScript-Cheatsheet项目完全解析
  • nnAudio部署指南:跨平台兼容性与生产环境最佳实践
  • 如何用WaveTools实现《鸣潮》性能优化:从卡顿到流畅的完整解决方案
  • RookieAI_yolov8:基于YOLOv8的智能目标检测与交互系统技术解析
  • 基于树莓派与433MHz射频模块的无线智能家居系统DIY指南
  • 大湾区民营建筑企业排名/排行榜 - 奔跑123
  • 如何选择深圳环保板材全屋定制?2024年决策维度与趋势解析 - 产品测评官
  • 5分钟解决Umi-OCR启动崩溃:OCR引擎插件缺失的终极修复指南
  • 小程序真机抓包实战:HTTPS解密与微信开发者工具联动
  • 如何用eSpeak NG实现127种语言的免费文本转语音?终极指南
  • 粤港澳大湾区实力民营建企排行/排行榜 - 奔跑123
  • 不想直接用 hccl?第一次了解 hcomm 能做什么
  • 第一次做 PD 分离推理?先了解 hixl 能做什么
  • B站CC字幕下载神器:三步搞定视频字幕,离线学习超简单!
  • 这个Skill太香了!Karpathy说的AI写代码的毛病,直接治好
  • FreeJ2ME:现代设备上重温经典J2ME游戏的终极指南
  • eSpeak NG共振峰合成引擎架构解析与多语言TTS集成实战
  • 揭秘Midjourney V6霓虹渲染底层逻辑:为何--stylize 1000反而毁掉光晕?RGB偏移阈值与--sref权重的黄金配比首次公开
  • Android BLE蓝牙开发实战:使用BluetoothKit框架实现高效设备通信
  • 为你的OpenClaw智能体工作流配置Taotoken作为稳定模型供应商
  • 终极指南:如何快速下载网站Git仓库并恢复完整代码
  • 【2026实测】怎么提高论文原创度?盘点8款主流降AI工具,附结构级优化指南
  • Social Likes三大皮肤主题深度对比:如何选择最适合您网站的社交按钮样式
  • 如何用LabelImg2快速完成图像标注:从零开始的完整指南
  • 用PyTorch复现FactorVAE:一个能同时预测收益和风险的量化模型实战教程
  • 2026贵阳高端美容院推荐|皮肤管理避坑指南与官方对接通道 - 精选优质企业推荐官