四、Work queues 模式

模型

这种模式跟 Simplest queue 的区别就是可以多个消费者同时消费,加快消息处理的速度。这一章只需要在前一章的基础上稍微修改即可。

一、生产者(Producer)

生产者跟 Simplest queue 的一样,这里修改一下消息的发布,循环 100 次发布100个消息到队列里:

//5,向队列发布消息
for (int i = 0; i < 100; i++)
{
    string msg = $"Task {i}";
    var body = Encoding.UTF8.GetBytes(msg);

    channel.BasicPublish(
       exchange: "",             //指定交换机类型,空字符串则使用默认的“direct”交换机
       routingKey: "Hello Queue",//指定routingKey
       basicProperties: null,
       body: body                //消息
       );
}

打开 127.0.0.1:15672 可以看到队列里消息的数量:

二、消费者(Consumer)

1,同时打开两个消费者

如果直接使用 Simplest queue 的消费者,同时打开两个可以吗?

但是消息处理的速度太快了,第二个消费者还没启动,第一个消费者就把消息处理完了。为了模拟实际消息处理是需要时间的,在消费者里增加一句睡眠,其余部分不变:

//5,创建通道的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, e) =>
{
     Thread.Sleep(500); //增加延时模拟任务处理耗时
     var body = e.Body.ToArray();
     var message = Encoding.UTF8.GetString(body);
     Console.WriteLine("Received:{0}",message);
};

开始测试,同时启动 2 个消费者:

结果第二个消费者一条消息都没收到!而且当两个消费者都关闭的后,队列里的消息全部消失了!可是明明消费者 1 只打印了 20 条记录。

—————————————————–

首先队列里的消息 “丢失” 的问题:问题的关键在于 autoAck 参数的设定:

//5,创建通道的消费者
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, e) =>
   {
       Thread.Sleep(500);
       var body = e.Body.ToArray();
       var message = Encoding.UTF8.GetString(body);
       Console.WriteLine("Received:{0}",message);

       //手动回复 RabbitMQ 消息处理完了
       channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
    };

//6,把消费者放在通道上开始消费
channel.BasicConsume(
       queue: "Hello Queue",
       autoAck: false, //true -> fasle,表示收到手动 ack 才从队列移除消息
       consumer:consumer
       ) ;

首先把 autoAck 改为 false,表示RabbitMQ 收到消费者的回复之后才把消息从队列移除。然后在消费者里增加手动回复。再试一次就可以看到消费者消费多少,队列里就少多少。可见,当 autoAck 为 true 的时候,消费者一股脑把队列里的消息全部取了出来,然后慢慢消费,程序关闭后还没消费完的消息就在内存中释放了。

修改后,队列里就有消息供其他消费者消费了。再启动两个消费者,结果队列里的消息没有凭空丢失了,但是第二个启动的消费者竟然还是一条消息都拿不到。

问题的关键在于通道的设定:

//4,声明通道参数
channel.QueueDeclare(
      queue: "Hello Queue",  //队列的名称
      durable: false,        //队列是否在代理(broker)重启后继续存在
      exclusive: false,      //队列是否在声明者断开连接时被删除
      autoDelete: false,     //队列是否在最后一个订阅者取消订阅时被删除
      arguments: null
      );

//增加一条通道设定
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

如上所示,使用 BasicQos 方法,prefetchCount 为 1 表示该通道的消费者一次只能占用一个消息,收到回复后,RabbitMQ 才会分配新的消息。

—————————————————–

新的问题又出现了,如果两个消费者,一个有这个设定通道设定,一个没有,又会是什么样的现象呢?

假如消费者 1 没有该设定,消费者 2 有该设定:

1,先启动消费者 1 获取消息,再启动消费者 2 但是获取不到消息,关闭消费者 1,消费者 2 才获取到了消息。
2,先启动消费者 2 获取消息,再启动消费者 1,消费者 1 获取消息但是消费者 2 不再获取消息。
3,先启动消费者 2 获取消息,再启动两外一个消费者 2,两个消费者一起获取消息。

可见,不设置 BasicQos 的消费者就像江洋大盗,他来了,其他人都别想拿到消息,哪怕是其他江洋大盗也不行,就是这么硬气。

—————————————————–

最后一个问题,如果两个有 BasicQos 设置的消费者,一个处理消息速度快(延时1s),一个处理消息速度慢(延时2s),那么 RabbitMQ 会你一个他一个轮流分配消息,还是谁先处理完就再给谁分配呢?

实验结果如下,可见处理速度快的,就可以多拿到消息:

 

以上就是 Work queues 模式的内容。

发表评论

Powered by WordPress | Theme Revised from Doo

苏ICP备18047621号

Copyright © 2017-2024 追光者博客