以WordCount为例子开始学习hadoop,试着去分析hadoop的工作机制。这篇文章的目的是分析从创建JOB,到读取Configuration的过程
package com.fnk.hadoop;
import java.io.IOException;
import java.net.URL;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount {
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line, ",");
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
//设置当前类到JobConf,并且初始化JobConf
JobConf conf = new JobConf(WordCount.class);
System.out.println(conf.get("fs.default.name"));
conf.setJobName("wordcount");
//设置输出Key的类别
conf.setOutputKeyClass(Text.class);
//设置输出value的类别
conf.setOutputValueClass(IntWritable.class);
//设置mapper的类
conf.setMapperClass(Map.class);
//设置combiner类,此类是用来减少Mapper和Reduce之间的数据传 //输量的,提高performace。要求输入和输出的Key 类型要一致。输入和输出的//value类型也要一致。
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
//设置输入文件夹和输出文件夹
//输入输出文件夹的格式为hdfs://localhost:9000/input //hdfs://localhost:9000/output
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]
+ System.currentTimeMillis()));
JobClient.runJob(conf);
}
//设置mapper的类,调用Configuration类的setClass方法
public void setMapperClass(Class<? extends Mapper> theClass) {
setClass("mapred.mapper.class", theClass, Mapper.class);
}
}
public class JobConf extends Configuration {
//初始化Jobconf
public JobConf(Class exampleClass) {
设置jar类
setJarByClass(exampleClass);
checkAndWarnDeprecation();
}
//如果cls这个类在jar包里面,那么把它设置到“mapred.jar”这个config里面
public void setJarByClass(Class cls) {
String jar = findContainingJar(cls);
if (jar != null) {
setJar(jar);
}
}
//查找my_class是不是包含在jar包里面,是的话返回jar包名,否则返回null
private static String findContainingJar(Class my_class) {
ClassLoader loader = my_class.getClassLoader();
String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
try {
for(Enumeration itr = loader.getResources(class_file);
itr.hasMoreElements();) {
URL url = (URL) itr.nextElement();
//判断是不是包含在jar包里面
if ("jar".equals(url.getProtocol())) {
String toReturn = url.getPath();
if (toReturn.startsWith("file:")) {
toReturn = toReturn.substring("file:".length());
}
toReturn = URLDecoder.decode(toReturn, "UTF-8");
return toReturn.replaceAll("!.*$", "");
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return null;
}
//把jar包名设置到“mapred.jar”这个config里面
public void setJar(String jar) { set("mapred.jar", jar); }
//判断是不是设置了配置项mapred.task.maxvmem,设置了通知这个配置项已经被其它配置//项替代了,将调用Configuration类中的方法去取这个配置项,因为第一次调用,将会调用//loadResource方法,把所有的配置项加载到内存
private void checkAndWarnDeprecation() {
if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
+ " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
+ " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
}
}
}
public class Configuration implements Iterable<Map.Entry<String,String>>,Writable {
//
static{
//print deprecation warning if hadoop-site.xml is found in classpath
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if(cL.getResource("hadoop-site.xml")!=null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
+ "mapred-site.xml and hdfs-site.xml to override properties of " +
"core-default.xml, mapred-default.xml and hdfs-default.xml " +
"respectively");
}
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
}
//设置class
public void setClass(String name, Class<?> theClass, Class<?> xface) {
//判断xface是不是theClass的父类
if (!xface.isAssignableFrom(theClass))
throw new RuntimeException(theClass+" not "+xface.getName());
set(name, theClass.getName());
}
//读取配置信息到properties
private void loadResource(Properties properties, Object name, boolean quiet) {
try {
DocumentBuilderFactory docBuilderFactory
= DocumentBuilderFactory.newInstance();
//ignore all comments inside the xml file
docBuilderFactory.setIgnoringComments(true);
//allow includes in the xml file
docBuilderFactory.setNamespaceAware(true);
try {
docBuilderFactory.setXIncludeAware(true);
} catch (UnsupportedOperationException e) {
LOG.error("Failed to set setXIncludeAware(true) for parser "
+ docBuilderFactory
+ ":" + e,
e);
}
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = null;
Element root = null;
if (name instanceof URL) { // an URL resource
URL url = (URL)name;
if (url != null) {
if (!quiet) {
LOG.info("parsing " + url);
}
doc = builder.parse(url.toString());
}
} else if (name instanceof String) { // a CLASSPATH resource
URL url = getResource((String)name);
if (url != null) {
if (!quiet) {
LOG.info("parsing " + url);
}
doc = builder.parse(url.toString());
}
} else if (name instanceof Path) { // a file resource
// Can't use FileSystem API or we get an infinite loop
// since FileSystem uses Configuration API. Use java.io.File instead.
File file = new File(((Path)name).toUri().getPath())
.getAbsoluteFile();
if (file.exists()) {
if (!quiet) {
LOG.info("parsing " + file);
}
InputStream in = new BufferedInputStream(new FileInputStream(file));
try {
doc = builder.parse(in);
} finally {
in.close();
}
}
} else if (name instanceof InputStream) {
try {
doc = builder.parse((InputStream)name);
} finally {
((InputStream)name).close();
}
} else if (name instanceof Element) {
root = (Element)name;
}
if (doc == null && root == null) {
if (quiet)
return;
throw new RuntimeException(name + " not found");
}
if (root == null) {
root = doc.getDocumentElement();
}
if (!"configuration".equals(root.getTagName()))
LOG.fatal("bad conf file: top-level element not <configuration>");
NodeList props = root.getChildNodes();
for (int i = 0; i < props.getLength(); i++) {
Node propNode = props.item(i);
if (!(propNode instanceof Element))
continue;
Element prop = (Element)propNode;
if ("configuration".equals(prop.getTagName())) {
loadResource(properties, prop, quiet);
continue;
}
if (!"property".equals(prop.getTagName()))
LOG.warn("bad conf file: element not <property>");
NodeList fields = prop.getChildNodes();
String attr = null;
String value = null;
boolean finalParameter = false;
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element)fieldNode;
if ("name".equals(field.getTagName()) && field.hasChildNodes())
attr = ((Text)field.getFirstChild()).getData().trim();
if ("value".equals(field.getTagName()) && field.hasChildNodes())
value = ((Text)field.getFirstChild()).getData();
if ("final".equals(field.getTagName()) && field.hasChildNodes())
finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
}
// Ignore this parameter if it has already been marked as 'final'
if (attr != null && value != null) {
if (!finalParameters.contains(attr)) {
properties.setProperty(attr, value);
if (finalParameter)
finalParameters.add(attr);
} else {
LOG.warn(name+":a attempt to override final parameter: "+attr
+"; Ignoring.");
}
}
}
} catch (IOException e) {
LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (DOMException e) {
LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (SAXException e) {
LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
} catch (ParserConfigurationException e) {
LOG.fatal("error parsing conf file: " + e);
throw new RuntimeException(e);
}
}
}
分享到:
相关推荐
总结来说,基于Windows、Eclipse、Maven和Hadoop实现WordCount源码的过程主要包括以下步骤:配置开发环境、创建Maven项目、编写MapReduce代码、打包成jar文件,以及在Hadoop集群上运行作业。通过这个过程,我们可以...
在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**: Hadoop MapReduce是由两个主要部分组成的:Map阶段和Reduce阶段。Map阶段将...
Hadoop开发WordCount源码程序详细讲解,每一行都带注释说明。
【Hadoop环境搭建与WordCount实例浅析】 Hadoop是一个分布式计算框架,广泛应用于大数据处理。要搭建Hadoop环境并实现WordCount实例,你需要遵循以下步骤: 1. **环境准备**: - 首先,你需要一个Linux操作系统...
在Hadoop生态系统中,`WordCount`程序是一个经典的示例,用于演示分布式计算的基本原理。在标题中的"WordCount2_hadoopwordcount_"可能指的是Hadoop WordCount的第二个版本,通常是在Hadoop 2.x环境下运行。这个程序...
这里我们将深入探讨WordCount的1.0和2.0版本的Java源码,以及它们在大数据处理中的应用。 1. **Hadoop概述** - Hadoop是基于Java开发的,其核心包括HDFS(Hadoop Distributed File System)和MapReduce。 - HDFS...
hadoop入门级的代码 Java编写 eclipse可运行 包含 hdfs的文件操作 rpc远程调用的简单示例 map-reduce的几个例子:wordcount 学生平均成绩 手机流量统计
大数据实验报告 Hadoop 编程实现 wordcount 单词统计程序附源码 本实验报告旨在介绍使用 Hadoop 编程实现 wordcount 单词统计程序的步骤和代码实现。实验的目的在于熟悉 Hadoop 虚拟机的安装与环境的配置,初步理解...
【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...
在提供的压缩包文件"1.WordCount"中,可能包含了实现WordCount的Java源代码、配置文件、输入数据以及运行和测试所需的脚本。这些文件可以帮助用户理解整个流程,从编写代码到提交作业,再到查看结果。通过分析和运行...
word源码java wordcount Hadoop: Intellij结合Maven本地运行和调试MapReduce程序 (无需搭载Hadoop和HDFS环境) 2018年05月20日 10:33:18 阅读数:11 编辑 楼主花费了1天终于按照教程自己第一次成功运行了这个案例。 ...
1.3.2旧的WordCount分析与1.3.3新的WordCount分析,从源码层面解读了旧版和新版的WordCount程序实现,展示了程序如何定义Map和Reduce函数以及它们的工作机制。 1.4WordCount处理过程描述了WordCount程序在MapReduce...
《Hadoop入门脚本WordCount详解》 在大数据处理领域,Hadoop是一个不可或缺的重要工具,它的分布式计算模型为海量数据的处理提供了强大的支持。而WordCount则是Hadoop入门的经典示例,通过它,我们可以深入理解...
### Hadoop运行WordCount实例详解 #### 一、Hadoop简介与WordCount程序的重要性 Hadoop 是一个由Apache基金会所开发的分布式系统基础架构。它能够处理非常庞大的数据集,并且能够在集群上运行,通过将大数据分割...
### Spark 下实现 WordCount #### 一、简介 在大数据处理领域,Apache Spark 是一个非常流行的框架,它能够高效地处理大规模数据集。WordCount 是一个经典的示例程序,用于统计文本文件中每个单词出现的次数。本篇...
代码是基于windows系统下搭建eclipse+hadoop2.8.3开发实例。使用eclipse直接导入代码使用的前提是,需要在本地配置要hadoop2.8.3,本代码亲测可用,能够详细地统计出dataNode下面的file3.txt文件中单词的个数。
例如,输入文件包含"Hello world",则WordCount程序的输出应为"Hello 1"和"world 1"。 ### 二、实验原理与步骤 1. **配置Hadoop环境** - **Java安装与环境变量配置**:确保系统已安装Java 1.8,并配置好JAVA_HOME...
WordCount是Hadoop中的一个经典示例程序,用于演示如何在Hadoop环境中进行简单的数据处理。这个程序的基本任务是对输入文本中的每个单词进行计数,统计出每个单词出现的频率。 Hadoop的核心组件包括HDFS(Hadoop ...