hi all:
大家都很关心kafka消息阻塞的情况(感谢RoctetMQ给我们的教训)。Kafka上线也有一段时间了,确实有出现过消息阻塞的情况,虽然不影响业务而且用临时办法解决了,但是我觉得可以跟大家总结一下。为了不引起大家的恐慌,我决定先把结论写出来:comsumer 非正常的rebalancing(重新分配分区)才会导致消费阻塞,如果不出现rebalancing,消息是不是重复消费或阻塞。
以下是这两个BUG的描述,这可能需要一些Kafka的知识,我会说得通俗一点,同也会留下一些参考给有兴趣的童鞋进一步了解。
1. 消费者处理过慢可能会导致重复消费
线上场景:BPM会订阅消费然后把流程信息一条一条索引到ElasticSearch,当索引处理较慢(30s)的时候会出现。
重现步骤:https://gist.github.com/richard2011/23b563e6ee5bad4e9d56 ,很普通的代码,消费代码段加上sleep(30s)。
产生原因:Kafka是使用poll()(长轮询)拉取消息,流程可以简单理解为: 拉取消息->向kafka broker发送心跳->提交 offset。当消费者处理过慢(session timeout为30s)没有向kafka broker发送心跳,而且没有提交 offset,broker就会发起rebalancing,这个分区就会分配给其他消费者重复消费。最坏的情况是一直在rebalancing,新的消息都不会被消费。
临时办法:排查发现线上业务线正常场景没有消费处理过慢的场景,processor组件维护了一个线程池,每条消息都用一个线程处理。tasker-center会不断地把消息放在java的ArrayBlockingQueue。其他周边后端服务(如存储)这种处理能力难以评估的,可以尝试一下先把消息缓存到本地队列再做批量插入的操作,其实很多日志类或大数据类使用Kafka都是这样做的,如Flume。
如果确实有这样的场景,请联系我,可以通过通过两个参数减小这问题发生,但都不是完美解决问题。1) 增加session time的时间,但同时也会增加客户端失败的时间。2)减小分区拉取值(max.partition.fetch.bytes默认为1M), 但会影响吞量,而且以bytes为单位也无法评估消息的数量。
修复计划:Kafka社区专门针对这个问题写了篇WIKI https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=61333789 。这是他们的改进计划,内容有目的(Motivation), 计划修改(Proposed Change), 新增或修改的公共接口(New or Changed Public Interfaces),升级计划和兼容性(Migration Plan and Compatibility)和已放弃方案(Rejected Alternatives)。@听风,可以参考一下他的文档模板,用于轩辕组件的改进计划模板挺好的。这个BUG暂定为随着Kafka版本的升级来修复。
2. 多分区多consumer时rebalancing可能会导致某个分区阻塞。
线上场景:发生在在cart-processor,每个topic有18个分区,每个cart-processor有两个consumer(不同groupId),8个cart-processor节点。当cart-processor发版(节点增加或删除)会引起rebalancing,这可能导致个topic的分区阻塞。
重现步骤:代码https://gist.github.com/richard2011/d92caaa4af50331b0953,创建18个分区的topic, 通过不断地增加或删除,同时通过kafka-consumer-groups.sh命令查看分区情况,直到出现分区阻塞。
产生原因:kafka client的BUG( https://issues.apache.org/jira/browse/KAFKA-2978 ),Kafka github主分支已修复,但是还没有release版,越多分区和consumer越容易出现这个问题。
临时办法:每次使用kafka的程序发版时,用kafka-consumer-groups.sh命令查看分区情况,发现分区阻塞则重启对应的机器。同时也写了小工具,使comsumer再次rebalancing。
修复计划:观察线上出现的频率,如果频繁出现,将会修复kafka client代码,出现不频繁则随Kafka升级修复。
分享到:
相关推荐
在本文中,我们将深入探讨如何使用C#语言从Apache Kafka中读取消息。Apache Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用程序。C#作为.NET框架的主要编程语言,提供了丰富的库来与Kafka进行交互。...
这是一个阻塞调用,直到有新消息可用或超时。 ```csharp var consumeResult = consumer.Consume(cts.Token); Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult....
同时,Consumer可以通过配置参数阻塞等待新消息到达。 7. **消息格式与存储**:Kafka消息包含一个固定长度的头部和可变长度的数据。头部有版本号、CRC32校验码和消息长度。消息由多个小文件段组成,便于管理和删除...
- 订阅者指定主题和分区号,订阅后可迭代读取消息,没有消息时会阻塞等待。 为了实现高效的数据传输: 1. 发布和订阅操作批量处理,利用系统Page Cache减少内存拷贝。 2. 利用sendfile系统调用优化网络传输,避免...
同时,Kafka提供了阻塞选项,使得Consumer在无新消息时等待,而不是持续轮询。 7. **Kafka的消息存储格式** Kafka的消息结构包括一个固定长度的头部和可变长度的消息体。头部包含版本号、CRC32校验码,消息体则是...
### Kafka Producer机制优化—提高发送消息可靠性 #### 一、Kafka Producer机制及问题背景 在Kafka消息系统中,消息是由Producer生产并通过Broker(消息中介节点)进行存储与转发的。Broker负责处理消息的存储,并...
这通常通过线程模型来实现,例如在单独的线程池中运行Kafka消费者,一旦有新的消息,就将这些消息放入一个共享的数据结构,如阻塞队列。然后,Netty的IO线程可以在合适的时候从队列中取出消息,推送给客户端。 在...
4. **KafkaConsumer.poll()**:消费者从Kafka获取新消息,这是一个阻塞方法,直到有新消息到来或达到超时时间。 5. **KafkaConsumer.commitSync()**:消费者提交消费的offset,保证消息不会被重复消费。 此外,...
这将阻塞直到有新消息可用。 ```java while (true) { ConsumerRecords, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord, String> record : records) System.out.printf("offset...
这个方法会阻塞直到有新消息可用或达到指定超时时间。 4. **处理消息**:接收到的消息是一个`ConsumerRecords`对象,包含了来自不同主题和分区的消息。对每个消息进行处理后,通常需要调用`commitSync()`或`...
### 分布式消息通信之Kafka的实现原理 #### 一、消息中间件与Kafka简介 消息中间件主要用于解决分布式系统之间的消息传递问题。它能够有效地屏蔽不同平台及协议间的特性差异,促进应用程序间的协作。例如,在电商...
在Java应用程序中集成Kafka API可以让开发者轻松地将消息生产到Kafka主题(topics)中,或者从这些主题中消费消息。 首先,让我们详细了解如何编写Java程序来向Kafka发送消息。在开始之前,确保已经正确搭建了Kafka...
2. **生产消息**: 使用Kafka的Producer API,创建一个生产者实例,连接到Kafka集群,然后发布消息到指定主题。 3. **消费消息**: 创建消费者实例,订阅感兴趣的主题,通过回调函数或轮询机制处理接收到的消息。 4. *...
3. Storm集群订阅Kafka的主题,创建一个实时数据流拓扑,对流中的每条消息进行实时处理。可以设置多个bolt(处理组件),实现数据清洗、转换等功能。 4. 处理结果通过JDBC连接器写入MySQL数据库。这里可能需要设计...
重要的是要指出,librdkafka没有提供同步的消息生产接口,这意味着所有的消息发送操作都是非阻塞的。 ### 配置与主题结构 在使用librdkafka时,我们首先需要配置客户端库以连接到Kafka集群。这包括设置必要的参数...
这个方法会阻塞直到至少有一条消息可用或达到超时时间。 ```java ConsumerRecords, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord, String> record : records) { System.out...
同时,Kafka还提供了一种机制,允许Consumer在没有新消息时进行阻塞等待,以减少不必要的轮询操作。 #### Kafka消息的存储格式 **消息结构:** Kafka中存储在硬盘上的消息具有固定的头部结构和可变长度的字节数组...