在《rabbitmq学习3:Publish/Subscribe 》中已经学习了发送一个消息,所有消费者端都能收到。那现在这节准备介绍通过路由规则来接受生产者端所发送的消费。Routing的工作示意图如下:
对于Routing的示意图与Publish/Subscribe中的示意图区别:
第一:Publish/Subscribe的Exchange的类型为“fanout”,而Routing的类型为“direct”
第二:Publish/Subscribe的路由为默认的,而Routing的路由是自定义的。
可能从上图的示意图如可以发现可以把Routing的模式也可以转化Publish/Subscribe的模式,如示意图
我们也可能把所有的数据发送到一个Queue中去,示意图如下:
下面我们就开始程序吧。
P端的程序如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogDirect {
- private static final String EXCHANGE_NAME = "direct_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");//rounting模式
- String routingKeyOne = "error";//定义一个路由名为“error”
- for (int i = 0; i <= 1; i++) {
- String messageOne = "this is a error logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyOne, null, messageOne
- .getBytes());
- System.out.println(" [x] Sent '" + routingKeyOne + "':'" + messageOne
- + "'");
- }
- System.out.println("################################");
- String routingKeyTwo = "info";
- for (int i = 0; i <= 2; i++) {
- String messageTwo = "this is a info logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyTwo, null, messageTwo
- .getBytes());
- System.out.println(" [x] Sent '" + routingKeyTwo + "':'" + messageTwo
- + "'");
- }
- System.out.println("################################");
- String routingKeyThree = "all";
- for (int i = 0; i <= 3; i++) {
- String messageThree = "this is a all logs:" + i;
- channel.basicPublish(EXCHANGE_NAME, routingKeyThree, null,
- messageThree.getBytes());
- System.out.println(" [x] Sent '" + routingKeyThree + "':'"
- + messageThree + "'");
- }
- channel.close();
- connection.close();
- }
- }
运行结果可能如下:
- [x] Sent 'error':'this is a error logs:0'
- [x] Sent 'error':'this is a error logs:1'
- ################################
- [x] Sent 'info':'this is a info logs:0'
- [x] Sent 'info':'this is a info logs:1'
- [x] Sent 'info':'this is a info logs:2'
- ################################
- [x] Sent 'all':'this is a all logs:0'
- [x] Sent 'all':'this is a all logs:1'
- [x] Sent 'all':'this is a all logs:2'
- [x] Sent 'all':'this is a all logs:3'
C端的代码如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsDirect {
- private static final String EXCHANGE_NAME = "direct_logs";//定义Exchange名称
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");//声明Exchange
- String queueName = "queue_logs1";//定义队列名为“queue_logs1”的Queue
- channel.queueDeclare(queueName, false, false, false, null);
- String routingKeyOne = "error";//"error"路由规则
- channel.queueBind(queueName, EXCHANGE_NAME, routingKeyOne);//把Queue、Exchange及路由绑定
- String routingKeyTwo = "info";
- channel.queueBind(queueName, EXCHANGE_NAME, routingKeyTwo);
- System.out.println(" [*] Waiting for messages.");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- String routingKey = delivery.getEnvelope().getRoutingKey();
- System.out.println(" [x] Received '" + routingKey + "':'" + message
- + "'");
- }
- }
- }
这里我做了二个消费端程序来模仿通过路由规则来分配信息给各个消费端。第二个消费者端的程序只是修改了一小部分代码;只接受路由为”error“和”all“规则的消费。
运行程序1的结果如下:
- [*] Waiting for messages.
- [x] Received 'error':'this is a error logs:0'
- [x] Received 'error':'this is a error logs:1'
- [x] Received 'info':'this is a info logs:0'
- [x] Received 'info':'this is a info logs:1'
- [x] Received 'info':'this is a info logs:2'
运行程序2的运行结果如下:
- [*] Waiting for messages.
- [x] Received 'error':'this is a error logs:0'
- [x] Received 'error':'this is a error logs:1'
- [x] Received 'all':'this is a all logs:0'
- [x] Received 'all':'this is a all logs:1'
- [x] Received 'all':'this is a all logs:2'
- [x] Received 'all':'this is a all logs:3'
相关推荐
**RabbitMQ实战:高效部署分布式消息队列** ...通过本教程的学习,读者将能够熟练掌握RabbitMQ的核心概念、部署策略以及在实际项目中的应用,从而实现高效的消息队列管理,提升系统性能和稳定性。
**RabbitMQ学习实践项目** 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件,广泛用于异步处理、解耦系统组件以及提高系统可扩展性。RabbitMQ是其中一个非常流行的消息队列实现,它基于开源的AMQP...
rabbitmq-java-routing rabbitmq路由示例 rabbitmq-spring-topic-exchange rabbitmq主题示例 rabbitmq-java-rpc rabbitmq PRC通信示例 rabbitmq-spring-helloworld spring boot 使用rabbitmq的第一个demo rabbitmq-...
3. **工作模式**:书中会涵盖RabbitMQ的各种工作模式,如Direct、Fanout、Topic、Header和Routing,这些模式对应不同的消息路由策略,满足不同场景的需求。 4. **AMQP协议**:RabbitMQ基于AMQP(Advanced Message ...
在本文中,我们将深入探讨如何在 Laravel 开发环境中使用 `rabbitmq-client`,这是一个基于 AMQP 协议的 RabbitMQ 客户端,为 Laravel 和 Lumen 提供了异步处理和 Direct RPC(远程过程调用)接口。RabbitMQ 是一个...
对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。 RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给...
4. **配置RabbitMQ**: 可以通过修改`rabbitmq.config`配置文件或者使用`rabbitmqctl`命令行工具进行基本配置,例如设置默认用户和权限。 ** socat工具** `socat`是一款轻量级的网络工具,能够建立两套网络之间数据...
关于rabbitmq的原生api和spring amqp的api的动态绑定exchange,routingKey,queue 说明文地址: http://blog.csdn.net/phantomes/article/details/47284829
routing-key (字符串) :交换的路由密钥 queue (字符串) :一个队列名称,您可以直接使用它而不是交换 virtual-host (字符串) :默认情况下/ basic-properties (布尔值) :用于映射基本AMQP属性 custom-...
amqp.routing_key例如my_routing_key amqp.exchange例如my_exchange amqp.queue例如my_queue 要发布的消息:输入字段消息 receive.java:hava类接收X条消息 receive.ktr:一个样本 发布要求: 要设置的配置变量...
SpringBoot整合RabbitMQ的学习是一个重要的主题,尤其对于那些希望在Java环境中构建高效、可扩展的消息传递系统的开发者来说。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing ...
4. **绑定Exchange和Queue**:通过Binding将Exchange和Queue关联起来,指定一个Routing Key。这样,所有带有该Routing Key的消息都会被发送到这个Queue。 ```python binding_keys = ['info', 'warning', 'error'] ...
在本篇学习笔记中,我们将首先了解RabbitMQ的安装过程。 1. **Erlang的安装**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编写的。安装Erlang可以通过以下命令完成: ```bash sudo apt-get install ...
8. **RabbitMQ管理**: 了解基本的RabbitMQ管理概念,如交换机(Exchanges)、队列(Queues)、绑定(Bindings)以及消息路由键(Routing Key)。这些是实现不同消息路由策略的基础。 9. **安全配置**: 为了生产环境...
"RabbitMQ练习(Routing).pcapng" 的文件,它包含了 RabbitMQ 服务器在进行路由操作时的网络通信数据。通过使用网络分析工具打开这个文件,你可以查看 RabbitMQ 客户端和服务器之间的交互,包括消息的发送和接收、...
在实际应用中,RabbitMQ能够支持多种消息模式和交换类型,包括工作队列(Work queues)、发布/订阅(Publish/Subscribe)、路由(Routing)、Topics类型交换机和远程过程调用(RPC)等,这些模式在RabbitMQ中通过...
echo '[x] ', $msg->delivery_info['routing_key'], ': ', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while ($channel->is_consuming()) { $...
4. **Direct Exchange**:最简单的模式,消息会路由到与Routing Key完全匹配的队列。例如,`bindingKey = "key"`的消息只会被路由到`queueName = "key"`的队列。 5. **Fanout Exchange**:广播模式,不论Routing ...
rabbitmqadmin declare binding source=delayed_exchange destination=my_queue routing_key=my_routing_key ``` 4. **消费消息**:消费者可以从`my_queue`队列中正常接收延迟后到达的消息。 **应用场景** 1. **...
**RabbitMQ开发规范详解** 在使用RabbitMQ进行分布式消息传输时,遵循一定的开发规范至关重要,这不仅可以提高系统的可维护性,也有助于保证数据的一致性和稳定性。本篇文章将详细阐述RabbitMQ的命名规范、消息传输...