当前位置: 首页 > news >正文

SpringBoot 广播消息实现(发布/订阅)

在 RabbitMQ 的五大工作模式中,发布/订阅(Publish/Subscribe)广播模式是分布式系统中非常核心的通信方式。

我们日常使用的普通点对点队列,一条消息只会被一个消费者消费(竞争消费);而广播模式可以实现一条消息、多服务、多消费者同时接收,完美实现一对多通知。

缓存刷新、配置更新、全局通知、多节点日志同步、服务状态广播等场景,全部依赖 Fanout 广播模式。

一、什么是 MQ 广播(发布/订阅)模式?

1. 核心定义

广播模式基于FanoutExchange(扇形交换机)实现,核心逻辑:

生产者发送一条消息到 Fanout 交换机,所有绑定该交换机的队列,都会完整收到这条消息。

不管路由键是什么、不管队列名称,只要完成绑定,就会无条件广播投递。

2. 核心特性

  • • 无视routingKey,路由键传空、传任意值都不生效

  • • 纯广播、全量投递、一对多分发

  • • 每条消息独立进入每一个绑定队列

  • • 天然支持多服务、多节点同步通知

  • • 无匹配规则,绑定即接收

3. 适用业务场景

  • • 分布式缓存全局刷新(多节点统一清空缓存)

  • • 系统配置动态推送、热更新

  • • 全站公告、全局消息推送

  • • 微服务多节点日志采集、链路追踪

  • • 服务上下线、状态同步广播

  • • 多端消息同步(PC/APP/小程序)

二、四大交换机模式核心对比

交换机类型

匹配规则

消费模式

核心场景

Direct(直连)

完全匹配 routingKey

点对点竞争消费

订单、支付、任务处理

Topic(主题)

通配符模糊匹配

选择性多消费

日志分级、消息订阅

Fanout(广播)无视路由键,全部投递

全员订阅消费

缓存刷新、全局通知

Headers

匹配消息头参数

自定义匹配

极少使用

三、关键认知误区

1:同一个队列多消费者可以实现广播

绝对错误!

同一个队列下的多个消费者,默认是竞争消费,一条消息只会被一个消费者消费。

广播必备条件:每个消费者对应一个独立队列,全部绑定同一个 Fanout 交换机。

2:Fanout 交换机需要配置路由键

Fanout 交换机底层逻辑直接忽略 routingKey,无论发送时传什么值,都不会影响广播效果。

3:广播消息天然可靠、不会丢失

默认非持久化、自动ACK 场景下,广播消息极易丢失,生产必须做持久化+手动ACK

四、SpringBoot 完整实现

1. 基础依赖

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

2. 生产级配置文件

spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 手动ACK 保证广播消息不丢 acknowledge-mode: manual # 限制预取数,防止单节点消息堆积 prefetch: 5 # 开启消费重试 retry: enabled: true max-attempts: 3 initial-interval: 1000

3. 广播交换机、队列、绑定配置类

import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutBroadcastConfig { // 广播交换机名称 public static final String FANOUT_EXCHANGE = "system.fanout.broadcast.exchange"; // 三个独立消费者队列 public static final String QUEUE_CACHE_REFRESH = "queue.cache.refresh"; public static final String QUEUE_NOTICE = "queue.system.notice"; public static final String QUEUE_LOG = "queue.log.collect"; // 声明 Fanout 广播交换机:持久化、不自动删除 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE, true, false); } // 队列1:缓存刷新队列 @Bean public Queue cacheRefreshQueue() { return new Queue(QUEUE_CACHE_REFRESH, true); } // 队列2:系统通知队列 @Bean public Queue noticeQueue() { return new Queue(QUEUE_NOTICE, true); } // 队列3:日志采集队列 @Bean public Queue logQueue() { return new Queue(QUEUE_LOG, true); } // 全部绑定到广播交换机 @Bean public Binding bindingCacheRefresh(Queue cacheRefreshQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(cacheRefreshQueue).to(fanoutExchange); } @Bean public Binding bindingNotice(Queue noticeQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(noticeQueue).to(fanoutExchange); } @Bean public Binding bindingLog(Queue logQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(logQueue).to(fanoutExchange); } }

4. 广播消息生产者

import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class BroadcastProducer { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send/broadcast") public String sendBroadcastMsg(@RequestParam String content) { // Fanout广播:路由键传空字符串 rabbitTemplate.convertAndSend( FanoutBroadcastConfig.FANOUT_EXCHANGE, "", content ); return "✅ 广播消息发送成功:" + content; } }

5. 多消费者实现(全员接收)

消费者1:缓存刷新消费者

import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class CacheRefreshConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_CACHE_REFRESH) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【缓存服务】接收广播消息:" + msg); // 执行缓存刷新业务逻辑 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 消费失败,重回队列重试 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

消费者2:系统通知消费者

@Component public class SystemNoticeConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_NOTICE) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【通知服务】接收广播消息:" + msg); // 执行消息推送业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

