这个操作的作用根据相同的key的所有的value存储到一个集合中的一个玩意.
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
在 做groupByKey的操作时,由于需要根据key对数据进行重新的分区操作,因此这个操作需要有一个partitioner的实例.默认是hash算 子.这个操作根据当前操作的RDD中是否有partitioner,同时这个partitioner与当前的传入的partitioner的实例是否相同 来判断是否需要执行shuffle操作.
如果是默认的hashPartitioner时,检查spark.default.parallelism配置是否有配置,如果有分区个数按这个配置来设置,否则使用当前进行此groupByKey操作的rdd的partitions来设置.
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
= self.withScope {
这里同样与reduceByKey的操作一样,通过调用combineByKeyWithClassTag的函数来进行处理,
不同的是,withClassTag的合并操作是一个CompactBuffer[V]类型.
这里生成aggregator实例需要的三个函数时,
createCombiner:如果key目前还有值时,根据当前传入的key-value中的value生成一个CompactBuffer的实例,并存储到key对应的位置,
mergeValue:传入一个key-value时,如果key对应的CompactBuffer已经存在,把这个value添加到这个buffer中.
mergeCombiners:这个主要在shuffle结束时,把key相同的多个buffer进行合并.
需要注意的是,在执行groupByKey的操作时,会把mapSideCombine设置为false,表示不执行map端的聚合.
为 什么groupByKey不做mapSideCombine的操作呢,因为在groupByKey的操作中,会先根据相同的key,把value存储到一 个buffer中,这个地方并不会设计到map端combine的操作会减少多少的网络传输的开效,如果做map combine操作时,反而增加了map端writer的内存使用.
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
在combineByKeyWithClassTag的操作函数中的处理:
mapSideCombine的传入参数为false.
这个地方,根据上面的三个函数,生成Aggregator,这里的K,V,C分别代表key的类型,value的类型,C在groupByKey的操作中是一个CompactBuffer[V]的类型
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
这里主要是看看当前的partitioner是否与当前执行这个操作的rdd的partitioner实例相同.相同就不在需要执行shuffle操作,否则就需要执行shuffle操作,生成新的ShuffledRDD实例.
if (self.partitioner == Some(partitioner)) {
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter,
context))
}, preservesPartitioning = true)
} else {
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
在Aggregator的操作中,如果mapSideCombine的参数为false时,通过Aggregator中的combineValuesByKey函数执行数据的合并操作.如果mapSideCombine的参数为true时,通过Aggregator中的combineCombinersByKey函数执行数据的合并操作(只执行第三个函数,因为map端已经把结果合并成了C的类型).
在Aggregator的合并操作中,通过ExternalAppendOnlyMap实例来进行数据的合并(insertAll).这个实例会最大可能的使用内存,如果内存实在不够用时,考虑对内存中的数据进行spill到磁盘的操作.
相关推荐
spark-sql_2.11-2.4.0-cdh6.1.1.jar
### Spark升级后遇到`java.lang.NoClassDefFoundError: org/apache/spark/Logging`问题解决方案 #### 一、问题背景及现象 在升级Spark至2.1版本之后,在使用streaming-kafka进行测试时,遇到了`java.lang....
spark-core_2.11-1.6.0.jar spark-core_2.11-1.6.0.jar
spark-core_2.11-2.0.0.jar比spark-core_2.11-1.5.2.jar少了org.apache.spark.Logging.class,故此把缺少的class放到spark-core_2.11-1.5.2.logging.jar里面
mongodb-spark官方连接器,运行spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:1.1.0可以自动下载,国内网络不容易下载成功,解压后保存到~/.ivy2目录下即可。
比maven仓库下载快很多
spark_2_7_7
spark-streaming_2.11-2.1.3-SNAPSHOT.jar
spark-hive_2.11-2.3.0 spark-hive-thriftserver_2.11-2.3.0.jar log4j-2.15.0.jar slf4j-api-1.7.7.jar slf4j-log4j12-1.7.25.jar curator-client-2.4.0.jar curator-framework-2.4.0.jar curator-recipes-2.4.0....
mongo-spark-connector_2.11-2.1.2.jar 使用于 java7!
标题中的"SalahEddine_HebaBaze_Spark_CLOUDHPC_spark_spark_python_"暗示了这个压缩包内容可能围绕一个由SalahEddine和HebaBaze共同探讨的Spark项目,重点关注在云计算环境(可能是CLOUDHPC)下的Python编程。...
星火应用商店
spark 读取excel 的jar 包
spark_2_5_8_online 内网聊天工具 可以是内网供作的人们方便的进行沟通
mongodb spark连接器,适用版本spark2.1.X ,Scala2.11.X, java 6 or later,mongodb 2.6 or later,请根据上面的版本选择,不然会报各种错误
spark-tags_2.11-2.1.3-SNAPSHOT.jar
spark-sql_2.11-2.1.3-SNAPSHOT_bak.jar
spark-streaming_2.12-2.4.0.jar包,可以使用
SPARK2_ON_YARN-2.4.0 jar包下载
博客https://blog.csdn.net/lsshlsw/article/details/82670508 spark_prometheus_metrics.json