当前位置: 首页 > news >正文

【RocketMQ】阿里万亿级消息中间件MQ保姆级教程

>MQ主要解决应用解耦、异步消息、流量削峰、日志处理等问题

目录

一、RocketMQ概述

1.1 RocketMQ是什么

1.2 核心架构和概念

1.3 部署架构模式

二、快速上手

2.1 Docker部署

2.2 三中发送方式

2.3 顺序消息

2.4 事务消息

2.5 延迟消息

2.6 消息过滤

三、源码分析

3.1 Broker 启动流程

3.2 消息发送核心流程

3.3 消息消费核心流程


一、RocketMQ概述

1.1 RocketMQ是什么

Apache RocketMQ 是阿里巴巴开源的一款分布式、队列模型的消息中间件,后捐赠给 Apache 基金会并成为顶级项目(TLP)。

各mq之间的比较:

维度RocketMQKafkaRabbitMQ
定位消息中间件(业务消息)日志流式处理传统消息中间件
协议自定义协议自定义协议AMQP
顺序消息✅ 天然支持⚠️ 仅分区有序⚠️ 较弱
事务消息✅ 完整方案❌ 不支持⚠️ 弱支持
延迟消息✅ 内置 18 个级别❌ 需外部方案⚠️ 插件
消息堆积✅ 亿级✅ 强❌ 弱
吞吐量百万级百万级万级
运维复杂度

选型建议:

  • 业务消息(订单、交易)→RocketMQ(推荐)

  • 大数据日志流 → Kafka

  • 传统企业集成、复杂路由 → RabbitMQ

1.2 核心架构和概念

┌──────────────────────────────────────────────┐ │ RocketMQ 集群 │ │ │ │ ┌────────────┐ ┌──────────────────┐ │ │ │ NameServer │ ←──→ │ NameServer(集群)│ │ │ └────────────┘ └──────────────────┘ │ │ ↑ │ │ │ 注册/发现 │ │ ↓ │ │ ┌────────────┐ 主从同步 ┌────────────┐ │ │ │ Broker-A │ ←───────→ │ Broker-A-S │ │ │ └────────────┘ └────────────┘ │ │ ↑ │ │ │ 发送/消费 │ │ ↓ │ │ ┌────────────┐ ┌────────────┐ │ │ │ Producer │ │ Consumer │ │ │ └────────────┘ └────────────┘ │ └──────────────────────────────────────────────┘
角色说明
NameServer路由注册中心,集群中各角色通过它获取路由信息(无状态)
Broker消息存储与中转,负责消息的存储、投递、查询、高可用
Producer消息生产者,向 Broker 发送消息
Consumer消息消费者,从 Broker 拉取消息进行消费
Topic消息主题,一类消息的集合
Message Queue消息队列,Topic 被切分为多个 Queue 以提高并行度
Tag子主题,用于消息过滤
Group一类 Producer 或 Consumer 的集合
Offset消费位点,记录消费进度

1.3 部署架构模式

模式说明
单 Master不推荐,仅用于本地调试
多 Master集群无 Slave, Broker 集群
多 Master 多 Slave(异步)Master 写成功即返回,Slave 异步复制
多 Master 多 Slave(同步)双写成功才返回,强一致
Dledger 集群基于 Raft 协议的自动选主(4.5+)

二、快速上手

2.1 Docker部署

docker-compose.yml文件如下,注意IP需要修改为自己的公网IP:

