您现在的位置是:网站首页> 编程资料编程资料
运用.net core中实例讲解RabbitMQ_实用技巧_
2023-05-24
444人已围观
简介 运用.net core中实例讲解RabbitMQ_实用技巧_
一、RabbitMQ简介
是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且RabbitMQ是基于AMQP协议的。
(1) AMQP协议
Advanced Message Queuing Protocol(高级消息队列协议)
(2)AMQP专业术语
(多路复用->在同一个线程中开启多个通道进行操作)
- Server:又称broker,接受客户端的链接,实现AMQP实体服务
- Connection:连接,应用程序与broker的网络连接
- Channel:网络信道,几乎所有的操作都在channel中进行,Channel是进行消息读写的通道。客户端可以建立多个channel,每个channel代表一个会话任务。
- Message:消息,服务器与应用程序之间传送的数据,由Properties和Body组成.Properties可以对消息进行修饰,必须消息的优先级、延迟等高级特性;Body则是消息体内容。
- virtualhost: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有相同名称的Exchange 或 Queue。
- Exchange:交换机,接收消息,根据路由键转单消息到绑定队列
- Binding: Exchange和Queue之间的虚拟链接,binding中可以包换routing key
- Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。(如负载均衡)
(3)RabbitMQ整体架构

ClientA(生产者)发送消息到Exchange1(交换机),同时带上RouteKey(路由Key),Exchange1找到绑定交换机为它和绑定传入的RouteKey的队列,把消息转发到对应的队列,消费者Client1,Client2,Client3只需要指定对应的队列名即可以消费队列数据。
交换机和队列多对多关系,实际开发中一般是一个交换机对多个队列,防止设计复杂化。
二、安装RabbitMQ
安装方式不影响下面的使用,这里用Docker安装
#15672端口为web管理端的端口,5672为RabbitMQ服务的端口
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
输入:ip:5672访问验证。

建一个名为develop的Virtual host(虚拟主机)使用,项目中一般是一个项目建一个Virtual host用,能够隔离队列。

切换Virtual host

三、RabbitMQ六种队列模式在.NetCore中使用
(1)简单队列
最简单的工作队列,其中一个消息生产者,一个消息消费者,一个队列。也称为点对点模式

