`
ganliang13
  • 浏览: 253122 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于hadoop的多个reduce 输出

阅读更多
import java.io.File;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MultipOutputWordCount extends Configured implements Tool {
	/*
	 * Mapper<Object, Text, Text, IntWritable>
	 * Object ,读取的字节偏移量
	 * Text Map读取的文本行
	 * Text Map的输出Key
	 * IntWritable 的输出Value
	 */
	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			//一行行读取文件内容,一行行处理文件
			StringTokenizer itr = new StringTokenizer(value.toString());//对输入行切词,eg:Hello World,Hello Hadoop
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				context.write(word, one);//<Hello,1>,<World,1>,<Hello,1>,<Hadoop,1>
			}
		}
	}

	/**
	 *   Reducer<Text, IntWritable, Text, IntWritable> 
	 * Text:Reduce 输入Key
	 * IntWritable:Reduce的输入Value
	 * Text: Reduce 输出Key 默认类型
	 * IntWritable,输入Value,默认类型LongWritable
	 */
	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();
		@SuppressWarnings("rawtypes")
		private MultipleOutputs multipleOutputs;

		protected void setup(Context context) throws IOException, InterruptedException {
			multipleOutputs =new MultipleOutputs<Text,IntWritable>(context);
		}
		
		protected void cleanup(Context context) throws IOException,
				InterruptedException {
			multipleOutputs.close();
		}
		@SuppressWarnings("unchecked")
		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			multipleOutputs.write(NullWritable.get(), new Text(key.toString()+":"+result), "1");
			multipleOutputs.write(NullWritable.get(), key, "2");
			multipleOutputs.write(NullWritable.get(), "我是你大爷", "3");
		}
	}
	
	public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable>{
		
	}

	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new MultipOutputWordCount(), args));
	}

	@Override
	public int run(String[] args) throws Exception {
		File jarFile = EJob.createTempJar("bin");
		ClassLoader classLoader = EJob.getClassLoader();
		Thread.currentThread().setContextClassLoader(classLoader);
		//Hadoop 运行环境
		Configuration conf = new Configuration();
		conf.set("mapred.job.tracker", "bfdbjc1:12001");
		
		//任务参数设置
		  //a.创建任务,并设置名称,以便跟踪
		Job job = new Job(conf, "word count");
		  //b.运行主类,Map类,Reduce类
		job.setJarByClass(MultipOutputWordCount.class);
		job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class);
		job.setReducerClass(MultipOutputWordCount.IntSumReducer.class);
		//下面两行不需要写,Map默认输出类型
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		  //c.设置Reduce输入输出类型,Map默认出及Reduce默认输入是<Text,IntWritable>
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		
		//HDFS输入,如果是路径默认读取路径下所有文件.
		FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt"));
		//reduce 输出路径
		FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1"));
		
		//Eclipse 本地提交
		((JobConf) job.getConfiguration()).setJar(jarFile.toString());
		
		//等待任务运行完成
		 job.waitForCompletion(true);
		 return 0;
	}
}

 

分享到:
评论

相关推荐

    基于Hadoop-Map Reduce的算法.zip

    《基于Hadoop-MapReduce的算法详解》 在当今大数据时代,处理海量数据已经成为企业和科研机构的日常需求。为了应对这种挑战,一个强大的工具——Hadoop应运而生。Hadoop是一个开源框架,专为分布式存储和大规模数据...

    基于Hadoop的交通视频大数据监控方案.pdf

    在本文中,李晓蕾作者对基于Hadoop技术的交通视频大数据监控方案进行了深入的研究。针对海量交通视频数据监控和分析问题,本研究提出了异常检测算法的设计方案,并实现了交通数据的实时更新和异常分析。在此基础上,...

    基于Hadoop的云盘系统.zip

    HDFS的核心思想是将大文件分割成多个块,每个块分布在不同的节点上,以实现数据的分布式存储。通过副本机制,HDFS确保了数据的可靠性和容错性。NameNode作为元数据管理节点,保存文件系统的命名空间和块信息,...

    基于hadoop的云计算基础架构分析

    【基于Hadoop的云计算基础架构分析】 随着大数据时代的到来,数据的海量存储和高效处理成为技术发展的关键。Hadoop作为一款开源的分布式计算框架,因其高效、可扩展和成本效益高的特性,被广泛应用于云计算领域。...

    基于hadoop实现输出出现频率最高的20个词

    为了找出频率最高的前N个词,我们需要级联多个Job。第一个Job负责统计每个单词的出现次数,第二个Job则用于找出出现频率最高的前N个词。 ```java public class TopN extends Configured implements Tool { public ...

    基于Hadoop的分布式云计算_云存储方案的研究与设计.pdf

    【基于Hadoop的分布式云计算_云存储方案的研究与设计】主要涵盖了云计算领域的多个核心知识点,包括分布式系统、分布式开发、云存储以及MapReduce模型。以下是对这些知识点的详细阐述: 1. **分布式系统**:分布式...

    基于Hadoop的云计算模型

    - **Shuffle阶段**:在Map阶段结束后,系统会对所有中间结果进行排序和分区,以便将相同键的键值对分发给同一个Reduce任务处理。 - **Reduce阶段**:在这个阶段,Reduce任务收集相同键的所有键值对,并执行汇总操作...

    基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据.zip

    在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨如何利用 Hadoop 的核心组件 MapReduce 对 NBA 球员的数据进行分析。 MapReduce 是一种编程模型,用于大规模数据集...

    基于Hadoop的网络日志分析系统研究

    为了验证该系统的有效性,研究人员进行了多个实验测试,包括但不限于: - **性能测试**:评估系统在处理大规模日志数据时的性能表现。 - **可扩展性测试**:测试系统是否能够随着数据量的增长而扩展。 - **容错能力...

    基于Hadoop的气象云储存与数据处理应用浅析.pdf

    基于Hadoop的气象云储存与数据处理应用浅析 本文主要介绍了Hadoop架构的构成,并对Hadoop架构的MapReduce实现进行了详细的描述。同时,本文还开发出一个在Hadoop架构的基础上进行气象数值统计的实例,并根据这个...

    基于Hadoop的研究及性能分析.pdf

    HDFS是Hadoop的数据存储系统,它将大文件分割成多个块,并且在集群的不同节点上冗余存储,确保数据的容错性和高可用性。这种分布式存储方式使得即使单个节点故障,系统仍能正常运行,因为数据在多个地方都有备份。 ...

    基于Hadoop的大数据网络安全实体识别方法.pdf

    基于Hadoop的大数据网络安全实体识别方法,可以通过HDFS和MapReduce对大量网络安全日志数据进行分布式存储和并行计算,大大提高了数据处理的速度和规模。在实现上,数据采集系统可以采集网络设备的数据信息,形成...

    一个基于hadoop的大数据实战.zip

    《基于Hadoop的大数据实战详解》 在当今信息爆炸的时代,大数据已经成为企业决策、科学研究和社会治理的重要工具。而Hadoop作为开源的分布式计算框架,无疑是处理海量数据的首选方案之一。本文将深入探讨Hadoop在...

    基于Hadoop云计算平台的数据挖掘分析.pdf

    总结来说,基于Hadoop云计算平台的数据挖掘分析,不仅能够帮助企业在海量的客户数据中筛选出有用的信息,还能够通过云计算技术的特点,为数据存储提供安全性和高效的数据处理能力,从而为企业运营发展提供重要的数据...

    基于Hadoop云计算平台的文本处理算法的研究与改进.pdf

    本文研究基于Hadoop平台的文本处理算法,通过分析Hadoop的核心技术,搭建了能够完成分布式文本文件处理任务的云计算平台,并对文件文本内容处理算法进行了改进和实现。这在处理大规模文本数据时尤其有用,如数据去重...

    基于Hadoop的分布式索引构建

    以下是关于基于Hadoop的分布式索引构建的详细知识点。 首先,分布式索引是相较于传统单机索引而言的。在大规模数据集上构建索引时,分布式系统能将任务分解成多个小任务,通过分布式节点并行处理,显著提高效率。...

    基于Hadoop的大数据处理关键技术综述.pptx

    【基于Hadoop的大数据处理关键技术综述】 大数据,作为一种新兴的技术架构,旨在经济有效地从高频、海量、多结构和类型的复杂数据中挖掘价值。其主要特征包括高并发读写需求、海量数据的高效存储与访问以及高可扩展...

    基于Hadoop云计算智能家居信息处理平台设计.doc

    【标题】: "基于Hadoop云计算的智能家居信息处理平台设计" 【描述】: "随着物联网技术的普及,尤其是智能家居领域,对处理海量数据的需求日益增长,掌握云计算技术变得至关重要。云计算为智能家居提供了便利,简化...

    基于Hadoop的MapReduce框架研究报告.ppt

    基于Hadoop的MapReduce框架研究报告 本文将对基于Hadoop的MapReduce框架进行研究和分析,包括Hadoop简介、MapReduce计算模型、分布式并行编程概念、_instance分析等方面。 一、Hadoop简介 Hadoop是一个开源分布式...

Global site tag (gtag.js) - Google Analytics