当前位置: 首页 > news >正文

线程池版流水线模式 技术笔记

一、模式核心思想

流水线模式本质是任务分阶段串行处理,把一个完整业务任务拆分成多道独立工序(本例拆分为 TaskA、TaskB、TaskC 三个阶段)。
每个阶段由独立线程池负责消费处理,上一阶段处理完成后,自动把任务提交给下一阶段线程池,形成生产者→阶段A→阶段B→阶段C→任务收尾的流水线链路。
// 核心特点:工序解耦、各阶段线程池独立管控、任务异步串行流转、支持高并发持续生产任务

二、整体结构分层

  1. 任务实体类 Task:封装业务数据 + 各阶段业务处理逻辑
  2. 阶段任务处理类 TaskA/TaskB/TaskC:实现 Runnable,绑定对应业务阶段,处理完自动投递下一线程池
  3. 主线程启动类 Main:初始化多组独立线程池、任务队列、完成任务集合;启动生产者线程持续生成任务;启动监控线程观测队列积压与任务计算正确性

三、核心代码实现

1. 任务实体类 Task

封装任务核心变量 num,提供三段流水线处理方法,每个方法模拟业务耗时。

packageduoxiancheng.xq0529;publicclassTask{// 任务核心运算数据intnum;// 流水线第一阶段处理publicvoidtaskA(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段A业务逻辑:数值+20num+=20;}// 流水线第二阶段处理publicvoidtaskB(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段B业务逻辑:数值*10num*=10;}// 流水线第三阶段处理publicvoidtaskC(){try{// 模拟业务处理耗时500msThread.sleep(500);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 阶段C业务逻辑:数值平方num*=num;}}

// 备注:三个方法严格串行依赖,必须按 A→B→C 顺序执行,最终预期结果:(0+20)*10 再平方 = 40000

2. 阶段任务处理器

2.1 第一阶段 TaskA

执行完 taskA 后,将任务交给第二阶段线程池

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第一阶段任务publicclassTaskAimplementsRunnable{ExecutorexecutorB;// 绑定第二阶段线程池ExecutorexecutorC;// 绑定第三阶段线程池ArrayList<Task>taskDoneList;// 存放最终完成的任务Tasktask;// 当前待处理任务publicTaskA(ExecutorexecutorB,ExecutorexecutorC,ArrayList<Task>taskDoneList,Tasktask){this.executorB=executorB;this.executorC=executorC;this.taskDoneList=taskDoneList;this.task=task;}@Overridepublicvoidrun(){// 执行第一阶段业务逻辑task.taskA();// 流转:提交任务给第二阶段线程池executorB.execute(newTaskB(task,executorC,taskDoneList));}}
2.2 第二阶段 TaskB

执行完 taskB 后,将任务交给第三阶段线程池

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.Executor;// 流水线第二阶段任务classTaskBimplementsRunnable{Tasktask;ExecutorexecutorC;ArrayList<Task>taskDoneList;publicTaskB(Tasktask,ExecutorexecutorC,ArrayList<Task>taskDoneList){this.task=task;this.executorC=executorC;this.taskDoneList=taskDoneList;}@Overridepublicvoidrun(){// 执行第二阶段业务逻辑task.taskB();// 流转:提交任务给第三阶段线程池executorC.execute(newTaskC(task,taskDoneList));}}
2.3 第三阶段 TaskC

最后阶段处理完毕,把任务加入完成集合

packageduoxiancheng.xq0529;importjava.util.ArrayList;// 流水线第三阶段任务classTaskCimplementsRunnable{Tasktask;ArrayList<Task>taskDoneList;publicTaskC(Tasktask,ArrayList<Task>taskDoneList){this.task=task;this.taskDoneList=taskDoneList;}@Overridepublicvoidrun(){// 执行第三阶段业务逻辑task.taskC();// 整个流水线处理完成,加入完成列表taskDoneList.add(task);}}

// 备注:各阶段通过构造器传递下一级线程池与任务上下文,实现任务自动流转,无需手动阻塞等待

3. 主线程工厂启动类 Main

初始化三级线程池、队列,启动生产者持续造任务,启动监控线程观测运行状态。

packageduoxiancheng.xq0529;importjava.util.ArrayList;importjava.util.concurrent.ArrayBlockingQueue;importjava.util.concurrent.Executor;importjava.util.concurrent.ThreadPoolExecutor;importjava.util.concurrent.TimeUnit;publicclassMain{publicstaticvoidmain(String[]args){// 为每个阶段独立定义阻塞队列,容量20ArrayBlockingQueue<Runnable>queueA=newArrayBlockingQueue<>(20);ArrayBlockingQueue<Runnable>queueB=newArrayBlockingQueue<>(20);ArrayBlockingQueue<Runnable>queueC=newArrayBlockingQueue<>(20);// 保存所有流水线执行完成的任务ArrayList<Task>taskDoneList=newArrayList<>();// 初始化三级独立线程池 // 核心线程5、最大线程10、空闲超时1000msExecutorexecutorA=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueA);ExecutorexecutorB=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueB);ExecutorexecutorC=newThreadPoolExecutor(5,10,1000,TimeUnit.MILLISECONDS,queueC);// 生产者线程:循环不断生成新任务,提交给第一阶段线程池newThread(()->{intcount=0;while(true){try{// 控制生产速率Thread.sleep(50);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 新建任务实例Tasktask=newTask();// 提交到流水线首阶段executorA.execute(newTaskA(executorB,executorC,taskDoneList,task));count++;System.out.println("生产者已生成任务数:"+count);}}).start();// 监控线程:定时查看队列积压、完成任务数、计算正确率newThread(()->{while(true){try{// 每秒监控一次Thread.sleep(1000);}catch(InterruptedExceptione){thrownewRuntimeException(e);}// 打印三个阶段队列积压大小System.out.println("队列A大小:"+queueA.size()+" 队列B大小:"+queueB.size()+" 队列C大小:"+queueC.size());// 统计结果正确的任务数量intcorrectCount=0;for(inti=0;i<taskDoneList.size();i++){// 符合预期结果40000即为正确if(taskDoneList.get(i).num==40000){correctCount++;}}System.out.println("已完成任务总数:"+taskDoneList.size()+" 运算正确任务数:"+correctCount);}}).start();}}

// 关键备注1:每个阶段独享线程池和队列,互不阻塞,可单独调优核心线程数、队列容量
// 关键备注2:生产者无限循环生成任务,通过 Thread.sleep 控流,避免瞬间打满队列
// 关键备注3:监控线程实时观测队列积压,可用来判断线程池参数是否合理、是否有任务堆积瓶颈

四、执行流程梳理

  1. Main 初始化 A/B/C 三组线程池 + 对应阻塞队列
  2. 生产者线程循环创建 Task 对象,提交给 executorA
  3. executorA 调度执行 TaskA,完成后把任务封装为 TaskB 提交给 executorB
  4. executorB 调度执行 TaskB,完成后封装为 TaskC 提交给 executorC
  5. executorC 调度执行 TaskC,完成后将任务加入完成列表
  6. 监控线程每秒打印队列积压、完成任务数、业务计算正确数

五、技术要点总结

  1. 流水线拆分:复杂任务拆分为多阶段,单一职责,便于维护和单独扩容
  2. 线程池隔离:各阶段独立线程池,某一阶段阻塞不影响其他阶段运行
  3. 任务自动流转:通过 Runnable 构造器传递下一级线程池,实现链式投递
  4. 流量可控:生产者通过休眠控制任务生产速率,配合有界队列防止无限积压
  5. 可观测性:内置监控线程,实时查看队列水位与任务处理正确率,方便调参和排查瓶颈
http://www.gsyq.cn/news/1424697.html

相关文章:

  • 豆包在抖音生态中的实战应用场景指南
  • 口袋里的工艺密码 一件衣服的细节革命史
  • 2026 主流桌面管理系统盘点,降本增效必备
  • 如何用Sherpa-Onnx构建完全离线的跨平台语音AI应用
  • RTX-Tiny多版本库管理实践与Keil工程配置
  • 量子模拟解析1T-TaS2电子弛豫的噪声辅助机制
  • 架构进阶:从 Docker 环境变量到 Nacos 统一配置中心实战
  • 第16篇 实战:用 Docker Compose 编排 WordPress 与 MySQL
  • AI搜索推广工具如何工程化落地:中科信枢龙虾智能体的内容资产与多平台分发架构
  • 神经形态计算π²架构:突破AI硬件能效瓶颈
  • 手把手教你用Python+sklearn计算classification_report(附多分类不平衡数据集实战)
  • 【2024最严AI监管倒计时】:Claude风险评估矩阵4.2版紧急升级清单(含GDPR/CCPA/《生成式AI服务管理暂行办法》三重映射表)
  • AI看懂“弦外之音“:中科院软件所等机构联合攻克视频隐喻理解难题
  • AI健康管家:大模型赋能私域健康服务,重塑新零售智慧运营体系
  • 石漠化区耕作污染的地下水微生物—毒理联合响应机制及模拟方法解析【附代码】
  • 上海厂区化粪池清理技术实操推荐:上海专业管道清洗/上海化粪池油污清理/上海化粪池清理电话/正规服务品牌参考 - 优质品牌商家
  • 浙江大学与伦敦大学学院联手打造“科学地图“
  • 每日算法快闪赛:高效刷题的技术秘籍
  • 基于Arduino与超声波传感器的智能停车辅助系统DIY指南
  • 别再浪费硬盘了!用Ubuntu的mdadm组RAID 0,榨干旧硬盘性能当高速缓存盘
  • 宇视VM易用性推宣—相机报表导出
  • 格式排版也能 “躺平”?okbiye 论文格式神器,让你和几十页格式指南说再见
  • 别再手动删点了!用Python的RDP算法5分钟搞定轨迹数据简化(附完整代码)
  • 网安圈的“世界杯”!一文讲透传说中的“护网行动”
  • 矫平机用着用着就出问题?这几类常见故障你该提前了解
  • 情感提示(Emotion Prompting)的原理是什么?“深呼吸“这类提示为什么有效?
  • 零基础学 PLC,千万不要一开始就报名,不想采坑必看
  • 如何通过Atmosphere大气层系统为你的Switch解锁终极性能
  • C++初阶 模版进阶
  • NTU、HKU等多所顶校联手,让AI同时“多角度看片“