论坛首页 Java企业应用论坛

RocketMq学习总结

浏览 5745 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2017-02-23  

1、mqnamesrv单机启动2台服务,可以通过-c xxx.properties指定端口号(如:listenPort=9877,默认端口号9876)

 

2、master、slave异步复制模式

master宕机,slave不能自动切换到master。单个master/slave模式,生产消息受影响;

slave宕机,完全没影响。

 

3、producer生产消息轮询队列策略

默认:随机轮询队列(Roundbin),全局维护一个自增id,发送一次消息自增一次,与queueSize取模得出发送队列;

自定义:发送消息时,自定义messageQueueSelector类根据一定规则选择特定队列,可做到顺序消息,

如,相同的订单号发送到同一个队列,代码如下:

producer.send(msg, new MessageQueueSelector() {

  @Override

  public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

        Integer id = (Integer) arg;

        int index = id % mqs.size();

        return mqs.get(index);

  }

}, orderId);

 

说明:顺序消息是指消费的顺序跟生产的消息顺序一致,一般指一类消息(比如同一个订单号)的顺序性,producer单线程顺序发送且发送到同一队列,consumer就可以按照producer发送的顺序消费。

 

4、consumer消费消息轮询队列策略

(1)RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

(2)同一个ConsumerGroup中的多个Consumer平均消费队列消息,分配策略如下:

如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。

这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

(3)同时,消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载,获取同一个Consumer Group下的所有Consumer实例数或Topic的queue的个数是否改变,通知所有Consumer实例重新做一次负载均衡算法。

 

参考:https://www.zhihu.com/question/52023315/answer/131450604

 

5、broker、namesrv的默认配置查询,命令如下:

sh mqbroker -p

sh mqnamesrv -p

 

6、发送定时消息

rocketmq可设置不同Level的定时发送消息,配置在broker中,默认为:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推,默认延迟Level为0,即不延迟。

如,客户端发送消息时候,延迟10s发送,demo代码如下:

Message msg = new Message(topic, tags, keys, body);

msg.setDelayTimeLevel(3);

SendResult sendResult = producer().send(msg);

 

7、消费消息失败,定时重试消费机制

猜测broker会根据broker的延时Level定时发送consumer消息。经过测试,消费端业务处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER,

broker会根据设置的messageDelayLevel,定时发送consumer重试消费,查看broker的consumeProgress消费进度,consumeOffset和brokerOffset一直在增加(不知道为啥这么设计?),

直到返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS为止结束这条消息的消费;如果发送的消息设置有DelayTimeLevel,broker会从设置的级别后的messageDelayLevel对应的延迟时间重试。

 

8、producer发送消息失败重试机制

抛异常时(RemotingException,MQClientException,MQBrokerException),Producer会选择另外一个队列发送消息,重试发送消息要满足下列条件:

重试次数 < retryTimesWhenSendFailed+1,注意:retryTimesWhenSendFailed一定小于队列个数,重试完队列会直接退出重试(设置方式:mqProducer.setRetryTimesWhenSendFailed(xxx),默认为2【版本:3.5.8】)

非抛异常时,正常返回SendStatus状态,但是sendResult.getSendStatus() != SendStatus.SEND_OK,根据retryAnotherBrokerWhenNotStoreOK=true[默认是false],重试其他队列再发送,重试逻辑与上面相同。

 

注意:看有网友写到同时需要满足这个条件,总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数),源代码中没看到。 只是看到sendMsgTimeout作为一次发送消息的超时时间而已,不参与重试策略计算。

这个有待商榷,源代码没有深入研究,可能我看的有遗漏。

 

9、broker和client/producer版本不一致问题

发现broker/namesrv版本为3.5.8,client/producer版本为3.6.2.final,此时生产消息正常,也可以消费消息,但是通过ConsumerProgress查看消费进度,生产的消息全部堆积着,但是consumer确实已经消费完了;

无论设置的ConsumeFromWhere是CONSUME_FROM_LAST_OFFSET或CONSUME_FROM_FIRST_OFFSET,重启consumer都会重新消费消息,并且消息一直堆积。更换client/producer的版本号后,正常。

 

10、启动、关闭broker、namesrv

