I've been working with Hadoop for a little over half a year, mainly using Hadoop + Hive for data analysis. Deployment and day-to-day use are no longer a problem, but I'm still not very clear on the internals, so I plan to study the source code systematically from the beginning and record here the problems I run into and my own understanding along the way.
Below is a WordCount debugging session.
Environment: Windows + Cygwin + Eclipse, in pseudo-distributed mode (how to set up this environment, and the problems hit along the way, will be written up later when I have time; the focus for now is how Hadoop actually runs).
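Note that the run below reads its input from hdfs://localhost:9000 but executes as job_local_0001 through the LocalJobRunner, which is what you get when mapred.job.tracker is left at its default value of "local" — typical when launching straight from Eclipse. A minimal sketch of pinning that configuration down in code, assuming the 0.20-era property names (the original run simply relied on whatever *-site.xml files were on the Eclipse classpath):

    // assumed setup, mirroring what the logs show; not part of the original code
    JobConf conf = new JobConf(WordCount.class);
    conf.set("fs.default.name", "hdfs://localhost:9000"); // the HDFS URL seen in the logs
    conf.set("mapred.job.tracker", "local");              // default; jobs run in-process as job_local_*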
Test data (the map logs below show the byte offsets, so each file actually has multiple lines):
t1.txt:
hello world! hello ufida!
yes i do!
say something.
t2.txt:
cow is a cow.
word count job test.
Debug code:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.log4j.Logger;

public class WordCount {

    static Logger log = Logger.getLogger(WordCount.class);

    public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            // log the executing thread and the (byte offset, line) input pair
            log.info("map thread:" + Thread.currentThread().toString());
            log.info("map args:key:" + key.get() + ";value:" + value);
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                log.info("word:" + word.toString());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements
            Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
            log.info("reduce thread:" + Thread.currentThread().toString());
            String s = "";
            int sum = 0;
            // this class is also used as the combiner, so the values here
            // may already be partial sums rather than raw 1s
            while (values.hasNext()) {
                IntWritable i = values.next();
                s = s + "[" + i.get() + "]";
                sum += i.get();
            }
            log.info("reduce args:key:" + key.toString() + ";values:" + s);
            output.collect(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        log.info("word count...");
        JobConf conf = new JobConf(WordCount.class);
        log.info("jar location:" + conf.getJar());
        conf.setJobName("wordcount");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setMapperClass(Map.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path("/temp/in"));
        FileOutputFormat.setOutputPath(conf, new Path("/temp/out"));
        JobClient.runJob(conf);
    }
}
Run log:
12/02/09 11:08:05 INFO test.WordCount: word count...
12/02/09 11:08:05 INFO test.WordCount: jar location:D:\workspaces\eclipseWorkspace\.metadata\.plugins\org.apache.hadoop.eclipse\hadoopTest_WordCount.java-234599505300279609.jar
12/02/09 11:08:06 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
12/02/09 11:08:06 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.JobClient: Running job: job_local_0001
12/02/09 11:08:06 INFO mapred.FileInputFormat: Total input paths to process : 2
12/02/09 11:08:06 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:06 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:06 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:06 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:0;value:hello world! hello ufida!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:world!
12/02/09 11:08:06 INFO test.WordCount: word:hello
12/02/09 11:08:06 INFO test.WordCount: word:ufida!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:27;value:yes i do!
12/02/09 11:08:06 INFO test.WordCount: word:yes
12/02/09 11:08:06 INFO test.WordCount: word:i
12/02/09 11:08:06 INFO test.WordCount: word:do!
12/02/09 11:08:06 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:06 INFO test.WordCount: map args:key:38;value:say something.
12/02/09 11:08:06 INFO test.WordCount: word:say
12/02/09 11:08:06 INFO test.WordCount: word:something.
12/02/09 11:08:06 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[1][1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t1.txt:0+52
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
12/02/09 11:08:07 INFO mapred.MapTask: numReduceTasks: 1
12/02/09 11:08:07 INFO mapred.MapTask: io.sort.mb = 100
12/02/09 11:08:07 INFO mapred.MapTask: data buffer = 79691776/99614720
12/02/09 11:08:07 INFO mapred.MapTask: record buffer = 262144/327680
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:0;value:cow is a cow.
12/02/09 11:08:07 INFO test.WordCount: word:cow
12/02/09 11:08:07 INFO test.WordCount: word:is
12/02/09 11:08:07 INFO test.WordCount: word:a
12/02/09 11:08:07 INFO test.WordCount: word:cow.
12/02/09 11:08:07 INFO test.WordCount: map thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: map args:key:15;value:word count job test.
12/02/09 11:08:07 INFO test.WordCount: word:word
12/02/09 11:08:07 INFO test.WordCount: word:count
12/02/09 11:08:07 INFO test.WordCount: word:job
12/02/09 11:08:07 INFO test.WordCount: word:test.
12/02/09 11:08:07 INFO mapred.MapTask: Starting flush of map output
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO mapred.MapTask: Finished spill 0
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner: hdfs://localhost:9000/temp/in/t2.txt:0+35
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000001_0' done.
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO mapred.Merger: Merging 2 sorted segments
12/02/09 11:08:07 INFO mapred.Merger: Down to the last merge-pass, with 2 segments left of total size: 180 bytes
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:a;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:count;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:cow.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:do!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:hello;values:[2]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:i;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:is;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:job;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:say;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:something.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:test.;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:ufida!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:word;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:world!;values:[1]
12/02/09 11:08:07 INFO test.WordCount: reduce thread:Thread[Thread-14,5,main]
12/02/09 11:08:07 INFO test.WordCount: reduce args:key:yes;values:[1]
12/02/09 11:08:07 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
12/02/09 11:08:07 INFO mapred.LocalJobRunner:
12/02/09 11:08:07 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
12/02/09 11:08:07 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/temp/out
12/02/09 11:08:07 INFO mapred.LocalJobRunner: reduce > reduce
12/02/09 11:08:07 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
12/02/09 11:08:07 INFO mapred.JobClient: map 100% reduce 100%
12/02/09 11:08:07 INFO mapred.JobClient: Job complete: job_local_0001
12/02/09 11:08:07 INFO mapred.JobClient: Counters: 15
12/02/09 11:08:07 INFO mapred.JobClient:   FileSystemCounters
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_READ=62828
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_READ=62311
12/02/09 11:08:07 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=63761
12/02/09 11:08:07 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=125860
12/02/09 11:08:07 INFO mapred.JobClient:   Map-Reduce Framework
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input groups=16
12/02/09 11:08:07 INFO mapred.JobClient:     Combine output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Map input records=5
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce shuffle bytes=0
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce output records=16
12/02/09 11:08:07 INFO mapred.JobClient:     Spilled Records=32
12/02/09 11:08:07 INFO mapred.JobClient:     Map output bytes=154
12/02/09 11:08:07 INFO mapred.JobClient:     Map input bytes=87
12/02/09 11:08:07 INFO mapred.JobClient:     Combine input records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Map output records=17
12/02/09 11:08:07 INFO mapred.JobClient:     Reduce input records=16
I had assumed Hadoop would spawn many threads to run a job, but the log entry "Thread[Thread-14,5,main]" shows that a single thread did all the work. The job id job_local_0001 means this run went through the LocalJobRunner, which executes the tasks serially in the local JVM; and since the input is far smaller than one block, only one thread may have been needed anyway. I'll dig into the source later to confirm. Reading the log carefully, the rough flow is as follows (pseudocode; note that the combiner actually runs at "Starting flush of map output", once per map task, not per line):
checkInputPathCount();        // check the number of input paths (2 here)
for (i = 0; i < 2; i++) {     // one map task per input file
    Array lines = readFile(i);    // read all lines of file i
    for (line : lines) {
        map(line);            // split out words, add (word, 1) to the collector
    }
    combine();                // on flush, sum this task's counts per word
}
reduce();                     // merge the combined output of both map tasks
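To make that flow concrete, here is a minimal in-memory sketch in plain Java (no Hadoop involved; the class and method names are invented for illustration). It reproduces what the logs show: within each map task the combiner sees hello as [1][1] and folds it into a single 2, and the one reduce task then merges the combined output of both map tasks:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class CombineFlowSketch {

    // "map" phase for one input split: emit (word, 1) per token, grouped by word
    static Map<String, List<Integer>> mapPhase(String split) {
        Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        StringTokenizer tokenizer = new StringTokenizer(split);
        while (tokenizer.hasMoreTokens()) {
            String word = tokenizer.nextToken();
            if (!grouped.containsKey(word)) {
                grouped.put(word, new ArrayList<Integer>());
            }
            grouped.get(word).add(1);
        }
        return grouped;
    }

    // "combine"/"reduce" phase: sum the partial counts of every key
    static Map<String, Integer> sumPhase(Map<String, List<Integer>> grouped) {
        Map<String, Integer> sums = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int partial : e.getValue()) {
                sum += partial;
            }
            sums.put(e.getKey(), sum);   // e.g. hello: [1][1] -> 2
        }
        return sums;
    }

    public static void main(String[] args) {
        String t1 = "hello world! hello ufida! yes i do! say something.";
        String t2 = "cow is a cow. word count job test.";

        // one map task per file; the combiner runs when the map output is flushed
        Map<String, Integer> combined1 = sumPhase(mapPhase(t1)); // 9 records in, 8 out
        Map<String, Integer> combined2 = sumPhase(mapPhase(t2)); // 8 records in, 8 out

        // the single reduce task merges the combined output of both map tasks
        Map<String, List<Integer>> merged = new TreeMap<String, List<Integer>>();
        for (Map<String, Integer> part : Arrays.asList(combined1, combined2)) {
            for (Map.Entry<String, Integer> e : part.entrySet()) {
                if (!merged.containsKey(e.getKey())) {
                    merged.put(e.getKey(), new ArrayList<Integer>());
                }
                merged.get(e.getKey()).add(e.getValue());
            }
        }
        System.out.println(sumPhase(merged)); // {a=1, count=1, cow=1, ..., hello=2, ...}
    }
}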
The counter lines at the end of the log tell the same story: Map input records=5 (five lines across the two files), Map output records=17 (17 words in total), Combine input records=17 and Combine output records=16 (the combiner merges the two "hello" records), and Reduce input/output records=16.
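To round the run off, the result can be read back out of HDFS, for example with hadoop fs -cat. Assuming the single reduce task wrote the conventional part file /temp/out/part-00000 (an assumption; the log only shows the output directory), the counts above mean it should contain, tab-separated:

a	1
count	1
cow	1
cow.	1
do!	1
hello	2
i	1
is	1
job	1
say	1
something.	1
test.	1
ufida!	1
word	1
world!	1
yes	1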