浏览 1436 次
锁定老帖子 主题:Hadoop的Map Sied Join
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2014-04-24
散仙,在有关Hadoop的上篇博客里,给出了基于Reduce侧的表连接,今天,散仙,就再来看下如何在Map侧高效完成的join,因为在reduce侧进行join在shuffle阶段会消耗大量的时间,如果在Map端进行Join,那么就会节省大量的资源,当然,这也是有具体的应用场景的。
使用场景:一张表十分小、一张表很大。 用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。 模拟的测试数据如下: 小表: HDFS路径:hdfs://192.168.75.130:9000/root/dist/a.txt 1,三劫散仙,13575468248 2,凤舞九天,18965235874 3,忙忙碌碌,15986854789 4,少林寺方丈,15698745862 大表:HDFS路径:hdfs://192.168.75.130:9000/root/inputjoindb/b.txt 3,A,99,2013-03-05 1,B,89,2013-02-05 2,C,69,2013-03-09 3,D,56,2013-06-07 使用Hadoop1.2的版本进行实现,源码如下: <pre name="code" class="java">package com.mapjoin; import java.io.BufferedReader; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.util.HashMap; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; /*** * 基于Map侧的复制链接 *Hadoop技术交流群: 37693216 * @author qindongliang ***/ public class MapJoin { /*** * 在Map侧setup方法里,取出缓存文件 * 放入HashMap中进行join * * * **/ public static class MMppe extends Mapper<Object, Text, Text, Text>{ /** * 此map是存放小表数据用的 * 注意小表的key是不重复的 * 类似与数据库的外键表 * 在这里的小表,就相当于一个外键表 * * * **/ private HashMap<String,String> map=new HashMap<String, String>(); /** * 输出的Key * */ private Text outputKey=new Text(); /** * 输出的Value * * */ private Text outputValue=new Text(); //存放map的一行数据 String mapInputStr=null; //存放主表的整个列值 String mapInputStrs[] =null; //存放外键表(小表)的,除了链接键之外的整个其他列的拼接字符串 String mapSecondPart=null; /** * Map的初始化方法 * * 主要任务是将小表存入到一个Hash中 * 格式,k=外键 === v=其他列拼接的字符串 * * * **/ @Override protected void setup(Context context)throws IOException, InterruptedException { //读取文件流 BufferedReader br=null; String temp; // 获取DistributedCached里面 的共享文件 Path path[]=DistributedCache.getLocalCacheFiles(context.getConfiguration()); for(Path p:path){ if(p.getName().endsWith("a.txt")){ br=new BufferedReader(new FileReader(p.toString())); //List<String> list=Files.readAllLines(Paths.get(p.getName()), Charset.forName("UTF-8")); while((temp=br.readLine())!=null){ String ss[]=temp.split(","); map.put(ss[0], ss[1]+"\t"+ss[2]);//放入hash表中 } } } //System.out.println("map完:"+map); } /** * * 在map里,直接读取数据,从另一个表的map里 * 获取key进行join就可以了 * * * ***/ @Override protected void map(Object key, Text value,Context context)throws IOException, InterruptedException { //空值跳过 if(value==null||value.toString().equals("")){ return; } this.mapInputStr=value.toString();//读取输入的值 this.mapInputStrs=this.mapInputStr.split(",");//拆分成数组 this.mapSecondPart=map.get(mapInputStrs[0]);//获取外键表的部分 //如果存在此key if(this.mapSecondPart!=null){ this.outputKey.set(mapInputStrs[0]);//输出的key //输出的value是拼接的两个表的数据 this.outputValue.set(this.mapSecondPart+"\t"+mapInputStrs[1]+"\t"+mapInputStrs[2]+"\t"+mapInputStrs[3]); //写入磁盘 context.write(this.outputKey, this.outputValue); } } //驱动类 public static void main(String[] args)throws Exception { JobConf conf=new JobConf(MMppe.class); //小表共享 String bpath="hdfs://192.168.75.130:9000/root/dist/a.txt"; //添加到共享cache里 DistributedCache.addCacheFile(new URI(bpath), conf); conf.set("mapred.job.tracker","192.168.75.130:9001"); conf.setJar("tt.jar"); Job job=new Job(conf, "2222222"); job.setJarByClass(MapJoin.class); System.out.println("模式: "+conf.get("mapred.job.tracker"));; //设置Map和Reduce自定义类 job.setMapperClass(MMppe.class); job.setNumReduceTasks(0); //设置Map端输出 // job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //设置Reduce端的输出 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileSystem fs=FileSystem.get(conf); Path op=new Path("hdfs://192.168.75.130:9000/root/outputjoindbnew3/"); if(fs.exists(op)){ fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.75.130:9000/root/inputjoindb/b.txt")); FileOutputFormat.setOutputPath(job, op); System.exit(job.waitForCompletion(true)?0:1); } } } </pre> 运行日志: <pre name="code" class="java">模式: 192.168.75.130:9001 存在此输出路径,已删除!!! WARN - JobClient.copyAndConfigureFiles(746) | Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. INFO - FileInputFormat.listStatus(237) | Total input paths to process : 1 INFO - NativeCodeLoader.<clinit>(43) | Loaded the native-hadoop library WARN - LoadSnappy.<clinit>(46) | Snappy native library not loaded INFO - JobClient.monitorAndPrintJob(1380) | Running job: job_201404250130_0011 INFO - JobClient.monitorAndPrintJob(1393) | map 0% reduce 0% INFO - JobClient.monitorAndPrintJob(1393) | map 100% reduce 0% INFO - JobClient.monitorAndPrintJob(1448) | Job complete: job_201404250130_0011 INFO - Counters.log(585) | Counters: 19 INFO - Counters.log(587) | Job Counters INFO - Counters.log(589) | SLOTS_MILLIS_MAPS=9878 INFO - Counters.log(589) | Total time spent by all reduces waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Total time spent by all maps waiting after reserving slots (ms)=0 INFO - Counters.log(589) | Launched map tasks=1 INFO - Counters.log(589) | Data-local map tasks=1 INFO - Counters.log(589) | SLOTS_MILLIS_REDUCES=0 INFO - Counters.log(587) | File Output Format Counters INFO - Counters.log(589) | Bytes Written=172 INFO - Counters.log(587) | FileSystemCounters INFO - Counters.log(589) | HDFS_BYTES_READ=188 INFO - Counters.log(589) | FILE_BYTES_WRITTEN=55746 INFO - Counters.log(589) | HDFS_BYTES_WRITTEN=172 INFO - Counters.log(587) | File Input Format Counters INFO - Counters.log(589) | Bytes Read=74 INFO - Counters.log(587) | Map-Reduce Framework INFO - Counters.log(589) | Map input records=4 INFO - Counters.log(589) | Physical memory (bytes) snapshot=78663680 INFO - Counters.log(589) | Spilled Records=0 INFO - Counters.log(589) | CPU time spent (ms)=230 INFO - Counters.log(589) | Total committed heap usage (bytes)=15728640 INFO - Counters.log(589) | Virtual memory (bytes) snapshot=725975040 INFO - Counters.log(589) | Map output records=4 INFO - Counters.log(589) | SPLIT_RAW_BYTES=114 </pre> 结果如下: <pre name="code" class="java">3 忙忙碌碌 15986854789 A 99 2013-03-05 1 三劫散仙 13575468248 B 89 2013-02-05 2 凤舞九天 18965235874 C 69 2013-03-09 3 忙忙碌碌 15986854789 D 56 2013-06-07 </pre> 可以看出,结果是正确的,这种方式,非常高效,但通常,只适应于两个表里面,一个表非常大,而另外一张表,则非常小,究竟什么样的算小,基本上当你的内存能够,很轻松的装下,并不会对主程序造成很大影响的时候,我们就可以在Map端通过利用DistributeCached复制链接技术进行Join了。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |