kafka.common.ConsumerRebalanceFailedException :log-push-record-consumer-group_mobile-pushremind02.lf.xxx.com-1399456594831-99f15e63 can't rebalance after 3 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.consume(Unknown Source)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(Unknown Source)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.getKafkaStreams(DefaultConsumerProcessor.java:149)
at com.xxx.mafka.client.consumer.DefaultConsumerProcessor.recvMessage(DefaultConsumerProcessor.java:63)
at com.xxx.service.mobile.push.kafka.MafkaPushRecordConsumer.main(MafkaPushRecordConsumer.java:22)
at com.xxx.service.mobile.push.Bootstrap.main(Bootstrap.Java:34)
出现以上问题原因分析:
同一个消费者组(consumer group)有多个consumer先后启动,就是一个消费者组内有多个consumer同时负载消费多个partition数据.
解决办法:
1.配置zk问题(kafka的consumer配置)
zookeeper.session.timeout.ms=5000
zookeeper.connection.timeout.ms=10000
rebalance.backoff.ms=2000
rebalance.max.retries=10
在使用高级API过程中,一般出现这个问题是zookeeper.sync.time.ms时间间隔配置过短,不排除有其他原因引起,但笔者遇到一般是这个原因。
给大家解释一下原因:一个消费者组中(consumer数量<partitions数量)每当有consumer发送变化,会触发负载均衡。第一件事就是释放当consumer资源,无则免之,调用ConsumerFetcherThread关闭并释放当前kafka broker所有连接,释放当前消费的partitons,实际就是删除临时节点(/xxx/consumer/owners/topic-xxx/partitions[0-n]),所有同一个consumer group内所有consumer通过计算获取本consumer要消费的partitions,然后本consumer注册相应临时节点卡位,代表我拥有该partition的消费所有权,其他consumer不能使用。
如果大家理解上面解释,下面就更容易了,当consumer调用Rebalance时,它是按照时间间隔和最大次数采取失败重试原则,每当获取partitions失败后会重试获取。举个例子,假如某个公司有个会议,B部门在某个时间段预订该会议室,但是时间到了去会议室看时,发现A部门还在使用。这时B部门只有等待了,每隔一段时间去询问一下。如果时间过于频繁,则会议室一直会处于占用状态,如果时间间隔设置长点,可能去个2次,A部门就让出来了。
同理,当新consumer加入重新触发rebalance时,已有(old)的consumer会重新计算并释放占用partitions,但是会消耗一定处理时间,此时新(new)consumer去抢占该partitions很有可能就会失败。我们假设设置足够old consumer释放资源的时间,就不会出现这个问题。
官方解释:
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for "conflict in ").
- If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms.
- Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won't realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.
rebalance.backoff.ms时间设置过短就会导致old consumer还没有来得及释放资源,new consumer重试失败多次到达阀值就退出了。
确保rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms
kafka zk节点存储,请参考:kafka在zookeeper中存储结构
相关推荐
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server ...
4. **Serializer / Deserializer**: Kafka支持自定义消息序列化和反序列化,例如`org.apache.kafka.common.serialization.StringSerializer` 和 `StringDeserializer` 分别用于字符串类型的消息。 为了使用这些类,...
Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者实例 Producer, String> producer = new KafkaProducer(props); // 发送消息 for (int i = 0...
在本文中,我们将深入探讨如何使用NodeJS与Apache Kafka进行交互。Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用。而NodeJS是JavaScript的服务器端实现,可以方便地创建高性能网络应用。结合这两者...
《Kafka-Manager 1.3.3.22:解决Unknown offset schema version 3异常详解》 在大数据处理领域,Apache Kafka作为一个高效、可扩展的实时数据流平台,广泛应用于消息传递和数据集成。然而,在实际操作中,用户可能...
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer, String> producer = new KafkaProducer(props); ``` 2. **发送消息**:通过调用`send()`方法,生产...
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.consumer.group-id=my-consumer-group ``` 现在,我们来创建生产者类。Spring Boot允许我们使用...
在大数据处理领域,Flume、Kafka、Flink 和 MySQL 是四个非常重要的组件,它们各自承担着不同的职责,共同构建了一套高效的数据流处理系统。本文将深入探讨这些技术及其在"flume+kafka+flink+mysql数据统计"中的应用...
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="kafka" \ password="kafka-secret"; ``` 2. **配置Kafka客户端**: - 对于生产者和消费者,也需要在相应的...
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer, String> producer = new KafkaProducer(props); for (int i = 0; i ; i++) { ProducerRecord, String> ...
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer ...
在使用 Kafka 0.11.0.1 和 Flink 1.4 进行实时计算时,Flink 无法获取 Kafka Topic Metadata,报以下异常:org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata。...
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer, String> producer = new KafkaProducer(props); ``` 接着,我们可以使用`send()`方法将消息发送到...
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); kp = new ...
import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.kafka.listener.KafkaMessageListenerContainer; import org.springframework.kafka.support.KafkaHeaders; import ...
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer, String> producer = new KafkaProducer(props); for (int i = 0; i ; i++) { producer.send(new ...
key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: my-consumer-group auto-offset-...
value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: my-consumer-group # 消费者组ID key-deserializer: org.apache.kafka.common.serialization....