RabbitMQ
-------------------------
1
下载
erlang
http://www.erlang.org/download/otp_win32_17.3.exe
http://www.erlang.org/download/otp_src_17.3.tar.gz
http://www.erlang.org/download/otp_doc_html_17.3.tar.gz
http://www.erlang.org/download/otp_doc_man_17.3.tar.gz
rabbitmq server
http://www.rabbitmq.com/releases/rabbitmq-server/v3.4.2/rabbitmq-server-3.4.2.exe
rabbitmq java client
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.4.2/rabbitmq-java-client-bin-3.4.2.zip
http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.4.2/rabbitmq-java-client-3.4.2.zip
2
配置
3
基本概念
http://www.rabbitmq.com/tutorials/tutorial-one-java.html
RabbitMQ 是一个消息代理,它接收生产者的消息,然后转发给消费者.同时,它路由,缓存,持久这个消息根据设定的规则.
producer 生产者,只负责发送消息.消息发送到指定队列中.
queue 队列或邮箱,它存储消息,它的大小没有限制,只要磁盘空间足够.消息只能缓存在队列中,队列属于消息代理的一部分.消息代理还包括 exchange 等.
consumer 消费者,它从队列接收消息.
AMQP:高级消息队列协议.
P(producer) -- msg --> |Queue|Queue|Queue| -- msg --> C(consumer)
PS:
生产者,消息代理,消费者通常不在同一机器上.
4 Hello World
----------------
我们这里使用 Java 客户端.
STEP 1:
P(producer) -- msg --> |HelloQueue|
public class P {
private final static String QUEUE_NAME = "HelloQueue";
public static void main(String[] args)
throws Exception {
// 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消息
String message = "Hello World!";
// 发布消息到 HelloQueue
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
// 关闭连接
channel.close();
connection.close();
}
}
STEP 2
|HelloQueue| -- msg --> C(consumer)
public class C {
private final static String QUEUE_NAME = "HelloQueue";
public static void main(String[] args)
throws Exception {
// 建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 创建消费者,处理消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
// block method ? yes.
// 阻塞直到接收到下一条消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received : " + message);
}
}
5
Work Queue
P -- MSG -- |Q| -- MSG -- Worker1|Worker2
Work1 , Work2 轮流获取消息处理,
为避免Worker1消息单节点处理失败, 修改ACK返回时机为消息处理成功后.ACK返回,消息代理才会删除已处理的消息,否则是Worker1一接收到就删除.
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
为保证消息代理节点失败,导致消息丢失, 设置消息持久化参数:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
...
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
6
发布/订阅模式
一个消息多个接收者接收.
EXCHANGE: 消息交换,用于路由/过滤消息.
P -- MSG -- |X| -- |Q1| -- MSG -- C1
-- |Q2| -- MSG -- C2
// 交换策略扇出(fanout):消息交换到所有队列
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
...
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 临时队列.随机命名的队列
String queueName = channel.queueDeclare().getQueue();
// 绑定交换与队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
7
路由
绑定: 一个队列对交换的消息感兴趣.
绑定KEY: 绑定时设置
路由KEY: 发布时设置 routingKey, fanout 会简单忽略这个值.
交换策略:
direct exchange: 直接交换,根据 routingKey 来选择发布到哪些队列.比fanout更加灵活.
fanout exchange: 扇出交换(全部队列都会接收这个消息)
topic exchange: routingKey用 "." 号分开.根据 routingKey 来选择发布到哪些匹配的队列, 支持模糊匹配.
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
...
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();
for(String bindingKey : bindingKeys){
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
...
}
8
RPC 远程过程调用
通常发送方要等待接收方的处理结果.
发送方阻塞直到方法返回结, 每个客户端一个响应队列.
为了识别哪个响应属于哪个请求,要加个请求ID.
result = Client.call(request); // block until result has returned
Client -- id:RequestId , replyTo:ReplyQueue -- HelloQueue -- Server
| |
-- id:RequestId , replyTo:ReplyQueue -- ReplyQueue ------
RPCServer
private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();
String message = new String(delivery.getBody());
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
RPCClient
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}
public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes());
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}
return response;
}
public void close() throws Exception {
connection.close();
}
分享到:
相关推荐
RabbitMQ 安装与配置(分布式配置) RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,由 Erlang 语言开发。它支持多种语言的客户端,包括 Java、Python、Ruby 等,可以满足高并发、可扩展性的业务需求。 安装...
消息队列:RabbitMQ:RabbitMQ安装与配置.docx
### RabbitMQ 安装与配置知识点详解 #### 一、消息队列(MQ)概念解析 消息队列(Message Queue, MQ)是一种应用间通信的方法,它允许应用通过发送消息来进行异步通信,而不必直接相互调用。消息队列中的消息可以被一...
### Ubuntu系统下RabbitMQ安装与配置详细步骤 #### 一、引言 RabbitMQ是一种基于AMQP标准的消息中间件,广泛应用于分布式系统中,用于处理应用间的异步消息传递。本文档将详细介绍如何在Ubuntu系统下安装及配置...
NULL 博文链接:https://zhb1208.iteye.com/blog/1320219
二、RabbitMQ配置和部署 1. 安装RabbitMQ 执行以下命令来安装RabbitMQ: ``` rpm -ivh rabbitmq-server-3.5.7-1.noarch.rpm ``` 如果安装成功,将显示安装完成的界面。 2. 启动服务 执行以下命令来启动RabbitMQ...
### Windows下RabbitMQ安装与配置详解 #### 一、前言 RabbitMQ是一款开源的消息中间件,基于AMQP(Advanced Message Queuing Protocol)协议实现。它可以在分布式系统之间提供可靠的、高效的异步通信机制。本文将...
在RabbitMQ的运行过程中,配置文件起着至关重要的作用,它们定义了服务器的行为、策略以及与其他服务的交互方式。本文将详细介绍RabbitMQ的默认配置文件模板`rabbitmq.config.example`和`advanced.config.example`。...
### RabbitMQ 安装与配置详解 #### 一、前言 RabbitMQ 是一个开源的消息代理和队列服务器,实现高级消息队列协议 (AMQP) 0-9-1 规范。它能够存储转发消息,在分布式系统中发送消息给其他应用程序。本文将详细介绍 ...
在本配置中,我们首先需要安装 Erlang,因为 RabbitMQ 依赖于它来运行。 1. **Erlang 安装**: - 下载地址:http://www.erlang.org/downloads - 使用 `yum install ncurses-dev` 安装必要的依赖。 - 解压下载的 ...
rabbitmq 3.9.3 配置文件
### RabbitMQ 安装与配置知识点 #### 一、RabbitMQ简介 RabbitMQ是一款在AMQP(Advanced Message Queuing Protocol)标准基础上完成的、可靠的消息中间件软件。它支持多种主流操作系统,并且提供了丰富的客户端接口...
- 安装过程中,系统会自动配置RabbitMQ服务,并启动服务。 3. **验证安装**: - 打开命令行窗口,输入`rabbitmqctl status`,如果显示RabbitMQ服务器的状态信息,说明安装成功。 - 通过浏览器访问`...
4. **RabbitMQ安装与配置**:学习如何在CentOS系统上安装RPM包,配置RabbitMQ服务器,包括设置用户、虚拟主机、权限等。 5. **安全设置**:由于涉及到“安全”标签,所以理解如何配置SSL/TLS加密,设置访问控制,...
rabbitmq配置文件,用于rabbitmq管理
用于安装Erlang和RabbitMQ
在安装和配置RabbitMQ之前,我们需要确保准备好一些必要的软件包。以下是对这些软件包的详细介绍: 1. **Erlang**: Erlang是RabbitMQ的基础,因为RabbitMQ是用Erlang编程语言编写的。Erlang是一种并发和容错能力强...
按照文档,在Windows安装RabbitMQ,并配置用户和virtual Hosts 内含软件安装包和PDF
系统环境及所用版本与安装 在安装rabbitMQ之前,需要准备好系统环境。系统版本为CentOS 6.8,需要设置本地yum源。然后,需要安装erlang环境,因为rabbitMQ需要在erlang环境下安装与运行。最后一步是安装rabbitMQ ...
3. RabbitMQ配置与管理: 安装完成后,需要创建一个配置文件rabbitmq.config,并设置loopback_users参数为空,这意味着允许远程访问。接下来,配置RabbitMQ服务为开机自启动,并启动RabbitMQ服务。 为了方便管理...