当前位置: 首页 > news >正文

NET 8 封装自己的 rabbtMQ

项目地址

https://github.com/sansantang/Jonckers.RabbitMQ.HttpApi.Order
1 支持自定义 QoS (默认 PrefetchSize = 0, PrefetchCount = 1, Global = false)
2 支持死信队列

怎么使用

1. 服务注册

appsettings.json

{"RabbitMQConnection": {"HostName": "192.168.49.151", // RabbitMQ 主机"Port": 5672, // 端口(默认5672)"UserName": "admin", // 用户名(默认guest)"Password": "admin123", // 密码(默认guest)"VirtualHost": "/", // 虚拟主机(默认/)"ExchangeName": "DefaultExchange", // 默认交换机"RetryCount": 3, // 重试次数"ConnectionTimeout": 10 // 连接超时时间(秒)}
}

在Program.cs中有以下代码:

builder.Services.AddMyRabbitMQ(builder.Configuration);
//注册消费者
//builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());
//...
app.UseMyEventHandler();

2. 生产者

image
image

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.requestevent")]public class PerryTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

WeatherForecastController中:

public IMyPublisher<PerryTest> TestPublisher { get; }public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<PerryTest> testPublisher)
{_logger = logger;TestPublisher = testPublisher;
}

发送消息

当TestAsync方法被调用时:

[HttpGet("test")]
public async Task<string> TestAsync()
{var data = new PerryTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "测试一下"};await TestPublisher.PublishAsync(data);return "发送了一条消息";
}

此时使用的TestPublisher就是通过上述过程创建的MyPublisher<PerryTest>实例,该实例是通过第2个构造函数初始化的。

3. 消费者

注册消费者

builder.Services.AddMyRabbitMQEventHandlers(typeof(PerryTest).Assembly.GetTypes());

创建消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class PerryTestEventHandler : MyEventHandler<PerryTest>{public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

配置 Qos 参数

参数名 类型 含义 推荐值 说明
prefetchSize ushort 每次预取消息的总大小限制(字节) 0 (不限制) 一般用不到,设为 0 表示不限制消息大小
prefetchCount ushort 每次预取的消息数量上限(未确认的消息数) 1 ~ N(根据业务调整) 最关键参数!控制未 Ack 消息数
global bool 是否应用到该连接上的所有消费者 false (推荐) 如果为 true ,则对所有消费者生效;一般设为 false ,针对每个消费者单独设置
public class PerryTestEventHandler : MyEventHandler<PerryTest>
{public PerryTestEventHandler(){// 配置 QoS 参数Options.PrefetchSize = 0;Options.PrefetchCount = 2;    // 每次处理2条消息Options.Global = false;}public override Task OnReceivedAsync(PerryTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}
}

image

死信队列

过期或拒绝到死信队列

1 注册

启用 isWithDeadLetter = ture, 才能设置过期时间 expirationMilliseconds

using Jonckers.RabbitMQ.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;namespace Jonckers.RabbitMQ.Service.ConsumerMessageModel
{[RabbitMQEvent(queue: "jonckers.enterpriseordering.deadletter", routingkey: "jonckers.enterpriseordering.deadletter", isWithDeadLetter: true, expirationMilliseconds: 60000)]public class DeadLetterTest{public Guid Id { get; set; }public string? Name { get; set; }public int Count { get; set; }public string? Remark { get; set; }}
}

2 生产者

image

发送消息 PublishWithDeadLetterAsync

[ApiController]
[Route("[controller]")]
public class WeatherForecastController : ControllerBase
{public IMyPublisher<DeadLetterTest> TestPublisher { get; }private readonly ILogger<WeatherForecastController> _logger;public WeatherForecastController(ILogger<WeatherForecastController> logger, IMyPublisher<DeadLetterTest> testPublisher){_logger = logger;TestPublisher = testPublisher;}[HttpGet("test")]public async Task<string> TestAsync(){var data = new DeadLetterTest(){Id = Guid.NewGuid(),Name = "AAA",Count = 123,Remark = "哈哈哈"};await TestPublisher.PublishAsync(data);return "发送了一个消息";}
}

3 消费者:

1 注册消费者

// 注册到services
builder.Services.AddMyRabbitMQEventHandlers(typeof(DeadLetterTestEventHandler).Assembly);

2 监听正常的消费者

using Jonckers.RabbitMQ.Core.Service;
using Jonckers.RabbitMQ.Service.ConsumerMessageModel;namespace Jonckers.RabbitMQ.HttpApi.Order.Consumer
{public class DeadLetterTestEventHandler : MyEventHandler<DeadLetterTest>{public override Task OnReceivedAsync(DeadLetterTest data, string message){Console.WriteLine(message);return Task.CompletedTask;}public override void OnConsumerException(Exception ex){Console.WriteLine(ex.Message);}}
}

3 监听死信
需要自己再监听一个死信队列,注意命名

var deadLetterExchangeName = _exchangeName + ".dlx-exchange";
var deadLetterQueueName = _queueName + ".dlx-queue";
var deadLetterRoutingKey = _routingKeyName + ".dlrk-routingKey";

参考

https://gitee.com/wosperry/wosperry-rabbit-mqtest

http://www.gsyq.cn/news/58170.html

相关文章:

  • Tefrorform-自动化创建IAM
  • elasticSearch之API:索引运行
  • 20232406 2025-2026-1 《网络与系统攻防技术》 实验六实验报告
  • 基于Prometheus-实现AWS EC2的实例异常自动重启
  • 2025 年 11 月伺服压力机厂家权威推荐榜:苏州小型电动精密四柱 C 型电缸节能智能高精度电子伺服油压机液压热压装机专业解析
  • PostgreSQL数据库技术革新与AI功能解析
  • 递推关系123不满足归路 也要赋值给3
  • 数据科学与大数据技术作业三_102302107_林诗樾
  • ios如何连接mysql数据库
  • HarmonyOS Canvas开发指南 - 指南
  • 【ESP32】VSCode PlatformIO第一次初始化项目卡死
  • 2025水暖毯水泵品牌TOP5推荐,宠物饮水机水泵、加湿器水泵、冷风扇水泵等微型水泵厂商品质性价比选择指南
  • ios17可自动清除短信验证码吗安全吗
  • 最轻量的图片处理工具:一个可以很方便地添加文字和裁剪图片的.html
  • iOS 虚拟现实开发如何改进设备适配
  • iOS 虚拟现实开发如何提升可维护性
  • 李宏毅机器学习笔记29 - 指南
  • 详细介绍:常见Web安全漏洞全解析
  • 密码系统设计实验3-1
  • 2025年11月全国求职机构选择指南:主流机构综合对比与避坑建议
  • 2025年靠谱的物流高性价比推荐榜
  • 完整教程:Spring Boot 2.6+ 整合 PageHelper 启动报错:循环依赖解决方案全解析
  • 设计模式-组合模式(Composite) - 教程
  • 2025年11月全国求职机构推荐榜单:五大知名机构综合对比与选择指南
  • 2025年周边西铁城机床代理商优选服务排行榜
  • 2025年11月留学生求职机构排行榜:五大知名机构深度评测报告
  • 2025年热门的麻辣烫食品添加剂厂家最新推荐榜
  • 2025年可靠的logo设计公司最新热门推荐榜推荐
  • STM32按键扫描
  • 2025年11月留学生回国求职机构市场报告:高性价比解决方案深度剖析