注意:
1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。
2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。
比如:
Map1 -> Map2 -> Reducer -> Map3 -> Map4
(不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)
任务介绍:
这个任务需要两步完成:
1. 对一篇文章进行WordCount
2. 统计出现次数超过5词的单词
WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:
package hadoop_in_action_exersice; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class ChainedJobs { public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); public static final int LOW_LIMIT = 5; @Override public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer st = new StringTokenizer(line); while(st.hasMoreTokens()) output.collect(new Text(st.nextToken()), one); } } public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; while(values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } } public static void main(String[] args) throws IOException { JobConf conf = new JobConf(ChainedJobs.class); conf.setJobName("wordcount"); //设置一个用户定义的job名称 conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类 conf.setOutputValueClass(IntWritable.class); //为job输出设置value类 conf.setMapperClass(TokenizeMapper.class); //为job设置Mapper类 conf.setCombinerClass(TokenizeReducer.class); //为job设置Combiner类 conf.setReducerClass(TokenizeReducer.class); //为job设置Reduce类 conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类 conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类 // Remove output folder before run job(s) FileSystem fs=FileSystem.get(conf); String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT"; Path op=new Path(outputPath); if (fs.exists(op)) { fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount")); FileOutputFormat.setOutputPath(conf, new Path(outputPath)); JobClient.runJob(conf); //运行一个job } }
上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。
为了方便理解,上面的输入的例子如下:
accessed 3 accessible 4 accomplish 1 accounting 7 accurately 1 acquire 1 across 1 actual 1 actually 1 add 3 added 2 addition 1 additional 4
old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X
新的API会方便简洁很多
下面是增加了一个Mapper 来过滤
public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> { @Override public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { if(value.get() >= LOW_LIMIT) { output.collect(key, value); } } }
这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出
所以,目前为止,任务链如下:
TokenizerMapper -> TokenizeReducer -> RangeFilterMapper
所以我们的main函数改成下面的样子:
public static void main(String[] args) throws IOException { JobConf conf = new JobConf(ChainedJobs.class); conf.setJobName("wordcount"); //设置一个用户定义的job名称 // conf.setOutputKeyClass(Text.class); //为job的输出数据设置Key类 // conf.setOutputValueClass(IntWritable.class); //为job输出设置value类 // conf.setMapperClass(TokenizeMapper.class); //为job设置Mapper类 // conf.setCombinerClass(TokenizeReducer.class); //为job设置Combiner类 // conf.setReducerClass(TokenizeReducer.class); //为job设置Reduce类 // conf.setInputFormat(TextInputFormat.class); //为map-reduce任务设置InputFormat实现类 // conf.setOutputFormat(TextOutputFormat.class); //为map-reduce任务设置OutputFormat实现类 // Step1 : mapper forr word count JobConf wordCountMapper = new JobConf(false); ChainMapper.addMapper(conf, TokenizeMapper.class, LongWritable.class, // input key type Text.class, // input value type Text.class, // output key type IntWritable.class, // output value type false, //byValue or byRefference 传值还是传引用 wordCountMapper); // Step2: reducer for word count JobConf wordCountReducer = new JobConf(false); ChainReducer.setReducer(conf, TokenizeReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, wordCountReducer); // Step3: mapper used as filter JobConf rangeFilterMapper = new JobConf(false); ChainReducer.addMapper(conf, RangeFilterMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, false, rangeFilterMapper); // Remove output folder before run job(s) FileSystem fs=FileSystem.get(conf); String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT"; Path op=new Path(outputPath); if (fs.exists(op)) { fs.delete(op, true); System.out.println("存在此输出路径,已删除!!!"); } FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount")); FileOutputFormat.setOutputPath(conf, new Path(outputPath)); JobClient.runJob(conf); //运行一个job }
下面是运行结果的一部分:
a 40 and 26 are 12 as 6 be 7 been 8 but 5 by 5 can 12 change 5 data 5 files 7 for 28 from 5 has 7 have 8 if 6 in 27 is 16 it 13 more 8 not 5 of 23 on 5 outputs 5 see 6 so 11 that 11 the 54
可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。
相关推荐
赠送jar包:hadoop-auth-2.6.5.jar 赠送原API文档:hadoop-auth-2.6.5-javadoc.jar 赠送源代码:hadoop-auth-2.6.5-sources.jar 包含翻译后的API文档:hadoop-auth-2.6.5-javadoc-API文档-中文(简体)-英语-对照版...
1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序
1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序
【Hadoop实战视频教程】是针对大数据处理领域的一款专业学习资源,主要涵盖了Hadoop的各个方面,包括环境搭建、核心组件的使用以及实际案例的应用。这个教程不仅提供了视频讲解,还包含有配套的环境搭建文件,使得...
在Hadoop的类路径中添加这个JAR文件后,就可以在MapReduce任务或者HDFS操作中使用LZO压缩功能。 3. `hadoop-lzo-0.4.21-SNAPSHOT-sources.jar`:这个文件包含了Hadoop-LZO的源代码,对于开发者来说非常有用,因为...
本文将详细介绍如何在单机环境下配置Hadoop,使其能够运行基本的大数据处理任务。 #### 一、配置Java环境 Hadoop的运行依赖于Java环境,因此首先需要确保Java已正确安装在系统上。以下是在Ubuntu系统中安装Sun ...
4. **Hadoop API**:学习使用Hadoop API进行数据读写和处理,例如FileSystem API用于文件操作,InputFormat和OutputFormat定义输入输出格式,Mapper和Reducer实现数据处理逻辑。 5. **MapReduce编程**:理解...
6. **org.apache.hadoop.mapred**: 这是Hadoop MapReduce的旧版接口,包括作业提交、任务调度和执行。主要类有`JobConf`、`JobClient`和`JobTracker`,用于管理MapReduce作业的生命周期。 7. **org.apache.hadoop....
Apache Flume, Distributed Log Collection for Hadoop,2015 第二版,Packt Publishing
It is a good book for both Hadoop beginners and those in need of advancing their Hadoop skills. The author has explored every component of Hadoop. Prior to that, the author helps you understand how ...
Hadoop通过数据本地化策略,将Map任务分配到包含对应数据块的节点上执行,从而提高计算效率。 Hadoop的API分为多个包,包括: 1. **org.apache.hadoop.conf**:管理配置参数。 2. **org.apache.hadoop.fs**:抽象...
`Hadoop: The Definitive Guide`中可能会讲解如何创建、读取和操作HDFS上的文件,以及如何配置HDFS参数以优化性能。 MapReduce是Hadoop处理大数据的主要计算模型,它将大规模数据处理任务分解为小的“映射”和...
hadoop-hue-hive-cookbook TODO:在此处输入食谱说明。 支持的平台 TODO:列出您支持的平台。 属性 钥匙 类型 描述 默认 ['hadoop-hue-hive']['培根'] 布尔值 是否包括培根 真的 用法 hadoop-hue-hive::default ...
Its simple programming model, "code once and deploy at any scale" paradigm, and an ever-growing ecosystem make Hadoop an inclusive platform for programmers with different levels of expertise and ...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。这个压缩包文件包含的是"Hadoop.dll"和"winutils.exe"两个关键组件,它们对于在Windows环境下配置和运行Hadoop生态系统至关重要。 首先,...
$ sudo chown -R hadoop:hadoop /opt/hadoop-0.2.203.0 ``` 这里`/opt/hadoop-0.2.203.0`是Hadoop的具体安装路径,应根据实际情况进行调整。 2. **重新启动Hadoop服务**:修改完所有权后,需要重新启动Hadoop...
Hadoop LZO库将LZO算法封装成Hadoop可使用的格式,使得MapReduce任务可以直接处理压缩过的数据,无需先进行解压,这大大节省了集群资源。 在“hadoop-lzo-0.4.15.tar.gz”压缩包中,通常包含以下内容: 1. `lib/` ...
具体来说,需要在 Hadoop 集群的 shell 中,运行“hadoop fs –mkdir /user/bikun”命令,创建子目录“bikun”,然后使用命令“hadoop fs –chown bikun:supergroup /user/bikun”,修改子目录“bikun”的拥有者为...