- 浏览: 623281 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (819)
- java开发 (110)
- 数据库 (56)
- javascript (30)
- 生活、哲理 (17)
- jquery (36)
- 杂谈 (15)
- linux (62)
- spring (52)
- kafka (11)
- http协议 (22)
- 架构 (18)
- ZooKeeper (18)
- eclipse (13)
- ngork (2)
- dubbo框架 (6)
- Mybatis (9)
- 缓存 (28)
- maven (20)
- MongoDB (3)
- 设计模式 (3)
- shiro (10)
- taokeeper (1)
- 锁和多线程 (3)
- Tomcat7集群 (12)
- Nginx (34)
- nodejs (1)
- MDC (1)
- Netty (7)
- solr (15)
- JSON (8)
- rabbitmq (32)
- disconf (7)
- PowerDesigne (0)
- Spring Boot (31)
- 日志系统 (6)
- erlang (2)
- Swagger (3)
- 测试工具 (3)
- docker (17)
- ELK (2)
- TCC分布式事务 (2)
- marathon (12)
- phpMyAdmin (12)
- git (3)
- Atomix (1)
- Calico (1)
- Lua (7)
- 泛解析 (2)
- OpenResty (2)
- spring mvc (19)
- 前端 (3)
- spring cloud (15)
- Netflix (1)
- zipkin (3)
- JVM 内存模型 (5)
- websocket (1)
- Eureka (4)
- apollo (2)
- idea (2)
- go (1)
- 业务 (0)
- idea开发工具 (1)
最新评论
-
sichunli_030:
对于频繁调用的话,建议采用连接池机制
配置TOMCAT及httpClient的keepalive以高效利用长连接 -
11想念99不见:
你好,我看不太懂。假如我的项目中会频繁调用rest接口,是要用 ...
配置TOMCAT及httpClient的keepalive以高效利用长连接
消息消费者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 声明交换机Exchange:交换机类型分为四类:
Fanout Exchange: 将消息分发到所有的绑定队列,无routingkey的概念
Headers Exchange :通过添加属性key-value匹配
Direct Exchange:按照routingkey分发到指定队列
Topic Exchange:多关键字匹配
5. 声明队列Queue
6. 将队列和交换机绑定
7. 创建消费者
8. 执行消息的消费
消息生产者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 发送消息
在设置消息被消费的回调前需显示调用
否则回调函数无法调用
先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。
rabbitmq 为每一个channel维护了一个delivery tag的计数器,这里采用正向自增,新消息投递时自增,当消息响应时自减
参考:http://blog.csdn.net/liaokailin/article/details/49558605
http://blog.csdn.net/stonexmx/article/details/51885745
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 声明交换机Exchange:交换机类型分为四类:
Fanout Exchange: 将消息分发到所有的绑定队列,无routingkey的概念
Headers Exchange :通过添加属性key-value匹配
Direct Exchange:按照routingkey分发到指定队列
Topic Exchange:多关键字匹配
5. 声明队列Queue
6. 将队列和交换机绑定
7. 创建消费者
8. 执行消息的消费
package org.lkl.mq.rabbitmq.test; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.QueueingConsumer.Delivery; import com.rabbitmq.client.ShutdownSignalException; /** * 客户端01 * * @author liaokailin * @version $Id: Receive01.java, v 0.1 2015年11月01日 下午3:47:58 liaokailin Exp $ */ public class Receive01 { public static void main(String[] args) throws IOException, TimeoutException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory facotry = new ConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connection conn = facotry.newConnection(); //获取一个链接 //通过Channel进行通信 Channel channel = conn.createChannel(); int prefetchCount = 1; channel.basicQos(prefetchCount); //保证公平分发 boolean durable = true; //声明交换机 channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", durable); //按照routingKey过滤 //声明队列 String queueName = channel.queueDeclare("queue-01", true, true, false, null).getQueue(); //将队列和交换机绑定 String routingKey = "lkl-0"; //队列可以多次绑定,绑定不同的交换机或者路由key channel.queueBind(queueName, Send.EXCHANGE_NAME, routingKey); //创建消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //将消费者和队列关联 channel.basicConsume(queueName, false, consumer); // 设置为false表面手动确认消息消费 //获取消息 System.out.println(" Wait message ...."); while (true) { Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); String key = delivery.getEnvelope().getRoutingKey(); System.out.println(" Received '" + key + "':'" + msg + "'"); System.out.println(" Handle message"); TimeUnit.SECONDS.sleep(3); //mock handle message channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //确定该消息已成功消费 } } }
消息生产者
操作步骤:
1. 创建连接工厂ConnectionFactory
2. 获取连接Connection
3. 通过连接获取通信通道Channel
4. 发送消息
package org.lkl.mq.rabbitmq.test; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; /** * 消息publish * * @author liaokailin * @version $Id: Send.java, v 0.1 2015年10月22日 下午3:48:09 liaokailin Exp $ */ public class Send { public final static String EXCHANGE_NAME = "test-exchange"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { /** * 配置amqp broker 连接信息 */ ConnectionFactory facotry = new ConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connection conn = facotry.newConnection(); //获取一个链接 //通过Channel进行通信 Channel channel = conn.createChannel(); // channel.exchangeDeclare(Send.EXCHANGE_NAME, "direct", true); //如果消费者已创建,这里可不声明 channel.confirmSelect(); //Enables publisher acknowledgements on this channel channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleNack] :" + deliveryTag + "," + multiple); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("[handleAck] :" + deliveryTag + "," + multiple); } }); String message = "lkl-"; //消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN //发送多条信息,每条消息对应routekey都不一致 for (int i = 0; i < 10; i++) { channel.basicPublish(EXCHANGE_NAME, message + (i % 2), MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes()); System.out.println("[send] msg " + (message + i) + " of routingKey is " + (message + (i % 2))); } } }
在设置消息被消费的回调前需显示调用
引用
channel.confirmSelect()
否则回调函数无法调用
先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。
rabbitmq 为每一个channel维护了一个delivery tag的计数器,这里采用正向自增,新消息投递时自增,当消息响应时自减
参考:http://blog.csdn.net/liaokailin/article/details/49558605
http://blog.csdn.net/stonexmx/article/details/51885745
发表评论
-
RocketMQ教程,包含所有MQ核心知识点!
2022-04-28 13:49 155RocketMQ教程,包含所有MQ核心知识点 原创 | Ja ... -
rabbitmq死信队列和延时队列的使用
2021-12-25 23:19 239rabbitmq死信队列和延时队列的使用 -
IM消息送达保证机制实现(一):保证在线实时消息的可靠投递
2021-12-14 11:49 162[url=http://www.52im.net/thread ... -
RabbitMQ高级特性TTL队列/消息
2021-09-04 22:47 210RabbitMQ高级特性-TTL队列/消息 RabbitMQ ... -
如何保证消息不丢失,消息顺序执行-面试
2021-05-26 20:24 226关于MQ的几件小事(四)如何保证消息不丢失 如何保证Rab ... -
RabbitMQ 相关问题汇总
2017-06-28 17:43 425RabbitMQ 相关问题汇总 rabbitmq基础概念与基 ... -
rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack
2017-06-12 20:30 2155http://blog.csdn.net/u010841296 ... -
rabbitmq消费消息的两种方式
2016-12-05 20:12 992rabbitMQ中consumer通过建立到queue的连接, ... -
rabbitmq——镜像队列
2016-12-02 20:05 11171. 镜像队列的设置 镜像队列的配置通过添加policy完成 ... -
RabbitMQ 内部实现
2016-12-01 14:41 1012http://blog.csdn.net/joeyon1985 ... -
OpenStack RabbitMQ 集群-后续整理
2016-12-01 14:18 499参考:http://www.iyunv.com/thread- ... -
RabbitMQ (三) 发布/订阅
2016-11-30 19:53 5541、转发器(Exchanges) ... -
RabbitMQ学习(六)之远程过程调用(RPC)
2016-11-30 14:31 832在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消 ... -
RabbitMQ学习之Headers交换类型
2016-11-28 10:51 790Headers类型的exchange使用的比较少,它也是忽略r ... -
RabbitMQ能打开的最大连接数
2016-11-28 10:29 2565转自:http://blog.csdn.net/huoyuns ... -
RabbitMQ基础知识
2016-11-28 10:25 514Routing key由生产者指定。Binding key由消 ... -
RabbitMQ Consumer获取消息的两种方式(poll,subscribe)解析
2016-11-25 21:36 881rabbitMQ中consumer通过建立到queue的连接, ... -
解决RabbitMQ远程不能访问的问题
2016-11-24 15:18 1155刚刚安装的RabbitMQ-Server-3.3.5,并且 ... -
RabbitMQ用户角色及权限控制
2016-11-24 11:08 1747RabbitMQ:基本命令 rabbitmq的安装、启动和停 ... -
publish消息确认
2016-11-23 18:01 662Using standard AMQP, the only w ...
相关推荐
`rabbitmq-c`是RabbitMQ的一个C语言客户端库,它使得在C程序中与RabbitMQ服务器进行交互变得更加简单。本文将详细介绍如何使用CMake编译`rabbitmq-c-master`源码,并讨论相关知识点。 首先,我们需要了解CMake,这...
rabbitmq配置文件,用于rabbitmq管理
在这个"麒麟v10系统Rabbitmq3.6.10安装包"中,我们将探讨如何在麒麟v10环境下安装和配置RabbitMQ 3.6.10版本。 首先,安装RabbitMQ前需要确保系统满足必要的依赖条件。麒麟v10内核版本为4.19.90-17.ky10.x86_64,这...
RabbitMQ是一个开源的消息代理和队列服务器,广泛用于分布式系统中的消息传递。它基于AMQP(Advanced Message Queuing Protocol)标准,允许应用程序之间异步通信,并提供了高可用性、可扩展性和容错性。RabbitMQ的...
**RabbitMQ实战指南** RabbitMQ是一款广泛应用的开源消息队列系统,它基于Advanced Message Queuing Protocol(AMQP)标准,提供高可用性、可靠性和可扩展性。本实战指南将带你深入理解RabbitMQ的核心概念、安装与...
RabbitMQ服务器3.10.5是一款广泛使用的开源消息代理和队列服务器,它基于高级消息队列协议(AMQP)实现。这个版本的RabbitMQ提供了稳定且高效的中间件服务,允许分布式系统中的应用程序进行异步通信,确保数据可靠...
RabbitMQ是一款开源的消息队列服务软件,它实现了高级消息队列协议(AMQP),以高性能、健壮和可伸缩性闻名,主要由Erlang语言编写。Erlang是一种适合于构建并发处理能力强、高可用性系统的编程语言,这些特点使得...
标题 "kettle rabbitmq 插件开发" 涉及的是如何在 Pentaho Kettle(也称为 Spoon)中创建和使用 RabbitMQ 插件。Kettle 是一个开源的数据集成工具,它允许用户进行数据抽取、转换和加载(ETL)操作。RabbitMQ 是一个...
【RabbitMQ性能测试报告】 本测试报告详细记录了对RabbitMQ的性能评估,包括在单机模式和集群模式下的压力和稳定性测试。RabbitMQ是业界广泛使用的开源消息代理,它基于AMQP(Advanced Message Queuing Protocol)...
**RabbitMQ-c源码分析** RabbitMQ-c是一个轻量级且高效的C语言实现的RabbitMQ客户端库。RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中...
为了能够有效地监控 RabbitMQ 的性能和状态,Prometheus 提供了一个名为 `rabbitmq_exporter` 的工具。然而,在某些情况下,官方网站可能不直接提供这个插件,这时我们需要从第三方源获取,例如在本例中提到的 `...
【标题】:“TP6使用RabbitMQ” 在PHP框架ThinkPHP6(简称TP6)中集成RabbitMQ是一项常见的任务,用于实现异步处理、消息队列和分布式系统的通信。RabbitMQ是一个开源的消息代理和队列服务器,它遵循AMQP(Advanced...
1. **下载RabbitMQ**:首先,访问RabbitMQ官方网站下载适用于Windows的RabbitMQ Server安装包。这个压缩包"rabbitMQ_Windows版.zip"很可能包含了所有必要的文件。 2. **解压与安装**:解压缩文件,运行安装程序,...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地路由和传递消息。它由Erlang编程语言开发,因此在安装RabbitMQ之前,需要先安装Erlang环境。本...
**RabbitMQ与SpringMVC集成** RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的消息传递。RabbitMQ是由Erlang OTP平台构建的,因此...
**RabbitMQ 默认配置文件模板详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于解耦应用程序,提高系统的可扩展性和容错性。在...
3. **RabbitMQ服务器安装**:"rabbitmq-server-3.9.15-1.el7.noarch.rpm"是RabbitMQ 3.9.15版本的安装包,适用于RHEL/CentOS 7系统。安装过程中,通常会包括设置RabbitMQ服务、启动服务以及配置默认用户和虚拟主机等...
标题中的“flink-sql集成rabbitmq”指的是将Apache Flink的数据流处理能力与RabbitMQ消息队列系统相结合,实现数据的实时处理和传输。Flink是一个开源的流处理框架,提供低延迟、高吞吐量的数据处理,而RabbitMQ是一...
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
**RabbitMQ源码分析** RabbitMQ是一个开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步处理、任务队列以及服务间通信。源码分析有助于深入理解其内部...