整理这个博客的原因有两个,
1.在Spark的mailing list有人问道,Spark面试的话,一般会问些什么,有个人回复时提到他面试时一般会问问如何做join
2.今天看了个博客,刚好讲到spark实现大数据join操作的两个算法,map-side join和reduce-side join,正好接此机会整理下
Map-Side Join
- Map-side Join使用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
- 在Hadoop MapReduce中, map-side join是借助DistributedCache实现的。DistributedCache可以帮我们将小文件分发到各个节点的Task工作目录下,这样,我们只需在程序中将文件加载到内存中(比如保存到Map数据结构中),然后借助Mapper的迭代机制,遍历另一个大表中的每一条记录,并查找是否在小表中,如果在则输出,否则跳过。
- 在Apache Spark中,同样存在类似于DistributedCache的功能,称为“广播变量”(Broadcast variable)。其实现原理与DistributedCache非常类似,但提供了更多的数据/文件广播算法,包括高效的P2P算法,该算法在节点数目非常多的场景下,效率远远好于DistributedCache这种基于HDFS共享存储的方式。使用MapReduce DistributedCache时,用户需要显示地使用File API编写程序从本地读取小表数据,而Spark则不用,它借助Scala语言强大的函数闭包特性,可以隐藏数据/文件广播过程,让用户编写程序更加简单。
适用于一个数据集小,另一个数据集大的情况
package spark.examples.join import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkMapsideJoin { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("SparkMapsideJoin") conf.setMaster("local[3]") conf.set("spark.shuffle.manager", "sort"); val sc = new SparkContext(conf) //val table1 = sc.textFile(args(1)) //val table2 = sc.textFile(args(2)) val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13")) val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23")) // table1 is smaller, so broadcast it as a map<String, String> val pairs = table1.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.collectAsMap val broadCastMap = sc.broadcast(pairs) //save table1 as map, and broadcast it // table2 join table1 in map side val result = table2.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) }.mapPartitions({ iter => val m = broadCastMap.value for { (key, value) <- iter if (m.contains(key)) } yield (key, (value, m.get(key).getOrElse(""))) }) val output = "d:/wordcount-" + System.currentTimeMillis() ; result.saveAsTextFile(output) //save result to local file or HDFS } }
Reduce Side Join
- 当两个文件/目录中的数据非常大,难以将某一个存放到内存中时,Reduce-side Join是一种解决思路。该算法需要通过Map和Reduce两个阶段完成,在Map阶段,将key相同的记录划分给同一个Reduce Task(需标记每条记录的来源,便于在Reduce阶段合并),在Reduce阶段,对key相同的进行合并。
- Spark提供了Join算子,可以直接通过该算子实现reduce-side join,但要求RDD中的记录必须是pair,即RDD[KEY, VALUE],
适用于两个join表数据量都很大的情况
package spark.examples.join import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.SparkContext._ object SparkReducesideJoin { def main(args: Array[String]) { val conf = new SparkConf() conf.setAppName("SparkMapsideJoin") conf.setMaster("local[3]") conf.set("spark.shuffle.manager", "sort"); val sc = new SparkContext(conf) val table1 = sc.parallelize(List("k1,v11", "k2,v12", "k3,v13")) val table2 = sc.parallelize(List("k1,v21", "k4,v24", "k3,v23")) //table1 and table 2 are both very large val pairs1 = table1.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) } val pairs2 = table2.map { x => val pos = x.indexOf(',') (x.substring(0, pos), x.substring(pos + 1)) } val result = pairs1.join(pairs2) val output = "d:/wordcount-" + System.currentTimeMillis(); result.saveAsTextFile(output) //save result to local file or HDFS } }
参考:http://dongxicheng.org/framework-on-yarn/apache-spark-join-two-tables/
相关推荐
《Spark技术内幕深入解析Spark内核架构设计与实现原理》这本书深入探讨了Apache Spark这一分布式计算框架的核心架构和实现机制,旨在帮助读者全面理解Spark的工作原理,并能够有效地利用其进行大数据处理。...
Spark 提供了多种连接类型,包括内连接(INNER JOIN)、左连接(LEFT JOIN)、右连接(RIGHT JOIN)和全连接(FULL JOIN)。在描述中提到的“内连接操作”是指只保留两个表中键值匹配的行,结果集中不包含任何一方表...
### Spark替代Hive实现ETL作业的关键知识点 #### 使用Hive存在的问题 - **性能瓶颈**:Hive依赖于Hadoop MapReduce引擎进行计算逻辑的执行。尽管它具有较低的硬件需求和较大的吞吐量,但相较于现代DAG(有向无环图...
为了帮助大家深入理解和掌握Spark的核心功能,我们整理了一系列的Spark考试练习题,涵盖从基础概念到高级应用的全方位知识点。这份资料包含两部分:《spark练习题含答案01.docx》和《spark练习题含答案02.docx》,...
Spark_JAR包是Apache Spark项目的核心组件之一,它包含了运行Spark应用程序所必需的类库和依赖。Spark作为一个快速、通用且可扩展的数据处理框架,它为大数据处理提供了丰富的API,支持Scala、Java、Python和R等多种...
4. **Spark Streaming**:Spark Streaming提供了一个高级接口来处理实时数据流,它通过微批处理将实时数据转化为一系列离散时间间隔的数据块,从而实现高效的流处理。 5. **调优**:Spark调优涉及到对执行性能的...
在CDH6.3.2中集成Spark3.2.2,用户可以利用Spark的新功能来提升大数据处理的效率和灵活性。以下是关于Spark3.2.2的一些关键知识点: 1. **DataFrame/Dataset API**:Spark3.2.2继续强化了DataFrame和Dataset API,...
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 ...Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会
Spark 的设计理念是使用 Scala 语言进行实现的,这是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据集。Spark 官网上介绍,它具有运行速度快、易用性好、通用性强和随处运行等特点...
6. **性能优化**:通过Tungsten项目,Spark 2.3.0实现了代码生成,减少运行时反射和对象创建,从而提高执行速度。 三、源码学习价值 下载Spark 2.3.0源码可以深入理解其内部工作原理,学习如何实现分布式计算、内存...
在Spark中,可以使用`reduceByKey`或`distinct`操作来实现。首先,将两个文件的内容合并为一个DataFrame或RDD,然后通过`reduceByKey(_ + _)`对键值对进行合并,最后用`distinct()`去除重复项。 2. 求平均值:这个...
Hive on Spark EXPLAIN statement : 讲述了 Common Join / Map join / Bucket Map Join / Sorted Merge Bucket Map Join / skew join 在explain 中的 树结构 。In Hive, command EXPLAIN can be used to show the ...
- 特征工程在Spark中的实现,包括缩放、编码和转换 - 模型选择与评估,交叉验证和网格搜索 - 机器学习管道的构建和优化 5. **第6章**:Spark GraphX图计算 - 图数据结构和图算法基础 - GraphX的设计理念和API...
该模块构建在基础Spark API之上,旨在实现可扩展、高吞吐量以及容错性强大的流处理功能。用户可以从多种数据源(比如Kafka、Flume、Kinesis或者TCP套接字)获取数据,并通过高级函数接口(如`map`、`reduce`、`join`...
Spark 2.0是Apache Spark的一个重要版本,它在数据处理效率、易用性和功能上都有显著提升。在这个“spark2.0编译版-适用于hive2.3的hive on spark”压缩包中,我们主要关注的是如何在Spark 2.0上运行Hive查询,同时...
Spark 2.1.0与Hadoop 2.7的集成,确保了Spark作业可以在Hadoop集群上无缝运行,同时利用Hadoop的数据存储功能。 1. YARN支持:Spark可以作为一个YARN应用提交到Hadoop集群,充分利用集群资源进行任务调度和执行。 ...
通过以上配置和调优步骤,可以显著提升Hive on Spark的性能,实现更快的数据处理速度。在实践中还需要结合具体应用场景,不断调整优化策略,以达到最佳效果。此外,定期监控集群资源使用情况、调整配置参数,也是...
在这个版本中,Spark集成了多种功能和优化,使得数据科学家和开发人员能够更方便地处理大规模数据。 首先,Spark的核心组件包括Spark Core、Spark SQL、Spark Streaming、MLlib(机器学习库)和GraphX(图计算框架...