`
weitao1026
  • 浏览: 1056989 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式

阅读更多

介绍了基于Hadoop的旧版API结合DataJoin工具类和MapReduce实现的侧连接,那么本次,散仙就来看下,如何在新版API(散仙的Hadoop是1.2版本,在2.x的hadoop版本里实现代码一样)中实现一个Reduce Side Join,在这之前,我们还是先来温故下Reduce侧连接的实现原理:

在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。


测试数据,依旧是上次使用的数据:



Java代码 复制代码 收藏代码
  1. a文件的数据    
  2.     
  3. 1,三劫散仙,13575468248    
  4. 2,凤舞九天,18965235874    
  5. 3,忙忙碌碌,15986854789    
  6. 4,少林寺方丈,15698745862    
a文件的数据  
  
1,三劫散仙,13575468248  
2,凤舞九天,18965235874  
3,忙忙碌碌,15986854789  
4,少林寺方丈,15698745862  




Java代码 复制代码 收藏代码
  1. b文件的数据    
  2.     
  3. 3,A,99,2013-03-05    
  4. 1,B,89,2013-02-05    
  5. 2,C,69,2013-03-09    
  6. 3,D,56,2013-06-07    
b文件的数据  
  
3,A,99,2013-03-05  
1,B,89,2013-02-05  
2,C,69,2013-03-09  
3,D,56,2013-06-07  



代码如下:

Java代码 复制代码 收藏代码
  1. package com.qin.reducejoin;  
  2.   
  3. import java.io.DataInput;  
  4. import java.io.DataOutput;  
  5. import java.io.IOException;  
  6. import java.util.ArrayList;  
  7. import java.util.List;  
  8.   
  9. import org.apache.hadoop.conf.Configuration;  
  10. import org.apache.hadoop.fs.FileSystem;  
  11. import org.apache.hadoop.fs.Path;  
  12. import org.apache.hadoop.io.LongWritable;  
  13. import org.apache.hadoop.io.Text;  
  14. import org.apache.hadoop.io.WritableComparable;  
  15.   
  16. import org.apache.hadoop.mapred.JobConf;  
  17. import org.apache.hadoop.mapreduce.Job;  
  18. import org.apache.hadoop.mapreduce.Mapper;  
  19. import org.apache.hadoop.mapreduce.Reducer;  
  20. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  21. import org.apache.hadoop.mapreduce.lib.input.FileSplit;  
  22. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  24. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
  25.   
  26. /*** 
  27.  *  
  28.  * Hadoop1.2的版本,新版本API实现的Reduce侧连接 
  29.  *  
  30.  * @author qindongliang 
  31.  *  
  32.  *    大数据交流群:376932160 
  33.  *  搜索技术交流群:324714439 
  34.  *  
  35.  *  
  36.  *  
  37.  * **/  
  38. public class NewReduceJoin2 {  
  39.       
  40.       
  41.       
  42.     /** 
  43.      *  
  44.      *  
  45.      * 自定义一个输出实体 
  46.      *  
  47.      * **/  
  48.     private static class CombineEntity implements WritableComparable<CombineEntity>{  
  49.   
  50.           
  51.         private Text joinKey;//连接key  
  52.         private Text flag;//文件来源标志  
  53.         private Text secondPart;//除了键外的其他部分的数据  
  54.           
  55.           
  56.         public CombineEntity() {  
  57.             // TODO Auto-generated constructor stub  
  58.             this.joinKey=new Text();  
  59.             this.flag=new Text();  
  60.             this.secondPart=new Text();  
  61.         }  
  62.           
  63.         public Text getJoinKey() {  
  64.             return joinKey;  
  65.         }  
  66.   
  67.         public void setJoinKey(Text joinKey) {  
  68.             this.joinKey = joinKey;  
  69.         }  
  70.   
  71.         public Text getFlag() {  
  72.             return flag;  
  73.         }  
  74.   
  75.         public void setFlag(Text flag) {  
  76.             this.flag = flag;  
  77.         }  
  78.   
  79.         public Text getSecondPart() {  
  80.             return secondPart;  
  81.         }  
  82.   
  83.         public void setSecondPart(Text secondPart) {  
  84.             this.secondPart = secondPart;  
  85.         }  
  86.   
  87.         @Override  
  88.         public void readFields(DataInput in) throws IOException {  
  89.             this.joinKey.readFields(in);  
  90.             this.flag.readFields(in);  
  91.             this.secondPart.readFields(in);  
  92.               
  93.         }  
  94.   
  95.         @Override  
  96.         public void write(DataOutput out) throws IOException {  
  97.             this.joinKey.write(out);  
  98.             this.flag.write(out);  
  99.             this.secondPart.write(out);  
  100.               
  101.         }  
  102.   
  103.         @Override  
  104.         public int compareTo(CombineEntity o) {  
  105.             // TODO Auto-generated method stub  
  106.             return this.joinKey.compareTo(o.joinKey);  
  107.         }  
  108.           
  109.           
  110.           
  111.     }  
  112.       
  113.       
  114.       
  115.       
  116.     private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{  
  117.           
  118.         private CombineEntity combine=new CombineEntity();  
  119.         private Text flag=new Text();  
  120.         private  Text joinKey=new Text();  
  121.         private Text secondPart=new Text();  
  122.           
  123.           
  124.           
  125.         @Override  
  126.         protected void map(LongWritable key, Text value,Context context)  
  127.                 throws IOException, InterruptedException {  
  128.               
  129.           
  130.                //获得文件输入路径  
  131.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();  
  132.           
  133.             if(pathName.endsWith("a.txt")){  
  134.                   
  135.                 String  valueItems[]=value.toString().split(",");  
  136.                 //设置标志位  
  137.                 flag.set("0");     
  138.                   
  139.                 //设置链接键  
  140.                 joinKey.set(valueItems[0]);  
  141.         
  142.                 //设置第二部分  
  143.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]);  
  144.                   
  145.                 //封装实体  
  146.                 combine.setFlag(flag);//标志位  
  147.                 combine.setJoinKey(joinKey);//链接键  
  148.                 combine.setSecondPart(secondPart);//其他部分  
  149.                   
  150.                  //写出  
  151.                 context.write(combine.getJoinKey(), combine);  
  152.                   
  153.                   
  154.             }else if(pathName.endsWith("b.txt")){  
  155.                   
  156.                 String  valueItems[]=value.toString().split(",");  
  157.                 //设置标志位  
  158.                 flag.set("1");     
  159.                   
  160.                 //设置链接键  
  161.                 joinKey.set(valueItems[0]);  
  162.         
  163.                 //设置第二部分注意不同的文件的列数不一样  
  164.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);  
  165.                   
  166.                 //封装实体  
  167.                 combine.setFlag(flag);//标志位  
  168.                 combine.setJoinKey(joinKey);//链接键  
  169.                 combine.setSecondPart(secondPart);//其他部分  
  170.                   
  171.                  //写出  
  172.                 context.write(combine.getJoinKey(), combine);  
  173.                   
  174.                   
  175.             }  
  176.               
  177.               
  178.               
  179.               
  180.                
  181.            
  182.               
  183.         }  
  184.           
  185.     }  
  186.       
  187.       
  188.     private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{  
  189.           
  190.           
  191.         //存储一个分组中左表信息  
  192.         private List<Text> leftTable=new ArrayList<Text>();  
  193.         //存储一个分组中右表信息  
  194.         private List<Text> rightTable=new ArrayList<Text>();  
  195.           
  196.         private Text secondPart=null;  
  197.           
  198.         private Text output=new Text();  
  199.           
  200.           
  201.            
  202.         //一个分组调用一次  
  203.         @Override  
  204.         protected void reduce(Text key, Iterable<CombineEntity> values,Context context)  
  205.                 throws IOException, InterruptedException {  
  206.              leftTable.clear();//清空分组数据  
  207.              rightTable.clear();//清空分组数据  
  208.                
  209.                
  210.              /** 
  211.               * 将不同文件的数据,分别放在不同的集合 
  212.               * 中,注意数据量过大时,会出现 
  213.               * OOM的异常 
  214.               *  
  215.               * **/  
  216.                
  217.              for(CombineEntity ce:values){  
  218.                    
  219.                  this.secondPart=new Text(ce.getSecondPart().toString());  
  220.                    
  221.                    
  222.                  //左表  
  223.                    
  224.                  if(ce.getFlag().toString().trim().equals("0")){  
  225.                      leftTable.add(secondPart);  
  226.                        
  227.                  }else if(ce.getFlag().toString().trim().equals("1")){  
  228.                        
  229.                      rightTable.add(secondPart);  
  230.                        
  231.                  }  
  232.                    
  233.                    
  234.                    
  235.                    
  236.              }  
  237.                
  238.              //=====================  
  239.              for(Text left:leftTable){  
  240.                    
  241.                  for(Text right:rightTable){  
  242.                        
  243.                      output.set(left+"\t"+right);//连接左右数据  
  244.                      context.write(key, output);//输出  
  245.                  }  
  246.                    
  247.              }  
  248.                
  249.                
  250.                
  251.               
  252.         }  
  253.           
  254.     }  
  255.       
  256.       
  257.       
  258.       
  259.       
  260.       
  261.       
  262.       
  263.     public static void main(String[] args)throws Exception {  
  264.           
  265.            
  266.           
  267.       
  268.          //Job job=new Job(conf,"myjoin");  
  269.          JobConf conf=new JobConf(NewReduceJoin2.class);   
  270.            conf.set("mapred.job.tracker","192.168.75.130:9001");  
  271.             conf.setJar("tt.jar");  
  272.             
  273.             
  274.           Job job=new Job(conf, "2222222");  
  275.          job.setJarByClass(NewReduceJoin2.class);  
  276.          System.out.println("模式:  "+conf.get("mapred.job.tracker"));;  
  277.            
  278.            
  279.          //设置Map和Reduce自定义类  
  280.          job.setMapperClass(JMapper.class);  
  281.          job.setReducerClass(JReduce.class);  
  282.            
  283.          //设置Map端输出  
  284.          job.setMapOutputKeyClass(Text.class);  
  285.          job.setMapOutputValueClass(CombineEntity.class);  
  286.            
  287.          //设置Reduce端的输出  
  288.          job.setOutputKeyClass(Text.class);  
  289.          job.setOutputValueClass(Text.class);  
  290.            
  291.       
  292.          job.setInputFormatClass(TextInputFormat.class);  
  293.          job.setOutputFormatClass(TextOutputFormat.class);  
  294.            
  295.        
  296.          FileSystem fs=FileSystem.get(conf);  
  297.            
  298.          Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew2");  
  299.            
  300.          if(fs.exists(op)){  
  301.              fs.delete(op, true);  
  302.              System.out.println("存在此输出路径,已删除!!!");  
  303.          }  
  304.            
  305.            
  306.       FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));  
  307.       FileOutputFormat.setOutputPath(job, op);  
  308.          
  309.       System.exit(job.waitForCompletion(true)?0:1);  
  310.         
  311.         
  312.            
  313.            
  314.            
  315.            
  316.            
  317.            
  318.            
  319.                   
  320.                    
  321.           
  322.           
  323.           
  324.     }  
  325.       
  326.       
  327.       
  328.   
  329. }  
package com.qin.reducejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * 
 * Hadoop1.2的版本,新版本API实现的Reduce侧连接
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 * 
 * **/
public class NewReduceJoin2 {
	
	
	
	/**
	 * 
	 * 
	 * 自定义一个输出实体
	 * 
	 * **/
	private static class CombineEntity implements WritableComparable<CombineEntity>{

		
		private Text joinKey;//连接key
		private Text flag;//文件来源标志
		private Text secondPart;//除了键外的其他部分的数据
		
		
		public CombineEntity() {
			// TODO Auto-generated constructor stub
			this.joinKey=new Text();
			this.flag=new Text();
			this.secondPart=new Text();
		}
		
		public Text getJoinKey() {
			return joinKey;
		}

		public void setJoinKey(Text joinKey) {
			this.joinKey = joinKey;
		}

		public Text getFlag() {
			return flag;
		}

		public void setFlag(Text flag) {
			this.flag = flag;
		}

		public Text getSecondPart() {
			return secondPart;
		}

		public void setSecondPart(Text secondPart) {
			this.secondPart = secondPart;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.joinKey.readFields(in);
			this.flag.readFields(in);
			this.secondPart.readFields(in);
			
		}

		@Override
		public void write(DataOutput out) throws IOException {
			this.joinKey.write(out);
			this.flag.write(out);
			this.secondPart.write(out);
			
		}

		@Override
		public int compareTo(CombineEntity o) {
			// TODO Auto-generated method stub
			return this.joinKey.compareTo(o.joinKey);
		}
		
		
		
	}
	
	
	
	
	private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
		
		private CombineEntity combine=new CombineEntity();
		private Text flag=new Text();
		private  Text joinKey=new Text();
		private Text secondPart=new Text();
		
		
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
		
			   //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
		
            if(pathName.endsWith("a.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	//设置标志位
            	flag.set("0");   
            	
            	//设置链接键
            	joinKey.set(valueItems[0]);
      
            	//设置第二部分
            	secondPart.set(valueItems[1]+"\t"+valueItems[2]);
            	
            	//封装实体
            	combine.setFlag(flag);//标志位
            	combine.setJoinKey(joinKey);//链接键
            	combine.setSecondPart(secondPart);//其他部分
            	
            	 //写出
            	context.write(combine.getJoinKey(), combine);
            	
            	
            }else if(pathName.endsWith("b.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	//设置标志位
            	flag.set("1");   
            	
            	//设置链接键
            	joinKey.set(valueItems[0]);
      
            	//设置第二部分注意不同的文件的列数不一样
            	secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
            	
            	//封装实体
            	combine.setFlag(flag);//标志位
            	combine.setJoinKey(joinKey);//链接键
            	combine.setSecondPart(secondPart);//其他部分
            	
            	 //写出
            	context.write(combine.getJoinKey(), combine);
            	
            	
            }
            
            
			
			
			 
		 
			
		}
		
	}
	
	
	private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
		
		
		//存储一个分组中左表信息
		private List<Text> leftTable=new ArrayList<Text>();
		//存储一个分组中右表信息
		private List<Text> rightTable=new ArrayList<Text>();
		
		private Text secondPart=null;
		
		private Text output=new Text();
		
		
		 
		//一个分组调用一次
		@Override
		protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
				throws IOException, InterruptedException {
			 leftTable.clear();//清空分组数据
			 rightTable.clear();//清空分组数据
			 
			 
			 /**
			  * 将不同文件的数据,分别放在不同的集合
			  * 中,注意数据量过大时,会出现
			  * OOM的异常
			  * 
			  * **/
			 
			 for(CombineEntity ce:values){
				 
				 this.secondPart=new Text(ce.getSecondPart().toString());
				 
				 
				 //左表
				 
				 if(ce.getFlag().toString().trim().equals("0")){
					 leftTable.add(secondPart);
					 
				 }else if(ce.getFlag().toString().trim().equals("1")){
					 
					 rightTable.add(secondPart);
					 
				 }
				 
				 
				 
				 
			 }
			 
			 //=====================
			 for(Text left:leftTable){
				 
				 for(Text right:rightTable){
					 
					 output.set(left+"\t"+right);//连接左右数据
					 context.write(key, output);//输出
				 }
				 
			 }
			 
			 
			 
			
		}
		
	}
	
	
	
	
	
	
	
	
	public static void main(String[] args)throws Exception {
		
		 
		
	
		 //Job job=new Job(conf,"myjoin");
		 JobConf conf=new JobConf(NewReduceJoin2.class); 
		   conf.set("mapred.job.tracker","192.168.75.130:9001");
		    conf.setJar("tt.jar");
		  
		  
		  Job job=new Job(conf, "2222222");
		 job.setJarByClass(NewReduceJoin2.class);
		 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		 
		 
		 //设置Map和Reduce自定义类
		 job.setMapperClass(JMapper.class);
		 job.setReducerClass(JReduce.class);
		 
		 //设置Map端输出
		 job.setMapOutputKeyClass(Text.class);
		 job.setMapOutputValueClass(CombineEntity.class);
		 
		 //设置Reduce端的输出
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
		 
	
		 job.setInputFormatClass(TextInputFormat.class);
		 job.setOutputFormatClass(TextOutputFormat.class);
		 
	 
		 FileSystem fs=FileSystem.get(conf);
		 
		 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew2");
		 
		 if(fs.exists(op)){
			 fs.delete(op, true);
			 System.out.println("存在此输出路径,已删除!!!");
		 }
		 
		 
	  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
	  FileOutputFormat.setOutputPath(job, op);
	   
	  System.exit(job.waitForCompletion(true)?0:1);
	  
	  
		 
		 
		 
		 
		 
		 
		 
				
				 
		
		
		
	}
	
	
	

}


运行日志如下:

Java代码 复制代码 收藏代码
  1. 模式:  192.168.75.130:9001  
  2. 存在此输出路径,已删除!!!  
  3. WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.  
  4. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2  
  5. INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library  
  6. WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded  
  7. INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0026  
  8. INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%  
  9. INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%  
  10. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%  
  11. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%  
  12. INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%  
  13. INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0026  
  14. INFO - Counters.log(585) | Counters: 29  
  15. INFO - Counters.log(587) |   Job Counters   
  16. INFO - Counters.log(589) |     Launched reduce tasks=1  
  17. INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=10742  
  18. INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0  
  19. INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0  
  20. INFO - Counters.log(589) |     Launched map tasks=2  
  21. INFO - Counters.log(589) |     Data-local map tasks=2  
  22. INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9738  
  23. INFO - Counters.log(587) |   File Output Format Counters   
  24. INFO - Counters.log(589) |     Bytes Written=172  
  25. INFO - Counters.log(587) |   FileSystemCounters  
  26. INFO - Counters.log(589) |     FILE_BYTES_READ=237  
  27. INFO - Counters.log(589) |     HDFS_BYTES_READ=415  
  28. INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=166329  
  29. INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172  
  30. INFO - Counters.log(587) |   File Input Format Counters   
  31. INFO - Counters.log(589) |     Bytes Read=187  
  32. INFO - Counters.log(587) |   Map-Reduce Framework  
  33. INFO - Counters.log(589) |     Map output materialized bytes=243  
  34. INFO - Counters.log(589) |     Map input records=8  
  35. INFO - Counters.log(589) |     Reduce shuffle bytes=243  
  36. INFO - Counters.log(589) |     Spilled Records=16  
  37. INFO - Counters.log(589) |     Map output bytes=215  
  38. INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944  
  39. INFO - Counters.log(589) |     CPU time spent (ms)=1520  
  40. INFO - Counters.log(589) |     Combine input records=0  
  41. INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228  
  42. INFO - Counters.log(589) |     Reduce input records=8  
  43. INFO - Counters.log(589) |     Reduce input groups=4  
  44. INFO - Counters.log(589) |     Combine output records=0  
  45. INFO - Counters.log(589) |     Physical memory (bytes) snapshot=441524224  
  46. INFO - Counters.log(589) |     Reduce output records=4  
  47. INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688  
  48. INFO - Counters.log(589) |     Map output records=8  
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0026
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0026
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=10742
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9738
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=415
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=166329
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=187
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=8
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1520
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=441524224
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688
INFO - Counters.log(589) |     Map output records=8


运行完的数据截图如下:



至此,我们在新版API中也准确,实现了Reduce的侧连接,需要注意的是Reduce侧连接的不足之处:
之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

另外一点需要注意的是,散仙在eclipse里进行调试,Local模式下会报异常,建议提交到hadoop的测试集群上进行测试。 

分享到:
评论

相关推荐

    MapReduce实现join连接

    简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接

    【MapReduce篇06】MapReduce之MapJoin和ReduceJoin1

    其中,Join操作是 MapReduce 中的一种基本操作,用于连接来自不同数据源的数据。今天,我们将讲解 MapReduce 之 MapJoin 和 ReduceJoin 两种 Join 操作的实现原理和应用场景。 MapJoin 概述 MapJoin 是一种特殊的 ...

    基于Java实现的简易MapReduce框架.zip

    这个“基于Java实现的简易MapReduce框架”是一个简化版的实现,旨在帮助开发者理解MapReduce的工作原理,并能在实际项目中进行快速开发。 MapReduce的运行流程主要分为三个阶段:Map、Shuffle和Reduce。Map阶段是...

    MapReduce 设计模式

    MapReduce设计模式是对MapReduce编程范式的进一步深化,通过多种不同的算法和策略来解决数据处理中的常见问题。 本文档中提到了《MapReduce设计模式》这本书,由Donald Miner和Adam Shook所著。书籍的标题说明了其...

    MapReduce之Join操作

    在关系型数据库中join是非常常见的操作,各种优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要连接从不同的数据源中获取到的数据。不同于传统的单机模式,在...

    极简MapReduce框架Mincemeat-node.zip

    Mincemeat-node 是使用Node.js实现的极简MapReduce框架,可以快速的部署投入工作,免去Hadoop繁琐的配置,享受随心大数据。Mincemeatpy实现的是一种非常简单的MapReduce模型,仅仅实现了任务的分布计算,并没有类似...

    MALK:一种高效处理大规模键值的MapReduce框架.pdf

    相比传统的MapReduce框架,Spark利用了内存计算的优势,能够提供更快的数据处理速度和更丰富的数据操作类型,如迭代算法、交互式查询等。 4. Malk框架的特点: 文档中提到的MALK框架是一种针对MapReduce的优化方案...

    MapReduce框架和HDFS框架

    《MapReduce框架与HDFS框架解析》 MapReduce和HDFS是大数据处理领域的核心组件,它们构成了Hadoop生态系统的基础。Hadoop,一个由Apache软件基金会开发的开源项目,深受业界欢迎,其设计灵感来源于谷歌发表的两篇...

    MapReduce设计模式

    在本段落中,我们集中讨论了与MapReduce设计模式相关的知识点。MapReduce是一种编程模型,用于处理和生成大数据集,常用于大规模数据处理。它的设计模式是MapReduce编程中实现高效数据处理的核心方法和策略。我们将...

    mapreduce框架学习之天气统计

    本学习案例聚焦于MapReduce框架在天气统计中的应用,通过实际编程实践加深对框架的理解。 首先,MapReduce的工作原理可以概括为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,输入的数据被分割成多个小块,每个块...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...

    19、Join操作map side join 和 reduce side join

    在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在...

    MapReduce1.doc

    MapReduce的基本操作是对大量数据文件进行处理。在Map阶段,数据被分割成多个块(通常由HDFS存储),并分配给各个工作节点进行处理。每个节点运行Map函数,对输入的数据块进行转换,生成中间键值对。 2. **概要...

    MapReduce框架统计流量模板数据

    为MapReduce框架对电话号码的上行流量和下行流量及总流量进行统计的模板数据

    基于MapReduce框架下的数据挖掘方法研究.pdf

    在MapReduce框架下进行数据挖掘任务,可以充分利用分布式平台的计算能力,从而处理之前难以想象的数据量级。 随着数据挖掘技术的深入研究和应用,其在不同行业中发挥的作用也越来越大。例如,在零售业,数据挖掘...

    17_尚硅谷大数据之MapReduce框架原理1

    在Hadoop中,MapReduce实现了一个高效、可扩展的框架,使得开发者可以专注于编写map和reduce函数,而框架负责任务调度、数据分发和结果整合。 **3.1 MapReduce工作流程** 1. **提交阶段**:客户端将应用程序(包括...

Global site tag (gtag.js) - Google Analytics