`
乡里伢崽
  • 浏览: 111878 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Hadoop中MapReduce多种join实现实例分析

 
阅读更多
原文:http://database.51cto.com/art/201410/454277.htm

一、概述

对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。

二、实现原理

1、在Reudce端进行连接。


在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:

(1)自定义一个value返回类型:
package com.mr.reduceSizeJoin;   
import java.io.DataInput;   
import java.io.DataOutput;   
import java.io.IOException;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.io.WritableComparable;   
public class CombineValues implements WritableComparable<CombineValues>{   
    //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);   
    private Text joinKey;//链接关键字   
    private Text flag;//文件来源标志   
    private Text secondPart;//除了链接键外的其他部分   
    public void setJoinKey(Text joinKey) {   
        this.joinKey = joinKey;   
    }   
    public void setFlag(Text flag) {   
        this.flag = flag;   
    }   
    public void setSecondPart(Text secondPart) {   
        this.secondPart = secondPart;   
    }   
    public Text getFlag() {   
        return flag;   
    }   
    public Text getSecondPart() {   
        return secondPart;   
    }   
    public Text getJoinKey() {   
        return joinKey;   
    }   
    public CombineValues() {   
        this.joinKey =  new Text();   
        this.flag = new Text();   
        this.secondPart = new Text();   
    }
 
    @Override 
    public void write(DataOutput out) throws IOException {   
        this.joinKey.write(out);   
        this.flag.write(out);   
        this.secondPart.write(out);   
    }   
    @Override 
    public void readFields(DataInput in) throws IOException {   
        this.joinKey.readFields(in);   
        this.flag.readFields(in);   
        this.secondPart.readFields(in);   
    }   
    @Override 
    public int compareTo(CombineValues o) {   
        return this.joinKey.compareTo(o.getJoinKey());   
    }   
    @Override 
    public String toString() {   
        // TODO Auto-generated method stub   
        return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   
    }   
} 


(2)map、reduce主体代码

package com.mr.reduceSizeJoin;   
import java.io.IOException;   
import java.util.ArrayList;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 * 用途说明:   
 * reudce side join中的left outer join   
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
 * tb_dim_city.dat文件内容,分隔符为"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其他        9999     9999         0   
 * 1       长春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       辽源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------风骚的分割线-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件内容,分隔符为"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 *   
 * -------------------------风骚的分割线-------------------------------   
 *  结果:   
 *  1   长春  1   901 1   1   2G  123   
 *  1   长春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //获得文件输入路径   
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //数据来自tb_dim_city.dat文件,标志即为"0"   
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录   
                if(valueItems.length != 5){   
                    return;   
                }   
                flag.set("0");   
                joinKey.set(valueItems[0]);   
                secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                combineValues.setFlag(flag);   
                combineValues.setJoinKey(joinKey);   
                combineValues.setSecondPart(secondPart);   
                context.write(combineValues.getJoinKey(), combineValues);
 
                }//数据来自于tb_user_profiles.dat,标志即为"1"   
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录   
                if(valueItems.length != 4){   
                    return;   
                }   
                flag.set("1");   
                joinKey.set(valueItems[3]);   
                secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                combineValues.setFlag(flag);   
                combineValues.setJoinKey(joinKey);   
                combineValues.setSecondPart(secondPart);   
                context.write(combineValues.getJoinKey(), combineValues);   
            }   
        }   
    }   
    public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存储一个分组中的左表信息   
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存储一个分组中的右表信息   
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /**   
         * 一个分组调用一次reduce函数   
         */ 
        @Override 
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /**   
             * 将分组中的元素按照文件分别进行存放   
             * 这种方法要注意的问题:   
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最   
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。   
             */ 
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city   
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles   
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
          Configuration conf=getConf(); //获得配置文件对象   
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
            job.setMapperClass(LeftOutJoinMapper.class);   
            job.setReducerClass(LeftOutJoinReducer.class);
            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式
 
            //设置map的输出key和value类型   
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);
 
            //设置reduce的输出key和value类型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block   
            logger.error(e.getMessage());   
        }   
    }   
} 


   其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2、在Map端进行连接。

使用场景:一张表十分小、一张表很大。

用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

直接上代码,比较简单:

package com.mr.mapSideJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.HashMap;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 *   
 * 用途说明:   
 * Map side join中的left outer join   
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),   
 * 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其他        9999     9999         0   
 * 1       长春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       辽源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------风骚的分割线-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件内容,分隔符为"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 * -------------------------风骚的分割线-------------------------------   
 *  结果:   
 *  1   长春  1   901 1   1   2G  123   
 *  1   长春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class MapSideJoinMain extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {
 
        private HashMap<String,String> city_info = new HashMap<String, String>();   
        private Text outPutKey = new Text();   
        private Text outPutValue = new Text();   
        private String mapInputStr = null;   
        private String mapInputSpit[] = null;   
        private String city_secondPart = null;   
        /**   
         * 此方法在每个task开始之前执行,这里主要用作从DistributedCache   
         * 中取到tb_dim_city文件,并将里边记录取出放到内存中。   
         */ 
        @Override 
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件   
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String cityInfo = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("tb_dim_city.dat")){   
                    //读缓存文件,并放到mem中   
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(cityInfo=br.readLine())){   
                        String[] cityPart = cityInfo.split("\\|",5);   
                        if(cityPart.length ==5){   
                            city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   
                        }   
                    }   
                }   
            }   
        }
 
        /**   
         * Map端的实现相当简单,直接判断tb_user_profiles.dat中的   
         * cityID是否存在我的map中就ok了,这样就可以实现Map Join了   
         */ 
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //排掉空行   
            if(value == null || value.toString().equals("")){   
                return;   
            }   
            mapInputStr = value.toString();   
            mapInputSpit = mapInputStr.split("\\|",4);   
            //过滤非法记录   
            if(mapInputSpit.length != 4){   
                return;   
            }   
            //判断链接字段是否在map中存在   
            city_secondPart = city_info.get(mapInputSpit[3]);   
            if(city_secondPart != null){   
                this.outPutKey.set(mapInputSpit[3]);   
                this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   
                context.write(outPutKey, outPutValue);   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象   
            DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件   
            Job job=new Job(conf,"MapJoinMR");   
            job.setNumReduceTasks(0);
 
            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
            FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径
 
            job.setJarByClass(MapSideJoinMain.class);   
            job.setMapperClass(LeftOutJoinMapper.class);
 
            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
 
            //设置map的输出key和value类型   
            job.setMapOutputKeyClass(Text.class);
 
            //设置reduce的输出key和value类型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            // TODO Auto-generated catch block   
            logger.error(e.getMessage());   
        }   
    }   
} 


这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。


3、SemiJoin。

SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:

package com.mr.SemiJoin;   
import java.io.BufferedReader;   
import java.io.FileReader;   
import java.io.IOException;   
import java.util.ArrayList;   
import java.util.HashSet;   
import org.apache.hadoop.conf.Configuration;   
import org.apache.hadoop.conf.Configured;   
import org.apache.hadoop.filecache.DistributedCache;   
import org.apache.hadoop.fs.Path;   
import org.apache.hadoop.io.Text;   
import org.apache.hadoop.mapreduce.Job;   
import org.apache.hadoop.mapreduce.Mapper;   
import org.apache.hadoop.mapreduce.Reducer;   
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
import org.apache.hadoop.util.Tool;   
import org.apache.hadoop.util.ToolRunner;   
import org.slf4j.Logger;   
import org.slf4j.LoggerFactory;   
/**   
 * @author zengzhaozheng   
 *   
 * 用途说明:   
 * reudce side join中的left outer join   
 * 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段   
 * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)   
 * tb_dim_city.dat文件内容,分隔符为"|":   
 * id     name  orderid  city_code  is_show   
 * 0       其他        9999     9999         0   
 * 1       长春        1        901          1   
 * 2       吉林        2        902          1   
 * 3       四平        3        903          1   
 * 4       松原        4        904          1   
 * 5       通化        5        905          1   
 * 6       辽源        6        906          1   
 * 7       白城        7        907          1   
 * 8       白山        8        908          1   
 * 9       延吉        9        909          1   
 * -------------------------风骚的分割线-------------------------------   
 * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)   
 * tb_user_profiles.dat文件内容,分隔符为"|":   
 * userID   network     flow    cityID   
 * 1           2G       123      1   
 * 2           3G       333      2   
 * 3           3G       555      1   
 * 4           2G       777      3   
 * 5           3G       666      4   
 * -------------------------风骚的分割线-------------------------------   
 * joinKey.dat内容:   
 * city_code   
 * 1   
 * 2   
 * 3   
 * 4   
 * -------------------------风骚的分割线-------------------------------   
 *  结果:   
 *  1   长春  1   901 1   1   2G  123   
 *  1   长春  1   901 1   3   3G  555   
 *  2   吉林  2   902 1   2   3G  333   
 *  3   四平  3   903 1   4   2G  777   
 *  4   松原  4   904 1   5   3G  666   
 */ 
public class SemiJoin extends Configured implements Tool{   
    private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
    public static class SemiJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   
        private CombineValues combineValues = new CombineValues();   
        private HashSet<String> joinKeySet = new HashSet<String>();   
        private Text flag = new Text();   
        private Text joinKey = new Text();   
        private Text secondPart = new Text();   
        /**   
         * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b   
         */ 
        @Override 
        protected void setup(Context context)   
                throws IOException, InterruptedException {   
            BufferedReader br = null;   
            //获得当前作业的DistributedCache相关文件   
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
            String joinKeyStr = null;   
            for(Path p : distributePaths){   
                if(p.toString().endsWith("joinKey.dat")){   
                    //读缓存文件,并放到mem中   
                    br = new BufferedReader(new FileReader(p.toString()));   
                    while(null!=(joinKeyStr=br.readLine())){   
                        joinKeySet.add(joinKeyStr);   
                    }   
                }   
            }   
        }   
        @Override 
        protected void map(Object key, Text value, Context context)   
                throws IOException, InterruptedException {   
            //获得文件输入路径   
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   
            //数据来自tb_dim_city.dat文件,标志即为"0"   
            if(pathName.endsWith("tb_dim_city.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录   
                if(valueItems.length != 5){   
                    return;   
                }   
                //过滤掉不需要参加join的记录   
                if(joinKeySet.contains(valueItems[0])){   
                    flag.set("0");   
                    joinKey.set(valueItems[0]);   
                    secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }//数据来自于tb_user_profiles.dat,标志即为"1"   
            else if(pathName.endsWith("tb_user_profiles.dat")){   
                String[] valueItems = value.toString().split("\\|");   
                //过滤格式错误的记录   
                if(valueItems.length != 4){   
                    return;   
                }   
                //过滤掉不需要参加join的记录   
                if(joinKeySet.contains(valueItems[3])){   
                    flag.set("1");   
                    joinKey.set(valueItems[3]);   
                    secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   
                    combineValues.setFlag(flag);   
                    combineValues.setJoinKey(joinKey);   
                    combineValues.setSecondPart(secondPart);   
                    context.write(combineValues.getJoinKey(), combineValues);   
                }else{   
                    return ;   
                }   
            }   
        }   
    }   
    public static class SemiJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   
        //存储一个分组中的左表信息   
        private ArrayList<Text> leftTable = new ArrayList<Text>();   
        //存储一个分组中的右表信息   
        private ArrayList<Text> rightTable = new ArrayList<Text>();   
        private Text secondPar = null;   
        private Text output = new Text();   
        /**   
         * 一个分组调用一次reduce函数   
         */ 
        @Override 
        protected void reduce(Text key, Iterable<CombineValues> value, Context context)   
                throws IOException, InterruptedException {   
            leftTable.clear();   
            rightTable.clear();   
            /**   
             * 将分组中的元素按照文件分别进行存放   
             * 这种方法要注意的问题:   
             * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   
             * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最   
             * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。   
             */ 
            for(CombineValues cv : value){   
                secondPar = new Text(cv.getSecondPart().toString());   
                //左表tb_dim_city   
                if("0".equals(cv.getFlag().toString().trim())){   
                    leftTable.add(secondPar);   
                }   
                //右表tb_user_profiles   
                else if("1".equals(cv.getFlag().toString().trim())){   
                    rightTable.add(secondPar);   
                }   
            }   
            logger.info("tb_dim_city:"+leftTable.toString());   
            logger.info("tb_user_profiles:"+rightTable.toString());   
            for(Text leftPart : leftTable){   
                for(Text rightPart : rightTable){   
                    output.set(leftPart+ "\t" + rightPart);   
                    context.write(key, output);   
                }   
            }   
        }   
    }   
    @Override 
    public int run(String[] args) throws Exception {   
            Configuration conf=getConf(); //获得配置文件对象   
            DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
            Job job=new Job(conf,"LeftOutJoinMR");   
            job.setJarByClass(SemiJoin.class);
 
            FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
            FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
 
            job.setMapperClass(SemiJoinMapper.class);   
            job.setReducerClass(SemiJoinReducer.class);
 
            job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
            job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
 
            //设置map的输出key和value类型   
            job.setMapOutputKeyClass(Text.class);   
            job.setMapOutputValueClass(CombineValues.class);
 
            //设置reduce的输出key和value类型   
            job.setOutputKeyClass(Text.class);   
            job.setOutputValueClass(Text.class);   
            job.waitForCompletion(true);   
            return job.isSuccessful()?0:1;   
    }   
    public static void main(String[] args) throws IOException,   
            ClassNotFoundException, InterruptedException {   
        try {   
            int returnCode =  ToolRunner.run(new SemiJoin(),args);   
            System.exit(returnCode);   
        } catch (Exception e) {   
            logger.error(e.getMessage());   
        }   
    }   
} 


这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。

三、总结

blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。
分享到:
评论

相关推荐

    Hadoop-MapReduce-Cookbook-Example-Code:Hadoop MapReduce Cookbook 示例代码

    在这个压缩包中,我们看到的是书中的实例代码,名为“Hadoop-MapReduce-Cookbook-Example-Code-master”。这个项目主要是用Java编写的,反映了MapReduce在大数据处理中的核心应用。 MapReduce是Google提出的一种...

    elasticsearch与hadoop比较

    Hadoop的MapReduce和Spark的计算框架能够支持任意复杂的数据聚合和转换,而Hive和SparkSQL的引入则进一步降低了大数据分析的开发难度。 尽管如此,Elasticsearch仍然是一个极为出色的分布式计算框架。它的版本不断...

    大数据技术 ODPS MapReduce对外开放实践 共20页.pptx

    ODPS MapReduce在实际应用中,广泛服务于大数据竞赛、ETL(提取、转换、加载)、特征提取等场景,例如在快的打车项目中,用于实时数据分析和政府、公众事业项目的决策支持。然而,随着大数据处理需求的不断增长,...

    20、MapReduce 工作流介绍

    MapReduce工作流是一种在Hadoop生态系统中处理大数据的机制,它允许多个MapReduce作业(MR作业)按照特定的依赖顺序依次执行,以完成更复杂的计算任务。这些作业之间的依赖关系通常形成一个有向无环图(DAG),其中...

    详解HDFS多文件Join操作的实例

    本文将详解 HDFS 多文件 Join 操作的实例,包括 All Join 和 Left Join 两种类型,及其在实际应用中的实现。 一、HDFS 多文件 Join 操作的概念 在 HDFS 中,Join 操作是指将多个文件或表格中的数据进行合并和匹配...

    Hadoop应用开发技术详解PDF电子书下载 带书签目录 部分

    综上所述,《Hadoop应用开发技术详解》这本书应该涵盖了上述知识点,并提供了丰富的实例和练习,帮助读者全面掌握Hadoop应用开发的核心技术和最佳实践。如果对这部分内容感兴趣,建议获取完整版本以获得更深入的学习...

    相关文件和完整源码.zip

    总的来说,这个压缩包提供了一个完整的MapReduce项目实例,包括数据、预期输出以及实现代码,对于学习MapReduce的join操作和理解Maven构建流程非常有帮助。开发者可以通过解压文件,阅读源代码,运行项目,并对比...

    Hadoop Real-World Solutions Cookbook - Second Edition

    《Hadoop Real-World Solutions Cookbook - 第二版》是一本全面介绍了Hadoop生态系统中关键技术的实战指南。书中不仅涵盖了Hadoop 2.x及其核心组件YARN的深入讲解,还详细探讨了Hive、Pig、Oozie、Flume、Sqoop、...

    MapReduceCase

    5. **Hadoop与MapReduce**:虽然MapReduce最初由Google提出,但实际应用中,最常见的是Apache Hadoop的实现。Hadoop提供了一个完整的生态系统,包括HDFS(分布式文件系统)和YARN(资源调度器),使得MapReduce可以...

    ASP.NET + SqlSever 大数据解决方案 PK HADOOP - 孙凯旋 - 博客园.pdf

    ### ASP.NET + SqlServer 大数据解决方案与Hadoop对比分析 #### 一、SqlServer的特点及其在大数据中的应用 **SqlServer的优点:** 1. **支持索引:** SqlServer 提供了强大的索引支持,包括聚集索引和非聚集索引,...

    pig编程指南源码

    源码中可能包含了如何在Hadoop集群上运行Pig脚本的示例,以及如何配置Pig与Hadoop的交互,如设置HDFS路径、处理错误和监控性能。 七、Pig与大数据生态系统 Pig不仅可以与其他Hadoop组件如HBase、Hive和Spark集成,...

    mapreduce-patterns-examples

    在Java中,MapReduce程序通常通过实现`Mapper`和`Reducer`接口来编写。`Mapper`类负责Map阶段的逻辑,而`Reducer`类处理Reduce阶段。此外,还需要配置Job类,设置输入输出路径、Mapper和Reducer类等参数。 例如,一...

    apache-hive-2.3.9-bin.tar大数据HIVE.zip

    4. **编译与执行计划**:Hive将HQL语句转换为MapReduce任务,或者在更现代的Hadoop版本中,转换为Tez或Spark任务。这使得Hive能充分利用Hadoop集群的并行计算能力。 5. **优化器**:Hive的优化器(如CBO,Cost-...

    mapreducepatterns:MapReduce设计模式存储库(O'Reilly 2012)示例源代码

    在Java中实现MapReduce,通常需要继承`org.apache.hadoop.mapreduce.Mapper`和`org.apache.hadoop.mapreduce.Reducer`类,重写相应的`map()`、`reduce()`方法以及可选的`setup()`、`cleanup()`等方法。`...

    hive开发资料中文版

    Hive 将大数据处理的任务分解为可并行执行的小任务,通过 MapReduce 在 Hadoop 集群中执行,实现高效的数据分析。 在开发和使用 Hive 时,我们还需要关注以下几点: - **数据加载(Load Data)**:可以通过 `LOAD ...

    hive-1.1.0-cdh5.14.2.tar.zip

    《Hive 1.1.0 在 CDH 5.14.2 中的应用与解析》 Hive,作为大数据处理领域的重要组件,是Apache ...在实际应用中,用户需要根据自身的业务需求和数据规模,合理利用Hive的各项功能,以实现最佳的数据管理和分析效果。

    hive简单使用共13页.pdf.zip

    Hive是Apache Hadoop生态系统中的一个数据仓库工具,它允许用户使用SQL-like的语言(HQL,Hive Query ...在实际工作中,结合Hadoop的分布式存储和处理能力,Hive能有效解决大规模数据处理的挑战,实现高效的数据分析。

    Hive使用手册1

    Hive是基于Hadoop的数据仓库工具,它可以将结构化的数据文件映射为一张数据库表,并提供SQL-like查询语言(HQL)进行数据查询、分析。Hive设计的目标是使数据分析人员能够利用熟悉的关系型数据库概念来处理大数据集...

    深入理解Spark核心思想与源码分析

    Spark的主要特点是其内存计算模型,通过将数据存储在内存中,实现了比Hadoop MapReduce更高的计算速度。 **2. Resilient Distributed Datasets (RDD)** RDD是Spark的基础数据抽象,它是一个不可变、分区的记录集合...

Global site tag (gtag.js) - Google Analytics