测试数据 more user.txt (用户id,用户名) 1 用户1 2 用户2 3 用户3 more post.txt (用户id,帖子id,标题) 1 1 贴子1 1 2 贴子2 2 3 帖子3 4 4 贴子4 5 5 贴子5 5 6 贴子6 5 7 贴子7
查询结果 内连接 1 用户1 1 1 贴子1 1 用户1 1 2 贴子2 2 用户2 2 3 帖子3 左外连接 1 用户1 1 1 贴子1 1 用户1 1 2 贴子2 2 用户2 2 3 帖子3 3 用户3 右外连接 1 用户1 1 1 贴子1 1 用户1 1 2 贴子2 2 用户2 2 3 帖子3 4 4 贴子4 5 5 贴子5 5 6 贴子6 5 7 贴子7 全外连接 1 用户1 1 1 贴子1 1 用户1 1 2 贴子2 2 用户2 2 3 帖子3 3 用户3 4 4 贴子4 5 5 贴子5 5 6 贴子6 5 7 贴子7 反连接 3 用户3 4 4 贴子4 5 5 贴子5 5 6 贴子6 5 7 贴子7
代码如下:
package mapreduce.pattern.join; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.List; import multiinput.post.PostJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /** * mapreduce 实现内连接,左连接,右连接,全连接,反连接 * user.txt 用户表 * post.txt 帖子表 * 关联字段 userId * @author wxj * */ public class UserAndPostJoinJob { static class UserAndPostWritable implements Writable { /** * 类型 U表示用户,P表示帖子 */ private String type; private String data; public UserAndPostWritable() { } public UserAndPostWritable(String type, String data) { super(); this.type = type; this.data = data; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } @Override public void readFields(DataInput input) throws IOException { type = input.readUTF(); data = input.readUTF(); } @Override public void write(DataOutput output) throws IOException { output.writeUTF(type); output.writeUTF(data); } } static class UserMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] arr = value.toString().split("\t"); Text userId = new Text(arr[0]); context.write(userId, new UserAndPostWritable("U",value.toString())); } } static class PostMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable> { @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] arr = value.toString().split("\t"); Text userId = new Text(arr[0]); context.write(userId, new UserAndPostWritable("P",value.toString())); System.out.println(userId); } } static class PostReducer extends Reducer<Text, UserAndPostWritable, Text, Text> { private List<Text> users = new ArrayList<Text>(); private List<Text> posts = new ArrayList<Text>(); private String joinType; @Override protected void setup(Context context) throws IOException,InterruptedException { super.setup(context); joinType = context.getConfiguration().get("joinType"); //System.out.println("joinType: " + joinType); } protected void reduce(Text key, Iterable<UserAndPostWritable> iterable,Context context)throws IOException, InterruptedException { users.clear(); posts.clear(); for(UserAndPostWritable data : iterable) { //System.out.println(data.getType() + "," + data.getData()); if(data.getType().equals("U")) { users.add(new Text(data.getData())); } else { posts.add(new Text(data.getData())); } } if(joinType.equals("innerJoin"))//内连接 { if(users.size() > 0 && posts.size() > 0) { for(Text user : users) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } } } else if(joinType.equals("leftOuter"))//左外连接 { for(Text user : users) { if(posts.size() > 0) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } else { context.write(new Text(user),createEmptyPost()); } } } else if(joinType.equals("rightOuter"))//右外连接 { for(Text post : posts) { if(users.size() > 0) { for(Text user : users) { context.write(new Text(user),new Text(post)); } } else { context.write(createEmptyUser(), post); } } } else if(joinType.equals("allOuter"))//全外连接 { if(users.size() > 0) { for(Text user : users) { if(posts.size() > 0) { for(Text post : posts) { context.write(new Text(user),new Text(post)); } } else{ context.write(new Text(user),createEmptyUser()); } } }else { for(Text post : posts) { if(users.size() > 0) { for(Text user : users) { context.write(new Text(user),new Text(post)); } } else{ context.write(createEmptyUser(), post); } } } } else if(joinType.equals("anti"))//反连接 { if(users.size() == 0 ^ posts.size() == 0) { for(Text user : users) { context.write(new Text(user),createEmptyPost()); } for(Text post : posts) { context.write(createEmptyUser(),new Text(post)); } } } } private Text createEmptyUser() { return new Text(" \t "); } private Text createEmptyPost() { return new Text(" \t \t "); } } public static void main(String[] args) { Configuration configuration = new Configuration(); try { FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),configuration); Job job = new Job(configuration); job.setJarByClass(PostJob.class); //设置连接类型 //innerJoin,leftOuter,rightOuter,allOuter,anti job.getConfiguration().set("joinType", "anti"); //设置输出到part-r-00000时的分隔符 job.getConfiguration().set("mapred.textoutputformat.separator", "\t"); MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/user.txt"),TextInputFormat.class,UserMapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/post.txt"), TextInputFormat.class, PostMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(UserAndPostWritable.class); job.setReducerClass(PostReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); Path outPath = new Path("hdfs://master:9000/output/userandpost"); if(fs.exists(outPath)) { fs.delete(outPath,true); } TextOutputFormat.setOutputPath(job, outPath); job.setOutputFormatClass(TextOutputFormat.class); job.waitForCompletion(true); } catch (Exception e) { e.printStackTrace(); } } }
相关推荐
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
3) 内连接(Inner Join)和左连接(Left Join)可以通过一次MapReduce作业实现,Map阶段将JOIN键和对应数据发送到同一Reducer,Reduce阶段根据JOIN条件进行匹配。 在提供的"mapreduce-sql"压缩包文件中,很可能包含...
- **全排序**:MapReduce可以通过中间键值对的排序来实现全数据集的排序,先在Map阶段生成排序的中间结果,然后在Reduce阶段进行归并排序。 - **混排**:在某些情况下,需要在所有数据之间进行混合排序,这可能...
在处理大量数据时,传统的全连接方法可能导致不必要的网络传输和计算资源浪费。因此,通过模板或其他策略减少边数据的连接可以提高性能和效率,尤其是在处理关系型数据或图数据时。 标签中的关键词提供了更多线索:...
无主分布式母线保护方式中,每个子机通过环网通信网络连接并完成独立的保护功能,采用全主式模式,即每个子机均承担主机职责,完成所有保护计算任务。这种方式虽然可靠性较高,但每个子机的负担较重,拓展能力较弱。...
全外连接返回的是两张表中所有的记录,包括左表和右表中的所有记录,即使没有匹配项,也会使用NULL值填充。在这个例子中,即使某些学生的课程ID没有对应的课程名称,或者某些课程没有对应的学生信息,也会显示这些...
抽象的这是主服务器的实现,能够作为MapReduce工作者利用Web浏览器的免费计算功能。 通信是通过完成的, 允许主服务器和工作服务器之间进行一致的全双工通信。 工人的计算是通过完成的,该允许进行后台处理,而主UI...
- 左外连接:左表的过滤条件会被下推 - 右外连接:右表的过滤条件会被下推 - 全外连接:没有过滤条件会被下推 非确定性函数(如 `rand()`)不会被下推,需要通过注解 `#UDFType(deterministic=false)` 标记。 3...
1. **Hive Join操作**:Hive支持多种类型的Join,包括内连接(Inner Join)、左连接(Left Outer Join)、右连接(Right Outer Join)、全连接(Full Outer Join)以及自连接(Self Join)。在处理大数据时,选择...
- **JOIN操作**:Hive支持内连接、外连接和交叉连接,可以对多个表进行联合查询。 - **分组与聚合**:GROUP BY和HAVING用于数据分组和条件过滤,配合COUNT、SUM、AVG等聚合函数进行统计分析。 - **窗口函数**:...
MapReduce在HDFS上运行,处理这些数据块,实现了大数据处理的高效性和可靠性。 总结来说,软件体系结构是软件开发的基础,它定义了软件的构造和组织方式。理解构件和连接件、软件生命周期模型以及重用技术对于构建...
为了将这两个系统集成,以便在MapReduce任务中使用HBase,我们需要特定的JAR包来建立连接和通信。以下是对Hadoop与HBase集成所需知识的详细说明: 1. **Hadoop与HBase的关系**:Hadoop是Apache软件基金会开发的...
受限波尔兹曼机(Restricted Boltzmann Machine, RBM)是一种人工神经网络模型,具有层内单元无连接、层间单元全连接的特性,可以视为波尔兹曼机(Boltzmann Machine, BM)的一种特殊形式。RBM模型属于双向马尔科夫...
Hive构建于Hadoop之上,利用MapReduce进行计算,并将数据存储在HDFS(Hadoop Distributed File System)中,适合处理和管理大规模的数据。 二、DBVisualizer简介 DBVisualizer是一款跨平台的数据库管理和分析工具,...
`HBcon`类是HBase连接的实现,它使用`HBaseConfiguration.create()`创建配置对象,然后设置Zookeeper的相关参数,如`hbase.zookeeper.quorum`和`hbase.zookeeper.property.clientPort`。`HTablePool`用于管理HTable...
在Windows环境下向Hadoop集群提交MapReduce任务是大数据处理中常见的操作,这涉及到对Hadoop生态系统的理解,尤其是MapReduce框架和YARN资源调度器。本文将详细解析如何在Windows系统上配置和提交MapReduce作业。 ...
在HBase中,MapReduce常用于批量导入和导出数据,以及复杂的全表扫描操作。使用`TableInputFormat`和`TableOutputFormat`,我们可以将HBase表作为输入和输出源。在Mapper和Reducer中,可以直接访问HBase的行键、列族...
本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop简介;MapReduce简介;Hadoop分布式文件系统;Hadoop...
15、**MapReduce的实现机制**涉及数据分片、任务调度、容错处理等,确保高效、可靠的分布式计算。 16、**WordCount实例**展示了MapReduce如何统计文本中的单词数量,是MapReduce入门的经典例子。 17、**文档倒排...
左半连接是一种特殊的JOIN类型,它返回左表中的所有行,以及右表中与之匹配的行。在Hive中,通过使用`LEFT SEMI JOIN`而非`LEFT OUTER JOIN`,可以在不包含NULL值的情况下快速找到匹配项,从而减少数据传输和处理的...