`

Sparkstreaming reduceByKeyAndWindow(_+_, _-_, Duration, Duration) 的源码/原理解析

阅读更多
最近在玩spark streaming, 感觉到了他的强大。 然后看 StreamingContext的源码去理解spark是怎么完成计算的。 大部分的源码比较容易看懂, 但是这个
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)
还是花了不少时间。 主要还是由于对spark不熟悉造成的吧, 还好基本弄明白了。

总的来说SparkStreaming提供这个方法主要是出于效率考虑。 比如说我要每10秒计算一下前15秒的内容,(每个batch 5秒), 可以想象每十秒计算出来的结果和前一次计算的结果其实中间有5秒的时间值是重复的。
那么就是通过如下步骤
1. 存储上一个window的reduce值
2.计算出上一个window的begin 时间到 重复段的开始时间的reduce 值 =》 oldRDD
3.重复时间段的值结束时间到当前window的结束时间的值 =》 newRDD
4.重复时间段的值等于上一个window的值减去oldRDD

这样就不需要去计算每个batch的值, 只需加加减减就能得到新的reduce出来的值。

从代码上面来看, 入口为:
reduceByKeyAndWindow(_+_, _-_, Duration, Duration)

一步一步跟踪进去, 可以看到实际的业务类是在ReducedWindowedDStream 这个类里面:
代码理解就直接拿这个类来看了: 主要功能是在compute里面实现, 通过下面代码回调mergeValues 来计算最后的返回值
val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
      .mapValues(mergeValues)


先计算oldRDD 和newRDD

//currentWindow  就是以当前时间回退一个window的时间再向前一个batch 到当前时间的窗口 代码里面有一个图很有用:
我们要计算的new rdd就是15秒-25秒期间的值, oldRDD就是0秒到10秒的值, previous window的值是1秒 - 15秒的值

然后最终结果是 重复区间(previous window的值 - oldRDD的值) =》 也就是中间重复部分, 再加上newRDD的值, 这样的话得到的结果就是10秒到25秒这个时间区间的值


    // 0秒                  10秒     15秒                25秒
    //  _____________________________
    // |  previous window   _________|___________________
    // |___________________|       current window        |  --------------> Time
    //                     |_____________________________|
    //
    // |________ _________|          |________ _________|
    //          |                             |
    //          V                             V
    //       old RDDs                     new RDDs
    //






val currentTime = validTime


    val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
      currentTime)
    val previousWindow = currentWindow - slideDuration

   val oldRDDs =
      reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
    logDebug("# old RDDs = " + oldRDDs.size)

    // Get the RDDs of the reduced values in "new time steps"
    val newRDDs =
      reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
    logDebug("# new RDDs = " + newRDDs.size)



得到newRDD和oldRDD后就要拿到previous windows的值: 如果第一次没有previous window那么建一个空RDD, 为最后计算结果时 arrayOfValues(0).isEmpty 铺垫
val previousWindowRDD =
      getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))


然后把所有的值放到一个数组里面 0是previouswindow, 1到oldRDD.size是oldrdd, oldRDD.size到newRDD.size是newrdd

val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs


将每个RDD的(K,V) 转变成(K, Iterator(V))的形式:

比如说有两个值(K,a) 和(K,b) 那么coGroup后就会成为(K, Iterator(a,b))这种形式

 val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]],
      partitioner)


进行最后的计算:
 val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
...

}


首先判断RDD的value数量是不是正确 previous window因为已经计算过所以只有一组值
正确值为 1 (previous window value) + numOldValues (oldRDD 每个RDD的value) + numNewValues (newRDD 每个RDD的value)

      if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
        throw new Exception("Unexpected number of sequences of reduced values")
      }



接下来取出oldRDD的值和newRDD的值:
 val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
 val newValues =
        (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)


如果previous window是空的, 那么就直接计算newRDD的值(这也是为什么每次计算时候第一次打出来的值都比较少, 因为他只有newRDD部分没有重合部分, 也就是只有10秒的内容而不是15秒)

 if (arrayOfValues(0).isEmpty) {
        // If previous window's reduce value does not exist, then at least new values should exist
        if (newValues.isEmpty) {
          throw new Exception("Neither previous window has value for key, nor new values found. " +
            "Are you sure your key class hashes consistently?")
        }
        // Reduce the new values
        newValues.reduce(reduceF) // return
      }


如果有previous window的值, 那么先存到tempValue, 如果有oldRDD那么减去oldRDD, 如果有newRDD (一般都有) 那么加上newRDD的值 这样就组成上图里面10到25秒区间的值了

else {
        // Get the previous window's reduced value
        var tempValue = arrayOfValues(0).head
        // If old values exists, then inverse reduce then from previous value
        if (!oldValues.isEmpty) {
          tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
        }
        // If new values exists, then reduce them with previous value
        if (!newValues.isEmpty) {
          tempValue = reduceF(tempValue, newValues.reduce(reduceF))
        }
        tempValue // return
      }



最后如果有filter的function的话就filter一下:
if (filterFunc.isDefined) {
      Some(mergedValuesRDD.filter(filterFunc.get))
    } else {
      Some(mergedValuesRDD)
    }


这样就返回了新window内的值


2
0
分享到:
评论

相关推荐

    SparkStreaming之滑动窗口的实现.zip_Spark!_spark stream 窗口_spark streamin

    Spark Streaming是Apache Spark的一个模块,专门用于处理实时数据流。...在学习和实践中,理解滑动窗口的工作原理并掌握其配置方法,将对提升Spark Streaming应用程序的效率和准确性起到关键作用。

    kafka+spark streaming开发文档

    在配置Spark Streaming时,需要将Spark版本设置为1.3.0,并且需要配置Spark Streaming的参数,包括batch interval、window duration等。 三、Kafka和Spark Streaming集成 在将Kafka和Spark Streaming集成时,需要...

    SparkStreaming和kafka的整合.pdf

    根据提供的文件信息,本文将详细解析“Spark Streaming与Kafka的整合”这一主题,并结合代码片段探讨其在实际场景中的应用。尽管标签中提到“数学建模”,但从标题和描述来看,这部分内容与数学建模无关,因此我们将...

    前端项目-moment-duration-format.zip

    首先,`moment.js`是一个强大的JavaScript库,它提供了丰富的API来处理日期和时间,包括解析、验证、操作和格式化等。然而,`moment.js`原生并不支持对持续时间的格式化,而`moment-duration-format`正是为了解决这...

    Spark Streaming 流式日志过滤的实验资源

    - **日志解析**:在 Spark Streaming 中,我们需要先读取这些日志文件,然后通过正则表达式或其他解析方法提取有意义的信息。 - **日志过滤**:实验可能涉及根据特定关键词或模式过滤日志,例如只保留包含特定错误...

    node-video-duration:NodeJs模块,用于检索视频音频资产的持续时间

    安装$ npm install --save node-video-duration用法 const getVideoDuration = require ( 'node-video-duration' ) ;getVideoDuration ( '...

    duration-fns:具有持续时间的功能

    import * as duration from 'duration-fns' // Parsing / stringifying // --------------------------------------------- duration . parse ( 'PT1M30S' ) // { minutes: 1, seconds: 30 } duration . toString ...

    Python库 | aws_cdk_billing_alarm-1.0.0b1-py3-none-any.whl

    period=core.Duration.months(1), evaluation_periods=1, threshold=1000.0, # 设置阈值 comparison_operator=cloudwatch.ComparisonOperator.GREATER_THAN_THRESHOLD, ), alarm_description="触发当月账单...

    c#用ffmpeg根据场景不同来提取视频

    这里给大家介绍如果一键将视频拆分为多个场景视频。... + " -t " + durationTime + " -max_muxing_queue_size 1024" + " -strict -2 -keyint_min 8 -g 8 -sc_threshold 0" + " " + DstFile + " -y ";

    google/protobuf/duration.proto

    google proto-buf中常被使用的一个计时文件,可能你的proto文件需要使用

    Python库 | music_metadata_extractor-1.1.1-py3-none-any.whl

    print(metadata.duration) ``` `music_metadata_extractor`库还支持自定义处理器,允许用户根据需求处理提取到的元数据。例如,你可以创建一个处理器来过滤特定的标签,或者将元数据存储到数据库或JSON文件中: ``...

    TPFanControl 0.63准绿色版+教程

    // Beep frequency and duration (in ms) for successful // fan state changes. (Set either or both to zero to // disable) // 当成功转换风扇状态时,蜂鸣器的频率和时长(毫秒),任何一个值设定为0时,禁止蜂鸣. ...

    前端项目-humanize-duration.zip

    源代码通常包含了函数和方法,用于解析毫秒值并生成相应的文本描述。例如,`humanizeDuration(ms, options)`函数是库的主要接口,接受毫秒数和可选配置对象作为参数,返回一个表示时间间隔的字符串。 配置对象可以...

    开源项目-senseyeio-duration.zip

    d := duration.Parse("1h30m") // 将字符串解析为 Duration result := d.Add(30 * duration.Second) // 添加30秒 ``` 总的来说,`Senseyeio-Duration` 是一个强大的 Go 时间处理库,它扩展了标准库的功能,使开发者...

    开源项目-icholy-Duration.py.zip

    【开源项目-icholy-Duration.py.zip】是一个包含Python代码的开源项目,主要功能是解析`time.Duration`字符串。在编程领域,`time.Duration`通常用于表示时间间隔,特别是在Go语言中,它是一个内置类型,可以方便地...

    前端开源库-gulp-duration

    `gulp-duration`是一个非常实用的前端开源库,它专为`Gulp.js`工作流设计,用于跟踪和显示`Gulp`任务的执行时间。这个库的目的是帮助开发者更好地理解他们的构建过程,定位可能的性能瓶颈,并优化整体的构建速度。 ...

    matlab开发-duration

    在MATLAB环境中,"duration"通常指的是用于处理和计算时间间隔的函数或数据类型。MATLAB的`duration`对象提供了一种方便的方式来存储和操作时间跨度,包括小时、分钟、秒以及毫秒等精度。在“matlab开发-duration”...

    loadRunner常用脚本

    lr_get_trans_instance_duration/获取事务实例的持续时间(由它的句柄指定) lr_get_trans_instance_wasted_time/获取事务实例浪费的时间(由它的句柄指定) lr_get_transaction_duration/获取事务的持续时间(按...

    youtube-duration-sort:按时长排序YouTube订阅视频-源码

    【标题解析】:“youtube-duration-sort:按时长排序YouTube订阅视频-源码”表明这是一个与YouTube订阅视频相关的项目,它的核心功能是实现视频时长的排序。"源码"提示我们这里包含的是编程代码,可能是用JavaScript...

Global site tag (gtag.js) - Google Analytics