`

RabbitMQ学习之Headers交换类型

 
阅读更多
Headers类型的exchange使用的比较少,它也是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。匹配有两种方式all和any。这两种方式是在接收端必须要用键值"x-mactch"来定义。all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。

1.生产者Producer.Java
package cn.slimsmart.rabbitmq.demo.headers;  
02.  
03.import java.util.Date;  
04.import java.util.Hashtable;  
05.import java.util.Map;  
06.  
07.import org.springframework.amqp.core.ExchangeTypes;  
08.  
09.import com.rabbitmq.client.AMQP;  
10.import com.rabbitmq.client.AMQP.BasicProperties;  
11.import com.rabbitmq.client.AMQP.BasicProperties.Builder;  
12.import com.rabbitmq.client.Channel;  
13.import com.rabbitmq.client.Connection;  
14.import com.rabbitmq.client.ConnectionFactory;  
15.  
16.public class Producer {  
17.    private final static String EXCHANGE_NAME = "header-exchange";  
18.      
19.    @SuppressWarnings("deprecation")  
20.    public static void main(String[] args) throws Exception {  
21.        // 创建连接和频道  
22.        ConnectionFactory factory = new ConnectionFactory();  
23.        factory.setHost("192.168.36.102");  
24.        // 指定用户 密码  
25.        factory.setUsername("admin");  
26.        factory.setPassword("admin");  
27.        // 指定端口  
28.        factory.setPort(AMQP.PROTOCOL.PORT);  
29.        Connection connection = factory.newConnection();  
30.        Channel channel = connection.createChannel();  
31.          
32.        //声明转发器和类型headers  
33.        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
34.        String message = new Date().toLocaleString() + " : log something";  
35.          
36.        Map<String,Object> headers =  new Hashtable<String, Object>();  
37.        headers.put("aaa", "01234");  
38.        Builder properties = new BasicProperties.Builder();  
39.        properties.headers(headers);  
40.          
41.        // 指定消息发送到的转发器,绑定键值对headers键值对  
42.        channel.basicPublish(EXCHANGE_NAME, "",properties.build(),message.getBytes());  
43.          
44.        System.out.println("Sent message :'" + message + "'");  
45.        channel.close();  
46.        connection.close();  
47.    }  
48.}  


2.消费者Consumer.java
package cn.slimsmart.rabbitmq.demo.headers;  
02.  
03.import java.util.Hashtable;  
04.import java.util.Map;  
05.  
06.import org.springframework.amqp.core.ExchangeTypes;  
07.  
08.import com.rabbitmq.client.AMQP;  
09.import com.rabbitmq.client.Channel;  
10.import com.rabbitmq.client.Connection;  
11.import com.rabbitmq.client.ConnectionFactory;  
12.import com.rabbitmq.client.QueueingConsumer;  
13.  
14.public class Consumer {  
15.    private final static String EXCHANGE_NAME = "header-exchange";  
16.    private final static String QUEUE_NAME = "header-queue";  
17.      
18.    public static void main(String[] args) throws Exception {  
19.        // 创建连接和频道  
20.        ConnectionFactory factory = new ConnectionFactory();  
21.        factory.setHost("192.168.36.102");  
22.        // 指定用户 密码  
23.        factory.setUsername("admin");  
24.        factory.setPassword("admin");  
25.        // 指定端口  
26.        factory.setPort(AMQP.PROTOCOL.PORT);  
27.        Connection connection = factory.newConnection();  
28.        Channel channel = connection.createChannel();  
29.          
30.        //声明转发器和类型headers  
31.        channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.HEADERS,false,true,null);  
32.        channel.queueDeclare(QUEUE_NAME,false, false, true,null);  
33.          
34.        Map<String, Object> headers = new Hashtable<String, Object>();  
35.        headers.put("x-match", "any");//all any  
36.        headers.put("aaa", "01234");  
37.        headers.put("bbb", "56789");  
38.        // 为转发器指定队列,设置binding 绑定header键值对  
39.        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"", headers);  
40.        QueueingConsumer consumer = new QueueingConsumer(channel);  
41.        // 指定接收者,第二个参数为自动应答,无需手动应答  
42.        channel.basicConsume(QUEUE_NAME, true, consumer);  
43.        while (true) {  
44.            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
45.            String message = new String(delivery.getBody());  
46.            System.out.println(message);  
47.        }   
48.    }  
49.}  


转自:http://blog.csdn.net/zhu_tianwei/article/details/40923131
分享到:
评论

相关推荐

    RabbitMQ学习简单demo项目源码

    【RabbitMQ学习简单demo项目源码】是一个用于学习RabbitMQ基础概念和使用方法的实战项目。这个项目包含了RabbitMQ的各种模式示例,对于希望通过实践来掌握RabbitMQ的人来说是一个很好的起点。 RabbitMQ是基于AMQP...

    RabbitMQ资料RabbitMQ资料

    学习RabbitMQ,你需要了解AMQP协议的基础,熟悉RabbitMQ的安装和配置,掌握基本的生产者和消费者模型,理解各种交换机类型(如Direct、Fanout、Topic、Headers)和绑定机制,以及如何使用管理界面进行监控和管理。...

    RabbitMQ安装使用(直接交换direct exchange)

    **RabbitMQ安装使用——直接交换(direct exchange)** RabbitMQ是一款开源的...在实践中,还可以探索其他交换类型(如fanout, topic, headers)以及更高级的特性,如死信队列、持久化等,以适应不同应用场景的需求。

    RabbitMQDemo 自己学习的一些笔记

    2. **交换器(Exchange)**:根据特定的路由键和类型(如direct、fanout、topic、headers)来决定消息应放入哪个队列。在"Send"代码中,生产者会指定交换器类型和路由键,确保消息正确分发。 3. **队列(Queue)**...

    rabbitmq3.8.3.zip

    有多种类型的交换器,如direct、topic、fanout和headers。 4. **绑定**:绑定是将交换器和队列关联起来的规则,定义了消息如何从交换器流向队列。 5. **生产者**:生产者是发布消息到RabbitMQ的客户端应用。 6. *...

    rabbitmq-windows64位两件套

    RabbitMQ提供了多种类型的交换器(如Direct、Fanout、Topic、Headers),以及丰富的队列绑定策略,以满足不同场景的需求。 在Java环境中,可以通过使用RabbitMQ的Java客户端库(rabbitmq-java-client)来与RabbitMQ...

    RabbitMQ 学习整理

    - **Custom(自定义)**:用户可以根据需求定义自己的交换器类型。 3. **消息确认机制** - **publisher confirms**:生产者确认,确保消息被 RabbitMQ 正确接收。 - **consumer acknowledges**:消费者确认,...

    rabbitmq客户端

    2. 创建交换机与队列:客户端可以创建交换机和队列,定义其类型(如direct、topic、headers或fanout)以及相关属性。例如,direct类型的交换机遵循简单的路由规则,将消息直接发送到匹配路由键的队列。 3. 发布消息...

    20.消息中间件之RabbitMQ入门讲解

    消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心概念,如何通过控制台进行管理,以及如何在Spring Cloud框架下创建消息生产者和消费者。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ的...

    rabbitmq-server-3.8.9.exe

    - **头部交换**:基于消息的headers属性进行路由。 - **扇出交换**:每个消息都会被发送到所有绑定的队列,无需路由键。 4. **RabbitMQ的管理**: - **命令行工具**:如`rabbitmqctl`用于管理节点、用户、权限、...

    rabbitmq_delayed_message_exchange-20171215-3.6.x.ez.zip

    延迟消息交换插件在RabbitMQ中的工作原理是创建一个特殊的交换机类型,即“x-delayed-message”。当你创建一个绑定并指定一个延迟时间时,消息会被放置在一个内部队列中,等待延迟时间过后才进入正常的消费流程。...

    rabbitmq-server-3.8.2.zip

    常见的交换器类型有direct、fanout、topic和headers。 - **路由键(Route Key)**:生产者发布消息时指定的字符串,用于匹配交换器的绑定规则。 - **绑定(Binding)**:绑定定义了交换器和队列之间的关系,即消息如何...

    RabbitMq测试

    3. **声明交换机**:生产者需要声明交换机类型,例如direct、fanout、topic或headers,以及相关的绑定键。这决定了消息的路由方式。 4. **发布消息**:生产者通过通道向交换机发送消息,每个消息都携带一个路由键,...

    rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

    延迟消息交换基于RabbitMQ的自定义交换机类型,它扩展了标准的AMQP协议。在RabbitMQ中,交换机负责将发布者发布的消息路由到合适的队列。默认情况下,消息是立即发送的,但有了延迟交换,我们可以指定消息在多少毫秒...

    RabbitMQ生产者C#源代码

    生产者通常需要先声明交换机的类型,如`Direct`、`Fanout`、`Topic`或`Headers`,并设置其相关属性。 4. **声明队列**:队列是消息的实际存储位置。生产者可能需要声明一个或多个队列,指定其名称、是否持久化等...

    用Jmeter测试RabbitMQ

    - **AutoDelete**: 当没有任何队列或其他交换机绑定到此交换机时,该交换机会被自动删除。 - **Prefetch Count**: 控制消费者同时获取的消息数量,有助于平滑消息处理过程中的负载,提高效率。 #### 四、通过...

    rabbitMQ实战

    ### RabbitMQ实战知识点详解 #### 一、RabbitMQ简介及背景 - **RabbitMQ开发语言**: Erlang,一种面向并发的编程...通过本文的学习,您可以掌握RabbitMQ的基本使用方法和核心概念,为后续深入学习打下坚实的基础。

    RabbitMq实例

    常见的交换器类型有direct、fanout、topic和headers。 4. **声明队列**:创建一个或多个队列,队列是存储消息的地方,每个队列都有唯一的名称。 5. **绑定**:将交换器和队列关联起来,指定消息应该如何从交换器路由...

    rabbitmq练习项目Java源代码.zip

    在实际应用中,我们可能需要处理更复杂的场景,如交换机类型(direct、fanout、topic、headers)、路由键、消息确认机制、批量消费等。此外,还可以利用RabbitMQ的高级特性,如工作队列、延迟消息、死信队列等,来...

    rabbitmq(java)入门教程

    RabbitMQ 提供多种消息模型,如 Direct、Fanout、Topic 和 Headers 等,可以根据实际需求选择合适的交换器类型来路由消息。在上述简单的示例中,使用了默认的 Direct 交换器,它将消息直接投递到指定的队列。 6. *...

Global site tag (gtag.js) - Google Analytics