当前位置: 首页 > news >正文

消息队列设计:从同步到异步的性能突破

前言

2024年初,我们的订单系统经常出现"超时"问题。用户下单后,系统需要同时调用库存服务、支付服务、通知服务,任何一个服务慢都会导致整个请求超时。

我们决定引入消息队列,将同步调用改为异步处理。这个改造带来了显著的性能提升。


一、问题:同步调用的瓶颈

原始的订单流程是这样的:

python

@app.route('/api/orders', methods=['POST']) def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 同步调用库存服务 inventory_response = requests.post( 'http://inventory-service/deduct', json={'product_id': order.product_id, 'quantity': order.quantity} ) if inventory_response.status_code != 200: return {"error": "库存不足"}, 400 # 3. 同步调用支付服务 payment_response = requests.post( 'http://payment-service/pay', json={'order_id': order.id, 'amount': order.amount} ) if payment_response.status_code != 200: return {"error": "支付失败"}, 400 # 4. 同步调用通知服务 notify_response = requests.post( 'http://notify-service/send', json={'order_id': order.id, 'type': 'order_created'} ) return {"order_id": order.id}, 201

问题

  • 任何一个服务慢都会导致整个请求慢;
  • 任何一个服务故障都会导致订单创建失败;
  • 耦合度太高,难以扩展。

性能数据

  • 库存服务:200ms
  • 支付服务:300ms
  • 通知服务:150ms
  • 总耗时:200 + 300 + 150 =650ms

二、解决方案:引入RabbitMQ

我们选择RabbitMQ作为消息队列。改造后的流程:

2.1 发布订单创建事件

python

import pika import json def create_order(): # 1. 创建订单 order = Order.create(request.json) # 2. 发布事件到消息队列 connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() # 声明交换机和队列 channel.exchange_declare(exchange='orders', exchange_type='topic') # 发布消息 message = { 'order_id': order.id, 'product_id': order.product_id, 'quantity': order.quantity, 'amount': order.amount } channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message) ) connection.close() # 立即返回响应 return {"order_id": order.id}, 201

耗时:仅需10ms(发布到队列)

2.2 消费者:库存服务

python

def inventory_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='inventory_queue', durable=True) queue_name = result.method.queue # 绑定队列到交换机 channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 扣减库存 deduct_inventory( message['product_id'], message['quantity'] ) # 确认消息 ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: # 拒绝消息,重新入队 ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('库存服务已启动,等待消息...') channel.start_consuming() if __name__ == '__main__': inventory_consumer()

2.3 消费者:支付服务

python

def payment_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='payment_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 处理支付 process_payment( message['order_id'], message['amount'] ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('支付服务已启动,等待消息...') channel.start_consuming()

2.4 消费者:通知服务

python

def notify_consumer(): connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) channel = connection.channel() channel.exchange_declare(exchange='orders', exchange_type='topic') result = channel.queue_declare(queue='notify_queue', durable=True) queue_name = result.method.queue channel.queue_bind( exchange='orders', queue=queue_name, routing_key='order.created' ) def callback(ch, method, properties, body): message = json.loads(body) try: # 发送通知 send_notification( message['order_id'], 'order_created' ) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) channel.basic_consume( queue=queue_name, on_message_callback=callback ) print('通知服务已启动,等待消息...') channel.start_consuming()


三、可靠性保证

3.1 消息持久化

python

# 声明持久化队列 channel.queue_declare( queue='payment_queue', durable=True # 队列持久化 ) # 发布持久化消息 channel.basic_publish( exchange='orders', routing_key='order.created', body=json.dumps(message), properties=pika.BasicProperties( delivery_mode=2 # 消息持久化 ) )

3.2 消息确认机制

python

Copy code

# 手动确认消息 def callback(ch, method, properties, body): try: process_message(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True) # 拒绝并重新入队 # 禁用自动确认 channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=False # 手动确认 )

3.3 死信队列

python

