import java.io.File; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class MultipOutputWordCount extends Configured implements Tool { /* * Mapper<Object, Text, Text, IntWritable> * Object ,读取的字节偏移量 * Text Map读取的文本行 * Text Map的输出Key * IntWritable 的输出Value */ public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { //一行行读取文件内容,一行行处理文件 StringTokenizer itr = new StringTokenizer(value.toString());//对输入行切词,eg:Hello World,Hello Hadoop while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one);//<Hello,1>,<World,1>,<Hello,1>,<Hadoop,1> } } } /** * Reducer<Text, IntWritable, Text, IntWritable> * Text:Reduce 输入Key * IntWritable:Reduce的输入Value * Text: Reduce 输出Key 默认类型 * IntWritable,输入Value,默认类型LongWritable */ public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); @SuppressWarnings("rawtypes") private MultipleOutputs multipleOutputs; protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs =new MultipleOutputs<Text,IntWritable>(context); } protected void cleanup(Context context) throws IOException, InterruptedException { multipleOutputs.close(); } @SuppressWarnings("unchecked") public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); multipleOutputs.write(NullWritable.get(), new Text(key.toString()+":"+result), "1"); multipleOutputs.write(NullWritable.get(), key, "2"); multipleOutputs.write(NullWritable.get(), "我是你大爷", "3"); } } public static class MultipOutputWordFormat extends MultipleTextOutputFormat<Text, IntWritable>{ } public static void main(String[] args) throws Exception { System.exit(ToolRunner.run(new MultipOutputWordCount(), args)); } @Override public int run(String[] args) throws Exception { File jarFile = EJob.createTempJar("bin"); ClassLoader classLoader = EJob.getClassLoader(); Thread.currentThread().setContextClassLoader(classLoader); //Hadoop 运行环境 Configuration conf = new Configuration(); conf.set("mapred.job.tracker", "bfdbjc1:12001"); //任务参数设置 //a.创建任务,并设置名称,以便跟踪 Job job = new Job(conf, "word count"); //b.运行主类,Map类,Reduce类 job.setJarByClass(MultipOutputWordCount.class); job.setMapperClass(MultipOutputWordCount.TokenizerMapper.class); job.setReducerClass(MultipOutputWordCount.IntSumReducer.class); //下面两行不需要写,Map默认输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //c.设置Reduce输入输出类型,Map默认出及Reduce默认输入是<Text,IntWritable> job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //HDFS输入,如果是路径默认读取路径下所有文件. FileInputFormat.addInputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/a.txt")); //reduce 输出路径 FileOutputFormat.setOutputPath(job, new Path("hdfs://bfdbjc1:12000/user/work/output/2da1")); //Eclipse 本地提交 ((JobConf) job.getConfiguration()).setJar(jarFile.toString()); //等待任务运行完成 job.waitForCompletion(true); return 0; } }
相关推荐
《基于Hadoop-MapReduce的算法详解》 在当今大数据时代,处理海量数据已经成为企业和科研机构的日常需求。为了应对这种挑战,一个强大的工具——Hadoop应运而生。Hadoop是一个开源框架,专为分布式存储和大规模数据...
在本文中,李晓蕾作者对基于Hadoop技术的交通视频大数据监控方案进行了深入的研究。针对海量交通视频数据监控和分析问题,本研究提出了异常检测算法的设计方案,并实现了交通数据的实时更新和异常分析。在此基础上,...
HDFS的核心思想是将大文件分割成多个块,每个块分布在不同的节点上,以实现数据的分布式存储。通过副本机制,HDFS确保了数据的可靠性和容错性。NameNode作为元数据管理节点,保存文件系统的命名空间和块信息,...
【基于Hadoop的云计算基础架构分析】 随着大数据时代的到来,数据的海量存储和高效处理成为技术发展的关键。Hadoop作为一款开源的分布式计算框架,因其高效、可扩展和成本效益高的特性,被广泛应用于云计算领域。...
为了找出频率最高的前N个词,我们需要级联多个Job。第一个Job负责统计每个单词的出现次数,第二个Job则用于找出出现频率最高的前N个词。 ```java public class TopN extends Configured implements Tool { public ...
【基于Hadoop的分布式云计算_云存储方案的研究与设计】主要涵盖了云计算领域的多个核心知识点,包括分布式系统、分布式开发、云存储以及MapReduce模型。以下是对这些知识点的详细阐述: 1. **分布式系统**:分布式...
- **Shuffle阶段**:在Map阶段结束后,系统会对所有中间结果进行排序和分区,以便将相同键的键值对分发给同一个Reduce任务处理。 - **Reduce阶段**:在这个阶段,Reduce任务收集相同键的所有键值对,并执行汇总操作...
为了验证该系统的有效性,研究人员进行了多个实验测试,包括但不限于: - **性能测试**:评估系统在处理大规模日志数据时的性能表现。 - **可扩展性测试**:测试系统是否能够随着数据量的增长而扩展。 - **容错能力...
基于Hadoop的气象云储存与数据处理应用浅析 本文主要介绍了Hadoop架构的构成,并对Hadoop架构的MapReduce实现进行了详细的描述。同时,本文还开发出一个在Hadoop架构的基础上进行气象数值统计的实例,并根据这个...
HDFS是Hadoop的数据存储系统,它将大文件分割成多个块,并且在集群的不同节点上冗余存储,确保数据的容错性和高可用性。这种分布式存储方式使得即使单个节点故障,系统仍能正常运行,因为数据在多个地方都有备份。 ...
基于Hadoop的大数据网络安全实体识别方法,可以通过HDFS和MapReduce对大量网络安全日志数据进行分布式存储和并行计算,大大提高了数据处理的速度和规模。在实现上,数据采集系统可以采集网络设备的数据信息,形成...
在这个项目“基于 Hadoop 平台,使用 MapReduce 编程,统计NBA球员五项数据”中,我们将深入探讨如何利用 Hadoop 的核心组件 MapReduce 对 NBA 球员的数据进行分析。 MapReduce 是一种编程模型,用于大规模数据集...
《基于Hadoop的大数据实战详解》 在当今信息爆炸的时代,大数据已经成为企业决策、科学研究和社会治理的重要工具。而Hadoop作为开源的分布式计算框架,无疑是处理海量数据的首选方案之一。本文将深入探讨Hadoop在...
总结来说,基于Hadoop云计算平台的数据挖掘分析,不仅能够帮助企业在海量的客户数据中筛选出有用的信息,还能够通过云计算技术的特点,为数据存储提供安全性和高效的数据处理能力,从而为企业运营发展提供重要的数据...
本文研究基于Hadoop平台的文本处理算法,通过分析Hadoop的核心技术,搭建了能够完成分布式文本文件处理任务的云计算平台,并对文件文本内容处理算法进行了改进和实现。这在处理大规模文本数据时尤其有用,如数据去重...
以下是关于基于Hadoop的分布式索引构建的详细知识点。 首先,分布式索引是相较于传统单机索引而言的。在大规模数据集上构建索引时,分布式系统能将任务分解成多个小任务,通过分布式节点并行处理,显著提高效率。...
【基于Hadoop的大数据处理关键技术综述】 大数据,作为一种新兴的技术架构,旨在经济有效地从高频、海量、多结构和类型的复杂数据中挖掘价值。其主要特征包括高并发读写需求、海量数据的高效存储与访问以及高可扩展...
【标题】: "基于Hadoop云计算的智能家居信息处理平台设计" 【描述】: "随着物联网技术的普及,尤其是智能家居领域,对处理海量数据的需求日益增长,掌握云计算技术变得至关重要。云计算为智能家居提供了便利,简化...
基于Hadoop的MapReduce框架研究报告 本文将对基于Hadoop的MapReduce框架进行研究和分析,包括Hadoop简介、MapReduce计算模型、分布式并行编程概念、_instance分析等方面。 一、Hadoop简介 Hadoop是一个开源分布式...