1、mqnamesrv单机启动2台服务,可以通过-c xxx.properties指定端口号(如:listenPort=9877,默认端口号9876)
2、master、slave异步复制模式
master宕机,slave不能自动切换到master。单个master/slave模式,生产消息受影响;
slave宕机,完全没影响。
3、producer生产消息轮询队列策略
默认:随机轮询队列(Roundbin),全局维护一个自增id,发送一次消息自增一次,与queueSize取模得出发送队列;
自定义:发送消息时,自定义messageQueueSelector类根据一定规则选择特定队列,可做到顺序消息,
如,相同的订单号发送到同一个队列,代码如下:
producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
说明:顺序消息是指消费的顺序跟生产的消息顺序一致,一般指一类消息(比如同一个订单号)的顺序性,producer单线程顺序发送且发送到同一队列,consumer就可以按照producer发送的顺序消费。
4、consumer消费消息轮询队列策略
(1)RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。
但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
(2)同一个ConsumerGroup中的多个Consumer平均消费队列消息,分配策略如下:
如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。
这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。
(3)同时,消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载,获取同一个Consumer Group下的所有Consumer实例数或Topic的queue的个数是否改变,通知所有Consumer实例重新做一次负载均衡算法。
参考:https://www.zhihu.com/question/52023315/answer/131450604
5、broker、namesrv的默认配置查询,命令如下:
sh mqbroker -p
sh mqnamesrv -p
6、发送定时消息
rocketmq可设置不同Level的定时发送消息,配置在broker中,默认为:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推,默认延迟Level为0,即不延迟。
如,客户端发送消息时候,延迟10s发送,demo代码如下:
Message msg = new Message(topic, tags, keys, body);
msg.setDelayTimeLevel(3);
SendResult sendResult = producer().send(msg);
7、消费消息失败,定时重试消费机制
猜测broker会根据broker的延时Level定时发送consumer消息。经过测试,消费端业务处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER,
broker会根据设置的messageDelayLevel,定时发送consumer重试消费,查看broker的consumeProgress消费进度,consumeOffset和brokerOffset一直在增加(不知道为啥这么设计?),
直到返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS为止结束这条消息的消费;如果发送的消息设置有DelayTimeLevel,broker会从设置的级别后的messageDelayLevel对应的延迟时间重试。
8、producer发送消息失败重试机制
抛异常时(RemotingException,MQClientException,MQBrokerException),Producer会选择另外一个队列发送消息,重试发送消息要满足下列条件:
重试次数 < retryTimesWhenSendFailed+1,注意:retryTimesWhenSendFailed一定小于队列个数,重试完队列会直接退出重试(设置方式:mqProducer.setRetryTimesWhenSendFailed(xxx),默认为2【版本:3.5.8】)
非抛异常时,正常返回SendStatus状态,但是sendResult.getSendStatus() != SendStatus.SEND_OK,根据retryAnotherBrokerWhenNotStoreOK=true[默认是false],重试其他队列再发送,重试逻辑与上面相同。
注意:看有网友写到同时需要满足这个条件,总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数),源代码中没看到。 只是看到sendMsgTimeout作为一次发送消息的超时时间而已,不参与重试策略计算。
这个有待商榷,源代码没有深入研究,可能我看的有遗漏。
9、broker和client/producer版本不一致问题
发现broker/namesrv版本为3.5.8,client/producer版本为3.6.2.final,此时生产消息正常,也可以消费消息,但是通过ConsumerProgress查看消费进度,生产的消息全部堆积着,但是consumer确实已经消费完了;
无论设置的ConsumeFromWhere是CONSUME_FROM_LAST_OFFSET或CONSUME_FROM_FIRST_OFFSET,重启consumer都会重新消费消息,并且消息一直堆积。更换client/producer的版本号后,正常。
10、启动、关闭broker、namesrv
#启动namesrv
nohup sh mqnamesrv [-c xxx.properties] > mqnamesrv.log 2>&1 &
#启动broker[ip:port为namesrv的ip和端口号]
nohup sh mqbroker -c xxx.conf -n ip:port[;ip1:port1] > mqbroker.log 2>&1 &
测试中发现broker的一些问题:
(1)可在xxx.conf设置namesrvAddr=ip:port[;ip1:port1],启动broker时不需要-n ip:port[;ip1:port1]
(2)因为broker部署在虚拟机,并且虚拟双网卡,client无法正常连接服务端(因为broker启动时获取了服务器不对外的ip),
所以需要在xxx.conf设置brokerIP1=192.168.56.101,显式指定本机IP;同一台服务器部署多个broker,需要显式指定端口;listenPort=20911(默认为10911)。
可用配置参考如下:
namesrvAddr=192.168.56.101:9876
brokerIP1=192.168.56.101
listenPort=10911
........
说明:broker默认端口10911,同时该broker会使用10910[mqadmin连接使用],10912端口[slave连接使用];所以设置其他broker端口号时请规避。
关闭命令:
sh mqshutdown namesrv
sh mqshutdown broker
以上命令会关闭集群中所有的namesrv或broker,测试中如果仅关闭broker_slave,可使用kill pid.
11、队列个数设置
producer发送消息时候设置,特别注意:同一个topic仅当第一次创建的时候设置有效,以后修改无效,除非修改broker服务器上的consume.json文件,
demo代码如下:mqProducer.setDefaultTopicQueueNums(5)
参考:http://www.mamicode.com/info-detail-327693.html
12、ConsumeFromWhere的设置说明
//一个【新的订阅组】第一次启动从队列的【最前】位置开始消费,后续再启动接着上次消费的进度开始消费
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
//一个【新的订阅组】第一次启动从队列的【最后】位置开始消费,后续再启动接着上次消费的进度开始消费
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
13、消息存储结构
以consumeQueue和common log两个文件为主:
(1)consume Queue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置,通过broker配置文件设置。
我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,存储格式如:
|------------8Byte------------|------4Byte---|-------------8Byte--------------|
|------CommonLog Offset-------|------Size----|------MessageTag HashCode-------|
(2)common Log
消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。文件的默认位置如下,可通过broker配置文件设置:${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。
(3)消息索引文件
如果一个消息包含key值的话,会使用IndexFile存储消息索引,索引文件主要用于根据key来查询消息的。
参考:http://www.jianshu.com/p/453c6e7ff81c
14、消息队列和消息存储
rocketmq的消息队列全部都是持久化的,长度无限的数据结构(每个存储单元都是定长)。
因消息存储在message Queue中,且它的具有水平扩展能力,并且根据过期时间定期删除,所以消息存储量大小跟磁盘大小有关,假设1K消息算,96G内存,物理内存可以缓存1亿条消息。
15、可用的管理命令
查看组内的消费进度:sh mqadmin consumerProgress -g xx
查看组内的消费实例:sh mqadmin consumerConnection -g xx
查看集群状态:sh mqadmin clusterList
查看主题列表:sh mqadmin topicList
查看主题的路由情况:sh mqadmin topicRoute -t xx
查看主题的状态:sh mqadmin topicStatus -t xx
查看主题的集群信息:sh mqadmin topicClusterList -t xx
查看消费实例的内部数据结构:sh mqadmin consumerStatus -g xx -i clientId
消除某个broker上的写权限(用于安全关闭Broker):sh mqadmin wipeWritePerm -b xx
通过时间戳重置消费偏移量(用于回溯消费):sh mqadmin resetOffsetByTime -g xx -s yy -t tt
说明:也可以通过rocketmq-console页面控制台查询以上所有数据。
16、broker集群部署配置说明[推荐使用多master多slave,异步复制模式]
Master 与 Slave 配对是通过指定相同的 brokerName 参数来配对, Master 的 BrokerId 必须是 0, Slave 的BrokerId 必须是大于 0 的数。
第10条说过,如果服务器多网卡,可能需要设置本机brokerIP地址,同时可以设置namesrvAddr地址;每一个broker
broker_a_master.properties可配置如下:
brokerClusterName=DefaultCluster
brokerName=broker-a
namesrvAddr=192.168.56.101:9876;192.168.56.101:9877
brokerIP1=192.168.56.101
listenPort=10911
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/work/alibaba-rocketmq/store_a_master
storePathCommitLog=/work/alibaba-rocketmq/store_a_master/commitlog
storePathConsumeQueue=/work/alibaba-rocketmq/store_a_master/consumequeue
说明:不同的broker,如果部署在同一台服务器,都需要指定不同的storePathXXXX路径。
17、Producer/Consumer Group的作用
Producer Group:
(1)、可以通过运维工具查询这个组下有多少Producer实例,命令:sh mqadmin producerConnection;
(2)、事务消息,如果Producer意外宕机,Broker会主动回调Producer Group中的任意一台机器确认事务状态。
Consumer Group:
(1)、可通过运维工具查询这个组下的消费进度,多少个Consumer实例,命令:sh mqadmin consumerProgress/consumerConnection -g xxx
(2)、集群模式,一个Consumer Group下的多个Consumer均摊消费消息;广播模式,group无意义。
18、回溯消费
Consumer已经消费成功的消息,由于业务上需求需要重新消费,那么Broker要提供一种机制,可以按照时间维度来回退消费进度,rocketmq支持精确到毫秒。可以向前或者向后回溯消息
使用命令:sh mqadmin resetOffsetByTime -g xx -s yy -t tt
19、消息过滤
3种方式都是在broker端做过滤:
(1)通过Message Tag过滤,* 代表所有类型消息,xxx||yyy 用||分隔消费不同类型的消息;
(2)可以通过Message Header/body过滤消息(官方文档说明,未经过测试);
(3)通过客户端定义的过滤器类传递到broker端过滤消息,详细步骤如下:
<a>Broker所在服务器会启动多个FilterServer过滤进程;
<b>Consumer启动时把自定义的过滤器类传递到FilterServer;
<c>Consumer从FilterServer拉消息,FilterServer把请求转发给Broker,FilterServer收到消息后,按照Consumer上传的过滤程序过滤后,把消费返回给Consumer。
测试代码如下:
String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);
20、线上broker配置关闭autoCreateTopicEnable(官方建议关闭)
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,
这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,
然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。
后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。
参考:http://www.jianshu.com/p/453c6e7ff81c
21、消费过程要保证幂等性(消费端去重)、Consumer尽可能批量消费消息(设置Consumer的consumerMessageBatchMaxSize[默认为1])
22、NameServer/Broker/Producer/Consumer关系
参考:http://www.cnblogs.com/tommyli/p/5081846.html
23、发送消息的3种类型
参见类CommunicationMode,
SYNC,
ASYNC,
ONEWAY
producer.send(msg,SendCallback)方法中带有SendCallback类参数的是异步发送消息;
producer.sendOneway(msg);
否则为同步发送消息(默认为同步发送消息)。
.... 后续待总结
24、参考网站如下
http://www.jianshu.com/p/453c6e7ff81c
http://hiant.github.io/2016/08/26/rocketmq-0x5/
http://xingxiudong.com/2015/05/18/rocketmq-message-delay-config/
https://catslave.github.io/rocketmq/2016/08/15/RocketMQ.NameServer.html
http://www.tianshouzhi.com/api/tutorials/rocketmq
http://www.uml.org.cn/zjjs/201504011.asp
https://github.com/alibaba/RocketMQ/wiki/CLI-Admin-Tool
问题解决:
http://blog.csdn.net/a417930422/article/details/50663639
http://www.mamicode.com/info-detail-327693.html
25、rocketmq-console.war 见附件
最后说明:不要相信官方PDF文档中的默认值,不同版本可能不一样,以源码为主。