1.启动
producer通过配置的namesrv列表,启动时随机选择一个进行相连。首先引出,设置namesrv的几种方式,优先级依次由高到低:
- 第一种:代码中指定namesrv地址
producer.setNamesrvAddr(namesrvAddr);
consumer.setNamesrvAddr(namesrvAddr);
- 第二种:Java启动参数中指定:
-Drocketmq.namesrv.addr=192.168.0.1:9876
- 第三种:环境变量
NAMESRV_ADDR
- 第四种为http方式获取。
如果启动前未配置namesrv地址,那么每2分钟从http://jmenv.tbsite.net:8080/rocketmq/nsaddr以http的方式获取namesrv地址。Namesrv寻址可通过hosts文件从定向或者通过设置系统属性进行更改。
原地址为http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP。
WS_DOMAIN_NAME对应启动参数rocketmq.namesrv.domain
WS_DOMAIN_SUBGROUP对应启动参数rocketmq.namesrv.domain.subgroup。
在3.2.4官方文档中,作者释迦也比较推荐通过http方式获取namesrv地址。好处就是客户端部署简单,并且namesrv可以热升级。
producer启动后,定时每30s从namesrv更新topic的路由信息。设么意思?假如某个topic的队列从4个增加到8个,或者新增了broker且包含此topic,那么可以重新拿到topic的路由信息。Topic的路由信息有brokerName,queueId组成。定时30s清理下线broker及发送心跳和订阅关系。Producer的启动首先会和namesrv建立连接,然后拿到topic的路由信息后,当在发送消息时会和broker建立连接并将broker信息缓存本地。这里的清理下线broker指检查本地broker列表信息,如果此broker没有topic的路由信息,即从本地列表移除。
2.发送接口
public List<MessageQueue> fetchPublishMessageQueues(String topic)获取某个topic下的队列信息。
public SendResult send(Message msg)同步发送消息。
public void send(Message msg, SendCallback sendCallback)异步发送消息
public void sendOneway(Message msg)发送消息,无返回结果。
public SendResult send(Message msg, MessageQueue mq)同步发送消息到指定的队列
public void send(Message msg, MessageQueue mq, SendCallback sendCallback)异步发送消息到指定队列。
public void sendOneway(Message msg, MessageQueue mq)发送消息到指定队列,无返回值
public SendResult send(Message msg, MessageQueueSelector selector, Object arg, long timeout)通过指定的队列选择器同步发送消息,arg参数会回传给队列选择器。
public void send(Message msg, MessageQueueSelector selector, Object arg, SendCallback sendCallback)通过指定的队列选择器异步发送消息,arg参数会回传给队列选择器。
public long searchOffset(MessageQueue mq, long timestamp)根据时间获取某队列的offset
public long maxOffset(MessageQueue mq)获取队列的最大offset
public long minOffset(MessageQueue mq)获取队列的最小offset
public MessageExt viewMessage(String msgId)根据id获取消息信息
public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end)根据消息key获取消息。
viewMessage和queryMessage有两点说明:
- MQ会根据storehostaddress和offset来生成msgId,所以在集群下,可以通过msgId查询到消息。
- MQ会根据topic和uniqKey以及topic和keys进行消息的索引构建,所以可以通过索引查询消息。
3.负载均衡
默认,producer采用轮询的策略发送消息。Producer从namesrv更新到topic的路由信息后,根据queueId和brokerName组成发送列表。假如,名为test的topic有8个队列,0-7,那么和broker-a组成的发送列表为broker-a-0,broker-a1...broker-a7,然后依次轮训列表进行发送。
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
4.发送的一些其他说明
- 默认发送超时为3s。
- 消息超过4k,即启用消息的压缩。
- 发送失败,默认重发2次。
- 消息最大限制为4M,即超过4M会提示发送失败。
5.注意
在MQ内部,发送者是没有group的概念的。Group只是业务上的划分。Producer在启动时,会选择一个namesrv相连,通过topic关系找到broker,并和存有topic的所有master broker相连,也就是说,消息只会发到master的broker上去。
- 大小: 27.2 KB
分享到:
相关推荐
"rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...
标题中提到的"消息中间件rocketmq原理解析"揭示了本文档的核心内容,即对消息中间件RocketMQ的原理进行解析和探讨。RocketMQ是阿里巴巴开源的一款分布式消息中间件,主要用于企业级的分布式系统中,用以实现系统之间...
以下将基于给定内容详细解析RocketMQ的原理解析。 ### RocketMQ概述 RocketMQ是由阿里巴巴开源的一款分布式消息中间件。它提供了分布式系统之间的异步消息通信能力,广泛应用于各种业务场景中,如订单处理、任务...
### RocketMQ原理详解 #### 一、RocketMQ概述 RocketMQ是一款由阿里巴巴开源的消息中间件,主要用于异步处理、解耦、削峰等场景。它提供了高性能、高可靠性的消息服务,支持点对点消息、发布/订阅模式、事务消息等...
本文将从以下几个方面对RocketMQ的原理进行解析。 ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向...
在深入探讨RocketMQ核心源码之前,...通过对RocketMQ的CommitLog和ConsumeQueue的理解,以及NameServer和Broker架构的解析,我们可以更深入地掌握RocketMQ的工作原理,这对于优化系统性能和解决实际问题具有重要意义。
Producer: 生产者,负责生产消息并发送到消息引擎。测评开始时,测评程序会启动10~20个Producer,每个Producer在一条线程中,然后每个Producer随机生产某个Topic或者附属于Queue的消息并发送到消息引擎; Topic: ...
2. `rocketmq-common-4.7.0.jar`:这个jar包包含了RocketMQ的通用模块,提供了一些基础工具类和常量,如NameServer地址解析、配置管理、时间戳处理、线程池管理等。同时,它也包含了一些核心的数据结构,如Message...
《RocketMQ实战与原理解析》是一本专为IT专业人士准备的深度学习Apache RocketMQ的指南。这本书分为两大部分:基础入门和源码分析,旨在帮助读者从零开始熟悉RocketMQ,并逐步深入到其核心机制的理解。 在基础入门...
Producer 如何感知要发送消息的broker 即brokerAddrTable 中的值是怎么获得的, 1. 发送消息的时候指定会指定topic,如果producer 集合中没有会根据指定topic 到namesrv 获取 topic 发布信息TopicPublishInfo,并放...
在《RocketMQ-原理简介.pdf》中,可能会深入解析RocketMQ的工作原理: 1. **消息存储机制**:RocketMQ采用日志文件存储消息,通过CommitLog和IndexFile配合,实现高效的消息查询和检索。 2. **消息传输机制**:...
本压缩包“RocketMQ原理分析.rar”包含了对RocketMQ核心机制的深入解析,旨在帮助用户理解其工作原理。 RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四个部分: 1. **Producer**: 生产者是消息...
在事务消息机制中,如果某个Producer(Producer-A)发生故障,导致消息处于PREPARED状态并超时,Broker会回查同一组内的其他Producer,决定消息是提交还是回滚,从而确保事务的一致性。 2. **ConsumerGroup** ...
### RocketMQ实战与原理解析 #### 一、RocketMQ简介 Apache RocketMQ是一个分布式消息中间件,由阿里巴巴捐赠并成为Apache顶级项目。RocketMQ具备高性能、低延迟、高可靠等特性,支持发布/订阅模式、消息过滤、...
【全面解剖RocketMQ与项目实战-day4-part2】是一个深入学习Apache RocketMQ的课程资料压缩包,包含多个视频教程,旨在帮助用户深入了解RocketMQ的工作原理及实际应用。RocketMQ是一个开源的消息中间件,广泛应用于...
本知识点将深入探讨RocketMQ的基础概念、工作原理、实战应用以及源码解析。 一、RocketMQ基础 1. 概念:RocketMQ是一个基于发布/订阅模式的消息队列,支持事务消息、定时/延时消息、顺序消息等多种特性,广泛应用...
在Java编程环境中,RocketMQ的源码解析可以帮助我们理解其内部工作原理,包括消息的生产、消费、存储和调度机制。以下是一些可能涉及的知识点: 1. **消息模型**:RocketMQ支持两种消息模型,发布/订阅(Publish/...
RocketMQ 的架构主要包括 Producer、Consumer、Broker、NameServer、Topic、Message Queue 等角色。Producer 是消息的发送者,Consumer 是消息接收者,Broker 是暂存和传输消息的中间件,NameServer 是管理 Broker ...
在这个“全面解剖RocketMQ和项目实战-day4-part4.7z”资料包中,包含了一系列视频教程,详细讲解了RocketMQ的关键特性和工作流程,下面将对这些知识点进行深入解析。 1. **实时更新消息消费队列与索引文件流程说明*...