`
郑云飞
  • 浏览: 817385 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MapReduce 中的两表 join 几种方案简介

 
阅读更多

1. 概述

在传统数据库(如:MYSQL)中,JOIN操作是非常常见且非常耗时的。而在HADOOP中进行JOIN操作,同样常见且耗时,由于Hadoop的独特设计思想,当进行JOIN操作时,有一些特殊的技巧。

本文首先介绍了Hadoop上通常的JOIN实现方法,然后给出了几种针对不同输入数据集的优化方法。

2. 常见的join方法介绍

假设要进行join的数据分别来自File1和File2.

2.1 reduce side join

reduce side join是一种最简单的join方式,其主要思想如下:

在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数据打一个标签(tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。

在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list, 然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接操作。

REF:hadoop join之reduce side join

http://blog.csdn.net/huashetianzu/article/details/7819244

2.2 map side join

之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

(1)用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。(2)用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

REF:hadoop join之map side join

http://blog.csdn.net/huashetianzu/article/details/7821674

2.3 Semi Join

Semi Join也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join操作的数据,则可以大大节省网络IO。

实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上,然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。

更多关于半连接的介绍,可参考:半连接介绍:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

REF:hadoop join之semi join

http://blog.csdn.net/huashetianzu/article/details/7823326

2.4 reduce side join + BloomFilter

在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler以节省空间。

BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素一定可能在集合中。

因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

更多关于BloomFilter的介绍,可参考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默认情况下是按照key进行排序,如果要按照value进行排序怎么办?即:对于同一个key,reduce函数接收到的value list是按照value排序的。这种应用需求在join操作中很常见,比如,希望相同的key中,小表对应的value排在前面。

有两种方法进行二次排序,分别为:buffer and in memory sort和 value-to-key conversion。

对于buffer and in memory sort,主要思想是:在reduce()函数中,将某个key对应的所有value保存下来,然后进行排序。 这种方法最大的缺点是:可能会造成out of memory。

对于value-to-key conversion,主要思想是:将key和部分value拼接成一个组合key(实现WritableComparable接口或者调用setSortComparatorClass函数),这样reduce获取的结果便是先按key排序,后按value排序的结果,需要注意的是,用户需要自己实现Paritioner,以便只按照key进行数据划分。Hadoop显式的支持二次排序,在Configuration类中有个setGroupingComparatorClass()方法,可用于设置排序group的key值,具体参考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

4. 后记

最近一直在找工作,由于简历上写了熟悉Hadoop,所以几乎每个面试官都会问一些Hadoop相关的东西,而 Hadoop上Join的实现就成了一道必问的问题,而极个别公司还会涉及到DistributedCache原理以及怎样利用DistributedCache进行Join操作。为了更好地应对这些面试官,特整理此文章。

 

5. 参考资料

(1) 书籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park

(2) 书籍《Hadoop In Action》page 107~131

(3) mapreduce的二次排序 SecondarySort:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

(4) 半连接介绍:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

(5) BloomFilter介绍:http://blog.csdn.net/jiaomeng/article/details/1495500

(6)本文来自:http://dongxicheng.org/mapreduce/hadoop-join-two-tables/

————————————————————————————————————————————————

看完了上面的 hadoop 中 MR 常规 join 思路,下面我们来看一种比较极端的例子,大表 join 小表,而小表的大小在 5M 以下的情况:

之所以我这里说小表要限制 5M 以下,是因为我这里用到的思路是 :

file-》jar-》main String configuration -》configuration map HashMap

步骤:

1、从jar里面读取的文件内容以String的形式存在main方法的 configuration context 全局环境变量里

2、在map函数里读取 context 环境变量的字符串,然后split字符串组建小表成为一个HashMap

     这样一个大表关联小表的例子就ok了,由于context是放在namenode上的,而namenode对内存是有限制的,

所以你的小表文件不要太大,这样我们可以比较的方便的利用 context 做join了。

这种方式其实就是 2.2 map side join 的一种具体实现而已。

Talk is cheap, show you the code~

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
public class Test {
 
    public static class MapperClass extends
            Mapper<LongWritable, Text, Text, Text> {
 
        Configuration config = null;
        HashSet<String> idSet = new HashSet<String>();
        HashMap<String, String> cityIdNameMap = new HashMap<String, String>();
        Map<String, String> houseTypeMap = new HashMap<String, String>();
 
        public void setup(Context context) {
            config = context.getConfiguration();
            if (config == null)
                return;
            String idStr = config.get("idStr");
            String[] idArr = idStr.split(",");
            for (String id : idArr) {
                idSet.add(id);
            }
 
            String cityIdNameStr = config.get("cityIdNameStr");
            String[] cityIdNameArr = cityIdNameStr.split(",");
            for (String cityIdName : cityIdNameArr) {
                cityIdNameMap.put(cityIdName.split("\t")[0],
                        cityIdName.split("\t")[1]);
            }
 
            houseTypeMap.put("8", "Test");
 
        }
 
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
 
            String[] info = value.toString().split("\\|");
            String insertDate = info[InfoField.InsertDate].split(" ")[0]
                    .split("-")[0]; // date: 2012-10-01
            insertDate = insertDate
                    + info[InfoField.InsertDate].split(" ")[0].split("-")[1]; // date:201210
 
            String userID = info[InfoField.UserID]; // userid
            if (!idSet.contains(userID)) {
                return;
            }
 
            String disLocalID = "";
            String[] disLocalIDArr = info[InfoField.DisLocalID].split(",");
            if (disLocalIDArr.length >= 2) {
                disLocalID = disLocalIDArr[1];
            } else {
                try {
                    disLocalID = disLocalIDArr[0];
                } catch (Exception e) {
                    e.printStackTrace();
                    return;
                }
            }
            String localValue = cityIdNameMap.get(disLocalID);
            disLocalID = localValue == null ? disLocalID : localValue; // city
 
            String[] cateIdArr = info[InfoField.CateID].split(",");
            String cateId = "";
            String secondType = "";
            if (cateIdArr.length >= 3) {
                cateId = cateIdArr[2];
                if (houseTypeMap.get(cateId) != null) {
                    secondType = houseTypeMap.get(cateId); // secondType
                } else {
                    return;
                }
            } else {
                return;
            }
 
            String upType = info[InfoField.UpType];
            String outKey = insertDate + "_" + userID + "_" + disLocalID + "_"
                    + secondType;
            String outValue = upType.equals("0") ? "1_1" : "1_0";
            context.write(new Text(outKey), new Text(outValue));
        }
    }
 
    public static class ReducerClass extends
            Reducer<Text, Text, NullWritable, Text> {
 
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int pv = 0;
            int uv = 0;
 
            for (Text val : values) {
                String[] tmpArr = val.toString().split("_");
                pv += Integer.parseInt(tmpArr[0]);
                uv += Integer.parseInt(tmpArr[1]);
            }
 
            String outValue = key + "_" + pv + "_" + uv;
            context.write(NullWritable.get(), new Text(outValue));
 
        }
    }
 
    public String getResource(String fileFullName) throws IOException {
        // 返回读取指定资源的输入流
        InputStream is = this.getClass().getResourceAsStream(fileFullName);
        BufferedReader br = new BufferedReader(new InputStreamReader(is));
        String s = "";
        String res = "";
        while ((s = br.readLine()) != null)
            res = res.equals("") ? s : res + "," + s;
        return res;
    }
 
    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.exit(2);
        }
 
        String idStr = new Test().getResource("userIDList.txt");
        String cityIdNameStr = new Test().getResource("cityIdName.txt");
        conf.set("idStr", idStr);
        conf.set("cityIdNameStr", cityIdNameStr);
        Job job = new Job(conf, "test01");
        // job.setInputFormatClass(TextInputFormat.class);
        job.setJarByClass(Test.class);
        job.setMapperClass(Test.MapperClass.class);
        job.setReducerClass(Test.ReducerClass.class);
        job.setNumReduceTasks(25);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
 
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(
                job, new Path(otherArgs[1]));
 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

说明:

1、getResource() 方法指定了可以从jar包中读取配置文件,并拼接成一个String返回。

2、setup() 方法起到一个mapreduce前的初始化的工作,他的作用是从 context 中

获取main中存入的配置文件字符串,并用来构建一个hashmap,放在map外面,

每个node上MR前只被执行一次。

3、注意上面代码的第 125、126 行,conf.set(key, value) 中的 value 大小是由限制的,

在 0.20.x 版本中是 5M 的大小限制,如果大于此大小建议采用分布式缓存读文件的策略。

参考:解决 hadoop jobconf 限制为5M的问题

http://my.oschina.net/132722/blog/174601

 

推荐阅读:

使用HBase的MAP侧联接

 http://blog.sina.com.cn/s/blog_ae33b83901016lkq.html 

 

PS:关于如何从jar包中读取配置文件,请参考:

(1)深入jar包:从jar包中读取资源文件      

     http://www.iteye.com/topic/483115

(2)读取jar内资源文件     

     http://heipark.iteye.com/blog/1439114

(3)Java相对路径读取资源文件    

         http://lavasoft.blog.51cto.com/62575/265821/

(4)Java加载资源文件时的路径问题   

         http://www.cnblogs.com/lmtoo/archive/2012/10/18/2729272.html

         如何优雅读取properties文件

         http://blogread.cn/it/article/3262?f=wb

注意:

不能先 getResource()  获取路径然后读取内容,

因为".../ResourceJar.jar!/resource/...."并不是文件资源定位符的格式。

所以,如果jar包中的类源代码用File f=new File(相对路径);的形式,是不可能定位到文件资源的。

这也是为什么源代码打包成jar文件后,调用jar包时会报出FileNotFoundException的症结所在了。

但可以通过Class类的getResourceAsStream()方法来直接获取文件内容 

这种方法是如何读取jar中的资源文件的,这一点对于我们来说是透明的。

而且 getResource() 和 getResourceAsStream() 在 maven 项目下对于相对、绝对路径的寻找规则貌似还不一样:

System.out.println(QQWryFile.class.getResource("/qqwry.dat").getFile()); 

System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("qqwry.dat"));

System.out.println(QQWryFile.class.getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getResourceAsStream("qqwry.dat"));

TIPS:Class和ClassLoader的getResourceAsStream()方法的区别:

这两个方法还是略有区别的, 以前一直不加以区分,直到今天发现要写这样的代码的时候运行 
错误, 才把这个问题澄清了一下。 

基本上,两个都可以用于从 classpath 里面进行资源读取,  classpath包含classpath中的路径 
和classpath中的jar。 

两个方法的区别是资源的定义不同, 一个主要用于相对与一个object取资源,而另一个用于取相对于classpath的 
资源,用的是绝对路径。 

在使用Class.getResourceAsStream 时, 资源路径有两种方式, 一种以 / 开头,则这样的路径是指定绝对 
路径, 如果不以 / 开头, 则路径是相对与这个class所在的包的。 

在使用ClassLoader.getResourceAsStream时, 路径直接使用相对于classpath的绝对路径。 

举例,下面的三个语句,实际结果是一样的: 

1
2
3
   com.explorers.Test.class.getResourceAsStream("abc.jpg")
= com.explorers.Test.class.getResourceAsStream("/com/explorers/abc.jpg")
= ClassLoader.getResourceAsStream("com/explorers/abc.jpg")

http://macrochen.iteye.com/blog/293918

http://blogread.cn/it/article/3262?f=wb

 

分享到:
评论

相关推荐

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    通常,有几种方法来实现JOIN,包括:1) 全外连接(Full Outer Join)通常通过两次MapReduce作业完成,第一次MapReduce实现一个表的JOIN键的完整复制,第二次JOIN另一个表;2) 交叉连接(Cross Join)只需将所有键值...

    云计算 mapreduce - <Data-Intensive[1].Text.Processing.With.MapReduce>

    - **索引压缩**:介绍几种压缩技术,如字节对齐编码、位对齐编码以及文档列表压缩等。 4. **图算法** - **图表示法**:探讨如何有效地表示图结构数据。 - **并行广度优先搜索**:展示如何利用MapReduce进行并行...

    Hadoop MapReduce高级特性

    最后是连接(join)操作,虽然标题中并未明确提及,但是连接是MapReduce中的一个重要操作,通常用于将来自不同数据集的记录合并在一起。在MapReduce中,连接操作通常是通过在Map阶段对键进行分组,然后在Reduce阶段...

    【MapReduce篇08】MapReduce优化1

    MapReduce是一种分布式计算模型,常用于大数据处理。在优化MapReduce程序时,主要关注的是提高计算机性能和降低I/O操作的复杂性。本文将详细探讨这两个方面,以及相关的优化策略。 首先,计算机性能是MapReduce效率...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    通常,Hadoop中的Join可以分为几种类型:Bucket Join、Sort-Merge Join、Replicated Join和Map-Side Join等。每种Join策略都有其适用场景和优缺点。 `hadoop_join.jar`是一个针对Hadoop环境设计的Join查询工具,它...

    Hadoop Reduce Join及基于MRV2 API 重写

    在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系...

    Hive原理与实现

    - **操作符**:Hive执行的基本单位,每个操作符对应一个特定的功能,例如TableScanOperator负责从表中读取数据,JoinOperator负责连接两个数据集。 - **执行计划**:操作符按照一定的顺序组合而成的计划,描述了Hive...

    基于MapReduce框架的SQL引擎设计.pdf

    总结来说,基于MapReduce框架的SQL引擎设计是应对大数据时代挑战的一种有效解决方案。它结合了SQL的强大查询能力和MapReduce的分布式计算能力,为大规模数据处理提供了高性能、高可用性和易于扩展性的平台。对于需要...

    hadoop join implement

    本文旨在介绍一种新的方法——自适应连接计划生成(Adaptive Join Plan Generation),该方法针对Hadoop中的join操作提出了几种新颖的技术,并设计了一个连接类型计划生成器。 #### 背景与挑战 随着数据量的急剧...

    67-ForkJoin框架学习笔记1

    ForkJoin框架在Hadoop MapReduce中有着类似的原理,通过将大任务拆解为Map阶段的小任务并进行并行处理,再由Reduce阶段进行结果的整合。 ForkJoin框架主要由以下几个关键组件构成: 1. **ForkJoinPool**:这是执行...

    Mapside-Join

    Mapside Join是大数据处理领域中的一种优化策略,主要用于Hadoop MapReduce框架,旨在提高大规模数据集的JOIN操作效率。在传统的数据库系统中,JOIN操作通常在服务器端完成,而在分布式计算环境中,由于数据量庞大,...

    Hadoop硬实战 [(美)霍姆斯著][电子工业出版社][2015.01]_PDF电子书下载 带书签目录 高清完整版.rar )

    8.1 比较R 和MapReduce 集成的几种方法 8.2 R 基础知识 8.3 R 和Streaming 8.3.1 Streaming 和map-only R 技术点57 计算股票日平均值 8.3.2 Streaming、R 和完整的MapReduce 技术点58 计算股票的...

    Hive用户指南(Hive_user_guide)_中文版.pdf

    Hive允许用户以数据仓库的方式管理存储在Hadoop文件系统(HDFS)中的大量数据,并且通过MapReduce执行查询操作。 Hive的架构由以下几个核心部分组成: 1. 用户接口:包括命令行接口(CLI)、客户端(Client)和Web...

    大数据底层原理和基础概念面试题30道

    17、YARN 中的资源分配分为哪几种类型?它们分别适用于哪些场景?YARN 中的资源分配主要分为两种类型:静态资源分配和动态资源分配。静态资源分配:在应用程序启动时分配固定数量的资源,适用于资源需求稳定的应用...

    Hive高级编程

    为了提高 Hive 的性能,可以采用以下几种方法: 1. **使用压缩**:选择合适的压缩算法可以减少存储空间并加速 MapReduce 任务的处理速度。 2. **分区表**:通过分区表可以减少不必要的数据扫描,加快查询速度。 3. ...

    hive和hbase整合

    Hive和HBase是大数据处理领域常用的两种技术。Hive是建立在Hadoop上的数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行。而HBase则是...

    Hive数据仓库工具----220222.rar

    总结,Hive作为数据仓库工具,以其易用性、扩展性和与SQL的兼容性,为大数据处理提供了一种高效的解决方案。通过理解Hive的基本概念、架构和操作,我们可以更好地利用这个工具处理和分析海量数据。

    HIVE从入门到精通

    为了解决这一问题,Hive应运而生,它提供了一种更高级别的SQL-like语言(HQL,Hive Query Language),使得非专业MapReduce程序员也能便捷地进行大数据分析。 【Hive的定义】 Hive的核心功能是将结构化的数据文件...

    《Hadoop开发者》第四期

    Hadoop MapReduce框架提供了几种不同的Join实现方法。 **实现方法**: - **Map端Join**:适用于小表Join大表的场景,通过将小表数据加载到内存中,然后在Map阶段完成Join操作。 - **Reduce端Join**:适用于两个大表...

Global site tag (gtag.js) - Google Analytics