1.背景
spark streaming有状态计算(如UV)通常采用DStream.updateStateByKey(实际是PairDStreamFunctions增强到DStream的),具体实现网上讲的很多。spark streaming是持续计算,有状态时不能通过简单的DAG/lineage容错,所以必须设置checkpoint(否则Job启动会报错)
checkpoint会持久化当批次RDD的快照、未完成的Task状态等。SparkContext通过checkpoint可以重建DStream,即使Driver宕机,重启后仍可用SparkContext.getOrElse从checkpoint恢复之前的状态。如果上游不丢数据(如kafka),那么宕机重启后原则上可以实现续传
事情似乎是很完美,但是拿到实际环境中还是会有问题
2.过压时的表现
首先来看下计算量过载以后发生的事情。这个不是Spark的问题,但分析一下有助于理解spark streaming有状态计算的原理
手动向spark灌超量数据(数据规模大至一个Duration内无法消化当批次数据),最终报错如下
java.lang.StackOverflowError
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
..........
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
..........(很长的重复的堆栈,栈溢出你)
出错是正常的,因为目的就是为观察压垮以后的情况,但为什么是StackOverflow(而不是通常预期的OOM)?为此研究了一下相关的源码:
首先就是PairDStreamFunctions.updateStateByKey,这里没什么特殊,就是说嘛使用的实现类是StateDStream
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean,initialRDD: RDD[(K, S)]): DStream[(K, S)] =
ssc.withScope {
new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,rememberPartitioner, Some(initialRDD))
}
然后来看StateDStream.getOrcompute,这是RDD实际生成的方法。这里带参数time就是有状态计算的专有逻辑
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
// 先从缓存拿,没有就计算
generatedRDDs.get(time).orElse {
if (isTimeValid(time)) {
// 这里createRDDWithLocalProperties和disableOutputSpecValidatio是做一些配置相关的预处理,这里不罗列代码了。主要是调用compute方法
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
}
}
......
再来看compute方法
override def compute(validTime: Time): Option[RDD[(K, S)]] = {
getOrCompute(validTime - slideDuration) match {
case Some(prevStateRDD) => {
........
这里关键就是递归去拿上一时间窗口的RDD,这就是有状态计算的实现方法。相当于把每个时间窗口的RDD串在一起。于是当计算跟不上数据的时候,会导致每次计算落后于上次的时间越来越大。而slideDuration是固定的,于是递归层数就越来越多,最终导致栈溢出
3.探讨
如果服务宕机很长时间(比如周末挂了),重启的时候会递归很多层来恢复数据,造成栈溢出。所以这个checkpoint机制在有状态机制下实际效果是有限的
有状态实时计算比无状态复杂很多,Spark Streaming虽然提供了理论上可行的方案,但是在数据恢复方面还是有限制的。这一点目前没有想到太完善的解决方案
a) 能无状态尽量无状态计算
b) 如果需要维护的状态不是特别复杂(比如少数几个当前的累加数),可以自己单独维护状态和checkpoint(比如记录在redis或者Accumulator,在启动和shutdown的时候自己实现状态记录和恢复),这样就不需要用Spark streaming的机制
c) Spark streaming的递归机制相当于把各时间点的DAG串联成一个大DAG,从而把问题归化为无状态。这种设计还是很精妙的,但是也带来一点副作用(DAG可能变得很庞大)。事实上长时间宕机期间都是没有数据的,完全没必要逐个interval去递归。如果能够动态调整interval,也许可以解决栈溢出的问题。期待spark streaming在这方面进行优化
相关推荐
Spark Streaming 是Apache Spark中的一个重要组件,专门设计用来处理实时数据流的计算框架。作为Spark核心API的一个扩展,它延续了Spark的易用性和高效性,能够将实时数据流处理与批量数据处理无缝集成在一起。利用...
为了提高性能,Spark Streaming提供了多种优化手段,包括减少网络传输、增加计算并行度、调整微批次大小等。 #### 四、实例演示 ##### 1. 网络数据处理 通过网络接口接收实时数据流,对其进行清洗、过滤和统计分析...
Flink、Storm、Spark Streaming三种流框架的对比分析 Flink架构及特性分析 Flink是一个原生的流处理系统,...Flink、Storm、Spark Streaming三种流框架都有其优缺点,选择哪种流框架取决于实际的业务需求和技术栈。
4. 容错性:通过检查点和状态管理,Spark Streaming能够保证在节点故障时恢复计算,确保数据处理的完整性。 5. 扩展性:Spark Streaming可以水平扩展,适应不断增长的数据量和处理需求,同时支持多种数据源,如...
6. **有状态的流式处理:** Spark Streaming支持有状态的操作,例如通过`updateStateByKey`方法来维护任意状态信息,使得即使在流式数据中也能进行复杂的状态管理和计算。 7. **Window-Based Transformation:** 在...
在大数据处理领域,Spark作为一款高效、通用的计算框架,被广泛应用在数据分析、机器学习等多个场景。本项目涉及的核心知识点包括Spark Core、Spark SQL和Spark Streaming,同时结合了Scala和Java编程语言,以及...
- **容错和数据保证**:Spark Streaming 提供了更强大的状态管理和数据一致性保证。在处理故障恢复时,Spark Streaming 可以确保每条记录被准确处理一次,而 Storm 则只能保证每条记录至少被处理一次。 #### 2. ...
6. Spark Streaming的运行与监控:实时数据流处理项目不仅需要强大的计算能力,还需要能够监控和管理系统的实时状态。这可能包括作业执行情况、资源使用情况、性能指标等。 由于文档内容不完整,并且信息量较大,...
而`updateStateByKey`这样的有状态转换操作则让开发者能够实现更高级的流处理逻辑,例如在时间序列数据上进行状态维护和累计计算。了解和熟练运用这些知识对于进行实时数据分析和流处理项目至关重要。
本资料包着重介绍了Spark Streaming的核心概念、架构以及如何在实际场景中应用。 **一、Spark Streaming简介** Spark Streaming构建在核心的Spark引擎之上,它将实时数据流分割成微小的批处理任务(DStreams),...
滑动窗口在Spark Streaming中的应用广泛,如计算滑动窗口内的平均值、最大值、最小值等统计指标,或者检测时间窗口内的特定事件模式。例如,我们可以设置一个5分钟的滑动窗口,每分钟滑动一次,来计算过去5分钟内每...
由于给定的文件信息中【部分内容】的文本出现了重复,并且具体的技术细节和项目实施细节并没有提及,因此我将基于标题和描述提供的信息,结合Spark Streaming和京东的实际应用情况,来详细阐述知识点。 首先,Spark...
理解这些依赖关系有助于更好地设计和调试 Spark Streaming 应用程序。 ##### 5.2 ReceiverTracker 与数据导入 ReceiverTracker 跟踪接收器的状态,确保数据能够正确地导入 Spark Streaming 应用程序。了解 ...
这个项目不仅适用于毕业设计,也是对大数据实时处理技术的实际应用训练,对于理解Spark Streaming、Kafka和HBase的工作原理,以及如何在实际场景中整合使用这些技术,具有很高的学习价值。通过该项目,开发者可以...
最后,我们需要启动Spark Streaming的计算任务并持续接收数据: ```java wordCounts.print(); jssc.start(); jssc.awaitTermination(); ``` 这个简单的示例展示了如何将Spark Streaming与MQTT结合,实现实时处理...
5. 故障容错:Spark Streaming支持检查点和故障恢复机制,确保在节点故障时能够恢复计算状态,保证系统的高可用性。 在实际操作中,项目可能还涉及到了配置管理工具如Ansible,版本控制工具如Git,以及持续集成/...
本文将深入探讨如何利用Apache Spark Streaming构建一个高效的实时推荐系统,并重点讲解其核心组件、工作流程以及在实际应用中的挑战与解决方案。 #### 一、为什么要使用Spark Streaming? 选择Spark Streaming...
Spark Streaming与Spark Core紧密集成,能够无缝利用Spark的强大计算能力,支持复杂的窗口操作和状态管理,使得实时分析变得简单且高效。 Apache Kafka是一个开源的分布式流处理平台,被广泛用于构建实时数据管道和...