三、Simplest queue 模式

模型

一、生产者(Producer)

下面是生产者的样例,使用.Net core 3.1(RabbitMQ 支持 .Net core):

using RabbitMQ.Client;
using System;
using System.Text;

namespace Simplest_Queue
{
    class Program
    {
        static void Main(string[] args)
        {
            //1,创建连接工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建新连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,声明通道参数
            channel.QueueDeclare(
                queue:"Hello Queue",  //队列的名称
                durable:false,        //队列是否在代理(broker)重启后继续存在
                exclusive:false,      //队列是否在声明者断开连接时被删除
                autoDelete:false,     //队列是否在最后一个订阅者取消订阅时被删除
                arguments:null
                );

            //5,向队列发布消息
            string msg = "Hello RabbitMQ";
            var body = Encoding.UTF8.GetBytes(msg);

            channel.BasicPublish(
                exchange: "",             //指定交换机类型,空字符串则使用默认的“direct”交换机
                routingKey: "Hello Queue",//指定routingKey
                basicProperties: null,
                body: body                //消息
                );

            Console.WriteLine("over");
        }
    }
}

总结几点:

1,声明通道参数的时候,如果同名的队列不存在则 RabbitMQ 会新建队列;如果同名的队列存在,则不创建新队列。

2,不能声明队列名相同但是其他参数不同的通道,比如已经有了一个通道, Quene 名为 “Hello Queue” ,exclusive 参数为 false,如果再声明一个同名的 “Hello Queue” 通道,exclusive 参数值也必须为 false,因为如果为 true,那么该 Queue 会在断开的时候被删除,这就影响到了其他生产者。

3,因为 body 里存放的是字节,所以理论上可以发布任何数据到 broker。

4,如果发布消息失败,要检查配置文件,是不是磁盘剩余的容量小于 RabbitMQ 配置的最低空间(默认是48MB)

 

当代码执行完 “使用工厂创建新连接” ,在 RabbitMQ 后台就可以看到相应的连接:

当代码执行完 “使用连接创建通道” ,在 RabbitMQ 后台就可以看到相应的通道:

当代码执行完 “声明通道参数” ,在 RabbitMQ 后台就可以看到相应的队列:

当代码执行完 “发布消息” ,在 RabbitMQ 后台,点开 “Exchange” 选项卡就可以看到哪个交换机建立了连接(默认的“direct”交换机):

点开 “Exchange” 选项卡,可以看到相应队列里的条目增加了一条数据:

并且可以看到具体的内容:

至此,生产者就完成了。

注意,当生产者断开连接后,声明的队列依然存在,队列里的消息也存在,因为 autodelete 设置的 false。但是 Connection 和 Channel 消失了。

疑问:

1,为什么只指定了 IP,没有指定 Port,一旦指定了 Port 为 15672 反而连不上?

2,queue 和 routingKey 有什么联系和区别,为什么发布消息的时候, routingKey 的值要和 queue 的值一样?

 

二、消费者(Consumer)

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace Simplest_Queue
{
    class Consumer
    {
        public Consumer()
        {
            //1,创建连接工厂
            var factory = new ConnectionFactory();
            factory.HostName = "127.0.0.1";

            //2,使用工厂创建新连接
            var connection = factory.CreateConnection();

            //3,使用连接创建通道
            var channel = connection.CreateModel();

            //4,声明通道参数
            channel.QueueDeclare(
                queue: "Hello Queue",  //队列的名称
                durable: false,        //队列是否在代理(broker)重启后继续存在
                exclusive: false,      //队列是否在声明者断开连接时被删除
                autoDelete: false,     //队列是否在最后一个订阅者取消订阅时被删除
                arguments: null
                );

            //5,创建通道的消费者
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, e) =>
            {
                var body = e.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine("Received:{0}",message);
            };

            //6,把消费者放在通道上开始消费
            channel.BasicConsume(
                queue: "Hello Queue",
                autoAck: true,
                consumer:consumer
                ) ;

            Console.ReadKey();
        }
    }
}

消费者连接到消息队列的方式跟生产者是一摸一样的,不同的就是消费者先创建一个消费者,然后把消费者放在通道上开始消费。为了确保消费者在消费的时候,确保队列存在,所以消费者也 QueueDeclare 了队列。

消费者消费完队列里的消息后,可以在后台看到队列里的 Ready 计数就为零了。

 

疑问:

1,能不能创建多个消费者,多线程同时进行消费,看代码好像不可以,因为通道放消费者的时候只能放一个消费者,难道要创建多个通道连接吗?

发表评论

Powered by WordPress | Theme Revised from Doo

苏ICP备18047621号

Copyright © 2017-2024 追光者博客