Apache Kafka series: resolving the kafka.common.ConsumerRebalanceFailedException exception

 

kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries

at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)

at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.java:34)

 

Root cause of the error above:

Multiple consumers in the same consumer group were started one after another; that is, several consumers in one group share the load of consuming data from multiple partitions.

Solution:

1. Tune the ZooKeeper-related settings (in the Kafka consumer configuration):

zookeeper.session.timeout.ms=5000

zookeeper.connection.timeout.ms=10000

rebalance.backoff.ms=2000

rebalance.max.retries=10
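The four settings above go into the consumer's Properties object before it is handed to the old high-level consumer. A minimal sketch (the group.id and zookeeper.connect values here are illustrative placeholders; the property keys belong to the pre-0.9 Scala high-level consumer, which newer clients no longer use):

```java
import java.util.Properties;

public class ConsumerRebalanceConfig {
    // Builds the consumer Properties recommended in this post.
    public static Properties buildConsumerProps() {
        Properties props = new Properties();
        // Settings from this post to survive rebalance conflicts:
        props.put("zookeeper.session.timeout.ms", "5000");
        props.put("zookeeper.connection.timeout.ms", "10000");
        props.put("rebalance.backoff.ms", "2000");
        props.put("rebalance.max.retries", "10");
        // Illustrative placeholders, not values from the post:
        props.put("group.id", "log-push-record-consumer-group");
        props.put("zookeeper.connect", "localhost:2181");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps();
        // With the old API these would be passed on as
        // Consumer.createJavaConsumerConnector(new ConsumerConfig(props)).
        System.out.println(props.getProperty("rebalance.max.retries"));
    }
}
```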

 

 

When using the high-level consumer API, this problem is usually caused by too short a zookeeper.sync.time.ms interval. Other causes are possible, but in my experience this is the common one.

Here is why. In a consumer group (where the number of consumers is less than the number of partitions), any change in the set of consumers triggers a rebalance. The first step is to release the current consumer's resources, if it holds any: the ConsumerFetcherThread is shut down, all connections to the Kafka brokers are closed, and the partitions currently being consumed are released, which in practice means deleting the ephemeral znodes (/xxx/consumer/owners/topic-xxx/partitions[0-n]). Then every consumer in the group computes which partitions it should consume and registers the corresponding ephemeral znodes to claim them, declaring "I own the consumption rights to this partition; no other consumer may use it."
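The ephemeral owner znode behaves like an atomic "create if absent": only one consumer's claim on a given partition path can succeed, and everyone else gets a conflict. A minimal stdlib simulation of that claim semantics (standing in for the real ZooKeeper ephemeral-node create, which fails if the node already exists; the path is hypothetical):

```java
import java.util.concurrent.ConcurrentHashMap;

public class PartitionClaim {
    // Simulates the ephemeral owner znode: putIfAbsent succeeds for
    // exactly one claimant, mirroring ZooKeeper's create-or-conflict.
    static final ConcurrentHashMap<String, String> owners = new ConcurrentHashMap<>();

    static boolean claim(String partitionPath, String consumerId) {
        return owners.putIfAbsent(partitionPath, consumerId) == null;
    }

    public static void main(String[] args) {
        String path = "/consumers/group/owners/topic/0"; // hypothetical path
        System.out.println(claim(path, "consumer-1")); // claim succeeds
        System.out.println(claim(path, "consumer-2")); // conflict: must back off and retry
    }
}
```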

 

If the explanation above makes sense, the rest is easy. When a consumer runs a rebalance, it follows a retry-on-failure policy bounded by a retry interval and a maximum retry count: each time it fails to acquire its partitions, it waits and tries again. An analogy: department B books a meeting room for a time slot, but when the time comes, department A is still using it. B can only wait and check back periodically. If B checks too frequently, the room always appears occupied; with a longer interval, department A may well have left by the second visit.
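The waiting-for-the-meeting-room loop can be sketched as retry-with-backoff. The method and names below are illustrative, not Kafka's actual internals; the demo simulates an old consumer that frees its partitions only after two failed attempts:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class RebalanceRetry {
    // Sketch of the bounded retry loop the old high-level consumer
    // runs during a rebalance (illustrative, not Kafka's real code).
    static boolean rebalanceWithRetries(int maxRetries, long backoffMs,
                                        BooleanSupplier tryToOwnPartitions)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (tryToOwnPartitions.getAsBoolean()) {
                return true;             // partitions acquired
            }
            Thread.sleep(backoffMs);     // rebalance.backoff.ms
        }
        // The real client throws ConsumerRebalanceFailedException here.
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger attempts = new AtomicInteger();
        // Succeeds on the 3rd attempt, once the "room" is vacated.
        boolean ok = rebalanceWithRetries(10, 10,
                () -> attempts.incrementAndGet() >= 3);
        System.out.println(ok); // true
    }
}
```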

 

Likewise, when a new consumer joins and triggers a rebalance, the existing (old) consumers recompute their assignments and release the partitions they hold, but that takes some processing time; if the new consumer tries to claim those partitions in the meantime, it is very likely to fail. If we allow enough time for the old consumers to release their resources, the problem does not occur.

 

Official explanation:

consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for "conflict in ").

  • If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try increasing rebalance.max.retries and rebalance.backoff.ms.
  • Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won't realize that consumer is gone until zookeeper.session.timeout.ms passes. In that case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.

 

 

If rebalance.backoff.ms is set too short, the old consumer has not yet finished releasing its resources by the time the new consumer exhausts its retry limit and gives up.

 

Make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
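With the values from this post the inequality holds comfortably: 10 retries * 2000 ms = 20000 ms of retry window, versus a 5000 ms session timeout. A tiny check of the rule (the method name is mine, not a Kafka API):

```java
public class RebalanceInvariant {
    // Checks the rule: rebalance.max.retries * rebalance.backoff.ms
    // must exceed zookeeper.session.timeout.ms, so the retry window
    // outlasts a hard-killed consumer's session expiry.
    static boolean retriesOutlastSession(long maxRetries, long backoffMs,
                                         long sessionTimeoutMs) {
        return maxRetries * backoffMs > sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from this post: 10 * 2000 = 20000 > 5000.
        System.out.println(retriesOutlastSession(10, 2000, 5000));  // true
        // A too-short window: 3 * 1000 = 3000 is not > 10000.
        System.out.println(retriesOutlastSession(3, 1000, 10000));  // false
    }
}
```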

 

For the ZooKeeper node layout that Kafka uses, see: Kafka's storage structure in ZooKeeper.
