`
m635674608
  • 浏览: 5027588 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

rocketmq问题汇总-一个consumerGroup只对应一个topic

 
阅读更多

1 同一个订阅组内不同Consumer实例订阅不同topic消费混乱问题调查

图1:

背景说明:

如图1左半部分,假设目前的关系如下:

broker: 两个,broker_a和broker_b

topic:两个,topic1和topic2,每个topic在每个broker上分为4个queue

consumer:两个,consumer1和consumer2,都属于group1,分属于不同的jvm运行。

默认情况下,topic和queue的对应关系是:

topic1 <-> broker_a q0~q3,

topic1 <-> broker_b q0~q3,

topic2 <-> broker_a q0~q3,

topic2 <-> broker_b q0~q3

rebalance流程开始:

假设consumer1先启动,consumer1最终通过rebalance对应关系如下:

topic1 <-> broker_a q0~q3,

topic1 <-> broker_b q0~q3

 

接着consumer2启动,consumer2具体rebalance流程如下:

关键点在5.2,会把consumer1也抓下来,接着根据分配策略会导致consumer2只消费broker_b上topic2对应的q0~q3。

同样,consumer1也会进行rebalance,进而使其只消费broker_a的topic1对应的q0~q3,最终导致其关系变为图1中右图所示。

consumer端警告日志:

rebalance完成之后,consumer端间断打印如下异常:

14:22:04.005 [NettyClientPublicExecutor_3] WARN RocketmqClient - execute the pull request exception
com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's subscription not exist
See https://github.com/alibaba/RocketMQ/issues/46 for further details.
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:500) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:78) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:455) ~[rocketmq-client-3.2.6.jar:na]
at com.alibaba.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:62) [rocketmq-remoting-3.2.6.jar:na]
at com.alibaba.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:262) [rocketmq-remoting-3.2.6.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_51]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_51]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_51]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]

broker端也发现相应日志:

2015-07-31 16:38:08 WARN ClientManageThread_4 - subscription changed, group: consumerTestGroup remove topic vrs-topic-test SubscriptionData [classFilterMode=false, topic=vrs-topic-test, subString=*, tagsSet=[], codeSet=[], subVersion=1438331853269]

2015-07-31 16:38:22 WARN PullMessageThread_29 - the consumer's subscription not exist, group: consumerTestGroup

consumer&broker异常日志原因:

consumer会定时与所有broker进行心跳通信,代码详见:MQClientInstance.startScheduledTask,默认每30秒心跳一次。

心跳主要作用:

会将HeartbeatData对象发送到broker端,携带consumer group和topic信息

对应到图1中,consumer1会发送类似group1,topic1

consumer2会发送group1,topic2

 

经过走查broker端代码发现如下代码:

重点在步骤4和5.1和5.2,现在只针对一个broker做一下分析:

假设consumer1先启动,对于broker_a一开始关系是group1->topic1

当consumer2启动并rebalance完成后,consumer2发送group1->topic2,

在步骤4,会根据group1将原先的group1->topic1取出。

在步骤5.1,添加topic2

在步骤5.2,移除topic1。

 

而consumer1在rebalance之后同样会进行如上步骤,导致topic1&topic2反复被remove掉,

最终导致了consumer端和broker端的异常日志不停打印。

 

最终结论:是rebalance导致consumer只消费一部分topic,但是显然rocketmq在broker端做了处理,从而不停打印警告信息。

该调查由如何扩容consumer引出

 
http://blog.csdn.net/a417930422/article/details/50663639
分享到:
评论

相关推荐

    rocketmq-all-5.1.1-bin-release.zip

    - **Topic**:消息的主题,可以理解为消息的分类,多个Consumer可以订阅同一个Topic。 - **Queue**:队列,每个Topic下可以有多个Queue,消息在Queue中顺序存储。 - **Group**:消费者组,同一组内的Consumer会...

    rocketmq-console-ng-1.0.0.zip

    总结来说,"rocketmq-console-ng-1.0.0.zip"是一个包含RocketMQ Console管理控制台的压缩包,主要文件"rocketmq-console-ng-1.0.0.jar"是一个Java可执行文件,用于运行RocketMQ的图形化管理界面。使用这个控制台,...

    rocketmq-console-ng-1.0.1.jar

    RocketMQ-Console是RocketMQ项目的扩展插件,是一个图形化管理控制台,提供Broker集群状态查看,Topic管理,Producer、Consumer状态展示,消息查询等常用功能,这个功能在安装好RocketMQ后需要额外单独安装、运行。...

    rocketmq-externals-master.zip

    "rocketmq-externals-master.zip"是一个包含RocketMQ源码的压缩包,对于深入理解RocketMQ的工作原理、性能优化以及进行二次开发非常有帮助。下面将详细阐述RocketMQ的核心概念、架构、工作流程以及源码解析的关键点...

    rocketmq可视化控制台最新版 rocketmq-console-ng-2.x

    RocketMQ是一款开源的消息中间件,由阿里巴巴开发并贡献给Apache软件基金会,被广泛应用于大数据、实时计算、微服务等场景。RocketMQ的核心功能包括消息发送与接收、消息队列、消息回溯、高可用保障、分布式事务等。...

    rocketmq-all-4.9.4-bin-release

    9. **监控与管理**:RocketMQ提供了一个Web管理工具RocketMQ Console,可以用来监控Broker状态、查看Topic和Queue信息、管理Consumer和Producer等。 10. **消息过滤**:RocketMQ支持基于SQL92标准的表达式过滤消息...

    rocketmq-externals-release-rocketmq-console-1.0.0

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大数据、实时计算、微服务等领域。它提供了高可用、高吞吐量的消息传输能力,确保数据的可靠性和一致性。RocketMQ Console是RocketMQ的Web管理控制台,...

    rocketmq-all-5.1.3-bin-release.zip

    RocketMQ是一个分布式消息传递系统,它允许应用程序之间通过发送和接收消息进行通信,而无需直接耦合。这使得系统解耦,提高了扩展性和容错性。其主要特性包括: 1. 高性能:RocketMQ采用异步传输机制,优化了网络...

    rocketmq-all-5.1.4-bin-release.zip

    1. **消息队列**:RocketMQ提供基于主题(Topic)和队列(Queue)的消息模型,允许应用程序将消息发送到主题,然后由多个消费者从队列中拉取或订阅。这种模型支持一对多、一对一和广播等多种通信模式。 2. **高可用...

    rocketmq-all-4.6.0-bin-release.zip

    1. NameServer:这是一个轻量级的注册中心,负责维护 Topic 与 Broker 的映射关系,不存储任何业务数据,高可用性通过集群部署来实现。 2. Broker:Broker 是 RocketMQ 的核心组件,负责接收 Producer 发送的消息,...

    rocketmq-all-4.0.0-incubating-bin-release

    10. **Distributed Group**: 在RocketMQ中,Producer和Consumer可以以Group的形式工作,同一个Group内的实例可以协同处理消息,实现高可用和负载均衡。 11. **延时消息**: RocketMQ支持延时消息发送,用户可以设置...

    rocketmq-externals-master

    RocketMQ是一个高性能、分布式的消息中间件,广泛应用于大数据处理和实时业务场景。在这个压缩包中,重点是`rocketmq-console`工程,它是RocketMQ的Web管理控制台。 RocketMQ控制台对于运维人员来说非常重要,因为...

    rocketmq-spring-rocketmq-spring-all-2.0.3_rocketmq_

    RocketMQ Spring是阿里巴巴开源的一款基于Apache RocketMQ的消息中间件与Spring框架深度整合的产品,它使得在Spring Boot项目中集成RocketMQ变得极其简便。RocketMQ是一款高性能、高可用、分布式的消息队列服务,常...

    最新版 rocketmq-all-4.8.0-bin-release.zip

    RocketMQ是阿里巴巴开源的一款分布式消息中间件,广泛应用于大规模分布式系统中的消息传递。4.8.0版本是其最新的稳定版本,提供了许多增强特性和优化。以下是对RocketMQ 4.8.0版本的一些关键知识点的详细说明: 1. ...

    rocketmq-all-4.7.1-bin-release.zip

    Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message ...

    rocketmq-all-5.1.0-bin-release.zip

    综上所述,RocketMQ 5.1.0版本作为一个高效的消息中间件,不仅提供了丰富的消息模型和高级特性,还具有强大的稳定性和可扩展性,是构建大规模分布式系统的重要工具。通过深入理解和熟练运用RocketMQ,开发者可以构建...

    最新版windows rocketmq-all-4.7.0-bin-release.zip

    1. **解压文件**:将`rocketmq-all-4.7.0-bin-release.zip`解压到一个适当的目录。 2. **配置环境变量**:添加RocketMQ的bin目录到系统的PATH环境变量,以便于在任何地方启动RocketMQ服务。 3. **启动NameServer**...

Global site tag (gtag.js) - Google Analytics