当前位置: 首页 > news >正文

Python异步编程高级模式:asyncio事件循环与并发控制

Python异步编程高级模式:asyncio事件循环与并发控制

前言

Python异步编程是现代高性能应用开发的关键技术。asyncio作为Python的内置异步框架,提供了强大的事件循环和并发控制能力。本文将深入探讨Python异步编程的高级模式,帮助你构建高效的并发应用。

1. asyncio核心概念

1.1 事件循环机制

事件循环是asyncio的核心,负责调度和执行协程:

# 事件循环基本原理importasyncioasyncdefexample_coroutine():"""示例协程"""print("协程开始")awaitasyncio.sleep(1)# 模拟IO操作print("协程结束")return"结果"# 获取事件循环loop=asyncio.get_event_loop()# 运行协程result=loop.run_until_complete(example_coroutine())print(f"结果:{result}")# 关闭循环loop.close()

1.2 协程与任务

协程是asyncio的基本执行单元,任务是对协程的封装:

importasyncioasyncdeffetch_data(url:str,delay:float)->str:"""模拟数据获取"""print(f"开始获取{url}")awaitasyncio.sleep(delay)# 模拟网络延迟print(f"完成获取{url}")returnf"来自{url}的数据"asyncdefmain():# 创建多个协程coros=[fetch_data("https://api1.example.com",2),fetch_data("https://api2.example.com",1),fetch_data("https://api3.example.com",3),]# 方法1:顺序执行(不推荐)# for coro in coros:# result = await coro# 方法2:并发执行(推荐)tasks=[asyncio.create_task(coro)forcoroincoros]results=awaitasyncio.gather(*tasks)print(f"所有结果:{results}")# 运行asyncio.run(main())

2. 高级并发模式

2.1 信号量控制并发数

importasynciofromtypingimportList,AnyclassConcurrencyLimiter:"""并发限制器"""def__init__(self,max_concurrent:int):self.semaphore=asyncio.Semaphore(max_concurrent)self.active_count=0self.max_concurrent=max_concurrentasyncdefexecute(self,coro):"""执行受限的协程"""asyncwithself.semaphore:self.active_count+=1try:result=awaitcororeturnresultfinally:self.active_count-=1asyncdefexecute_many(self,coros:List)->List[Any]:"""批量执行受限的协程"""tasks=[self.execute(coro)forcoroincoros]returnawaitasyncio.gather(*tasks)# 使用示例asyncdeflimited_fetch(url:str,limiter:ConcurrencyLimiter)->str:"""受限的数据获取"""print(f"等待获取{url}(当前活跃:{limiter.active_count})")returnawaitlimiter.execute(fetch_data(url,1))asyncdefmain():# 限制最多3个并发limiter=ConcurrencyLimiter(max_concurrent=3)urls=[f"https://api{i}.example.com"foriinrange(10)]coros=[limited_fetch(url,limiter)forurlinurls]results=awaitlimiter.execute_many(coros)print(f"获取完成:{len(results)}个结果")asyncio.run(main())

2.2 线程池与进程池集成