#启动namesrv 

nohup sh mqnamesrv [-c xxx.properties] > mqnamesrv.log 2>&1 &   

#启动broker[ip:port为namesrv的ip和端口号]

nohup sh mqbroker -c xxx.conf -n ip:port[;ip1:port1] > mqbroker.log 2>&1 & 

测试中发现broker的一些问题:

(1)可在xxx.conf设置namesrvAddr=ip:port[;ip1:port1],启动broker时不需要-n ip:port[;ip1:port1]

(2)因为broker部署在虚拟机,并且虚拟双网卡,client无法正常连接服务端(因为broker启动时获取了服务器不对外的ip),

所以需要在xxx.conf设置brokerIP1=192.168.56.101,显式指定本机IP;同一台服务器部署多个broker,需要显式指定端口;listenPort=20911(默认为10911)。

可用配置参考如下:

namesrvAddr=192.168.56.101:9876

brokerIP1=192.168.56.101

listenPort=10911

........

 

说明:broker默认端口10911,同时该broker会使用10910[mqadmin连接使用],10912端口[slave连接使用];所以设置其他broker端口号时请规避。

 

关闭命令:

sh mqshutdown namesrv

sh mqshutdown broker

以上命令会关闭集群中所有的namesrv或broker,测试中如果仅关闭broker_slave,可使用kill pid.

 

11、队列个数设置

producer发送消息时候设置,特别注意:同一个topic仅当第一次创建的时候设置有效,以后修改无效,除非修改broker服务器上的consume.json文件,

demo代码如下:mqProducer.setDefaultTopicQueueNums(5)

参考:http://www.mamicode.com/info-detail-327693.html

 

12、ConsumeFromWhere的设置说明

//一个【新的订阅组】第一次启动从队列的【最前】位置开始消费,后续再启动接着上次消费的进度开始消费

ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET

 

//一个【新的订阅组】第一次启动从队列的【最后】位置开始消费,后续再启动接着上次消费的进度开始消费

ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET

 

13、消息存储结构

以consumeQueue和common log两个文件为主:

(1)consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置,通过broker配置文件设置。

我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,存储格式如:

|------------8Byte------------|------4Byte---|-------------8Byte--------------|

|------CommonLog Offset-------|------Size----|------MessageTag HashCode-------|

(2)common Log

