消息的订阅和发布是使用消息队列的常用场景。在上一篇文章中,虽然有多个消费者,但是一个消息只会有一个消费者来处理。而订阅和发布则是每个订阅该消息的消费者都会收到这个消息。RabbitMQ的路由机制让我们实现这个功能轻而易举。
要了解RabbitMQ的路由机制,exchange是一个关键。exchange可以叫做交换机,也似乎可以叫做路由器,反正它是用来选择路由的。前文说到,RabbitMQ的核心思想就是消息的发布者不是直接把消息发送到目标队列中的,事实上,通常它并不知道消息要发到哪个队列中,它只知道把消息队列发送到exchange中。exchange一边接收发送者发过来的消息,而另一边则把消息发送到目标队列中去。exchange一定知道哪些队列需要接收这个消息,是加到一个队列里还是加到好几个队列里,还是直接扔掉。下图中的X就是exchange。
RabbitMQ的exchange有一些类型,这些类型决定了exchange的行为。分别是 direct, topic, headers 和 fanout四种类型。在这一篇文章中介绍的最简单的发送和接收例子中使用的如下代码中
//指定发送的路由,通过默认的exchange直接发送到指定的队列中。
channel.BasicPublish("", "esbtest.rmq.consoleserver", null, bytes);
第一个参数我们输入了空字符串来代表一个exchange。空字符串的exchange在RabbitMQ中时默认的exchange,类型是direct。在这个例子中它会直接将消息发送到第二个参数route_key定义的同名的队列中。
而我们这篇介绍的订阅和发布是用了fanout这个类型。由于默认类型是direct的,所以需要使用fanout就需要额外定义。如下代码就是定义了一个名字叫publish的fanout类型的exchange。
channel.ExchangeDeclare("publish", "fanout");
这种exchange有分发的意思。那分发到哪些队列中呢?因为发布者不需要知道,所以这段代码页就在订阅者那边来实现。来看发送方得代码片段:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Xml.Serialization; namespace SendService { class Program { static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "192.168.12.111"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; //定义要发送的数据 List<RequestMessage> messages = new List<RequestMessage>(); for (int i = 0; i < 100; i++) { RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type="email" }; messages.Add(message); } for (int i = 0; i < 100; i++) { RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i, Type = "sms" }; messages.Add(message); } using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("publish-topic", "topic", true);//定义一个交换机,且采用广播类型,并持久化该交换机 string smsqueue = channel.QueueDeclare("all.sms.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列 //绑定到名字叫publish的exchange上 channel.QueueBind(smsqueue, "publish-topic", "sms"); string emailqueue = channel.QueueDeclare("all.email.message", true, false, false, null);//创建一个队列,第2个参数为true表示为持久队列 channel.QueueBind(emailqueue, "publish-topic", "email"); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2;//1表示不持久,2.表示持久化 foreach (var item in messages) { if (item.Type == "sms") { XmlSerializer xs = new XmlSerializer(typeof(RequestMessage)); using (MemoryStream ms = new MemoryStream()) { xs.Serialize(ms, item); byte[] bytes = ms.ToArray(); channel.BasicPublish("publish-topic", "sms", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略 Console.WriteLine(" [x] Sent {0}", item.Message); } } if (item.Type == "email") { XmlSerializer xs = new XmlSerializer(typeof(RequestMessage)); using (MemoryStream ms = new MemoryStream()) { xs.Serialize(ms, item); byte[] bytes = ms.ToArray(); channel.BasicPublish("publish-topic", "email", properties, bytes); //发送消息,这里指定了交换机名称,且routeKey会被忽略 Console.WriteLine(" [x] Sent {0}", item.Message); } } } } } } } public class RequestMessage { public Guid MessageId { set; get; } public string Message { set; get; } public string Type { set; get; } } }
接受方SMS:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Xml.Serialization; namespace ReceiveSMSService { class Program { static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "192.168.12.111"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //交换机持久化 channel.ExchangeDeclare("publish-topic", "topic", true); bool durable = true;//队列持久化 string queue_name = channel.QueueDeclare("all.sms.message", durable, false, false, null);//hello是queue的名字 //绑定到名字叫publish的exchange上 channel.QueueBind(queue_name, "publish-topic", "sms"); //定义这个队列的消费者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue_name, false, consumer); Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C"); while (true) { //阻塞函数,获取队列中的消息 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作 //模拟长时间运行 Thread.Sleep(3000); byte[] body = ea.Body; XmlSerializer xs = new XmlSerializer(typeof(RequestMessage)); using (MemoryStream ms = new MemoryStream(body)) { RequestMessage message = (RequestMessage)xs.Deserialize(ms); Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type); } //发送应答包,消息持久化时候使用 channel.BasicAck(ea.DeliveryTag, false); } } } } } public class RequestMessage { public Guid MessageId { set; get; } public string Message { set; get; } public string Type { set; get; } } }
接收方邮件:
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Xml.Serialization; namespace ReceiveEmailService { class Program { static void Main(string[] args) { var factory = new ConnectionFactory(); factory.HostName = "192.168.12.111"; factory.Port = 5672; factory.UserName = "admin"; factory.Password = "admin"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("publish-topic", "topic", true); bool durable = true;//队列持久化 string queue_name = channel.QueueDeclare("all.email.message", durable, false, false, null);//hello是queue的名字 //绑定到名字叫publish的exchange上 channel.QueueBind(queue_name, "publish-topic", "email"); //定义这个队列的消费者 QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue_name, false, consumer); Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C"); while (true) { //阻塞函数,获取队列中的消息 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作 //模拟长时间运行 Thread.Sleep(3000); byte[] body = ea.Body; XmlSerializer xs = new XmlSerializer(typeof(RequestMessage)); using (MemoryStream ms = new MemoryStream(body)) { RequestMessage message = (RequestMessage)xs.Deserialize(ms); Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message + " Type:" + message.Type); } //发送应答包,消息持久化时候使用 channel.BasicAck(ea.DeliveryTag, false); } } } } } public class RequestMessage { public Guid MessageId { set; get; } public string Message { set; get; } public string Type { set; get; } } }
同样先定义一个fanout的exchange。对于为什么要在发布者和订阅者都要定义同名的exchange,我的理解是如果没有定义的一方先启动的话则会报错说找不到那个exchange。我测试过如果有定义的先启动,没定义的后启动也是没有问题的。
因为每个订阅者都需要一个队列来存放发给自己的消息,所以需要创建一个队列。通过QueueBind来和exchange关联了。所有发送给名字为publish的exchange的消息,都会被它分发给所有与之绑定的队列中,这样,每个对应的消费者都会收到一个副本。
在浏览器的管理界面上我们可以看见,RabbitMQ为每一个消费者(订阅者)创建了一个队列,而没有为发送者创建队列。
相关推荐
tornado-6.4.1-cp38-abi3-musllinux_1_2_i686.whl
tornado-6.1-cp36-cp36m-manylinux2014_aarch64.whl
基于java的ssm停车位短租系统程序答辩PPT.pptx
tornado-6.4b1-cp38-abi3-musllinux_1_1_x86_64.whl
基于java的招生管理系统答辩PPT.pptx
本压缩包资源说明,你现在往下拉可以看到压缩包内容目录 我是批量上传的基于SpringBoot+Vue的项目,所以描述都一样;有源码有数据库脚本,系统都是测试过可运行的,看文件名即可区分项目~ |Java|SpringBoot|Vue|前后端分离| 开发语言:Java 框架:SpringBoot,Vue JDK版本:JDK1.8 数据库:MySQL 5.7+(推荐5.7,8.0也可以) 数据库工具:Navicat 开发软件: idea/eclipse(推荐idea) Maven包:Maven3.3.9+ 系统环境:Windows/Mac
基于java的农机电招平台答辩PPT.pptx
jdk23 甲骨文官方安装包
基于java的机场网上订票系统答辩PPT.pptx
项目经过测试均可完美运行! 环境说明: 开发语言:java jdk:jdk1.8 数据库:mysql 5.7+ 数据库工具:Navicat11+ 管理工具:maven 开发工具:idea/eclipse
基于java的网上书店销售管理系统答辩PPT.pptx
tornado-6.3.3-cp38-abi3-win32.whl
【作品名称】:基于 Jsp+Sqlserver 实现的超市信息管理系统 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 【项目介绍】: 系统功能: (1)系统分两种身份:管理员和员工,选择不同的身份进入不同的功能操作界面! (2)商品信息管理:管理员可以添加和维护商品信息,员工只能对商品信息进行查询 (3)员工信息管理:管理员登陆系统后可以可以添加和维护超市员工(收银员)的信息 (4)商品进货管理:管理员登陆系统后可以添加商品进货信息,可以对商品进货信息进行查询和统计,添加商品进进货退货信息,对商品进货退货信息进行查询和统计 (5)商品销售管理:员工(收银员)登陆系统后可以对商品进行销售,可以按时间查询自己的销售业绩;管理员登陆系统后可以按照时间等条件对销售信息进行查询,可以根据小票号登记顾客退货信息,查询顾客退货信息,可以查看员 【资源声明】:本资源作为“参考资料”而不是“定制需求”,代码只能作为参考,不能完全复制照搬。需要有一定的基础看懂代码,自行调试代码并解决报错,能自行添加功能修改代码。
tornado-6.3.2-cp38-abi3-musllinux_1_1_i686.whl
基于java的热带水果商城答辩PPT.pptx
java awt、Swing实现中国象棋可联机版本采用面向对象思想 采用面向对象的思路,实现中国象棋可联机版本,适合初学者,以及对面向对象有更深层次理解的开发者或者同学。 使用原生的java awt、Swing进行窗口式开发 将素材文件夹放在D:\Game路径下 两个工程直接导入Eclipse,即可运行, ps:一个工程运行两次也可以,需要注意端口号,代码默认如果连接的端口号是3003,则监听3004端口,相反同理。联机前需要确保两台计算机同时处于局域网或外网
web前端设计与开发(详细整理)(包含html讲解,css讲解,移动web讲解),合适学习前端的人员进行基础学习,一秒变高手
分析所需的数据和代码都在这里
Listening Exercise 3 Part 2.mp3
链表 删除链表中的重复元素,链表基础