importasynciofromconcurrent.futuresimportThreadPoolExecutor,ProcessPoolExecutorimporttimeclassHybridExecutor:"""混合执行器:结合线程池和进程池"""def__init__(self,thread_workers:int=4,process_workers:int=2):self.thread_executor=ThreadPoolExecutor(max_workers=thread_workers)self.process_executor=ProcessPoolExecutor(max_workers=process_workers)asyncdefrun_in_thread(self,func,*args):"""在线程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.thread_executor,func,*args)asyncdefrun_in_process(self,func,*args):"""在进程池中运行"""loop=asyncio.get_event_loop()returnawaitloop.run_in_executor(self.process_executor,func,*args)defshutdown(self):"""关闭执行器"""self.thread_executor.shutdown()self.process_executor.shutdown()# CPU密集型任务defcpu_intensive_task(n:int)->int:"""CPU密集型计算"""total=0foriinrange(n):total+=i*ireturntotal# IO密集型任务defio_intensive_task(duration:float)->str:"""IO密集型任务"""time.sleep(duration)returnf"IO任务完成,耗时{duration}秒"asyncdefmain():executor=HybridExecutor(thread_workers=4,process_workers=2)try:# 混合执行不同类型的任务tasks=[]# CPU密集型任务使用进程池foriinrange(3):task=executor.run_in_process(cpu_intensive_task,1000000)tasks.append(task)# IO密集型任务使用线程池foriinrange(5):task=executor.run_in_thread(io_intensive_task,0.5)tasks.append(task)results=awaitasyncio.gather(*tasks)print(f"所有任务完成:{results}")finally:executor.shutdown()asyncio.run(main())

2.3 异步上下文管理器

importasynciofromtypingimportOptionalclassAsyncDatabaseConnection:"""异步数据库连接"""def__init__(self,connection_string:str):self.connection_string=connection_string self.connection:Optional[object]=Noneself.is_connected=Falseasyncdef__aenter__(self):"""进入异步上下文"""print(f"连接到数据库:{self.connection_string}")awaitasyncio.sleep(0.1)# 模拟连接时间self.connection=f"Connection({self.connection_string})"self.is_connected=Truereturnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):"""退出异步上下文"""ifself.connection:print("关闭数据库连接")awaitasyncio.sleep(0.05)# 模拟关闭时间self.connection=Noneself.is_connected=False# 返回False表示不抑制异常returnFalseasyncdefexecute
http://www.gsyq.cn/news/1427606.html

相关文章:

  • 从零自制简易直流电机:深入理解电磁原理与动手实践
  • 抖音短视频无水印下载技术解析:从网页解析到桌面应用的完整实现方案
  • QMCDecode:QQ音乐加密格式转换方案实现指南
  • 硬核盘点!2026AI论文写作工具大盘点(覆盖 99% 毕业论文需求)
  • CPAL脚本避坑指南:TestcaseFail和TestCaseSkipped用不对,小心你的测试结果全乱套
  • 基于ESP32-C3与太阳能供电的物联网植物监测系统全解析
  • 量子计算硬件基准测试:原理、指标与实践指南
  • 用导电材料与微控制器打造地面互动版西蒙游戏:从电路原理到Scratch编程实践
  • C语言数组10秒搞懂!从原理到代码,新手一看就会
  • 机器人舵机供电方案:多路可调电源设计与避坑指南
  • GTA5线上小助手:新手也能轻松上手的洛圣都全能工具箱
  • 2026郑州吉修匠专注厨卫阳台屋顶漏水,免砸砖一站式防水修缮 - 吉修匠
  • 基于Arduino与MQ-35传感器搭建桌面空气质量监测站
  • 5步搭建个人游戏串流服务器:Sunshine跨平台串流终极指南
  • 测试新手也能玩转:手把手教你用龙测AI-TestOps搞定银行App的登录支付测试
  • 基于Arduino与SIM900的GSM短信温湿度监控系统实战指南
  • 现代 AI 系统技术全景图:从硅片到智能应用的完整价值链
  • 阴阳师自动化脚本:解放双手的智能游戏助手,3步开启高效挂机体验
  • 如何快速提取Godot游戏资源:终极PCK解包工具指南
  • 从零搭建低成本机器人平台:Arduino/ESP32与L298N电机驱动实战
  • 如何用SMUDebugTool解锁AMD Ryzen终极性能:10个硬件调校技巧
  • Pan-Baidu-Download技术方案:命令行环境下的百度网盘高速下载解决方案
  • Arduino Nano与OLED屏创意磁贴:从原型设计到3D打印的完整实践
  • 码力全开特辑直播预告|6月1日19:00,Triton昇腾亲和扩展编程实践
  • 低秩模型重构理论应用方案【附仿真】
  • 3步解锁网易云音乐NCM加密文件:ncmdumpGUI终极免费解密工具
  • 多智能体系统编程实战:从协调协议到混合架构的踩坑与优化
  • FreeBSD 使用代理运行命令
  • 深入Yjs与Quill的‘黑盒’:手把手教你调试协同编辑中的数据流与冲突解决
  • 一个粉丝的软考独白:我可能考砸了,但这不重要