1、mqnamesrv单机启动2台服务,可以通过-c xxx.properties指定端口号(如:listenPort=9877,默认端口号9876)
2、master、slave异步复制模式
master宕机,slave不能自动切换到master。单个master/slave模式,生产消息受影响;
slave宕机,完全没影响。
3、producer生产消息轮询队列策略
默认:随机轮询队列(Roundbin),全局维护一个自增id,发送一次消息自增一次,与queueSize取模得出发送队列;
自定义:发送消息时,自定义messageQueueSelector类根据一定规则选择特定队列,可做到顺序消息,
如,相同的订单号发送到同一个队列,代码如下:
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
说明:
(1)顺序消息是指消费的顺序跟生产的消息顺序一致,一般指一类消息(比如同一个订单号)的顺序性,producer单线程顺序发送且发送到同一队列,consumer就可以按照producer发送的顺序消费。
(2)顺序消息的缺点是:(a)队列热点问题,在消息量比较大的应用,个别队列因为hash不均,导致消息大量堆积(b)Producer发送消息无法利用高可用,队列发送失败不会重试(c)遇到消费失败消息,无法跳过,直接队列消费暂停
以上缺点,阿里团队会在第四版Rocketmq改进。
4、consumer消费消息轮询队列策略
(1)RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。
但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
(2)同一个ConsumerGroup中的多个Consumer平均消费队列消息,分配策略如下:
如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。
这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。
(3)同时,消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载,获取同一个Consumer Group下的所有Consumer实例数或Topic的queue的个数是否改变,通知所有Consumer实例重新做一次负载均衡算法。
参考:https://www.zhihu.com/question/52023315/answer/131450604
5、broker、namesrv的默认配置查询,命令如下:
sh mqbroker -p
sh mqnamesrv -p
6、发送定时消息
rocketmq可设置不同Level的定时发送消息,配置在broker中,默认为:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推,默认延迟Level为0,即不延迟。
如,客户端发送消息时候,延迟10s发送,demo代码如下:
Message msg = new Message(topic, tags, keys, body);
msg.setDelayTimeLevel(3);
SendResult sendResult = producer().send(msg);
7、消费消息失败,定时重试消费机制
(a)、根据broker的延时Level定时发送consumer消息。经过测试,消费端业务处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER,
broker会根据设置的messageDelayLevel,定时发送consumer重试消费,查看broker的consumeProgress消费进度,consumeOffset和brokerOffset一直在增加(不知道为啥这么设计?),
直到返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS为止结束这条消息的消费;如果发送的消息设置有DelayTimeLevel,broker会从设置的级别后的messageDelayLevel对应的延迟时间重试;
(b)还有个重试次数(默认16)
两者满足其一消息进入死信队列。
8、producer发送消息失败重试机制
抛异常时(RemotingException,MQClientException,MQBrokerException),Producer会选择另外一个队列发送消息,重试发送消息要满足下列条件:
重试次数 < retryTimesWhenSendFailed+1,注意:retryTimesWhenSendFailed一定小于队列个数,重试完队列会直接退出重试(设置方式:mqProducer.setRetryTimesWhenSendFailed(xxx),默认为2【版本:3.5.8】)
非抛异常时,正常返回SendStatus状态,但是sendResult.getSendStatus() != SendStatus.SEND_OK,根据retryAnotherBrokerWhenNotStoreOK=true[默认是false],重试其他队列再发送,重试逻辑与上面相同。
注意:看有网友写到同时需要满足这个条件,总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数),源代码中没看到。 只是看到sendMsgTimeout作为一次发送消息的超时时间而已,不参与重试策略计算。
这个有待商榷,源代码没有深入研究,可能我看的有遗漏。
9、broker和client/producer版本不一致问题
发现broker/namesrv版本为3.5.8,client/producer版本为3.6.2.final,此时生产消息正常,也可以消费消息,但是通过ConsumerProgress查看消费进度,生产的消息全部堆积着,但是consumer确实已经消费完了;
无论设置的ConsumeFromWhere是CONSUME_FROM_LAST_OFFSET或CONSUME_FROM_FIRST_OFFSET,重启consumer都会重新消费消息,并且消息一直堆积。更换client/producer的版本号后,正常。
10、启动、关闭broker、namesrv
#启动namesrv
nohup sh mqnamesrv [-c xxx.properties] > mqnamesrv.log 2>&1 &
#启动broker[ip:port为namesrv的ip和端口号]
nohup sh mqbroker -c xxx.conf -n ip:port[;ip1:port1] > mqbroker.log 2>&1 &
测试中发现broker的一些问题:
(1)可在xxx.conf设置namesrvAddr=ip:port[;ip1:port1],启动broker时不需要-n ip:port[;ip1:port1]
(2)因为broker部署在虚拟机,并且虚拟双网卡,client无法正常连接服务端(因为broker启动时获取了服务器不对外的ip),
所以需要在xxx.conf设置brokerIP1=192.168.56.101,显式指定本机IP;同一台服务器部署多个broker,需要显式指定端口;listenPort=20911(默认为10911)。
可用配置参考如下:
namesrvAddr=192.168.56.101:9876
brokerIP1=192.168.56.101
listenPort=10911
........
说明:broker默认端口10911,同时该broker会使用10910[mqadmin连接使用],10912端口[slave连接使用];所以设置其他broker端口号时请规避。
关闭命令:
sh mqshutdown namesrv
sh mqshutdown broker
以上命令会关闭集群中所有的namesrv或broker,测试中如果仅关闭broker_slave,可使用kill pid.
11、队列个数设置
producer发送消息时候设置,特别注意:同一个topic仅当第一次创建的时候设置有效,以后修改无效,除非修改broker服务器上的consume.json文件,
demo代码如下:mqProducer.setDefaultTopicQueueNums(5)
参考:http://www.mamicode.com/info-detail-327693.html
12、ConsumeFromWhere的设置说明
//一个【新的订阅组】第一次启动从队列的【最前】位置开始消费,后续再启动接着上次消费的进度开始消费
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
//一个【新的订阅组】第一次启动从队列的【最后】位置开始消费,后续再启动接着上次消费的进度开始消费
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
13、消息存储结构
以consumeQueue和common log两个文件为主:
(1)consume Queue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置,通过broker配置文件设置。
我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:${user.home}/store/consumequeue/${topicName}/${queueId}/${fileName},可在broker中配置storePathCommitLog=xxx
Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,存储格式如:
|------------8Byte------------|------4Byte----|---------------8Byte------------|
|------CommonLog Offset-------|------Size-----|------MessageTag HashCode-------|
(2)commit Log
消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。文件的默认位置如${user.home}/store/commitlog/${fileName},可在broker中配置storePathConsumeQueue=xxx
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。
(3)消息索引文件
如果一个消息包含key值的话,会使用IndexFile存储消息索引,索引文件主要用于根据key来查询消息的,文件默认位置如下:${user.home}/store/index/${fileName},可在broker中配置,可在broker中配置storePathIndex=xxx
参考:http://www.jianshu.com/p/453c6e7ff81c
14、消息队列和消息存储
rocketmq的消息队列全部都是持久化的,长度无限的数据结构(每个存储单元都是定长)。
因消息存储在message Queue中,且它的具有水平扩展能力,并且根据过期时间定期删除,所以消息存储量大小跟磁盘大小有关,假设1K消息算,96G内存,物理内存可以缓存1亿条消息。
15、可用的管理命令
查看组内的消费进度:sh mqadmin consumerProgress -g xx
查看组内的消费实例:sh mqadmin consumerConnection -g xx
查看集群状态:sh mqadmin clusterList
查看主题列表:sh mqadmin topicList
查看主题的路由情况:sh mqadmin topicRoute -t xx
查看主题的状态:sh mqadmin topicStatus -t xx
查看主题的集群信息:sh mqadmin topicClusterList -t xx
查看消费实例的内部数据结构:sh mqadmin consumerStatus -g xx -i clientId
消除某个broker上的写权限(用于安全关闭Broker):sh mqadmin wipeWritePerm -b xx
通过时间戳重置消费偏移量(用于回溯消费):sh mqadmin resetOffsetByTime -g xx -s yy -t tt
说明:也可以通过rocketmq-console页面控制台查询以上所有数据。
16、broker集群部署配置说明[推荐使用多master多slave,异步复制模式]
Master 与 Slave 配对是通过指定相同的 brokerName 参数来配对, Master 的 BrokerId 必须是 0, Slave 的BrokerId 必须是大于 0 的数。
第10条说过,如果服务器多网卡,可能需要设置本机brokerIP地址,同时可以设置namesrvAddr地址;每一个broker
broker_a_master.properties可配置如下:
brokerClusterName=DefaultCluster
brokerName=broker-a
namesrvAddr=192.168.56.101:9876;192.168.56.101:9877
brokerIP1=192.168.56.101
listenPort=10911
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/work/alibaba-rocketmq/store_a_master
storePathCommitLog=/work/alibaba-rocketmq/store_a_master/commitlog
storePathConsumeQueue=/work/alibaba-rocketmq/store_a_master/consumequeue
说明:不同的broker,如果部署在同一台服务器,都需要指定不同的storePathXXXX路径。
17、Producer/Consumer Group的作用
Producer Group:
(1)、可以通过运维工具查询这个组下有多少Producer实例,命令:sh mqadmin producerConnection;
(2)、事务消息,如果Producer意外宕机,Broker会主动回调Producer Group中的任意一台机器确认事务状态。
Consumer Group:
(1)、可通过运维工具查询这个组下的消费进度,多少个Consumer实例,命令:sh mqadmin consumerProgress/consumerConnection -g xxx
(2)、集群模式,一个Consumer Group下的多个Consumer均摊消费消息;广播模式,group无意义。
18、回溯消费
Consumer已经消费成功的消息,由于业务上需求需要重新消费,那么Broker要提供一种机制,可以按照时间维度来回退消费进度,rocketmq支持精确到毫秒。可以向前或者向后回溯消息
使用命令:sh mqadmin resetOffsetByTime -g xx -s yy -t tt
19、消息过滤
3种方式都是在broker端做过滤:
(1)通过Message Tag过滤,* 代表所有类型消息,xxx||yyy 用||分隔消费不同类型的消息;
(2)可以通过Message Header/body过滤消息(官方文档说明,未经过测试);
(3)通过客户端定义的过滤器类传递到broker端过滤消息,详细步骤如下:
<a>Broker所在服务器会启动多个FilterServer过滤进程;
<b>Consumer启动时把自定义的过滤器类传递到FilterServer;
<c>Consumer从FilterServer拉消息,FilterServer把请求转发给Broker,FilterServer收到消息后,按照Consumer上传的过滤程序过滤后,把消费返回给Consumer。
测试代码如下:
String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
20、线上broker配置关闭autoCreateTopicEnable(官方建议关闭)
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,
这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,
然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。
后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。
参考:http://www.jianshu.com/p/453c6e7ff81c
21、消费过程要保证幂等性(消费端去重)、Consumer尽可能批量消费消息(设置Consumer的consumerMessageBatchMaxSize[默认为1])
22、NameServer/Broker/Producer/Consumer关系
参考:http://www.cnblogs.com/tommyli/p/5081846.html
23、发送消息的3种类型
参见类CommunicationMode,
SYNC,
ASYNC,
ONEWAY
producer.send(msg,SendCallback)方法中带有SendCallback类参数的是异步发送消息;
producer.sendOneway(msg);
否则为同步发送消息(默认为同步发送消息)。
24、死信队列
按照消费端GroupName分组死信队列,消费端消费失败,并且重试指定次数(默认16),会进入死信队列。可通过命令updateSubGroup 修改次数
其他:也可通过updateSubGroup设置ConsumeGroup的重试队列个数(默认1)。
.... 后续待总结
附录:
1、参考网站如下
http://www.jianshu.com/p/453c6e7ff81c
http://hiant.github.io/2016/08/26/rocketmq-0x5/
http://xingxiudong.com/2015/05/18/rocketmq-message-delay-config/
https://catslave.github.io/rocketmq/2016/08/15/RocketMQ.NameServer.html
http://www.tianshouzhi.com/api/tutorials/rocketmq
http://www.uml.org.cn/zjjs/201504011.asp
https://github.com/alibaba/RocketMQ/wiki/CLI-Admin-Tool
http://blog.csdn.net/lang_man_xing/article/details/47447797
问题解决:
http://blog.csdn.net/a417930422/article/details/50663639
http://www.mamicode.com/info-detail-327693.html
2、rocketmq-console.war
http://pan.baidu.com/s/1hs9i0lA
最后说明:不要相信官方PDF文档中的默认值,不同版本可能不一样,以源码为主。
相关推荐
rocketMq知识点总结,最全思维导图,互联网面试必备
RocketMQ学习笔记1 RocketMQ是Apache旗下的一个开源的消息队列系统,具有分布式、可靠、可扩展、高性能等特点。下面是对RocketMQ的学习笔记的总结。 分布式架构 RocketMQ原生支持分布式,解决了单点故障问题,...
RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模...通过学习和实践RocketMQ,开发者可以构建出稳定、高效的分布式应用,同时,官方文档是学习过程中的重要工具,能帮助我们更好地理解和使用RocketMQ。
总结来说,“rocketmq使用.zip”这个压缩包包含了关于RocketMQ的基本使用和核心特性——事务消息的资料,对于理解并运用RocketMQ在分布式系统中的事务处理非常有帮助。通过学习和实践,你可以构建出更健壮、高效的...
本学习总结将深入探讨RocketMQ的核心概念、工作原理以及实战应用,帮助读者全面掌握这款强大的工具。 一、RocketMQ概述 RocketMQ起源于阿里巴巴内部项目,后来成为Apache顶级项目,其设计目标是处理海量消息,支持...
通过以上知识点的总结,我们可以了解到RocketMQ源码解析涉及到了raft协议、多副本机制、源码阅读技巧、分布式系统架构、中间件实践经验等多个方面。这些知识点共同构成了理解和分析RocketMQ内部工作机制的完整框架。
在学习 RocketMQ 源码的过程中,有几个关键的知识点值得深入理解: 1. **ConsumerMode 模式**: - **Concurrentluy** 模式:消费者并行消费,消息的消费顺序不保证,适合对消息顺序性要求不高的场景。 - **...
总结起来,RocketMQ Console的这次更新增强了其安全管理能力,通过ACL鉴权和控制台登录验证,提供了更为安全、可控的使用环境。这对于运行关键业务的组织来说尤其重要,可以有效保护数据安全,防止未授权的访问和...
总结来说,RocketMQ3.0.6 用户指南为消息中间件开发者提供了一个全面的学习资源,从安装配置、使用方法到异常处理,都进行了详尽的解释,帮助开发者快速掌握RocketMQ的使用,降低开发和运维的门槛。通过文档的学习,...
总结,"rocketmq.zip"包含了在Linux系统上部署RocketMQ所需的所有资源,从安装、配置到使用,涉及了Linux基础、消息队列原理、RocketMQ组件、安装步骤以及运维监控等多个方面,是学习和实践RocketMQ的重要资料。
RocketMQ是中国阿里巴巴开源的一...通过学习,你将能够熟练掌握RocketMQ的使用,理解其内部工作原理,并有能力解决实际开发中遇到的问题。对于想深入了解和应用分布式消息中间件的人来说,这是一个不可多得的学习资源。
总结来说,阿里RocketMQ用户指南V3.2.4为用户在使用RocketMQ过程中可能遇到的各种问题提供了详尽的指导,覆盖了从基础概念到高级特性,从安装配置到运行维护,再到性能调优和问题排查的方方面面。文档通过具体的指令...
总结来说,RocketMQ是一个强大的消息中间件,4.7.1版本提供二进制和源码两种形式供用户使用。无论是快速部署还是深度定制,都能满足不同层次的需求。了解并掌握RocketMQ,将有助于提升你在分布式系统设计和开发中的...
- **消息模型**:理解RocketMQ中的消息模型是学习的基础,包括消息生产者(Producer)、消息消费者(Consumer)以及消息队列(Message Queue)的概念。 - **消息类型**:RocketMQ支持不同类型的消息,包括普通消息、顺序...
总结,RocketMQ作为一款强大的消息中间件,其丰富的特性和稳定性能使其在业界广泛应用。通过学习RocketMQ,开发者不仅能掌握消息队列的核心概念,还能提升对分布式系统的理解和实践能力。提供的"RocketMQ源码解析....
阿里RocketMQ是一款由阿里...通过深入学习这些资料,你将能够熟练掌握RocketMQ的使用,理解其内在机制,并能有效地解决在实际项目中遇到的问题。无论是初学者还是有经验的开发者,都能从中受益,提升自己的技术能力。
【RocketMQ 3.2.6 Web监控程序详解】 RocketMQ是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着重要角色,提供高...通过源码学习和与其他工具的结合,可以进一步提升对RocketMQ的理解和使用能力。
RocketMQ是一种开源的消息中间件,主要由阿里巴巴贡献并维护,广泛应用于分布式系统中,用于解耦应用程序、异步处理和构建可靠的数据传输...开发者可以通过学习和实践RocketMQ,提升其在微服务架构下的消息处理能力。
总结来说,SpringBoot Dubbo RocketMQ订单支付系统是一个典型的企业级微服务架构,通过合理地划分服务边界,利用Dubbo实现服务间的通信,借助RocketMQ实现消息驱动的异步处理,从而达到高性能、高可用的目标。...
还要学习如何配置消息队列、保证消息顺序和幂等性,以及如何监控和排查 RocketMQ 相关的问题。 Java并发编程库(JUC)是Java标准库的一部分,提供了丰富的线程和同步工具,如Semaphore、CyclicBarrier、...