消费者3:日志采集消费者

@Component public class LogCollectConsumer { @RabbitListener(queues = FanoutBroadcastConfig.QUEUE_LOG) public void consume(String msg, Message message, Channel channel) throws IOException { try { System.out.println("【日志服务】接收广播消息:" + msg); // 执行日志采集业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }

五、测试效果

访问接口:

http://localhost:8080/send/broadcast?content=全局缓存刷新通知

控制台输出:

【缓存服务】接收广播消息:全局缓存刷新通知 【通知服务】接收广播消息:全局缓存刷新通知 【日志服务】接收广播消息:全局缓存刷新通知

✅ 一条消息,多服务同时消费,广播生效!

六、总结

1. 必须开启持久化

交换机、队列全部设置持久化,防止重启丢失广播配置。

2. 强制手动ACK

广播场景多为重要通知、缓存同步,自动ACK会导致业务未执行完成消息丢失。

3. 每个服务独立队列

不同微服务必须使用独立队列,避免竞争消费,保证广播全覆盖。

4. 广播消息建议做幂等

MQ 重试、网络抖动会导致广播消息重复推送,核心业务必须基于消息ID做幂等防重。

5. 禁止设置复杂路由键

Fanout 无视路由键,统一传空字符串,保持代码规范。


写在最后

广播发布订阅模式是微服务分布式通信的重要基石,区别于传统的点对点任务消费,它主打全局通知、多节点同步、状态广播,是缓存刷新、配置热更新、系统公告等场景的最优解。

很多开发者一直混淆“竞争消费”和“广播消费”的本质,导致线上通知不全、同步失效等隐性问题。吃透 Fanout 交换机的底层原理与落地规范,能帮你彻底解决分布式多节点同步难题。

持续更新 SpringBoot、微服务、MQ 中间件、架构实战、面试刷题干货,帮你夯实技术底盘,轻松搞定工作与面试。

觉得文章有用,点赞、收藏、转发一波,持续关注,解锁更多生产级技术干货!

http://www.gsyq.cn/news/1401373.html

相关文章:

  • STM32HAL 集成 EasyFlash:打造轻量级嵌入式键值存储数据库(裸机开发)
  • AI驱动开发实战:2小时零代码部署云端应用
  • Coze智能体开发:平台架构
  • iOS滑动菜单开发实战:基于SwipeMenuViewController构建响应式界面
  • 极域电子教室防控制工具:如何快速解除限制,实现自由学习
  • 【深度解析】Flutter 环境搭建中 Dart SDK 下载失败:从 BITS 到 WebRequest 的故障排查与镜像配置实战
  • 终极跨平台资源下载器:5分钟掌握res-downloader的完整使用指南
  • 如何快速掌握开源字体:思源宋体7步实现专业中文排版
  • MTK Camera调试实战:精准控制Log开关与Buffer Dump策略
  • 我们改变不了房价, 改变不了这个社会的运行规则。但 可以改变自己
  • 绝区零一条龙:终极自动化游戏助手完全指南
  • WizardLM-13B-Uncensored微调教程:如何定制专属AI助手
  • 小米第一季营收991亿:净利47亿 再启动200亿股份回购计划
  • 英飞凌TC3XX芯片调试实战:如何通过CSA链表快速定位函数调用栈溢出问题
  • 从静态测试到动态评估:构建面向工程实践的代码生成大模型评估框架
  • Proteus和Keil联调STM32温控系统,我踩过的那些坑(附完整代码和接线图)
  • 告别eNSP路由器启动报错40:深入VirtualBox虚拟网卡#2的注册表修复指南
  • 别再只懂FAT32了!手把手带你用WinHex解析FAT16/FAT32目录项,从根目录到长文件名的秘密
  • 如何快速上手戴森球计划FactoryBluePrints:新手终极避坑指南
  • 如何高效管理HEIC文件:Windows用户的终极解决方案
  • 从零解析COMTRADE:电力系统故障录波数据的标准格式
  • 2026集安市本地黄金+铂金+白银+K金回收渠道实地走访,五家实力门店综合体验测评 - 亦辰小黄鸭
  • 手把手教你搞定ACM会议LaTeX模板:从下载到成功编译(附双盲审稿配置)
  • 秦皇岛回收店盘点 闲置黄金奢侈品变现避坑实用指南 - 百航
  • 源代码论文分享|Spring Boot 社区物业管理系统!
  • Unity 2020.2保姆级教程:用Obi Fluid插件5分钟搞定一个会‘粘墙’的流体特效
  • bert-base-german-dbmdz-uncased实战教程:用Python轻松实现德语文本掩码填充
  • Python GUI开发终极指南:如何用PyQt-Fluent-Widgets打造现代化界面
  • 从数据到部署:jeffding/indonesian-roberta-large-openmind训练全流程详解
  • 抖音批量下载终极指南:5分钟掌握无水印视频采集技巧