- 浏览: 141884 次
- 性别:
- 来自: 上海
最新评论
-
xueyinv86:
你这个增强版的wordcount是在哪个版本的hadoop上跑 ...
MapReduce入门程序WordCount增强版 -
chenjianjx:
很不错的收集!
几篇关于Hadoop+Hive数据仓库的入门文章 -
canedy:
import org.apache.hadoop.hbase. ...
使用HBase的一个典型例子,涉及了HBase中很多概念 -
天籁の圁:
你的图全部挂了啊
基于Eclipse的Hadoop应用开发环境的配置 -
landyer:
happinesss 写道你是做java开发的吗我是杂货铺,什 ...
MongoDB1.8安装、分布式自动分片(Auto-Sharding)配置备忘
WordCount程序应该是学习MapReduce编程最经典的样例程序了,小小一段程序就基本概括了MapReduce编程模型的核心思想。 现在考虑实现一个增强版的WordCount程序,要求: 下面我们来逐一分析一下该程序与原始版本的不同之处。 在最初版的wordCount里,程序是在main函数里直接runJob的,而增强版的main函数里通过调用ToolRunner.run()函数启动程序。 Tool 是Map/Reduce工具或应用的标准。ToolRunner用来运行实现了Tool接口的类,它与GenericOptionsParser合作解析Hadoop的命令行参数。 应用程序应该只处理其定制参数,把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。 增强版的WordCount类继承了Configured类并实现了Tool接口,因此第95行中的第二个参数就是WordCount类。这也是典 型的实现Tool接口的写法。Configured类提供了88行的函数getConf(),该函数功能是获得对象自身的配置。Tool接口主要实现一个 run函数,然后通过ToolRunner.run调用执行。 在run函数中,第83行,通过DistributedCache将参数文件分发到HDFS缓存文件。 DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。应用程序在JobConf中通过url(hdfs://)指定需要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。DistributedCache运 行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。 DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。 用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。 在第25行获得缓存的参数文件。 在第12行用到了Counters, Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以汇集到一个组,其类型为Counters.Group。应用程序可以定义任意(Enum类型)的 Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。 在第54行用到了Reporter,Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息, 更新Counters(计数器)的机制。 Mapper和Reducer的实现可以利用Reporter 来报告进度,或者仅是表明自己运行正常。在那种应用程序需要花很长时间处理个别键值对的场景中,这种机制是很关键的,因为框架可能会以为这个任务超时了, 从而将它强行杀死。另一个避免这种情况发生的方式是,将配置参数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有 超时限制了)。第57行就用reporter来设置了程序运行的状态。 第20行标记是否忽略大小写,该参数并没有在程序中设置,而是留给运行程序的用户了。 另外,在第50行,采用了StringTokenizer进行单词的分割,记得当时做项目的时候就查看过API,StringTokenizer是不推荐使用的,所以我们都是采用split来实现。 下面是增强版WordCount的运行样例及结果 最后,比较囧的是,我竟然是第一次看到第60行的这种用法,虽然一眼就能判断出这是foreach操作,但是之前一直不知道Java还支持这种使用,查了一下,是1.5加入的特性。
package org.myorg;
import java.io.*;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
public class WordCount extends Configured implements Tool {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
static enum Counters { INPUT_WORDS }
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private boolean caseSensitive = true;
private Set<String> patternsToSkip = new HashSet<String>();
private long numRecords = 0;
private String inputFile;
public void configure(JobConf job) {
caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
inputFile = job.get("map.input.file");
if (job.getBoolean("wordcount.skip.patterns", false)) {
Path[] patternsFiles = new Path[0];
try {
patternsFiles = DistributedCache.getLocalCacheFiles(job);
} catch (IOException ioe) {
System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
}
for (Path patternsFile : patternsFiles) {
parseSkipFile(patternsFile);
}
}
}
private void parseSkipFile(Path patternsFile) {
try {
BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
String pattern = null;
while ((pattern = fis.readLine()) != null) {
patternsToSkip.add(pattern);
}
} catch (IOException ioe) {
System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
}
}
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
for (String pattern : patternsToSkip) {
line = line.replaceAll(pattern, "");
}
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
reporter.incrCounter(Counters.INPUT_WORDS, 1);
}
if ((++numRecords % 100) == 0) {
reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), WordCount.class);
conf.setJobName("wordcount");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
List<String> other_args = new ArrayList<String>();
for (int i=0; i < args.length; ++i) {
if ("-skip".equals(args[i])) {
DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
conf.setBoolean("wordcount.skip.patterns", true);
} else {
other_args.add(args[i]);
}
}
FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new WordCount(), args);
System.exit(res);
}
}
该函数的原型是public static int run(Configuration conf, Tool tool, String[] args),其功能是将args作为参数,conf作为配置运行tool。
Hadoop命令行的常用选项有:-conf
-D
-fs
-jt
输入样例:
$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!
$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.
运行程序:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1
现在通过DistributedCache插入一个模式文件,文件中保存了要被忽略的单词模式。
$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
.
,
!
to
再运行一次,这次使用更多的选项:
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
应该得到这样的输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1
再运行一次,这一次关闭大小写敏感性(case-sensitivity):
$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt
输出:
$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2
发表评论
-
thrift安装资料集合
2011-06-22 14:19 1142http://www.buywine168.com/index ... -
在Ubuntu下编译安装Thrift(支持php和c++)
2011-06-22 14:16 1641原文地址:http://www.coder4.com/arch ... -
HBase Thrift 0.5.0 + PHP 5 安裝設定
2011-06-22 14:13 1872原文地址: http://blog.kfchph.com ... -
Hadoop+hbase+thrift H.H.T环境部署
2011-06-21 12:58 1094原文地址:http://blog.sina.com.cn/s/ ... -
php操作hbase例子
2011-06-21 10:59 23241 $GLOBALS['THRIFT_ROOT'] = '/h ... -
HBase技术介绍
2011-06-21 10:54 1086原文地址:http://www.searc ... -
详细讲解Hadoop中的一个简单数据库HBase
2011-06-08 17:29 939原文地址:http://databas ... -
hive sql语法解读
2011-06-08 16:52 1350版权声明:转载时请以超链接形式标明文章原始出处和作者信息及 ... -
Hive 的启动方式
2011-06-08 16:49 2502Hive 的启动方式 hive 命令行 ... -
Hive环境搭建与入门
2011-06-08 16:47 1526原文地址:http://www.jiacheo.org ... -
Hbase入门6 -白话MySQL(RDBMS)与HBase之间
2011-06-08 15:38 1557作者: H.E. | 您可以转载, ... -
Apache Hive入门3–Hive与HBase的整合
2011-06-08 10:03 1565作者: H.E. | 您可以转载, ... -
Apache Hive入门2
2011-06-08 10:00 1338我的偏见: 对于互联 ... -
Apache Hive入门1
2011-06-08 09:59 2541作者: H.E. | 您可以转载, ... -
hbase分布安装部署
2011-06-07 22:37 18071、 hbase安装部署#cd /hadoop# ... -
使用HBase的一个典型例子,涉及了HBase中很多概念
2011-06-07 15:38 1581一个使用HBase的例子,如下。 import java ... -
HBase入门篇4–存储
2011-06-06 21:22 1590作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇3
2011-06-06 21:21 1850作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇2-Java操作HBase例子
2011-06-06 21:18 2398作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇
2011-06-06 21:17 2387作者: H.E. | 您可以转载, ...
相关推荐
WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的次数。在这个案例中,我们将深入探讨如何在 Hadoop 环境中使用 MapReduce 实现 WordCount。 【描述】在 Hadoop 环境中,WordCount 的...
WordCount是MapReduce中的一个经典示例,它演示了如何利用该框架进行数据处理。在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**...
### Ubuntu安装Hadoop实现MapReduce里的WordCount #### 核心知识点概述 1. **Ubuntu环境下的基础配置**:包括VMware Tools的安装、JDK的安装与配置。 2. **Hadoop的安装与配置**:包括下载与解压、环境变量配置、...
WordCount是MapReduce最基础且经典的示例,它演示了如何利用MapReduce处理文本数据并统计每个单词出现的次数。在这个案例中,我们将深入理解MapReduce的工作原理,并通过WordCount的例子来解析其实现过程。 首先,...
总的来说,WordCount程序清晰地展示了MapReduce的基本工作原理,它将大文件中的每个单词作为key,出现次数作为value,通过map和reduce两个步骤,完成了对大量文本的高效统计。这个例子对于理解和应用MapReduce框架...
Hadoop是Apache开源项目,它实现了MapReduce模型,使得用户可以方便地编写处理海量数据的应用程序。 1. MapReduce编程模型 MapReduce的核心思想是将大数据处理分解为两步:Map和Reduce。Map阶段将原始数据分割成...
Hadoop及Mapreduce入门,简单易懂,让你快速进入hadoop的世界
实验内容涵盖了从启动全分布模式的Hadoop集群到编写、运行和分析MapReduce应用程序的全过程。 首先,实验启动了Hadoop集群的所有守护进程,包括NameNode(主节点,负责元数据管理)、DataNode(存储数据的节点)、...
通过这种方式,我们可以使用IntelliJ IDEA和Maven构建MapReduce应用程序,理解并实现WordCount示例,从而掌握MapReduce的基本工作原理和编程模型。这个例子对于学习大数据处理和分布式计算是很好的起点。
本篇文章将详细讲解如何通过一个简单的 WordCount 示例来入门 Hadoop MapReduce。 首先,我们需要了解 MapReduce 的基本工作流程。MapReduce 分为两个主要阶段:Map 阶段和 Reduce 阶段。Map 阶段将输入数据分割成...
以下是MapReduce WordCount程序的一个简化版Java代码示例: ```java import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org....
wordcount, mapreduce经典,文字计数
在这个“MapReduce入门案例.rar”压缩包中,包含了五个基础的MapReduce编程示例,帮助初学者理解并掌握MapReduce的工作原理和用法。 1. 统计单词的个数: 这是最经典的MapReduce案例,通常被称为WordCount。在Map...
MapReduce的wordcount的jar包
WordCount是MapReduce编程的经典入门案例,用于统计文本数据中每个单词出现的次数。下面我们将详细阐述如何使用Java语言在Hadoop框架中实现WordCount程序。 首先,理解MapReduce的两个主要处理阶段:Map阶段和...
本文将详细解析这个简单的WordCount程序,帮助初学者快速入门Hadoop。 一、Hadoop简介 Hadoop是Apache基金会的一个开源项目,基于Java实现,主要设计用于处理和存储大规模数据。其核心包括两个主要组件:HDFS...
### Python 实现 MapReduce 的 WordCount 示例详解 #### 一、引言 MapReduce 是 Hadoop 生态系统中的一种编程模型,主要用于大规模数据集的并行处理。它通过两个主要阶段——`Map` 和 `Reduce` 来实现数据处理任务...
WordCount程序是Hadoop MapReduce的入门示例,它由两个阶段组成:Map阶段和Reduce阶段。在Map阶段,输入的文本文件被分割成多个块,然后在不同的节点上并行处理。每个节点上的Mapper将读取数据,分割出单词(通常是...
这个入门程序将帮助初学者理解Hadoop的WordCount应用,这是一个基础且经典的例子,用于演示MapReduce的工作原理。 首先,我们要理解MapReduce的两个核心部分:Map阶段和Reduce阶段。在Map阶段,原始数据被分成多个...
分布式网络环境中的MapReduce编程模型,以WordCount程序的实现为例,展示了如何在伪分布式模式下完成文本的单词计数功能。WordCount是一个基础程序,广泛用于演示分布式计算模型的基本原理,其核心操作分为Map(映射...