`
hududumo
  • 浏览: 244447 次
文章分类
社区版块
存档分类
最新评论

mapreduce程序reduce输出控制

 
阅读更多

1,在hadoop中,reduce支持多个输出,输出的文件名也是可控的,就是继承MultipleTextOutputFormat类,重写generateFileNameForKey方法

public class LzoHandleLogMr extends Configured implements Tool {

	 static class LzoHandleLogMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
       
	  
		public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
	    	try {
	    	    String[] sp = value.toString().split(",");
	    		output.collect(new Text(sp[0]), value);
	    	}catch (Exception e) {
			   e.printStackTrace();
		    }    	
		}


	}
	static class LzoHandleLogReducer  extends MapReduceBase implements Reducer<Text, Text, Text, NullWritable> {
        


		@Override
		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<Text, NullWritable> output, Reporter reporter)
				throws IOException {
			while (values.hasNext()) {
		   		  output.collect(values.next(), NullWritable.get());   
		   	   }
			
		}	
	}
	
	public static class LogNameMultipleTextOutputFormat extends MultipleTextOutputFormat<Text, NullWritable> 
	   {


		@Override
		protected String generateFileNameForKeyValue(Text key,
				NullWritable value, String name) {
			String sp[] = key.toString().split(",");
			String filename = sp[0];
			if(sp[0].contains(".")) filename="000000000000";
			return filename;
		}
		
	}
    


	@Override
	public int run(String[] args) throws Exception {
		 
		    JobConf jobconf = new JobConf(LzoHandleLogMr.class);
		    jobconf.setMapperClass(LzoHandleLogMapper.class);
		    jobconf.setReducerClass(LzoHandleLogReducer.class);
		    jobconf.setOutputFormat(LogNameMultipleTextOutputFormat.class);
		    jobconf.setOutputKeyClass(Text.class);
		    jobconf.setNumReduceTasks(12);
		    
		    
		 FileInputFormat.setInputPaths(jobconf,new Path(args[0]));
	    	FileOutputFormat.setOutputPath(jobconf,new Path(args[1]));
	    	FileOutputFormat.setCompressOutput(jobconf, true);
	    	FileOutputFormat.setOutputCompressorClass(jobconf, LzopCodec.class);  
	    	
	    	JobClient.runJob(jobconf);
	      return 0;
			
	}
}


在新版本的hadoopAPI是通过Job类来设置各种参数的,但是我调用 Job.setOutputFormatClass()来使用MultipleTextOutputFormat的时候,竟然报错,原因是必须继承子org.apache.hadoop.mapreduce.OutputFormat。0.20.2比较致命的其中一个bug, 升级到0.21能解决


2, 如果同一行数据,需要同时输出至多个文件的话,我们可以使用MultipleOutputs类:

  1. publicclassMultiFileextendsConfiguredimplementsTool{
  2. publicstaticclassMapClassextendsMapReduceBase
  3. implementsMapper<LongWritable,Text,NullWritable,Text>{
  4. privateMultipleOutputsmos;
  5. privateOutputCollector<NullWritable,Text>collector;
  6. publicvoidconfigure(JobConfconf){
  7. mos=newMultipleOutputs(conf);
  8. }
  9. publicvoidmap(LongWritablekey,Textvalue,
  10. OutputCollector<NullWritable,Text>output,
  11. Reporterreporter)throwsIOException{
  12. String[]arr=value.toString().split(",",-1);
  13. Stringchrono=arr[0]+","+arr[1]+","+arr[2];
  14. Stringgeo=arr[0]+","+arr[4]+","+arr[5];
  15. collector=mos.getCollector("chrono",reporter);
  16. collector.collect(NullWritable.get(),newText(chrono));
  17. collector=mos.getCollector("geo",reporter);
  18. collector.collect(NullWritable.get(),newText(geo));
  19. }
  20. publicvoidclose()throwsIOException{
  21. mos.close();
  22. }
  23. }
  24. publicintrun(String[]args)throwsException{
  25. Configurationconf=getConf();
  26. JobConfjob=newJobConf(conf,MultiFile.class);
  27. Pathin=newPath(args[0]);
  28. Pathout=newPath(args[1]);
  29. FileInputFormat.setInputPaths(job,in);
  30. FileOutputFormat.setOutputPath(job,out);
  31. job.setJobName("MultiFile");
  32. job.setMapperClass(MapClass.class);
  33. job.setInputFormat(TextInputFormat.class);
  34. job.setOutputKeyClass(NullWritable.class);
  35. job.setOutputValueClass(Text.class);
  36. job.setNumReduceTasks(0);
  37. MultipleOutputs.addNamedOutput(job,
  38. "chrono",
  39. TextOutputFormat.class,
  40. NullWritable.class,
  41. Text.class);
  42. MultipleOutputs.addNamedOutput(job,
  43. "geo",
  44. TextOutputFormat.class,
  45. NullWritable.class,
  46. Text.class);
  47. JobClient.runJob(job);
  48. return0;
  49. }
  50. }

这个类维护了一个<name, OutputCollector>的map。我们可以在job配置里添加collector,然后在reduce方法中,取得对应的collector并调用collector.write即可。


分享到:
评论

相关推荐

    window下eclipse中运行mapreduce程序所需要的Hadoop全部jar包

    在Windows环境下,使用Eclipse开发MapReduce程序时,必须确保所有必要的Hadoop库都被正确引入。这是因为MapReduce是Hadoop生态系统中的核心组件,用于处理分布式计算任务。以下是一些关于如何在Eclipse中配置和使用...

    使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0

    ### 使用命令行编译打包运行自己的MapReduce程序 Hadoop2.6.0 #### Hadoop 2.x 版本变化及依赖分析 在Hadoop 2.x版本中,相较于早期版本,其架构和依赖库有了明显的变化。在早期版本如1.x中,所有的依赖都集中在`...

    mapreduce程序

    Hadoop MapReduce将大规模数据处理的任务分解为两个主要阶段:Map(映射)和Reduce(化简),使得并行处理变得可能,从而提高了计算效率。 **Map阶段**: Map阶段是MapReduce工作流程的第一步,它接收输入数据集,...

    中文分词mapreduce程序

    总结来说,这个“中文分词MapReduce程序”是一个基于Java的分布式分词工具,利用MapReduce模型对大量中文文本进行高效分词处理,通过分词将原始文本转化为可供分析的词汇单元,并在Reduce阶段完成词频统计。...

    MapReduce2.0程序设计多语言编程(理论+实践)

    3. **Java编程**:Java是MapReduce的原生语言,使用Hadoop的API可以直接创建MapReduce程序。Java API提供了丰富的类和接口,使得开发过程更为直观和便捷。例如,`org.apache.hadoop.mapreduce`包下的各种类和接口,...

    第一个Mapreduce程序.pdf

    本文主要介绍了如何使用Java编写MapReduce程序,并运行第一个MapReduce作业,包括遇到的问题和解决方案。 首先,环境搭建是使用Hadoop MapReduce的重要步骤。本文的环境基于CDH5(Cloudera's Distribution ...

    使用hadoop-streaming运行Python编写的MapReduce程序.rar

    这个压缩包“使用hadoop-streaming运行Python编写的MapReduce程序.rar”显然是一个教程或示例,旨在指导用户如何利用Python编写MapReduce任务,并通过Hadoop Streaming进行执行。 MapReduce是一种编程模型,由...

    简单的MapReduce程序(Hadoop2.2.0)

    MapReduce程序由两个主要阶段组成:Map阶段和Reduce阶段。在这个"MaxTemperature"程序中,我们的目标可能是找出输入数据集中的最高温度记录。输入数据可能存在于`inputdata`目录下,通常这些数据会以一种特定格式...

    基于Java和mapreduce实现的贝叶斯文本分类器设计.zip

    测试过程可基于单机Java程序,也可以是MapReduce程序。输出每个测试文档的分类结果; 3:利用测试文档的真实类别,计算分类模型的Precision,Recall和F1值。 详细介绍参考:...

    深入探究如何使用Java编写MapReduce程序.rar

    本篇将深入探讨如何使用Java编程语言来编写MapReduce程序。 一、MapReduce模型概述 MapReduce包含两个主要阶段:Map阶段和Reduce阶段。Map阶段将输入数据分割成多个小块,然后对每个数据块应用一个用户定义的Map...

    MapReduce简单程序示例

    对于初学者,理解并编写一个简单的MapReduce程序通常从处理文本数据开始。例如,我们可以编写一个程序来计算一个大型文本文件中每个单词出现的次数。在Map阶段,我们提取每个行中的单词,生成形如(单词,1)的键值...

    Java编写Mapreduce程序过程浅析

    在大数据处理领域,Apache Hadoop的MapReduce框架是不可或缺的一部分,尤其对于Java开发者而言,学习如何用Java编写MapReduce程序是提升数据处理能力的关键。本文将深入浅出地解析Java MapReduce程序的编写过程,...

    数据存储实验5-编写MapReduce程序实现词频统计.doc(实验报告)

    数据存储实验5-编写MapReduce程序实现词频统计 本实验的主要目的是通过编写MapReduce程序来实现词频统计,熟悉Hadoop中的MapReduce模块的处理逻辑和编程。实验中,我们将使用Linux操作系统和Eclipse或Intellij Idea...

    一个MapReduce简单程序示例

    在这个"一个MapReduce简单程序示例"中,我们将探讨MapReduce的基本原理、工作流程以及如何在Hadoop环境下实现一个简单的MapReduce程序。 首先,MapReduce的核心思想可以概括为两个主要阶段:Map(映射)和Reduce...

    8-在Eclipse中运行MapReduce程序1

    总结来说,在Eclipse中运行Hadoop MapReduce程序涉及创建项目,编写主类和Mapper、Reducer类,以及正确配置输入输出路径。通过这个过程,我们可以理解MapReduce的基本工作流程,并能实际操作执行分布式计算任务。...

    大数据实验5实验报告:MapReduce 初级编程实践

    以下是MapReduce程序的关键部分: ```java public static class Map extends Mapper, Text, Text, Text&gt; { // 这里实现map方法,将输入的value复制到输出的key上 public void map(Object key, Text value, Context...

    云计算技术实验报告三运行Hadoop MapReduce程序

    实验报告的标题为“云计算技术实验报告三运行Hadoop MapReduce程序”,这表明实验的核心是了解和实践Hadoop MapReduce的编程模型。MapReduce是一种分布式计算框架,常用于处理和生成大规模数据集,由Google提出并被...

    windows下配置cygwin、hadoop等并运行mapreduce及mapreduce程序讲解

    【Windows下配置Cygwin、Hadoop环境及MapReduce程序运行详解】 在Windows操作系统中运行Hadoop和MapReduce程序,通常需要借助Cygwin来模拟Linux环境,因为Hadoop主要设计用于类Unix系统。Cygwin是一个提供Linux环境...

    大数据平台构建:一个简单的MapReduce程序.pptx

    主函数是MapReduce程序的起点,负责配置和启动整个流程。首先,创建`Configuration`对象以读取Hadoop的配置信息。接着,构建`Job`实例,指定程序入口、设置Mapper和Reducer类,以及定义输出的key/value类型。最后,...

Global site tag (gtag.js) - Google Analytics