`

RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析

 
阅读更多
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message,
Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息,
最近翻阅了基于Java的客户端的相关源码,简单做个分析。
编程模型伪代码如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel
以下是基于channel上的两种消费方式。

1、Subscribe订阅方式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             String routingKey = envelope.getRoutingKey();
             String contentType = properties.contentType;
             long deliveryTag = envelope.getDeliveryTag();
             // (process the message components here ...)
             channel.basicAck(deliveryTag, false);
         }
     });

订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,
这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
参见ChannelN中的方法
    public String basicConsume(String queue, boolean autoAck, String consumerTag,
                               boolean noLocal, boolean exclusive, Map<String, Object> arguments,
                               final Consumer callback)
        throws IOException
    {
    ......
        rpc((Method)
            new Basic.Consume.Builder()
             .queue(queue)
             .consumerTag(consumerTag)
             .noLocal(noLocal)
             .noAck(autoAck)
             .exclusive(exclusive)
             .arguments(arguments)
            .build(),
            k);

        try {
            return k.getReply();
        } catch(ShutdownSignalException ex) {
            throw wrap(ex);
        }
    }

Consumer接收消息的过程:
创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,
    public void handleFrame(Frame frame) throws IOException {
        AMQCommand command = _command;
        if (command.handleFrame(frame)) { // 对消息进行协议assemble
            _command = new AMQCommand(); // prepare for the next one
            handleCompleteInboundCommand(command);//对消息消费处理
        }
    }
ChannelN.handleCompleteInboundCommand
       ---ChannelN.processAsync
           ----dispatcher.handleDelivery
                 ---QueueingConsumer.handleDelivery
                     ---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中
每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。
接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()

对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效


2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息

参考:http://blog.csdn.net/yangbutao/article/details/10395599
分享到:
评论

相关推荐

    rabbitMq 5种消息模型测试代码

    rabbitMq 5种消息模型测试代码

    rabbitMQ consumer provider txSelect confirmSelect addConfirmListener

    事务提供了一种确保消息可靠投递的方式,但在性能上可能不如非事务模式。 4. **confirmSelect(确认选择)**: `confirmSelect`是RabbitMQ的另一种确保消息投递的机制,它使用发布确认。一旦启用,生产者会接收到一...

    rabbitmq 实现消息插队

    在RabbitMQ中,生产者(Producer)负责发布消息,消费者(Consumer)则负责接收并处理这些消息。消息队列作为中介,存储未被消费的消息,确保即使在高并发或系统故障情况下,消息也不会丢失。 在"多人投资"场景下,...

    rabbitmq发送&接收消息

    在IT行业中,消息队列(Message Queue)是一种常用的技术,用于处理异步任务、解耦系统组件以及缓解高并发时的系统压力。RabbitMQ是一款开源的消息代理软件,它基于AMQP(Advanced Message Queuing Protocol),广泛...

    网页端直接消费rabbitmq的消息

    功能说明:网页端直接消费rabbitmq的消息 环境准备:先安装好rabbitmq。 要配置插件:与前端对接,需要在RabbitMQ上启动rabbitmq_web_stomp插件 ,命令:./rabbitmq-plugins enable rabbitmq_web_stomp 1、...

    RabbitMQ延迟消息插件.zip

    这个插件允许我们在RabbitMQ中创建一种特殊的交换机类型,即`x-delayed-message`,它可以将消息按照指定的延迟时间放入队列,而不是立即发送。 在RabbitMQ中,消息通常通过生产者发布到交换机,然后交换机根据绑定...

    基于rabbitMQ实现的消息队列(MQ)

    消息队列(MQ)是一种中间件技术,用于在分布式系统中解耦生产者和消费者,通过缓存消息来提高系统的可扩展性和可靠性。基于RabbitMQ实现的消息队列组件是这个话题的核心,RabbitMQ是一个开源的消息代理和队列服务器...

    RabbitMQ相关的面试题和问题解析

    rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:...

    RabbitMQ实现Android消息推送.zip

    基于SpringBoot、RabbitMQ的Android消息推送平台...有的公司对所要推送的消息保密要求比较高,不希望被第三方看到,可以使用此种方式进行消息推送。如果所要推送的人群比较多,可以搭建RabbitMQ集群解决高并发问题。

    基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip

    基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos...

    JAVA获取rabbitmq消息总数过程详解

    在Java中,获取RabbitMQ消息总数的过程涉及与RabbitMQ服务器建立连接、创建通道、声明队列以及查询队列状态。以下是一个详细的步骤解析: 1. **建立连接**: 首先,需要通过`ConnectionFactory`类来创建一个连接工厂...

    RabbitMQ实战 高效部署分布式消息队列 PDF下载

    《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...

    SpringBoot整合Rabbitmq发送接收消息实战

    SpringBoot整合Rabbitmq发送接收消息实战 另外,博主发起了SpringBoot整合Rabbitmq这一系列的gitchat交流会。刚兴趣的童鞋可以进入交流:https://gitbook.cn/gitchat/activity/5b90f9214fb1bd5c9acd4338 交流QQ:...

    RabbitMQ实战 高效部署分布式消息队列 带目录 高清版 PDF

    《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...

    基于RabbitMQ的消息路由分发实例

    6. **消费者(Consumer)**:消费者是从RabbitMQ队列中获取并处理消息的组件。它们通过订阅队列来接收消息。 在这个实例中,你可能需要设置不同的交换机类型、路由键和绑定,以实现灵活的消息分发。例如,你可以...

    RabbitMQ实战高效部署分布式消息队列.pdf+rabbitmq学习手册.pdf

    - **Consumer**: 消费者是接收并处理从RabbitMQ中获取消息的应用。 2. **RabbitMQ功能特性** - **高可用性**:通过集群和镜像队列实现跨节点的数据复制,确保服务的连续性。 - **多种协议支持**:除了AMQP外,...

    rabbitMQ 消息队列 Demo

    这个例子帮助我们理解RabbitMQ的基本工作流程:生产者创建消息,将消息发送到交换机,交换机根据绑定规则将消息放入队列,最后消费者从队列中接收并处理消息。 2. **02WorkQueues(任务队列)** 在这个示例中,多...

    SpringBoot+WebSocket+RabbitMQ实时消息推送

    rabbitmq+websocket(SpringBoot版)实现分布式消息推送 本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同 所以采用 rabbitMQ+webSocket实现分布式消息推送 生产者将消息 发送给 ...

    RabbitMQ消息中间件技术精讲

    ### RabbitMQ消息中间件技术精讲 #### 一、RabbitMQ简介 RabbitMQ是一款在IT领域广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议开发而成,能够实现高效、可靠的数据传输服务。...

    RabbitMQ详细讲解

    * Consumer:消费者,负责从 RabbitMQ 服务器接收消息。 * Queue:队列,存储生产者发送的消息。 * Exchange:交换机,负责将生产者的消息路由到相应的队列。 * Binding:绑定,定义了队列与交换机之间的关系。 ...

Global site tag (gtag.js) - Google Analytics