看下如何在Map侧高效完成的join,因为在reduce侧进行join在shuffle阶段会消耗大量的时间,如果在Map端进行Join,那么就会节省大量的资源,当然,这也是有具体的应用场景的。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
模拟的测试数据如下:
小表: HDFS路径:hdfs://192.168.75.130:9000/root/dist/a.txt
1,三劫散仙,13575468248
2,凤舞九天,18965235874
3,忙忙碌碌,15986854789
4,少林寺方丈,15698745862
大表:HDFS路径:hdfs://192.168.75.130:9000/root/inputjoindb/b.txt
3,A,99,2013-03-05
1,B,89,2013-02-05
2,C,69,2013-03-09
3,D,56,2013-06-07
使用Hadoop1.2的版本进行实现,源码如下:
- package com.mapjoin;
- import java.io.BufferedReader;
- import java.io.FileReader;
- import java.io.IOException;
- import java.net.URI;
- import java.nio.charset.Charset;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.util.HashMap;
- import java.util.List;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- /***
- * 基于Map侧的复制链接
- *Hadoop技术交流群: 37693216
- * @author qindongliang
- ***/
- public class MapJoin {
- /***
- * 在Map侧setup方法里,取出缓存文件
- * 放入HashMap中进行join
- *
- *
- * **/
- public static class MMppe extends Mapper<Object, Text, Text, Text>{
- /**
- * 此map是存放小表数据用的
- * 注意小表的key是不重复的
- * 类似与数据库的外键表
- * 在这里的小表,就相当于一个外键表
- *
- *
- * **/
- private HashMap<String,String> map=new HashMap<String, String>();
- /**
- * 输出的Key
- * */
- private Text outputKey=new Text();
- /**
- * 输出的Value
- *
- * */
- private Text outputValue=new Text();
- //存放map的一行数据
- String mapInputStr=null;
- //存放主表的整个列值
- String mapInputStrs[] =null;
- //存放外键表(小表)的,除了链接键之外的整个其他列的拼接字符串
- String mapSecondPart=null;
- /**
- * Map的初始化方法
- *
- * 主要任务是将小表存入到一个Hash中
- * 格式,k=外键 === v=其他列拼接的字符串
- *
- *
- * **/
- @Override
- protected void setup(Context context)throws IOException, InterruptedException {
- //读取文件流
- BufferedReader br=null;
- String temp;
- // 获取DistributedCached里面 的共享文件
- Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration());
- for(Path p:path){
- if(p.getName().endsWith("a.txt")){
- br=new BufferedReader(new FileReader(p.toString()));
- //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8"));
- while((temp=br.readLine())!=null){
- String ss[]=temp.split(",");
- map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中
- }
- }
- }
- //System.out.println("map完:"+map);
- }
- /**
- *
- * 在map里,直接读取数据,从另一个表的map里
- * 获取key进行join就可以了
- *
- *
- * ***/
- @Override
- protected void map(Object key, Text value,Context context)throws IOException, InterruptedException {
- //空值跳过
- if(value==null||value.toString().equals("")){
- return;
- }
- this.mapInputStr=value.toString();//读取输入的值
- this.mapInputStrs=this.mapInputStr.split(",");//拆分成数组
- this.mapSecondPart=map.get(mapInputStrs[0]);//获取外键表的部分
- //如果存在此key
- if(this.mapSecondPart!=null){
- this.outputKey.set(mapInputStrs[0]);//输出的key
- //输出的value是拼接的两个表的数据
- this.outputValue.set(this.mapSecondPart+"\t"+mapInputStrs[1]+"\t"+mapInputStrs[2]+"\t"+mapInputStrs[3]);
- //写入磁盘
- context.write(this.outputKey, this.outputValue);
- }
- }
- //驱动类
- public static void main(String[] args)throws Exception {
- JobConf conf=new JobConf(MMppe.class);
- //小表共享
- String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt";
- //添加到共享cache里
- DistributedCache.addCacheFile(new URI(bpath), conf);
- conf.set("mapred.job.tracker","192.168.75.130:9001");
- conf.setJar("tt.jar");
- Job job=new Job(conf, "2222222");
- job.setJarByClass(MapJoin.class);
- System.out.println("模式: "+conf.get("mapred.job.tracker"));;
- //设置Map和Reduce自定义类
- job.setMapperClass(MMppe.class);
- job.setNumReduceTasks(0);
- //设置Map端输出
- // job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- //设置Reduce端的输出
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setInputFormatClass(TextInputFormat.class);
- job.setOutputFormatClass(TextOutputFormat.class);
- FileSystem fs=FileSystem.get(conf);
- Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew3/");
- if(fs.exists(op)){
- fs.delete(op, true);
- System.out.println("存在此输出路径,已删除!!!");
- }
- FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb/b.txt"));
- FileOutputFormat.setOutputPath(job, op);
- System.exit(job.waitForCompletion(true)?0:1);
- }
- }
- }
package com.mapjoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** * 基于Map侧的复制链接 *Hadoop技术交流群: 37693216 * @author qindongliang ***/ public class MapJoin { /*** * 在Map侧setup方法里,取出缓存文件 * 放入HashMap中进行join * * * **/ public static class MMppe extends Mapper<Object, Text, Text, Text>{ /** * 此map是存放小表数据用的 * 注意小表的key是不重复的 * 类似与数据库的外键表 * 在这里的小表,就相当于一个外键表 * * * **/ private HashMap<String,String> map=new HashMap<String, String>(); /** * 输出的Key * */ private Text outputKey=new Text(); /** * 输出的Value * * */ private Text outputValue=new Text(); //存放map的一行数据 String mapInputStr=null; //存放主表的整个列值 String mapInputStrs[] =null; //存放外键表(小表)的,除了链接键之外的整个其他列的拼接字符串 String mapSecondPart=null; /** * Map的初始化方法 * * 主要任务是将小表存入到一个Hash中 * 格式,k=外键 === v=其他列拼接的字符串 * * * **/ @Override protected void setup(Context context)throws IOException, InterruptedException { //读取文件流 BufferedReader br=null; String temp; // 获取DistributedCached里面 的共享文件 Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration()); for(Path p:path){ if(p.getName().endsWith("a.txt")){ br=new BufferedReader(new FileReader(p.toString())); //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8")); while((temp=br.readLine())!=null){ String ss[]=temp.split(","); map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中 } } } //System.out.println("map完:"+map); } /** * * 在map里,直接读取数据,从另一个表的map里 * 获取key进行join就可以了 * * * ***/ @Override protected void map(Object key, Text value,Context context)throws IOException, InterruptedException { //空值跳过 if(value==null||value.toString().equals("")){ return; } this.mapInputStr=value.toString();//读取输入的值 this.mapInputStrs=this.mapInputStr.split(",");//拆分成数组 this.mapSecondPart=map.get(mapInputStrs[0]);//获取外键表的部分 //如果存在此key if(this.mapSecondPart!=null){ this.outputKey.set(mapInputStrs[0]);//输出的key //输出的value是拼接的两个表的数据 this.outputValue.set(this.mapSecondPart+"\t"+mapInputStrs[1]+"\t"+mapInputStrs[2]+"\t"+mapInputStrs[3]); //写入磁盘 context.write(this.outputKey, this.outputValue); } } //驱动类 public static void main(String[] args)throws Exception { JobConf conf=new JobConf(MMppe.class); //小表共享 String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt"; //添加到共享cache里 DistributedCache.addCacheFile(new URI(bpath), conf); conf.set("mapred.job.tracker","192.168.75.130:9001"); conf.setJar("tt.jar"); Job job=new Job(conf, "2222222"); job.setJarByClass(MapJoin.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; //设置Map和Reduce自定义类 job.setMapperClass(MMppe.class); job.setNumReduceTasks(0); //设置Map端输出 // job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reduce端的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs=FileSystem.get(conf); Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew3/"); if(fs.exists(op)){ fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb/b.txt")); FileOutputFormat.setOutputPath(job, op); System.exit(job.waitForCompletion(true)?0:1); } } }
运行日志:
- 模式: 192.168.75.130:9001
- 存在此输出路径,已删除!!!
- WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
- INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1
- INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library
- WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded
- INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404250130_0011
- INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0%
- INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404250130_0011
- INFO - Counters.log(585) | Counters: 19
- INFO - Counters.log(587) | Job Counters
- INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9878
- INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0
- INFO - Counters.log(589) | Launched map tasks=1
- INFO - Counters.log(589) | Data-local map tasks=1
- INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=0
- INFO - Counters.log(587) | File Output Format Counters
- INFO - Counters.log(589) | Bytes Written=172
- INFO - Counters.log(587) | FileSystemCounters
- INFO - Counters.log(589) | HDFS_BYTES_READ=188
- INFO - Counters.log(589) | FILE_BYTES_WRITTEN=55746
- INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172
- INFO - Counters.log(587) | File Input Format Counters
- INFO - Counters.log(589) | Bytes Read=74
- INFO - Counters.log(587) | Map-Reduce Framework
- INFO - Counters.log(589) | Map input records=4
- INFO - Counters.log(589) | Physical memory (bytes) snapshot=78663680
- INFO - Counters.log(589) | Spilled Records=0
- INFO - Counters.log(589) | CPU time spent (ms)=230
- INFO - Counters.log(589) | Total committed heap usage (bytes)=15728640
- INFO - Counters.log(589) | Virtual memory (bytes) snapshot=725975040
- INFO - Counters.log(589) | Map output records=4
- INFO - Counters.log(589) | SPLIT_RAW_BYTES=114
模式: 192.168.75.130:9001 存在此输出路径,已删除!!! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404250130_0011 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404250130_0011 INFO - Counters.log(585) | Counters: 19 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9878 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=0 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | HDFS_BYTES_READ=188 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=55746 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=74 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map input records=4 INFO - Counters.log(589) | Physical memory (bytes) snapshot=78663680 INFO - Counters.log(589) | Spilled Records=0 INFO - Counters.log(589) | CPU time spent (ms)=230 INFO - Counters.log(589) | Total committed heap usage (bytes)=15728640 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=725975040 INFO - Counters.log(589) | Map output records=4 INFO - Counters.log(589) | SPLIT_RAW_BYTES=114
结果如下:
- 3 忙忙碌碌 15986854789 A 99 2013-03-05
- 1 三劫散仙 13575468248 B 89 2013-02-05
- 2 凤舞九天 18965235874 C 69 2013-03-09
- 3 忙忙碌碌 15986854789 D 56 2013-06-07
3 忙忙碌碌 15986854789 A 99 2013-03-05 1 三劫散仙 13575468248 B 89 2013-02-05 2 凤舞九天 18965235874 C 69 2013-03-09 3 忙忙碌碌 15986854789 D 56 2013-06-07
可以看出,结果是正确的,这种方式,非常高效,但通常,只适应于两个表里面,一个表非常大,而另外一张表,则非常小,究竟什么样的算小,基本上当你的内存能够,很轻松的装下,并不会对主程序造成很大影响的时候,我们就可以在Map端通过利用DistributeCached复制链接技术进行Join了。
相关推荐
总之,JoinMap 4.0是一款强大且易用的工具,它将复杂的遗传数据分析过程简化,帮助科研人员高效地进行遗传连锁图的构建和QTL定位。无论是学术研究还是产业应用,JoinMap 4.0都是遗传学领域不可或缺的伙伴。
MapJoin 的实现原理是,在 Map 阶段将来自不同数据源的数据进行标记,并将连接字段作为 Key,其他字段作为 Value,然后输出到 Reduce 阶段。在 Reduce 阶段,将标记的数据进行分组,最后将分组后的数据进行合并。 ...
JoinMap 4.0 是一款在遗传学领域广泛应用的软件,专为构建和分析遗传连锁图谱而设计。这款软件适用于农林牧等多个领域的研究,通过高效的数据处理和统计分析,帮助科研人员揭示基因定位、遗传变异规律以及物种进化等...
Joinmap 使用图文教程 Joinmap 是一个功能强大的遗传图绘制软件,广泛应用于植物和动物遗传学研究中。本教程将逐步指导用户使用 Joinmap 软件进行遗传图绘制。 新建项目和设置数据格式 在 Joinmap 软件中,点击 ...
Join Map 4能够帮助研究人员在基因组上定位这些QTL,从而揭示性状遗传的基础。该软件通过统计分析,识别出与性状变化显著关联的遗传标记,这些标记可以指示可能的QTL区域。 在使用Join Map 4时,首先需要导入群体的...
Map Side Join 减轻了 Reduce 阶段的压力,因为大部分 Join 工作在 Map 阶段已完成,适合于数据量不均衡且小数据集能完全装入内存的情况。 四、实现细节 在 Reduce Side Join 的实现中,Mapper 根据输入文件名处理...
本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码示例来阐述它们的工作原理。 1. Map JOIN: Map JOIN在Hadoop中主要应用于小表与大表的连接。小表的数据可以完全加载到内存中,而大...
连锁图谱构建,韩国忠南大学的破解工具,很好用
Joinmap,这里有详细的使用说明,不过是全英文的讲解。
用户可以通过提供的joinmap4.0软件进行安装和使用,按照软件提供的指南进行数据导入、参数设置和分析流程,以实现高效且准确的QTL定位分析。对于初学者,理解软件的每个模块和参数设定的重要性是至关重要的,而深入...
因此,在Map阶段完成之后,只需要对大表进行处理即可,减少了数据的Shuffle传输,从而加快join操作的速度。对于大数据量的处理,Map Join通常比Common Join要快得多。 3. AutoMap Join(自动Map Join) Hive还提供...
C语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言头文件 MAPC语言...
- **Map-side Join**:如果所有表都能在内存中完全装载,可以在mapper端完成关联,避免网络传输,提高效率。 - **Reduce-side Join**:更通用的方法,所有表的数据在mapper阶段分别处理,然后在reducer阶段进行...
Mapside Join的核心思想是在Map阶段就完成JOIN操作,而不是等到Reduce阶段。它主要应用于一种情况:当一个数据集(称为"小表")可以完全加载到内存中,而另一个数据集(称为"大表")非常大,无法全部装入内存。小表...
在GameMap下,地图分割技术可能包含以下几个步骤: 1. 地形分析:根据游戏世界的地形特征,如山脉、河流、城市等,将地图划分为逻辑上的区域。 2. 切割算法:使用特定算法将大地图切割成合适的大小和形状,确保每个...
XML因其结构化和易于解析的特性,在数据交换和配置文件中广泛使用,而Map则作为Java中存储键值对的高效数据结构。在实际开发中,我们可能需要在XML和Map之间进行转换,以便于数据处理。本文将详细讲解如何使用Java...
例如,如果发现电机在某一工作区间效率较低,可能就需要调整工作模式或选择更高效的电机型号。此外,MAP还能辅助分析电机在不同负载条件下的热效应,这对于电机的冷却设计和长期稳定运行也至关重要。 "map.zip"这个...
自定义`Map`的一个挑战是实现高效的查找、插入和删除操作,这通常需要熟练掌握二叉搜索树的特性,尤其是红黑树的插入和旋转规则。此外,为了保证`Map`的线程安全,可能还需要考虑多线程环境下的同步机制,如互斥锁...