AMQP协议
AMQP 有四个非常重要的概念:虚拟机(virtual host),交换机(exchange),队列(queue)和绑定(binding)。
- 虚拟机: 通常是应用的外在边界,我们可以为不同的虚拟机分配访问权限。虚拟机可持有多个交换机、队列和绑定。
- 交换机: 从连接通道(Channel)接收消息,并按照特定的路由规则发送给队列。
- 队列: 消息最终的存储容器,直到消费客户端(Consumer)将其取走。
- 绑定: 也就是所谓的路由规则,告诉交换机将何种类型的消息发送到某个队列中。
通常的操作流程是:
- (1) 消费者: 创建信息通道。
- (2) 消费者: 定义消息队列。
- (3) 消费者: 定义特定类型的交换机。
- (4) 消费者: 设定绑定规则 (包括交换机名称、队列名称以及路由键)。
- (5) 消费者: 等待消息。
- (6) 生产者: 创建消息。
- (7) 生产者: 将消息投递给信息通道 (注明接收交换机名称和路由键)。
- (8) 交换机: 获取消息,依据交换机类型决定是否匹配路由规则 (如需匹配,则对比消息路由键和绑定路由键)。
- (9) 消费者: 获取并处理消息,发送反馈。
- (10) 结束: 关闭通道和连接。
RabbitMq介绍
由erlang(面向并发的变成语言)开发,遵循AMQP协议的消息代理。原理简单,通过接受、转发消息。
应用场景
处理无需及时返回、耗时的操作,采用异步处理,能够大大节省服务器的请求响应时间,提高系统吞吐量。
概念答疑
生产者:发送消息的程序;
消费者:等待接受消息、并且处理消息的程序,如果处理大量堆积消息,只需要增加更多的消费者。
队列:存储消息,一个队列没有范围限制,本质上无限大的缓存。
MQ服务器:消息缓存服务器,一旦消息正确传递给消费者,消息会立即从内存删除。
系统架构
连接(connection):客户端和rabbitmq server之间的tcp连接;
虚拟连接(channel):消费者和生产者通过tcp连接到rabbitmq server,channel是建立在tcp连接上,避免频繁建立关闭tcp连接,影响系统性能,而且系统的tcp连接数有限制,从而限制系统处理高并发能力。有实验表明,1s的数据可以Publish10K的数据包(普通硬件环境)。
消息确认:一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完成,RabbitMQ 可以删除它了,保证消息不会丢失;
消息持久化:持久化消息到硬盘,保证不会丢失,但依旧有个短暂的时间窗口。需要更健壮的持久化保证,你可以使用出版者确认。
Exhanges
direct: routing key 匹配, 那么Message就会被传递到相应的queue中。
fanout:向响应的queue广播
topic: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue
代码DEMO
生产者 import com.whtr.eam.common.util.SystemContext; import com.whtr.eam.platform.interfaceMgr.engine.ProtocolException; import com.whtr.eam.platform.interfaceMgr.vo.DataVo; import org.apache.commons.lang3.StringUtils; import java.io.IOException; package com.whtr.eam.platform.interfaceMgr.util; import com.whtr.eam.common.util.SystemContext; import com.whtr.eam.platform.interfaceMgr.engine.ProtocolException; import com.whtr.eam.platform.interfaceMgr.vo.DataVo; import org.apache.commons.lang3.StringUtils; import java.io.IOException; /** * 消息生产者 * <p/> * Created by zhongmin 20167.5. */ public class ProtocolProducer extends ProtocolEndPoint { public ProtocolProducer(DataVo data) throws IOException { super(data.getName(), data.getExchangeName(), data.getExchangType(), data.getRoutingKey()); setHost(data.getUrl()); setPort(data.getPort()); setUserName(data.getUserName()); setPassword(data.getPassword()); init(); } public void sendMessage(String message) throws Exception { if(StringUtils.isBlank(message)){ logger.info("待发送的消息为空"); throw new ProtocolException("请求参数为空,无法发送消息!"); } if(StringUtils.isNotBlank(getExchangeName())){ channel.basicPublish(getExchangeName(), getRoutingKey(), null, message.getBytes()); }else{ channel.basicPublish("", getQueueName(), null, message.getBytes()); } } public static void main(String[] args) throws Exception { String queueName = "q.lee"; String exchangeName = "e.lee"; String exchangType = "topic"; String routingKey = "r.lee"; DataVo data = new DataVo(); data.setName(queueName); data.setExchangeName(exchangeName); data.setExchangType(exchangType); data.setRoutingKey(routingKey); data.setReqParam("lee mq"); /** # mq.host=192.168.1.43 mq.host=rabmq.inside.ppmoney mq.port=5672 mq.user=guest mq.password=guest */ data.setUserName("guest"); data.setPassword("guest"); data.setUrl("192.168.1.43"); data.setPort(15672); ProtocolProducer protocolProducer = new ProtocolProducer(data); protocolProducer.sendMessage("leee"); } } public class ProtocolProducer extends ProtocolEndPoint { public ProtocolProducer(DataVo data) throws IOException { super(data.getName(), data.getExchangeName(), data.getExchangType(), data.getRoutingKey()); setHost(data.getUrl()); setPort(data.getPort()); setUserName(data.getUserName()); setPassword(data.getPassword()); init(); } public void sendMessage(String message) throws Exception { if(StringUtils.isBlank(message)){ logger.info("待发送的消息为空"); throw new ProtocolException("请求参数为空,无法发送消息!"); } if(StringUtils.isNotBlank(getExchangeName())){ channel.basicPublish(getExchangeName(), getRoutingKey(), null, message.getBytes()); }else{ channel.basicPublish("", getQueueName(), null, message.getBytes()); } } public static void main(String[] args) throws Exception { String queueName = "q.lee"; String exchangeName = "e.lee"; String exchangType = "topic"; String routingKey = "r.lee"; DataVo data = new DataVo(); data.setName(queueName); data.setExchangeName(exchangeName); data.setExchangType(exchangType); data.setRoutingKey(routingKey); data.setReqParam("lee mq"); /** # mq.host=192.168.1.43 mq.host=rabmq.inside.ppmoney mq.port=5672 mq.user=guest mq.password=guest */ data.setUserName("guest"); data.setPassword("guest"); data.setUrl("192.168.1.43"); data.setPort(15672); ProtocolProducer protocolProducer = new ProtocolProducer(data); protocolProducer.sendMessage("leee"); } } 队列抽象者 import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; /** * 队列的抽象类 * <p/> * Created by liangqq on 2015/12/7. */ public abstract class ProtocolEndPoint { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected Channel channel; protected Connection connection; protected String queueName; protected String exchangeName; protected String exchangeType; protected String routingKey; private ConnectionFactory factory; private String host; private Integer port; private String userName; private String password; public ProtocolEndPoint(String queueName, String exchangeName, String exchangeType, String routingKey) throws IOException { this.queueName = queueName; this.exchangeName = exchangeName; this.exchangeType = exchangeType; this.routingKey = routingKey; } public void init() throws IOException { ConnectionFactory factory = getFactory(); factory.setHost(host); factory.setPort(port); factory.setUsername(userName); factory.setPassword(password); factory.setAutomaticRecoveryEnabled(true); connection = factory.newConnection(); connection.addShutdownListener(new MqShutdownListener()); channel = connection.createChannel(); if (!bindExchange()) { channel.queueDeclare(queueName, false, false, false, null); } } public ProtocolEndPoint(String queueName) throws IOException { this(queueName, null, null, null); } private boolean bindExchange() throws IOException { if (StringUtils.isNotBlank(exchangeName)) { String type = StringUtils.isNotBlank(exchangeType) ? exchangeType : "topic"; channel.exchangeDeclare(exchangeName, type, true); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); return true; } return false; } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的 */ public void close() { try { this.channel.close(); this.connection.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } finally { if (null != channel) { try { this.channel.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } if (null != connection) { try { this.connection.close(); } catch (IOException e) { logger.error(e.getMessage(), e); } } } } public String getQueueName() { return queueName; } public String getExchangeName() { return exchangeName; } public String getExchangeType() { return exchangeType; } public String getRoutingKey() { return routingKey; } public ConnectionFactory getFactory() { if(null == factory){ factory = new ConnectionFactory(); } return factory; } public void setFactory(ConnectionFactory factory) { this.factory = factory; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public Integer getPort() { return port; } public void setPort(Integer port) { this.port = port; } public String getUserName() { return userName; } public void setUserName(String userName) { this.userName = userName; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } } import com.rabbitmq.client.ShutdownListener; import com.rabbitmq.client.ShutdownSignalException; import org.nutz.json.Json; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 监听队列关闭的监听者 * * Created by zhongminon 2016.7.5. */ public class MqShutdownListener implements ShutdownListener { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override public void shutdownCompleted(ShutdownSignalException cause) { logger.error("队列关闭:{}", Json.toJson(cause)); } }
消费者 public class ConsoumerDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
参考更详细教程:http://blog.csdn.net/anzhsoft/article/details/19563091
RabbitMq控制台:http://www.cnblogs.com/dubing/p/4017613.html
相关推荐
### RabbitMQ:安装、配置与使用初探 #### 一、下载及安装 RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的消息中间件。本文将介绍如何在 CentOS 上安装并配置 RabbitMQ。 ##### 1.1 安装 ...
`rabbitmq-c`是RabbitMQ的一个C语言客户端库,它使得在C程序中与RabbitMQ服务器进行交互变得更加简单。本文将详细介绍如何使用CMake编译`rabbitmq-c-master`源码,并讨论相关知识点。 首先,我们需要了解CMake,这...
rabbitmq配置文件,用于rabbitmq管理
RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息传递。它基于AMQP(Advanced Message Queuing Protocol)标准,允许应用程序之间异步通信,并提供了高可用性、可扩展性和容错性。RabbitMQ的...
RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...
RabbitMQ是一款开源的消息队列服务软件,它实现了高级消息队列协议(AMQP),以高性能、健壮和可伸缩性闻名,主要由Erlang语言编写。Erlang是一种适合于构建并发处理能力强、高可用性系统的编程语言,这些特点使得...
**RabbitMQ实战指南** RabbitMQ是一款广泛应用的开源消息队列系统,它基于Advanced Message Queuing Protocol(AMQP)标准,提供高可用性、可靠性和可扩展性。本实战指南将带你深入理解RabbitMQ的核心概念、安装与...
在这个"麒麟v10系统Rabbitmq3.6.10安装包"中,我们将探讨如何在麒麟v10环境下安装和配置RabbitMQ 3.6.10版本。 首先,安装RabbitMQ前需要确保系统满足必要的依赖条件。麒麟v10内核版本为4.19.90-17.ky10.x86_64,这...
标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...
【RabbitMQ性能测试报告】 本测试报告详细记录了对RabbitMQ的性能评估,包括在单机模式和集群模式下的压力和稳定性测试。RabbitMQ是业界广泛使用的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)...
**RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...
为了能够有效地监控 RabbitMQ 的性能和状态,Prometheus 提供了一个名为 `rabbitmq_exporter` 的工具。然而,在某些情况下,官方网站可能不直接提供这个插件,这时我们需要从第三方源获取,例如在本例中提到的 `...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地路由和传递消息。它由Erlang编程语言开发,因此在安装RabbitMQ之前,需要先安装Erlang环境。本...
1. **下载RabbitMQ**:首先,访问RabbitMQ官方网站下载适用于Windows的RabbitMQ Server安装包。这个压缩包"rabbitMQ_Windows版.zip"很可能包含了所有必要的文件。 2. **解压与安装**:解压缩文件,运行安装程序,...
【标题】:“TP6使用RabbitMQ” 在PHP框架ThinkPHP6(简称TP6)中集成RabbitMQ是一项常见的任务,用于实现异步处理、消息队列和分布式系统的通信。RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced...
**RabbitMQ与SpringMVC集成** RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的消息传递。RabbitMQ是由Erlang OTP平台构建的,因此...
**RabbitMQ基础** RabbitMQ是一个开源的消息代理和队列服务器,用于在分布式系统中进行消息传递。它是基于AMQP(Advanced Message Queuing Protocol)协议实现的,提供了高可用性、可扩展性和稳定性。RabbitMQ的...
**RabbitMQ 默认配置文件模板详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用程序,提高系统的可扩展性和容错性。在...
标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...