描述:一个生产者 P 发送消息到队列 Q,一个消费者 C 接收
建一个RabbitMQHelper.cs类
////// RabbitMQ帮助类 /// public class RabbitMQHelper { private static ConnectionFactory factory; private static object lockObj = new object(); ////// 获取单个RabbitMQ连接 /// ///public static IConnection GetConnection() { if (factory == null) { lock (lockObj) { if (factory == null) { factory = new ConnectionFactory { HostName = "172.16.2.84",//ip Port = 5672,//端口 UserName = "admin",//账号 Password = "123456",//密码 VirtualHost = "develop" //虚拟主机 }; } } } return factory.CreateConnection(); } }
生产者代码:
新建发送类Send.cs
public static void SimpleSendMsg() { string queueName = "simple_order";//队列名 //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) {//创建队列 channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); for (var i = 0; i < 10; i++) { string message = $"Hello RabbitMQ MessageHello,{i + 1}"; var body = Encoding.UTF8.GetBytes(message);//发送消息 channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body); Console.WriteLine($"发送消息到队列:{queueName},内容:{message}"); } } } }创建队列参数解析:
durable:是否持久化。
exclusive:排他队列,只有创建它的连接(connection)能连,创建它的连接关闭,会自动删除队列。
autoDelete:被消费后,消费者数量都断开时自动删除队列。
arguments:创建队列的参数。
发送消息参数解析:
exchange:交换机,为什么能传空呢,因为RabbitMQ内置有一个默认的交换机,如果传空时,就会用默认交换机。
routingKey:路由名称,这里用队列名称做路由key。
mandatory:true告诉服务器至少将消息route到一个队列种,否则就将消息return给发送者;false:没有找到路由则消息丢弃。
执行效果:

队列产生10条消息。

消费者代码:
新建Recevie.cs类
public static void SimpleConsumer() { string queueName = "simple_order"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建队列 channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); int i = 0; consumer.Received += (model, ea) => { //消费者业务处理 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{i},队列{queueName}消费消息长度:{message.Length}"); i++; }; channel.BasicConsume(queueName, true, consumer); } } }消费者只需要知道队列名就可以消费了,不需要Exchange和routingKey。
注:消费者这里有一个创建队列,它本身不需要,是预防消费端程序先执行,没有队列会报错。
执行效果:


消息已经被消费完。
(2)工作队列模式
一个消息生产者,一个交换器,一个消息队列,多个消费者。同样也称为点对点模式

生产者P发送消息到队列,多个消费者C消费队列的数据。
工作队列也称为公平性队列模式,循环分发,RabbitMQ将按顺序将每条消息发送给下一个消费者,每个消费者将获得相同数量的消息。
生产者:
Send.cs代码:
////// 工作队列模式 /// public static void WorkerSendMsg() { string queueName = "worker_order";//队列名 //创建连接 using (var connection = RabbitMQHelper.GetConnection()) { //创建信道 using (var channel = connection.CreateModel()) { //创建队列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); var properties = channel.CreateBasicProperties(); properties.Persistent = true; //消息持久化 for ( var i=0;i<10;i++) { string message = $"Hello RabbitMQ MessageHello,{i+1}"; var body = Encoding.UTF8.GetBytes(message); //发送消息到rabbitmq channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body); Console.WriteLine($"发送消息到队列:{queueName},内容:{message}"); } } } }
参数durable:true,需要持久化,实际项目中肯定需要持久化的,不然重启RabbitMQ数据就会丢失了。
执行效果:

写入10条数据,有持久化标识D

消费端:
Recevie代码:
public static void WorkerConsumer() { string queueName = "worker_order"; var connection = RabbitMQHelper.GetConnection(); { //创建信道 var channel = connection.CreateModel(); { //创建队列 channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); //prefetchCount:1来告知RabbitMQ,不要同时给一个消费者推送多于 N 个消息,也确保了消费速度和性能 channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false); int i = 1; int index = new Random().Next(10); consumer.Received += (model, ea) => { //处理业务 var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{i},消费者:{index},队列{queueName}消费消息长度:{message.Length}"); channel.BasicAck(ea.DeliveryTag, false); //消息ack确认,告诉mq这条队列处理完,可以从mq删除了 Thread.Sleep(1000); i++; }; channel.BasicConsume(queueName,autoAck:false, consumer); } } }BasicQos参数解析:
prefetchSize:每条消息大小,一般设为0,表示不限制。
prefetchCount:1,作用限流,告诉RabbitMQ不要同时给一个消费者推送多于N个消息,消费者会把N条消息缓存到本地一条条消费,如果不设,RabbitMQ会进可能快的把消息推到客户端,导致客户端内存升高。设置合理可以不用频繁从RabbitMQ 获取能提升消费速度和性能,设的太多的话则会增大本地内存,需要根据机器性能合理设置,官方建议设为30。
global:是否为全局设置。
这些限流设置针对消费者autoAck:false时才有效,如果是自动Ack的,限流不生效。
执行两个消费者,效果:

可以看到消费者号的标识,8,2,8,2是平均的,一个消费者5个,RabbitMQ上也能看到有2个消费者,Unacked数是2,因为每个客户端的限流数是1。

工作队列模式也是很常用的队列模式。
相关内容
- .Net中异步任务的取消和监控的具体实现_实用技巧_
- 理解ASP.NET Core 中间件(Middleware)_实用技巧_
- .Net Core项目中NLog整合Exceptionless实例_实用技巧_
- .NET Core对象池的应用:扩展篇_实用技巧_
- .NET Core对象池的应用:设计篇_实用技巧_
- .NET Core对象池的应用:编程篇_实用技巧_
- NAT网络地址转换详情_ASP.NET_
- .Net Framework .Net .NET Standard的概念及区别_ASP.NET_
- C# 有关Assembly.Unload详解_实用技巧_
- ASP.NET session.timeout设置案例详解_实用技巧_
