`

mapreduce 实现内连接,左连接,右连接,全连接,反连接

阅读更多
测试数据
more user.txt (用户id,用户名)
1	用户1
2	用户2
3	用户3

more post.txt (用户id,帖子id,标题)
1	1	贴子1
1	2	贴子2
2	3	帖子3
4	4	贴子4
5	5	贴子5
5	6	贴子6
5	7	贴子7

 

 

查询结果

内连接
1	用户1	1	1	贴子1
1	用户1	1	2	贴子2
2	用户2	2	3	帖子3

左外连接
1	用户1	1	1	贴子1
1	用户1	1	2	贴子2
2	用户2	2	3	帖子3
3	用户3	 	 	 

右外连接
1	用户1	1	1	贴子1
1	用户1	1	2	贴子2
2	用户2	2	3	帖子3
 	 	4	4	贴子4
 	 	5	5	贴子5
 	 	5	6	贴子6
 	 	5	7	贴子7


全外连接
1	用户1	1	1	贴子1
1	用户1	1	2	贴子2
2	用户2	2	3	帖子3
3	用户3	 	 
 	 	4	4	贴子4
 	 	5	5	贴子5
 	 	5	6	贴子6
 	 	5	7	贴子7

反连接
3	用户3	 	 	 
 	 	4	4	贴子4
 	 	5	5	贴子5
 	 	5	6	贴子6
 	 	5	7	贴子7

 

代码如下:

package mapreduce.pattern.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import multiinput.post.PostJob;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * mapreduce 实现内连接,左连接,右连接,全连接,反连接
 * user.txt 用户表
 * post.txt 帖子表
 * 关联字段 userId
 * @author wxj
 *
 */
public class UserAndPostJoinJob
{

	static class UserAndPostWritable implements Writable
	{
		
		/**
		 * 类型 U表示用户,P表示帖子
		 */
		private String type;
		private String data;
		
		public UserAndPostWritable()
		{
			
		}
		
		public UserAndPostWritable(String type, String data)
		{
			super();
			this.type = type;
			this.data = data;
		}

		public String getType()
		{
			return type;
		}

		public void setType(String type)
		{
			this.type = type;
		}

		public String getData()
		{
			return data;
		}

		public void setData(String data)
		{
			this.data = data;
		}

		@Override
		public void readFields(DataInput input) throws IOException
		{
			type = input.readUTF();
			data = input.readUTF();
		}

		@Override
		public void write(DataOutput output) throws IOException
		{
			output.writeUTF(type);
			output.writeUTF(data);
		}
		
	}
	
	static class UserMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>
	{
		@Override
		protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException
		{
			String[] arr = value.toString().split("\t");
			Text userId = new Text(arr[0]);
			context.write(userId, new UserAndPostWritable("U",value.toString()));
		}

	}
	
	static class PostMapper extends Mapper<LongWritable, Text, Text, UserAndPostWritable>
	{
		@Override
		protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException
		{
			String[] arr = value.toString().split("\t");
			Text userId = new Text(arr[0]);
			context.write(userId, new UserAndPostWritable("P",value.toString()));
			System.out.println(userId);
		}

	}
	
	static class PostReducer extends Reducer<Text, UserAndPostWritable, Text, Text>
	 {

		private List<Text> users = new ArrayList<Text>();
		private List<Text> posts = new ArrayList<Text>();
		
		private String joinType;

		
	 	@Override
		protected void setup(Context context) throws IOException,InterruptedException
		{
			super.setup(context);
			joinType = context.getConfiguration().get("joinType");
			//System.out.println("joinType: " + joinType);
		}

		protected void reduce(Text key, Iterable<UserAndPostWritable> iterable,Context context)throws IOException, InterruptedException
	 	{
			users.clear();
			posts.clear();
	 		for(UserAndPostWritable data : iterable)
	 		{
	 			//System.out.println(data.getType() + "," + data.getData());
	 			if(data.getType().equals("U"))
	 			{
	 				users.add(new Text(data.getData()));
	 			}
	 			else {
					posts.add(new Text(data.getData()));
				}
	 		}
	 		
	 		if(joinType.equals("innerJoin"))//内连接
	 		{
	 			if(users.size() > 0 && posts.size() > 0)
	 			{
	 				for(Text user : users)
		 			{
		 				for(Text post : posts)
		 				{
		 					context.write(new Text(user),new Text(post));
		 				}
		 			}
	 			}
	 		}
	 		else if(joinType.equals("leftOuter"))//左外连接
	 		{
	 			for(Text user : users)
	 			{
	 				if(posts.size() > 0)
	 				{
	 					for(Text post : posts)
		 				{
		 					context.write(new Text(user),new Text(post));
		 				}
	 				}
	 				else {
	 					context.write(new Text(user),createEmptyPost());
					}
	 			}
	 		}
	 		else if(joinType.equals("rightOuter"))//右外连接
	 		{
	 			for(Text post : posts)
 				{
	 				if(users.size() > 0)
	 				{
	 					for(Text user : users)
			 			{
		 					context.write(new Text(user),new Text(post));
			 			}
	 				}
	 				else {
						context.write(createEmptyUser(), post);
					}
 				}
	 		}
	 		else if(joinType.equals("allOuter"))//全外连接
	 		{
	 			if(users.size() > 0)
	 			{
	 				for(Text user : users)
	 				{
		 				if(posts.size() > 0)
		 				{
		 					for(Text post : posts)
			 				{
		 						context.write(new Text(user),new Text(post));
			 				}
		 				}
		 				else{
		 					context.write(new Text(user),createEmptyUser());
		 				}
	 				}
	 			}else {
	 				for(Text post : posts)
	 				{
		 				if(users.size() > 0)
		 				{
		 					for(Text user : users)
			 				{
		 						context.write(new Text(user),new Text(post));
			 				}
		 				}
		 				else{
		 					context.write(createEmptyUser(), post);
		 				}
	 				}
				}
	 		}
	 		else if(joinType.equals("anti"))//反连接
	 		{
	 			if(users.size() == 0 ^ posts.size() == 0)
	 			{
	 				for(Text user : users)
	 				{
 						context.write(new Text(user),createEmptyPost());
	 				}
	 				for(Text post : posts)
	 				{
 						context.write(createEmptyUser(),new Text(post));
	 				}
	 			}
	 		}
	 	}
		
		private Text createEmptyUser()
		{
			return new Text(" \t ");
		}
		
		private Text createEmptyPost()
		{
			return new Text(" \t \t ");
		}
	 	
	 }
	 
	 public static void main(String[] args)
	 {
			Configuration configuration = new Configuration();
			try
			{
				FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),configuration);
				Job job = new Job(configuration);
				job.setJarByClass(PostJob.class);
				//设置连接类型
				//innerJoin,leftOuter,rightOuter,allOuter,anti
				job.getConfiguration().set("joinType", "anti");
				//设置输出到part-r-00000时的分隔符
				job.getConfiguration().set("mapred.textoutputformat.separator", "\t");
				
				MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/user.txt"),TextInputFormat.class,UserMapper.class);
				MultipleInputs.addInputPath(job, new Path("hdfs://master:9000/input/userandpost/post.txt"), TextInputFormat.class, PostMapper.class);
				
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(UserAndPostWritable.class);
				job.setReducerClass(PostReducer.class);
			    job.setOutputKeyClass(Text.class);
			    job.setOutputValueClass(Text.class);
				
				Path outPath = new Path("hdfs://master:9000/output/userandpost");
				if(fs.exists(outPath))
				{
					fs.delete(outPath,true);
				}
				TextOutputFormat.setOutputPath(job, outPath);
				job.setOutputFormatClass(TextOutputFormat.class);
				
				job.waitForCompletion(true);
				
			} catch (Exception e)
			{
				e.printStackTrace();
			}
	 }
	
}

  

 

 

分享到:
评论

相关推荐

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    3) 内连接(Inner Join)和左连接(Left Join)可以通过一次MapReduce作业实现,Map阶段将JOIN键和对应数据发送到同一Reducer,Reduce阶段根据JOIN条件进行匹配。 在提供的"mapreduce-sql"压缩包文件中,很可能包含...

    MapReduce1.doc

    - **全排序**:MapReduce可以通过中间键值对的排序来实现全数据集的排序,先在Map阶段生成排序的中间结果,然后在Reduce阶段进行归并排序。 - **混排**:在某些情况下,需要在所有数据之间进行混合排序,这可能...

    HadoopMapReduce:数据集链接的Hadoop MapReduce实践问题

    在处理大量数据时,传统的全连接方法可能导致不必要的网络传输和计算资源浪费。因此,通过模板或其他策略减少边数据的连接可以提高性能和效率,尤其是在处理关系型数据或图数据时。 标签中的关键词提供了更多线索:...

    基于MapReduce架构的就地化分布式母线保护研究.pdf

    无主分布式母线保护方式中,每个子机通过环网通信网络连接并完成独立的保护功能,采用全主式模式,即每个子机均承担主机职责,完成所有保护计算任务。这种方式虽然可靠性较高,但每个子机的负担较重,拓展能力较弱。...

    hive操作实战

    全外连接返回的是两张表中所有的记录,包括左表和右表中的所有记录,即使没有匹配项,也会使用NULL值填充。在这个例子中,即使某些学生的课程ID没有对应的课程名称,或者某些课程没有对应的学生信息,也会显示这些...

    go-web-mapreduce:使用Web浏览器作为工作程序的MapReduce服务器,用Go编写

    抽象的这是主服务器的实现,能够作为MapReduce工作者利用Web浏览器的免费计算功能。 通信是通过完成的, 允许主服务器和工作服务器之间进行一致的全双工通信。 工人的计算是通过完成的,该允许进行后台处理,而主UI...

    hive 优化总结

    - 左外连接:左表的过滤条件会被下推 - 右外连接:右表的过滤条件会被下推 - 全外连接:没有过滤条件会被下推 非确定性函数(如 `rand()`)不会被下推,需要通过注解 `#UDFType(deterministic=false)` 标记。 3...

    Hive-Summit-2011-join.zip_hive

    1. **Hive Join操作**:Hive支持多种类型的Join,包括内连接(Inner Join)、左连接(Left Outer Join)、右连接(Right Outer Join)、全连接(Full Outer Join)以及自连接(Self Join)。在处理大数据时,选择...

    Hive-工具篇_hive_

    - **JOIN操作**:Hive支持内连接、外连接和交叉连接,可以对多个表进行联合查询。 - **分组与聚合**:GROUP BY和HAVING用于数据分组和条件过滤,配合COUNT、SUM、AVG等聚合函数进行统计分析。 - **窗口函数**:...

    电子科技大学2018年-林迪-软件体系结构-复习整理(补充).docx

    MapReduce在HDFS上运行,处理这些数据块,实现了大数据处理的高效性和可靠性。 总结来说,软件体系结构是软件开发的基础,它定义了软件的构造和组织方式。理解构件和连接件、软件生命周期模型以及重用技术对于构建...

    hadoop和hbase集成所需jar包

    为了将这两个系统集成,以便在MapReduce任务中使用HBase,我们需要特定的JAR包来建立连接和通信。以下是对Hadoop与HBase集成所需知识的详细说明: 1. **Hadoop与HBase的关系**:Hadoop是Apache软件基金会开发的...

    基于Hadoop-GPU的RBM云计算实现.pdf

    受限波尔兹曼机(Restricted Boltzmann Machine, RBM)是一种人工神经网络模型,具有层内单元无连接、层间单元全连接的特性,可以视为波尔兹曼机(Boltzmann Machine, BM)的一种特殊形式。RBM模型属于双向马尔科夫...

    dbvis中hive驱动

    Hive构建于Hadoop之上,利用MapReduce进行计算,并将数据存储在HDFS(Hadoop Distributed File System)中,适合处理和管理大规模的数据。 二、DBVisualizer简介 DBVisualizer是一款跨平台的数据库管理和分析工具,...

    hadoop 分布式云计算 课程设计报告

    `HBcon`类是HBase连接的实现,它使用`HBaseConfiguration.create()`创建配置对象,然后设置Zookeeper的相关参数,如`hbase.zookeeper.quorum`和`hbase.zookeeper.property.clientPort`。`HTablePool`用于管理HTable...

    Windows上向集群提交任务.docx

    在Windows环境下向Hadoop集群提交MapReduce任务是大数据处理中常见的操作,这涉及到对Hadoop生态系统的理解,尤其是MapReduce框架和YARN资源调度器。本文将详细解析如何在Windows系统上配置和提交MapReduce作业。 ...

    hbase的java client实例

    在HBase中,MapReduce常用于批量导入和导出数据,以及复杂的全表扫描操作。使用`TableInputFormat`和`TableOutputFormat`,我们可以将HBase表作为输入和输出源。在Mapper和Reducer中,可以直接访问HBase的行键、列族...

    Hadoop权威指南 第二版(中文版)

     本书从Hadoop的缘起开始,由浅入深,结合理论和实践,全方位地介绍Hadoop这一高性能处理海量数据集的理想工具。全书共16章,3个附录,涉及的主题包括:Haddoop简介;MapReduce简介;Hadoop分布式文件系统;Hadoop...

    (完整word版)电子科技大学2017年-林迪-软件体系结构-复习整理.doc

    15、**MapReduce的实现机制**涉及数据分片、任务调度、容错处理等,确保高效、可靠的分布式计算。 16、**WordCount实例**展示了MapReduce如何统计文本中的单词数量,是MapReduce入门的经典例子。 17、**文档倒排...

    hive优化(ppt)

    左半连接是一种特殊的JOIN类型,它返回左表中的所有行,以及右表中与之匹配的行。在Hive中,通过使用`LEFT SEMI JOIN`而非`LEFT OUTER JOIN`,可以在不包含NULL值的情况下快速找到匹配项,从而减少数据传输和处理的...

Global site tag (gtag.js) - Google Analytics