在互联网场景下,经常会有各种实时的数据处理,这种处理方式也就是流式计算,延迟通常也在毫秒级或者秒级,比较有代表性的几个开源框架,分别是Storm,Spark Streaming和Filnk。
曾经在一个项目里面用过阿里改造后的JStrom,整体感受就是编程略复杂,在不使用Trident Api的时候是不能保证准确一次的数据处理的,但是能保证不丢数据,但是不保证数据重复,我们在使用期间也出现过几次问题,bolt或者worker重启时候会导致大量数据重复计算,这个问没法解决,如果想解决就得使用Trident来保证,使用比较繁琐。
最近在做一个实时流计算的项目,采用的是Spark Steaming,主要是对接Spark方便,当然后续有机会也会尝试非常具有潜力的Filnk,大致流程,就是消费kafka的数据,然后中间做业务上的一些计算,中间需要读取redis,计算的结果会落地在Hbase中,Spark2.x的Streaming能保证准确一次的数据处理,通过spark本身维护kafka的偏移量,但是也需要启用checkpoint来支持,因为你没法预料到可能出现的故障,比如断电,系统故障,或者JVM崩溃等等。
鉴于上面的种种可能,Spark Streaming需要通过checkpoint来容错,以便于在任务失败的时候可以从checkpoint里面恢复。
在Spark Streaming里面有两种类型的数据需要做checkpoint:
A :元数据信息checkpoint 主要是驱动程序的恢复
(1)配置 构建streaming应用程序的配置
(2)Dstream操作 streaming程序中的一系列Dstream操作
(3)没有完成的批处理 在运行队列中的批处理但是没有完成
B:消费数据的checkpoint
保存生成的RDD到一个可靠的存储系统中,常用的HDFS,通常有状态的数据横跨多个batch流的时候,需要做checkpoint
总结下:
元数据的checkpoint是用来恢复当驱动程序失败的场景下
而数据本身或者RDD的checkpoint通常是用来容错有状态的数据处理失败的场景
大多数场景下没有状态的数据或者不重要的数据是不需要激活checkpoint的,当然这会面临丢失少数数据的风险(一些已经消费了,但是没有处理的数据)
如何在代码里面激活checkpoint?
// 通过函数来创建或者从已有的checkpoint里面构建StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val rdds = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint("/spark/kmd/checkpoint") // 设置在HDFS上的checkpoint目录
//设置通过间隔时间,定时持久checkpoint到hdfs上
rdds.checkpoint(Seconds(batchDuration*5))
rdds.foreachRDD(rdd=>{
//可以针对rdd每次调用checkpoint
//注意上面设置了,定时持久checkpoint下面这个地方可以不用写
rdd.checkpoint()
}
)
//返回ssc
ssc
}
def main(args:Array){
// 创建context
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// 启动流计算
context.start()
context.awaitTermination()
}
启动项目之后,我们能在HDFS上看到对应目录下面的checkpoint内容
这里有有两个坑:
(1)处理的逻辑必须写在functionToCreateContext函数中,你要是直接写在main方法中,在首次启动后,kill关闭,再启动就会报错
关闭命令
yarn application -kill application_1482996264071_34284
再次启动后报错信息
has not been initialized when recovery from checkpoint
解决方案:将逻辑写在函数中,不要写main方法中,
(2)首次编写Spark Streaming程序中,因为处理逻辑没放在函数中,全部放在main函数中,虽然能正常运行,也能记录checkpoint数据,但是再次启动先报(1)的错误,然后你解决了,打包编译重新上传服务器运行,会发现依旧报错,这次的错误和(1)不一样:
xxxx classs ClassNotFoundException
但令你疑惑的是明明打的jar包中包含了,这个类,上一次还能正常运行这次为啥就不能了,问题就出在checkpoint上,因为checkpoint的元数据会记录jar的序列化的二进制文件,因为你改动过代码,然后重新编译,新的序列化jar文件,在checkpoint的记录中并不存在,所以就导致了上述错误,如何解决:
也非常简单,删除checkpoint开头的的文件即可,不影响数据本身的checkpoint
hadoop fs -rm /spark/kmd/check_point/checkpoint*
然后再次启动,发现一切ok,能从checkpoint恢复数据,然后kill掉又一次启动
就能正常工作了。
最后注意的是,虽然数据可靠性得到保障了,但是要谨慎的设置刷新间隔,这可能会影响吞吐量,因为每隔固定时间都要向HDFS上写入checkpoint数据,spark streaming官方推荐checkpoint定时持久的刷新间隔一般为批处理间隔的5到10倍是比较好的一个方式。
参考链接:
https://issues.apache.org/jira/browse/SPARK-6770
http://www.jianshu.com/p/807b0767953a
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
Spark Streaming支持两种主要的持久化机制:检查点(Checkpoint)和内存中的状态更新。这两种机制都可以用来实现容错功能,确保在发生故障时可以从最近的状态恢复处理流程。 ##### 5. 优化 为了提高性能,Spark ...
- **RDD容错机制**: Spark Streaming基于RDD的Lineage机制,当部分分区丢失时,可以通过计算Lineage信息重新计算得到丢失的数据。 - **数据源容错**: 对于来自外部文件系统(如HDFS)的数据源,可以通过重新读取数据...
相比之下,Spark Streaming依赖于Checkpoint和Write-Ahead Log(WAL)机制,在这方面表现一般。 6. **动态调整并行度**:Storm支持动态调整并行度,而Spark Streaming则不支持这一特性。 #### 三、总结 综上所述,...
7. **Checkpointing**:为了实现容错,Spark Streaming支持checkpoint机制,定期将状态信息写入持久化存储,当作业失败时可以从checkpoint恢复。 8. **资源调度**:在运行Spark Streaming作业时,需要考虑Spark集群...
4. **容错与可伸缩性**:讨论Spark Streaming的容错机制,如检查点(checkpoint)和滑动窗口,以及如何调整系统参数以优化性能和容错能力。 5. **实时流处理算法**:介绍适用于实时分析的统计和机器学习算法,如...
Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、updateStateByKey、transform、滑动窗口、foreachRDD性能优化、与Spark SQL整合使用、持久化、checkpoint、容错与事务。 7、多个从...
- **统一API**:Spark Streaming与Spark Core共享相同的API,这意味着开发者可以在同一应用程序中混合使用批处理和流处理逻辑。 - **容错性**:通过checkpoint机制确保在发生故障时能够自动恢复。 - **窗口处理**:...
- 容错性:Spark通过checkpoint机制进行容错,可以是数据检查点或更新日志,用户可选择合适的策略。 - 用户友好:Spark提供了Scala, Java, Python的API,还有交互式Shell,增强了易用性。 3. Spark与Hadoop的整合...
在Java中使用Spark Streaming时,首先需要导入相应的库,并创建一个SparkConf对象来设置Spark应用的基本信息,如应用名称和Master地址。接着,通过SparkConf创建一个StreamingContext,这是Spark Streaming的主要...
Flink 的 checkpoint 机制比 Spark Streaming 的 checkpoint 机制更高效,Flink 可以在微秒级别上 checkpoint 数据,而 Spark Streaming 需要几十秒才能 checkpoint 数据。 7. Flink 是如何保证 Exactly-once 语义...
此外,理解Spark的持久化机制,如cached和checkpoint,以及错误处理和容错策略也很重要。 最后,面试可能还会涉及Spark与其他大数据技术(如Hadoop、Hive、Kafka等)的集成,以及在实际项目中的具体应用案例。 总...
而Spark Streaming则为实时流数据处理提供了框架,它将连续的数据流分割成小批次,然后使用Spark的批处理能力进行处理,实现了低延迟的实时计算。 Spark支持多种运行模式,包括本地模式、独立模式(Standalone)、...
此外,使用储蓄策略如 Checkpoint 和 lineage shortening 有助于降低容错成本。 以上就是使用 Scala 语言操作 Spark 进行 MySQL 和 HDFS 连接的基础知识。在实际项目中,根据具体需求和环境,还需要深入理解并应用...
Spark Streaming有较高的容错能力,因为它会定期对数据进行Checkpoint操作。它还支持Exactly-once的处理语义,保证了消息的精确处理。在处理延迟和吞吐量方面,Spark Streaming提供了折衷的方案,虽然不如Storm那样...
容错通过Lineage和Checkpoint机制实现,即使在节点故障时也能恢复数据完整性。 开发Spark应用程序,可以使用IntelliJ IDEA、Eclipse或SBT等工具,配合Spark Shell进行快速开发和测试。远程调试功能有助于优化代码和...
* 容错机制:Spark Streaming 任务可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。Flink 则使用...
- **Apache Spark Streaming** 使用微批次(Micro-Batch)处理模型,将实时流数据划分为小批量数据进行处理,从而实现近实时计算。 - **Apache Flink** 和 **Apache Storm** 支持连续流处理(Continuous Streaming...
- **RDD容错机制Checkpoint**:通过周期性地将RDD状态写入磁盘,实现容错。 - **RDD依赖关系**:RDD之间的依赖关系定义了任务调度的拓扑结构。 - **DAG生成和Stage划分**:Spark将用户代码转换为DAG,再根据数据...
- **容错机制**:利用Spark提供的容错特性,如RDD的持久化和checkpoint机制,增强系统的稳定性。 - **集群资源管理**:合理规划资源分配,避免资源争抢导致性能下降。 - **安全性和隐私保护**:在处理敏感数据时遵循...