`
qindongliang1922
  • 浏览: 2189148 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117681
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126079
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60034
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71405
社区版块
存档分类
最新评论

如何基于新API使用Hadoop的Reduce Side Join

阅读更多
上篇,散仙介绍了基于Hadoop的旧版API结合DataJoin工具类和MapReduce实现的侧连接,那么本次,散仙就来看下,如何在新版API(散仙的Hadoop是1.2版本,在2.x的hadoop版本里实现代码一样)中实现一个Reduce Side Join,在这之前,我们还是先来温故下Reduce侧连接的实现原理:

在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。


测试数据,依旧是上次使用的数据:



a文件的数据  
  
1,三劫散仙,13575468248  
2,凤舞九天,18965235874  
3,忙忙碌碌,15986854789  
4,少林寺方丈,15698745862  



b文件的数据  
  
3,A,99,2013-03-05  
1,B,89,2013-02-05  
2,C,69,2013-03-09  
3,D,56,2013-06-07  


代码如下:
package com.qin.reducejoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * 
 * Hadoop1.2的版本,新版本API实现的Reduce侧连接
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 * 
 * **/
public class NewReduceJoin2 {
	
	
	
	/**
	 * 
	 * 
	 * 自定义一个输出实体
	 * 
	 * **/
	private static class CombineEntity implements WritableComparable<CombineEntity>{

		
		private Text joinKey;//连接key
		private Text flag;//文件来源标志
		private Text secondPart;//除了键外的其他部分的数据
		
		
		public CombineEntity() {
			// TODO Auto-generated constructor stub
			this.joinKey=new Text();
			this.flag=new Text();
			this.secondPart=new Text();
		}
		
		public Text getJoinKey() {
			return joinKey;
		}

		public void setJoinKey(Text joinKey) {
			this.joinKey = joinKey;
		}

		public Text getFlag() {
			return flag;
		}

		public void setFlag(Text flag) {
			this.flag = flag;
		}

		public Text getSecondPart() {
			return secondPart;
		}

		public void setSecondPart(Text secondPart) {
			this.secondPart = secondPart;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.joinKey.readFields(in);
			this.flag.readFields(in);
			this.secondPart.readFields(in);
			
		}

		@Override
		public void write(DataOutput out) throws IOException {
			this.joinKey.write(out);
			this.flag.write(out);
			this.secondPart.write(out);
			
		}

		@Override
		public int compareTo(CombineEntity o) {
			// TODO Auto-generated method stub
			return this.joinKey.compareTo(o.joinKey);
		}
		
		
		
	}
	
	
	
	
	private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
		
		private CombineEntity combine=new CombineEntity();
		private Text flag=new Text();
		private  Text joinKey=new Text();
		private Text secondPart=new Text();
		
		
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
		
			   //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
		
            if(pathName.endsWith("a.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	//设置标志位
            	flag.set("0");   
            	
            	//设置链接键
            	joinKey.set(valueItems[0]);
      
            	//设置第二部分
            	secondPart.set(valueItems[1]+"\t"+valueItems[2]);
            	
            	//封装实体
            	combine.setFlag(flag);//标志位
            	combine.setJoinKey(joinKey);//链接键
            	combine.setSecondPart(secondPart);//其他部分
            	
            	 //写出
            	context.write(combine.getJoinKey(), combine);
            	
            	
            }else if(pathName.endsWith("b.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	//设置标志位
            	flag.set("1");   
            	
            	//设置链接键
            	joinKey.set(valueItems[0]);
      
            	//设置第二部分注意不同的文件的列数不一样
            	secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
            	
            	//封装实体
            	combine.setFlag(flag);//标志位
            	combine.setJoinKey(joinKey);//链接键
            	combine.setSecondPart(secondPart);//其他部分
            	
            	 //写出
            	context.write(combine.getJoinKey(), combine);
            	
            	
            }
            
            
			
			
			 
		 
			
		}
		
	}
	
	
	private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
		
		
		//存储一个分组中左表信息
		private List<Text> leftTable=new ArrayList<Text>();
		//存储一个分组中右表信息
		private List<Text> rightTable=new ArrayList<Text>();
		
		private Text secondPart=null;
		
		private Text output=new Text();
		
		
		 
		//一个分组调用一次
		@Override
		protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
				throws IOException, InterruptedException {
			 leftTable.clear();//清空分组数据
			 rightTable.clear();//清空分组数据
			 
			 
			 /**
			  * 将不同文件的数据,分别放在不同的集合
			  * 中,注意数据量过大时,会出现
			  * OOM的异常
			  * 
			  * **/
			 
			 for(CombineEntity ce:values){
				 
				 this.secondPart=new Text(ce.getSecondPart().toString());
				 
				 
				 //左表
				 
				 if(ce.getFlag().toString().trim().equals("0")){
					 leftTable.add(secondPart);
					 
				 }else if(ce.getFlag().toString().trim().equals("1")){
					 
					 rightTable.add(secondPart);
					 
				 }
				 
				 
				 
				 
			 }
			 
			 //=====================
			 for(Text left:leftTable){
				 
				 for(Text right:rightTable){
					 
					 output.set(left+"\t"+right);//连接左右数据
					 context.write(key, output);//输出
				 }
				 
			 }
			 
			 
			 
			
		}
		
	}
	
	
	
	
	
	
	
	
	public static void main(String[] args)throws Exception {
		
		 
		
	
		 //Job job=new Job(conf,"myjoin");
		 JobConf conf=new JobConf(NewReduceJoin2.class); 
		   conf.set("mapred.job.tracker","192.168.75.130:9001");
		    conf.setJar("tt.jar");
		  
		  
		  Job job=new Job(conf, "2222222");
		 job.setJarByClass(NewReduceJoin2.class);
		 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		 
		 
		 //设置Map和Reduce自定义类
		 job.setMapperClass(JMapper.class);
		 job.setReducerClass(JReduce.class);
		 
		 //设置Map端输出
		 job.setMapOutputKeyClass(Text.class);
		 job.setMapOutputValueClass(CombineEntity.class);
		 
		 //设置Reduce端的输出
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
		 
	
		 job.setInputFormatClass(TextInputFormat.class);
		 job.setOutputFormatClass(TextOutputFormat.class);
		 
	 
		 FileSystem fs=FileSystem.get(conf);
		 
		 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew2");
		 
		 if(fs.exists(op)){
			 fs.delete(op, true);
			 System.out.println("存在此输出路径,已删除!!!");
		 }
		 
		 
	  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
	  FileOutputFormat.setOutputPath(job, op);
	   
	  System.exit(job.waitForCompletion(true)?0:1);
	  
	  
		 
		 
		 
		 
		 
		 
		 
				
				 
		
		
		
	}
	
	
	

}

运行日志如下:
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404222310_0026
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404222310_0026
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=10742
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9738
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=415
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=166329
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=187
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=8
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1520
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=441524224
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688
INFO - Counters.log(589) |     Map output records=8

运行完的数据截图如下:



至此,我们在新版API中也准确,实现了Reduce的侧连接,需要注意的是Reduce侧连接的不足之处:
之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

另外一点需要注意的是,散仙在eclipse里进行调试,Local模式下会报异常,建议提交到hadoop的测试集群上进行测试。



  • 大小: 197.7 KB
分享到:
评论

相关推荐

    19、Join操作map side join 和 reduce side join

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架。在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    通常,Hadoop中的Join可以分为几种类型:Bucket Join、Sort-Merge Join、Replicated Join和Map-Side Join等。每种Join策略都有其适用场景和优缺点。 `hadoop_join.jar`是一个针对Hadoop环境设计的Join查询工具,它...

    dept_marks_reduceside_join

    通过以上分析,我们可以看出"dept_marks_reduceside_join"项目是一个典型的MapReduce应用,它展示了如何利用Java和Hadoop MapReduce API处理和关联大量学生数据,以实现对部门成绩的统计分析。这个项目对于理解和...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - **Reduce-side Join**:更通用的方法,所有表的数据在mapper阶段分别处理,然后在reducer阶段进行关联,但可能涉及大量的数据交换。 - **Bucket Join**或**Sort-Merge Join**:通过预处理使得不同表的数据在相同...

    6.Hadoop入门进阶课程_第6周_MapReduce应用案例.pdf

    通过一个求各个部门的总工资的测试例子,探讨了MapReduce的join操作,包括reduce-side join、map-side join和semi join。Reduce-side join在shuffle阶段会进行大量数据传输,导致网络IO效率低下,而map-side join...

    hadoop_join_aggregate:在hadoop中加入和聚合mapreduce算法

    Map side join 比 reducer side join 快。 但是只有当您执行映射端连接操作的表之一小到足以放入内存时,映射端连接才足够。 日期集信息 客户数据集:每行包含:CustID、Name、Age、CountryCode、Salary。 交易...

    在Hadoop Map-Reduce中实施联接

    5. **Three-way Join 和 Multi-way Join**:对于更多的数据集,可以扩展Reduce-Side Join的概念,但可能会增加复杂性和性能挑战。通常需要更复杂的分区策略和数据预处理。 在实现Join时,还需要考虑以下优化策略: ...

    Hadoop开发第四期

    - **Reduce-Side Join**:在Reduce阶段完成Join操作,适用于两个表大小相当的情况。 - **Broadcast Join**:将一个小表广播到所有节点,适用于小表连接大表的情况,可以减少网络传输量。 #### 四、配置Hive元数据...

    Mapside-Join

    Mapside Join是大数据处理领域中的一种优化策略,主要用于Hadoop MapReduce框架,旨在提高大规模数据集的JOIN操作效率。在传统的数据库系统中,JOIN操作通常在服务器端完成,而在分布式计算环境中,由于数据量庞大,...

    Hive-Summit-2011-join.zip_hive

    3. **Map-side Join**:为了解决大规模数据集的Join问题,Hive引入了Map-side Join。这种方法适用于小表与大表的连接,小表可以被完全加载到内存中,从而避免了在Reduce阶段的昂贵数据交换。 4. **Bucketing与...

    Hive & Performance 学习笔记

    Reduce-side join,也称为Shuffle Join,是MapReduce中的默认Join方式,数据在reduce阶段进行合并匹配。Map-side join适用于小表可以完全加载到内存的情况,所有节点都会保存小表,从而实现一次遍历大表完成Join。...

    Big-Data---Hadoop---Yelp-Dataset:文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据集的描述

    大数据--Hadoop--Yelp数据集 文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据... 问题5:Map Side Join:将所有业务实体加载到分布式缓存中。 使用地图侧连接列出了位于特定区域中的企业的用户ID和评论文本。

    分布式平台等值连接优化技术分析.pdf

    针对等值连接优化技术的研究,常见的方法有Map-side Join和Reduce-side Join。Map-side Join适用于小表可以完全加载到内存中,并广播到所有分布式节点上的场景。它避免了shuffle和reduce操作,从而提升了效率。...

    Big-Data_hadoop_Yelp_Data_Analysis

    大数据--Hadoop--Yelp数据集文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据集... 问题5:Map Side Join:将所有业务实体加载到分布式缓存中。 使用地图侧连接列出了位于特定区域中的企业的用户ID和评论文本。

    深入浅出数据仓库中SQL性能优化

    - **Join操作优化**:尽可能使用Map-side join代替Reduce-side join,以减少数据传输量。 - **分区优化**:合理利用分区表,减少不必要的数据扫描。 - **索引优化**:对于频繁查询的字段创建索引,加速查询过程。 - ...

    Hive_优化——深入浅出学Hive

    6. **合并 MapReduce 操作**:通过 Combine 或 Reduce-Side Join 减少中间步骤。 7. **Partition**:利用分区减少数据扫描,提高查询效率。 通过以上优化策略,可以显著提升 Hive 在大数据处理中的性能,使得数据...

    大数据MapReduce文件分发

    - 为了提高效率,Hadoop MapReduce支持多种优化技术,如Combiner(局部聚合)、Spill(磁盘溢出)和Map-side Join(基于广播的小表)等。 9. **Hadoop的缓存机制**: - MapReduce可以通过缓存机制将中间结果或...

    Data-Intensive+Text+Processing+with+MapReduce

    关系联接是数据库中的常见操作,在MapReduce中也有对应的方法实现,包括Reduce-Side Join、Map-Side Join和Memory-Backed Join。这些方法能够实现在分布式环境下的表连接,对于数据集成和分析具有重要意义。 ### ...

    hive简介共5页.pdf.zip

    5. **性能优化**:Hive提供了多种优化策略,如Joins优化(Map-side Join, Reduce-side Join)、Bucketing和Sorting,以及使用Tez或Spark作为执行引擎以提高性能。 6. **Hive与Hadoop的关系**:Hive是建立在Hadoop之...

Global site tag (gtag.js) - Google Analytics