`
qindongliang1922
  • 浏览: 2193348 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117797
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126218
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:60173
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71509
社区版块
存档分类
最新评论

Hadoop的Map Sied Join

阅读更多
散仙,在有关Hadoop的上篇博客里,给出了基于Reduce侧的表连接,今天,散仙,就再来看下如何在Map侧高效完成的join,因为在reduce侧进行join在shuffle阶段会消耗大量的时间,如果在Map端进行Join,那么就会节省大量的资源,当然,这也是有具体的应用场景的。

使用场景:一张表十分小、一张表很大。
   用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

模拟的测试数据如下:

小表: HDFS路径:hdfs://192.168.75.130:9000/root/dist/a.txt
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862

大表:HDFS路径:hdfs://192.168.75.130:9000/root/inputjoindb/b.txt
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07

使用Hadoop1.2的版本进行实现,源码如下:

package com.mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 
 

/***
 * 基于Map侧的复制链接
 *Hadoop技术交流群: 37693216 
 * @author qindongliang 
 ***/
public class MapJoin {
	
	
	
	/***
	 * 在Map侧setup方法里,取出缓存文件
	 * 放入HashMap中进行join
	 * 
	 * 
	 * **/
	public static  class MMppe extends Mapper<Object, Text, Text, Text>{
		
		
		/**
		 * 此map是存放小表数据用的
		 * 注意小表的key是不重复的
		 * 类似与数据库的外键表
		 * 在这里的小表,就相当于一个外键表
		 * 
		 * 
		 * **/
		private HashMap<String,String> map=new HashMap<String, String>();
		
		/**
		 * 输出的Key
		 * */
		private Text outputKey=new Text();
		
		/**
		 * 输出的Value
		 * 
		 * */
		private Text outputValue=new Text();
		
		//存放map的一行数据
		String mapInputStr=null;
		//存放主表的整个列值
		String mapInputStrs[] =null;
		
		//存放外键表(小表)的,除了链接键之外的整个其他列的拼接字符串
		String mapSecondPart=null;
		
		
		/**
		 * Map的初始化方法
		 * 
		 * 主要任务是将小表存入到一个Hash中
		 * 格式,k=外键   ===  v=其他列拼接的字符串
		 * 
		 * 
		 * **/
		@Override
		protected void setup(Context context)throws IOException, InterruptedException {
			 
			//读取文件流
			BufferedReader br=null;
			String temp;
			// 获取DistributedCached里面 的共享文件
			Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
			
			for(Path p:path){
				
				if(p.getName().endsWith("a.txt")){
					br=new BufferedReader(new FileReader(p.toString()));
					//List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));
					
					while((temp=br.readLine())!=null){
						String ss[]=temp.split(",");
						map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中
					}
				}
			}
			
			//System.out.println("map完:"+map);
			
			
		}
		
		
		
		/**
		 * 
		 * 在map里,直接读取数据,从另一个表的map里
		 * 获取key进行join就可以了
		 * 
		 * 
		 * ***/
		@Override
		protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
			 
			
			//空值跳过
			if(value==null||value.toString().equals("")){
				return;
			}
			
			this.mapInputStr=value.toString();//读取输入的值
			this.mapInputStrs=this.mapInputStr.split(",");//拆分成数组
			
			
			this.mapSecondPart=map.get(mapInputStrs[0]);//获取外键表的部分
			
			//如果存在此key
			if(this.mapSecondPart!=null){
				this.outputKey.set(mapInputStrs[0]);//输出的key
				//输出的value是拼接的两个表的数据
				this.outputValue.set(this.mapSecondPart+"\t"+mapInputStrs[1]+"\t"+mapInputStrs[2]+"\t"+mapInputStrs[3]);
				
				//写入磁盘
				context.write(this.outputKey, this.outputValue);
			}
			
			
			
			
			
			
			
		}
		
		
		
		//驱动类
		public static void main(String[] args)throws Exception {
			
		 
			JobConf conf=new JobConf(MMppe.class); 
			
			//小表共享
			String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
			//添加到共享cache里
			DistributedCache.addCacheFile(new URI(bpath), conf);
		
			
			 
			 conf.set("mapred.job.tracker","192.168.75.130:9001");
			conf.setJar("tt.jar");
			  
			  
			  Job job=new Job(conf, "2222222");
			 job.setJarByClass(MapJoin.class);
			 System.out.println("模式:  "+conf.get("mapred.job.tracker"));;
			 
			 
			 //设置Map和Reduce自定义类
			 job.setMapperClass(MMppe.class);
			 job.setNumReduceTasks(0);
			 
			 //设置Map端输出
			// job.setMapOutputKeyClass(Text.class);
			 job.setMapOutputValueClass(Text.class);
			 
			 //设置Reduce端的输出
			 job.setOutputKeyClass(Text.class);
			 job.setOutputValueClass(Text.class);
			 
		
			 job.setInputFormatClass(TextInputFormat.class);
			 job.setOutputFormatClass(TextOutputFormat.class);
			 
		 
			 FileSystem fs=FileSystem.get(conf);
			 
			 Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew3/");
			 
			 if(fs.exists(op)){
				 fs.delete(op, true);
				 System.out.println("存在此输出路径,已删除!!!");
			 }
			 
			 
		  FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb/b.txt"));
		  FileOutputFormat.setOutputPath(job, op);
		   
		  System.exit(job.waitForCompletion(true)?0:1);
			
			
		}
		
	}
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	
	

}

运行日志:
模式:  192.168.75.130:9001
存在此输出路径,已删除!!!
WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404250130_0011
INFO - JobClient.monitorAndPrintJob(1393) |  map 0% reduce 0%
INFO - JobClient.monitorAndPrintJob(1393) |  map 100% reduce 0%
INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404250130_0011
INFO - Counters.log(585) | Counters: 19
INFO - Counters.log(587) |   Job Counters 
INFO - Counters.log(589) |     SLOTS_MILLIS_MAPS=9878
INFO - Counters.log(589) |     Total time spent by all reduces waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Total time spent by all maps waiting after reserving slots (ms)=0
INFO - Counters.log(589) |     Launched map tasks=1
INFO - Counters.log(589) |     Data-local map tasks=1
INFO - Counters.log(589) |     SLOTS_MILLIS_REDUCES=0
INFO - Counters.log(587) |   File Output Format Counters 
INFO - Counters.log(589) |     Bytes Written=172
INFO - Counters.log(587) |   FileSystemCounters
INFO - Counters.log(589) |     HDFS_BYTES_READ=188
INFO - Counters.log(589) |     FILE_BYTES_WRITTEN=55746
INFO - Counters.log(589) |     HDFS_BYTES_WRITTEN=172
INFO - Counters.log(587) |   File Input Format Counters 
INFO - Counters.log(589) |     Bytes Read=74
INFO - Counters.log(587) |   Map-Reduce Framework
INFO - Counters.log(589) |     Map input records=4
INFO - Counters.log(589) |     Physical memory (bytes) snapshot=78663680
INFO - Counters.log(589) |     Spilled Records=0
INFO - Counters.log(589) |     CPU time spent (ms)=230
INFO - Counters.log(589) |     Total committed heap usage (bytes)=15728640
INFO - Counters.log(589) |     Virtual memory (bytes) snapshot=725975040
INFO - Counters.log(589) |     Map output records=4
INFO - Counters.log(589) |     SPLIT_RAW_BYTES=114


结果如下:
3	忙忙碌碌	15986854789	A	99	2013-03-05
1	三劫散仙	13575468248	B	89	2013-02-05
2	凤舞九天	18965235874	C	69	2013-03-09
3	忙忙碌碌	15986854789	D	56	2013-06-07

可以看出,结果是正确的,这种方式,非常高效,但通常,只适应于两个表里面,一个表非常大,而另外一张表,则非常小,究竟什么样的算小,基本上当你的内存能够,很轻松的装下,并不会对主程序造成很大影响的时候,我们就可以在Map端通过利用DistributeCached复制链接技术进行Join了。

分享到:
评论

相关推荐

    hadoop Join代码(map join 和reduce join)

    本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...

    hadoop map-reduce turorial

    ### Hadoop Map-Reduce 教程详析 #### 目标与作用 Hadoop Map-Reduce框架是设计用于处理大规模数据集(多太字节级)的软件框架,它允许在大量廉价硬件集群上(可达数千节点)进行并行处理,确保了数据处理的可靠性...

    Hadoop Map-Reduce教程

    ### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...

    19、Join操作map side join 和 reduce side join

    本文主要探讨了两种 MapReduce 中的 Join 实现:Map Side Join 和 Reduce Side Join。 一、Join 的概念 Join 操作在数据库中是非常常见的,它用于将来自两个或更多表的数据根据某些共享字段(即键)关联起来。在 ...

    远程调用执行Hadoop Map/Reduce

    本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...

    hadoop中map/reduce

    在大数据处理领域,Hadoop是不可或缺的核心框架,其核心组件MapReduce则是分布式计算的重要实现方式。MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce...

    Hadoop Map Reduce教程

    ### Hadoop MapReduce 教程知识点详解 #### 一、Hadoop MapReduce 概述 - **定义**:Hadoop MapReduce 是一个基于 Java 的分布式数据处理框架,它能够高效地处理大规模数据集。该框架将任务分解为一系列较小的任务...

    hadoop map reduce 中文教程

    每个案例都详细列出了实践步骤,包括如何编写 Map 和 Reduce 函数、如何配置 Hadoop 环境、如何运行 MapReduce 任务等。 #### 六、总结 Hadoop MapReduce 是一种非常强大的分布式数据处理工具,它通过简单的编程...

    Hadoop Map Reduce 教程.doc

    Hadoop Map Reduce 教程.doc

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    通常,Hadoop中的Join可以分为几种类型:Bucket Join、Sort-Merge Join、Replicated Join和Map-Side Join等。每种Join策略都有其适用场景和优缺点。 `hadoop_join.jar`是一个针对Hadoop环境设计的Join查询工具,它...

    hadoop map reduce hbase 一人一档

    标题“hadoop map reduce hbase 一人一档”揭示了这个系统的核心组成部分。Hadoop MapReduce是一种分布式计算框架,用于处理和存储大规模数据集。它通过将复杂任务分解为可并行处理的“映射”和“化简”阶段,使得在...

    hadoop join implement

    ### Hadoop Join Implementation:关键技术与优化策略 #### 摘要 在大数据处理领域,Hadoop作为主流的大规模数据处理框架之一,其MapReduce模型在并行数据处理方面展现出巨大优势。然而,对于数据间的连接操作(即...

    hadoop-0.21.0-datajoin.jar

    hadoop-0.21.0-datajoin.jar

    Windows平台下Hadoop的Map/Reduce开发

    在Windows平台上进行Hadoop的Map/Reduce开发可能会比在Linux环境下多一些挑战,但通过详细的步骤和理解Map/Reduce的工作机制,开发者可以有效地克服这些困难。以下是对标题和描述中涉及知识点的详细说明: **Hadoop...

    Hadoop源代码分析(MapTask)

    Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...

    最高气温 map reduce hadoop 实例

    【标题】:“最高气温 map reduce hadoop 实例” 在大数据处理领域,Hadoop是一个不可或缺的开源框架,它专为分布式存储和处理大量数据而设计。本实例将介绍如何使用Hadoop MapReduce解决一个实际问题——找出给定...

    基于hadoop的好友推荐系统

    【标题】"基于Hadoop的好友推荐系统"揭示了如何利用大数据处理框架Hadoop来构建一个高效、可扩展的社交网络中的好友推荐功能。在现代的社交媒体平台中,好友推荐是提升用户粘性和互动性的重要手段,通过分析用户的...

    Analysis-of-Stock-Market-using-Hadoop-Map-Reduce:使用Hadoop Map Reduce分析股票市场

    使用Hadoop Map Reduce分析股票市场 如何运行程序? 首先在您的系统中安装Hadoop。 请按照以下步骤进行安装 然后开始执行给定的命令 cd hadoop-3.2.2 / sbin ./start-dfs.sh ./start-yarn.sh jps 导出HADOOP_...

    hadoop-datajoin-2.6.0.jar

    java运行依赖jar包

Global site tag (gtag.js) - Google Analytics