消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。文件的默认位置如下,可通过broker配置文件设置:${user.home} \store\${commitlog}\${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。

(3)消息索引文件

如果一个消息包含key值的话,会使用IndexFile存储消息索引,索引文件主要用于根据key来查询消息的。

 

参考:http://www.jianshu.com/p/453c6e7ff81c

 

14、消息队列和消息存储

rocketmq的消息队列全部都是持久化的,长度无限的数据结构(每个存储单元都是定长)。

因消息存储在message Queue中,且它的具有水平扩展能力,并且根据过期时间定期删除,所以消息存储量大小跟磁盘大小有关,假设1K消息算,96G内存,物理内存可以缓存1亿条消息。

 

15、可用的管理命令

查看组内的消费进度:sh mqadmin consumerProgress -g xx

查看组内的消费实例:sh mqadmin consumerConnection -g xx

查看集群状态:sh mqadmin clusterList

查看主题列表:sh mqadmin topicList

查看主题的路由情况:sh mqadmin topicRoute -t xx

查看主题的状态:sh mqadmin topicStatus -t xx

查看主题的集群信息:sh mqadmin topicClusterList -t xx

查看消费实例的内部数据结构:sh mqadmin consumerStatus -g xx -i clientId

消除某个broker上的写权限(用于安全关闭Broker):sh mqadmin wipeWritePerm -b xx

通过时间戳重置消费偏移量(用于回溯消费):sh mqadmin resetOffsetByTime -g xx -s yy -t tt

 

说明:也可以通过rocketmq-console页面控制台查询以上所有数据。

 

16、broker集群部署配置说明[推荐使用多master多slave,异步复制模式]

Master 与 Slave 配对是通过指定相同的 brokerName 参数来配对, Master 的 BrokerId 必须是 0, Slave 的BrokerId 必须是大于 0 的数。

第10条说过,如果服务器多网卡,可能需要设置本机brokerIP地址,同时可以设置namesrvAddr地址;每一个broker

broker_a_master.properties可配置如下:

brokerClusterName=DefaultCluster

brokerName=broker-a

namesrvAddr=192.168.56.101:9876;192.168.56.101:9877

brokerIP1=192.168.56.101

listenPort=10911

brokerId=0

deleteWhen=04

fileReservedTime=48

brokerRole=ASYNC_MASTER

flushDiskType=ASYNC_FLUSH

 

storePathRootDir=/work/alibaba-rocketmq/store_a_master

storePathCommitLog=/work/alibaba-rocketmq/store_a_master/commitlog

storePathConsumeQueue=/work/alibaba-rocketmq/store_a_master/consumequeue

 

说明:不同的broker,如果部署在同一台服务器,都需要指定不同的storePathXXXX路径。

 

17、Producer/Consumer Group的作用

Producer Group:

(1)、可以通过运维工具查询这个组下有多少Producer实例,命令:sh mqadmin producerConnection;

(2)、事务消息,如果Producer意外宕机,Broker会主动回调Producer Group中的任意一台机器确认事务状态。

Consumer Group:

(1)、可通过运维工具查询这个组下的消费进度,多少个Consumer实例,命令:sh mqadmin consumerProgress/consumerConnection -g xxx

(2)、集群模式,一个Consumer Group下的多个Consumer均摊消费消息;广播模式,group无意义。

 

18、回溯消费

Consumer已经消费成功的消息,由于业务上需求需要重新消费,那么Broker要提供一种机制,可以按照时间维度来回退消费进度,rocketmq支持精确到毫秒。可以向前或者向后回溯消息

使用命令:sh mqadmin resetOffsetByTime -g xx -s yy -t tt

 

19、消息过滤

3种方式都是在broker端做过滤:

(1)通过Message Tag过滤,* 代表所有类型消息,xxx||yyy 用||分隔消费不同类型的消息;

(2)可以通过Message Header/body过滤消息(官方文档说明,未经过测试);

(3)通过客户端定义的过滤器类传递到broker端过滤消息,详细步骤如下:

<a>Broker所在服务器会启动多个FilterServer过滤进程;

<b>Consumer启动时把自定义的过滤器类传递到FilterServer;

<c>Consumer从FilterServer拉消息,FilterServer把请求转发给Broker,FilterServer收到消息后,按照Consumer上传的过滤程序过滤后,把消费返回给Consumer。

测试代码如下:

String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");

consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);

 

20、线上broker配置关闭autoCreateTopicEnable(官方建议关闭)

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,

这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,

然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。

后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

参考:http://www.jianshu.com/p/453c6e7ff81c

 

21、消费过程要保证幂等性(消费端去重)、Consumer尽可能批量消费消息(设置Consumer的consumerMessageBatchMaxSize[默认为1])

 

22、NameServer/Broker/Producer/Consumer关系

参考:http://www.cnblogs.com/tommyli/p/5081846.html

 

23、发送消息的3种类型

参见类CommunicationMode, 

SYNC,

ASYNC,

ONEWAY

 

producer.send(msg,SendCallback)方法中带有SendCallback类参数的是异步发送消息;

producer.sendOneway(msg);

否则为同步发送消息(默认为同步发送消息)。

 

.... 后续待总结

 

24、参考网站如下

http://www.jianshu.com/p/453c6e7ff81c

http://hiant.github.io/2016/08/26/rocketmq-0x5/

http://xingxiudong.com/2015/05/18/rocketmq-message-delay-config/

https://catslave.github.io/rocketmq/2016/08/15/RocketMQ.NameServer.html

http://www.tianshouzhi.com/api/tutorials/rocketmq

http://www.uml.org.cn/zjjs/201504011.asp

https://github.com/alibaba/RocketMQ/wiki/CLI-Admin-Tool

问题解决:

http://blog.csdn.net/a417930422/article/details/50663639

http://www.mamicode.com/info-detail-327693.html

 

25、rocketmq-console.war 见附件

 

最后说明:不要相信官方PDF文档中的默认值,不同版本可能不一样,以源码为主。

论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics