`
小野bupt
  • 浏览: 14801 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

关于 hadoop reduce 阶段遍历 Iterable 的 2 个“坑”

 
阅读更多

原文链接:http://my.oschina.net/leejun2005/blog/131744

之前有童鞋问到了这样一个问题:为什么我在 reduce 阶段遍历了一次Iterable 之后,再次遍历的时候,数据都没了呢?可能有童鞋想当然的回答:Iterable 只能单向遍历一次,就这样简单的原因。。。事实果真如此吗?

还是用代码说话:

package com.test;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class T {

	public static void main(String[] args) {

		// 只要实现了Iterable接口的对象都可以使用for-each循环。
		// Iterable接口只由iterator方法构成,
		// iterator()方法是java.lang.Iterable接口,被Collection继承。
		/*public interface Iterable<T> {
			Iterator<T> iterator();
		}*/
		Iterable<String> iter = new Iterable<String>() {
			public Iterator<String> iterator() {
				List<String> l = new ArrayList<String>();
				l.add("aa");
				l.add("bb");
				l.add("cc");
				return l.iterator();
			}
		};
		for(int count : new int[] {1, 2}){
			for (String item : iter) {
				System.out.println(item);
			}
			System.out.println("---------->> " + count + " END.");
		}
	}
}


结果当然是很正常的完整无误的打印了两遍Iterable 的值。那究竟是什么原因导致了 reduce 阶段的Iterable 只能被遍历一次呢?

我们先看一段测试代码:

测试数据:

a 3
a 4
b 50
b 60
a 70
b 8
a 9
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TestIterable {

	public static class M1 extends Mapper<Object, Text, Text, Text> {
		private Text oKey = new Text();
		private Text oVal = new Text();
		String[] lineArr;

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			lineArr = value.toString().split(" ");
			oKey.set(lineArr[0]);
			oVal.set(lineArr[1]);
			context.write(oKey, oVal);
		}
	}

	public static class R1 extends Reducer<Text, Text, Text, Text> {
		List<String> valList = new ArrayList<String>();
		List<Text> textList = new ArrayList<Text>();
		String strAdd;
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
				InterruptedException {
			valList.clear();
			textList.clear();
			strAdd = "";
			for (Text val : values) {
				valList.add(val.toString());
				textList.add(val);
			}
			
			// 坑之 1 :为神马输出的全是最后一个值?why?
			for(Text text : textList){
				strAdd += text.toString() + ", ";
			}
			System.out.println(key.toString() + "\t" + strAdd);
			System.out.println(".......................");
			
			// 我这样干呢?对了吗?
			strAdd = "";
			for(String val : valList){
				strAdd += val + ", ";
			}
			System.out.println(key.toString() + "\t" + strAdd);
			System.out.println("----------------------");
			
			// 坑之 2 :第二次遍历的时候为什么得到的都是空?why?
			valList.clear();
			strAdd = "";
			for (Text val : values) {
				valList.add(val.toString());
			}
			for(String val : valList){
				strAdd += val + ", ";
			}
			System.out.println(key.toString() + "\t" + strAdd);
			System.out.println(">>>>>>>>>>>>>>>>>>>>>>");
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		conf.set("mapred.job.queue.name", "regular");
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		System.out.println("------------------------");
		Job job = new Job(conf, "TestIterable");
		job.setJarByClass(TestIterable.class);
		job.setMapperClass(M1.class);
		job.setReducerClass(R1.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// 输入输出路径
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileSystem.get(conf).delete(new Path(otherArgs[1]), true);
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}


在 Eclipse 控制台中的结果如下:

a	9, 9, 9, 9, 
.......................
a	3, 4, 70, 9, 
----------------------
a	
>>>>>>>>>>>>>>>>>>>>>>
b	8, 8, 8, 
.......................
b	50, 60, 8, 
----------------------
b	
>>>>>>>>>>>>>>>>>>>>>>


关于第 1 个坑:对象重用(objects reuse

reduce方法的javadoc中已经说明了会出现的问题:

The framework calls this method for each <key, (list of values)> pair in the grouped inputs. Output values must be of the same type as input values. Input keys must not be altered. The framework will reuse the key and value objects that are passed into the reduce, therefore the application should clone the objects they want to keep a copy of.

也就是说虽然reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。所以如果要保存key或者value的结果,只能将其中的值取出另存或者重新clone一个对象(例如Text store = new Text(value) 或者 String a = value.toString()),而不能直接赋引用。因为引用从始至终都是指向同一个对象,你如果直接保存它们,那最后它们都指向最后一个输入记录。会影响最终计算结果而出错。

看到这里,我想你会恍然大悟:这不是刚毕业找工作,面试官常问的问题:String 是不可变对象但为什么能相加呢?为什么字符串相加不提倡用 String,而用 StringBuilder ?如果你还不清楚这个问题怎么回答,建议你看看这篇深入理解 String, StringBuffer 与 StringBuilder 的区别http://my.oschina.net/leejun2005/blog/102377

关于第 2 个坑:http://stackoverflow.com/questions/6111248/iterate-twice-on-values

The Iterator you receive from that Iterable's iterator() method is special. The values may not all be in memory; Hadoop may be streaming them from disk. They aren't really backed by a Collection, so it's nontrivial to allow multiple iterations.

最后想说明的是:hadoop 框架的作者们真的是考虑很周全,在 hadoop 框架中,不仅有对象重用,还有 JVM 重用等,节约一切可以节约的资源,提高一切可以提高的性能。因为在这种海量数据处理的场景下,性能优化是非常重要的,你可能处理100条数据体现不出性能差别,但是你面对的是千亿、万亿级别的数据呢?

PS:

我的代码是在 Eclipse 中远程调试的,所以 reduce 是没有写 hdfs 的,直接在 eclipse 终端上可以看到结果,很方便,关于怎么在 windows 上远程调试 hadoop,请参考这里实战 windows7 下 eclipse 远程调试 linux hadoophttp://my.oschina.net/leejun2005/blog/122775

REF:

hadoop中迭代器的对象重用问题

http://paddy-w.iteye.com/blog/1514595

关于 hadoop 中 JVM 重用和对象重用的介绍

http://wikidoop.com/wiki/Hadoop/MapReduce/Reducer

分享到:
评论

相关推荐

    Hadoop Reduce Join及基于MRV2 API 重写

    标题 "Hadoop Reduce Join及基于MRV2 API 重写" 涉及到的是大数据处理框架Hadoop中的一个重要操作,即数据连接(Join)。在Hadoop MapReduce中,Reduce Join是一种实现大规模数据集间连接的高效方法。本文将探讨...

    hadoop map-reduce turorial

    **逐步详解**:从输入文件的读取到最终输出结果的生成,Map-Reduce框架通过Map和Reduce两个阶段实现了数据的高效处理,展现了其强大的数据并行处理能力。 #### Map-Reduce 用户界面 **负载**:用户可以通过配置...

    hadoop map reduce 中文教程

    MapReduce 主要分为两个阶段:Map 阶段和 Reduce 阶段。 - **Map 阶段**:在这个阶段,原始数据被分割成一系列的键值对(key-value pairs),然后由 Map 函数进行处理。Map 函数会对每一个输入的键值对执行相同的处理...

    Hadoop Map-Reduce教程

    在 Hadoop Map-Reduce 中,数据处理过程主要分为两个阶段:**Map 阶段** 和 **Reduce 阶段**。 ##### Map 阶段 Map 函数接收输入数据块,并将其转换为一系列键值对。这一阶段的主要任务是对输入数据进行预处理,...

    最高气温 map reduce hadoop 实例

    它分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,原始数据被分割成多个块,并分发到集群的不同节点上进行并行处理。每个节点上的Mapper函数会读取输入数据,对数据进行局部处理,并生成键值对的形式作为中间...

    WordCount2_hadoopwordcount_

    `Hadoop WordCount`的工作流程分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,输入数据(通常是文本文件)被分割成多个块,每个块由一个Map任务处理。Map任务读取数据,对每一行进行分词,然后将每个单词与计数...

    远程调用执行Hadoop Map/Reduce

    2. **JobTracker(Hadoop 1.x)或ResourceManager(Hadoop 2.x+)**:这是Hadoop集群的中心调度器,负责分配Map和Reduce任务到合适的DataNode上。在Hadoop 2.x及以上版本中,YARN(Yet Another Resource Negotiator...

    Hadoop Map Reduce教程

    - **定义**:Hadoop MapReduce 是一个基于 Java 的分布式数据处理框架,它能够高效地处理大规模数据集。该框架将任务分解为一系列较小的任务(Map 和 Reduce),并在集群中的多台计算机上并行执行这些任务。 - **...

    hadoop几个实例

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大规模集群中高效处理和存储海量数据。这个压缩包文件包含的"hadop实用案例"很可能是为了帮助初学者理解和应用Hadoop技术。以下是关于Hadoop的一些...

    Map-Reduce原理体系架构和工作机制,eclipse与Hadoop集群连接

    Map-Reduce的设计初衷是为了简化大数据处理任务,通过将这些任务分解成两个阶段——Map阶段和Reduce阶段来实现。 #### 二、Map-Reduce的工作机制 **1. 分片(Splitting)** 数据首先被分片,即将原始输入数据分割...

    hadoop_join.jar.zip_hadoop_hadoop query_reduce

    MapReduce的工作原理可以分为两个主要阶段:Map阶段和Reduce阶段。在Map阶段,数据被分割成多个块,并在不同的节点上并行处理。每个Map任务接收一部分输入数据,执行指定的映射操作,生成中间键值对。在Reduce阶段,...

    hadoop中map/reduce

    MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce(化简),使得海量数据能够在多台计算机上并行处理,极大地提高了数据处理效率。 Map阶段是数据处理的...

    基于hadoop实现输出出现频率最高的20个词

    它通过将任务分解为两个阶段(Map阶段和Reduce阶段)来处理数据。Map阶段负责处理输入数据并生成中间结果,而Reduce阶段则负责对这些中间结果进行汇总,得出最终的结果。 ### 三、实现思路 #### 3.1 多Job级联 本...

    Hadoop mapreduce实现wordcount

    这个文件可能是 Hadoop 官方文档中关于 WordCount 示例的一个章节,详细介绍了如何编写 Map 和 Reduce 函数,以及如何配置和运行 WordCount 作业。可能包括了 Java 代码示例,讲解如何创建 MapReduce 程序,并将其...

    关于hadoop设计模式,基于hadoop1.2.2.zip

    它将复杂任务分解为两个阶段:Map阶段和Reduce阶段。Map阶段将输入数据切分为键值对,然后进行局部处理;Reduce阶段聚合Map阶段的结果,执行全局计算。MapReduce的关键设计模式包括: 1. 数据分区与排序:Map阶段...

    Hadoop权威指南(第2版).pdf

    需要注意的是,虽然本回答提供了关于Hadoop的详细知识点,但是提供的部分内容实际上并未包含任何有实际意义的信息,重复出现的网址"***"并不提供任何关于Hadoop的具体内容,因此对于深入理解Hadoop并没有实际帮助。...

    hadoop map reduce hbase 一人一档

    标题“hadoop map reduce hbase 一人一档”揭示了这个系统的核心组成部分。Hadoop MapReduce是一种分布式计算框架,用于处理和存储大规模数据集。它通过将复杂任务分解为可并行处理的“映射”和“化简”阶段,使得在...

    Hadoop 2.x

    Hadoop 2.x 是一个开源的分布式计算框架,它是Apache Hadoop项目的最新版本,旨在提供高效、可扩展的数据处理能力。这个版本引入了若干关键改进,使得Hadoop更适合大数据处理的需求,提高了系统的性能和可用性。 **...

    hadoop Join代码(map join 和reduce join)

    public void reduce(Text key, Iterable&lt;Text&gt; values, Context context) { StringBuilder result = new StringBuilder(); for (Text value : values) { result.append(value).append(","); } // 去除尾部逗号...

Global site tag (gtag.js) - Google Analytics