`

[Hadoop] Hadoop 链式任务 : ChainMapper and ChainReducer的使用

 
阅读更多

注意:

1. 本人目前使用的版本是1.2.1,因此ChainMapper使用的还是old api。 

2. 老的API之中,只支持 N-Mapper + 1-Reducer的模式。 Reducer不在链式任务最开始即可。

比如:

Map1 -> Map2 -> Reducer -> Map3 -> Map4

 

(不确定在新版的API之中是否支持 N-Reducer的模式。不过new api 确实要简单简洁很多)

 

 

任务介绍:

这个任务需要两步完成:

1. 对一篇文章进行WordCount

2. 统计出现次数超过5词的单词

 

WordCount我们很熟悉,因为版本限制,先使用old api 实现一次:

package hadoop_in_action_exersice;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class ChainedJobs {

	public static class TokenizeMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		public static final int LOW_LIMIT = 5;
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			StringTokenizer st = new StringTokenizer(line);
			while(st.hasMoreTokens())
				output.collect(new Text(st.nextToken()), one);
			
		}
		
	}
	
	public static class TokenizeReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

		@Override
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while(values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
		
	}
	
	
	public static void main(String[] args) throws IOException {
		
		
		JobConf conf = new JobConf(ChainedJobs.class);
		conf.setJobName("wordcount");           //设置一个用户定义的job名称
        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类

        // Remove output folder before run job(s)
        FileSystem fs=FileSystem.get(conf);
        String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
		Path op=new Path(outputPath);		 
		if (fs.exists(op)) {
			fs.delete(op, true);
			System.out.println("存在此输出路径,已删除!!!");
		}
        
        FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
		FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        JobClient.runJob(conf);         //运行一个job
	}
	
}

 

上面是独立的一个Job,完成第一步。为了能紧接着完成第二步,我们需要在原来的基础上进行修改。

为了方便理解,上面的输入的例子如下:

accessed	3
accessible	4
accomplish	1
accounting	7
accurately	1
acquire	1
across	1
actual	1
actually	1
add	3
added	2
addition	1
additional	4

 

old api 的实现方式并不支持 setup() / cleanup() 操作这一点非常不好,因此在有可能的情况下最好还是要迁移到Hadoop 2.X 

新的API会方便简洁很多

 

下面是增加了一个Mapper 来过滤

public static class RangeFilterMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

	@Override
	public void map(Text key, IntWritable value,
			OutputCollector<Text, IntWritable> output, Reporter reporter)
			throws IOException {
		
		if(value.get() >= LOW_LIMIT) {
			output.collect(key, value);
		}
		
	}
}

 这个Mapper做的事情很简单,就是针对每个key,如果他的value > LOW_LIMIT 那么就输出

所以,目前为止,任务链如下:

TokenizerMapper -> TokenizeReducer -> RangeFilterMapper 

 

所以我们的main函数改成下面的样子:

public static void main(String[] args) throws IOException {
	
	
	JobConf conf = new JobConf(ChainedJobs.class);
	conf.setJobName("wordcount");           //设置一个用户定义的job名称
//        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类
//        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类
//        conf.setMapperClass(TokenizeMapper.class);         //为job设置Mapper类
//        conf.setCombinerClass(TokenizeReducer.class);      //为job设置Combiner类
//        conf.setReducerClass(TokenizeReducer.class);        //为job设置Reduce类
//        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类
//        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类

	// Step1 : mapper forr word count 
	JobConf wordCountMapper  = new JobConf(false);
	ChainMapper.addMapper(conf, 
			TokenizeMapper.class, 
			LongWritable.class,		// input key type 
			Text.class, 			// input value type
			Text.class, 			// output key type
			IntWritable.class,		// output value type
			false, 					//byValue or byRefference 传值还是传引用
			wordCountMapper);
	
	// Step2: reducer for word count
	JobConf wordCountReducer  = new JobConf(false);
	ChainReducer.setReducer(conf, 
			TokenizeReducer.class, 
			Text.class, 
			IntWritable.class, 
			Text.class, 
			IntWritable.class, 
			false, 
			wordCountReducer);
	
        // Step3: mapper used as filter
	JobConf rangeFilterMapper  = new JobConf(false);
	ChainReducer.addMapper(conf, 
			RangeFilterMapper.class, 
			Text.class, 
			IntWritable.class, 
			Text.class, 
			IntWritable.class, 
			false, 
			rangeFilterMapper);
	
	
    // Remove output folder before run job(s)
    FileSystem fs=FileSystem.get(conf);
    String outputPath = "/home/hadoop/DataSet/Hadoop/WordCount-OUTPUT";
	Path op=new Path(outputPath);		 
	if (fs.exists(op)) {
		fs.delete(op, true);
		System.out.println("存在此输出路径,已删除!!!");
	}
    
    FileInputFormat.setInputPaths(conf, new Path("/home/hadoop/DataSet/Hadoop/WordCount"));
	FileOutputFormat.setOutputPath(conf, new Path(outputPath));
    JobClient.runJob(conf);         //运行一个job
}

  下面是运行结果的一部分:

a	40
and	26
are	12
as	6
be	7
been	8
but	5
by	5
can	12
change	5
data	5
files	7
for	28
from	5
has	7
have	8
if	6
in	27
is	16
it	13
more	8
not	5
of	23
on	5
outputs	5
see	6
so	11
that	11
the	54

 可以看到,英文之中,如果NLP不去除停用词(a, the, for ...) 等,效果确实会被大大的影响。

 

 

分享到:
评论

相关推荐

    hadoop-auth-2.6.5-API文档-中英对照版.zip

    赠送jar包:hadoop-auth-2.6.5.jar 赠送原API文档:hadoop-auth-2.6.5-javadoc.jar 赠送源代码:hadoop-auth-2.6.5-sources.jar 包含翻译后的API文档:hadoop-auth-2.6.5-javadoc-API文档-中文(简体)-英语-对照版...

    Hadoop实战-第2版-陆嘉恒.pdf

    1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序

    Hadoop实战-第二版-陆嘉恒 (2012版)

    1. Hadoop简介2.... Hadoop在yahoo的应用附录A: 云计算在线监测平台附录B: Hadoop安装、运行、使用说明附录C:使用DistributedCache的MapReduce程序附录D:使用ChainMapper和ChainReducer的MapReduce程序

    传播智客-Hadoop实战视频教程下载:里面还包括环境搭建所需的文件、教程

    【Hadoop实战视频教程】是针对大数据处理领域的一款专业学习资源,主要涵盖了Hadoop的各个方面,包括环境搭建、核心组件的使用以及实际案例的应用。这个教程不仅提供了视频讲解,还包含有配套的环境搭建文件,使得...

    hadoop-lzo-0.4.21-SNAPSHOT jars

    在Hadoop的类路径中添加这个JAR文件后,就可以在MapReduce任务或者HDFS操作中使用LZO压缩功能。 3. `hadoop-lzo-0.4.21-SNAPSHOT-sources.jar`:这个文件包含了Hadoop-LZO的源代码,对于开发者来说非常有用,因为...

    hadoop单机配置方法

    本文将详细介绍如何在单机环境下配置Hadoop,使其能够运行基本的大数据处理任务。 #### 一、配置Java环境 Hadoop的运行依赖于Java环境,因此首先需要确保Java已正确安装在系统上。以下是在Ubuntu系统中安装Sun ...

    hadoop 文档:Hadoop开发者下载

    4. **Hadoop API**:学习使用Hadoop API进行数据读写和处理,例如FileSystem API用于文件操作,InputFormat和OutputFormat定义输入输出格式,Mapper和Reducer实现数据处理逻辑。 5. **MapReduce编程**:理解...

    hadoop api.doc

    6. **org.apache.hadoop.mapred**: 这是Hadoop MapReduce的旧版接口,包括作业提交、任务调度和执行。主要类有`JobConf`、`JobClient`和`JobTracker`,用于管理MapReduce作业的生命周期。 7. **org.apache.hadoop....

    Apache Flume, Distributed Log Collection for Hadoop(第二版)

    Apache Flume, Distributed Log Collection for Hadoop,2015 第二版,Packt Publishing

    Hadoop from the beginning: The basics

    It is a good book for both Hadoop beginners and those in need of advancing their Hadoop skills. The author has explored every component of Hadoop. Prior to that, the author helps you understand how ...

    Hadoop源码的入门解析

    Hadoop通过数据本地化策略,将Map任务分配到包含对应数据块的节点上执行,从而提高计算效率。 Hadoop的API分为多个包,包括: 1. **org.apache.hadoop.conf**:管理配置参数。 2. **org.apache.hadoop.fs**:抽象...

    hadoop权威指南代码(Hadoop: The Definitive Guide code)

    `Hadoop: The Definitive Guide`中可能会讲解如何创建、读取和操作HDFS上的文件,以及如何配置HDFS参数以优化性能。 MapReduce是Hadoop处理大数据的主要计算模型,它将大规模数据处理任务分解为小的“映射”和...

    hadoop-hue-hive:Vagrant+Chef 食谱能够在 ubuntu 上安装 hadoop、hue 和 hive

    hadoop-hue-hive-cookbook TODO:在此处输入食谱说明。 支持的平台 TODO:列出您支持的平台。 属性 钥匙 类型 描述 默认 ['hadoop-hue-hive']['培根'] 布尔值 是否包括培根 真的 用法 hadoop-hue-hive::default ...

    Hadoop_Data Processing and Modelling-Packt Publishing(2016).pdf

    Its simple programming model, "code once and deploy at any scale" paradigm, and an ever-growing ecosystem make Hadoop an inclusive platform for programmers with different levels of expertise and ...

    hadoop.dll & winutils.exe For hadoop-2.8.0

    在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。这个压缩包文件包含的是"Hadoop.dll"和"winutils.exe"两个关键组件,它们对于在Windows环境下配置和运行Hadoop生态系统至关重要。 首先,...

    Hadoop datanode启动失败:Hadoop安装目录权限的问题

    $ sudo chown -R hadoop:hadoop /opt/hadoop-0.2.203.0 ``` 这里`/opt/hadoop-0.2.203.0`是Hadoop的具体安装路径,应根据实际情况进行调整。 2. **重新启动Hadoop服务**:修改完所有权后,需要重新启动Hadoop...

    hadoop-lzo-0.4.15.tar.gz

    Hadoop LZO库将LZO算法封装成Hadoop可使用的格式,使得MapReduce任务可以直接处理压缩过的数据,无需先进行解压,这大大节省了集群资源。 在“hadoop-lzo-0.4.15.tar.gz”压缩包中,通常包含以下内容: 1. `lib/` ...

    上传文件到Hadoop失败的原因分析及解决方法.pdf

    具体来说,需要在 Hadoop 集群的 shell 中,运行“hadoop fs –mkdir /user/bikun”命令,创建子目录“bikun”,然后使用命令“hadoop fs –chown bikun:supergroup /user/bikun”,修改子目录“bikun”的拥有者为...

Global site tag (gtag.js) - Google Analytics