# 声明死信交换机 channel.exchange_declare(exchange='dlx', exchange_type='direct') channel.queue_declare(queue='dead_letter_queue', durable=True) channel.queue_bind(exchange='dlx', queue='dead_letter_queue') # 声明普通队列,指定死信交换机 channel.queue_declare( queue='payment_queue', durable=True, arguments={ 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'dead_letter' } )


四、监控和告警

python

import logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def callback(ch, method, properties, body): start_time = time.time() try: process_message(body) duration = time.time() - start_time logger.info(f"消息处理成功, 耗时: {duration}ms") ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: logger.error(f"消息处理失败: {str(e)}") ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


五、国际化团队的挑战

在跨国团队中,消息队列的错误日志和告警需要支持多语言。我们使用同言翻译(Transync AI)来自动翻译消息队列的错误信息和监控告警,确保不同语言背景的团队成员能够快速理解问题并做出响应。


六、性能对比

指标同步调用异步消息队列提升
平均响应时间650ms10ms-98.5%
P99响应时间2000ms50ms-97.5%
系统吞吐量1000 req/s10000 req/s+900%
故障隔离-

七、最佳实践

  1. 幂等性设计:消费者应该能够安全地处理重复消息;
  2. 超时设置:为消息处理设置合理的超时时间;
  3. 监控队列深度:及时发现消费者处理不过来的情况;
  4. 分离关注点:生产者和消费者应该解耦;
  5. 定期审查:定期检查死信队列,找出问题消息。

八、结语

消息队列的引入,从根本上改变了我们的系统架构。从同步的紧耦合,到异步的松耦合,系统的可扩展性和可靠性都得到了显著提升。

如果你的系统也在经历性能瓶颈,消息队列可能是一个很好的解决方案。

http://www.gsyq.cn/news/89465.html

相关文章:

  • DB-GPT:AI如何革新数据库管理与查询
  • 一个完全本地运行的视频转文字工具:Vid2X
  • 浅析Spring中的PropertySource 的基本使用
  • 3小时打造6v电影网MVP原型实战
  • 微服务面试题:概览
  • java Happens - before 原则到底是什么
  • 2025 年 12 月雅安市汽车租赁服务权威推荐榜:轿车、豪车、越野车、婚车、大巴车、商务车、房车、旅游车、跑车、皮卡车一站式尊享服务 - 品牌企业推荐师(官方)
  • C++--
  • 2025 年 12 月滚塑模具厂家权威推荐榜:滚塑钢模/铝模/铸铝模具/铝板模具/加工制品/产品/穿梭机/烘箱,匠心工艺与高效产能深度解析 - 品牌企业推荐师(官方)
  • 意图识别面试通关指南:从基础问答到场景落地
  • 从 Oracle 到金仓:一次真实迁移经历的复盘与思考
  • Memento播放器终极指南:用视频学习日语的完整解决方案
  • Nuklear即时模式GUI:颠覆传统UI设计的5大核心优势
  • 下一代盲盒系统核心架构解析:JAVA-S1如何打造极致公平与全球化体验
  • 公司上ERP,有什么好的建议吗?
  • 震惊!这家Linux开发板让工程师集体沉默,真相竟然是……
  • Git 开发常用命令速查手册
  • Python 3 解释器
  • Ⅰ、Ⅱ、Ⅲ型裂纹应力
  • 【深度收藏】模型蒸馏vs微调:技术详解+代码实战,两种技术的区别与组合使用指南
  • Vue 开发者必看:3 步搞定 dart-sass 替换 node-sass(告别编译慢 +
  • Buck Boost Buck-Boost
  • 震惊!Linux开发板稳定性排行,这家竟碾压群雄!
  • 从零入门CANN:揭秘华为昇腾AI计算的核心引擎
  • Go 指针详解:定义、初始化、nil 语义与用例(含 swap 示例与原理分析)
  • Java EE 应用与 Spring MVC简介
  • 不是护眼灯不好,而是眼调节训练灯更懂孩子近视的防控需求
  • Pandas DataFrame:数据处理的强大工具
  • jQuery 捕获详解
  • SOAP 语法