模型
一、生产者(Producer)
下面是生产者的样例,使用.Net core 3.1(RabbitMQ 支持 .Net core):
using RabbitMQ.Client; using System; using System.Text; namespace Simplest_Queue { class Program { static void Main(string[] args) { //1,创建连接工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建新连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,声明通道参数 channel.QueueDeclare( queue:"Hello Queue", //队列的名称 durable:false, //队列是否在代理(broker)重启后继续存在 exclusive:false, //队列是否在声明者断开连接时被删除 autoDelete:false, //队列是否在最后一个订阅者取消订阅时被删除 arguments:null ); //5,向队列发布消息 string msg = "Hello RabbitMQ"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "", //指定交换机类型,空字符串则使用默认的“direct”交换机 routingKey: "Hello Queue",//指定routingKey basicProperties: null, body: body //消息 ); Console.WriteLine("over"); } } }
总结几点:
1,声明通道参数的时候,如果同名的队列不存在则 RabbitMQ 会新建队列;如果同名的队列存在,则不创建新队列。
2,不能声明队列名相同但是其他参数不同的通道,比如已经有了一个通道, Quene 名为 “Hello Queue” ,exclusive 参数为 false,如果再声明一个同名的 “Hello Queue” 通道,exclusive 参数值也必须为 false,因为如果为 true,那么该 Queue 会在断开的时候被删除,这就影响到了其他生产者。
3,因为 body 里存放的是字节,所以理论上可以发布任何数据到 broker。
4,如果发布消息失败,要检查配置文件,是不是磁盘剩余的容量小于 RabbitMQ 配置的最低空间(默认是48MB)
当代码执行完 “使用工厂创建新连接” ,在 RabbitMQ 后台就可以看到相应的连接:
当代码执行完 “使用连接创建通道” ,在 RabbitMQ 后台就可以看到相应的通道:
当代码执行完 “声明通道参数” ,在 RabbitMQ 后台就可以看到相应的队列:
当代码执行完 “发布消息” ,在 RabbitMQ 后台,点开 “Exchange” 选项卡就可以看到哪个交换机建立了连接(默认的“direct”交换机):
点开 “Exchange” 选项卡,可以看到相应队列里的条目增加了一条数据:
并且可以看到具体的内容:
至此,生产者就完成了。
注意,当生产者断开连接后,声明的队列依然存在,队列里的消息也存在,因为 autodelete 设置的 false。但是 Connection 和 Channel 消失了。
疑问:
1,为什么只指定了 IP,没有指定 Port,一旦指定了 Port 为 15672 反而连不上?
2,queue 和 routingKey 有什么联系和区别,为什么发布消息的时候, routingKey 的值要和 queue 的值一样?
二、消费者(Consumer)
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace Simplest_Queue { class Consumer { public Consumer() { //1,创建连接工厂 var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1"; //2,使用工厂创建新连接 var connection = factory.CreateConnection(); //3,使用连接创建通道 var channel = connection.CreateModel(); //4,声明通道参数 channel.QueueDeclare( queue: "Hello Queue", //队列的名称 durable: false, //队列是否在代理(broker)重启后继续存在 exclusive: false, //队列是否在声明者断开连接时被删除 autoDelete: false, //队列是否在最后一个订阅者取消订阅时被删除 arguments: null ); //5,创建通道的消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received:{0}",message); }; //6,把消费者放在通道上开始消费 channel.BasicConsume( queue: "Hello Queue", autoAck: true, consumer:consumer ) ; Console.ReadKey(); } } }
消费者连接到消息队列的方式跟生产者是一摸一样的,不同的就是消费者先创建一个消费者,然后把消费者放在通道上开始消费。为了确保消费者在消费的时候,确保队列存在,所以消费者也 QueueDeclare 了队列。
消费者消费完队列里的消息后,可以在后台看到队列里的 Ready 计数就为零了。
疑问:
1,能不能创建多个消费者,多线程同时进行消费,看代码好像不可以,因为通道放消费者的时候只能放一个消费者,难道要创建多个通道连接吗?