`

.Net下RabbitMQ的使用(4) -- 订阅和发布

 
阅读更多

消息的订阅和发布是使用消息队列的常用场景。在上一篇文章中,虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举。

 

要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。下图中的X就是exchange。

 

 

RabbitMQ的exchange有一些类型,这些类型决定了exchange的行为。分别是 direct, topic, headers 和 fanout四种类型。在这一篇文章中介绍的最简单的发送和接收例子中使用的如下代码中

//指定发送的路由,通过默认的exchange直接发送到指定的队列中。
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);

第一个参数我们输入了空字符串来代表一个exchange。空字符串的exchange在RabbitMQ中时默认的exchange,类型是direct。在这个例子中它会直接将消息发送到第二个参数route_key定义的同名的队列中。

而我们这篇介绍的订阅和发布是用了fanout这个类型。由于默认类型是direct的,所以需要使用fanout就需要额外定义。如下代码就是定义了一个名字叫publish的fanout类型的exchange。

 

channel.ExchangeDeclare("publish", "fanout");

 

这种exchange有分发的意思。那分发到哪些队列中呢?因为发布者不需要知道,所以这段代码页就在订阅者那边来实现。来看发送方得代码片段:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace SendService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            //定义要发送的数据

            List<RequestMessage> messages = new List<RequestMessage>();
            for (int i = 0; i < 100; i++)
            {
                RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type="email" };
                messages.Add(message);
            }
            for (int i = 0; i < 100; i++)
            {
                RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type = "sms" };
                messages.Add(message);
            }

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("publish-topic", "topic", true);//定义一个交换机,且采用广播类型,并持久化该交换机
                    string smsqueue = channel.QueueDeclare("all.sms.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(smsqueue, "publish-topic", "sms");
                    string emailqueue = channel.QueueDeclare("all.email.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列
                    channel.QueueBind(emailqueue, "publish-topic", "email");
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//1表示不持久,2.表示持久化
                    foreach (var item in messages)
                    {
                        if (item.Type == "sms")
                        {
                            XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                            using (MemoryStream ms = new MemoryStream())
                            {
                                xs.Serialize(ms, item);
                                byte[] bytes = ms.ToArray();
                                channel.BasicPublish("publish-topic", "sms", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                                Console.WriteLine(" [x] Sent {0}", item.Message);
                            }
                        }
                        if (item.Type == "email")
                        {
                            XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                            using (MemoryStream ms = new MemoryStream())
                            {
                                xs.Serialize(ms, item);
                                byte[] bytes = ms.ToArray();
                                channel.BasicPublish("publish-topic", "email", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略
                                Console.WriteLine(" [x] Sent {0}", item.Message);
                            }
                        }
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 接受方SMS:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace ReceiveSMSService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //交换机持久化
                    channel.ExchangeDeclare("publish-topic", "topic", true);
                    bool durable = true;//队列持久化
                    string queue_name = channel.QueueDeclare("all.sms.message", durable, false, false, null);//hello是queue的名字
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(queue_name, "publish-topic", "sms");
                    //定义这个队列的消费者
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue_name, false, consumer);
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        //阻塞函数,获取队列中的消息
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        //模拟长时间运行
                        Thread.Sleep(3000);

                        byte[] body = ea.Body;
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(body))
                        {

                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);

                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);

                        }
                        //发送应答包,消息持久化时候使用
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 接收方邮件:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace ReceiveEmailService
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare("publish-topic", "topic", true);
                    bool durable = true;//队列持久化
                    string queue_name = channel.QueueDeclare("all.email.message", durable, false, false, null);//hello是queue的名字
                    //绑定到名字叫publish的exchange上
                    channel.QueueBind(queue_name, "publish-topic", "email");
                    //定义这个队列的消费者
                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queue_name, false, consumer);
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while (true)
                    {
                        //阻塞函数,获取队列中的消息
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        //模拟长时间运行
                        Thread.Sleep(3000);

                        byte[] body = ea.Body;
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(body))
                        {

                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);

                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type);

                        }
                        //发送应答包,消息持久化时候使用
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
        public string Type { set; get; }
    }
}

 

同样先定义一个fanout的exchange。对于为什么要在发布者和订阅者都要定义同名的exchange,我的理解是如果没有定义的一方先启动的话则会报错说找不到那个exchange。我测试过如果有定义的先启动,没定义的后启动也是没有问题的。

因为每个订阅者都需要一个队列来存放发给自己的消息,所以需要创建一个队列。通过QueueBind来和exchange关联了。所有发送给名字为publish的exchange的消息,都会被它分发给所有与之绑定的队列中,这样,每个对应的消费者都会收到一个副本。

image

 

在浏览器的管理界面上我们可以看见,RabbitMQ为每一个消费者(订阅者)创建了一个队列,而没有为发送者创建队列。

分享到:
评论

相关推荐

    C# .net 6 rabbitMq发布订阅类封装

    RabbitListener是mq消息的监听,BasicPublish丢消息的方法

    rabbitmq-server3.7版本和otp_win64安装包

    - 生产者:使用客户端库(如 Java, Python, .NET 等)发送消息到 RabbitMQ 队列。 - 消费者:同样通过客户端库订阅队列,接收并处理消息。 - 队列与交换器:队列存储消息,交换器根据路由规则将消息分发到对应的队列...

    rabbitmq-server-windows-3.7.10.zip

    RabbitMQ支持多种工作模式,如简单模式、发布/订阅模式、路由模式、主题模式等,以满足不同场景的需求。同时,它还支持多种客户端库,如Java、Python、.NET等,方便开发者集成到他们的应用程序中。 此外,RabbitMQ...

    rabbitmq-dotnet-client-3.6.14-dotnet-4.5

    1. LICENSE:这通常包含该软件的许可协议,允许用户在特定条件下使用、修改和分发该软件。在 RabbitMQ 的上下文中,这可能指的是该客户端库的开源许可信息。 2. LICENSE-APACHE2:Apache License Version 2.0 是一个...

    .Net使用RabbitMQ即时发消息Demo

    以上就是.NET环境下使用RabbitMQ发送即时消息的详细解释和关键知识点。开发者可以通过这个基础,进一步学习RabbitMQ的高级特性,如工作队列、发布/订阅模式、死信队列等,以适应更复杂的应用场景。

    rabbitmq的.net驱动

    在RabbitMQ的.NET驱动中,bin目录下可能包含已编译的客户端库,供开发者在项目中引用和使用。 通过RabbitMQ的.NET驱动,开发者可以实现以下关键知识点: - **连接管理**:建立和维护到RabbitMQ服务器的安全连接。 -...

    RabbitMQ安装和C#使用案列(.net core)

    包括rabbitmq-server-3.6.12,otp_win64_20.1(64位系统)otp_win32_20.1(32位系统),其中opt(文件过大限制上传,自己在网上下载)要在server之前安装,安装完了后需配置server的用户名和密码,并附上C#的代码...

    rabbitmq-server-3.8.3.zip

    4. **生产消息**:使用客户端库发布消息到指定的交换机。 5. **消费消息**:消费者订阅队列,接收到消息后进行处理。 6. **监控与日志**:通过Web管理界面查看节点状态、队列长度、消息速率等信息,以及日志文件...

    rabbitmq-server-3.8.8+erlang-21.3-1.el7.x86-64-linux版.zip

    安装此包后,用户可以配置和管理RabbitMQ服务器,创建和管理交换机、队列以及绑定,从而实现消息的发布与订阅。 在Linux上安装这两个组件通常涉及以下步骤: 1. 更新系统包:`sudo yum update` 2. 安装依赖:由于...

    rabbitmq发布订阅

    在实际开发中,`GcSystem.Bus.sln`下的项目会包含具体实现RabbitMQ发布订阅模式的代码,如创建通道、定义消息、配置交换机和队列、创建消费者等操作。通过这些代码,你可以看到如何在.NET环境中使用RabbitMQ客户端库...

    rabbitmq-server-2.8.5.tar.gz

    2. **发布与消费消息**:生产者使用`publish`方法发布消息到交换器,消费者通过`consume`方法订阅队列并接收消息。 3. **交换器类型**:包括Direct、Fanout、Topic和Headers四种,根据业务需求选择合适的类型进行...

    rabbitmq.net各种实例

    这些代码通常会展示如何创建连接、通道,声明交换器和队列,绑定,发布和消费消息等基本操作。 通过学习这些实例,开发者可以深入理解RabbitMQ在.NET环境中的用法,掌握如何在自己的项目中使用RabbitMQ进行高效的...

    NET Core 使用RabbitMQ源码.rar

    .NET Core使用RabbitMQ是一个广泛应用于微服务架构中的消息队列技术,用于实现应用程序之间的异步通信和解耦。RabbitMQ是一个开源的消息代理,它遵循Advanced Message Queuing Protocol(AMQP)标准,允许不同语言的...

    C# RabbitMQ Helper 帮助类

    C# RabbitMQ Helper是专为简化C#中RabbitMQ的使用而设计的一个类库,它可以作为DLL被其他项目引用,极大地提高了开发效率和代码的可维护性。 **一、RabbitMQ基础** RabbitMQ是一个分布式的消息代理和队列服务器,...

    rabbitmq.net 各种实例

    总结,rabbitmq.net为.NET开发者提供了丰富的功能,涵盖了从基础的消息发布/订阅到复杂的RPC模式、工作队列和事务处理。通过实例学习和实践,我们可以灵活地利用RabbitMQ解决各种消息通信问题,提高系统的可靠性和可...

    RabbitMQ工具类封装实现

    本篇文章将重点讲解如何进行RabbitMQ的工具类封装,以及在Android环境下的使用。 首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括...

    RabbitMQ实战-RabbitMQInAction.zip

    RabbitMQ是一种广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,适用于多种编程语言,包括Java、Python、Ruby、.NET等。"RabbitMQ实战-RabbitMQInAction"的资料很可能是...

    rabbitmq-codegen.tar.gz

    通过这个工具,开发人员可以快速集成RabbitMQ到他们的项目中,实现发布/订阅、工作队列、路由、主题等多种消息模式。 在`rabbitmq-codegen-c7c5876a05bb`这个特定版本中,`c7c5876a05bb`很可能是Git仓库中的一个...

    RabbitMQ入门-实战-RabbitMQ.zip

    **RabbitMQ 入门与实战** RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息...在 RabbitMQ-master 文件中,可能包含了示例代码和教程,可以帮助你进一步学习和掌握 RabbitMQ 的使用。

    RabbitMQ-Pub-Sub-Sample:RabbitMQ入门

    RabbitMQ是一个开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,广泛...通过实际操作,你将能够掌握如何在C#应用中使用RabbitMQ实现发布/订阅模式,从而提升你的分布式系统设计和实现能力。

Global site tag (gtag.js) - Google Analytics