kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)
at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.Java:34)
出现以上问题原因分析:
同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.
解决办法:
1.配置zk问题(kafka的consumer配置)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10
在使用高级API过程中,一般出现这个问题是zookeeper.sync.time.ms时间间隔配置过短,不排除有其他原因引起,但笔者遇到一般是这个原因。
给大家解释一下原因:一个消费者组中(consumer数量<partitions数量)每当有consumer发送变化,会触发负载均衡。第一件事就是释放当consumer资源,无则免之,调用ConsumerFetcherThread关闭并释放当前kafka broker所有连接,释放当前消费的partitons,实际就是删除临时节点(/xxx/consumer/owners/topic-xxx/partitions[0-n]),所有同一个consumer group内所有consumer通过计算获取本consumer要消费的partitions,然后本consumer注册相应临时节点卡位,代表我拥有该partition的消费所有权,其他consumer不能使用。
如果大家理解上面解释,下面就更容易了,当consumer调用Rebalance时,它是按照时间间隔和最大次数采取失败重试原则,每当获取partitions失败后会重试获取。举个例子,假如某个公司有个会议,B部门在某个时间段预订该会议室,但是时间到了去会议室看时,发现A部门还在使用。这时B部门只有等待了,每隔一段时间去询问一下。如果时间过于频繁,则会议室一直会处于占用状态,如果时间间隔设置长点,可能去个2次,A部门就让出来了。
同理,当新consumer加入重新触发rebalance时,已有(old)的consumer会重新计算并释放占用partitions,但是会消耗一定处理时间,此时新(new)consumer去抢占该partitions很有可能就会失败。我们假设设置足够old consumer释放资源的时间,就不会出现这个问题。
官方解释:
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for "conflict in ").
- If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms.
- Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won't realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
rebalance.backoff.ms时间设置过短就会导致old consumer还没有来得及释放资源,new consumer重试失败多次到达阀值就退出了。
确保rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
kafka zk节点存储,请参考:kafka在zookeeper中存储结构
相关推荐
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server ...
kafka的Dockerfile镜像的构建 1. jdk-8u102-linux-x64.tar.gz 2. kafka_2.11-2.4.0.tgz 配合脚本进行自动化的构建
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.group-id=my-consumer-group ``` 现在,我们来创建生产者类。Spring Boot允许我们使用...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer, String...
kafka的docker镜像wurstmeister/kafka,有一段时间docker上老是拉不下来,故存为资源
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring....
**Kafka 概述** Kafka 是一个分布式流处理平台,由 LinkedIn 开发并后来成为 Apache 软件基金会的顶级项目。它被设计为高吞吐量、低延迟的消息传递系统,支持实时数据流处理。Kafka 主要用于构建实时数据管道和流...
4. **Serializer / Deserializer**: Kafka支持自定义消息序列化和反序列化,例如`org.apache.kafka.common.serialization.StringSerializer` 和 `StringDeserializer` 分别用于字符串类型的消息。 为了使用这些类,...
<groupId>org.apache.kafka <artifactId>kafka-clients <version>0.10.1.1 </dependency>
Kafka是大数据技术领域中的重要组件之一,它是一个分布式流处理平台,通常被用作消息队列系统,以处理实时数据流。在大数据技术应用中,Kafka以其高吞吐量、可扩展性以及实时性的特点,受到广泛的重视。 ### 消息...
foundation for Kafka. It also talks about how Kafka handles message compression and replication in detail. Chapter 4, Writing Producers, provides detailed information about how to write basic ...
Kafka&Kafka;-manager,Docker-compose脚本,使用之前需要手动配置文件中的zookeeper链接,使用之前需要先创建Docker网络: docker network create zoo_kafka。 Docker镜像使用的是:wurstmeister/kafka、...
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的...
const consumer = kafka.consumer({ groupId: 'test-consumer-group' }) const run = async () => { // Producing await producer.connect() await producer.send({ topic: 'test-topic', messages: [ { ...
【尚硅谷大数据技术之Kafka】教程主要涵盖了Kafka的基础知识和其在大数据处理中的重要性。Kafka是一个分布式消息队列系统,最初由LinkedIn开发并开源,现为Apache软件基金会的项目。它旨在提供一个统一、高吞吐量、...
在本示例中,`spring-kafka.zip`可能包含了一个XML配置文件,该文件定义了如何连接到Kafka服务器,创建消息生产者和消费者,并定义了消息的处理逻辑。 首先,我们需要理解Spring Kafka的基本概念。Spring Kafka为...
05.Kafka.xmind
spring.kafka.bootstrap-servers=localhost:9092 ``` Spring Boot提供了`@EnableKafka`注解,用于开启Kafka的自动配置。将其添加到主配置类上,Spring Boot会自动创建消费者和生产者的工厂bean。 然后,我们创建...
标题“jdk+kafka.zip”和描述“jdk1.8+kafka2.10”提示我们这个压缩包包含Java Development Kit(JDK)1.8版本和Apache Kafka 2.10版本的相关文件。这两个组件在IT领域中都扮演着至关重要的角色。Kafka是一个分布式...