
[spark-src-core] 5.big data techniques in spark

 

  there are several nice techniques in spark, e.g. on the user api side. here we will dive in and check how spark implements them.

 

1.abstract(functions in RDD)

group 1
  first()
    feature: retrieves the first element of this rdd; if there is more than one partition, the first partition is taken by priority. in fact it simply calls take(1) internally.
    principle: runs a job partition range by partition range until the total amount reaches the expected number.
  take(n)
    feature: extracts the first n elements of this rdd; it is the equivalent of first() when n is 1.
    principle: same as first().

group 2
  top(n)(order)
    feature: extracts the top (max by default) n elements. calls takeOrdered(num)(ord.reverse) internally; search engines such as solr reportedly use a similar technique.
    principle: concurrently spawns a task per partition doing the same operation, ie each task tries to retrieve n elements, then the per-partition results are merged.
  max()(order)
    feature: retrieves the max element. though the internal algorithm differs from top(n), the final effect is the same (performance aside).
    principle: uses rdd.reduce(ord.max) internally.

group 3
  min()(order)
    feature: the opposite of max().
    principle: uses rdd.reduce(ord.min) internally; similar to top(n).
  takeOrdered(n)(order)
    feature: the opposite of top(n); similar to min() but takes the n smallest items.
    principle: same as top(n).

group 4
  collect()
    feature: retrieves all the results of this rdd computation, so an OOM exception may occur occasionally.
    principle: similar to top(n), but here each task is not limited to n elements - it returns everything in its partition.
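
  to make the table concrete, here is a minimal sketch run against a tiny rdd (assuming a spark-shell session where sc is already available; the outputs in the comments follow from this particular data split over 2 partitions):

val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3), 2)

rdd.first()         // 10            - same as take(1).head, scans partition 0 first
rdd.take(2)         // Array(10, 4)
rdd.top(2)          // Array(12, 10) - takeOrdered(2)(ord.reverse) internally
rdd.max()           // 12            - rdd.reduce(ord.max) internally
rdd.min()           // 2             - rdd.reduce(ord.min) internally
rdd.takeOrdered(2)  // Array(2, 3)
rdd.collect()       // Array(10, 4, 2, 12, 3) - the whole dataset lands in the driver, hence the possible OOM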

 

 

2.techniques

a.lazy computation & computes range by range

  e.g. in terms of take(n), spark acts as a lazy worker: it acts only when needed. that is, spark will try to use as few resources as possible. see below for details:

/**-estimates the partitions to scan step by step to decrease resource consumption, ie lazy computation.
   * Take the first num elements of the RDD. It works by first scanning one partition, and use the
   * results from that partition to estimate the number of additional partitions needed to satisfy
   * the limit.-runs a loop of jobs to check whether the results of the partitions scanned so far satisfy the target num.
   *  the returned result is sorted by partition sequence.
   * @note due to complications in the internal implementation, this method will raise
   * an exception if called on an RDD of `Nothing` or `Null`.
   */
  def take(num: Int): Array[T] = withScope {
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) { //-loop until the results gathered so far satisfy the target
        //1 -compute which range of partitions to run in this iteration
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1
        if (partsScanned > 0) {
          log.info(s"-step to next loop,numPartsToTry=${numPartsToTry}")
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.size == 0) { //-no data at all in the previously scanned partitions, so widen the range to more partitions
            numPartsToTry = partsScanned * 4
          } else {
            // the left side of max is >=1 whenever partsScanned >= 2
            //-estimate the remaining parts to compute: estimated total parts = 1.5 * num / buf.size * partsScanned
            numPartsToTry = Math.max((1.5 * num * partsScanned / buf.size).toInt - partsScanned, 1)
            numPartsToTry = Math.min(numPartsToTry, partsScanned * 4) //-cap the partition range
          }
        }

        val left = num - buf.size
        //-the partition range for the next run
        val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
        //-2 scan each partition in the range, taking at most the remaining count from each; similar to solr's distributed group query
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
        //-3 add up into the total buf; note: it doesn't take a fixed num-buf.size per partition, but the remaining size, since buf is mutable and keeps growing
        res.foreach(buf ++= _.take(num - buf.size)) //-buf grows per partition, so the remaining count shrinks
        partsScanned += numPartsToTry
      }

      buf.toArray
    }
  }

   again, spark estimates the number of partitions to compute next (numPartsToTry) from the amount of items scanned so far.

  of course, this feature depends on spark's ability to run a job over just a subset of the partitions.
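
  the estimation logic can also be pulled out into a tiny standalone sketch (plain scala, no spark needed; estimateScans and the per-partition sizes below are made up purely for illustration) to watch how the scanned range grows:

//-plain-scala sketch of take(num)'s partition estimation loop (hypothetical helper, not spark code)
def estimateScans(num: Int, partSizes: Seq[Int]): Seq[Range] = {
  val scans = scala.collection.mutable.ArrayBuffer[Range]()
  var bufSize = 0
  var partsScanned = 0
  val totalParts = partSizes.length
  while (bufSize < num && partsScanned < totalParts) {
    var numPartsToTry = 1
    if (partsScanned > 0) {
      numPartsToTry =
        if (bufSize == 0) partsScanned * 4                                          // found nothing yet: quadruple
        else math.min(
          math.max((1.5 * num * partsScanned / bufSize).toInt - partsScanned, 1),   // interpolate, overestimate by 50%
          partsScanned * 4)                                                         // and cap
    }
    val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    scans += p
    p.foreach(i => bufSize += math.min(partSizes(i), num - bufSize))                // take only what is still missing
    partsScanned += numPartsToTry
  }
  scans.toSeq
}

// e.g. take(5) over 10 partitions where the first three happen to be empty:
// estimateScans(5, Seq(0, 0, 0, 1, 1, 1, 1, 1, 1, 1))
// scans 0 until 1 first, then 1 until 5 (quadrupled), then 5 until 10 with this data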

 

b.lazy load by iterator

  diving into takeOrdered(n), some nice stuff shows up here:

/**-similar to a search engine: assign the requested 'num' to each partition, then merge all the partitions' results. so
    * mapPartitions() is called here.
   * Returns the first k (smallest) elements from this RDD as defined by the specified
   * implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].
   * For example:
   * {{{
   *   sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
   *   // returns Array(2)
   *
   *   sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
   *   // returns Array(2, 3)
   * }}}
   *
   * @param num k, the number of elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      //1 retrieve top n items per partition
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        //-reverse the ordering so the bounded queue evicts its largest element, ie the element count is bounded
        // right here
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= util.collection.Utils.takeOrdered(items, num)(ord) //-take the num smallest items of this partition (by ord) before feeding the queue
        Iterator.single(queue)
      }
      //2 merge all the results into final n items
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        //-merge the individual partitions' sub-results
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1 //-always accumulate into the left queue; the size bound is enforced by the queue itself, see above
        }.toArray.sorted(ord) //-re-sort by the original ord (ascending)
      }
    }
  }

   note: items is an Iterator, which means only a reference to the underlying storage is held, rather than a concrete Array or Seq!
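
   the same laziness can be reproduced with plain scala iterators, independent of spark; this small illustrative sketch (not spark code) shows that mapping an iterator costs nothing until it is consumed:

val source = (1 to 5).iterator
val mapped = source.map { x => println(s"computing $x"); x * 10 }
// nothing printed yet - only a wrapper Iterator has been created

println(mapped.take(2).toList)
// computing 1
// computing 2
// List(10, 20)   <- only the consumed elements were ever computed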

  to make this clearer, we can demonstrate with some snippets:

  a.driver api

val maprdd = fmrdd.map((_,1)) //-MapPartitionsRDD[3]

  b.rdd internal

/**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) //-so 'this' becomes the parent rdd of the new MapPartitionsRDD
  }

   c. then dive into iter.map()

def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
 }

   so we know that each key-value pair is read in one at a time per loop iteration (via the callback function 'f()').

   ie. the nested procedure is: Fn(...F2(F1(read root rdd's kv pair1))...), then Fn(...F2(F1(kv pair2))...), and so on.
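
   this per-element pipelining is easy to reproduce with plain iterators; the illustrative sketch below (not spark code) chains two maps and shows F2(F1(x)) being applied element by element rather than stage by stage:

val it = Iterator("a", "b")
  .map { s => println(s"F1($s)"); s.toUpperCase }  // stands in for the first rdd's function
  .map { s => println(s"F2($s)"); s * 2 }          // stands in for the next rdd's function

it.foreach(println)
// F1(a)
// F2(A)
// AA
// F1(b)
// F2(B)
// BB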

 

   also, since every RDD#iterator() (besides HadoopRDD's) produces a new Iterator (see above), no 'no more elements' exception will be raised by the downstream RDDs' calls.
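
   as a plain-scala illustration of why a fresh Iterator per call matters (again, not spark code): reusing an exhausted iterator yields nothing, while asking the underlying collection again gives a new one:

val data = Seq(1, 2, 3)

val once = data.iterator
println(once.toList)          // List(1, 2, 3)
println(once.toList)          // List()  <- already exhausted; calling next() now would throw NoSuchElementException

println(data.iterator.toList) // List(1, 2, 3)  <- a fresh Iterator each time, like each RDD#iterator() call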

 

 
