`
骑猪逛街666
  • 浏览: 146287 次
  • 性别: Icon_minigender_2
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

RabbitMQ:四种ExChange用法

阅读更多

摘要: RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。ExChange和Queue之前是多对多的关系。RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。

RabbitMQ发送消息时,都是先把消息发送给ExChange(交换机),然后再分发给有相应RoutingKey(路由)关系的Queue(队列)。
ExChange和Queue之前是多对多的关系。
RabbitMQ 3.0之后创建ExChange时,有四种类型可选“fanout、direct、topic、headers”。
 
一、fanout
当向一个fanout发送一个消息时,RoutingKey的设置不起作用。
消息会被发送给同一个交换机下的所有队列,每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息
 
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
String message = "hello world! ";
 
for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "", null, (message+i).getBytes());
}
 
System.out.println("Sent msg finish");
 
channel.close();
connection.close();

----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
 
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routkey2", null);
 
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,  AMQP.BasicProperties properties, byte[] body) {
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
 
System.out.println("Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
 
二、direct
当向一个direct发送一个消息时,消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;
一个队列内拥有相应RoutingKey的消费者,将平分队列接收到的消息。
 
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
 
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
String message = "hello world! ";
 
for(int i=0;i<100;i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message+i).getBytes());
}
 
System.out.println("Sent msg is '" + message + "'");
 
channel.close();
connection.close();

----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
 
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
//声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
 
System.out.println(" Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
 
System.out.println("1 Received msg='" + message + "'");
}
};
 
channel.basicConsume(QUEUE_NAME, true, consumer);
 
三、topic
当向一个topic发送一个消息时消息会被发送给同一个交换机下的拥有相应RoutingKey的队列,每个队列接收到的消息是一样的;
一个队列内有所有消费者(包含那些并没有相应RoutingKey的消费者),将平分队列接收到的消息
 
----------------消息生产者----------------
ConnectionFactory factory = new ConnectionFactory();
 
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
String message = "hello world! ";
 
// int i=101;
for (int i = 0; i < 100; i++)
{
channel.basicPublish(EXCHANGE_NAME, "routingkey1", null, (message + i).getBytes());
}
 
System.out.println("Sent msg is '" + message + "'");
 
channel.close();
connection.close();
 
----------------消息消费者----------------
ConnectionFactory factory = new ConnectionFactory();
 
factory.setHost(S_RabbitMQ.QUEUE_IP);// MQ主机
factory.setPort(S_RabbitMQ.QUEUE_PORT);// MQ端口
factory.setUsername(S_RabbitMQ.QUEUE_USER);// MQ用户名
factory.setPassword(S_RabbitMQ.QUEUE_PWD);// MQ密码
 
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
 
// 声明路由名字和类型
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true, false, null);
//声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//绑定路由和队列// 把队列绑定到路由上并指定headers
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routingkey1", null);
 
System.out.println("1 Waiting for msg....");
Consumer consumer = new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
{
String message = "";
try
{
message = new String(body, "UTF-8");
}
catch (UnsupportedEncodingException e)
{
e.printStackTrace();
}
catch (Throwable ex)
{
ex.printStackTrace();
}
 
System.out.println("1 Received msg='" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

 

分享到:
评论

相关推荐

    rabbitmq三种exchange

    在Java中使用RabbitMQ时,通常需要理解并掌握三种主要的Exchange类型:Direct、Fanout和Topic。这三种类型的Exchange各自有不同的路由策略,适应不同的应用场景。 1. **Direct Exchange(直接交换机)** Direct ...

    RabbitMQ消息插件rabbitmq_delayed_message_exchange3.7

    **使用方法** 1. **声明交换机**:创建一个`x-delayed-message`类型的交换机,例如`delayed_exchange`。 ```bash rabbitmqadmin declare exchange name=delayed_exchange type=x-delayed-message ``` 2. **设置延迟...

    Laravel开发-rabbitmq-client

    使用 `RabbitMq::connection()-&gt;consume()` 方法开始消费队列: ```php RabbitMq::connection()-&gt;consume( 'rpc_queue', // 队列名 function ($message) { // 处理请求并生成响应 $response = handleRpcRequest...

    rabbitmq_delayed_message_exchange-20171215-3.6.x.ez.zip

    安装rabbitmq_delayed_message_exchange插件的方法是解压下载的ez文件,然后在RabbitMQ服务器上执行相应的命令启用插件。EZ(Erlang Zip)文件是一种打包格式,用于包含Erlang应用及其依赖。解压后,通过RabbitMQ的...

    C#的Demo项目:RabbitMQ封装和使用

    在标题和描述中提到的C# Demo项目,主要展示了RabbitMQ的基础用法,特别是如何封装RabbitMQ.Client库以简化操作,并演示了三种不同的Exchange(交换机)的使用。交换机是RabbitMQ中的核心组件,它决定了消息如何路由...

    rabbitmq:rabbitmq演示

    3. **交换机类型**:RabbitMQ支持多种Exchange类型,如Direct、Fanout、Topic和Header,每种类型有特定的消息路由逻辑。 4. **工作模式**:RabbitMQ支持两种主要的工作模式——同步(RPC)和异步。同步模式常用于...

    rabbitmq-delayed-message-exchange-3.8.0.tar.gz

    RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息...通过深入研究"rabbitmq-delayed-message-exchange-3.8.0"的源代码和文档,我们可以更好地掌握其工作原理和高级用法,以满足特定项目的需求。

    nodejs-rabbitmq:简单的RabbitMQ发布者消费者应用程序,通过快速路由发送消息

    在Node.js中,我们创建一个通道,声明一个交换机(exchange),并使用`publish()`方法将消息发布到交换机。交换机根据预设的规则(例如直接、主题、扇出或头部)将消息路由到队列。 4. **消费者(Consumer)**:...

    java使用rabbitMq服务

    四种消息发送模式 - **点对点(Point-to-Point)**:使用Direct Exchange,消息仅被一个消费者消费。 - **发布/订阅(Publish/Subscribe)**:使用Fanout Exchange,消息被所有订阅的消费者消费。 ### 4. 高级特性...

    rabbitmq开发规范

    使用`RabbitTemplate`的`convertAndSend`方法,指定`MessageProperties.PERSISTENT_TEXT_PLAIN`作为消息属性。 遵循以上规范,开发者可以构建出稳定、可维护的RabbitMQ应用,确保数据在分布式环境中的可靠传输。...

    SpringCloudStream整合RabbitMq

    在消费者端,使用`@Input`注解来定义接收消息的方法: ```java @Autowired private MessageHandler inputOrder; @StreamListener("input-order") public void handleOrder(Order order) { // 处理订单逻辑 } ``` ...

    rabbitMQ课件.zip

    4. 启动RabbitMQ:使用rabbitmq-server命令启动服务。 5. 配置管理插件:启用rabbitmq_management插件,提供Web界面进行管理和监控。 **RabbitMQ集群搭建** 构建RabbitMQ集群可以增加系统的容错性和可扩展性: 1....

    sb-rabbitmq:Sprint Boot子Rabbitmq

    总之,"sb-rabbitmq"是一个针对Java开发者的Spring Boot子项目,它演示了如何使用Spring AMQP与RabbitMQ进行集成,为构建基于消息驱动的分布式系统提供了一个起点。通过深入研究项目源码,你可以了解到如何配置...

    rabbitmq-c-master.zip

    3. 队列与交换器:使用`rabbitmq_queue_declare()`声明队列,`rabbitmq_exchange_declare()`声明交换器,以及`rabbitmq_queue_bind()`绑定它们。 4. 消息确认:RabbitMQ-C支持手动确认模式,通过`rabbitmq.basic_ack...

    springboot中rabbitmq使用demo

    本篇将详细介绍如何在Spring Boot项目中集成并使用RabbitMQ。 首先,我们需要在Spring Boot项目中添加RabbitMQ的相关依赖。在`pom.xml`文件中,加入以下Maven依赖: ```xml &lt;groupId&gt;org.springframework.boot ...

    emailworker_rabbitmq:如果服务器收到来自Rabbitmq的消息,则发送电子邮件

    总结起来,`emailworker_rabbitmq`项目展示了一个使用RabbitMQ进行消息传递,并结合Mailgun服务发送电子邮件的完整流程。这个项目对于理解消息队列在实时通信和任务调度中的应用具有很好的学习价值。

    C++/QT 使用RabbitMQ

    RabbitMQ是一种基于AMQP(Advanced Message Queuing Protocol)的消息中间件,它允许不同系统之间通过消息传递进行异步通信,是分布式系统中的重要组件。 本文将详细介绍如何在C++和QT环境下集成并使用RabbitMQ,以...

    rabbitMQ代码案例 简单入门

    2. **基本API使用**:学习Java、Python、JavaScript等语言的RabbitMQ客户端库,掌握创建连接、创建通道、声明交换器、声明队列、发送和接收消息的基本方法。 3. **消息发布/订阅模式**:理解生产者如何发布消息到...

    Delphi-RabbitMQ.zip

    在本文中,我们将深入探讨如何使用Delphi与RabbitMQ进行集成,以便在你的应用程序中实现消息队列功能。RabbitMQ是一个广泛使用的开源消息代理和队列服务器,它允许不同系统之间通过标准的消息协议进行通信。Delphi,...

    learning_messaging_rabbitmq:储存在计算机上的消息源于原始的mensageria com RabbitMQ e .NET

    标题中的"learning_messaging_rabbitmq"显然与学习使用RabbitMQ进行消息传递有关,而RabbitMQ是一个开源的消息代理和队列服务器,常用于处理分布式系统中的异步任务和解耦通信。在这个项目中,重点是将消息系统集成...

Global site tag (gtag.js) - Google Analytics