前面的文章已经介绍了在spark streaming集成kafka时,如何处理其偏移量的问题,由于spark streaming自带的checkpoint弊端非常明显,所以一些对数据一致性要求比较高的项目里面,不建议采用其自带的checkpoint来做故障恢复。
在spark streaming1.3之后的版本支持direct kafka stream,这种策略更加完善,放弃了原来使用Kafka的高级API自动保存数据的偏移量,之后的版本采用Simple API也就是更加偏底层的api,我们既可以用checkpoint来容灾,也可以通过低级api来获取偏移量自己管理偏移量,这样以来无论是程序升级,还是故障重启,在框架端都可以做到Exact One准确一次的语义。
本篇文章,会再介绍下,如何手动管理kafka的offset,并给出具体的代码加以分析:
版本:
apache spark streaming2.1
apache kafka 0.9.0.0
手动管理offset的注意点:
(1)第一次项目启动的时候,因为zk里面没有偏移量,所以使用KafkaUtils直接创建InputStream,默认是从最新的偏移量开始消费,这一点可以控制。
(2)如果非第一次启动,zk里面已经存在偏移量,所以我们读取zk的偏移量,并把它传入到KafkaUtils中,从上次结束时的偏移量开始消费处理。
(3)在foreachRDD里面,对每一个批次的数据处理之后,再次更新存在zk里面的偏移量
注意上面的3个步骤,1和2只会加载一次,第3个步骤是每个批次里面都会执行一次。
下面看第一和第二个步骤的核心代码:
/****
*
* @param ssc StreamingContext
* @param kafkaParams 配置kafka的参数
* @param zkClient zk连接的client
* @param zkOffsetPath zk里面偏移量的路径
* @param topics 需要处理的topic
* @return InputDStream[(String, String)] 返回输入流
*/
def createKafkaStream(ssc: StreamingContext,
kafkaParams: Map[String, String],
zkClient: ZkClient,
zkOffsetPath: String,
topics: Set[String]): InputDStream[(String, String)]={
//目前仅支持一个topic的偏移量处理,读取zk里面偏移量字符串
val zkOffsetData=KafkaOffsetManager.readOffsets(zkClient,zkOffsetPath,topics.last)
val kafkaStream = zkOffsetData match {
case None => //如果从zk里面没有读到偏移量,就说明是系统第一次启动
log.info("系统第一次启动,没有读取到偏移量,默认就最新的offset开始消费")
//使用最新的偏移量创建DirectStream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
case Some(lastStopOffset) =>
log.info("从zk中读取到偏移量,从上次的偏移量开始消费数据......")
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
//使用上次停止时候的偏移量创建DirectStream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, lastStopOffset, messageHandler)
}
kafkaStream//返回创建的kafkaStream
}
主要是针对第一次启动,和非首次启动做了不同的处理。
然后看下第三个步骤的代码:
/****
* 保存每个批次的rdd的offset到zk中
* @param zkClient zk连接的client
* @param zkOffsetPath 偏移量路径
* @param rdd 每个批次的rdd
*/
def saveOffsets(zkClient: ZkClient, zkOffsetPath: String, rdd: RDD[_]): Unit = {
//转换rdd为Array[OffsetRange]
val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//转换每个OffsetRange为存储到zk时的字符串格式 : 分区序号1:偏移量1,分区序号2:偏移量2,......
val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")
log.debug(" 保存的偏移量: "+offsetsRangesStr)
//将最终的字符串结果保存到zk里面
ZkUtils.updatePersistentPath(zkClient, zkOffsetPath, offsetsRangesStr)
}
主要是更新每个批次的偏移量到zk中。
例子已经上传到github中,有兴趣的同学可以参考这个链接:
https://github.com/qindongliang/streaming-offset-to-zk
后续文章会聊一下为了升级应用如何优雅的关闭的流程序,以及在kafka扩展分区时,上面的程序如何自动兼容。
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
这个小项目专注于手动管理Spark Streaming在处理Kafka数据时的偏移量,并将其存储在Zookeeper中,以便于跟踪和管理数据消费状态。以下是关于这个项目的详细知识点: **1. Spark Streaming** Spark Streaming是...
如果ZooKeeper中存在保存的偏移量,则可以根据这些偏移量继续消费数据;如果没有,则从头开始消费。 - **处理数据**:通过`foreachRDD`方法处理接收到的数据,并打印出来。 #### 7. 总结 通过以上介绍,我们了解到...
它是一个分布式协调服务,用于管理 Kafka 的元数据,包括主题(topics)、分区(partitions)和偏移量(offsets)。在Kafka消费者中,使用Zookeeper来存储和同步消费者的offset是非常常见的做法。这样可以确保消息的...
一个完善的Spark Streaming二次封装开源框架,包含:实时流任务调度、kafka偏移量管理,web后台管理,web api启动、停止spark streaming,宕机告警、自动重启等等功能支持,用户只需要关心业务代码,无需关注繁琐的...
下面是使用过程中记录的一些心得和博客,感兴趣的朋友可以了解下:项目简介该项目提供了一个在使用spark streaming2.3+kafka1.3的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于...
`SparkStreaming.zip` 文件可能包含了一个示例项目,演示了如何创建一个 Spark Streaming 应用来从 Kafka 消费数据。代码通常会包含以下步骤: 1. 创建 SparkConf 对象,配置 Spark 应用的属性,如应用程序名称、...
除了消息本身,还可以通过API访问Kafka的消息元数据,如分区信息和偏移量。这有助于跟踪和管理消费进度,确保不会丢失或重复消息。 **性能优化** Spark与Kafka的集成还支持并行读取,可以利用多个工作节点同时从...
2. **设置偏移量管理**:Spark Streaming负责管理和更新Kafka的消费偏移量。可以使用`offsetRanges`来跟踪处理过的数据,确保数据的正确处理和不丢失。 3. **数据转换与处理**:获取到DStream后,可以应用各种操作...
:party_popper:v1.6.0-0.10 解决了批次计算延迟后出现的任务append导致整体恢复后 计算消费还是跟不上的...支持rdd.updateOffset 来管理偏移量。 :party_popper: v1.6.0-0.10_ssl 只是结合了 sparkstreaming 1.6 和 ka
该项目提供了一个在使用spark streaming2.1+kafka0.9.0.0的版本集成时,手动存储偏移量到zookeeper中,因为自带的checkpoint弊端太多,不利于 项目升级发布,并修复了一些遇到的bug,例子中的代码已
偏移量和处理故障。 这个消费者已经实现了一个自定义可靠接收器,它使用 Kafka Consumer API 从 Kafka 获取消息并将每个接收到的块存储在 Spark BlockManager 中。 该逻辑将自动检测主题的分区数量,并根据配置的...
- **滑动时间间隔**:指相邻两个窗口之间的偏移量,同样也是批处理时间间隔的整数倍。通过调整滑动时间间隔,可以控制窗口的重叠程度。 - **Input DStream**:一种特殊的 DStream 类型,用于从外部数据源(如 Kafka...
Spark Streaming二次封装开源框架 - 基于Scala和Java构建,包含1157个文件,提供实时流任务调度、Kafka偏移量管理、Web后台管理、Web API控制等功能。该框架简化业务开发流程,让开发者专注于业务逻辑,无需担心底层...
【IPMap文件.docx】的描述中涉及到的知识点主要集中在使用Apache Spark Streaming处理Kafka数据流,特别是关于偏移量的管理和Kafka消费者配置。以下是详细的解释: 1. **Apache Spark Streaming**: - `...
5. Spark Streaming 整合 Kafka * 基于 Receiver 的方式:使用 Receiver 来接收数据,偏移量是由 ZooKeeper 来维护的,编程简单但是效率低。 * 基于 Direct 的方式:使用 Direct 来接收数据,偏移量是由我们来手动...
10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1.StructuredStreaming 概述(核心设计和编程模型) 2.入门案例:WordCount 3.输入源InputSources 4.Streaming Query 设置 5.输出终端OutputSink 6.集成...
这段内容主要涉及到了Kafka的配置、主题创建以及Spark Streaming的应用。接下来将详细解释这些知识点。 ### 1. Apache Kafka 基础配置 #### 创建主题 在Apache Kafka中,通过`kafka-topics.sh`命令可以实现主题...
- **实时分析**: 结合Spark Streaming或Flink等流处理框架,实时处理Kafka中的数据流,进行实时分析和决策。 - **微服务通信**: 在微服务架构中,Kafka可以作为一个消息中间件,允许服务之间异步通信,提高系统的可...
首先,Kafka的核心概念包括生产者(Producer)、消费者(Consumer)、主题(Topic)、代理(Broker)、分区(Partition)、偏移量(Offset)、副本(Replica)、领导者(Leader)、追随者(Follower)、同步副本集合...
5. ** offsets管理**:每个消费者记录其读取的每个分区的偏移量(offset),以便下次从上次读取的位置继续。 6. **Zookeeper集成**:Kafka使用Zookeeper来协调集群,管理元数据,例如分区的主副本选举。 7. **可...