模型
这种模式跟 Simplest queue 的区别就是可以多个消费者同时消费,加快消息处理的速度。这一章只需要在前一章的基础上稍微修改即可。
一、生产者(Producer)
生产者跟 Simplest queue 的一样,这里修改一下消息的发布,循环 100 次发布100个消息到队列里:
//5,向队列发布消息 for (int i = 0; i < 100; i++) { string msg = $"Task {i}"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish( exchange: "", //指定交换机类型,空字符串则使用默认的“direct”交换机 routingKey: "Hello Queue",//指定routingKey basicProperties: null, body: body //消息 ); }
打开 127.0.0.1:15672 可以看到队列里消息的数量:
二、消费者(Consumer)
1,同时打开两个消费者
如果直接使用 Simplest queue 的消费者,同时打开两个可以吗?
但是消息处理的速度太快了,第二个消费者还没启动,第一个消费者就把消息处理完了。为了模拟实际消息处理是需要时间的,在消费者里增加一句睡眠,其余部分不变:
//5,创建通道的消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { Thread.Sleep(500); //增加延时模拟任务处理耗时 var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received:{0}",message); };
开始测试,同时启动 2 个消费者:
结果第二个消费者一条消息都没收到!而且当两个消费者都关闭的后,队列里的消息全部消失了!可是明明消费者 1 只打印了 20 条记录。
—————————————————–
首先队列里的消息 “丢失” 的问题:问题的关键在于 autoAck 参数的设定:
//5,创建通道的消费者 var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, e) => { Thread.Sleep(500); var body = e.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine("Received:{0}",message); //手动回复 RabbitMQ 消息处理完了 channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; //6,把消费者放在通道上开始消费 channel.BasicConsume( queue: "Hello Queue", autoAck: false, //true -> fasle,表示收到手动 ack 才从队列移除消息 consumer:consumer ) ;
首先把 autoAck 改为 false,表示RabbitMQ 收到消费者的回复之后才把消息从队列移除。然后在消费者里增加手动回复。再试一次就可以看到消费者消费多少,队列里就少多少。可见,当 autoAck 为 true 的时候,消费者一股脑把队列里的消息全部取了出来,然后慢慢消费,程序关闭后还没消费完的消息就在内存中释放了。
修改后,队列里就有消息供其他消费者消费了。再启动两个消费者,结果队列里的消息没有凭空丢失了,但是第二个启动的消费者竟然还是一条消息都拿不到。
问题的关键在于通道的设定:
//4,声明通道参数 channel.QueueDeclare( queue: "Hello Queue", //队列的名称 durable: false, //队列是否在代理(broker)重启后继续存在 exclusive: false, //队列是否在声明者断开连接时被删除 autoDelete: false, //队列是否在最后一个订阅者取消订阅时被删除 arguments: null ); //增加一条通道设定 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
如上所示,使用 BasicQos 方法,prefetchCount 为 1 表示该通道的消费者一次只能占用一个消息,收到回复后,RabbitMQ 才会分配新的消息。
—————————————————–
新的问题又出现了,如果两个消费者,一个有这个设定通道设定,一个没有,又会是什么样的现象呢?
假如消费者 1 没有该设定,消费者 2 有该设定:
1,先启动消费者 1 获取消息,再启动消费者 2 但是获取不到消息,关闭消费者 1,消费者 2 才获取到了消息。 2,先启动消费者 2 获取消息,再启动消费者 1,消费者 1 获取消息但是消费者 2 不再获取消息。 3,先启动消费者 2 获取消息,再启动两外一个消费者 2,两个消费者一起获取消息。
可见,不设置 BasicQos 的消费者就像江洋大盗,他来了,其他人都别想拿到消息,哪怕是其他江洋大盗也不行,就是这么硬气。
—————————————————–
最后一个问题,如果两个有 BasicQos 设置的消费者,一个处理消息速度快(延时1s),一个处理消息速度慢(延时2s),那么 RabbitMQ 会你一个他一个轮流分配消息,还是谁先处理完就再给谁分配呢?
实验结果如下,可见处理速度快的,就可以多拿到消息:
以上就是 Work queues 模式的内容。