`
yuwenlin2008
  • 浏览: 127342 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ安装使用(直接交换direct exchange)

阅读更多

1.简介

         RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP高级消息队列,说白了就是一个开源的消息中间件。它能解决不同组件、模块、系统间消息通信。

 

2.系统架构

RabbitMQ Server: 也叫broker server,它不是运送食物的卡车,而是一种传输服务。原话是RabbitMQisn’t a food truck, it’s a delivery service. 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。

Producer:数据的发送方,create messages and publish (send) them to a broker server (RabbitMQ)。

Consumer:数据的接收方,Consumersattach to a broker server (RabbitMQ) and subscribe to a queue。

Connection: 就是一个TCP的连接。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

 

Channels: 虚拟连接。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

那么,为什么使用Channel,而不是直接使用TCP连接?

 

 

    对于OS来说,建立和关闭TCP连接是有代价的,频繁的建立关闭TCP连接对于系统的性能有很大的影响,而且TCP的连接数也有限制,这也限制了系统处理高并发的能力。但是,在TCP连接中建立Channel是没有上述代价的。对于Producer或者Consumer来说,可以并发的使用多个Channel进行Publish或者Receive。

 

对于一个数据从Producer到Consumer的正确传递,还有三个概念需要明确:exchanges, queues and bindings。

 

        Exchanges are where producers publish their messages.

 

        Queues are where the messages end up and are received by consumers

 

 

        Bindings are how the messages get routed from the exchange to particular queues.

 

Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

     有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

Fanout exchange: 会向响应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

 

Consumer和Procuder都可以通过 queue.declare 创建queue。如果queue已经存在,也不会报错。如果没有,要么发送不了消息,要么取不到消息,所以还是都创建吧。

 

Bindings就是将通过Exchange将queue和routing keys绑定。

 

3.应用开发测试

我们使用直接交换(direct exchange)模式,这种方式有效实现点对点发送。比如发送方:系统分别给每个组织机构或用户发送信息,接收方:每个组织机构或用户各自接收自己的消息。

RabbitMQ服务端搭建(windows环境)参考附件:

a.它由erlang开发,要安装erlang依赖otp_win32_R16B02.exe

b.rabbitmq服务端rabbitmq-server-3.2.0.exe,默认端口5672,要改的话,代码里得显示指定。

c.RabbitMQ客户端对种语言支持良好,这里我用Java,下载java开发包rabbitmq-client.jar,commons-cli-1.1.jar,commons-io-1.2.jar

producer代码:

 

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
/**
 * 测试RabbitMQ发送
 */
public class SendTest {
	
	private static String HOST = "172.16.6.180";
	private static String EXCHANGE_NAME = "temp";
	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};

	public static void main(String[] args) throws Exception {
		BufferedReader br = null;
		String message = null;
		String flag = "";
		// 建立连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		System.out.println("连接成功");
		System.out.println("声明持久化的direct交换机....");
		System.out.println("声明持久化队列并绑定...");
		// 声明此交换器为全广播并且持久化
		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);
			channel.basicQos(1);
			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);
			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");
		}
		
		while (true) {
			System.out.println("请选择发送方式:1:全部发送;2:指定发送;");
			br = new BufferedReader(new InputStreamReader(System.in));
			flag = br.readLine();
			if ("1".equals(flag)) {
				System.out.println("发送内容为:这是一条测试数据");
				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
					// 发送消息
					byte[] buffer = ("这是一条测试数据"+BINDINGS_QUEUE_NAMES[i]+new Date()).getBytes("utf-8");
					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i], MessageProperties.PERSISTENT_TEXT_PLAIN, buffer);
					String ss = new String(buffer, "utf-8");
					System.out.println(ss);
				}
				System.out.println("发送完毕!");
			} else if("2".equals(flag)) {
				System.out.println("请选择您需要发送的队列序号,以','隔开:");
				for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
					System.out.println((i+1) + ":" + BINDINGS_QUEUE_NAMES[i]);
				}
				br = new BufferedReader(new InputStreamReader(System.in));
				String[] indexs = br.readLine().split(",");
				System.out.println("请输入您要发送的消息:");
				br = new BufferedReader(new InputStreamReader(System.in));
				message = br.readLine();
				for (int i = 0; i < indexs.length; i++) {
					// 发送消息
					channel.basicPublish(EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[Integer.valueOf(indexs[i])-1], MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
				}
				System.out.println("发送完毕!");
			}
		}
		
	}
	
}

发送方控制台:

 

连接成功
声明持久化的direct交换机....
声明持久化队列并绑定...
队列user001绑定成功!
队列user002绑定成功!
队列user003绑定成功!
队列user004绑定成功!
队列user005绑定成功!
队列user006绑定成功!
队列user007绑定成功!
队列user008绑定成功!
队列user009绑定成功!
队列user010绑定成功!
请选择发送方式:1:全部发送;2:指定发送。
1
发送内容为:这是一条测试数据
这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015
这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015
发送完毕!
请选择发送方式:1:全部发送;2:指定发送。

 RabbitMQ可视化查看:http://localhost:15672/#/queues


 

Receive代码:

 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * 测试RabbitMQ接收
 */
public class ReceTest {
	
	private static String HOST = "172.16.6.180";
	private static String EXCHANGE_NAME = "temp";
	private static String[] BINDINGS_QUEUE_NAMES = { "user001","user002","user003","user004","user005","user006","user007","user008","user009","user010"};


	public static void main(String[] args) throws Exception {
		// 建立连接
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(HOST);
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();
		System.out.println("连接成功");
		System.out.println("声明持久化的direct交换机....");
		System.out.println("声明持久化队列并绑定...");
		// 声明交换器,与服务保持一致
		channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			channel.queueDeclare(BINDINGS_QUEUE_NAMES[i], true, false, false, null);
			channel.basicQos(1);
			channel.queueBind(BINDINGS_QUEUE_NAMES[i], EXCHANGE_NAME, BINDINGS_QUEUE_NAMES[i]);
			System.out.println("队列" + BINDINGS_QUEUE_NAMES[i] + "绑定成功!");
		}
		System.out.println("开始接收数据...");
		for (int i = 0; i < BINDINGS_QUEUE_NAMES.length; i++) {
			final String queue = BINDINGS_QUEUE_NAMES[i];
			new Thread(){
				public void run() {
					try {
						receive(channel, queue);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}.start();
		}
	}
	
	private static void receive(Channel channel,String QUEUE_NAME) throws Exception {
		// 声明消费者
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(QUEUE_NAME, false, consumer);
		while (true) {
			// 等待队列推送消息
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody(), "UTF-8");
			System.out.println(QUEUE_NAME + " Received '" + message + "'");
			// 反馈给服务器表示收到信息
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
		}
	}
}

 

 接收方控制台:

 

连接成功
声明持久化的direct交换机....
声明持久化队列并绑定...
队列user001绑定成功!
队列user002绑定成功!
队列user003绑定成功!
队列user004绑定成功!
队列user005绑定成功!
队列user006绑定成功!
队列user007绑定成功!
队列user008绑定成功!
队列user009绑定成功!
队列user010绑定成功!
开始接收数据...
user010 Received '这是一条测试数据user010Wed Apr 22 10:37:00 CST 2015'
user001 Received '这是一条测试数据user001Wed Apr 22 10:37:00 CST 2015'
user007 Received '这是一条测试数据user007Wed Apr 22 10:37:00 CST 2015'
user006 Received '这是一条测试数据user006Wed Apr 22 10:37:00 CST 2015'
user004 Received '这是一条测试数据user004Wed Apr 22 10:37:00 CST 2015'
user005 Received '这是一条测试数据user005Wed Apr 22 10:37:00 CST 2015'
user002 Received '这是一条测试数据user002Wed Apr 22 10:37:00 CST 2015'
user008 Received '这是一条测试数据user008Wed Apr 22 10:37:00 CST 2015'
user009 Received '这是一条测试数据user009Wed Apr 22 10:37:00 CST 2015'
user003 Received '这是一条测试数据user003Wed Apr 22 10:37:00 CST 2015'

 再看 http://localhost:15672/#/queues



 关于RabbitMQ服务端搭建,及可视化页面配置,请参考附件RabbitMQ安装与配置。

参考网站:

http://blog.csdn.net/anzhsoft/article/details/19563091

http://my.oschina.net/OpenSourceBO/blog/379732

  • 大小: 28.7 KB
  • 大小: 31.3 KB
1
0
分享到:
评论
1 楼 freezingsky 2015-04-22  
难得看开一两篇讲得不错的文章!

相关推荐

    springboot+RabbitMq交换器Direct的demo

    标题中的“springboot+RabbitMq交换器Direct的demo”指的是使用Spring Boot框架集成RabbitMQ,并通过Direct交换器实现消息的发送和接收。这是一个实际操作的示例,用于展示如何在Spring Boot应用中配置和使用...

    rabbitmq-delayed-message-exchange-3.8.0.tar.gz

    通过下载并解压"rabbitmq-delayed-message-exchange-3.8.0.tar.gz",我们可以得到插件的源代码、文档和其他相关资源,以便在本地环境中安装和使用。 在RabbitMQ中,交换机(Exchange)负责将消息路由到适当的队列。...

    RabbitMQ安装使用教程

    2. 安装Erlang:按照下载的安装包提示进行安装,确保安装过程中选择添加环境变量,这样可以在命令行中直接使用erl命令。 3. 验证安装:安装完成后,在命令行输入`erl`,如果出现Erlang的版本信息,表示安装成功。 *...

    rabbitmq-delayed-message-exchange-3.9.0.tar.gz

    1. **安装插件**:首先,你需要在RabbitMQ服务器上安装`rabbitmq_delayed_message_exchange`插件。这通常通过RabbitMQ的管理控制台或者命令行工具完成。 2. **创建Exchange**:创建一个类型为`x-delayed-message`的...

    rabbitmq安装

    RabbitMQ提供了多种类型的交换器,如Direct、Fanout、Topic和Header,每种交换器有其特定的路由策略。例如,Direct交换器根据路由键进行精确匹配,Fanout则将所有消息广播到所有绑定的队列,而Topic交换器支持模式...

    基于rabbitmq的topic 交换

    Topic交换模式是RabbitMQ中一种灵活的消息路由策略,它结合了Direct Exchange的精确匹配和Fanout Exchange的广播特性。在Direct Exchange中,消息只能被精确匹配的绑定规则接收,而在Fanout Exchange中,消息会被...

    rabbitmq安装包.zip

    在使用RabbitMQ的过程中,还需要了解一些关键概念,如Exchange(交换器)、Queue(队列)、Binding(绑定)和Message(消息)。Exchange负责根据预设的路由规则将消息分发到不同的队列,Queue是消息的实际存储位置,...

    RabbitMQ安装配置1

    **RabbitMQ安装配置详解** RabbitMQ是一个广泛使用的开源消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议实现。在本文中,我们将详细讲解如何在Linux环境中安装和配置RabbitMQ。 首先...

    spring集成rabbitMq(基于direct、topic和fanout模式)

    在本文中,我们将深入探讨如何将Spring框架与RabbitMQ集成,主要关注三种交换器类型:direct、topic和fanout。这些模式是RabbitMQ消息路由的基础,它们为不同的消息分发需求提供了灵活性。 首先,让我们理解...

    rabbitmq本地安装套件

    在实际使用中,RabbitMQ支持多种消息模式,如直接交换(Direct Exchange)、主题交换(Topic Exchange)、风扇型交换(Fanout Exchange)和路由键交换(Header Exchange)。选择合适的交换机类型可以帮助设计出灵活...

    SpringBoot整合RabbitMQ基础学习Exchange源码

    源码解析部分,SpringBoot使用`RabbitTemplate`类与RabbitMQ进行交互,发送消息时会涉及Exchange的创建和绑定。`RabbitAdmin`类负责管理Exchange、Queue和Binding的声明。`SimpleMessageConverter`用于消息的序列化...

    rabbitmq-server3.10.5

    - **Exchange**: 交换器是RabbitMQ的核心组件,它根据预定义的路由规则将消息分发到不同的队列。 - **Queue**: 队列是存储消息的地方,消费者从队列中获取消息,队列是FIFO(先进先出)的。 - **Binding**: 绑定...

    Spring Boot RabbitMQ 延迟消息实现完整版

    下载完成后,将插件放置到RabbitMQ安装目录下的plugins目录下,并使用命令`rabbitmq-plugins enable rabbitmq_delayed_message_exchange`启用该插件。启用后,请重启RabbitMQ服务使其生效。 #### 集成RabbitMQ到...

    springboot+RabbitMQ三种模式demo

    在本文中,我们将深入探讨如何在SpringBoot应用中使用RabbitMQ实现Direct、Topic和Fanout这三种消息队列模式。RabbitMQ是一款强大的开源消息代理和队列服务器,广泛应用于分布式系统中的异步处理和解耦。SpringBoot...

    Java使用RabbitMq的一个简单demo

    这里我们使用Direct Exchange,是最基础的交换机类型,将消息直接路由到指定的队列: ```java String exchangeName = "simple_exchange"; String queueName = "simple_queue"; channel.exchangeDeclare...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    1. **Direct Exchange**: 直接交换,按照路由键精确匹配,将消息投递给指定的队列。 2. **Fanout Exchange**: 广播交换,不关心路由键,将所有消息投递给绑定的所有队列。 3. **Topic Exchange**: 主题交换,根据...

    RabbitmqDemo

    return new DirectExchange("directExchange"); } @Bean public Binding binding(DirectExchange exchange, Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("routingKey"); } ``` **发送...

    RabbitMQDemo.rar

    在C#中使用RabbitMQ,首先需要安装`RabbitMQ.Client`或`Wenli.Data.RabbitMQ`库。以下是一些基本操作示例: 1. **创建连接和通道** 创建一个到RabbitMQ服务器的连接,并打开一个信道,这是进行所有RabbitMQ操作的...

    rabbitmq点对点发送消息Demo

    在本“rabbitmq点对点发送消息Demo”中,我们将深入探讨如何使用RabbitMQ实现点对点通信模式,即“direct”模式。 首先,我们要理解RabbitMQ中的交换器(Exchange)、队列(Queue)和绑定(Binding)三个核心概念。...

Global site tag (gtag.js) - Google Analytics