`
aokunsang
  • 浏览: 816427 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RocketMq学习使用总结

阅读更多

1、mqnamesrv单机启动2台服务,可以通过-c xxx.properties指定端口号(如:listenPort=9877,默认端口号9876)

 

2、master、slave异步复制模式

master宕机,slave不能自动切换到master。单个master/slave模式,生产消息受影响;

slave宕机,完全没影响。

 

3、producer生产消息轮询队列策略

默认:随机轮询队列(Roundbin),全局维护一个自增id,发送一次消息自增一次,与queueSize取模得出发送队列;

自定义:发送消息时,自定义messageQueueSelector类根据一定规则选择特定队列,可做到顺序消息,

如,相同的订单号发送到同一个队列,代码如下:

producer.send(msg, new MessageQueueSelector() {

  @Override

  public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {

        Integer id = (Integer) arg;

        int index = id % mqs.size();

        return mqs.get(index);

  }

}, orderId);

 

说明:

(1)顺序消息是指消费的顺序跟生产的消息顺序一致,一般指一类消息(比如同一个订单号)的顺序性,producer单线程顺序发送且发送到同一队列,consumer就可以按照producer发送的顺序消费。

(2)顺序消息的缺点是:(a)队列热点问题,在消息量比较大的应用,个别队列因为hash不均,导致消息大量堆积(b)Producer发送消息无法利用高可用,队列发送失败不会重试(c)遇到消费失败消息,无法跳过,直接队列消费暂停

以上缺点,阿里团队会在第四版Rocketmq改进。

 

4、consumer消费消息轮询队列策略

(1)RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

(2)同一个ConsumerGroup中的多个Consumer平均消费队列消息,分配策略如下:

如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。

这里采用的就是平均分配策略,它类似于分页的过程,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。

(3)同时,消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载,获取同一个Consumer Group下的所有Consumer实例数或Topic的queue的个数是否改变,通知所有Consumer实例重新做一次负载均衡算法。

 

参考:https://www.zhihu.com/question/52023315/answer/131450604

 

5、broker、namesrv的默认配置查询,命令如下:

sh mqbroker -p

sh mqnamesrv -p

 

6、发送定时消息

rocketmq可设置不同Level的定时发送消息,配置在broker中,默认为:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推,默认延迟Level为0,即不延迟。

如,客户端发送消息时候,延迟10s发送,demo代码如下:

Message msg = new Message(topic, tags, keys, body);

msg.setDelayTimeLevel(3);

SendResult sendResult = producer().send(msg);

 

7、消费消息失败,定时重试消费机制

(a)、根据broker的延时Level定时发送consumer消息。经过测试,消费端业务处理失败返回ConsumeConcurrentlyStatus.RECONSUME_LATER,

broker会根据设置的messageDelayLevel,定时发送consumer重试消费,查看broker的consumeProgress消费进度,consumeOffset和brokerOffset一直在增加(不知道为啥这么设计?),

直到返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS为止结束这条消息的消费;如果发送的消息设置有DelayTimeLevel,broker会从设置的级别后的messageDelayLevel对应的延迟时间重试;

(b)还有个重试次数(默认16)

两者满足其一消息进入死信队列。

 

8、producer发送消息失败重试机制

抛异常时(RemotingException,MQClientException,MQBrokerException),Producer会选择另外一个队列发送消息,重试发送消息要满足下列条件:

重试次数 < retryTimesWhenSendFailed+1,注意:retryTimesWhenSendFailed一定小于队列个数,重试完队列会直接退出重试(设置方式:mqProducer.setRetryTimesWhenSendFailed(xxx),默认为2【版本:3.5.8】)

非抛异常时,正常返回SendStatus状态,但是sendResult.getSendStatus() != SendStatus.SEND_OK,根据retryAnotherBrokerWhenNotStoreOK=true[默认是false],重试其他队列再发送,重试逻辑与上面相同。

 

注意:看有网友写到同时需要满足这个条件,总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数),源代码中没看到。 只是看到sendMsgTimeout作为一次发送消息的超时时间而已,不参与重试策略计算。

这个有待商榷,源代码没有深入研究,可能我看的有遗漏。

 

9、broker和client/producer版本不一致问题

发现broker/namesrv版本为3.5.8,client/producer版本为3.6.2.final,此时生产消息正常,也可以消费消息,但是通过ConsumerProgress查看消费进度,生产的消息全部堆积着,但是consumer确实已经消费完了;

无论设置的ConsumeFromWhere是CONSUME_FROM_LAST_OFFSET或CONSUME_FROM_FIRST_OFFSET,重启consumer都会重新消费消息,并且消息一直堆积。更换client/producer的版本号后,正常。

 

10、启动、关闭broker、namesrv

#启动namesrv 

nohup sh mqnamesrv [-c xxx.properties] > mqnamesrv.log 2>&1 &   

#启动broker[ip:port为namesrv的ip和端口号]

nohup sh mqbroker -c xxx.conf -n ip:port[;ip1:port1] > mqbroker.log 2>&1 & 

测试中发现broker的一些问题:

(1)可在xxx.conf设置namesrvAddr=ip:port[;ip1:port1],启动broker时不需要-n ip:port[;ip1:port1]

(2)因为broker部署在虚拟机,并且虚拟双网卡,client无法正常连接服务端(因为broker启动时获取了服务器不对外的ip),

所以需要在xxx.conf设置brokerIP1=192.168.56.101,显式指定本机IP;同一台服务器部署多个broker,需要显式指定端口;listenPort=20911(默认为10911)。

可用配置参考如下:

namesrvAddr=192.168.56.101:9876

brokerIP1=192.168.56.101

listenPort=10911

........

 

说明:broker默认端口10911,同时该broker会使用10910[mqadmin连接使用],10912端口[slave连接使用];所以设置其他broker端口号时请规避。

 

关闭命令:

sh mqshutdown namesrv

sh mqshutdown broker

以上命令会关闭集群中所有的namesrv或broker,测试中如果仅关闭broker_slave,可使用kill pid.

 

11、队列个数设置

producer发送消息时候设置,特别注意:同一个topic仅当第一次创建的时候设置有效,以后修改无效,除非修改broker服务器上的consume.json文件,

demo代码如下:mqProducer.setDefaultTopicQueueNums(5)

参考:http://www.mamicode.com/info-detail-327693.html

 

12、ConsumeFromWhere的设置说明

//一个【新的订阅组】第一次启动从队列的【最前】位置开始消费,后续再启动接着上次消费的进度开始消费

ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET

 

//一个【新的订阅组】第一次启动从队列的【最后】位置开始消费,后续再启动接着上次消费的进度开始消费

ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET

 

13、消息存储结构

以consumeQueue和common log两个文件为主:

(1)consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置,通过broker配置文件设置。

我们可以在配置中指定consumequeue与commitlog存储的目录,每个topic下的每个queue都有一个对应的consumequeue文件,比如:${user.home}/store/consumequeue/${topicName}/${queueId}/${fileName},可在broker中配置storePathCommitLog=xxx

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,存储格式如:

|------------8Byte------------|------4Byte----|---------------8Byte------------|

|------CommonLog Offset-------|------Size-----|------MessageTag HashCode-------|

(2)commit Log

消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。文件的默认位置如${user.home}/store/commitlog/${fileName},可在broker中配置storePathConsumeQueue=xxx

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。

(3)消息索引文件

如果一个消息包含key值的话,会使用IndexFile存储消息索引,索引文件主要用于根据key来查询消息的,文件默认位置如下:${user.home}/store/index/${fileName},可在broker中配置,可在broker中配置storePathIndex=xxx

 

 

参考:http://www.jianshu.com/p/453c6e7ff81c

 

14、消息队列和消息存储

rocketmq的消息队列全部都是持久化的,长度无限的数据结构(每个存储单元都是定长)。

因消息存储在message Queue中,且它的具有水平扩展能力,并且根据过期时间定期删除,所以消息存储量大小跟磁盘大小有关,假设1K消息算,96G内存,物理内存可以缓存1亿条消息。

 

15、可用的管理命令

查看组内的消费进度:sh mqadmin consumerProgress -g xx

查看组内的消费实例:sh mqadmin consumerConnection -g xx

查看集群状态:sh mqadmin clusterList

查看主题列表:sh mqadmin topicList

查看主题的路由情况:sh mqadmin topicRoute -t xx

查看主题的状态:sh mqadmin topicStatus -t xx

查看主题的集群信息:sh mqadmin topicClusterList -t xx

查看消费实例的内部数据结构:sh mqadmin consumerStatus -g xx -i clientId

消除某个broker上的写权限(用于安全关闭Broker):sh mqadmin wipeWritePerm -b xx

通过时间戳重置消费偏移量(用于回溯消费):sh mqadmin resetOffsetByTime -g xx -s yy -t tt

 

说明:也可以通过rocketmq-console页面控制台查询以上所有数据。

 

16、broker集群部署配置说明[推荐使用多master多slave,异步复制模式]

Master 与 Slave 配对是通过指定相同的 brokerName 参数来配对, Master 的 BrokerId 必须是 0, Slave 的BrokerId 必须是大于 0 的数。

第10条说过,如果服务器多网卡,可能需要设置本机brokerIP地址,同时可以设置namesrvAddr地址;每一个broker

broker_a_master.properties可配置如下:

brokerClusterName=DefaultCluster

brokerName=broker-a

namesrvAddr=192.168.56.101:9876;192.168.56.101:9877

brokerIP1=192.168.56.101

listenPort=10911

brokerId=0

deleteWhen=04

fileReservedTime=48

brokerRole=ASYNC_MASTER

flushDiskType=ASYNC_FLUSH

 

storePathRootDir=/work/alibaba-rocketmq/store_a_master

storePathCommitLog=/work/alibaba-rocketmq/store_a_master/commitlog

storePathConsumeQueue=/work/alibaba-rocketmq/store_a_master/consumequeue

 

说明:不同的broker,如果部署在同一台服务器,都需要指定不同的storePathXXXX路径。

 

17、Producer/Consumer Group的作用

Producer Group:

(1)、可以通过运维工具查询这个组下有多少Producer实例,命令:sh mqadmin producerConnection;

(2)、事务消息,如果Producer意外宕机,Broker会主动回调Producer Group中的任意一台机器确认事务状态。

Consumer Group:

(1)、可通过运维工具查询这个组下的消费进度,多少个Consumer实例,命令:sh mqadmin consumerProgress/consumerConnection -g xxx

(2)、集群模式,一个Consumer Group下的多个Consumer均摊消费消息;广播模式,group无意义。

 

18、回溯消费

Consumer已经消费成功的消息,由于业务上需求需要重新消费,那么Broker要提供一种机制,可以按照时间维度来回退消费进度,rocketmq支持精确到毫秒。可以向前或者向后回溯消息

使用命令:sh mqadmin resetOffsetByTime -g xx -s yy -t tt

 

19、消息过滤

3种方式都是在broker端做过滤:

(1)通过Message Tag过滤,* 代表所有类型消息,xxx||yyy 用||分隔消费不同类型的消息;

(2)可以通过Message Header/body过滤消息(官方文档说明,未经过测试);

(3)通过客户端定义的过滤器类传递到broker端过滤消息,详细步骤如下:

<a>Broker所在服务器会启动多个FilterServer过滤进程;

<b>Consumer启动时把自定义的过滤器类传递到FilterServer;

<c>Consumer从FilterServer拉消息,FilterServer把请求转发给Broker,FilterServer收到消息后,按照Consumer上传的过滤程序过滤后,把消费返回给Consumer。

测试代码如下:

String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");

consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl", filterCode);

 

20、线上broker配置关闭autoCreateTopicEnable(官方建议关闭)

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,

这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,

然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。

后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

参考:http://www.jianshu.com/p/453c6e7ff81c

 

21、消费过程要保证幂等性(消费端去重)、Consumer尽可能批量消费消息(设置Consumer的consumerMessageBatchMaxSize[默认为1])

 

22、NameServer/Broker/Producer/Consumer关系

参考:http://www.cnblogs.com/tommyli/p/5081846.html

 

23、发送消息的3种类型

参见类CommunicationMode, 

SYNC,

ASYNC,

ONEWAY

 

producer.send(msg,SendCallback)方法中带有SendCallback类参数的是异步发送消息;

producer.sendOneway(msg);

否则为同步发送消息(默认为同步发送消息)。

 

24、死信队列

按照消费端GroupName分组死信队列,消费端消费失败,并且重试指定次数(默认16),会进入死信队列。可通过命令updateSubGroup 修改次数

其他:也可通过updateSubGroup设置ConsumeGroup的重试队列个数(默认1)。

 

.... 后续待总结

 

附录:

1、参考网站如下

http://www.jianshu.com/p/453c6e7ff81c

http://hiant.github.io/2016/08/26/rocketmq-0x5/

http://xingxiudong.com/2015/05/18/rocketmq-message-delay-config/

https://catslave.github.io/rocketmq/2016/08/15/RocketMQ.NameServer.html

http://www.tianshouzhi.com/api/tutorials/rocketmq

http://www.uml.org.cn/zjjs/201504011.asp

https://github.com/alibaba/RocketMQ/wiki/CLI-Admin-Tool

http://blog.csdn.net/lang_man_xing/article/details/47447797

问题解决:

http://blog.csdn.net/a417930422/article/details/50663639

http://www.mamicode.com/info-detail-327693.html

 

2、rocketmq-console.war 

http://pan.baidu.com/s/1hs9i0lA

 

最后说明:不要相信官方PDF文档中的默认值,不同版本可能不一样,以源码为主。

分享到:
评论

相关推荐

    rocketMq知识点总结,最全思维导图,互联网面试必备

    rocketMq知识点总结,最全思维导图,互联网面试必备

    RocketMQ学习笔记1

    RocketMQ学习笔记1 RocketMQ是Apache旗下的一个开源的消息队列系统,具有分布式、可靠、可扩展、高性能等特点。下面是对RocketMQ的学习笔记的总结。 分布式架构 RocketMQ原生支持分布式,解决了单点故障问题,...

    RocketMQ学习及V3.2.4最新开发指南

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模...通过学习和实践RocketMQ,开发者可以构建出稳定、高效的分布式应用,同时,官方文档是学习过程中的重要工具,能帮助我们更好地理解和使用RocketMQ。

    rocketmq使用.zip

    总结来说,“rocketmq使用.zip”这个压缩包包含了关于RocketMQ的基本使用和核心特性——事务消息的资料,对于理解并运用RocketMQ在分布式系统中的事务处理非常有帮助。通过学习和实践,你可以构建出更健壮、高效的...

    RocketMQ消息中间件学习总结-rocketmq_learning.zip

    本学习总结将深入探讨RocketMQ的核心概念、工作原理以及实战应用,帮助读者全面掌握这款强大的工具。 一、RocketMQ概述 RocketMQ起源于阿里巴巴内部项目,后来成为Apache顶级项目,其设计目标是处理海量消息,支持...

    RocketMQ源代码解析.pdf

    通过以上知识点的总结,我们可以了解到RocketMQ源码解析涉及到了raft协议、多副本机制、源码阅读技巧、分布式系统架构、中间件实践经验等多个方面。这些知识点共同构成了理解和分析RocketMQ内部工作机制的完整框架。

    RocketMq源码学习过程中的总结资料

    在学习 RocketMQ 源码的过程中,有几个关键的知识点值得深入理解: 1. **ConsumerMode 模式**: - **Concurrentluy** 模式:消费者并行消费,消息的消费顺序不保证,适合对消息顺序性要求不高的场景。 - **...

    rocketmq-console控制台已增加ACL鉴权和配置控制台登录验证

    总结起来,RocketMQ Console的这次更新增强了其安全管理能力,通过ACL鉴权和控制台登录验证,提供了更为安全、可控的使用环境。这对于运行关键业务的组织来说尤其重要,可以有效保护数据安全,防止未授权的访问和...

    RocketMQ3.0.6 用户指南

    总结来说,RocketMQ3.0.6 用户指南为消息中间件开发者提供了一个全面的学习资源,从安装配置、使用方法到异常处理,都进行了详尽的解释,帮助开发者快速掌握RocketMQ的使用,降低开发和运维的门槛。通过文档的学习,...

    rocketmq.zip

    总结,"rocketmq.zip"包含了在Linux系统上部署RocketMQ所需的所有资源,从安装、配置到使用,涉及了Linux基础、消息队列原理、RocketMQ组件、安装步骤以及运维监控等多个方面,是学习和实践RocketMQ的重要资料。

    RocketMQ相关资料

    RocketMQ是中国阿里巴巴开源的一...通过学习,你将能够熟练掌握RocketMQ的使用,理解其内部工作原理,并有能力解决实际开发中遇到的问题。对于想深入了解和应用分布式消息中间件的人来说,这是一个不可多得的学习资源。

    阿里RocketMQ用户指南V3.2.4.pdf

    总结来说,阿里RocketMQ用户指南V3.2.4为用户在使用RocketMQ过程中可能遇到的各种问题提供了详尽的指导,覆盖了从基础概念到高级特性,从安装配置到运行维护,再到性能调优和问题排查的方方面面。文档通过具体的指令...

    rocketmq-all-4.7.1.rar

    总结来说,RocketMQ是一个强大的消息中间件,4.7.1版本提供二进制和源码两种形式供用户使用。无论是快速部署还是深度定制,都能满足不同层次的需求。了解并掌握RocketMQ,将有助于提升你在分布式系统设计和开发中的...

    java -RocketMQ实战视频教程(上下全集)

    - **消息模型**:理解RocketMQ中的消息模型是学习的基础,包括消息生产者(Producer)、消息消费者(Consumer)以及消息队列(Message Queue)的概念。 - **消息类型**:RocketMQ支持不同类型的消息,包括普通消息、顺序...

    RocketMQ入门实战及源码解析.7z

    总结,RocketMQ作为一款强大的消息中间件,其丰富的特性和稳定性能使其在业界广泛应用。通过学习RocketMQ,开发者不仅能掌握消息队列的核心概念,还能提升对分布式系统的理解和实践能力。提供的"RocketMQ源码解析....

    阿里RocketMQ资料

    阿里RocketMQ是一款由阿里...通过深入学习这些资料,你将能够熟练掌握RocketMQ的使用,理解其内在机制,并能有效地解决在实际项目中遇到的问题。无论是初学者还是有经验的开发者,都能从中受益,提升自己的技术能力。

    RocketMQ 3.2.6 web监控程序

    【RocketMQ 3.2.6 Web监控程序详解】 RocketMQ是阿里巴巴开源的一款分布式消息中间件,它在大规模分布式系统中扮演着重要角色,提供高...通过源码学习和与其他工具的结合,可以进一步提升对RocketMQ的理解和使用能力。

    RocketMQ生产消费者模型实现

    RocketMQ是一种开源的消息中间件,主要由阿里巴巴贡献并维护,广泛应用于分布式系统中,用于解耦应用程序、异步处理和构建可靠的数据传输...开发者可以通过学习和实践RocketMQ,提升其在微服务架构下的消息处理能力。

    SpringBoot Dubbo RocketMQ订单支付系统.zip

    总结来说,SpringBoot Dubbo RocketMQ订单支付系统是一个典型的企业级微服务架构,通过合理地划分服务边界,利用Dubbo实现服务间的通信,借助RocketMQ实现消息驱动的异步处理,从而达到高性能、高可用的目标。...

    MySQL、JVM、RocketMQ、JUC、设计模式、数据结构与算法学习总结.zip

    还要学习如何配置消息队列、保证消息顺序和幂等性,以及如何监控和排查 RocketMQ 相关的问题。 Java并发编程库(JUC)是Java标准库的一部分,提供了丰富的线程和同步工具,如Semaphore、CyclicBarrier、...

Global site tag (gtag.js) - Google Analytics