import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object JoinRDD {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("sparkjson").setMaster("local")
val sc = new SparkContext(conf)
val file1 = sc.textFile("C:\\Users\\Think\\Desktop\\json.txt")
val file2 = sc.textFile("C:\\Users\\Think\\Desktop\\json2.txt")
//(tom,1,2), (jack,1,2)
val words1: RDD[(String, Int, Int)] = file1.flatMap(_.split(" ")).map((_, 1, 2))
//(tom,3,4), (jack,3,4)
val words2 : RDD[(String,Int,Int)] = file2.flatMap(_.split(" ")).map((_, 3, 4))
val words1Map = words1.map(x=>(x._1,(x._2,x._3)))
val words2Map = words2.map(x=>(x._1,(x._2,x._3)))
val result: RDD[(String, ((Int, Int), Option[(Int, Int)]))] = words1Map.leftOuterJoin(words2Map)
val finalResult = result.map(x=>{
val key = x._1
val outerTuple = x._2
val outerTupleMeta1 = x._2._1._1
val outerTuplemeta2 = x._2._1._2
val outerTuple2_1 = x._2._2.get._1
val outerTuple2_2 = x._2._2.get._2
(key,outerTupleMeta1,outerTuplemeta2,outerTuple2_1,outerTuple2_2)
})
// println(words1.collect().toBuffer)
// println(words2.collect().toBuffer)
println(finalResult.collect().toBuffer)
sc.stop()
}
}
分享到:
相关推荐
10. **leftOuterJoin(other, nump)**: 在两个键值对 RDD 上进行左外连接操作,左侧键不存在于右侧时,右侧值为空。 11. **rightOuterJoin(other, nump)**: 在两个键值对 RDD 上进行右外连接操作,右侧键不存在于...
系统的核心在于使用`transform`操作符将实时数据流与黑名单进行智能关联,通过`leftOuterJoin`操作识别并排除黑名单中的项。过滤后的数据流通过`pprint`操作输出,以便于监控和验证过滤效果。 整个实现过程简洁高效...
例如,它提供了join,leftOuterJoin,rightOuterJoin,fullOuterJoin,groupByKey,reduceByKey等操作。 4. ContextualFunction:ContextualFunction是一些在特定环境下,如mapPartitions和mapPartitionsWithIndex...
关联函数用于连接两个RDD,例如`join()`, `cogroup()`, `leftOuterJoin()`, `rightOuterJoin()`, 和 `fullOuterJoin()`. `join()`将两个RDD中的匹配键的值连接在一起;`cogroup()`则为每个键提供两个RDD的所有值的...
- leftOuterJoin() - rightOuterJoin() - groupByKey() - reduceByKey() - combineByKey() - lookup() 在这些操作中,预先分区可以减少或避免数据shuffle。例如,如果两个RDD使用相同的分区器并且都存储在同一节点...
Transformation操作用于创建新的RDD,并且这些操作是懒加载执行的,也就是说它们并不会立即被执行,而是等到Action操作触发时才会真正运行。以下是一些常用的Transformation操作: 1. **`map(func)`** - 描述:对...
- Key-value转换算子:如mapValues、flatMapValues、reduceByKey、groupByKey、sortByKey、keys、values、join、leftOuterJoin、rightOuterJoin。 - 共享变量:累加器accumulator和广播变量Broadcast的使用。 - ...
这使得API不仅表达力强而且优雅,例如`map`、`filter`、`groupBy`、`sort`、`union`、`join`、`leftOuterJoin`、`rightOuterJoin`、`reduce`、`count`、`fold`、`reduceByKey`等操作符。 2. **高效的数据处理**:...
- `.leftOuterJoin()`:执行左外连接。 - `.repartition()`:重新分区 RDD。 - **Action 操作**: - `.take()`:获取前 N 个元素。 - `.collect()`:收集所有元素到驱动程序。 - `.reduce()`:对所有元素应用...
- **转换操作**: 如.map(), .filter(), .flatMap(), .distinct(), .sample(), .leftOuterJoin(), .repartition()等,这些操作不会立即执行,而是构建执行计划。 - **动作方法**: 如.take(), .collect(), .reduce(), ...
左外部联接(LeftOuterJoin) 左外部联接保留左侧表的所有记录: ```csharp var joinedRecords = from parent in context.Parents join child in context.Children on parent.Id equals child.ParentId into ...
11. leftOuterJoin(otherDataSet, numPartitions): 进行左外连接,以左边 RDD 的键为主,如果右边 RDD 没有匹配的键,则对应值为 null。 12. rightOuterJoin(otherDataSet, numPartitions): 进行右外连接,以右边 ...
- **左外部联接(LeftOuterJoin)** - **投影的Let赋值(Projectedletassignment)** - **组合键(CompositeKey)** - **可为null/不可为null的键关系(Nullable/NonnullableKeyRelationship)** ##### OrderBy操作 - ...
- **Key-Value 型 Transformation 算子** 包括 `mapValues`, `combineByKey`, `reduceByKey`, `partitionBy`, `cogroup`, `join`, `leftOuterJoin`, 和 `rightOuterJoin`。 - **Action 算子** 如 `foreach`, `...
- **左外部联接(LeftOuterJoin)** - 示例:`var results = from p in Products join o in Orders on p.Id equals o.ProductId into gj from order in gj.DefaultIfEmpty() select new { p, order };` - **投影的Let...
数据库查询语句是数据库开发人员必须掌握的基础技能,涵盖了从基本的查询到复杂的数据操作。通过本文档提供的内容,我们可以详细了解SQL(Structured Query Language)的编写风格、查询语句的高级用法、以及如何处理...
**LINQ to SQL** 是 Microsoft .NET Framework 提供的一种用于在数据库和应用程序之间进行数据操作的技术。它利用 LINQ(Language Integrated Query)语法来简化数据访问代码,并提供了一种面向对象的方式来处理...
- **左外部联接(LeftOuterJoin)**:`var leftOuterJoins = from s in students join c in courses on s.Id equals c.StudentId into cg select new { Student = s, Courses = cg };` - **投影的Let赋值...