`

WordCount 源码浅析(1)

阅读更多

以WordCount为例子开始学习hadoop,试着去分析hadoop的工作机制。这篇文章的目的是分析从创建JOB,到读取Configuration的过程

package com.fnk.hadoop;

import java.io.IOException;
import java.net.URL;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
	public static class Map extends MapReduceBase implements
			Mapper<LongWritable, Text, Text, IntWritable> {

		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line, ",");
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				output.collect(word, one);
			}
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {
			int sum = 0;
			while (values.hasNext()) {
				sum += values.next().get();
			}
			output.collect(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
                        //设置当前类到JobConf,并且初始化JobConf
		JobConf conf = new JobConf(WordCount.class);
		System.out.println(conf.get("fs.default.name"));
		conf.setJobName("wordcount");
                        //设置输出Key的类别
		conf.setOutputKeyClass(Text.class);
                        //设置输出value的类别
		conf.setOutputValueClass(IntWritable.class);
                        //设置mapper的类
		conf.setMapperClass(Map.class);
                        //设置combiner类,此类是用来减少Mapper和Reduce之间的数据传  //输量的,提高performace。要求输入和输出的Key 类型要一致。输入和输出的//value类型也要一致。
		conf.setCombinerClass(Reduce.class);
		conf.setReducerClass(Reduce.class);
		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);
                        //设置输入文件夹和输出文件夹 
                        //输入输出文件夹的格式为hdfs://localhost:9000/input      //hdfs://localhost:9000/output
		FileInputFormat.setInputPaths(conf, new Path(args[0]));
		FileOutputFormat.setOutputPath(conf, new Path(args[1]
				+ System.currentTimeMillis()));
		JobClient.runJob(conf);
	}
    //设置mapper的类,调用Configuration类的setClass方法
  public void setMapperClass(Class<? extends Mapper> theClass) {
    setClass("mapred.mapper.class", theClass, Mapper.class);
  }
}

public class JobConf extends Configuration { 
//初始化Jobconf
public JobConf(Class exampleClass) {
    设置jar类
    setJarByClass(exampleClass);
    checkAndWarnDeprecation();
  }
//如果cls这个类在jar包里面,那么把它设置到“mapred.jar”这个config里面
public void setJarByClass(Class cls) {
    String jar = findContainingJar(cls);
    if (jar != null) {
      setJar(jar);
    }   
  }
//查找my_class是不是包含在jar包里面,是的话返回jar包名,否则返回null
  private static String findContainingJar(Class my_class) {
    ClassLoader loader = my_class.getClassLoader();
    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
    try {
      for(Enumeration itr = loader.getResources(class_file);
          itr.hasMoreElements();) {
        URL url = (URL) itr.nextElement();
        //判断是不是包含在jar包里面
        if ("jar".equals(url.getProtocol())) {
          String toReturn = url.getPath();
          if (toReturn.startsWith("file:")) {
            toReturn = toReturn.substring("file:".length());
          }
          toReturn = URLDecoder.decode(toReturn, "UTF-8");
          return toReturn.replaceAll("!.*$", "");
        }
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return null;
  }
  //把jar包名设置到“mapred.jar”这个config里面
  public void setJar(String jar) { set("mapred.jar", jar); }
  //判断是不是设置了配置项mapred.task.maxvmem,设置了通知这个配置项已经被其它配置//项替代了,将调用Configuration类中的方法去取这个配置项,因为第一次调用,将会调用//loadResource方法,把所有的配置项加载到内存
  private void checkAndWarnDeprecation() {
    if(get(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY) != null) {
      LOG.warn(JobConf.deprecatedString(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY)
                + " Instead use " + JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY
                + " and " + JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY);
    }
  }
}


public class Configuration implements Iterable<Map.Entry<String,String>>,Writable {

//
static{
    //print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if(cL.getResource("hadoop-site.xml")!=null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of " +
          "core-default.xml, mapred-default.xml and hdfs-default.xml " +
          "respectively");
    }
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
  }
//设置class
  public void setClass(String name, Class<?> theClass, Class<?> xface) {
    //判断xface是不是theClass的父类
    if (!xface.isAssignableFrom(theClass))
      throw new RuntimeException(theClass+" not "+xface.getName());
    set(name, theClass.getName());
  }
//读取配置信息到properties
private void loadResource(Properties properties, Object name, boolean quiet) {
    try {
      DocumentBuilderFactory docBuilderFactory 
        = DocumentBuilderFactory.newInstance();
      //ignore all comments inside the xml file
      docBuilderFactory.setIgnoringComments(true);

      //allow includes in the xml file
      docBuilderFactory.setNamespaceAware(true);
      try {
          docBuilderFactory.setXIncludeAware(true);
      } catch (UnsupportedOperationException e) {
        LOG.error("Failed to set setXIncludeAware(true) for parser "
                + docBuilderFactory
                + ":" + e,
                e);
      }
      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
      Document doc = null;
      Element root = null;

      if (name instanceof URL) {                  // an URL resource
        URL url = (URL)name;
        if (url != null) {
          if (!quiet) {
            LOG.info("parsing " + url);
          }
          doc = builder.parse(url.toString());
        }
      } else if (name instanceof String) {        // a CLASSPATH resource
        URL url = getResource((String)name);
        if (url != null) {
          if (!quiet) {
            LOG.info("parsing " + url);
          }
          doc = builder.parse(url.toString());
        }
      } else if (name instanceof Path) {          // a file resource
        // Can't use FileSystem API or we get an infinite loop
        // since FileSystem uses Configuration API.  Use java.io.File instead.
        File file = new File(((Path)name).toUri().getPath())
          .getAbsoluteFile();
        if (file.exists()) {
          if (!quiet) {
            LOG.info("parsing " + file);
          }
          InputStream in = new BufferedInputStream(new FileInputStream(file));
          try {
            doc = builder.parse(in);
          } finally {
            in.close();
          }
        }
      } else if (name instanceof InputStream) {
        try {
          doc = builder.parse((InputStream)name);
        } finally {
          ((InputStream)name).close();
        }
      } else if (name instanceof Element) {
        root = (Element)name;
      }

      if (doc == null && root == null) {
        if (quiet)
          return;
        throw new RuntimeException(name + " not found");
      }

      if (root == null) {
        root = doc.getDocumentElement();
      }
      if (!"configuration".equals(root.getTagName()))
        LOG.fatal("bad conf file: top-level element not <configuration>");
      NodeList props = root.getChildNodes();
      for (int i = 0; i < props.getLength(); i++) {
        Node propNode = props.item(i);
        if (!(propNode instanceof Element))
          continue;
        Element prop = (Element)propNode;
        if ("configuration".equals(prop.getTagName())) {
          loadResource(properties, prop, quiet);
          continue;
        }
        if (!"property".equals(prop.getTagName()))
          LOG.warn("bad conf file: element not <property>");
        NodeList fields = prop.getChildNodes();
        String attr = null;
        String value = null;
        boolean finalParameter = false;
        for (int j = 0; j < fields.getLength(); j++) {
          Node fieldNode = fields.item(j);
          if (!(fieldNode instanceof Element))
            continue;
          Element field = (Element)fieldNode;
          if ("name".equals(field.getTagName()) && field.hasChildNodes())
            attr = ((Text)field.getFirstChild()).getData().trim();
          if ("value".equals(field.getTagName()) && field.hasChildNodes())
            value = ((Text)field.getFirstChild()).getData();
          if ("final".equals(field.getTagName()) && field.hasChildNodes())
            finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
        }
        
        // Ignore this parameter if it has already been marked as 'final'
        if (attr != null && value != null) {
          if (!finalParameters.contains(attr)) {
            properties.setProperty(attr, value);
            if (finalParameter)
              finalParameters.add(attr);
          } else {
            LOG.warn(name+":a attempt to override final parameter: "+attr
                     +";  Ignoring.");
          }
        }
      }
        
    } catch (IOException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (DOMException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (SAXException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    } catch (ParserConfigurationException e) {
      LOG.fatal("error parsing conf file: " + e);
      throw new RuntimeException(e);
    }
  }
}
 

 

分享到:
评论

相关推荐

    基于Windows eclipse maven Hadoop 的WordCount源码

    总结来说,基于Windows、Eclipse、Maven和Hadoop实现WordCount源码的过程主要包括以下步骤:配置开发环境、创建Maven项目、编写MapReduce代码、打包成jar文件,以及在Hadoop集群上运行作业。通过这个过程,我们可以...

    hadoop 框架下 mapreduce源码例子 wordcount

    在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**: Hadoop MapReduce是由两个主要部分组成的:Map阶段和Reduce阶段。Map阶段将...

    Hadoop开发WordCount源码详细讲解

    Hadoop开发WordCount源码程序详细讲解,每一行都带注释说明。

    Hadoop环境搭建与WordCount实例浅析.pdf

    【Hadoop环境搭建与WordCount实例浅析】 Hadoop是一个分布式计算框架,广泛应用于大数据处理。要搭建Hadoop环境并实现WordCount实例,你需要遵循以下步骤: 1. **环境准备**: - 首先,你需要一个Linux操作系统...

    WordCount2_hadoopwordcount_

    在Hadoop生态系统中,`WordCount`程序是一个经典的示例,用于演示分布式计算的基本原理。在标题中的"WordCount2_hadoopwordcount_"可能指的是Hadoop WordCount的第二个版本,通常是在Hadoop 2.x环境下运行。这个程序...

    WordCount源码

    这里我们将深入探讨WordCount的1.0和2.0版本的Java源码,以及它们在大数据处理中的应用。 1. **Hadoop概述** - Hadoop是基于Java开发的,其核心包括HDFS(Hadoop Distributed File System)和MapReduce。 - HDFS...

    hadoop入门java代码hdfs文件操作 wordCount源码

    hadoop入门级的代码 Java编写 eclipse可运行 包含 hdfs的文件操作 rpc远程调用的简单示例 map-reduce的几个例子:wordcount 学生平均成绩 手机流量统计

    大数据实验报告Hadoop编程实现wordcount单词统计程序附源码.doc

    大数据实验报告 Hadoop 编程实现 wordcount 单词统计程序附源码 本实验报告旨在介绍使用 Hadoop 编程实现 wordcount 单词统计程序的步骤和代码实现。实验的目的在于熟悉 Hadoop 虚拟机的安装与环境的配置,初步理解...

    Hadoop mapreduce实现wordcount

    【标题】Hadoop MapReduce 实现 WordCount MapReduce 是 Apache Hadoop 的核心组件之一,它为大数据处理提供了一个分布式计算框架。WordCount 是 MapReduce 框架中经典的入门示例,它统计文本文件中每个单词出现的...

    最简单MR WordCount

    在提供的压缩包文件"1.WordCount"中,可能包含了实现WordCount的Java源代码、配置文件、输入数据以及运行和测试所需的脚本。这些文件可以帮助用户理解整个流程,从编写代码到提交作业,再到查看结果。通过分析和运行...

    word源码java-wordcount:字数

    word源码java wordcount Hadoop: Intellij结合Maven本地运行和调试MapReduce程序 (无需搭载Hadoop和HDFS环境) 2018年05月20日 10:33:18 阅读数:11 编辑 楼主花费了1天终于按照教程自己第一次成功运行了这个案例。 ...

    Hadoop集群-WordCount运行详解.pdf

    1.3.2旧的WordCount分析与1.3.3新的WordCount分析,从源码层面解读了旧版和新版的WordCount程序实现,展示了程序如何定义Map和Reduce函数以及它们的工作机制。 1.4WordCount处理过程描述了WordCount程序在MapReduce...

    Hadoop入门脚本WordCount

    《Hadoop入门脚本WordCount详解》 在大数据处理领域,Hadoop是一个不可或缺的重要工具,它的分布式计算模型为海量数据的处理提供了强大的支持。而WordCount则是Hadoop入门的经典示例,通过它,我们可以深入理解...

    hadoop运行wordcount实例

    ### Hadoop运行WordCount实例详解 #### 一、Hadoop简介与WordCount程序的重要性 Hadoop 是一个由Apache基金会所开发的分布式系统基础架构。它能够处理非常庞大的数据集,并且能够在集群上运行,通过将大数据分割...

    spark下实现wordcount

    ### Spark 下实现 WordCount #### 一、简介 在大数据处理领域,Apache Spark 是一个非常流行的框架,它能够高效地处理大规模数据集。WordCount 是一个经典的示例程序,用于统计文本文件中每个单词出现的次数。本篇...

    Hadoop经典如门代码实例教程wordCount项目源码

    代码是基于windows系统下搭建eclipse+hadoop2.8.3开发实例。使用eclipse直接导入代码使用的前提是,需要在本地配置要hadoop2.8.3,本代码亲测可用,能够详细地统计出dataNode下面的file3.txt文件中单词的个数。

    使用hadoop实现WordCount实验报告.docx

    例如,输入文件包含"Hello world",则WordCount程序的输出应为"Hello 1"和"world 1"。 ### 二、实验原理与步骤 1. **配置Hadoop环境** - **Java安装与环境变量配置**:确保系统已安装Java 1.8,并配置好JAVA_HOME...

    Hadoop的WordCount实例代码

    WordCount是Hadoop中的一个经典示例程序,用于演示如何在Hadoop环境中进行简单的数据处理。这个程序的基本任务是对输入文本中的每个单词进行计数,统计出每个单词出现的频率。 Hadoop的核心组件包括HDFS(Hadoop ...

Global site tag (gtag.js) - Google Analytics