`

rabbitMQ ConfirmListener

 
阅读更多
消息消费者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 声明交换机Exchange:交换机类型分为四类:

        Fanout Exchange: 将消息分发到所有的绑定队列,无routingkey的概念

        Headers Exchange :通过添加属性key-value匹配

        Direct Exchange:按照routingkey分发到指定队列

        Topic Exchange:多关键字匹配

5. 声明队列Queue

6. 将队列和交换机绑定

7. 创建消费者

8. 执行消息的消费

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeUnit;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.ConsumerCancelledException;  
import com.rabbitmq.client.QueueingConsumer;  
import com.rabbitmq.client.QueueingConsumer.Delivery;  
import com.rabbitmq.client.ShutdownSignalException;  
  
/**  
 * 客户端01  
 *   
 * @author liaokailin  
 * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $  
 */  
public class Receive01 {  
    public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException,  
                                          ConsumerCancelledException, InterruptedException {  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
        int prefetchCount = 1;  
        channel.basicQos(prefetchCount); //保证公平分发  
  
        boolean durable = true;  
        //声明交换机  
        channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey过滤  
        //声明队列  
        String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue();  
        //将队列和交换机绑定  
        String routingKey = "lkl-0";  
        //队列可以多次绑定,绑定不同的交换机或者路由key  
        channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey);  
  
        //创建消费者  
        QueueingConsumer consumer = new QueueingConsumer(channel);  
          
        //将消费者和队列关联  
        channel.basicConsume(queueName, false, consumer); // 设置为false表面手动确认消息消费  
  
        //获取消息  
  
        System.out.println(" Wait message ....");  
        while (true) {  
            Delivery delivery = consumer.nextDelivery();  
            String msg = new String(delivery.getBody());  
            String key = delivery.getEnvelope().getRoutingKey();  
  
            System.out.println("  Received '" + key + "':'" + msg + "'");  
            System.out.println(" Handle message");  
            TimeUnit.SECONDS.sleep(3); //mock handle message  
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //确定该消息已成功消费  
        }  
  
    }  
}  



消息生产者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 发送消息

package org.lkl.mq.rabbitmq.test;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.ConfirmListener;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;  
  
/**  
 * 消息publish  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $  
 */  
public class Send {  
    public final static String EXCHANGE_NAME = "test-exchange";  
  
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  
        /**  
         * 配置amqp broker 连接信息  
         */  
        ConnectionFactory facotry = new ConnectionFactory();  
        facotry.setUsername("test");  
        facotry.setPassword("test");  
        facotry.setVirtualHost("test");  
        facotry.setHost("localhost");  
  
        Connection conn = facotry.newConnection(); //获取一个链接  
        //通过Channel进行通信  
        Channel channel = conn.createChannel();  
  
        // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消费者已创建,这里可不声明  
        channel.confirmSelect(); //Enables publisher acknowledgements on this channel  
        channel.addConfirmListener(new ConfirmListener() {  
  
            @Override  
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleNack] :" + deliveryTag + "," + multiple);  
  
            }  
  
            @Override  
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {  
                System.out.println("[handleAck] :" + deliveryTag + "," + multiple);  
            }  
        });  
  
        String message = "lkl-";  
        //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN  
        //发送多条信息,每条消息对应routekey都不一致  
        for (int i = 0; i < 10; i++) {  
            channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN,  
                (message + i).getBytes());  
            System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2)));  
        }  
  
    }  
}  


在设置消息被消费的回调前需显示调用
引用
channel.confirmSelect() 

否则回调函数无法调用

先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。

rabbitmq 为每一个channel维护了一个delivery tag的计数器,这里采用正向自增,新消息投递时自增,当消息响应时自减

参考:http://blog.csdn.net/liaokailin/article/details/49558605

     http://blog.csdn.net/stonexmx/article/details/51885745
分享到:
评论

相关推荐

    rabbitmq-c-master.rar_RabbitMQ c lib_cmake编译_rabbitmq_rabbitmq-c

    `rabbitmq-c`是RabbitMQ的一个C语言客户端库,它使得在C程序中与RabbitMQ服务器进行交互变得更加简单。本文将详细介绍如何使用CMake编译`rabbitmq-c-master`源码,并讨论相关知识点。 首先,我们需要了解CMake,这...

    rabbitmq配置文件 rabbitmq.config

    rabbitmq配置文件,用于rabbitmq管理

    麒麟v10系统Rabbitmq3.6.10安装包

    在这个"麒麟v10系统Rabbitmq3.6.10安装包"中,我们将探讨如何在麒麟v10环境下安装和配置RabbitMQ 3.6.10版本。 首先,安装RabbitMQ前需要确保系统满足必要的依赖条件。麒麟v10内核版本为4.19.90-17.ky10.x86_64,这...

    rabbitmq 3.10.2 window安装包

    RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息传递。它基于AMQP(Advanced Message Queuing Protocol)标准,允许应用程序之间异步通信,并提供了高可用性、可扩展性和容错性。RabbitMQ的...

    RabbitMQ实战指南-rabbitmq-action.zip

    **RabbitMQ实战指南** RabbitMQ是一款广泛应用的开源消息队列系统,它基于Advanced Message Queuing Protocol(AMQP)标准,提供高可用性、可靠性和可扩展性。本实战指南将带你深入理解RabbitMQ的核心概念、安装与...

    rabbitmq-server3.10.5

    RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...

    RabbitMQ使用指南.pdf

    RabbitMQ是一款开源的消息队列服务软件,它实现了高级消息队列协议(AMQP),以高性能、健壮和可伸缩性闻名,主要由Erlang语言编写。Erlang是一种适合于构建并发处理能力强、高可用性系统的编程语言,这些特点使得...

    kettle rabbitmq 插件开发

    标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...

    RabbitMQ性能测试报告

    【RabbitMQ性能测试报告】 本测试报告详细记录了对RabbitMQ的性能评估,包括在单机模式和集群模式下的压力和稳定性测试。RabbitMQ是业界广泛使用的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)...

    RabbitMQ-c源码

    **RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...

    prometheus rabbitmq_exporter

    为了能够有效地监控 RabbitMQ 的性能和状态,Prometheus 提供了一个名为 `rabbitmq_exporter` 的工具。然而,在某些情况下,官方网站可能不直接提供这个插件,这时我们需要从第三方源获取,例如在本例中提到的 `...

    tp6使用rabbitmq

    【标题】:“TP6使用RabbitMQ” 在PHP框架ThinkPHP6(简称TP6)中集成RabbitMQ是一项常见的任务,用于实现异步处理、消息队列和分布式系统的通信。RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced...

    rabbitMQ_Windows版.zip

    1. **下载RabbitMQ**:首先,访问RabbitMQ官方网站下载适用于Windows的RabbitMQ Server安装包。这个压缩包"rabbitMQ_Windows版.zip"很可能包含了所有必要的文件。 2. **解压与安装**:解压缩文件,运行安装程序,...

    rabbitMQ3.7.8windows安装软件包

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地路由和传递消息。它由Erlang编程语言开发,因此在安装RabbitMQ之前,需要先安装Erlang环境。本...

    RabbitMQ与SpringMVC集成

    **RabbitMQ与SpringMVC集成** RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的消息传递。RabbitMQ是由Erlang OTP平台构建的,因此...

    Rabbitmq 默认配置文件模板

    **RabbitMQ 默认配置文件模板详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用程序,提高系统的可扩展性和容错性。在...

    RabbitMQ3.9.15安装包及安装文档

    3. **RabbitMQ服务器安装**:"rabbitmq-server-3.9.15-1.el7.noarch.rpm"是RabbitMQ 3.9.15版本的安装包,适用于RHEL/CentOS 7系统。安装过程中,通常会包括设置RabbitMQ服务、启动服务以及配置默认用户和虚拟主机等...

    flink-sql集成rabbitmq

    标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    RabbitMQ源码和客户端工具

    **RabbitMQ源码分析** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步处理、任务队列以及服务间通信。源码分析有助于深入理解其内部...

Global site tag (gtag.js) - Google Analytics