`
qindongliang1922
  • 浏览: 2188854 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117670
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126074
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60034
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71402
社区版块
存档分类
最新评论

Hadoop的SemiJoin

阅读更多
散仙,在前两篇博客里,写了关于Hadoop的Map侧join
和Reduce的join,今天我们就来在看另外一种比较中立的Join。

SemiJoin,一般称为半链接,其原理是在Map侧过滤掉了一些不需要join的数据,从而大大减少了reduce的shffule时间,因为我们知道,如果仅仅使用Reduce侧连接,那么如果一份数据中,存在大量的无效数据,而这些数据,在join中,并不需要,但是因为没有做过预处理,所以这些数据,直到真正的执行reduce函数时,才被定义为无效数据,而这时候,前面已经执行过shuffle和merge和sort,所以这部分无效的数据,就浪费了大量的网络IO和磁盘IO,所以在整体来讲,这是一种降低性能的表现,如果存在的无效数据越多,那么这种趋势,就越明显。

之所以会出现半连接,这其实也是reduce侧连接的一个变种,只不过我们在Map侧,过滤掉了一些无效的数据,所以减少了reduce过程的shuffle时间,所以能获取一个性能的提升。

具体的原理也是利用DistributedCache将小表的的分发到各个节点上,在Map过程的setup函数里,读取缓存里面的文件,只将小表的链接键存储在hashset里,在map函数执行时,对每一条数据,进行判断,如果这条数据的链接键为空或者在hashset里面不存在,那么则认为这条数据,是无效的数据,所以这条数据,并不会被partition分区后写入磁盘,参与reduce阶段的shuffle和sort,所以在一定程序上,提升了join性能。需要注意的是如果
小表的key依然非常巨大,可能会导致我们的程序出现OOM的情况,那么这时候我们就需要考虑其他的链接方式了。

测试数据如下:
模拟小表数据:
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862


模拟大表数据:
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
5,E,100,2013-09-09
6,H,200,2014-01-10

代码如下:

package com.semijoin;

import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/***
 * 
 * Hadoop1.2的版本
 * 
 * hadoop的半链接
 * 
 * SemiJoin实现
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 * 
 * **/
public class Semjoin {
	
	
	
	/**
	 * 
	 * 
	 * 自定义一个输出实体
	 * 
	 * **/
	private static class CombineEntity implements WritableComparable<CombineEntity>{

		
		private Text joinKey;//连接key
		private Text flag;//文件来源标志
		private Text secondPart;//除了键外的其他部分的数据
		
		
		public CombineEntity() {
			// TODO Auto-generated constructor stub
			this.joinKey=new Text();
			this.flag=new Text();
			this.secondPart=new Text();
		}
		
		public Text getJoinKey() {
			return joinKey;
		}

		public void setJoinKey(Text joinKey) {
			this.joinKey = joinKey;
		}

		public Text getFlag() {
			return flag;
		}

		public void setFlag(Text flag) {
			this.flag = flag;
		}

		public Text getSecondPart() {
			return secondPart;
		}

		public void setSecondPart(Text secondPart) {
			this.secondPart = secondPart;
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			this.joinKey.readFields(in);
			this.flag.readFields(in);
			this.secondPart.readFields(in);
			
		}

		@Override
		public void write(DataOutput out) throws IOException {
			this.joinKey.write(out);
			this.flag.write(out);
			this.secondPart.write(out);
			
		}

		@Override
		public int compareTo(CombineEntity o) {
			// TODO Auto-generated method stub
			return this.joinKey.compareTo(o.joinKey);
		}
		
		
		
	}
	
	
	
	
	private static class JMapper extends Mapper<LongWritable, Text, Text, CombineEntity>{
		
		private CombineEntity combine=new CombineEntity();
		private Text flag=new Text();
		private  Text joinKey=new Text();
		private Text secondPart=new Text();
		/**
		 * 存储小表的key
		 * 
		 * 
		 * */
		private HashSet<String> joinKeySet=new HashSet<String>();
		
		
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
		 
			//读取文件流
			BufferedReader br=null;
			String temp;
			// 获取DistributedCached里面 的共享文件
			Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
			
			for(Path p:path){
				
				if(p.getName().endsWith("a.txt")){
					br=new BufferedReader(new FileReader(p.toString()));
					//List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));
					
					while((temp=br.readLine())!=null){
						String ss[]=temp.split(",");
						//map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中
						joinKeySet.add(ss[0]);//加入小表的key
					}
				}
			}
			
			
		}
		
		
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
		
			   //获得文件输入路径
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
		
            if(pathName.endsWith("a.txt")){
            	
            	String  valueItems[]=value.toString().split(",");
            	
            	
            	/**
            	 * 在这里过滤必须要的连接字符
            	 * 
            	 * */
            	if(joinKeySet.contains(valueItems[0])){
            		//设置标志位
                	flag.set("0");   
                	//设置链接键
                	joinKey.set(valueItems[0]);          
                	//设置第二部分
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);	
            	}else{
            		System.out.println("a.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            	}
            	
            	
            	
            }else if(pathName.endsWith("b.txt")){
            	String  valueItems[]=value.toString().split(",");
            	
            	/**
            	 * 
            	 * 判断是否在集合中
            	 * 
            	 * */
            	if(joinKeySet.contains(valueItems[0])){
            		

                	//设置标志位
                	flag.set("1");   
                	
                	//设置链接键
                	joinKey.set(valueItems[0]);
          
                	//设置第二部分注意不同的文件的列数不一样
                	secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]);
                	
                	//封装实体
                	combine.setFlag(flag);//标志位
                	combine.setJoinKey(joinKey);//链接键
                	combine.setSecondPart(secondPart);//其他部分
                	
                	 //写出
                	context.write(combine.getJoinKey(), combine);
            		
            		
            	}else{            		
            		//执行过滤 ......
            		System.out.println("b.txt里");
            		System.out.println("在小表中无此记录,执行过滤掉!");
            		for(String v:valueItems){
            			System.out.print(v+"   ");
            		}
            		
            		return ;
            		
            		
            	}
            	
            
            	
            	
            }
            
            
			
			
			 
		 
			
		}
		
	}
	
	
	private static class JReduce extends Reducer<Text, CombineEntity, Text, Text>{
		
		
		//存储一个分组中左表信息
		private List<Text> leftTable=new ArrayList<Text>();
		//存储一个分组中右表信息
		private List<Text> rightTable=new ArrayList<Text>();
		
		private Text secondPart=null;
		
		private Text output=new Text();
		
		
		 
		//一个分组调用一次
		@Override
		protected void reduce(Text key, Iterable<CombineEntity> values,Context context)
				throws IOException, InterruptedException {
			 leftTable.clear();//清空分组数据
			 rightTable.clear();//清空分组数据
			 
			 
			 /**
			  * 将不同文件的数据,分别放在不同的集合
			  * 中,注意数据量过大时,会出现
			  * OOM的异常
			  * 
			  * **/
			 
			 for(CombineEntity ce:values){
				 
				 this.secondPart=new Text(ce.getSecondPart().toString());
				 
				 
				 //左表
				 
				 if(ce.getFlag().toString().trim().equals("0")){
					 leftTable.add(secondPart);
					 
				 }else if(ce.getFlag().toString().trim().equals("1")){
					 
					 rightTable.add(secondPart);
					 
				 }
				 
				 
				 
				 
			 }
			 
			 //=====================
			 for(Text left:leftTable){
				 
				 for(Text right:rightTable){
					 
					 output.set(left+"\t"+right);//连接左右数据
					 context.write(key, output);//输出
				 }
				 
			 }
			 
			 
			 
			
		}
		
	}
	
	
	
	
	
	
	
	
	public static void main(String[] args)throws Exception {
		
		 
	
	
		 //Job job=new Job(conf,"myjoin");
		 JobConf conf=new JobConf(Semjoin.class); 
		   conf.set("mapred.job.tracker","192.168.75.130:9001");
		    conf.setJar("tt.jar");
		  
		  
			//小表共享
			String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
			//添加到共享cache里
		DistributedCache.addCacheFile(new URI(bpath), conf);
		
		  Job job=new Job(conf, "aaaaa");
		 job.setJarByClass(Semjoin.class);
		 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
		 
		 
		 //设置Map和Reduce自定义类
		 job.setMapperClass(JMapper.class);
		 job.setReducerClass(JReduce.class);
		 
		 //设置Map端输出
		 job.setMapOutputKeyClass(Text.class);
		 job.setMapOutputValueClass(CombineEntity.class);
		 
		 //设置Reduce端的输出
		 job.setOutputKeyClass(Text.class);
		 job.setOutputValueClass(Text.class);
		 
	
		 job.setInputFormatClass(TextInputFormat.class);
		 job.setOutputFormatClass(TextOutputFormat.class);
		 
	 
		 FileSystem fs=FileSystem.get(conf);
		 
		 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew4");
		 
		 if(fs.exists(op)){
			 fs.delete(op, true);
			 System.out.println("存在此输出路径,已删除!!!");
		 }
		 
		 
	  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb"));
	  FileOutputFormat.setOutputPath(job, op);
	   
	  System.exit(job.waitForCompletion(true)?0:1);
	  
	  
		 
		 
		 
		 
		 
		 
		 
				
				 
		
		
		
	}
	
	
	

}

运行日志如下:
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 2
WARN - NativeCodeLoader.<clinit>(52) | Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404260312_0002
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 50% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 33%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 100%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404260312_0002
INFO - Counters.log(585) | Counters: 29
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     Launched reduce tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=12445
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=2
INFO - Counters.log(589) |     Data-local map tasks=2
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=9801
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     FILE_BYTES_READ=237
INFO - Counters.log(589) |     HDFS_BYTES_READ=455
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=169503
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=227
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map output materialized bytes=243
INFO - Counters.log(589) |     Map input records=10
INFO - Counters.log(589) |     Reduce shuffle bytes=243
INFO - Counters.log(589) |     Spilled Records=16
INFO - Counters.log(589) |     Map output bytes=215
INFO - Counters.log(589) |     Total committed heap usage (bytes)=336338944
INFO - Counters.log(589) |     CPU time spent (ms)=1770
INFO - Counters.log(589) |     Combine input records=0
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=228
INFO - Counters.log(589) |     Reduce input records=8
INFO - Counters.log(589) |     Reduce input groups=4
INFO - Counters.log(589) |     Combine output records=0
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=442564608
INFO - Counters.log(589) |     Reduce output records=4
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=2184306688
INFO - Counters.log(589) |     Map output records=8

在map侧过滤的数据,在50030中查看的截图如下:



运行结果如下所示:
1	三劫散仙	13575468248	B	89	2013-02-05
2	凤舞九天	18965235874	C	69	2013-03-09
3	忙忙碌碌	15986854789	A	99	2013-03-05
3	忙忙碌碌	15986854789	D	56	2013-06-07


至此,这个半链接就完成了,结果正确,在hadoop的几种join方式里,只有在Map侧的链接比较高效,但也需要根据具体的实际情况,进行选择。

  • 大小: 119.9 KB
分享到:
评论

相关推荐

    5堂Hadoop必修课,不会这些勿称高手

    4. 掌握MapReduce的Join算法,包括Reduce Join、Map Join、Semi Join和Bloom Filter。 5. 学习Zookeeper的基本原理和架构,以及如何在分布式环境中部署Zookeeper,包括配置管理Hadoop集群。 6. 掌握Hadoop和Spark...

    6.Hadoop入门进阶课程_第6周_MapReduce应用案例.pdf

    通过一个求各个部门的总工资的测试例子,探讨了MapReduce的join操作,包括reduce-side join、map-side join和semi join。Reduce-side join在shuffle阶段会进行大量数据传输,导致网络IO效率低下,而map-side join...

    Hive查询sql left join exists

    本文主要关注LEFT JOIN和EXISTS子句的使用,这两个都是数据查询中常见的技术,特别是在大数据处理领域,如Hadoop环境下的Hive。 首先,LEFT JOIN(左外连接)是连接两个表的一种方式,返回所有左表(在FROM子句中...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    技术点56 通过MapReduce 对Bloom filter 进行semi-join 7.3 本章小结 8 结合R 和Hadoop 进行数据统计 8.1 比较R 和MapReduce 集成的几种方法 8.2 R 基础知识 8.3 R 和Streaming 8.3.1 Streaming 和...

    hive优化(ppt)

    在Hive中,通过使用`LEFT SEMI JOIN`而非`LEFT OUTER JOIN`,可以在不包含NULL值的情况下快速找到匹配项,从而减少数据传输和处理的时间,特别是在处理大规模数据集时效果显著。 ### 存储格式和压缩 存储格式和...

    HIVE-SQL开发规范.docx

    - 选择合适的JOIN类型,如LEFT SEMI JOIN比INNER JOIN更高效。 - 使用CBO(Cost-Based Optimizer)进行成本估算,自动选择最优执行路径。 2.6 安全性 遵循企业安全策略,使用Hive权限控制,限制用户对数据的访问。 ...

    部分普通sql查询在hive中的实现方式

    - **背景**:Hive不支持标准SQL中的`IN`和`EXISTS`关键字,但可以通过`LEFT SEMI JOIN`来模拟这些功能。 - **示例**:标准SQL中的`IN`关键字查询如下: ```sql SELECT a.key, a.value FROM a WHERE a.key IN ...

    Hive基本操作命令大全

    * in 查询:`SELECT * FROM things LEFT SEMI JOIN sales ON (sales.id = things.id);` * Map 连接:`SELECT /*+ MAPJOIN(things) */ sales.*, things.* FROM sales JOIN things ON (sales.id = things.id);` 其他...

    hive操作实战

    **左半连接(`LEFT SEMI JOIN`)** ``` LEFT SEMI JOIN 的运行结果: Total MapReduce CPU Time Spent: 1 seconds 30 msec OK zs2 2 zs3 3 ``` **解析** 左半连接返回的是左表中与右表匹配的所有记录。在这个例子...

    罗炳森-SQL等价改写核心思想

    半连接(SEMI JOIN)是另一种关键的SQL操作,它仅返回主表中与子表有匹配关系的数据。常见的半连接形式包括使用IN或EXISTS子查询。例如,查询部门(dept)中存在员工(emp)的部门信息,可以使用两种方式表达:IN子...

    Hive编程指南+HIVE从入门到精通+Hive高级编程+Apache Oozie

    3. **Joins优化**: 理解不同类型的JOIN(Inner、Outer、Semi、Anti等),并优化JOIN操作以减少资源消耗。 4. **Hive与Spark集成**: 利用Spark的内存计算能力加速Hive查询。 5. **数据安全与权限**: 实现Hive的用户...

    SQL等价改写核心思想概述.pptx

    2. **半连接(SEMI JOIN)**: - 半连接只返回主表中与子表有匹配记录的数据,常用于`IN`或`EXISTS`子查询。这种连接方式可以减少数据传输量,提高效率。半连接的等价改写可以通过内连接实现,如将`IN`子查询转换为...

    hive 简明教程

    - Join操作:包括`INNER JOIN`、`LEFT JOIN`、`RIGHT JOIN`、`FULL JOIN`以及`LEFT SEMI-JOIN`。 Hive还提供了排序操作,支持`ORDER BY`、`SORT BY`、`DISTRIBUTE BY`和`CLUSTER BY`等排序方式,以适应不同的业务...

    SQL等价改写核心思想概述.pdf

    2. 半连接(SEMI JOIN): - 半连接只返回主表的数据,而且这些数据必须与子表有关联。它可以等价于IN或EXISTS子查询。例如,查询部门(dept)中存在员工(emp)的记录,可以使用IN或EXISTS来实现半连接。 3. 反...

    Hive教程.pdf

    - **Left Semi-Join**: - 使用EXISTS子查询实现左半连接。 - `SELECT t1.* FROM table1 t1 WHERE EXISTS (SELECT 1 FROM table2 t2 WHERE t1.column = t2.column);` #### 七、排序 - **OrderBy**: - `SELECT * ...

    Hive基本命令整理

    Hive基本命令整理 作为大数据处理的重要工具,Hive 提供了许多实用的命令来帮助开发人员高效地处理和分析数据。...hive&gt; SELECT * FROM things LEFT SEMI JOIN sales ON (things.id = sales.id); ```

    2-7+HBase平台建设实践.pdf

    Phoenix支持二级索引,能并行执行操作,将计算推向数据端,进行谓词下推和semi-join优化,适用于处理海量数据和点查场景。 4. **OLAP分析 - Kylin** Apache Kylin是一个分布式的OLAP分析工具,它可以与HBase和...

    基于Facebook的Hive开发

    - **半连接(Semi-Join)** - **映射连接(Map join/bucket map join/sort merge map join)** - **用户定义函数(UDF/UDAF/UDTF)** - **侧视图(Lateral view)** - **子查询(Subqueries in from clause)** - **...

Global site tag (gtag.js) - Google Analytics