version: '3.8' services: # ==================== 1. NameServer ==================== rmqnamesrv: image: apache/rocketmq:4.9.4 container_name: rmqnamesrv ports: - "9876:9876" environment: - JAVA_OPT_EXT=-server -Xms256m -Xmx256m -Xmn128m command: sh mqnamesrv networks: - rocketmq_net # ==================== 2. Broker Master ==================== rmqbroker-master: image: apache/rocketmq:4.9.4 container_name: rmqbroker-master ports: - "10911:10911" # 业务端口 - "10909:10909" # VIP 端口 (10911-2) - "10912:10912" # HA 端口 environment: - NAMESRV_ADDR=rmqnamesrv:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: > sh -c " echo 'brokerClusterName=DefaultCluster' > /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerName=broker-a' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerId=0' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'deleteWhen=04' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'fileReservedTime=48' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerRole=ASYNC_MASTER' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'flushDiskType=ASYNC_FLUSH' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP1=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP2=rmqbroker-master' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'autoCreateTopicEnable=true' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'listenPort=10911' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haListenPort=10912' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && sh mqbroker -n rmqnamesrv:9876 -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf & tail -f /home/rocketmq/logs/rocketmqlogs/broker.log " depends_on: - rmqnamesrv networks: - rocketmq_net # ==================== 3. Broker Slave ==================== rmqbroker-slave: image: apache/rocketmq:4.9.4 container_name: rmqbroker-slave ports: - "10921:10921" # 业务端口(Slave 独立) - "10919:10919" # VIP 端口(10921-2) - "10922:10922" # HA 端口 environment: - NAMESRV_ADDR=rmqnamesrv:9876 - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Xmn256m command: > sh -c " echo 'brokerClusterName=DefaultCluster' > /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerName=broker-a' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerId=1' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'deleteWhen=04' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'fileReservedTime=48' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerRole=SLAVE' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'flushDiskType=ASYNC_FLUSH' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP1=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'brokerIP2=175.24.167.232' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'autoCreateTopicEnable=true' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'listenPort=10921' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haListenPort=10922' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && echo 'haMasterAddress=175.24.167.232:10912' >> /home/rocketmq/rocketmq-4.9.4/conf/broker.conf && sh mqbroker -n rmqnamesrv:9876 -c /home/rocketmq/rocketmq-4.9.4/conf/broker.conf & tail -f /home/rocketmq/logs/rocketmqlogs/broker.log " depends_on: - rmqnamesrv - rmqbroker-master networks: - rocketmq_net # ==================== 4. RocketMQ Dashboard ==================== rmqdashboard: image: apacherocketmq/rocketmq-dashboard:1.0.0 container_name: rmqdashboard ports: - "8180:8080" environment: - JAVA_OPTS=-Drocketmq.namesrv.addr=rmqnamesrv:9876 depends_on: - rmqnamesrv - rmqbroker-master - rmqbroker-slave networks: - rocketmq_net networks: rocketmq_net: driver: bridge

浏览器访问:ip:8180,出现下面的页面说明mq集群部署成功

2.2 三中发送方式

添加Maven依赖

<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>

不同的发送方式

方式特点适用场景
同步发送等待 Broker 应答,可靠性高重要通知、订单消息
异步发送回调通知,性能高对响应时间敏感
单向发送不关心结果,性能最高日志收集

发送结果状态:

返回状态含义
SEND_OK发送成功
FLUSH_DISK_TIMEOUT刷盘超时(异步刷盘模式可能仍成功)
FLUSH_SLAVE_TIMEOUT主从同步超时(异步复制可能仍成功)
SLAVE_NOT_AVAILABLE从节点不可用

2.3 顺序消息

类型如下:

类型描述性能
全局顺序整个 Topic 只有一个 Queue
分区顺序同一业务 key 投递到同一 Queue

2.4 事务消息

流程图:

生产者代码:

消费者代码:

回查代码:

关键设计点

  • Half 消息: 对消费者不可见

  • Op 消息: Commit/Rollback 操作消息

  • 反查机制: 解决本地事务执行时间过长的问题

2.5 延迟消息

内置18个延迟级别:

"1s", "5s", "10s", "30s","1m", "2m", "3m", "4m", "5m", "6m", "7m", "8m", "9m", "10m", "20m", "30m", "1h", "2h"

实现原理:

  1. 消息写入 CommitLog

  2. 如果是延迟消息,替换 Topic 为SCHEDULE_TOPIC_XX

  3. 调度线程按级别将消息从 SCHEDULE_TOPIC 投递回原 Topic

  4. 消费者消费

2.6 消息过滤

三、源码分析

3.1 Broker 启动流程

