`
m635674608
  • 浏览: 5043507 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

RocketMQ原理解析-consumer 5.push消费-顺序消费消息

    博客分类:
  • MQ
 
阅读更多

consumer 1.启动

有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。

Consumer消费拉取的消息的方式有两种

1.      Push方式:rocketmq已经提供了很全面的实现,consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可

2.      Pull方式:完全由业务系统去控制,定时拉取消息,指定队列消费等等,当然这里需要业务系统去根据自己的业务需求去实现

下面介绍默认以push方式为主,因为绝大多数是由push消费方式来使用rocketmq的。 

consumer启动流程 

复制代码
指定group

订阅topic

注册消息监听处理器,当消息到来时消费消息

消费端Start

         复制订阅关系

         初始化rebalance变量

         构建offsetStore消费进度存储对象

         启动消费消息服务

         向mqClientFactory注册本消费者

         启动client端远程通信

         启动定时任务

                   定时获取nameserver地址

                   定时从nameserver获取topic路由信息

                   定时清理下线的borker

                   定时向所有broker发送心跳信息,(包括订阅关系)

                   定时持久化Consumer消费进度(广播存储到本地,集群存储到Broker)

                   统计信息打点

                   动态调整消费线程池

         启动拉消息服务PullMessageService

         启动消费端负载均衡服务RebalanceService

         从namesrv更新topic路由信息

         向所有broker发送心跳信息,(包括订阅关系)

         唤醒Rebalance服务线程
复制代码
 
 

consumer 2.消费端负载均衡

消费端负载均衡

 

消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载

 

复制代码
消费端遍历自己的所有topic,依次调rebalanceByTopic

 根据topic获取此topic下的所有queue

 选择一台broker获取基于group的所有消费端(有心跳向所有broker注册客户端信息)

 选择队列分配策略实例AllocateMessageQueueStrategy执行分配算法,获取队列集合Set<MessageQueue>mqSet
复制代码
复制代码
1)  平均分配算法,其实是类似于分页的算法

将所有queue排好序类似于记录

将所有消费端consumer排好序,相当于页数

然后获取当前consumer所在页面应该分配到的queue

2)  按照配置来分配队列, 也就是说在consumer启动的时候指定了queue

3)  按照机房来配置队列

Consumer启动的时候会指定在哪些机房的消息

获取指定机房的queue

然后在执行如1)平均算法

 根据分配队列的结果更新ProccessQueueTable<MessageQueue,ProcessQueue>

1)      比对mqSet 将多余的队列删除,当broker当机或者添加,会导致分配到mqSet变化,

a)        将不在被本consumer消费的messagequeue的ProcessQueue删除,其实是设置ProcessQueue的droped属性为true

b)        将超过两份中没有拉取动作ProcessQueue删除

//TODO 为什么要删除掉,两分钟后来了消息怎么办?

//

2)      添加新增队列,比对mqSet,给新增的messagequeue

构建长轮询对象PullRequest对象,会从broker获取消费的进度

构建这个队列的ProcessQueue

将PullRequest对象派发到长轮询拉消息服务(单线程异步拉取)

注:ProcessQueue正在被消费的队列,

(1)    长轮询拉取到消息都会先存储到ProcessQueue的TreeMap<Long, MessageExt>集合中,消费调后会删除掉,用来控制consumer消息堆积,

TreeMap<Long, MessageExt> key是消息在此ConsumeQueue队列中索引

(2)    对于顺序消息消费处理

locked属性:当consumer端向broker申请锁队列成功后设置true,只有被锁定的processqueue才能被执行消费

rollback: 将消费在msgTreeMapTemp中的消息,放回msgTreeMap重新消费

commit: 将临时表msgTreeMapTemp数据清空,代表消费完成,放回最大偏移值

(3)    这里是个TreeMap,对key即消息的offset进行排序,这个样可以使得消息进行顺序消费
复制代码

 

consumer 3.长轮询

Rocketmq的消息是由consumer端主动到broker拉取的, consumer向broker发送拉消息请求, PullMessageService服务通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>中的PullRequest到broker拉取消息     

复制代码
  DefaultMQPushConsumerImpl的pullMessage(pullRequest)方法执行向broker拉消息动作

1.      获取ProcessQueue判读是否drop的, drop为true返回

2.      给ProcessQueue设置拉消息时间戳

3.      流量控制,正在消费队列中消息(未被消费的)超过阀值,稍后在执行拉消息

4.      流量控制,正在消费队列中消息的跨度超过阀值(默认2000),稍后在消费

5.      根据topic获取订阅关系

6.      构建拉消息回调对象PullBack, 从broker拉取消息(异步拉取)返回结果是回调

7.      从内存中获取commitOffsetValue  //TODO 这个值跟pullRequest.getNextOffset区别

8.      构建sysFlag   pull接口用到的flag

9.      调底层通信层向broker发送拉消息请求

如果master压力过大,会建议去slave拉取消息

如果是到broker拉取消息清楚实时提交标记位,因为slave不允许实时提交消费进度,可以定时提交 

//TODO 关于master拉消息实时提交指的是什么?

10.  拉到消息后回调PullCallback

处理broker返回结果pullResult

           更新从哪个broker(master 还是slave)拉取消息

           反序列化消息

           消息过滤

           消息中放入队列最大最小offset,方便应用来感知消息堆积度

将消息加入正在处理队列ProcessQueue

将消息提交到消费消息服务ConsumeMessageService

流控处理, 如果pullInterval参数大于0 (拉消息间隔,如果为了降低拉取速度,可以设置大于0的值),延迟再执行拉消息,  如果pullInterval为0立刻在执行拉消息动作
复制代码

序列图

1.      向broker发送长轮询请求

2.   Broker接收长轮询请求

3.      Consumer接收broker响应

长轮询活动图:

一张图画不下,再来一张

consumer 4.长轮询push消息-并发消息

通过长轮询拉取到消息后会提交到消息服务ConsumeMessageConcurrentlyService,

ConsumeMessageConcurrentlyServic的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。

长轮询向broker拉取消息是批量拉取的, 默认设置批量的值为pullBatchSize= 32,可配置

消费端consumer构建一个消费消息任务ConsumeRequest消费一批消息的个数是可配置的consumeMessageBatchMaxSize = 1, 默认批量个数为一个

         ConsumeRequest 任务run方法执行   

复制代码
 判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息

                   构建并行消费上下文

                   给消息设置消费失败时候的retrytopic,当消息发送失败的时候发送到topic为%RETRY%groupname的队列中
                   调MessageListenerConcurrently监听器的consumeMessage方法消费消息,返回消费结果

                   如果ProcessQueue的droped为true,不处理结果,不更新offset, 但其实这里消费端是消费了消息的,这种情况感觉有被重复消费的风险

                   处理消费结果

消费成功, 对于批次消费消息,返回消费成功并不代表所有消息都消费成功,但是消费消息的时候一旦遇到消费消息失败直接放回,根据ackIndex来标记成功消费到哪里了

                            消费失败, ackIndex设置为-1

广播模式发送失败的消息丢弃, 广播模式对于失败重试代价过高,对整个集群性能会有较大影响,失败重试功能交由应用处理

集群模式, 将消费失败的消息一条条的发送到broker的重试队列中去,如果此时还有发送到重试队列发送失败的消息,那就在cosumer的本地线程定时5秒钟以后重试重新消费消息,在走一次上面的消费流程。

                   删除正在消费的队列processQueue中本次消费的消息,放回消费进度

                  更新消费进度,这里只是一个内存offsettable的更新,后面有定时任务更新到broker上去
复制代码

 

consumer 5.长轮询push消息-顺序消费消息

复制代码
顺序消费服务ConsumeMessageConcurrentlyService构建的时候

                   构建一个线程池来接收消费请求ConsumeRequest

                   构建一个单线程的本地线程,用来稍后定时重新消费ConsumeRequest, 用来执行定时周期性(一秒)钟锁队列任务

 

         周期性锁队列lockMQPeriodically

                   获取正在消费队列列表ProcessQueueTable所有MesssageQueue, 构建根据broker归类成MessageQueue集合Map<brokername,Set<MessageQueue>>

                   遍历Map<brokername,Set<MessageQueue>>的brokername, 获取broker的master机器地址,将brokerName的Set<MessageQueue>发送到broker请求锁定这些队列。
在broker端锁定队列,其实就是在broker的queue中标记一下消费端,表示这个queue被某个client锁定。 Broker会返回成功锁定队列的集合,
根据成功锁定的MessageQueue,设置对应的正在处理队列ProccessQueue的locked属性为true没有锁定设置为false 通过长轮询拉取到消息后会提交到消息服务ConsumeMessageOrderlyService, ConsumeMessageOrderlyService的submitConsumeRequest方法构建ConsumeRequest任务提交到线程池。ConsumeRequest是由ProcessQueue和Messagequeue组成。 ConsumeRequest任务的run方法 判断proccessQueue是否被droped的, 废弃直接返回,不在消费消息 每个messagequeue都会生成一个队列锁来保证在当前consumer内,同一个队列串行消费, 判断processQueue的lock属性是否为true,lock属性是否过期,如果为false或者过期,放到本地线程稍后锁定在消费。 如果lock为true且没有过期,开始消费消息 计算任务执行的时间如果大于一分钟且线程数小于队列数情况下,将processqueue, messagequeue重新构建ConsumeRequest加到线程池10ms后在消费,这样防止个别队列被饿死 获取客户端的消费批次个数,默认一批次为一条 从proccessqueue获取批次消息, processqueue.takeMessags(batchSize), 从msgTreeMap中移除消息放到临时map中msgTreeMapTemp,这个临时map用来回滚消息和commit消息来实现事物消费 调回调接口消费消息,返回状态对象ConsumeOrderlyStatus 根据消费状态,处理结果 1) 非事物方式,自动提交 消息消息状态为success:调用processQueue.commit方法 获取msgTreeMapTemp的最后一个key,表示提交的 offset 清空msgTreeMapTemp的消息,已经成功消费 2) 事物提交,由用户来控制提交回滚(精卫专用) 更新消费进度, 这里的更新只是一个内存offsetTable的更新,后面有定时任务定时更新到broker上去
复制代码

consumer 6.消息消费

复制代码
消费者主动拉取消息消费,客户端通过类DefaultMQPullConsumer

         客户端可以指定特定MessageQueue

         也可以通过DefaultMQPullConsumer.fetchMessageQueuesInBalance(topic) 获取消费的队列

         业务自己获取消费队列,自己到broker拉取消息,以及自己更新消费进度

因为内部实现跟push方式类似就不在啰嗦,用法也请求看示例代码去
复制代码

consumer 7.shutdown

复制代码
DefaultMQPushConsumerImpl  关闭消费端

         关闭消费线程

         将分配到的Set<MessageQueue>的消费进度保存到broker

利用DefaultMQPushConsumerImpl获取ProcessQueueTable<MessageQueue,ProcessQueue>的keyset的messagequeue去获取

RemoteBrokerOffsetStore.offsetTable<MessageQueue,AutomicLong>Map中的消费进度,

offsetTable中的messagequeue的值,在update的时候如果没有对应的Messagequeue会构建, 但是也会rebalance的时候将没有分配到的messagequeue删除

rebalance会将offsettable中没有分配到messagequeue删除, 但是在从offsettable删除之前会将offset保存到broker

         Unregiser客户端

         pullMessageService关闭

         scheduledExecutorService关闭,关闭一些客户端的起的定时任务

         mqClientApi关闭

         rebalanceService关闭
复制代码

 http://www.cnblogs.com/wxd0108/p/6054817.html

分享到:
评论

相关推荐

    rocketmq-externals-master.zip

    - **Consumer模块**:研究消费模式(Push/Pull),消息的拉取策略,以及消费确认机制。 - **Message Store模块**:深入研究RocketMQ的数据存储结构,如CommitLog、ConsumeQueue等,以及如何实现高可用和高并发读写...

    rocketmq相关jar包.zip

    - 分布式消息顺序:在分布式环境中,RocketMQ的部分主题可以保证消息的顺序性,但需合理设计MessageQueue和Consumer的分配。 总的来说,"rocketmq相关jar包.zip"提供的jar文件是使用RocketMQ进行消息传递和处理的...

    RocketMQ入门实战及源码解析.7z

    RocketMQ的源码解析可以帮助我们理解其内部机制,包括NameServer的注册与发现、Broker的消息存储与检索、Producer的发送逻辑、Consumer的消费策略等。通过阅读源码,可以学习到如何设计分布式系统、如何优化消息传递...

    消息中间件 rocketmq原理解析

    本文将从以下几个方面对RocketMQ的原理进行解析。 ### 一、Producer #### 1. Producer启动流程 Producer是消息的发送者,它的启动流程如下: - 在发送消息时,如果Producer集合中没有对应topic的信息,则会向...

    java面试题_消息中间件--RocketMq(14题).zip

    以上是对RocketMQ面试题的详细解析,涵盖其基本概念、工作原理、特性和应用场景。理解这些知识点对于Java开发者的面试及实际工作都至关重要。在实际面试中,面试官可能还会深入询问具体实现细节、异常处理、性能优化...

    RocketMQ原理分析.rar

    本压缩包“RocketMQ原理分析.rar”包含了对RocketMQ核心机制的深入解析,旨在帮助用户理解其工作原理。 RocketMQ的核心组件包括Producer、Consumer、NameServer和Broker四个部分: 1. **Producer**: 生产者是消息...

    全面解剖RocketMQ和项目实战-day4-part5.7z

    在"全面解剖RocketMQ和项目实战-day4-part5.7z"这个压缩包中,包含了一系列视频教程,详细讲解了RocketMQ的关键特性和实战技巧,下面将对这些知识点进行深入解析。 1. **拉取消息长轮询机制(54.拉取消息长轮询机制...

    RocketMQ实战与原理解析

    ### RocketMQ实战与原理解析 #### 一、RocketMQ简介 Apache RocketMQ是一个分布式消息中间件,由阿里巴巴捐赠并成为Apache顶级项目。RocketMQ具备高性能、低延迟、高可靠等特性,支持发布/订阅模式、消息过滤、...

    rocketmq实例代码

    - **Consumer.java**: 实现消息消费者,创建一个Consumer实例,设置NameServer地址,订阅主题,并注册回调函数处理接收到的消息。 ```java DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(...

    RocketMQ源码

    - **Consumer**:消费者从RocketMQ中拉取或接收消息,有Push和Pull两种消费模式。 - **Message**:消息是数据传输的基本单元,包含消息体、消息属性等信息。 - **Topic**:主题是消息的分类,消息被路由到特定的...

    RocketMQ的使用、原理

    ### RocketMQ 的使用与原理详解 #### 一、RocketMQ简介及发展历程 RocketMQ作为一款高性能、高可靠的分布式消息中间件,在阿里巴巴集团内部得到了广泛的应用。其发展历程大致可分为以下几个阶段: 1. **Metaq 1.x...

    rocketmq4.9.4源码包文件

    - **消息消费**:Consumer通过NameServer订阅Topic,然后从 Broker拉取消息或采用Push模式接收消息。 - **消息回溯**:基于Pull模型的Consumer可以指定消费位置,如最早的未读消息或者最新消息。 - **消息确认**...

    rocketmq的源码文件

    4. **Consumer**:Consumer订阅Topic并消费消息,RocketMQ提供了Push Consumer和Pull Consumer两种模式。Push Consumer由Broker主动推送消息,而Pull Consumer则由消费者定时从Broker拉取消息。源码分析可以帮助理解...

    RocketMQ技术讲解V2.0

    RocketMQ源码分析,分为存储篇、NameServer篇、Broker篇、Producer篇、Consumer篇五大部分进行源码级的讲解。...5、讲解Consumer端的启动、PUSH模式的消息消费、PULL模式的消息消费、顺序消费/并发消费等功能;

    消息队列核心知识点-yes.zip

    本资料主要针对MQ的核心知识点进行深入探讨,以常见的面试问题为切入点,涵盖如何保证消息不丢失、处理重复消息、确保消息有序性以及应对消息堆积等关键议题,并对RocketMQ和Kafka等主流MQ进行了源码级别的解析。...

    LG机构RocketMQ视频源码资料课件

    2. **RocketMQ概念解析**:RocketMQ的核心概念包括Producer(生产者)、Consumer(消费者)、Message(消息)、Topic(主题)、Queue(队列)等。Producer负责发送消息,Consumer负责接收消息,Message是传输的对象...

    Rocket MQ 用户指南 v3.0.4

    ### RocketMQ 用户指南 v3.0.4 关键知识点解析 #### 1. 前言 - **目标读者:** RocketMQ用户指南的目标读者包括应用开发者及系统运维人员。 - **主要内容:** 指南主要介绍了如何使用RocketMQ以及如何进行服务器集群...

    (2024)跳槽涨薪必备精选面试题.pdf

    - **Consumer**:服务消费者。 - **Registry**:注册中心。 - **Monitor**:监控中心。 4. **Spring Cloud常用组件及其作用** - **Eureka**:服务发现。 - **Ribbon**:负载均衡。 - **Hystrix**:断路器。 ...

Global site tag (gtag.js) - Google Analytics