`
hongs_yang
  • 浏览: 61335 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

spark transform系列__sortByKey

阅读更多

该 函数主要功能:通过指定的排序规则与进行排序操作的分区个数,对当前的RDD中的数据集按KEY进行排序,并生成一个SHUFFLEdrdd的实例,这个 过程会执行shuffle操作,在执行排序操作前,sortBy操作会执行一次到两次的数据取样的操作,取出RDD中每个PARTITION的部分数据, 并根据进行分区的partition的个数,按key的compare大小把某个范围内的key放到一个指定的partition中进行排序.

该函数的操作示例:

import org.apache.spark.SparkContext._
*
*   val rdd: RDD[(String, Int)] = ...
*   implicit val caseInsensitiveOrdering = new Ordering[String] {
*     override def compare(a: String, b: String) =  

         a.toLowerCase.compare(b.toLowerCase)
*   }
*
*   // Sort by key, using the above case insensitive ordering.
*   rdd.sortByKey()

 

上面的示例中定义implicit的隐试转换,

  在OrderedRDDFunctions通过private val ordering implicitly[Ordering[K]]引用

 

函数定义,由OrderedRDDFunctions类进行函数的实现:

这个函数中,传入两个参数,ascending表示是升序还是降序,默认true表示升序.

第二个参数是执行排序使用的partition的个数,默认是当前RDD的partition个数.

def sortByKey(ascending: Boolean truenumPartitions: Int = self.partitions.length)
    : RDD[(KV)] = self.withScope
{

生成执行分区操作的算子,这里生成的算子不再是默认的Hash算子,而是Range的算子,这个算子后面进行具体的分析.
  val part = new RangePartitioner(numPartitionsselfascending)

这里重新根据当前的RDD,生成一个新的RDD,这个shuffle的rdd中,不包含aggregator聚合函数.

也就是在shuffle的过程中只是把key对应的值hash到对应的partition中,并根据key执行排序操作.
  new ShuffledRDD[KVV](selfpart)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}

 

接下来看看RangePartitioner的排序算子的实现:

这里传入的参数包含三个,第一个是进行sort操作的分区数,第二个是当前的RDD,在生成新的RDD后,这个rdd就是ShuffledRDD的上层依赖,第三个表示升序或者降序.

class RangePartitioner[Ordering : ClassTagV](
    partitions: Int,
    rdd: RDD[_ <: Product2[KV]],
    private var ascending: Boolean true)
  extends Partitioner {

 

在RangePartitioner实例生成时,会初始化一个rangeBounds的集合.这个的长度是partitions的长度减一.

private var rangeBounds: Array[K] = {

如果partitions只有一个时,直接返回一个空的集合,因为这个rangeBounds的长度是partitions的值减一.
  if (partitions <= 1) {
    Array.empty
  else {

这里得到一个大约的分区的样本量,最多不超过1e6(1000000)个,默认是分区个数的20倍.如果这个分区太多时,只取1e6的个数.
    // This is the sample size we need to have roughly balanced output partitions, 

          capped at 1M.
    val sampleSize = math.min(20.0 * partitions1e6)

这里取出每个partition中样本的最大的个数,通过当前的样本数量*3/partition的个数,并向上取整.
    // Assume the input partitions are roughly balanced and over-sample a little bit.
    val sampleSizePerPartition = math.ceil(3.0 * sampleSize / 

            rdd.partitions.size).toInt

 

这里会根据SHUFFLE前的RDD执行map->collect操作,得到每个partition的样本信息,

每个partition中最多取出ampleSizePerPartition的样本个数.

这里返回的sketched是一个数组,数组的长度是rdd的partitions的个数,

数组中每一个元素是一个Iterator(partitionid,这个partition中总的数据条数,Array[key](长度是样本个数,或者一个小于样本个数的值(这种情况表示partition的数据不够样本个数))),

这里读取每个partition的样本时:

1,如果这个partition中的总的数据集小于ampleSizePerPartition个数时,取出这个partition的所有的数据,这个返回值中的样本个数也就是这个数据集的size.

2, 这种情况表示partition中的总数据集的个数大于(等于就不说了,直接返回)要取的样本数,一直对partition的数据进行迭代,并生成随机数 (通过一个种子值与当前迭代到的条数进行乘法操作),得到的值如果是一个在样本个数范围内的值时,更新样本中对应位置的值.
    val (numItemssketched) = RangePartitioner.sketch(rdd.map(_._1)

             sampleSizePerPartition)


    if (numItems == 0L) {

这种情况表示没有数据,直接返回一个空集合.
      Array.empty
    else {

这里使用到的numItems表示要进行排序操作的这个RDD中的总数据的条数.

通过取样的个数除上总的数据的条数,得到一个分数值.
      // If a partition contains much more than the average number of items, we re-sample

               from it
      // to ensure that enough items are collected from that partition.
      val fraction = math.min(sampleSize / math.max(numItems1L)1.0)

 

这个candidates中存储有用于计算排序的key的候选人信息,
      val candidates = ArrayBuffer.empty[(K, Float)]

 

这个集合中存储了部分partition中数据集的总数超过了平均每个partition的数据集记录太多的数据的partition.
      val imbalancedPartitions = mutable.Set.empty[Int]


      sketched.foreach { case (idxnsample) =>

这里迭代每一个partition中的样本,如果partition中的数据量总数与样本在总记录中的占比进行乘法操作后的值大于每个partition要取的样本个数,把这个partition先记录下来.

否则根据这个partition的总记录数除上取样的数量得到一个权重,把这个partition中的样本添加到candidates的集合中.这个集合中的数据是排序的候选数据集.
        if (fraction * n > sampleSizePerPartition) {
          imbalancedPartitions += idx
        } else {
          // The weight is 1 over the sampling probability.
          val weight = (n.toDouble / sample.size).toFloat
          for (key <- sample) {
            candidates += ((keyweight))
          }
        }
      }

 

这里计算partition中的记录数比较多的partition,也就是记录数大于了平均每个partition的数据集个数.需要对这些个partition进行重新的取样,
      if (imbalancedPartitions.nonEmpty) {

这里根据需要重新进行取样的partition生成一个PartitionPruningRDD实例.这个实例中只计算需要进行重新取样的partition.传入参数中的imbalancedPartitions.contains用于过滤partition
        // Re-sample imbalanced partitions with the desired sampling probability.
        val imbalanced = new PartitionPruningRDD(rdd.map(_._1)

                imbalancedPartitions.contains)

 


        val seed = byteswap32(-rdd.id 1)

这里执行一个取样操作,并通过collect得到取样的结果集.采用的是BernoulliSampler取样.

通过迭代每条数据根据传入的seed的种子值生成一个随机值,如果这个值小于传入的份数,把这个结果进行保留.具体的取样算法可见BernoulliSampler中的实现.
        val reSampled = imbalanced.sample(withReplacement = falsefraction

               seed).collect()

迭代得到的样本数据,添加到候选人的集合中.
        val weight = (1.0 / fraction).toFloat
        candidates ++= reSampled.map(x => (xweight))
      }

 

这里根据候选人列表进行排序,返回一个数组,这个数组的长度是partitions的个数.

数组中每一个下标位置存储一个key值,这个key就是区分这个分区数据的key的最大值.

这个过程主要是:

通 过候选人列表中的每个weight的的总和除上partitions的个数,得到每个partition的一个平均的步长,开始对这个 candidates(排序后的)进行迭代并对每条数据的weight进行相加操作,当这个值加到计算出来的这个步长时,同时当前迭代的key比上一个存 储的key要大时,把这个key值存储起来.
      RangePartitioner.determineBounds(candidatespartitions)
    }
  }
}

 

RangePartitioner中处理的重新分区函数:

在执行shuffle操作时,针对sortByKey操作的key的重新分区操作的函数,

def getPartition(key: Any): Int = {
  val k = key.asInstanceOf[K]
  var partition = 0
  if (rangeBounds.length <= 128) {

如果进行shuffle操作的重新分区的分区个数小于128个时,直接从第0个分区开始迭代比较这个key应该在那个分区中,
    // If we have less than 128 partitions naive search
    while (partition < rangeBounds.length && ordering.gt(krangeBounds(partition))) 

{
      partition += 1
    }
  } else {

这种情况表示分区个数比较多,通过二分查找的方式进行partition的查找.
    // Determine which binary search method to use only once.
    partition = binarySearch(rangeBoundsk)
    // binarySearch either returns the match location or -[insertion point]-1
    if (partition < 0) {
      partition = -partition-1
    }
    if (partition > rangeBounds.length) {
      partition = rangeBounds.length
    }
  }

这里得到分区后,根据正排还是倒排返回对应的分区.
  if (ascending) {
    partition
  } else {
    rangeBounds.length - partition
  }
}

1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics