当前位置: 首页 > news >正文

RabbitMQ 从零到实战:概念、配置与 Spring Boot 集成指南

RabbitMQ 从零到实战:概念、配置与 Spring Boot 集成指南

一、RabbitMQ 核心概念

1.1 AMQP 模型中的角色

Producer(生产者) │ │ 发送消息 ▼ Exchange(交换机) ──── Binding(绑定规则) ──── Queue(队列) │ │ 消费消息 ▼ Consumer(消费者)
角色说明
Producer消息发送方,将消息发送到 Exchange
Exchange接收消息并按规则路由到 Queue。消息不直接发到 Queue
BindingExchange 和 Queue 之间的绑定关系,定义路由规则
Queue消息最终存储的地方,消费者从这里拉取消息
Consumer消息接收方,从 Queue 中消费消息
Routing Key消息携带的路由键,Exchange 根据它决定发到哪个 Queue
Virtual Host逻辑隔离单元,类似数据库的 schema,不同 vhost 之间资源互不可见

1.2 Exchange 四种类型

类型路由规则典型场景
DirectRouting Key 精确匹配点对点,如订单处理
Fanout广播到所有绑定的 Queue,忽略 Routing Key通知广播,如日志
TopicRouting Key 模式匹配(*匹配一个词,#匹配多个词)灵活路由,如order.createorder.*
Headers根据消息头属性匹配(几乎不用)特殊场景

1.3 与 Kafka 等术语的对应关系

很多人混淆 RabbitMQ 和 Kafka 的术语:

概念RabbitMQKafkaRocketMQ
消息存储单元QueuePartition (within Topic)Queue (within Topic)
消息分类/频道Exchange + Routing KeyTopicTopic
消费者分组多个 Consumer 消费同一 Queue(竞争消费)Consumer GroupConsumer Group
消息广播Fanout Exchange 绑定多个 Queue不同 Consumer Group不同 Consumer Group

注意:RabbitMQ 中没有原生的 “Topic”(作为消息分类的概念),它用 Exchange + Queue + Binding 的组合来实现类似能力。Kafka 和 RocketMQ 才有 Topic 的概念。

1.4 消费模式

竞争消费(Work Queue)

  • 多个 Consumer 监听同一个 Queue
  • 一条消息只被其中一个 Consumer 消费
  • 用于负载均衡

发布订阅(Pub/Sub)

  • 使用 Fanout Exchange 绑定多个 Queue
  • 每个 Queue 对应一个 Consumer
  • 一条消息被所有 Consumer 消费
  • 用于广播通知

1.5 消息确认机制(ACK)

模式说明风险
自动确认(auto-ack)消息发送给 Consumer 后立即从 Queue 删除Consumer 处理失败则消息丢失
手动确认(manual-ack)Consumer 处理完后主动发 ACK,Queue 才删除消息更安全,但需处理超时和重复投递
拒绝(reject/nack)Consumer 明确拒绝消息,可选择重新入队或丢弃死循环风险(不断重新入队)

1.6 消息持久化

  • Queue 持久化:Broker 重启后 Queue 定义不丢失(durable=true
  • 消息持久化:消息写入磁盘,Broker 重启后消息不丢失(deliveryMode=2
  • Exchange 持久化:Broker 重启后 Exchange 不丢失

三者都要配置才能保证消息在 Broker 故障恢复后不丢。


注:

博客:

https://blog.csdn.net/badao_liumang_qizhi

二、首次申请和部署 RabbitMQ

2.1 方式一:本地安装(开发环境)

Docker 方式(推荐)

dockerrun-d--namerabbitmq\-p5672:5672\-p15672:15672\-eRABBITMQ_DEFAULT_USER=admin\-eRABBITMQ_DEFAULT_PASS=admin123\rabbitmq:3-management
  • 5672:AMQP 协议端口(程序连接用)
  • 15672:管理控制台端口(浏览器访问)
  • 访问http://localhost:15672使用 admin/admin123 登录管理后台

2.2 方式二:云服务(生产环境)

以阿里云 AMQP(即本文参考工程使用的方式)为例:

申请步骤

  1. 登录阿里云控制台 → 搜索"消息队列 AMQP"
  2. 创建实例(选择地域、规格)
  3. 获取实例信息:
    • 接入点地址(addresses):如amqp-cn-xxx.mq-amqp.cn-qingdao-xxx.aliyuncs.com
    • 实例 ID(instance-id)
    • AccessKey ID / Secret
    • 用户名/密码(由 AccessKey 生成的 Base64 编码)
  4. 在实例控制台中创建 Virtual Host
  5. 创建 Exchange
  6. 创建 Queue
  7. 建立 Binding(将 Queue 绑定到 Exchange,设置 Routing Key)

2.3 管理后台中的操作

在 RabbitMQ Management UI 中需要手动或通过 API 创建:

1. 创建 Exchange - Name: my-app.exchange - Type: direct - Durable: true 2. 创建 Queue - Name: my-app.import-task - Durable: true 3. 创建 Binding - Source: my-app.exchange - Destination: my-app.import-task - Routing Key: import-task(通常和 Queue 名相同)

三、Spring Boot 集成 RabbitMQ 配置详解

3.1 配置

spring:rabbitmq:# AMQP 接入点地址(阿里云格式,自建则为 host:port)addresses:amqp-cn-xxx5.mq-amqp.cn-qingdao-xxxx-a.aliyuncs.com# 认证信息(阿里云 AMQP 用 Base64 编码的 AccessKey)username:xxxx...(Base64编码)password:xxxxxx...(Base64编码)# 阿里云 AMQP 特有配置access-key-id:xxxxxxaccess-key-secret:xxxxinstance-id:amqp-cn-xxxxregion:cn-qingdao# 消费者监听配置listener:simple:# 消费失败时不重新入队(避免死循环)default-requeue-rejected:false# 最小并发消费者数concurrency:1# 最大并发消费者数max-concurrency:3# 预取数量(每次从 Broker 拉取几条未确认的消息)prefetch:1

3.2 配置项含义详解

配置项含义建议值
addressesBroker 地址,多个用逗号分隔生产用集群地址
username / password连接认证不要用默认 guest
virtual-host虚拟主机,逻辑隔离按环境或团队划分
connection-timeout连接超时5000~10000ms
listener.simple.concurrency每个 @RabbitListener 启动的最小消费者线程数IO 密集设 2~5
listener.simple.max-concurrency最大消费者线程数5~20
listener.simple.prefetch每个消费者一次预取的消息数1(保证公平分发),高吞吐设 10~50
listener.simple.default-requeue-rejected消费异常时是否重新入队false(配合死信队列)

3.3 自建 RabbitMQ 的标准配置

spring:rabbitmq:host:192.168.1.1xxport:5672username:myapppassword:myapp123virtual-host:/myappconnection-timeout:5000listener:simple:acknowledge-mode:manual# 手动确认concurrency:2max-concurrency:5prefetch:1default-requeue-rejected:falsepublisher-confirm-type:correlated# 生产者确认publisher-returns:true# 消息无法路由时回调

四、完整示例:Spring Boot + RabbitMQ 异步导入

4.1 依赖(pom.xml)

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

4.2 配置(application.yml)

spring:rabbitmq:host:localhostport:5672username:adminpassword:admin123virtual-host:/app:mq:exchange:app.exchangequeue:app.import-taskrouting-key:import-task

4.3 RabbitMQ 资源声明(自动创建 Exchange、Queue、Binding)

packagecom.example.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/** * RabbitMQ 资源配置. * 应用启动时自动创建 Exchange、Queue 并建立绑定关系. */@ConfigurationpublicclassRabbitMqConfig{@Value("${app.mq.exchange}")privateStringexchangeName;@Value("${app.mq.queue}")privateStringqueueName;@Value("${app.mq.routing-key}")privateStringroutingKey;/** 声明持久化交换机. */@BeanpublicDirectExchangeimportExchange(){returnnewDirectExchange(exchangeName,true,false);}/** 声明持久化队列. */@BeanpublicQueueimportQueue(){returnnewQueue(queueName,true);}/** 建立绑定关系. */@BeanpublicBindingimportBinding(QueueimportQueue,DirectExchangeimportExchange){returnBindingBuilder.bind(importQueue).to(importExchange).with(routingKey);}}

4.4 消息体 DTO

packagecom.example.dto;importjava.io.Serializable;importjava.util.List;importlombok.Data;/** * 导入任务消息体. */@DatapublicclassImportTaskMessageimplementsSerializable{/** 批次ID. */privateLongbatchId;/** 批次号. */privateStringbatchNo;/** 待插入的数据(JSON序列化后体积较大时,可改为只传batchId,消费端自己查数据). */privateList<ImportRecord>records;}

4.5 生产者(发送消息)

packagecom.example.mq;importcom.example.dto.ImportTaskMessage;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Value;importorg.springframework.stereotype.Component;importjakarta.annotation.Resource;/** * 导入任务消息生产者. */@Slf4j@ComponentpublicclassImportTaskProducer{@ResourceprivateRabbitTemplaterabbitTemplate;@Value("${app.mq.exchange}")privateStringexchange;@Value("${app.mq.routing-key}")privateStringroutingKey;/** * 发送导入任务消息. */publicvoidsend(ImportTaskMessagemessage){log.info("发送导入任务消息,批次号:{}",message.getBatchNo());rabbitTemplate.convertAndSend(exchange,routingKey,message);log.info("导入任务消息发送完成");}}

4.6 消费者(接收并处理消息)

packagecom.example.mq;importcom.example.dto.ImportRecord;importcom.example.dto.ImportTaskMessage;importcom.example.mapper.ImportRecordMapper;importjakarta.annotation.Resource;importjava.util.List;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/** * 导入任务消息消费者. */@Slf4j@ComponentpublicclassImportTaskConsumer{privatestaticfinalintBATCH_SIZE=2000;@ResourceprivateImportRecordMapperimportRecordMapper;/** * 消费导入任务. * 使用 @RabbitListener 注解声明监听哪个队列. */@RabbitListener(queues="${app.mq.queue}")publicvoidconsume(ImportTaskMessagemessage){log.info("收到导入任务,批次号:{},数据量:{}",message.getBatchNo(),message.getRecords().size());try{List<ImportRecord>records=message.getRecords();inttotal=records.size();// 分批插入for(inti=0;i<total;i+=BATCH_SIZE){intend=Math.min(i+BATCH_SIZE,total);List<ImportRecord>batch=records.subList(i,end);importRecordMapper.batchInsert(batch);}log.info("导入任务处理完成,批次号:{},共插入{}条",message.getBatchNo(),total);}catch(Exceptione){log.error("导入任务处理失败,批次号:{}",message.getBatchNo(),e);// 抛出异常后,根据 default-requeue-rejected 配置决定是否重新入队throwe;}}}

4.7 Service 层(校验通过后发消息)

packagecom.example.service.impl;importcom.example.dto.ImportRecord;importcom.example.dto.ImportTaskMessage;importcom.example.mq.ImportTaskProducer;importcom.example.service.ImportService;importjakarta.annotation.Resource;importjava.util.List;importorg.springframework.stereotype.Service;@ServicepublicclassImportServiceImplimplementsImportService{@ResourceprivateImportTaskProducerproducer;@OverridepublicStringimportData(List<ImportRecord>records,StringoperatorId){// 同步校验for(inti=0;i<records.size();i++){if(records.get(i).getAmount()<=0){thrownewRuntimeException("第"+(i+1)+"行:数量必须大于0");}}// 保存主表...(略)StringbatchNo="20260610001";// 发送MQ消息,异步执行插入ImportTaskMessagemessage=newImportTaskMessage();message.setBatchId(1L);message.setBatchNo(batchNo);message.setRecords(records);producer.send(message);return"导入任务已提交,批次号:"+batchNo;}}

4.8 执行时序

请求线程 RabbitMQ Broker 消费者线程 │ │ │ │── 同步校验 ──→ │ │ │── 保存主表 ──→ │ │ │── producer.send(msg) ──→ │ │ │ │── 持久化消息 ──→ │ │← 返回"任务已提交" ── │ │ │ │── 推送消息 ──→ │ │ (HTTP已响应) │ │── 批量INSERT │ │ │── ... │ │ │── INSERT完成 │ │←── ACK ── │ │ │── 删除消息 │

五、生产环境最佳实践

5.1 消息可靠投递(不丢消息)

Producer ──确认──→ Exchange ──确认──→ Queue ──确认──→ Consumer │ │ │ │ │ publisher-confirm │ mandatory │ 持久化 │ 手动ACK
  • Publisher Confirm:Broker 收到消息后回调确认
  • Mandatory:消息无法路由到任何 Queue 时回调
  • Queue 持久化 + 消息持久化:Broker 重启不丢
  • 手动 ACK:Consumer 处理完才确认

6.2 消费幂等性

消息可能被重复投递(网络抖动、Consumer 超时未ACK),消费逻辑必须幂等:

  • 用唯一业务ID做去重判断
  • 使用数据库唯一约束
  • 先查后插的 “SELECT + INSERT” 模式

6.3 死信队列(DLQ)

消费失败的消息不要无限重试,设置死信队列:

正常 Queue ──(消费失败/超时/被拒绝)──→ 死信 Exchange ──→ 死信 Queue

运维人员可以查看死信队列中的消息,分析失败原因后手动处理或重新投递。

6.4 消息体大小

  • RabbitMQ 默认单条消息上限 128MB,但建议控制在 1MB 以内
  • 大量数据(如12万条记录)不要直接放消息体里
  • 推荐做法:消息体只放batchId,消费者根据 ID 去数据库/文件系统取数据

七、总结

阶段关键动作
申请资源创建 RabbitMQ 实例(云服务或自建)→ 获取连接信息
资源规划设计 Exchange、Queue、Binding 的命名和路由关系
配置接入application.yml 中配置连接信息和消费者参数
代码开发声明资源 → 生产者发送 → 消费者处理
可靠性保障Publisher Confirm + 消息持久化 + 手动ACK + 死信队列
运维监控Management UI 监控队列积压、消费速率、死信数量
http://www.gsyq.cn/news/1502521.html

相关文章:

  • 郑州国窖回收技术全解析:鉴别、估价与合规交易推荐 - 优质品牌商家
  • 掌握 Self-Attention(自注意力)机制——Transformer 与大模型的核心基础
  • 3分钟掌握:免费使用Cursor Pro功能的完整教程与终极指南
  • 别再只会写一种了!用Verilog的三种描述方式搞定三人表决器(附完整代码)
  • 2026年6月国产PCB厂家综合实力排行榜评测
  • 掌握多头自注意力机制(Multi-Head Self-Attention)——Transformer 强大表达能力的核心来源
  • 如何在非Windows系统上完美编辑Visio文件?drawio-desktop为您提供专业解决方案
  • 2026-6学习计划
  • 做工业控制和物联网网关的朋友最近经常问:屏幕刷新卡顿、AI算力不够、PCB面积又受限,这该怎么选型?
  • BiliTools智能解析:轻松获取B站视频资源的一站式解决方案
  • PostgreSQL 保姆级入门:为什么说它“养活”了国产数据库?
  • 告别Excel图表!用aardio+ScottPlot在Windows桌面快速绘制38种专业图表(附完整源码)
  • 2026年幕墙安装改造公司靠谱度排行:西安幕墙维修公司、贵阳幕墙安装公司、贵阳幕墙维修公司、重庆幕墙安装公司、重庆幕墙维修公司选择指南 - 优质品牌商家
  • 群论中的稳定群与完全群:构造与分类
  • 13ft Ladder:3分钟搭建个人专属付费墙绕过阅读助手
  • 2026年最新|Turnitin检测告急?英文文章降AI率从86%降至20%以下的实测指南 - 降AI实验室
  • 从Qt摄像头显示到RKNN推理:手把手解析RK3568上SSD模型的实时部署流程
  • 2026南昌黄金回收全攻略 多家靠谱门店详解及避坑指南 - 润富黄金回收
  • 数据的加密与解密(07:11)
  • C#工业视觉项目实战:Halcon 3D点云数据如何通过ActiViz在WinForm中流畅显示(附完整代码)
  • 告别万用表手动测算!给老旧STC89C51开发板加个新功能:自动电路特性测试
  • 手把手教你用FPGA驱动24位高精度ADC芯片ADS1256(附Verilog代码避坑指南)
  • 终极指南:高效扩展FossFLOW等距图表工具的完整方案
  • 2026年6月郑州黄金回收店推荐:五大机构专业评测报价透明特点适用场景 - 品牌推荐
  • 2026最佳Chrome代理插件推荐:4个插件工具测评(附详细评测)
  • 褐矮星系统动力学:潮汐演化与轨道特性研究
  • 2026 安徽安庆彩钢瓦翻新防水 TOP4 权威推荐(全区域服务 + 避坑指南) - 本地便民网
  • 2026年6月10日博客精选
  • FanControl深度解析:Windows风扇智能控制架构与实战配置
  • 黑洞吸积动力学与QPO频率的数值模拟研究