`
jimmee
  • 浏览: 538738 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop的mapreduce的join操作原理

阅读更多

 

1. 概述

如果我们有如下的两个文件:

person.txt(字段是id nameaddressId):

1       tom     100

2       jme     101

3       kite    102

4       jack    100

5       tim     101

address.txt(字段是idname)

100     Beijing

101     Shanghai

102     Guangzhou

103     Shenzhen

最后需要输出person所在的位置信息,形式如下:

100         1     tom  Beijing

100         4     jack Beijing

101         2     jme  Shanghai

101         5     tim   Shanghai

102         3     kite  Guangzhou

 

2. 实现的原理

上述两个文件,若看成两张表,则连接字段是person.addressId= address.id,此处讨论的实现方式INNER JOIN

2.1. reduce side join的方式

 reduce side join是一种最简单的join方式,其主要思想如下:在map阶段,统一读取所有源的数据,例如,这里读取person.txtaddress.txtmap的输出的keyjoin的字段对应的值,输出的value是每条记录的其他字段值,为了对数据来源做区分,每个value值的基础上还需要增加一个标记值(tag),例如,来自person.txttag设置为0,来自address.txttag设置为1。相比普通的mapreduce任务,join操作的map输出的value值增加了一个tag标记值。在reduce阶段,reduce函数获取key对应的value列表,将value列表根据来源的tag进行分组,最后执行笛卡尔积进行输出,即reduce阶段进行实际的连接操作。

hadoopcontrib/data_join org.apache.hadoop.contrib.utils.join中已经提供了此种join方式的实现,基于上述包完成的join操作的代码如下:

SampleTaggedMapOutput.java

 

public class SampleTaggedMapOutput extends TaggedMapOutput {
  private Text data;
 
  public SampleTaggedMapOutput() {
    this.data = new Text("");
  }
 
  public SampleTaggedMapOutput(Text data) {
    this.data = data;
  }
 
  public Writable getData() {
    return data;
  }
 
  public void write(DataOutput out) throws IOException {
    this.tag.write(out);
    this.data.write(out);
  }
 
  public void readFields(DataInput in) throws IOException {
    this.tag.readFields(in);
    this.data.readFields(in);
  }
}

 

 

SampleDataJoinMapper.java

 

public class SampleDataJoinMapper extends DataJoinMapperBase {
 
 
       @Override
       protected Text generateInputTag(String filename) {
              if (filename.contains("person")) {
                     return new Text("0");
              } else {
                     return new Text("1");
              }
       }
 
  protected Text generateGroupKey(TaggedMapOutput aRecord) {
    String line = ((Text) aRecord.getData()).toString();
    String groupKey = "";
    String[] tokens = line.split("\\t");
    if (this.inputTag.toString().equals("0")) {
           groupKey = tokens[2];
    } else {
           groupKey = tokens[0];
    }
    return new Text(groupKey);
  }
 
  protected TaggedMapOutput generateTaggedMapOutput(Object value) {
    TaggedMapOutput retv = new SampleTaggedMapOutput((Text) value);
    retv.setTag(new Text(this.inputTag));
    return retv;
  }
}
 

 

 

SampleDataJoinReducer.java:

 

public class SampleDataJoinReducer extends DataJoinReducerBase {
  /**
   *
   * @param tags
   *          a list of source tags
   * @param values
   *          a value per source
   * @return combined value derived from values of the sources
   */
  protected TaggedMapOutput combine(Object[] tags, Object[] values) {
    // eliminate rows which didnot match in one of the two tables (for INNER JOIN)
    if (tags.length < 2)
       return null; 
    String joinedStr = "";
    String [] p = null, a = null;
    Text t = (Text) tags[0];
       if (t.toString().equals("0")) {
              p =  ((Text) (((TaggedMapOutput) values[0]).getData())).toString().split("\t");
              a = ((Text) (((TaggedMapOutput) values[1]).getData())).toString().split("\t");
       } else {
              p =  ((Text) (((TaggedMapOutput) values[1]).getData())).toString().split("\t");
              a = ((Text) (((TaggedMapOutput) values[0]).getData())).toString().split("\t");
       }
             
       joinedStr = p[0] + "\t" + p[1] + "\t" + a[1];
 
    TaggedMapOutput retv = new SampleTaggedMapOutput(new Text(joinedStr));
    retv.setTag((Text) tags[0]);
   
    return retv;
  }
}

 

 

小结:reduce side join技术是灵活的,但是大部分情况它会变得效率极低。由于join直到reduce()阶段才会开始,我们将会在网络中传递shuffle所有数据(执行copysort等动作),然而在大多数情况下,join阶段会丢掉大多数传递的数据。因此,得到的启示有两点:

1)如果确定是reduce-sidejoin,那么参与join的文件在map端尽可能先过滤掉无关的数据,例如针对特定的文件的projection/filtering,而不是传递到reduce节点后,在join时才做

2)是否可以直接在map端就完成join操作,答案是肯定的。

2.2 map side join的方式

1. 其中一个join的文件比较小,能够完全放进内存(In-Memory Hash Join

其实这个类似数据库中classic hash join的方式,就是一个是build input阶段,一个是probe input阶段,这里都在map端完成。build input阶段,读取小文件的数据到内存,构建一个hashtableprobe input阶段,读取大文件,通过查询hashtable实现join

具体在hadoop中,已有相关的类支持:

1)使用DistributedCache类,原理上就是在集群中的每个DataNode上的LocalFS都帮复制一份需要的小文件,然后你的每个Mapper进程读的就都是本地的那个文件,之后建立hashtable

2)在mapper的类里查询hashtable,同时实现join

针对上面那个例子:mapper类的操作如下:

 

public class MapClass extends MapReduceBase implements
              Mapper {
       private Hashtable<String, String> joinData = new Hashtable<String, String>();
 
       @Override
       public void configure(JobConf conf) {
              try {
                     Path[] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
                     if (cacheFiles != null && cacheFiles.length > 0) {
                            String line;
                            String[] tokens;
                            BufferedReader joinReader = new BufferedReader(new FileReader(
                                          cacheFiles[0].toString()));
                            try {
                                   while ((line = joinReader.readLine()) != null) {
                                          tokens = line.split("\\t", 2);
                                          joinData.put(tokens[0], tokens[1]);
                                   }
                            } finally {
                                   joinReader.close();
                            }
                     }
              } catch (IOException e) {
                     System.err.println("Exception reading DistributedCache: " + e);
              }
       }
 
       public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
              Text t = (Text) value;
              String [] fields = t.toString().split("\\t");
              String address = joinData.get(fields[2]);
              if (address != null) {
                     output.collect(new Text(fields[2]), new Text(fields[0] + "\t" + fields[1] + "\t" + address));
              }
       }
}

 

 

提交任务的类可以这么写:

 

DistributedCache.addCacheFile(new Path(“/address.txt”).toUri(), conf);
FileInputFormat.addInputPath(conf, new Path(“/person.txt”));
FileOutputFormat.setOutputPath(conf, new Path(“/join_out”));
conf.setMapperClass(MapClass.class);
conf.setNumReduceTasks(0);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(TextOutputFormat.class);
JobClient.runJob(conf);

 

 

 

2. join的最小的文件不能完全放进内存,仍然要实现map-sidejoin

题外话:数据库中,对这种情况的处理是使用grace hash join的算法。例如:SQL Server分别将build inputprobe input切分成多个分区部分(partition),每个partition都包括一个独立的、成对匹配的build inputprobe input,这样就将一个大的hash join切分成多个独立、互相不影响的hash join,每一个分区的hash join都能够在内存中完成。SQL Server将切分后的partition文件保存在磁盘上,每次装载一个分区的build inputprobe input到内存中,进行一次hash join

回来正题:

第一步的思考:最小的文件整个数据放不进内存中,那么是否可以考虑可以满足join条件的数据能否放进内存,从而实现In-Memory Hash Join的方式,实现方式很简单:选取小文件(例如名称为small_file),将其参与joinkey的数据抽取出来,保存到文件small_file_temp中,small_file_temp这里保证很小,可以完全放到内存中,因此后续的工作时使用small_file_temp文件和大文件进行In-Memory Hash Join

进一步的思考:

其实保存数据的时候,可以先分区,就是可以根据数据的特征,先将数据划分为多个文件进行保存,实际处理时,只处理其中的一部分数据,这样预先就减少了处理的数据量。

 

如果已经分区了,但最终的小文件是还是放不进内存的,那么还可以怎么做?

方式一:特定的场景下,可以使用bloomfilter

条件1:小表对于大表中的数据而言,仅仅用于判断大表的数据是否满足join条件,不再从小表中获取其他数据

条件2:不要求数据100%准确,因为bloomfilter存在一定的误判。

可以看到,方式一的最终目的仍然是将操作转换为In-Memory Hash Join的方式。

 

方式二:能否参考数据库中的grace hash join的方式?答案是肯定的。实际的做法是,在保存数据的时候,在分区的基础上(partition),再分桶(bucket),小文件和大文件都拆成很多bucket,小文件和大文件之间的桶的个数只要相互之间是倍数关系即可(是倍数关系,即可相互映射),一般情况下,小文件的桶个数是大文件的2的倍数。这里以hive的实现进行说明:在提交任务之前,先在本地执行一个任务,生成每个大表的桶文件对应的小表的桶文件的hashtable,之后将这些hashtable文件分发到map节点,map节点在执行任务的,根据输入的大表的桶文件,加载进来相应的hashtable文件,之后执行join操作。

注意:分发数据本身就存在一定的代价的,因此若分发数据消耗的时间较多,也许此时reduce-side join的会更快。

 

3. sort merge join的实现

 

上述所讲的操作均没有提及数据是否有序,若数据有序的情况下,完全可以执行类似merge的操作,一边读取数据,一边做join操作。此种join方式不再进一步的探讨。

分享到:
评论

相关推荐

    Hadoop Mapreduce Cookbook(英文版)

    4. **数据处理策略**:MapReduce支持多种数据处理策略,如排序、分组、Join操作等。这些策略在处理大规模数据时非常关键,能够优化计算效率和结果准确性。 5. **优化技巧**:书中会探讨如何优化MapReduce作业,包括...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    在Hadoop MapReduce环境中,处理大数据时经常遇到多表关联(Join)的需求,尤其是在复杂的业务逻辑中。MapReduce提供了一种分布式计算模型,能够高效地处理大规模数据集,但面对多表关联,尤其是多个Job之间的依赖和...

    Hadoop MapReduce高级特性

    最后是连接(join)操作,虽然标题中并未明确提及,但是连接是MapReduce中的一个重要操作,通常用于将来自不同数据集的记录合并在一起。在MapReduce中,连接操作通常是通过在Map阶段对键进行分组,然后在Reduce阶段...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...

    hadoop Join代码(map join 和reduce join)

    在Hadoop MapReduce中,数据处理的核心任务之一就是JOIN操作,它相当于关系数据库中的连接操作,用于合并来自不同数据源的相关信息。本文将深入探讨Map JOIN和Reduce JOIN两种在Hadoop中实现JOIN的方法,并通过代码...

    大数据课程设计-Hadoop-MapReduce实现sql的统计、groupby和join-全部源码

    本课程设计主要围绕如何使用Hadoop的MapReduce实现SQL中的统计、GROUP BY和JOIN操作,这是一次深入理解大数据处理机制的实践过程。 首先,让我们来探讨SQL的统计功能。在SQL中,统计通常涉及到COUNT、SUM、AVG、MAX...

    Hadoop Reduce Join及基于MRV2 API 重写

    本文将探讨Reduce Join的工作原理,以及如何利用MRV2(MapReduce v2)API对它进行重写。 首先,我们来理解什么是Reduce Join。在关系数据库中,JOIN操作用于合并两个或多个表的数据,根据它们之间的关联字段。在...

    Hadoop-MapReduce-Cookbook-Example-Code:Hadoop MapReduce Cookbook 示例代码

    3. **join操作**:在分布式环境中合并来自不同数据源的数据。例如,可以将用户信息与购买记录进行关联,以便进行更复杂的分析。 4. **数据分析**:例如,统计网页链接关系、分析用户行为模式等,这些都是大数据分析...

    hadoop join implement

    2. **MapReduce的局限性**:Hadoop的MapReduce模型更倾向于支持聚合操作而非join操作。 3. **Map Join的风险**:虽然片段复制(Fragment-replicate)或称为Map Join的方法在某些场景下能提高性能,但在其他情况下...

    MapReduce之Join操作

    join是非常常见的操作,各种优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要连接从不同的数据源中获取到的数据。不同于传统的单机模式,在分布式存储的下采用 ...

    19、Join操作map side join 和 reduce side join

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架。在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:...

    6.Hadoop入门进阶课程_第6周_MapReduce应用案例.pdf

    综上所述,MapReduce应用案例文档深入地介绍了MapReduce编程模型在Hadoop生态系统中的实际使用,包括对join操作的细节分析,以及如何搭建Hadoop环境,如何上传和管理测试数据。此外,文档还提供了Hadoop学习资源的...

    hadoop0.23.9离线api

    org.apache.hadoop.mapreduce.lib.join org.apache.hadoop.mapreduce.lib.map org.apache.hadoop.mapreduce.lib.output org.apache.hadoop.mapreduce.lib.partition org.apache.hadoop.mapreduce.lib.reduce ...

    大数据MapReduce文件分发

    - 为了提高效率,Hadoop MapReduce支持多种优化技术,如Combiner(局部聚合)、Spill(磁盘溢出)和Map-side Join(基于广播的小表)等。 9. **Hadoop的缓存机制**: - MapReduce可以通过缓存机制将中间结果或...

    基于Java的Hadoop HDFS和MapReduce实践案例设计源码

    内容涵盖HDFS的JAVA API操作,如文件读取、写入、删除、元数据查询和文件列表等,以及MapReduce编程模型的多个应用,包括求平均数、Join操作、TopK算法、二次排序,并涉及自定义InputFormat、OutputFormat和shuflle...

    5堂Hadoop必修课,不会这些勿称高手

    具体要掌握的五个主题分别是:大数据分布式集群搭建(高可用性,HA),构建企业级MapReduce项目,Hadoop和Spark的源码编译,以及Zookeeper和MapReduce的高级Join操作。 描述部分列举了一些具体的知识点,包含搭建...

    word源码java-hadoop-test:hadoop、mapreduce的一些练习

    包org.dan.mr.order_pro_mapjoin MapReduce实现订单信息和产品信息的join逻辑,在Mapper端实现,避免数据倾斜 包org.dan.mr.wordindex MapReduce单词索引 包org.dan.mr.shared_friends MapReduce查找共同好友 包org....

    spark,hadoop,bank

    这使得API不仅表达力强而且优雅,例如`map`、`filter`、`groupBy`、`sort`、`union`、`join`、`leftOuterJoin`、`rightOuterJoin`、`reduce`、`count`、`fold`、`reduceByKey`等操作符。 2. **高效的数据处理**:...

    MapReduce-Code:Hadoop平台下的MapReduce源码分析

    其他表示父目录下的.java文件的总称):1.org.apache.hadoop.mapred(旧版MapReduceAPI):( 1).jobcontrol(job作业直接控制类)(2 ).join :(作业作业中用于模仿数据连接处理工具)(3).lib(MapReduce所依赖...

Global site tag (gtag.js) - Google Analytics