背景介绍:
我们公司的实时流项目现在用的spark streaming比较多,这里在介绍下版本:
spark streaming2.1.0
kafka 0.9.0.0
spark streaming如果想要集成kafka使用时,必须得使用spark官网提供的依赖包,目前有两种大的kafka分支依赖集成包,他们的一些信息如下:
描述信息 | spark-streaming-kafka-0-8|spark-streaming-kafka-0-10
---|---|---
kafka版本 | 0.8.2.1 or higher|0.10.0 or higher
稳定程度 | Stable|Experimental
语言支持 |Scala, Java, Python|Scala, Java|
Receiver DStream|Yes|No|
Direct DStream|Yes|Yes|
SSL TLS Support|No|Yes|
Offset Commit Api |No|Yes|
Dynamic Topic Subscription|No|Yes|
从上面的表格可以看出
spark-streaming-kafka-0-8目前是支持版本大于或等于0.8.2.1时需要用到的,因为我们生产环境的kafka的版本是0.9.0.0所以只能选择spark-streaming-kafka-0-8_2.11这个依赖,然后spark streaming流程序跑起来,通过一定间隔不断从kafka消费数据,实时处理,整个流程是没有问题的,后来因为需要统一收集流程序的log中转到kafka中,最后通过logstash再发送到ElasticSearch中方便查看和检索日志,所以给项目集成了
kafka-log4j-appender-0.9.0.0,其功能是把log4j打印的日志给发送到kafka,配置完成之后再次启动项目,发现log也能收集到kafka中了,但通过后台发现时不时的会出现几条下面的log:
[2017-11-28 23:49:37,176] [WARN] [kafka-producer-network-thread | producer-2] Error in I/O with 192.168.10.160
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
但并不影响正常功能使用,从log里面能够看出来是生产者的问题,也就是说发送消息到kafka的server时出现连接中断了,导致抛出EOF异常。
那么为什么会中断连接呢?经查资料发现,这是由于kafka的版本不一致导致的,也就是说用0.8.2.1的kafka client向kafka0.9.0.0的server端发送数据,如果在经过了一定时间内,连接还没断开,那么服务端会主动断开这个连接,如果都是0.9.0.0的版本,服务端主动断开连接,客户端是不会抛出异常的,但由于版本不一样,在服务端主动中断的时候,就出现了上面的异常。
如何模拟重现?
(1)搭建一套0.9.0.0的kafka集群,为了方便重现,将server.properties里面的加上这个空闲连接关闭参数connections.max.idle.ms为30秒,默认不设置是10分钟,然后启动集群。
(2)在java项目里面使用0.8.2.1的client作为生产者,并使用生产者发送一条数据后,程序主动sleep40秒。
(3)然后观察等到30秒的时候就会抛出这个异常,但是主程序还是会等到40秒后结束,因为kafka发送消息是起的单独的线程所以抛出这个log时候主线程是不会受到影响的。
如何解决:
(1)最简单的办法就是升级client和server版本一致
(2)网上有朋友建议调大connections.max.idle.ms这个参数,减少抛出异常的次数,算是治标不治本吧,不建议这么干。
那么可能有朋友疑问,如果客户端一直不关闭空闲连接,必须得10分钟后由服务端强制关闭,那么会不会出现这个时间内kafka的连接资源被耗尽的情况呢?答案是几乎不可能,因为kafka允许每台主机默认的连接数为Int.MaxValue差不多21亿多吧。只要10分钟内每台主机的连接数达不到这个量级,程序就不会有问题。而实际情况生产者也不能出现这么多连接,所以我们的一些生产者程序一旦启动起来基本上不会调用close方法,除非在手动停止程序时,可以通过钩子函数来触发资源关闭,其他情况的空闲连接,可以由服务端进行管理通过超时关闭。注意如果是一直被占用的连接,服务端是不会主动关闭的,另外经过测试发现消费者就算版本不一致也不存在这个问题,目前来看只会版本不一致 而且是在生产者的程序中才会出现这个问题。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
- 在调整分区和消费组时,务必谨慎操作,避免导致数据丢失或不一致。 - 定期备份重要数据,以防意外情况发生。 - 遵循安全策略,确保只有授权用户才能访问和操作 Kafka 集群。 - 定期检查和更新 Kafkatool 至最新...
Kafka 是一个分布式流处理平台,能够高效、可靠地处理大量的数据流。下面是 Kafka 面试题参考中的知识点: 1. Kafka topic 的分片保存算法 Kafka topic 的分片保存算法是将每个分片中的 partition 保存到 broker ...
总结,Kafka Manager 2.0.0.2作为一款强大的Kafka管理工具,不仅提升了与最新Kafka版本的兼容性,还优化了用户体验。它为运维人员带来了极大的便利,但也需要谨慎操作,确保集群的安全稳定。了解并熟练掌握Kafka ...
“Unknown offset schema version 3”异常通常是由于消费者使用的offset存储格式与Kafka集群不一致导致的。在早期版本的Kafka中,offsets存储在ZooKeeper中,而Kafka 0.9之后,offsets被迁移至内置的Offset存储,这...
综上所述,Kafka不仅是一个强大的消息队列系统,还是一个高度可扩展、可靠的流处理平台。通过对Kafka核心概念和技术细节的理解,我们可以更好地利用Kafka来解决实际问题,提高数据处理的效率和可靠性。
综上所述,Apache Flink 结合 Apache Kafka 实现端到端的一致性语义是一个复杂的过程,需要对两个系统的特性和交互有深入的理解。通过正确配置和实现上述提到的关键步骤,可以有效地实现数据处理的一致性,从而构建...
其次,描述中提到的“版本问题”可能指的是项目依赖库的版本与本地环境不匹配。在编译过程中,需要确保所有依赖项的版本与项目要求的版本一致。这可能涉及到对build.sbt文件的调整,以指定特定版本的库,或者更新...
### Kafka源码解析新手版本 #### 一、Kafka概览与诞生背景 Apache Kafka 是一个高度可扩展的分布式消息系统,由 LinkedIn 开发并开源,后成为 Apache 的顶级项目。Kafka 采用 Scala 编写,其核心设计旨在提供一个...
本压缩包"spark-streaming-kafka.rar"包含了在Spark 3.0.0版本中与Kafka集成所需的jar包,特别适用于解决因Kafka新版本导致的阿里云仓库下载问题。 1. **Spark Streaming与Kafka集成** Spark Streaming通过`spark-...
它能帮助诊断各种问题,比如找出导致offset丢失的原因,或者检测出可能导致数据不一致的情况。这些功能对于快速定位和解决问题至关重要。 最后,由于Kafka Manager是基于Scala和Play框架构建的,因此它具有良好的可...
1. **事务支持**:Kafka提供了一种简单的方式来保证消息处理的一致性,即支持事务操作。 2. **压缩和加密**:Kafka支持消息压缩和传输加密,有助于减少网络带宽消耗并保护数据安全。 3. **Kafka Connect**:用于方便...
- JVM层面的问题,比如垃圾回收(GC)导致的延迟、Scala版本兼容问题以及Kafka自身版本的更新兼容性问题。 - 在业务应用层面,开发者需要考虑解耦、消息时序、吞吐量和缓存策略等。 在面对消息队列(MQ)的选择时,...
- **多线程消费**:每个线程维护一个 KafkaConsumer,速度较快,但可能导致 TCP 连接开销增大,扩展性和消息顺序维护困难。 - **单个消费者与多个 worker 线程**:更易扩展,但实现复杂,可能难以维护消息顺序和...
在分布式消息系统Kafka中,分区消费策略是一个关键的概念,它决定了消息如何在消费者与生产者之间有效地流转。本文将深入探讨"Kafka分区消费策略",以及如何实现"发送到指定分区"。 首先,我们需要了解Kafka的基本...
3. **消费者组配置**:合理配置消费者组,避免过多的消费者竞争同一个分区,导致性能下降。 4. **性能调优**:根据实际情况调整Kafka的各项参数,如网络带宽限制、磁盘I/O限制等。 综上所述,Kafka作为一种高性能的...
- **问题背景**:在Kafka中,低效的情况主要由过多的网络请求和字节拷贝导致。 - **解决方案**:Kafka通过将消息分组,一次请求传输一组消息,并利用`sendfile`系统调用来减少字节拷贝次数。 - **具体实现**:...
- **Partition**:一个物理上的分区,每个Topic包含一个或多个Partitions。每个Partition在存储层面是一个Append Log文件,其中每条消息都有一个唯一的偏移量(offset)标识其位置。 为了提高系统的可用性和扩展性,...
总的来说,Kafka 2.8.0 是一个重要的版本更新,它旨在提供更强大、更可靠和更易用的流处理平台,满足现代大数据环境的需求。无论是对于实时数据处理、日志聚合、还是作为微服务之间的消息中间件,Kafka 都是企业级...
在Kafka中,消息通常是按照生产顺序存储在每个分区内的,但消费者可能会并行地从多个分区消费消息,导致乱序。例如,一个较晚生产的事件可能先于较早生产的事件被消费。 Watermark机制正是为了解决这种乱序事件的...
这个版本已经解决了在较早的version3中出现的异常问题,提供了更稳定和可靠的监控体验。 一、Kafka Manager的功能特性: 1. **集群视图**:Kafka Manager提供了一个清晰的集群概览,展示所有brokers的状态,帮助...