BrokerStartup.createBrokerController(args)
→ BrokerController.initialize()
→ 加载配置
→ 创建各种管理器(消息存储、消费者偏移、订阅等)
→ BrokerController.start()
→ 启动 Netty Server
→ 注册到 NameServer
→ 启动各种定时任务

关键线程:

线程作用
SendMessageProcessor处理消息发送请求
PullMessageProcessor处理消息拉取请求
FlushRealTimeService异步刷盘
ReputMessageService构建 ConsumeQueue/Index
HAConnection主从同步

3.2 消息发送核心流程

DefaultMQProducerImpl.send()
→ MQFaultStrategy.selectOneMessageQueue() // 选队列
→ SendMessageProcessor.processRequest() // Broker 端
→ MessageStore.putMessage() // 存储
→ CommitLog.asyncPutMessage() // 顺序写
→ Broker 返回 SendResult

3.3消息消费核心流程

PullMessageHoldService
→ PullRequestHoldService 拉取请求
→ PullMessageProcessor 处理
→ PullResult 推送给消费者
→ ConsumeMessageConcurrentlyService 消费
→ 更新 Offset


http://www.gsyq.cn/news/1457437.html

相关文章:

  • 从STEP到STL:搞3D打印和模型分享,你真的懂这些CAD格式的‘潜规则’吗?
  • 别再手动找App了!保姆级教程:利用SAP官方Fiori Apps Library精准定位并配置‘管理银行’磁贴
  • 别再只调参数了!Simulink模块的‘隐藏属性’:回调、注释与优先级实战指南
  • 2026年工业CRM选型:14大品牌横评
  • 基于STM32F10x与AD9910的400MHz DDS波形源码包,含扫频控制和RAM模式方波生成
  • 从Java字节码到破解实战:手把手教你用FrontEnd Plus和十六进制编辑器绕过软件试用限制
  • 别再只用ArcGIS了!免费神器GeoDa 1.16版空间自相关分析保姆级教程
  • 告别混乱!Unity与Android Studio协作时,高效管理build.gradle配置的完整指南
  • 虚拟主播人设崩塌率高达41.7%,如何用LLM+多模态AI重构可信度?——企业级合规部署 checklist 公开
  • 从零到实战:用GeoDa的Python包玩转空间数据分析(附最新安装与案例代码)
  • 计算机毕业设计之基于ECharts的国内热门景点数据可视化平台设计与实现
  • Facenet模型轻量化实战:用MobileNetV1替换Inception-ResNet,在CPU上也能跑得飞快
  • 矢量玻色子在库仑场中的量子行为与真空稳定性研究
  • 【AI决策引擎落地实战指南】:20年架构师亲授5大行业智能决策整合避坑清单
  • 太阳能户外路灯选购指南,方迪照明口碑好 - myqiye
  • 2026年当下湖南卡式龙骨配件制造厂全景扫描与选型指南 - 2026年企业资讯
  • 2026年更新:如何挑选靠谱的市政环卫服务平台 - 2026年企业资讯
  • 全球国家、省份、城市三级地理编码数据(标准JSON结构)
  • 告别网络依赖!手把手教你将30M的腾讯TBS X5内核直接打包进Android APK
  • 2026年IQUNIX EV63磁轴键盘推荐:千元磁轴的性能王者,银武士实测
  • 别再死记硬背CMOS与非门了!用这个四输入实例,带你搞懂VTC曲线漂移和体效应
  • 第 35 篇 k8s之PVC 与 StorageClass:动态存储供应
  • 售后完善的幼儿园公司排名 - mypinpai
  • 点云去噪优化:统计滤波+体素滤波+半径滤波优化去噪
  • DeepONet非线性算子学习实战指南:从理论到应用的完整解决方案
  • 2026年地图制作靠谱品牌推荐,哪家更权威? - mypinpai
  • 面试潜规则⑥:面试官桌下那张“评估表”,到底在打什么分?
  • YOLOv3推理时,置信度、类别概率和NMS到底怎么‘打架’?一个Debug案例讲清楚
  • 第 36 篇 k8s之资源管理:Requests、Limits 与 QoS
  • LangChain 实战指南:从调用模型到构建 AI 应用