Learning Hadoop, part 1: how a job executes

 

I have been working with Hadoop for about half a year, mainly using Hadoop + Hive for data analysis. Deployment and day-to-day use are no longer a problem, but I still don't have a clear picture of the internals, so I plan to study the source code systematically from the beginning and record my questions and my understanding here as I go.

What follows is a debugging run of WordCount.

Environment: Windows + Cygwin + Eclipse, in pseudo-distributed mode. (How to set up this environment, and the problems I hit along the way, will be written up later; for now the focus is on how Hadoop actually runs.)

Test data:

t1.txt:

hello world! hello ufida!
yes i do!
say something.


t2.txt:

cow is a cow.
word count job test.


The code under debug:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount {
	static Logger log = Logger.getLogger(WordCount.class);

	public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {

			log.info("map thread:" + Thread.currentThread().toString());
			log.info("map args:key:" + key.get() + ";value:" + value);

			// split the line into whitespace-separated tokens and emit (word, 1) for each
			String line = value.toString();
			StringTokenizer tokenizer = new StringTokenizer(line);
			while (tokenizer.hasMoreTokens()) {
				word.set(tokenizer.nextToken());
				log.info("word:" + word.toString());
				output.collect(word, one);
			}
		}
	}

	public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
		public void reduce(Text key, Iterator<IntWritable> values,
				OutputCollector<Text, IntWritable> output, Reporter reporter)
				throws IOException {

			log.info("reduce thread:" + Thread.currentThread().toString());

			// sum the counts for this key; s accumulates each value for logging
			String s = "";
			int sum = 0;
			while (values.hasNext()) {
				IntWritable i = values.next();
				s = s + "[" + i.get() + "]";
				sum += i.get();
			}
			log.info("reduce args:key:" + key.toString() + ";values:" + s);

			output.collect(key, new IntWritable(sum));
		}
	}

	public static void main(String[] args) throws Exception {
		log.info("word count...");
		JobConf conf = new JobConf(WordCount.class);
		log.info("jar location:" + conf.getJar());
		conf.setJobName("wordcount");

		conf.setOutputKeyClass(Text.class);
		conf.setOutputValueClass(IntWritable.class);

		conf.setMapperClass(Map.class);
		conf.setCombinerClass(Reduce.class); // the reducer doubles as the combiner
		conf.setReducerClass(Reduce.class);

		conf.setInputFormat(TextInputFormat.class);
		conf.setOutputFormat(TextOutputFormat.class);

		FileInputFormat.setInputPaths(conf, new Path("/temp/in"));
		FileOutputFormat.setOutputPath(conf, new Path("/temp/out"));

		JobClient.runJob(conf);
	}
}

Run log:

12/02/09 11:08:05 INFO test.WordCount: word count...
12/02/09 11:08:05 INFO test.WordCount: jar location:D:\workspaces\eclipseWorkspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoopTest_WordCount.java-234599505300279609.jar
12/02/09 11:08:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/02/09 11:08:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.JobClient: Running job: job_local_0001
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:06 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:06 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:06 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:0;value:hello world! hello ufida!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:world!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:ufida!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:27;value:yes i do!
12/02/09 11:08:06 INFO test.WordCount: word:yes
12/02/09 11:08:06 INFO test.WordCount: word:i
12/02/09 11:08:06 INFO test.WordCount: word:do!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:38;value:say something.
12/02/09 11:08:06 INFO test.WordCount: word:say
12/02/09 11:08:06 INFO test.WordCount: word:something.
12/02/09 11:08:06 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[1][1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t1.txt:0+52
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/02/09 11:08:07 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:07 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:07 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:07 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:0;value:cow is a cow.
12/02/09 11:08:07 INFO test.WordCount: word:cow
12/02/09 11:08:07 INFO test.WordCount: word:is
12/02/09 11:08:07 INFO test.WordCount: word:a
12/02/09 11:08:07 INFO test.WordCount: word:cow.
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:15;value:word count job test.
12/02/09 11:08:07 INFO test.WordCount: word:word
12/02/09 11:08:07 INFO test.WordCount: word:count
12/02/09 11:08:07 INFO test.WordCount: word:job
12/02/09 11:08:07 INFO test.WordCount: word:test.
12/02/09 11:08:07 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t2.txt:0+35
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/02/09 11:08:07 INFO mapred.LocalJobRunner: 
12/02/09 11:08:07 INFO mapred.Merger: Merging 2 sorted segments
12/02/09 11:08:07 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
12/02/09 11:08:07 INFO mapred.LocalJobRunner: 
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[2]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: 
12/02/09 11:08:07 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/02/09 11:08:07 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/temp/out
12/02/09 11:08:07 INFO mapred.LocalJobRunner: reduce > reduce
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/02/09 11:08:07 INFO mapred.JobClient:  map 100% reduce 100%
12/02/09 11:08:07 INFO mapred.JobClient: Job complete: job_local_0001
12/02/09 11:08:07 INFO mapred.JobClient: Counters: 15
12/02/09 11:08:07 INFO mapred.JobClient:   FileSystemCounters
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_READ=62828
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_READ=62311
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=63761
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=125860
12/02/09 11:08:07 INFO mapred.JobClient:   Map-Reduce Framework
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input groups=16
12/02/09 11:08:07 INFO mapred.JobClient:     Combine output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Map input records=5
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Spilled Records=32
12/02/09 11:08:07 INFO mapred.JobClient:     Map output bytes=154
12/02/09 11:08:07 INFO mapred.JobClient:     Map input bytes=87
12/02/09 11:08:07 INFO mapred.JobClient:     Combine input records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Map output records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input records=16
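A side note on the map keys in the log (0, 27, and 38): with TextInputFormat the key is the byte offset at which each line starts. A quick check (plain Java, hypothetical helper, not the Hadoop API) shows these offsets imply \r\n (Windows) line endings, which is also consistent with the split description "t1.txt:0+52":

```java
public class OffsetCheck {
    // Start offset of each line, assuming \r\n (2-byte) line endings.
    static long[] offsets(String[] lines) {
        long[] out = new long[lines.length];
        long pos = 0;
        for (int i = 0; i < lines.length; i++) {
            out[i] = pos;
            pos += lines[i].length() + 2; // +2 bytes for \r\n
        }
        return out;
    }

    public static void main(String[] args) {
        String[] t1 = {"hello world! hello ufida!", "yes i do!", "say something."};
        // 25 + 2 = 27, 27 + 9 + 2 = 38; 38 + 14 = 52 bytes (last line has no newline)
        System.out.println(java.util.Arrays.toString(offsets(t1))); // [0, 27, 38]
    }
}
```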

I had expected Hadoop to spawn many threads to run a job, but the log line "Thread[Thread-14,5,main]" shows that a single thread actually did all the work. This is probably because the data set is so small that it never exceeds one block, so only one thread was started; I'll dig into the source code later to confirm. Reading the log carefully, the rough execution flow is as follows (pseudocode):

checkNumberPath();             // check the number of input files (2 here)
for (i = 0; i < 2; i++) {      // one map task per file
    Array lines = readFile(i); // read all lines of the file
    for (line : lines) {
        map();                 // split out words, add (word, 1) pairs to the collector
    }
    combine();                 // runs at spill time, after the map output is flushed
}
reduce();                      // final reduce over the merged, sorted map outputs

From the counters in the last few lines of the log, the numbers of records going into and out of the map, combine, and reduce phases also sketch out the overall process.
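That flow can be sketched as a plain in-memory Java program (a hypothetical structure for illustration, not the Hadoop API): each file is one map task, its output is combined at spill time, and a final reduce merges all combined outputs. It reproduces the counter values Map output records=17, Combine output records=16, and Reduce output records=16:

```java
import java.util.*;

public class LocalFlowSketch {

    // Returns {map output records, combine output records, reduce output records}.
    static int[] countFlow(String[][] files) {
        // reduce input: combined outputs of all map tasks, merged and sorted by key
        SortedMap<String, List<Integer>> reduceInput = new TreeMap<>();
        int mapOut = 0, combineOut = 0;
        for (String[] lines : files) {                  // one map task per file
            SortedMap<String, Integer> spill = new TreeMap<>();
            for (String line : lines) {                 // map(): emit (word, 1) per token
                for (String word : line.split("\\s+")) {
                    mapOut++;
                    spill.merge(word, 1, Integer::sum); // combine() at spill time
                }
            }
            combineOut += spill.size();
            for (Map.Entry<String, Integer> e : spill.entrySet()) {
                reduceInput.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                           .add(e.getValue());
            }
        }
        int reduceOut = reduceInput.size();             // reduce(): one summed record per key
        return new int[] {mapOut, combineOut, reduceOut};
    }

    public static void main(String[] args) {
        String[][] files = {
            {"hello world! hello ufida!", "yes i do!", "say something."},
            {"cow is a cow.", "word count job test."}
        };
        System.out.println(Arrays.toString(countFlow(files))); // [17, 16, 16]
    }
}
```

The TreeMap keeps keys in sorted order, which is why the reduce log above visits do!, hello, i, ... alphabetically (with punctuation sorting before letters by ASCII value).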
