`

.Net下RabbitMQ的使用(3) -- 竞争的消费者

 
阅读更多

上一篇文章中,演示了一个发送者和一个消费者的情况。这一篇介绍一下多个消费者在同一个消息队列中获取消息的情况。

 

在有些应用当中,消费端接收到消息任务需要长时间的处理,如果等上一个消息处理完成以后再取下一个数据进行处理的话,势必会有一些延迟。在消息队列中的数据也会不断增多,延迟将越来越大。当然对于一个消费进程来说,在某些情况下可以起多个线程来处理,而在这里将介绍另一种处理方式,多个消费进程的情况。而RabbitMQ在这方面进行了很好的处理和封装,使客户程序可以很方便的使用。

 

python-two

 

 

其实,在代码实现上和上一遍的例子中并没有什么不同,我们只需要运行两个Server端的程序就可以了。我们可以看到,发送的消息会平均的由两个Server中的一个来处理。不会有重复。倘若一个Server程序关闭了,那之后发送的所有消息都会在还运行着的那个Server程序上处理。这可以解决我们一些需要负载均衡的场景,而且扩展非常方便,只要在运行一个Server也就是Worker就可以了,Worker之间的状态同步都免了。

 

当我们处理一个较长时间任务的时候,程序在处理过程中如果出现异常,或程序挂了导致消息没有处理成功,我们通常并不希望丢失该消息任务,而希望由其他Worker来处理或者等挂了的Worker重新起来以后再处理。同样,应对该场景,RabbitMQ也提供了简便的API方便我们处理。在RabbitMQ中,为了不让消息丢失,它提供了消息应答的概念。当消费者获取到了一个消息以后,需要给RabbitMQ服务一个应答的消息,告知服务我已经收到或正确处理了该消息。那么RabbitMQ可以放心的在队列中删除该消息。在上一篇的服务端的代码中

channel.BasicConsume("TaskQueue", true, consumer);

的第二个参数是true,RabbitMQ服务一旦把消息送达目标队列及认为应答了。为了演示不发送应答消息的情况,我们需要安装一个RabbitMQ的plugin:Management UI。安装这个插件比较简单。

你可以再 http://www.rabbitmq.com/management.html 了解到详细信息。

打开浏览器, http://localhost:55672,输入默认的用户名密码 guest/guest,进入主界面。界面上信息很多,我们只看最上面的一块:Queued Messages。

还是启动一个发送程序和一个接受程序。把接收程序的定义消费者的代码修改为

channel.BasicConsume("TaskQueue", false, consumer);

然后运行两个程序,发送一个消息。由于我们在接收消息的代码中没有发送消息收到的应答包,所以在刚才监控的网页上回出现如下结果:

image

 

没有回应的消息是1个。接下来我们关闭接收程序,不修改任何代码,然后再次运行接收程序。我们发现接收程序再次收到了那个原来的消息。而浏览器上显示的状态还是没有变说明还是有一个消息没有应答,因为我们第二次运行的接收程序还是没有发应答包。关掉接收程序,修改一下代码,在处理完消息的时候我们添加如下代码:

channel.BasicAck(ea.DeliveryTag, false);

这时,我发现程序如我们所料,还是会收到一次那个消息包,但是监控网页的界面却变了:

image

 

然后,无论在运行多少次接收程序,都不会再收到该消息包了。这也就说明了,RabbitMQ真正认为该消息被正确处理了。

需要注意的是,RabbitMQ对于没有发送应答包的消息没有时间的限制,也就是说没有Timeout。RabbitMQ只会在处理消息的接收程序与RabbitMQ服务端断开连接后才会重新分配该消息。如果连接没有断,但处理程序几天没有给回应包,它也不会重新发送。所以,如果在处理程序出现异常的时候,我们可以写代码将与RabbitMQ的连接断开来实现消息的重新发送(也许会发到其他负载均衡的机器上处理)。

修改后的接收端代码如下:

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace SendService
{
    /// <summary>
    /// Round-robin dispatching 循环分发
    /// http://www.cnblogs.com/qiyebao/p/4205626.html
    /// 一个Send和多个Receive的例子,
    /// 还加上了ack的例子.
    /// 优雅分发
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            //定义要发送的数据

            List<RequestMessage> messages = new List<RequestMessage>();
            for (int i = 0; i < 200; i++)
            {
                RequestMessage message = new RequestMessage() { MessageId = Guid.NewGuid(), Message = "Send1this is a 请求。" + i };
                messages.Add(message);
            }

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    bool durable = true;//队列持久化
                    channel.QueueDeclare("all.sms.message", durable, false, false, null);//hello是queue的名字
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//non-persistent (1) or persistent (2) 数据持久化属性
                    
                     //channel.TxSelect();
                    //序列化消息对象,RabbitMQ并不支持复杂对象的序列化,所以对于自定义的类型需要自己序列化
                    foreach (var item in messages)
                    {
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream())
                        {

                            xs.Serialize(ms, item);

                            byte[] bytes = ms.ToArray();

                            //指定发送的路由,通过默认的exchange直接发送到指定的队列中。

                            channel.BasicPublish("", "all.sms.message", null, bytes);

                            Console.WriteLine(" [x] Sent {0}", item.Message);
                        }
                    }
                    //var body = Encoding.UTF8.GetBytes(message);
                    //channel.BasicPublish("", "all.sms.message", null, body);//hello是routing key的名字

                    Console.ReadLine();
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
    }
}

 

接收方代码:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml.Serialization;

namespace ReceiveService
{
    /// <summary>
    /// 循环分发
    /// http://www.cnblogs.com/qiyebao/p/4205626.html
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "192.168.12.111";
            factory.Port = 5672;
            factory.UserName = "admin";
            factory.Password = "admin";
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    bool durable = true;//队列持久化
                    channel.QueueDeclare("all.sms.message", durable, false, false, null);
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("all.sms.message", false, consumer);//需要接受方发送ack回执,删除消息
                    Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C");
                    while(true)
                    {
                        //阻塞函数,获取队列中的消息
                        var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//挂起的操作
                        //模拟长时间运行
                        Thread.Sleep(6000);

                        byte[] body = ea.Body;
                        XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
                        using (MemoryStream ms = new MemoryStream(body))
                        {

                            RequestMessage message = (RequestMessage)xs.Deserialize(ms);

                            Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);

                        }
                        //发送应答包,消息持久化时候使用
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                }
            }
        }
    }

    public class RequestMessage
    {
        public Guid MessageId { set; get; }
        public string Message { set; get; }
    }
}

 

需要注意的是,该功能只有在point-point模式下才有如此效果,也就是一发一接的模式下。如果是发布和订阅这种broadcasting的模式下,这种配置项的结果会有一些不同,我们下一篇再说。

分享到:
评论

相关推荐

    rabbitmq-tutorial.zip

    4. **点对点模式**:在这种模式下,消息会被路由到一个特定的队列,只有一个消费者会消费该消息,确保消息的唯一处理。如果多个消费者存在,它们之间不会竞争消息,而是一种轮询机制。 5. **工作队列**:工作队列...

    RabbitMQ一个第三方队列组件

    RabbitMQ支持多种语言的客户端库,包括.NET,这意味着开发者可以轻松地在C#、VB.NET或其他.NET框架下使用RabbitMQ。在描述中提到的“有详细代码注释,代码能直接运行”,暗示了提供的是.NET环境下的RabbitMQ示例代码...

    java中间件之rabbitmq

    RabbitMQ是一款开源的消息中间件,基于AMQP (Advanced Message Queuing Protocol) 协议构建,主要使用Erlang语言编写。RabbitMQ能够支持多种客户端编程语言,包括但不限于Java、Python、Ruby、.NET、C#、C、PHP等,...

    RabbitMQ.docx

    - **Work 模型**: 也称为轮询模型,多个消费者竞争接收消息,常用于负载均衡。 - **Fanout模型**: 广播模式,所有绑定到交换机的队列都会收到消息。 - **Direct模型**: 基于路由键的一对一匹配,消息只发送到指定...

    RabbitMQTrial.zip

    3. **消费者(Consumer)**:消费者连接到RabbitMQ服务器,并声明它们感兴趣的队列。当有新消息到达队列时,RabbitMQ会将消息推送给消费者进行处理。 4. **竞争消费(Competitive Consumption)**:在工作者模式下...

    RabbitMQ测试代码

    测试代码应模拟多消费者竞争同一消息的场景,确保消息只被一个消费者处理,且处理后从队列中删除。 5. **持久化**:测试消息的持久化功能,确保在服务器重启或崩溃后,未被消费的消息能够恢复。这包括交换机、队列...

    asp.net通过消息队列处理高并发请求(以抢小米手机为例)

    在ASP.NET中,可以利用诸如RabbitMQ、Azure Service Bus或MSMQ等消息队列服务来实现这一策略,它们都提供了与.NET Framework和.NET Core良好的集成,简化了开发和部署过程。在实际应用中,需要根据业务需求和系统...

    c#MQ开发和所需DLL文件

    6. **接收消息**:通过设置回调函数和消费者实例,监听队列中的新消息。 7. **关闭资源**:完成操作后,记得关闭通道和连接,以释放资源。 在实际开发中,我们还需要关注以下几点: - **错误处理**:处理网络中断、...

    多线程与高并发实战详细内容

    3. 线程通信:利用wait()、notify()和notifyAll()方法实现线程间的通信,解决生产者消费者问题。 五、Python多线程实战 1. GIL(全局解释器锁):Python的线程并不能真正实现并行,但可以用于IO密集型任务,提高CPU...

Global site tag (gtag.js) - Google Analytics