`
weitao1026
  • 浏览: 1057010 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论
阅读更多

写了关于Hadoop的Map侧join
和Reduce的join,今天我们就来在看另外一种比较中立的Join。

SemiJoin,一般称为半链接,其原理是在Map侧过滤掉了一些不需要join的数据,从而大大减少了reduce的shffule时间,因为我们知道,如果仅仅使用Reduce侧连接,那么如果一份数据中,存在大量的无效数据,而这些数据,在join中,并不需要,但是因为没有做过预处理,所以这些数据,直到真正的执行reduce函数时,才被定义为无效数据,而这时候,前面已经执行过shuffle和merge和sort,所以这部分无效的数据,就浪费了大量的网络IO和磁盘IO,所以在整体来讲,这是一种降低性能的表现,如果存在的无效数据越多,那么这种趋势,就越明显。

之所以会出现半连接,这其实也是reduce侧连接的一个变种,只不过我们在Map侧,过滤掉了一些无效的数据,所以减少了reduce过程的shuffle时间,所以能获取一个性能的提升。

具体的原理也是利用DistributedCache将小表的的分发到各个节点上,在Map过程的setup函数里,读取缓存里面的文件,只将小表的链接键存储在hashset里,在map函数执行时,对每一条数据,进行判断,如果这条数据的链接键为空或者在hashset里面不存在,那么则认为这条数据,是无效的数据,所以这条数据,并不会被partition分区后写入磁盘,参与reduce阶段的shuffle和sort,所以在一定程序上,提升了join性能。需要注意的是如果
小表的key依然非常巨大,可能会导致我们的程序出现OOM的情况,那么这时候我们就需要考虑其他的链接方式了。

测试数据如下:
模拟小表数据:
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862


模拟大表数据:
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10

代码如下:

Java代码 复制代码 收藏代码
  1. package com.semijoin;  
  2.   
  3. import java.io.BufferedReader;  
  4. import java.io.DataInput;  
  5. import java.io.DataOutput;  
  6. import java.io.FileReader;  
  7. import java.io.IOException;  
  8. import java.net.URI;  
  9. import java.util.ArrayList;  
  10. import java.util.HashSet;  
  11. import java.util.List;  
  12.   
  13. import org.apache.hadoop.conf.Configuration;  
  14. import org.apache.hadoop.filecache.DistributedCache;  
  15. import org.apache.hadoop.fs.FileSystem;  
  16. import org.apache.hadoop.fs.Path;  
  17. import org.apache.hadoop.io.LongWritable;  
  18. import org.apache.hadoop.io.Text;  
  19. import org.apache.hadoop.io.WritableComparable;  
  20.   
  21. import org.apache.hadoop.mapred.JobConf;  
  22. import org.apache.hadoop.mapreduce.Job;  
  23. import org.apache.hadoop.mapreduce.Mapper;  
  24. import org.apache.hadoop.mapreduce.Reducer;  
  25. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  26. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  27. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  28. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  29. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  30.   
  31. /*** 
  32.  *  
  33.  * Hadoop1.2的版本 
  34.  *  
  35.  * hadoop的半链接 
  36.  *  
  37.  * SemiJoin实现 
  38.  *  
  39.  * @author qindongliang 
  40.  *  
  41.  *    大数据交流群:376932160 
  42.  *  搜索技术交流群:324714439 
  43.  *  
  44.  *  
  45.  *  
  46.  * **/  
  47. public class Semjoin {  
  48.       
  49.       
  50.       
  51.     /** 
  52.      *  
  53.      *  
  54.      * 自定义一个输出实体 
  55.      *  
  56.      * **/  
  57.     private static class CombineEntity implements WritableComparable<CombineEntity>{  
  58.   
  59.           
  60.         private Text joinKey;//连接key  
  61.         private Text flag;//文件来源标志  
  62.         private Text secondPart;//除了键外的其他部分的数据  
  63.           
  64.           
  65.         public CombineEntity() {  
  66.             // TODO Auto-generated constructor stub  
  67.             this.joinKey=new Text();  
  68.             this.flag=new Text();  
  69.             this.secondPart=new Text();  
  70.         }  
  71.           
  72.         public Text getJoinKey() {  
  73.             return joinKey;  
  74.         }  
  75.   
  76.         public void setJoinKey(Text joinKey) {  
  77.             this.joinKey = joinKey;  
  78.         }  
  79.   
  80.         public Text getFlag() {  
  81.             return flag;  
  82.         }  
  83.   
  84.         public void setFlag(Text flag) {  
  85.             this.flag = flag;  
  86.         }  
  87.   
  88.         public Text getSecondPart() {  
  89.             return secondPart;  
  90.         }  
  91.   
  92.         public void setSecondPart(Text secondPart) {  
  93.             this.secondPart = secondPart;  
  94.         }  
  95.   
  96.         @Override  
  97.         public void readFields(DataInput in) throws IOException {  
  98.             this.joinKey.readFields(in);  
  99.             this.flag.readFields(in);  
  100.             this.secondPart.readFields(in);  
  101.               
  102.         }  
  103.   
  104.         @Override  
  105.         public void write(DataOutput out) throws IOException {  
  106.             this.joinKey.write(out);  
  107.             this.flag.write(out);  
  108.             this.secondPart.write(out);  
  109.               
  110.         }  
  111.   
  112.         @Override  
  113.         public int compareTo(CombineEntity o) {  
  114.             // TODO Auto-generated method stub  
  115.             return this.joinKey.compareTo(o.joinKey);  
  116.         }  
  117.           
  118.           
  119.           
  120.     }  
  121.       
  122.       
  123.       
  124.       
  125.     private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{  
  126.           
  127.         private CombineEntity combine=new CombineEntity();  
  128.         private Text flag=new Text();  
  129.         private  Text joinKey=new Text();  
  130.         private Text secondPart=new Text();  
  131.         /** 
  132.          * 存储小表的key 
  133.          *  
  134.          *  
  135.          * */  
  136.         private HashSet<String> joinKeySet=new HashSet<String>();  
  137.           
  138.           
  139.         @Override  
  140.         protected void setup(Context context)throws IOException, InterruptedException {  
  141.            
  142.             //读取文件流  
  143.             BufferedReader br=null;  
  144.             String temp;  
  145.             // 获取DistributedCached里面 的共享文件  
  146.             Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());  
  147.               
  148.             for(Path p:path){  
  149.                   
  150.                 if(p.getName().endsWith("a.txt")){  
  151.                     br=new BufferedReader(new FileReader(p.toString()));  
  152.                     //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));  
  153.                       
  154.                     while((temp=br.readLine())!=null){  
  155.                         String ss[]=temp.split(",");  
  156.                         //map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中  
  157.                         joinKeySet.add(ss[0]);//加入小表的key  
  158.                     }  
  159.                 }  
  160.             }  
  161.               
  162.               
  163.         }  
  164.           
  165.           
  166.           
  167.         @Override  
  168.         protected void map(LongWritable key, Text value,Context context)  
  169.                 throws IOException, InterruptedException {  
  170.               
  171.           
  172.                //获得文件输入路径  
  173.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();  
  174.           
  175.             if(pathName.endsWith("a.txt")){  
  176.                   
  177.                 String  valueItems[]=value.toString().split(",");  
  178.                   
  179.                   
  180.                 /** 
  181.                  * 在这里过滤必须要的连接字符 
  182.                  *  
  183.                  * */  
  184.                 if(joinKeySet.contains(valueItems[0])){  
  185.                     //设置标志位  
  186.                     flag.set("0");     
  187.                     //设置链接键  
  188.                     joinKey.set(valueItems[0]);            
  189.                     //设置第二部分  
  190.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]);  
  191.                       
  192.                     //封装实体  
  193.                     combine.setFlag(flag);//标志位  
  194.                     combine.setJoinKey(joinKey);//链接键  
  195.                     combine.setSecondPart(secondPart);//其他部分  
  196.                       
  197.                      //写出  
  198.                     context.write(combine.getJoinKey(), combine);     
  199.                 }else{  
  200.                     System.out.println("a.txt里");  
  201.                     System.out.println("在小表中无此记录,执行过滤掉!");  
  202.                     for(String v:valueItems){  
  203.                         System.out.print(v+"   ");  
  204.                     }  
  205.                       
  206.                     return ;  
  207.                       
  208.                 }  
  209.                   
  210.                   
  211.                   
  212.             }else if(pathName.endsWith("b.txt")){  
  213.                 String  valueItems[]=value.toString().split(",");  
  214.                   
  215.                 /** 
  216.                  *  
  217.                  * 判断是否在集合中 
  218.                  *  
  219.                  * */  
  220.                 if(joinKeySet.contains(valueItems[0])){  
  221.                       
  222.   
  223.                     //设置标志位  
  224.                     flag.set("1");     
  225.                       
  226.                     //设置链接键  
  227.                     joinKey.set(valueItems[0]);  
  228.             
  229.                     //设置第二部分注意不同的文件的列数不一样  
  230.                     secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);  
  231.                       
  232.                     //封装实体  
  233.                     combine.setFlag(flag);//标志位  
  234.                     combine.setJoinKey(joinKey);//链接键  
  235.                     combine.setSecondPart(secondPart);//其他部分  
  236.                       
  237.                      //写出  
  238.                     context.write(combine.getJoinKey(), combine);  
  239.                       
  240.                       
  241.                 }else{                    
  242.                     //执行过滤 ......  
  243.                     System.out.println("b.txt里");  
  244.                     System.out.println("在小表中无此记录,执行过滤掉!");  
  245.                     for(String v:valueItems){  
  246.                         System.out.print(v+"   ");  
  247.                     }  
  248.                       
  249.                     return ;  
  250.                       
  251.                       
  252.                 }  
  253.                   
  254.               
  255.                   
  256.                   
  257.             }  
  258.               
  259.               
  260.               
  261.               
  262.                
  263.            
  264.               
  265.         }  
  266.           
  267.     }  
  268.       
  269.       
  270.     private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{  
  271.           
  272.           
  273.         //存储一个分组中左表信息  
  274.         private List<Text> leftTable=new ArrayList<Text>();  
  275.         //存储一个分组中右表信息  
  276.         private List<Text> rightTable=new ArrayList<Text>();  
  277.           
  278.         private Text secondPart=null;  
  279.           
  280.         private Text output=new Text();  
  281.           
  282.           
  283.            
  284.         //一个分组调用一次  
  285.         @Override  
  286.         protected void reduce(Text key, Iterable<CombineEntity> values,Context context)  
  287.                 throws IOException, InterruptedException {  
  288.              leftTable.clear();//清空分组数据  
  289.              rightTable.clear();//清空分组数据  
  290.                
  291.                
  292.              /** 
  293.               * 将不同文件的数据,分别放在不同的集合 
  294.               * 中,注意数据量过大时,会出现 
  295.               * OOM的异常 
  296.               *  
  297.               * **/  
  298.                
  299.              for(CombineEntity ce:values){  
  300.                    
  301.                  this.secondPart=new Text(ce.getSecondPart().toString());  
  302.                    
  303.                    
  304.                  //左表  
  305.                    
  306.                  if(ce.getFlag().toString().trim().equals("0")){  
  307.                      leftTable.add(secondPart);  
  308.                        
  309.                  }else if(ce.getFlag().toString().trim().equals("1")){  
  310.                        
  311.                      rightTable.add(secondPart);  
  312.                        
  313.                  }  
  314.                    
  315.                    
  316.                    
  317.                    
  318.              }  
  319.                
  320.              //=====================  
  321.              for(Text left:leftTable){  
  322.                    
  323.                  for(Text right:rightTable){  
  324.                        
  325.                      output.set(left+"\t"+right);//连接左右数据  
  326.                      context.write(key, output);//输出  
  327.                  }  
  328.                    
  329.              }  
  330.                
  331.                
  332.                
  333.               
  334.         }  
  335.           
  336.     }  
  337.       
  338.       
  339.       
  340.       
  341.       
  342.       
  343.       
  344.       
  345.     public static void main(String[] args)throws Exception {  
  346.           
  347.            
  348.       
  349.       
  350.          //Job job=new Job(conf,"myjoin");  
  351.          JobConf conf=new JobConf(Semjoin.class);   
  352.            conf.set("mapred.job.tracker","192.168.75.130:9001");  
  353.             conf.setJar("tt.jar");  
  354.             
  355.             
  356.             //小表共享  
  357.             String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";  
  358.             //添加到共享cache里  
  359.         DistributedCache.addCacheFile(new URI(bpath), conf);  
  360.           
  361.           Job job=new Job(conf, "aaaaa");  
  362.          job.setJarByClass(Semjoin.class);  
  363.          System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
  364.            
  365.            
  366.          //设置Map和Reduce自定义类  
  367.          job.setMapperClass(JMapper.class);  
  368.          job.setReducerClass(JReduce.class);  
  369.            
  370.          //设置Map端输出  
  371.          job.setMapOutputKeyClass(Text.class);  
  372.          job.setMapOutputValueClass(CombineEntity.class);  
  373.            
  374.          //设置Reduce端的输出  
  375.          job.setOutputKeyClass(Text.class);  
  376.          job.setOutputValueClass(Text.class);  
  377.            
  378.       
  379.          job.setInputFormatClass(TextInputFormat.class);  
  380.          job.setOutputFormatClass(TextOutputFormat.class);  
  381.            
  382.        
  383.          FileSystem fs=FileSystem.get(conf);  
  384.            
  385.          Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");  
  386.            
  387.          if(fs.exists(op)){  
  388.              fs.delete(op, true);  
  389.              System.out.println("存在此输出路径,已删除!!!");  
  390.          }  
  391.            
  392.            
  393.       FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));  
  394.       FileOutputFormat.setOutputPath(job, op);  
  395.          
  396.       System.exit(job.waitForCompletion(true)?0:1);  
  397.         
  398.         
  399.            
  400.            
  401.            
  402.            
  403.            
  404.            
  405.            
  406.                   
  407.                    
  408.           
  409.           
  410.           
  411.     }  
  412.       
  413.       
  414.       
  415.   
  416. }  
package com.semijoin;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * 
 * Hadoop1.2的版本
 * 
 * hadoop的半链接
 * 
 * SemiJoin实现
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 * 
 * **/
public class Semjoin {
	
	
	
	/**
	 * 
	 * 
	 * 自定义一个输出实体
	 * 
	 * **/
	private static class CombineEntity implements WritableComparable<CombineEntity>{

		
		private Text joinKey;//连接key
		private Text flag;//文件来源标志
		private Text secondPart;//除了键外的其他部分的数据
		
		
		public CombineEntity() {
			// TODO Auto-generated constructor stub
			this.joinKey=new Text();
			this.flag=new Text();
			this.secondPart=new Text();
		}
		
		public Text getJoinKey() {
			return joinKey;
		}

		public void setJoinKey(Text joinKey) {
			this.joinKey = joinKey;
		}

		public Text getFlag() {
			return flag;
		}

		public void setFlag(Text flag) {
			this.flag = flag;
		}

		public Text getSecondPart() {
			return secondPart;
		}

		public void setSecondPart(Text secondPart) {
			this.secondPart = secondPart;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.joinKey.readFields(in);
			this.flag.readFields(in);
			this.secondPart.readFields(in);
			
		}

		@Override
		public void write(DataOutput out) throws IOException {
			this.joinKey.write(out);
			this.flag.write(out);
			this.secondPart.write(out);
			
		}

		@Override
		public int compareTo(CombineEntity o) {
			// TODO Auto-generated method stub
			return this.joinKey.compareTo(o.joinKey);
		}
		
		
		
	}
	
	
	
	
	private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
		
		private CombineEntity combine=new CombineEntity();
		private Text flag=new Text();
		private  Text joinKey=new Text();
		private Text secondPart=new Text();
		/**
		 * 存储小表的key
		 * 
		 * 
		 * */
		private HashSet<String> joinKeySet=new HashSet<String>();
		
		
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		 
			//读取文件流
			BufferedReader br=null;
			String temp;
			// 获取DistributedCached里面 的共享文件
			Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
			
			for(Path p:path){
				
				if(p.getName().endsWith("a.txt")){
					br=new BufferedReader(new FileReader(p.toString()));
					//List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));
					
					while((temp=br.readLine())!=null){
						String ss[]=temp.split(",");
						//map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中
						joinKeySet.add(ss[0]);//加入小表的key
					}
				}
			}
			
			
		}
		
		
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
		
			   //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
		
            if(pathName.endsWith("a.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	
            	
            	/**
            	 * 在这里过滤必须要的连接字符
            	 * 
            	 * */
            	if(joinKeySet.contains(valueItems[0])){
            		//设置标志位
                	flag.set("0");   
                	//设置链接键
                	joinKey.set(valueItems[0]);          
                	//设置第二部分
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);	
            	}else{
            		System.out.println("a.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            	}
            	
            	
            	
            }else if(pathName.endsWith("b.txt")){
            	String  valueItems[]=value.toString().split(",");
            	
            	/**
            	 * 
            	 * 判断是否在集合中
            	 * 
            	 * */
            	if(joinKeySet.contains(valueItems[0])){
            		

                	//设置标志位
                	flag.set("1");   
                	
                	//设置链接键
                	joinKey.set(valueItems[0]);
          
                	//设置第二部分注意不同的文件的列数不一样
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);
            		
            		
            	}else{            		
            		//执行过滤 ......
            		System.out.println("b.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            		
            	}
            	
            
            	
            	
            }
            
            
			
			
			 
		 
			
		}
		
	}
	
	
	private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
		
		
		//存储一个分组中左表信息
		private List<Text> leftTable=new ArrayList<Text>();
		//存储一个分组中右表信息
		private List<Text> rightTable=new ArrayList<Text>();
		
		private Text secondPart=null;
		
		private Text output=new Text();
		
		
		 
		//一个分组调用一次
		@Override
		protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
				throws IOException, InterruptedException {
			 leftTable.clear();//清空分组数据
			 rightTable.clear();//清空分组数据
			 
			 
			 /**
			  * 将不同文件的数据,分别放在不同的集合
			  * 中,注意数据量过大时,会出现
			  * OOM的异常
			  * 
			  * **/
			 
			 for(CombineEntity ce:values){
				 
				 this.secondPart=new Text(ce.getSecondPart().toString());
				 
				 
				 //左表
				 
				 if(ce.getFlag().toString().trim().equals("0")){
					 leftTable.add(secondPart);
					 
				 }else if(ce.getFlag().toString().trim().equals("1")){
					 
					 rightTable.add(secondPart);
					 
				 }
				 
				 
				 
				 
			 }
			 
			 //=====================
			 for(Text left:leftTable){
				 
				 for(Text right:rightTable){
					 
					 output.set(left+"\t"+right);//连接左右数据
					 context.write(key, output);//输出
				 }
				 
			 }
			 
			 
			 
			
		}
		
	}
	
	
	
	
	
	
	
	
	public static void main(String[] args)throws Exception {
		
		 
	
	
		 //Job job=new Job(conf,"myjoin");
		 JobConf conf=new JobConf(Semjoin.class); 
		   conf.set("mapred.job.tracker","192.168.75.130:9001");
		    conf.setJar("tt.jar");
		  
		  
			//小表共享
			String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
			//添加到共享cache里
		DistributedCache.addCacheFile(new URI(bpath), conf);
		
		  Job job=new Job(conf, "aaaaa");
		 job.setJarByClass(Semjoin.class);
		 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		 
		 
		 //设置Map和Reduce自定义类
		 job.setMapperClass(JMapper.class);
		 job.setReducerClass(JReduce.class);
		 
		 //设置Map端输出
		 job.setMapOutputKeyClass(Text.class);
		 job.setMapOutputValueClass(CombineEntity.class);
		 
		 //设置Reduce端的输出
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
		 
	
		 job.setInputFormatClass(TextInputFormat.class);
		 job.setOutputFormatClass(TextOutputFormat.class);
		 
	 
		 FileSystem fs=FileSystem.get(conf);
		 
		 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");
		 
		 if(fs.exists(op)){
			 fs.delete(op, true);
			 System.out.println("存在此输出路径,已删除!!!");
		 }
		 
		 
	  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
	  FileOutputFormat.setOutputPath(job, op);
	   
	  System.exit(job.waitForCompletion(true)?0:1);
	  
	  
		 
		 
		 
		 
		 
		 
		 
				
				 
		
		
		
	}
	
	
	

}


运行日志如下:

Java代码 复制代码 收藏代码
  1. 模式:  192.168.75.130:9001  
  2. 存在此输出路径,已删除!!!  
  3. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
  4. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2  
  5. WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable  
  6. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
  7. INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404260312_0002  
  8. INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%  
  9. INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%  
  10. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
  11. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%  
  12. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%  
  13. INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404260312_0002  
  14. INFO - Counters.log(585) | Counters: 29  
  15. INFO - Counters.log(587) |   Job Counters   
  16. INFO - Counters.log(589) |     Launched reduce tasks=1  
  17. INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12445  
  18. INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0  
  19. INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0  
  20. INFO - Counters.log(589) |     Launched map tasks=2  
  21. INFO - Counters.log(589) |     Data-local map tasks=2  
  22. INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9801  
  23. INFO - Counters.log(587) |   File Output Format Counters   
  24. INFO - Counters.log(589) |     Bytes Written=172  
  25. INFO - Counters.log(587) |   FileSystemCounters  
  26. INFO - Counters.log(589) |     FILE_BYTES_READ=237  
  27. INFO - Counters.log(589) |     HDFS_BYTES_READ=455  
  28. INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169503  
  29. INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172  
  30. INFO - Counters.log(587) |   File Input Format Counters   
  31. INFO - Counters.log(589) |     Bytes Read=227  
  32. INFO - Counters.log(587) |   Map-Reduce Framework  
  33. INFO - Counters.log(589) |     Map output materialized bytes=243  
  34. INFO - Counters.log(589) |     Map input records=10  
  35. INFO - Counters.log(589) |     Reduce shuffle bytes=243  
  36. INFO - Counters.log(589) |     Spilled Records=16  
  37. INFO - Counters.log(589) |     Map output bytes=215  
  38. INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944  
  39. INFO - Counters.log(589) |     CPU time spent (ms)=1770  
  40. INFO - Counters.log(589) |     Combine input records=0  
  41. INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228  
  42. INFO - Counters.log(589) |     Reduce input records=8  
  43. INFO - Counters.log(589) |     Reduce input groups=4  
  44. INFO - Counters.log(589) |     Combine output records=0  
  45. INFO - Counters.log(589) |     Physical memory (bytes) snapshot=442564608  
  46. INFO - Counters.log(589) |     Reduce output records=4  
  47. INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688  
  48. INFO - Counters.log(589) |     Map output records=8  
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404260312_0002
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404260312_0002
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12445
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9801
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=455
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169503
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=227
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=10
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1770
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=442564608
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688
INFO - Counters.log(589) |     Map output records=8


在map侧过滤的数据,在50030中查看的截图如下:



运行结果如下所示:

Java代码 复制代码 收藏代码
  1. 1   三劫散仙    13575468248 B   89  2013-02-05  
  2. 2   凤舞九天    18965235874 C   69  2013-03-09  
  3. 3   忙忙碌碌    15986854789 A   99  2013-03-05  
  4. 3   忙忙碌碌    15986854789 D   56  2013-06-07  
1	三劫散仙	13575468248	B	89	2013-02-05
2	凤舞九天	18965235874	C	69	2013-03-09
3	忙忙碌碌	15986854789	A	99	2013-03-05
3	忙忙碌碌	15986854789	D	56	2013-06-07



至此,这个半链接就完成了,结果正确,在hadoop的几种join方式里,只有在Map侧的链接比较高效,但也需要根据具体的实际情况,进行选择。

分享到:
评论

相关推荐

    hadoop Join代码(map join 和reduce join)

    本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...

    19、Join操作map side join 和 reduce side join

    本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    通常,Hadoop中的Join可以分为几种类型:Bucket Join、Sort-Merge Join、Replicated Join和Map-Side Join等。每种Join策略都有其适用场景和优缺点。 `hadoop_join.jar`是一个针对Hadoop环境设计的Join查询工具,它...

    hadoop join implement

    ### Hadoop Join Implementation:关键技术与优化策略 #### 摘要 在大数据处理领域,Hadoop作为主流的大规模数据处理框架之一,其MapReduce模型在并行数据处理方面展现出巨大优势。然而,对于数据间的连接操作(即...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...

    Hadoop datajoin示例(客户和订单信息)

    文件汗有三个java类,两个测试文件txt ...MapClass.java TaggedRecordWritable.java customers.txt orders.txt 经过亲自测试,可以将两个文件中的信息以groupby的key关联起来,查出类似数据库的join.

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.lib.map org.apache.hadoop.mapreduce.lib.output org.apache.hadoop.mapreduce.lib.partition org.apache.hadoop.mapreduce.lib.reduce org.apache.hadoop.mapreduce.security ...

    在Hadoop Map-Reduce中实施联接

    2. **Map-Side Join**:当数据可以预先排序且可以划分到Map任务的大小时,可以在Map阶段进行部分Join。每个Mapper会加载其负责的部分小表数据,然后与大表数据进行Join。这种方法减少了网络传输,但需要额外的排序和...

    5堂Hadoop必修课,不会这些勿称高手

    4. 掌握MapReduce的Join算法,包括Reduce Join、Map Join、Semi Join和Bloom Filter。 5. 学习Zookeeper的基本原理和架构,以及如何在分布式环境中部署Zookeeper,包括配置管理Hadoop集群。 6. 掌握Hadoop和Spark...

    HadoopMRExamples:可以在此处找到 Hadoop Map Reduce 示例

    通过 Java,开发者可以轻松地实现 Map 和 Reduce 函数,从而处理 HDFS(Hadoop Distributed File System)中的数据。 在 HadoopMRExamples-master 这个压缩包中,我们可以期待找到以下内容: 1. **WordCount 示例*...

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    3) 内连接(Inner Join)和左连接(Left Join)可以通过一次MapReduce作业实现,Map阶段将JOIN键和对应数据发送到同一Reducer,Reduce阶段根据JOIN条件进行匹配。 在提供的"mapreduce-sql"压缩包文件中,很可能包含...

    Pro hadoop

    作者还提供了使用高级特性如map端join和链式映射的示例。为了将理论与实践相结合,作者引导读者一步一步开发一个实际的MapReduce应用,这将帮助读者深入了解真实的Hadoop项目。 标签中提到的“大数据”是指需要通过...

    Hive Summit 2011-join

    在执行时,Hadoop MapReduce框架会为每一对join的行调用一次join操作。这种操作对于小数据集来说效率较高,但是当涉及到大数据量时,它的性能会迅速下降,因为它需要处理大量的中间数据,并且在Map和Reduce阶段都要...

    hadoop 实战 dev_02

    课程内容还涉及了如何从nginx日志中提取访问量最高的IP地址,使用Unix/Linux的工具链,如awk、grep、sort、join等进行简单的日志分析。 综上所述,本课程深入介绍了Hadoop在Web日志分析中的应用,从基本的日志概念...

    Hadoop和hive大数据面试题

    5. Hadoop优化:如何通过调整配置参数提高Hadoop性能,如Block Size、Map任务和Reduce任务的数量等。 接下来是Hive,它是基于Hadoop的数据仓库工具,可将结构化的数据文件映射为一张数据库表,并提供SQL-like查询...

    hadoop_join_aggregate:在hadoop中加入和聚合mapreduce算法

    Map side join 比 reducer side join 快。 但是只有当您执行映射端连接操作的表之一小到足以放入内存时,映射端连接才足够。 日期集信息 客户数据集:每行包含:CustID、Name、Age、CountryCode、Salary。 交易...

Global site tag (gtag.js) - Google Analytics