`

[Hadoop]使用Hadoop进行ReduceSideJoin

 
阅读更多

Note:

1. 内容主要参考<Hadoop in Action> Chapter 5.2

2. 代码主要参考: http://qindongliang.iteye.com/blog/2052842

3. 这是基于老的API的实现,这种方法并不高效简洁

 

数据:(原始数据可以从movielens-1m里面去要,这里将原始数据进行了简单的修改方便演示与自测)
文件: u.user 
结构: user id | age | gender | occupation | zip code
样例:
1|24|M|technician|85711
2|53|F|other|94043
3|23|M|writer|32067
4|24|M|technician|43537
5|33|F|other|15213
6|42|M|executive|98101
7|57|M|administrator|91344
8|36|M|administrator|05201
9|29|M|student|01002
10|53|M|lawyer|90703
文件: u.data
结构: user id | item id | rating | timestamp. 
样例:
1 242 3 881250949
1 302 3 891717742
2 377 1 878887116
4 51 2 880606923
6 346 1 886397596
8 474 4 884182806
10 265 2 881171488
1 465 5 891628467
2 451 3 886324817
6 86 3 883603013
 
任务:
将两个文件的内容连接起来,输出部分内容:(inner join)
输出结构: user id | age | rating
输出示例:
1age=24,ratings=3
1age=24,ratings=3
1age=24,ratings=5
10age=53,ratings=2
2age=53,ratings=1
2age=53,ratings=3
4age=24,ratings=2
6age=42,ratings=1
6age=42,ratings=3
8age=36,ratings=4
如果在u.data之中没有找到对应的记录则忽略此user
 
思路:

在Map函数之中处理各个文件每一行的值。并根据文件名为每一行的值加上一个groupKey.

在这个例子之中,groupKey即为user id, 比如 1,2,3 ...

经过map函数处理之后,会有如下数据出现:(假设将其json化)

[{
    tag: 'u.user',
    value: '1|24|M|technician|85711'
}, {
    tag: 'u.data',
    value: '1	242	3	881250949'
}, {
    tag: 'u.data',
    value: '1	377	1	878887116'
}]

 

Hadoop会将相同的groupKey的值放在一起,所以在Reduce函数之中,需要做的事情就是将这一系列的值合并在一起。注意:上面的list里面值的顺序是不固定的,无法确定u.user一定排在首位。

 

在演示最终代码之前,需要注意,英文版的<Hadoop in Action>上面的代码在我的版本上面是有问题的,需要修改如下几个地方:

1. 在TaggedWritable之中新增一个默认的构造方法,

public static class TaggedWritable extends TaggedMapOutput {  
    public TaggedWritable() {
    }
}

不然会提示如下错误: 原因是在反射的时候一定需要一个默认的构造函数

 java.lang.NoSuchMethodException: ch5.ReduceSideJoin$TaggedWritable.<init>()

 

2. readFields方法要增加一些处理空的代码,否则会报NullException

public void readFields(DataInput in) throws IOException {  
    this.tag.readFields(in);  
      //加入此部分代码,否则,可能报空指针异常
	String temp=in.readUTF();
	if (this.data == null|| !this.data.getClass().getName().equals(temp)) {
		try {
			this.data = (Writable) ReflectionUtils.newInstance(
					Class.forName(temp), null);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}
    this.data.readFields(in);  
}

 

最后实现的代码如下:

package ch5;


import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
  

/***
 * 
 * Hadoop1.2的版本,旧版本实现的Reduce侧连接
 * 
 * @author qindongliang
 * 
 *    大数据交流群:376932160
 *  搜索技术交流群:324714439
 * 
 * 
 */

public class ReduceSideJoin extends Configured implements Tool {  
	
	/**
	 * 
	 * Map实现
	 * 
	 * */
    public static class MapClass extends DataJoinMapperBase {  
          
    	/**
    	 * 读取输入的文件路径
    	 * 
    	 * **/
        protected Text generateInputTag(String inputFile) {  
            
        	//返回文件路径,做标记
            return new Text(inputFile);  
        }  
          
        
        /***
         * 分组的Key
         * 
         * **/
        
        protected Text generateGroupKey(TaggedMapOutput aRecord) {  
			Text tag = aRecord.getTag();
			String line = ((Text)aRecord.getData()).toString();
			if(line.trim().length() < 2) return null;
			String sep = "\t";
			if(tag.toString().contains("u.user")) {
				sep = "[|]";
			}
			String[] tokens = line.split(sep);
			String groupKey = tokens[0];
			return new Text(groupKey);
        }  
          
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {  
            TaggedWritable retv = new TaggedWritable((Text) value);  
            retv.setTag(this.inputTag);  
            return retv;  
        }  
    }  
      
    /**
     * 
     * Reduce进行笛卡尔积
     * 
     * **/
    public static class Reduce extends DataJoinReducerBase {  
          
    	
    	/***
    	 * 笛卡尔积
    	 * 
    	 * */
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {  
            if (tags.length < 2) return null;		// 
            // 开始连接两边的内容
            String str = "";
            String userInfo = "";
            List<String> ratingDataList = new ArrayList<String>();
            
            for(int i = 0 ; i < tags.length; i++) {
            	
            	Text curTag = (Text) tags[i];
            	String line = ((TaggedWritable)values[i]).getData().toString();
            	if(curTag.toString().contains("u.user")) {
            		String[] tokens = line.split("[|]"); // 对于u.user 分隔符是| 并且只需要他的年龄这一列
            		userInfo = "age=" + tokens[1];
            	} else {
            		String[] tokens = line.split("\t");		// 对于u.data 分隔符是制表符"\t" 需要的是ratings这一列
            		ratingDataList.add(tokens[2]);
            	}
            }
            str = userInfo + ",ratings=" +  StringUtils.join(ratingDataList, "|");
            TaggedWritable retv = new TaggedWritable(new Text(str));  
            retv.setTag((Text) tags[0]);   
            return retv;  
        }  
    }  
      
    /**
     * 
     * 自定义的输出类型
     * 
     * ***/
    public static class TaggedWritable extends TaggedMapOutput {  
      
        private Writable data;  
          
        /**
         * 注意加上构造方法
         * 
         * */
        public TaggedWritable() {
		}
        
        public TaggedWritable(Writable data) {  
            this.tag = new Text("");  
            this.data = data;  
        }  
          
        public Writable getData() {  
            return data;  
        }  
          
        public void write(DataOutput out) throws IOException {  
            this.tag.write(out);  
            //此行代码很重要
            out.writeUTF(this.data.getClass().getName());
           
            this.data.write(out); 
            
        }  
          
		public void readFields(DataInput in) throws IOException {  
		    this.tag.readFields(in);  
		    //加入此部分代码,否则,可能报空指针异常
			String temp=in.readUTF();
			if (this.data == null|| !this.data.getClass().getName().equals(temp)) {
				try {
					this.data = (Writable) ReflectionUtils.newInstance(
							Class.forName(temp), null);
				} catch (ClassNotFoundException e) {
					e.printStackTrace();
				}
			}
		    this.data.readFields(in);  
		}  
    }  
      
    public int run(String[] args) throws Exception {  
        Configuration conf = getConf();  
          
        JobConf job = new JobConf(conf, ReduceSideJoin.class);  
      	job.setJarByClass(ReduceSideJoin.class);
      		 
      	String path = "/home/hadoop/DataSet/movielens-output";
		FileSystem fs=FileSystem.get(conf);
		Path out = new Path(path);
		if(fs.exists(out)){
			fs.delete(out, true);
			System.out.println("输出路径存在,已删除!");
		}
        
        Path in = new Path("/home/hadoop/DataSet/movielens");  
        FileInputFormat.setInputPaths(job, in);  
        FileOutputFormat.setOutputPath(job, out);  
          
        job.setJobName("ReduceSideJoin");  
        job.setMapperClass(MapClass.class);  
        job.setReducerClass(Reduce.class);  
          
        job.setInputFormat(TextInputFormat.class);  
        job.setOutputFormat(TextOutputFormat.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(TaggedWritable.class);  
          
        JobClient.runJob(job);   
        return 0;  
    }  
      
    public static void main(String[] args) throws Exception {   
    	
        int res = ToolRunner.run(new Configuration(),  
                                 new ReduceSideJoin(),  
                                 args);  
          
        System.exit(res);  
    }  
}  

 

 

分享到:
评论

相关推荐

    19、Join操作map side join 和 reduce side join

    在大数据处理领域,Hadoop MapReduce 是一种广泛使用的分布式计算框架。在处理涉及多数据集的任务时,Join 操作是必不可少的,它用于合并来自不同数据源的相关数据。本文主要探讨了两种 MapReduce 中的 Join 实现:...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    本文将深入探讨如何使用Hadoop和MapReduce进行高效的Join查询,并解析如何通过`hadoop_join.jar`这个工具来实现这一过程。 Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它的核心组件包括HDFS(Hadoop ...

    6.Hadoop入门进阶课程_第6周_MapReduce应用案例.pdf

    Reduce-side join在shuffle阶段会进行大量数据传输,导致网络IO效率低下,而map-side join适用于处理一个大表和一个小表的场景。在这种场景中,小表可以直接放入内存,通过DistributedCache类将小表复制多份,每个...

    hadoop mapreduce多表关联join多个job相互依赖传递参数

    - **Reduce-side Join**:更通用的方法,所有表的数据在mapper阶段分别处理,然后在reducer阶段进行关联,但可能涉及大量的数据交换。 - **Bucket Join**或**Sort-Merge Join**:通过预处理使得不同表的数据在相同...

    Hadoop开发第四期

    - **Reduce-Side Join**:在Reduce阶段完成Join操作,适用于两个表大小相当的情况。 - **Broadcast Join**:将一个小表广播到所有节点,适用于小表连接大表的情况,可以减少网络传输量。 #### 四、配置Hive元数据...

    dept_marks_reduceside_join

    本项目"dept_marks_reduceside_join"是基于Java实现的一个MapReduce程序,其目标是将学生所属的部门数据与他们的分数进行连接,以便于分析学生在各个部门的表现情况。下面我们将深入探讨这个项目的实现原理和关键...

    hadoop_join_aggregate:在hadoop中加入和聚合mapreduce算法

    Map side join 比 reducer side join 快。 但是只有当您执行映射端连接操作的表之一小到足以放入内存时,映射端连接才足够。 日期集信息 客户数据集:每行包含:CustID、Name、Age、CountryCode、Salary。 交易...

    在Hadoop Map-Reduce中实施联接

    5. **Three-way Join 和 Multi-way Join**:对于更多的数据集,可以扩展Reduce-Side Join的概念,但可能会增加复杂性和性能挑战。通常需要更复杂的分区策略和数据预处理。 在实现Join时,还需要考虑以下优化策略: ...

    Big-Data---Hadoop---Yelp-Dataset:文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据集的描述

    大数据--Hadoop--Yelp数据集 文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据... 问题5:Map Side Join:将所有业务实体加载到分布式缓存中。 使用地图侧连接列出了位于特定区域中的企业的用户ID和评论文本。

    Mapside-Join

    Mapside Join适用于小表与大表JOIN的情况,但并非所有JOIN都可以使用此方法。如果两个表都非常大,都无法完全装入内存,那么就需要考虑其他分布式JOIN策略,如Bucket Join或基于排序的JOIN。 在实际应用中,为了...

    大数据MapReduce文件分发

    - 为了提高效率,Hadoop MapReduce支持多种优化技术,如Combiner(局部聚合)、Spill(磁盘溢出)和Map-side Join(基于广播的小表)等。 9. **Hadoop的缓存机制**: - MapReduce可以通过缓存机制将中间结果或...

    Big-Data_hadoop_Yelp_Data_Analysis

    大数据--Hadoop--Yelp数据集文件夹包含5个问题和pdf,其中包含有关五个问题和Yelp数据集... 问题5:Map Side Join:将所有业务实体加载到分布式缓存中。 使用地图侧连接列出了位于特定区域中的企业的用户ID和评论文本。

    Hive & Performance 学习笔记

    Reduce-side join,也称为Shuffle Join,是MapReduce中的默认Join方式,数据在reduce阶段进行合并匹配。Map-side join适用于小表可以完全加载到内存的情况,所有节点都会保存小表,从而实现一次遍历大表完成Join。...

    Hive-Summit-2011-join.zip_hive

    3. **Map-side Join**:为了解决大规模数据集的Join问题,Hive引入了Map-side Join。这种方法适用于小表与大表的连接,小表可以被完全加载到内存中,从而避免了在Reduce阶段的昂贵数据交换。 4. **Bucketing与...

    hive简介共5页.pdf.zip

    5. **性能优化**:Hive提供了多种优化策略,如Joins优化(Map-side Join, Reduce-side Join)、Bucketing和Sorting,以及使用Tez或Spark作为执行引擎以提高性能。 6. **Hive与Hadoop的关系**:Hive是建立在Hadoop之...

    Data-Intensive+Text+Processing+with+MapReduce

    关系联接是数据库中的常见操作,在MapReduce中也有对应的方法实现,包括Reduce-Side Join、Map-Side Join和Memory-Backed Join。这些方法能够实现在分布式环境下的表连接,对于数据集成和分析具有重要意义。 ### ...

    高性能并发业务Spark(1).pdf

    这通常发生在使用distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等算子时。为了判断和定位数据倾斜,开发者可以通过查看SparkWebUI的log信息或界面来观察哪些stage任务执行缓慢,...

    Hive_优化——深入浅出学Hive

    6. **合并 MapReduce 操作**:通过 Combine 或 Reduce-Side Join 减少中间步骤。 7. **Partition**:利用分区减少数据扫描,提高查询效率。 通过以上优化策略,可以显著提升 Hive 在大数据处理中的性能,使得数据...

    分布式平台等值连接优化技术分析.pdf

    针对等值连接优化技术的研究,常见的方法有Map-side Join和Reduce-side Join。Map-side Join适用于小表可以完全加载到内存中,并广播到所有分布式节点上的场景。它避免了shuffle和reduce操作,从而提升了效率。...

Global site tag (gtag.js) - Google Analytics