- 浏览: 632855 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (819)
- java开发 (110)
- 数据库 (56)
- javascript (30)
- 生活、哲理 (17)
- jquery (36)
- 杂谈 (15)
- linux (62)
- spring (52)
- kafka (11)
- http协议 (22)
- 架构 (18)
- ZooKeeper (18)
- eclipse (13)
- ngork (2)
- dubbo框架 (6)
- Mybatis (9)
- 缓存 (28)
- maven (20)
- MongoDB (3)
- 设计模式 (3)
- shiro (10)
- taokeeper (1)
- 锁和多线程 (3)
- Tomcat7集群 (12)
- Nginx (34)
- nodejs (1)
- MDC (1)
- Netty (7)
- solr (15)
- JSON (8)
- rabbitmq (32)
- disconf (7)
- PowerDesigne (0)
- Spring Boot (31)
- 日志系统 (6)
- erlang (2)
- Swagger (3)
- 测试工具 (3)
- docker (17)
- ELK (2)
- TCC分布式事务 (2)
- marathon (12)
- phpMyAdmin (12)
- git (3)
- Atomix (1)
- Calico (1)
- Lua (7)
- 泛解析 (2)
- OpenResty (2)
- spring mvc (19)
- 前端 (3)
- spring cloud (15)
- Netflix (1)
- zipkin (3)
- JVM 内存模型 (5)
- websocket (1)
- Eureka (4)
- apollo (2)
- idea (2)
- go (1)
- 业务 (0)
- idea开发工具 (1)
最新评论
-
sichunli_030:
对于频繁调用的话,建议采用连接池机制
配置TOMCAT及httpClient的keepalive以高效利用长连接 -
11想念99不见:
你好,我看不太懂。假如我的项目中会频繁调用rest接口,是要用 ...
配置TOMCAT及httpClient的keepalive以高效利用长连接
rabbitMQ中consumer通过建立到queue的连接,创建channel对象,通过channel通道获取message,
Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息,
最近翻阅了基于Java的客户端的相关源码,简单做个分析。
编程模型伪代码如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel
以下是基于channel上的两种消费方式。
1、Subscribe订阅方式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,
这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
参见ChannelN中的方法
public String basicConsume(String queue, boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
throws IOException
{
......
rpc((Method)
new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build(),
k);
try {
return k.getReply();
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
}
Consumer接收消息的过程:
创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // 对消息进行协议assemble
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);//对消息消费处理
}
}
ChannelN.handleCompleteInboundCommand
---ChannelN.processAsync
----dispatcher.handleDelivery
---QueueingConsumer.handleDelivery
---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中
每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。
接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()
对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效
2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
参考:http://blog.csdn.net/yangbutao/article/details/10395599
Consumer可以声明式的以API轮询poll的方式主动从queue的获取消息,也可以通过订阅的方式被动的从Queue中消费消息,
最近翻阅了基于Java的客户端的相关源码,简单做个分析。
编程模型伪代码如下:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
Channel channel=conn.createChannel();
创建Connection需要指定MQ的物理地址和端口,是socket tcp物理连接,而channel是一个逻辑的概念,支持在tcp连接上创建多个MQ channel
以下是基于channel上的两种消费方式。
1、Subscribe订阅方式
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.contentType;
long deliveryTag = envelope.getDeliveryTag();
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
});
订阅方式其实是向queue注册consumer,通过rpc向queue server发送注册consumer的消息,rabbitMQ Server在收到消息后,根据消息的内容类型判断这是一个订阅消息,
这样当MQ 中queue有消息时,会自动把消息通过该socket(长连接)通道发送出去。
参见ChannelN中的方法
public String basicConsume(String queue, boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<String, Object> arguments,
final Consumer callback)
throws IOException
{
......
rpc((Method)
new Basic.Consume.Builder()
.queue(queue)
.consumerTag(consumerTag)
.noLocal(noLocal)
.noAck(autoAck)
.exclusive(exclusive)
.arguments(arguments)
.build(),
k);
try {
return k.getReply();
} catch(ShutdownSignalException ex) {
throw wrap(ex);
}
}
Consumer接收消息的过程:
创建Connection后,会启动MainLoop后台线程,循环从socket(FrameHandler)中获取数据包(Frame),调用channel.handleFrame(Frame frame)处理消息,
public void handleFrame(Frame frame) throws IOException {
AMQCommand command = _command;
if (command.handleFrame(frame)) { // 对消息进行协议assemble
_command = new AMQCommand(); // prepare for the next one
handleCompleteInboundCommand(command);//对消息消费处理
}
}
ChannelN.handleCompleteInboundCommand
---ChannelN.processAsync
----dispatcher.handleDelivery
---QueueingConsumer.handleDelivery
---this._queue.add(new Delivery(envelope, properties, body));//消息最终放到队列中
每个Consumer都有一个BlockQueue,用于缓存从socket中获取的消息。
接下来,Consumer对象就可以调用api来从客户端缓存的_queue中依次获取消息,进行消费,参见QueueingConsumer.nextDelivery()
对于这种长连接的方式,没看到心跳功能,以防止长连接的因网络等原因连接失效
2、poll API方式
ChannelN:
GetResponse basicGet(String queue, boolean autoAck)
这种方式比较简单,直接通过RPC从MQ Server端获取队列中的消息
参考:http://blog.csdn.net/yangbutao/article/details/10395599
发表评论
-
RocketMQ教程,包含所有MQ核心知识点!
2022-04-28 13:49 156RocketMQ教程,包含所有MQ核心知识点 原创 | Ja ... -
rabbitmq死信队列和延时队列的使用
2021-12-25 23:19 244rabbitmq死信队列和延时队列的使用 -
IM消息送达保证机制实现(一):保证在线实时消息的可靠投递
2021-12-14 11:49 167[url=http://www.52im.net/thread ... -
RabbitMQ高级特性TTL队列/消息
2021-09-04 22:47 213RabbitMQ高级特性-TTL队列/消息 RabbitMQ ... -
如何保证消息不丢失,消息顺序执行-面试
2021-05-26 20:24 233关于MQ的几件小事(四)如何保证消息不丢失 如何保证Rab ... -
RabbitMQ 相关问题汇总
2017-06-28 17:43 430RabbitMQ 相关问题汇总 rabbitmq基础概念与基 ... -
rabbitMq集成Spring后,消费者设置手动ack,并且在业务上控制是否ack
2017-06-12 20:30 2160http://blog.csdn.net/u010841296 ... -
rabbitmq消费消息的两种方式
2016-12-05 20:12 1002rabbitMQ中consumer通过建立到queue的连接, ... -
rabbitmq——镜像队列
2016-12-02 20:05 11251. 镜像队列的设置 镜像队列的配置通过添加policy完成 ... -
RabbitMQ 内部实现
2016-12-01 14:41 1017http://blog.csdn.net/joeyon1985 ... -
OpenStack RabbitMQ 集群-后续整理
2016-12-01 14:18 506参考:http://www.iyunv.com/thread- ... -
RabbitMQ (三) 发布/订阅
2016-11-30 19:53 5581、转发器(Exchanges) ... -
RabbitMQ学习(六)之远程过程调用(RPC)
2016-11-30 14:31 838在一般使用RabbitMQ做RPC很容易。客户端发送一个请求消 ... -
RabbitMQ学习之Headers交换类型
2016-11-28 10:51 796Headers类型的exchange使用的比较少,它也是忽略r ... -
RabbitMQ能打开的最大连接数
2016-11-28 10:29 2598转自:http://blog.csdn.net/huoyuns ... -
RabbitMQ基础知识
2016-11-28 10:25 520Routing key由生产者指定。Binding key由消 ... -
解决RabbitMQ远程不能访问的问题
2016-11-24 15:18 1173刚刚安装的RabbitMQ-Server-3.3.5,并且 ... -
RabbitMQ用户角色及权限控制
2016-11-24 11:08 1758RabbitMQ:基本命令 rabbitmq的安装、启动和停 ... -
publish消息确认
2016-11-23 18:01 671Using standard AMQP, the only w ... -
rabbitMQ ConfirmListener
2016-11-23 15:53 2159消息消费者 操作步骤: 1. 创建连接工厂Connection ...
相关推荐
rabbitMq 5种消息模型测试代码
事务提供了一种确保消息可靠投递的方式,但在性能上可能不如非事务模式。 4. **confirmSelect(确认选择)**: `confirmSelect`是RabbitMQ的另一种确保消息投递的机制,它使用发布确认。一旦启用,生产者会接收到一...
在RabbitMQ中,生产者(Producer)负责发布消息,消费者(Consumer)则负责接收并处理这些消息。消息队列作为中介,存储未被消费的消息,确保即使在高并发或系统故障情况下,消息也不会丢失。 在"多人投资"场景下,...
在IT行业中,消息队列(Message Queue)是一种常用的技术,用于处理异步任务、解耦系统组件以及缓解高并发时的系统压力。RabbitMQ是一款开源的消息代理软件,它基于AMQP(Advanced Message Queuing Protocol),广泛...
功能说明:网页端直接消费rabbitmq的消息 环境准备:先安装好rabbitmq。 要配置插件:与前端对接,需要在RabbitMQ上启动rabbitmq_web_stomp插件 ,命令:./rabbitmq-plugins enable rabbitmq_web_stomp 1、...
这个插件允许我们在RabbitMQ中创建一种特殊的交换机类型,即`x-delayed-message`,它可以将消息按照指定的延迟时间放入队列,而不是立即发送。 在RabbitMQ中,消息通常通过生产者发布到交换机,然后交换机根据绑定...
消息队列(MQ)是一种中间件技术,用于在分布式系统中解耦生产者和消费者,通过缓存消息来提高系统的可扩展性和可靠性。基于RabbitMQ实现的消息队列组件是这个话题的核心,RabbitMQ是一个开源的消息代理和队列服务器...
rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:...
本文介绍了一款使用C#开发的自动化发送RabbitMQ消息的工具,该工具旨在简化消息的发送过程,提高开发效率,并支持多种消息发送策略和配置选项。 该软件是一个基于 C# 的 RabbitMQ 消息生产程序。它集成了 RabbitMQ ...
基于SpringBoot、RabbitMQ的Android消息推送平台...有的公司对所要推送的消息保密要求比较高,不希望被第三方看到,可以使用此种方式进行消息推送。如果所要推送的人群比较多,可以搭建RabbitMQ集群解决高并发问题。
基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos+rabbitmq和本地消息表实现分布式事务.zip 基于springboot+nacos...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...
SpringBoot整合Rabbitmq发送接收消息实战 另外,博主发起了SpringBoot整合Rabbitmq这一系列的gitchat交流会。刚兴趣的童鞋可以进入交流:https://gitbook.cn/gitchat/activity/5b90f9214fb1bd5c9acd4338 交流QQ:...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...
6. **消费者(Consumer)**:消费者是从RabbitMQ队列中获取并处理消息的组件。它们通过订阅队列来接收消息。 在这个实例中,你可能需要设置不同的交换机类型、路由键和绑定,以实现灵活的消息分发。例如,你可以...
在Java中,获取RabbitMQ消息总数的过程涉及与RabbitMQ服务器建立连接、创建通道、声明队列以及查询队列状态。以下是一个详细的步骤解析: 1. **建立连接**: 首先,需要通过`ConnectionFactory`类来创建一个连接工厂...
- **Consumer**: 消费者是接收并处理从RabbitMQ中获取消息的应用。 2. **RabbitMQ功能特性** - **高可用性**:通过集群和镜像队列实现跨节点的数据复制,确保服务的连续性。 - **多种协议支持**:除了AMQP外,...
这个例子帮助我们理解RabbitMQ的基本工作流程:生产者创建消息,将消息发送到交换机,交换机根据绑定规则将消息放入队列,最后消费者从队列中接收并处理消息。 2. **02WorkQueues(任务队列)** 在这个示例中,多...
rabbitmq+websocket(SpringBoot版)实现分布式消息推送 本来想用websocket做一个消息推送 可是分布式环境下不支持session共享因为服务器不同 所以采用 rabbitMQ+webSocket实现分布式消息推送 生产者将消息 发送给 ...
### RabbitMQ消息中间件技术精讲 #### 一、RabbitMQ简介 RabbitMQ是一款在IT领域广泛应用的消息中间件,它基于AMQP(Advanced Message Queuing Protocol)协议开发而成,能够实现高效、可靠的数据传输服务。...