`
ganliang13
  • 浏览: 252535 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于hadoop的多个reduce 输出

阅读更多
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipOutputWordCount extends Configured implements Tool {
	/*
	 * Mapper<Object, Text, Text, IntWritable>
	 * Object ,读取的字节偏移量
	 * Text Map读取的文本行
	 * Text Map的输出Key
	 * IntWritable 的输出Value
	 */
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			//一行行读取文件内容,一行行处理文件
			StringTokenizer itr = new StringTokenizer(value.toString());//对输入行切词,eg:Hello World,Hello Hadoop
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);//<Hello,1>,<World,1>,<Hello,1>,<Hadoop,1>
			}
		}
	}

	/**
	 *   Reducer<Text, IntWritable, Text, IntWritable> 
	 * Text:Reduce 输入Key
	 * IntWritable:Reduce的输入Value
	 * Text: Reduce 输出Key 默认类型
	 * IntWritable,输入Value,默认类型LongWritable
	 */
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		@SuppressWarnings("rawtypes")
		private MultipleOutputs multipleOutputs;

		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs =new MultipleOutputs<Text,IntWritable>(context);
		}
		
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			multipleOutputs.close();
		}
		@SuppressWarnings("unchecked")
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			multipleOutputs.write(NullWritable.get(), new Text(key.toString()+":"+result), "1");
			multipleOutputs.write(NullWritable.get(), key, "2");
			multipleOutputs.write(NullWritable.get(), "我是你大爷", "3");
		}
	}
	
	public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable>{
		
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MultipOutputWordCount(), args));
	}

	@Override
	public int run(String[] args) throws Exception {
		File jarFile = EJob.createTempJar("bin");
		ClassLoader classLoader = EJob.getClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);
		//Hadoop 运行环境
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "bfdbjc1:12001");
		
		//任务参数设置
		  //a.创建任务,并设置名称,以便跟踪
		Job job = new Job(conf, "word count");
		  //b.运行主类,Map类,Reduce类
		job.setJarByClass(MultipOutputWordCount.class);
		job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class);
		job.setReducerClass(MultipOutputWordCount.IntSumReducer.class);
		//下面两行不需要写,Map默认输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		  //c.设置Reduce输入输出类型,Map默认出及Reduce默认输入是<Text,IntWritable>
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		//HDFS输入,如果是路径默认读取路径下所有文件.
		FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt"));
		//reduce 输出路径
		FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1"));
		
		//Eclipse 本地提交
		((JobConf) job.getConfiguration()).setJar(jarFile.toString());
		
		//等待任务运行完成
		 job.waitForCompletion(true);
		 return 0;
	}
}

 

分享到:
评论

相关推荐

    基于Hadoop-Map Reduce的算法.zip

    《基于Hadoop-MapReduce的算法详解》 在当今大数据时代,处理海量数据已经成为企业和科研机构的日常需求。为了应对这种挑战,一个强大的工具——Hadoop应运而生。Hadoop是一个开源框架,专为分布式存储和大规模数据...

    基于Hadoop的交通视频大数据监控方案.pdf

    在本文中,李晓蕾作者对基于Hadoop技术的交通视频大数据监控方案进行了深入的研究。针对海量交通视频数据监控和分析问题,本研究提出了异常检测算法的设计方案,并实现了交通数据的实时更新和异常分析。在此基础上,...

    基于hadoop的云计算基础架构分析

    【基于Hadoop的云计算基础架构分析】 随着大数据时代的到来,数据的海量存储和高效处理成为技术发展的关键。Hadoop作为一款开源的分布式计算框架,因其高效、可扩展和成本效益高的特性,被广泛应用于云计算领域。...

    基于hadoop实现输出出现频率最高的20个词

    为了找出频率最高的前N个词,我们需要级联多个Job。第一个Job负责统计每个单词的出现次数,第二个Job则用于找出出现频率最高的前N个词。 ```java public class TopN extends Configured implements Tool { public ...

    基于Hadoop的分布式云计算_云存储方案的研究与设计.pdf

    【基于Hadoop的分布式云计算_云存储方案的研究与设计】主要涵盖了云计算领域的多个核心知识点,包括分布式系统、分布式开发、云存储以及MapReduce模型。以下是对这些知识点的详细阐述: 1. **分布式系统**:分布式...

    基于Hadoop的云计算模型

    - **Shuffle阶段**:在Map阶段结束后,系统会对所有中间结果进行排序和分区,以便将相同键的键值对分发给同一个Reduce任务处理。 - **Reduce阶段**:在这个阶段,Reduce任务收集相同键的所有键值对,并执行汇总操作...

    基于Hadoop的网络日志分析系统研究

    为了验证该系统的有效性,研究人员进行了多个实验测试,包括但不限于: - **性能测试**:评估系统在处理大规模日志数据时的性能表现。 - **可扩展性测试**:测试系统是否能够随着数据量的增长而扩展。 - **容错能力...

    基于Hadoop的气象云储存与数据处理应用浅析.pdf

    基于Hadoop的气象云储存与数据处理应用浅析 本文主要介绍了Hadoop架构的构成,并对Hadoop架构的MapReduce实现进行了详细的描述。同时,本文还开发出一个在Hadoop架构的基础上进行气象数值统计的实例,并根据这个...

    基于Hadoop的研究及性能分析.pdf

    HDFS是Hadoop的数据存储系统,它将大文件分割成多个块,并且在集群的不同节点上冗余存储,确保数据的容错性和高可用性。这种分布式存储方式使得即使单个节点故障,系统仍能正常运行,因为数据在多个地方都有备份。 ...

    基于Hadoop的大数据网络安全实体识别方法.pdf

    基于Hadoop的大数据网络安全实体识别方法,可以通过HDFS和MapReduce对大量网络安全日志数据进行分布式存储和并行计算,大大提高了数据处理的速度和规模。在实现上,数据采集系统可以采集网络设备的数据信息,形成...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨如何利用 Hadoop 的核心组件 MapReduce 对 NBA 球员的数据进行分析。 MapReduce 是一种编程模型,用于大规模数据集...

    一个基于hadoop的大数据实战.zip

    《基于Hadoop的大数据实战详解》 在当今信息爆炸的时代,大数据已经成为企业决策、科学研究和社会治理的重要工具。而Hadoop作为开源的分布式计算框架,无疑是处理海量数据的首选方案之一。本文将深入探讨Hadoop在...

    基于Hadoop云计算平台的数据挖掘分析.pdf

    总结来说,基于Hadoop云计算平台的数据挖掘分析,不仅能够帮助企业在海量的客户数据中筛选出有用的信息,还能够通过云计算技术的特点,为数据存储提供安全性和高效的数据处理能力,从而为企业运营发展提供重要的数据...

    基于Hadoop云计算平台的文本处理算法的研究与改进.pdf

    本文研究基于Hadoop平台的文本处理算法,通过分析Hadoop的核心技术,搭建了能够完成分布式文本文件处理任务的云计算平台,并对文件文本内容处理算法进行了改进和实现。这在处理大规模文本数据时尤其有用,如数据去重...

    基于Hadoop的分布式索引构建

    以下是关于基于Hadoop的分布式索引构建的详细知识点。 首先,分布式索引是相较于传统单机索引而言的。在大规模数据集上构建索引时,分布式系统能将任务分解成多个小任务,通过分布式节点并行处理,显著提高效率。...

    基于Hadoop的大数据处理关键技术综述.pptx

    【基于Hadoop的大数据处理关键技术综述】 大数据,作为一种新兴的技术架构,旨在经济有效地从高频、海量、多结构和类型的复杂数据中挖掘价值。其主要特征包括高并发读写需求、海量数据的高效存储与访问以及高可扩展...

    基于Hadoop云计算智能家居信息处理平台设计.doc

    【标题】: "基于Hadoop云计算的智能家居信息处理平台设计" 【描述】: "随着物联网技术的普及,尤其是智能家居领域,对处理海量数据的需求日益增长,掌握云计算技术变得至关重要。云计算为智能家居提供了便利,简化...

    基于Hadoop的MapReduce框架研究报告.ppt

    基于Hadoop的MapReduce框架研究报告 本文将对基于Hadoop的MapReduce框架进行研究和分析,包括Hadoop简介、MapReduce计算模型、分布式并行编程概念、_instance分析等方面。 一、Hadoop简介 Hadoop是一个开源分布式...

    基于HADOOP的倒排索引实现

    总的来说,基于Hadoop的倒排索引实现是一个结合了分布式计算和高效数据结构的优秀实践,它展示了如何利用MapReduce模型解决大数据场景下的文本检索问题。通过理解这一过程,开发者可以更好地运用Hadoop来处理复杂的...

Global site tag (gtag.js) - Google Analytics