`
qindongliang1922
  • 浏览: 2172376 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117120
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:125453
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59561
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71038
社区版块
存档分类
最新评论

如何管理Spark Streaming消费Kafka的偏移量(二)

阅读更多
上篇文章,讨论了在spark streaming中管理消费kafka的偏移量的方式,本篇就接着聊聊上次说升级失败的案例。

事情发生一个月前,由于当时我们想提高spark streaming程序的并行处理性能,于是需要增加kafka分区个数,,这里需要说下,在新版本spark streaming和kafka的集成中,按照官网的建议 spark streaming的executors的数量要和kafka的partition的个数保持相等,这样每一个executor处理一个kafka partition的数据,效率是最高的。如果executors的数量大于kafka的分区个数,其实多余的executors相当于是不会处理任何数据,这部分的进程其实是白白浪费性能。

如果executor的个数小于kafka partition的个数,那么其实有一些executors进程是需要处理多个partition分区的数据的,所以官网建议spark executors的进程数和kafka partition的个数要保持一致。

那么问题来了,如果想要提高spark streaming的并行处理性能,只能增加kafka的分区了,给kafka增加分区比较容易,直接执行一个命令即可,不过这里需要注意,kafka的分区只能增加不能减少,所以添加分区要考虑到底多少个才合适。

接下来我们便增加了kafka分区的数量,同时修改了spark streaming的executors的个数和kafka的分区个数一一对应,然后就启动了流程序,结果出现了比较诡异的问题,表现如下:

造几条测试数据打入kafka中,发现程序总是只能处理其中的一部分数据,而每次总有一些数据丢失。按理说代码没有任何改动,只是增加kafka的分区和spark streaming的executors的个数,应该不会出现问题才对,于是又重新测了原来的旧分区和程序,发现没有问题,经过对比发现问题只会出现在kafka新增分区后,然后出现这种丢数据的情况。然后和运维同学一起看了新增的kafka的分区的磁盘目录是否有数据落入,经查询发现新的分区确实已经有数据进入了,这就很奇怪了丢的数据到底是怎么丢的?

最后我又检查了我们自己保存的kafka的offset,发现里面的偏移量竟然没有新增kafka的分区的偏移量,至此,终于找到问题所在,也就是说,如果没有新增分区的偏移量,那么程序运行时是不会处理新增分区的数据,而我们新增的分区确确实实有数据落入了,这就是为啥前面说的诡异的丢失数据的原因,其实是因为新增kafka的分区的数据程序并没有处理过而这个原因正是我们的自己保存offset中没有记录新增分区的偏移量。

问题找到了,那么如何修复线上丢失的数据呢?

当时想了一个比较笨的方法,因为我们的kafka线上默认是保留7天的数据,旧分区的数据已经处理过,就是新增的分区数据没有处理,所以我们删除了已经处理过的旧的分区的数据,然后在业务流量底峰时期,重新启了流程序,让其从最早的数据开始消费处理,这样以来因为旧的分区被删除,只有新分区有数据,所以相当于是把丢失的那部分数据给修复了。修复完成后,又把程序停止,然后配置从最新的偏移量开始处理,这样偏移量里面就能识别到新增的分区,然后就继续正常处理即可。

注意这里面的删除kafka旧分区的数据,是一个比较危险的操作,它要求kafka的节点需要全部重启才能生效,所以除非特殊情况,不要使用这么危险的方式。

后来,仔细分析了我们使用的一个开源程序管理offset的源码,发现这个程序有一点bug,没有考虑到kafka新增分区的情况,也就是说如果你的kafka分区增加了,你的程序在重启后是识别不到新增的分区的,所以如果新增的分区还有数据进入,那么你的程序一定会丢数据,因为扩展kafka分区这个操作,并不常见,所以这个bug比较难易触发。

知道原因后,解决起来比较容易了,就是每次启动流程序前,对比一下当前我们自己保存的kafka的分区的个数和从zookeeper里面的存的topic的分区个数是否一致,如果不一致,就把新增的分区给添加到我们自己保存的信息中,并发偏移量初始化成0,这样以来在程序启动后,就会自动识别新增分区的数据。

所以,回过头来看上面的那个问题,最简单优雅的解决方法就是,直接手动修改我们自己的保存的kafka的分区偏移量信息,把新增的分区给加入进去,然后重启流程序即可。

这个案例也就是我上篇文章所说的第三个场景的case,如果是自己手动管理kafka的offset一定要注意兼容新增分区后的这种情况,否则程序可能会出现丢失数据的问题。

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
0
0
分享到:
评论

相关推荐

    一个手动管理spark streaming集成kafka时的偏移量到zookeeper中的小项目

    这个小项目专注于手动管理Spark Streaming在处理Kafka数据时的偏移量,并将其存储在Zookeeper中,以便于跟踪和管理数据消费状态。以下是关于这个项目的详细知识点: **1. Spark Streaming** Spark Streaming是...

    SparkStreaming和kafka的整合.pdf

    如果ZooKeeper中存在保存的偏移量,则可以根据这些偏移量继续消费数据;如果没有,则从头开始消费。 - **处理数据**:通过`foreachRDD`方法处理接收到的数据,并打印出来。 #### 7. 总结 通过以上介绍,我们了解到...

    SparkStreaming Kafka 代码

    它是一个分布式协调服务,用于管理 Kafka 的元数据,包括主题(topics)、分区(partitions)和偏移量(offsets)。在Kafka消费者中,使用Zookeeper来存储和同步消费者的offset是非常常见的做法。这样可以确保消息的...

    基于Spark Streaming的大数据实时流计算平台和框架,并且是基于运行在yarn模式运行的spark streaming

    一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...

    sparkStreaming-offset-to-zk:手动管理spark streaming集成kafka的数据偏移量到zookeeper中

    下面是使用过程中记录的一些心得和博客,感兴趣的朋友可以了解下:项目简介该项目提供了一个在使用spark streaming2.3+kafka1.3的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于...

    Spark Streaming 示例

    `SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...

    spark与kafka集成

    除了消息本身,还可以通过API访问Kafka的消息元数据,如分区信息和偏移量。这有助于跟踪和管理消费进度,确保不会丢失或重复消息。 **性能优化** Spark与Kafka的集成还支持并行读取,可以利用多个工作节点同时从...

    SparkStreamingKafka:Spark Streaming日志到kafka

    2. **设置偏移量管理**:Spark Streaming负责管理和更新Kafka的消费偏移量。可以使用`offsetRanges`来跟踪处理过的数据,确保数据的正确处理和不丢失。 3. **数据转换与处理**:获取到DStream后,可以应用各种操作...

    sparkstreaming:封装sparkstreaming动态调节batch time(有数据就执行计算); 支持运行过程中增删topic; 封装sparkstreaming 1.6 - kafka 010 用以支持 SSL

    :party_popper:v1.6.0-0.10 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的...支持rdd.updateOffset 来管理偏移量。 :party_popper: v1.6.0-0.10_ssl 只是结合了 sparkstreaming 1.6 和 ka

    streaming-offset-to-zk:一个手动管理spark streaming集成kafka时的偏移量到zookeeper中的小项目

    该项目提供了一个在使用spark streaming2.1+kafka0.9.0.0的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于 项目升级发布,并修复了一些遇到的bug,例子中的代码已

    积分java源码-kafka-spark-consumer:用于SparkStreaming的高性能Kafka连接器。支持多主题获取、Kafk

    偏移量和处理故障。 这个消费者已经实现了一个自定义可靠接收器,它使用 Kafka Consumer API 从 Kafka 获取消息并将每个接收到的块存储在 Spark BlockManager 中。 该逻辑将自动检测主题的分区数量,并根据配置的...

    SparkStreaming原理介绍

    - **滑动时间间隔**:指相邻两个窗口之间的偏移量,同样也是批处理时间间隔的整数倍。通过调整滑动时间间隔,可以控制窗口的重叠程度。 - **Input DStream**:一种特殊的 DStream 类型,用于从外部数据源(如 Kafka...

    基于Scala和Java的Spark Streaming二次封装开源框架设计源码

    Spark Streaming二次封装开源框架 - 基于Scala和Java构建,包含1157个文件,提供实时流任务调度、Kafka偏移量管理、Web后台管理、Web API控制等功能。该框架简化业务开发流程,让开发者专注于业务逻辑,无需担心底层...

    IPMap文件.docx

    【IPMap文件.docx】的描述中涉及到的知识点主要集中在使用Apache Spark Streaming处理Kafka数据流,特别是关于偏移量的管理和Kafka消费者配置。以下是详细的解释: 1. **Apache Spark Streaming**: - `...

    kafka 插件kafka 插件kafka 插件

    - **实时分析**: 结合Spark Streaming或Flink等流处理框架,实时处理Kafka中的数据流,进行实时分析和决策。 - **微服务通信**: 在微服务架构中,Kafka可以作为一个消息中间件,允许服务之间异步通信,提高系统的可...

    Spark学习笔记Spark Streaming的使用

    5. Spark Streaming 整合 Kafka * 基于 Receiver 的方式:使用 Receiver 来接收数据,偏移量是由 ZooKeeper 来维护的,编程简单但是效率低。 * 基于 Direct 的方式:使用 Direct 来接收数据,偏移量是由我们来手动...

    Spark分布式内存计算框架视频教程

    10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1.StructuredStreaming 概述(核心设计和编程模型) 2.入门案例:WordCount 3.输入源InputSources 4.Streaming Query 设置 5.输出终端OutputSink 6.集成...

    kafka-spark

    这段内容主要涉及到了Kafka的配置、主题创建以及Spark Streaming的应用。接下来将详细解释这些知识点。 ### 1. Apache Kafka 基础配置 #### 创建主题 在Apache Kafka中,通过`kafka-topics.sh`命令可以实现主题...

    kafka及其性能测试

    首先,Kafka的核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、代理(Broker)、分区(Partition)、偏移量(Offset)、副本(Replica)、领导者(Leader)、追随者(Follower)、同步副本集合...

    kafka直接解压可用

    5. ** offsets管理**:每个消费者记录其读取的每个分区的偏移量(offset),以便下次从上次读取的位置继续。 6. **Zookeeper集成**:Kafka使用Zookeeper来协调集群,管理元数据,例如分区的主副本选举。 7. **可...

Global site tag (gtag.js) - Google Analytics