这个故障在我们前段时间线上服务碰到过,具体的问题描述跟KAFKA-1415里提及的完全一样。
简单概括就是,正常情况下,使用kafka异步producer.send()发送消息时,会在后台创建一个守护线程,通过jstack查看jvm线程堆栈信息会找到这个线程,默认名称:ProducerSendThread-。ProducerSendThread驻留在后台,负责将待发送的消息批量往kafka topic转移。
"ProducerSendThread-" prio=10 tid=0x00007f6b3c01c800 nid=0x40fa waiting on condition [0x00007f6b90510000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00007f6bc6d53cd8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:424) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$1.apply(ProducerSendThread.scala:66) at scala.collection.immutable.Stream$.continually(Stream.scala:1104) at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104) at scala.collection.immutable.Stream$$anonfun$continually$1.apply(Stream.scala:1104)
在我们的故障发生时,ProducerSendThread线程莫名其妙死掉了(具体原因未知-_-||),致使调用producer.send() 的所有线程全部阻塞。这是因为,当后台发送线程不工作,消息会在内存不断积压,当达到queue.buffering.max.messages(默认值10000)设定的queue消息数上限,调用producer.send() API发送消息的线程就会一直阻塞。可见此次故障还是很严重的,而我们之所以选择异步发送方式,初衷是不妨碍主业务线程的运行,但是却发现如此隐患,确实有点措手不及。
KAFKA-1415 评论里有人给出了一种临时解决方案,将queue.enqueue.timeout.ms设置成某个正数值,作为send超时时长,达到此超时时长后,会抛QueueFullException,消息会被丢弃掉。由于这种方式需要丢弃消息,对于一些应用可能并不合适。另外,在kafka 0.8之后的版本里,queue.enqueue.timeout.ms的默认值是-1,表示后台消息queue积压到上限后将一直等待,直到queue空间释放。在更早的版本里,这个值的默认值是0,表示达到queue上限后,立即抛弃消息,虽然send线程不会block,但是很容易丢消息,尤其是当queue短暂积满的情况,关于默认值的讨论可以参考 KAFKA-709。
故障引发的思考:
1. 结合故障看,后台守护线程只启动一个,这本身就是一个很大的隐患。需要思考的是,为何只设置一个线程?能否提升线程数?
2. 后台线程怎么就莫名其妙死掉了?
其他参考链接:
1. [Kafka-users] Issues with Kafka async producer's enqueue timeout
相关推荐
赠送jar包:kafka-clients-2.4.1.jar; 赠送原API文档:kafka-clients-2.4.1-javadoc.jar; 赠送源代码:kafka-clients-2.4.1-sources.jar; 赠送Maven依赖信息文件:kafka-clients-2.4.1.pom; 包含翻译后的API文档...
在本文中,我们将深入探讨如何使用C++库RdKafka中的`KafkaConsumer`类来消费Apache Kafka消息。RdKafka是一个高效的C/C++ Kafka客户端,它提供了生产者和消费者API,使得与Kafka集群进行交互变得更加简单。在这个...
4. **持久性**:Kafka将数据存储在磁盘上,支持数据的持久化,即使在系统故障的情况下也不会丢失数据。 5. **可扩展性**:Kafka可以通过增加更多的Broker来水平扩展,以处理更大的数据量。 6. **容错性**:Kafka支持...
4. **持久性**:Kafka将数据存储在磁盘上,支持数据的持久化,即使在系统故障的情况下也不会丢失数据。 5. **可扩展性**:Kafka可以通过增加更多的Broker来水平扩展,以处理更大的数据量。 6. **容错性**:Kafka支持...
1. 日志收集:Kafka 可以用来收集公司系统的所有日志,然后通过 Kafka 以统一接口服务的方式开发给其它数据处理中间件,例如 Hadoop 等。 2. 消息队列系统:Kafka 可以用作消息队列系统,解耦生产者和消费者,使用...
* 生产者(Producer):Kafka 中的生产者角色,用于发送消息。 * 消费者(Consumer):Kafka 中的消费者角色,用于消费消息。 * Broker:Kafka 中的代理节点,用于处理和存储消息。 * 集群(Cluster):Kafka 中的...
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=6.2.2 -Dfile=/root/kafka-schema-registry-client-6.2.2.jar -Dpackaging=jar 官网下载地址 packages....
- **生产者(Producer)**:向Kafka集群发送消息的组件。 - **消费者(Consumer)**:从Kafka集群读取消息的组件。 - **主题(Topic)**:Kafka中的一个消息类别,消息以Topic为单位进行分类。 - **分区(Partition...
- **生产者(Producer)**:负责向Kafka集群发布消息,可以选择将消息发送到特定分区或让Kafka自动分配。 - **消费者(Consumer)**:消费主题中的消息,可以属于一个消费组(Consumer Group)。同一组内的消费者会...
Apache Kafka:Kafka安装与配置.docx
Apache Kafka:KafkaConnect深入解析.docx
【kaufmann_ex:Kafka支持的服务库】 在大数据处理和实时流计算领域,Apache Kafka是一种广泛应用的消息中间件,它提供了高吞吐量、低延迟的发布订阅模型。kaufmann_ex项目则是一个针对Kafka的扩展服务库,旨在增强...
前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中;而后端 I/O 线程从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程...
《Kafka技术内幕:图文详解Kafka源码设计与实现》是一本深入解析Apache Kafka的专著,旨在帮助读者理解Kafka的核心设计理念、内部机制以及源码实现。这本书结合图文并茂的方式,使得复杂的概念变得更为易懂。同时,...
Apache Kafka:Kafka与数据流处理.docx
Apache Kafka:Kafka消费者API详解.docx
Apache Kafka:Kafka分区与副本机制.docx
Apache Kafka:Kafka集群运维与监控.docx
Apache Kafka:Kafka主题管理与操作.docx
Apache Kafka:Kafka生产者API详解.docx