- 浏览: 576891 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
hadoop的源码已经粗看过一遍,但每次想要了解细节的时候,还得去翻代码. 看了又是忘记. 所以我决定这些天把其中的重要的细节记下来。
声明:
1. 本文假设读者已经掌握一些MapReduce的基本概念,曾经编写过MapReduce程序。
2. 此源代码分析是基于hadoop svn的trunk之上(目前0.20.0-dev),由于hadoop正在换新的MapReduce api(org.apache.hadoop.mapreduce包), 以后很多类会弃用,很多接口会改变,这儿只能尽量保持同步。
3. 关于hdfs源代码可以参考caibinbupt的hdfs源代码分析 ,这儿就不再详述。
4. 这篇文章是基于javen 的分析之上的,感谢javen的辛勤劳动。javen的源码分析是在早期的hadoop版本上,在这儿有一些内容会不一样。
一、基本概念
1.1 MapReduce逻辑过程
1.2 MapReduce物理分布
二、实现细节
2.1 总体结构
我们在编写MapReduce程序时通常是上是这样写的:
Configuration conf = new Configuration(); // 读取hadoop配置 Job job = new Job(conf, "作业名称"); // 实例化一道作业 job.setMapperClass(Mapper类型); job.setCombinerClass(Combiner类型); job.setReducerClass(Reducer类型); job.setOutputKeyClass(输出Key的类型); job.setOutputValueClass(输出Value的类型); FileInputFormat.addInputPath(job, new Path(输入hdfs路径)); FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径)); // 其它初始化配置 JobClient.runJob(job);
一道MapRedcue作业是通过JobClient.rubJob(job)向master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。在这之前master节点的NameNode, SecondedNameNode,JobTracker和slaves节点的DataNode, TaskTracker都已经启动。JobTracker一直在等待JobClient通过RPC提交作业,而TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程: slave主动向master拉生意。slave节点的TaskTracker接到任务后在其本地发起Task,执行任务。以下是简略示意图:
2.1.1 Mapper和Reducer
运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner类,它实际也是Reducer的实现。
2.1.2 JobTracker和TaskTracker
它们都是由一个master服务JobTracker和多个运行于多个节点的slaver服务TaskTracker两个类提供的服务调度的。master负责调度job的每一个子任务task运行于slave上,并监控它们,如果发现有失败的task就重新运行它,slave则负责直接执行每一个task。TaskTracker都需要运行在HDFS的DataNode上,而JobTracker则不需要,一般情况应该把JobTracker部署在单独的机器上。
2.1.3 JobClient
每一个job都会在用户端通过JobClient类将应用程序以及配置参数Configuration打包成jar文件存储在HDFS,并把路径提交到JobTracker的master服务,然后由master创建每一个Task(即MapTask和ReduceTask)将它们分发到各个TaskTracker服务中去执行。
2.1.4 JobInProgress
JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。
2.1.5 TaskInProgress
JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。
2.1.6 MapTask和ReduceTask
一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。这个过程在下一部分再详细介绍。
2.2 JobTracker与作业处理
2.2.1 JobClient提交作业
JobClient.runJob(job)静态方法会实例化一个JobClient实例,然后用此实例的submitJob(job)方法向 master提交作业。此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始轮询作业的进度,直到作业完成。
submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。 submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。
job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。
job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。
这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文件之后, 此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,此时作业已经提交完成。关于RPC的细节,后续章节将会阐述。
2.2.2 JobTacker调度作业
JobTracker接到JobClient提交的作业后,即在JobTracker.submitJob(job)方法中,首先产生一个JobInProgress对象。此对象代表一道作业,它的作用是维护这道作业的所有信息,包括作业剖析JobProfile和最近作业状态JobStatus,并登记此作业所有Tasks进任务表中。随后JobTracker将此JobInProgress对象通过listener.jobAdded(job)方法加入到调度队列中,并用一个成员变量jobs来维护所有的作业。
下面将说明hadoop的作业调度
作业调度在hadoop-0.19.0版得到了很大的改进,原来的调度策略规定是先进先出(FIFO)的。随着hadoop的商业应用增多,各个公司对它的需求也增多。其中Facebook公司提交了一个公平调度器Fair Scheduler; Yahoo!公司提交了Capacity Scheduler。它们分别在hadoop源码树的src/contrib/fairscheduler和src/contrib/capacity- scheduler目录中。而hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量jobQueueJobInProgressListener与eagerTaskInitializationListener。
其中eagerTaskInitializationListener负责任务Task的初始化。其具体实现是这样的: 这个listener在初始化时会开启一个JobInitThread线程,当作业通过jobAdded(job)加入到初始化队列jobInitQueue中,根据作业的优先级排序(resortInitQueue方法)后, 这个线程就会调用JobInProgress.initTasks()立即初始化作业的所有任务。
2.2.3 JobInProgress初始化任务
任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。
JobInProgress.initTasks()方法首先从JobClient上传的job.split文件中读取所有数据块的列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。创建这些TaskInProgress对象完毕后,initTasks()方法会通过createCache()方法为这些对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。createCache()方法的作用是为以上TaskInProgress对象在网络拓扑结构上分配拥有此任务数据块的节点。从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同关换机下的节点,直到找了maxLevel层结束。这样的话,在JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的TaskTracker,让它执行任务。
其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的也是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法对这些TaskInProgress对象寻找maxLevel层的可行TaskTracker,进而产生nonRunningReduceCache成员。
JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束,执行则是通过另一异步的方式处理的,下面接着介绍它。
评论
JobClient.runJob(job)静态方法会实例化一个JobClient实例,然后用此实例的submitJob(job)方法向 master提交作业。此方法会返回一个RunningJob对象,它用来跟踪作业的状态。作业提交完毕后,JobClient会根据此对象开始轮询作业的进度,直到作业完成。
submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。 submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。
job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。
job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。
这三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文件之后, 此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,此时作业已经提交完成。关于RPC的细节,后续章节将会阐述。
其中我一点比较疑惑 copy job resources 到 hdfs中 这个hdfs是JobTracker的 还是其他节点的?
1. Configuration conf = new Configuration(); // 读取hadoop配置
2. Job job = new Job(conf, "作业名称"); // 实例化一道作业
。。。。。。
10. // 其它初始化配置
11. JobClient.runJob(job);
第11行的输入参数是Job对象,可在实际的API中并没有JobClient.runJob(Job job)的方法定义;仅仅只有:
public static RunningJob runJob(JobConf job)
而在0.20.1中JobConf已经Deprecated了,那怎么提交job呢?
如果按照之前0.18 中的wordcound来写,则只能用JobConf,很迷茫。。。。
是的
请教一下hadoop中能否指定map和reduce任务运行的节点呢?谢谢
不改hadoop的源代码情况下,是不行的。
继续请教一些问题:)刚看了一下0.19版的这部分相关内容,在0.19版里,ReduceTask的分配并没有用cache,这应该是0.20修改的,对吧。。。然后就是关于分配的,我个人看完感觉就是这样的,一个TaskTracker发来心跳消息,如果这时候TaskTracker还有能力和必要处理MapTask,JobTracker会按照任务优先级,挑选一个Task分配给他。那么这样的话,会不会导致先到先得,全局优化性很差呢??
会的, 可以考虑FairScheduler和CapacityTaskScheduler
我对于MapReduce没有什么实际经验,想请教一下,Task分配这部分对于MapReduce整个框架来说会不会是一个性能的关键点?谢谢。。。
是,这很影响整个系统的并行性。
刚看了一下0.19版的这部分相关内容,在0.19版里,ReduceTask的分配并没有用cache,这应该是0.20修改的,对吧。。。
然后就是关于分配的,我个人看完感觉就是这样的,一个TaskTracker发来心跳消息,如果这时候TaskTracker还有能力和必要处理MapTask,JobTracker会按照任务优先级,挑选一个Task分配给他。那么这样的话,会不会导致先到先得,全局优化性很差呢??
我对于MapReduce没有什么实际经验,想请教一下,Task分配这部分对于MapReduce整个框架来说会不会是一个性能的关键点?谢谢。。。
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8198把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6190很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9091点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4198花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7445前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7972LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3767MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3272hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 5020先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5094对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3764avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3835偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3740很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10897某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 51224月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4330这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 44331. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析之MapReduce(二)
2009-01-18 22:14 8599任务的申请、派发与执行 TaskTracker.run() ... -
hadoop源码分析
2008-12-26 15:37 4938blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ...
相关推荐
《Hadoop源码分析——MapReduce深度解析》 Hadoop,作为云计算领域的核心组件,以其分布式存储和计算能力,为大数据处理提供了强大的支持。MapReduce是Hadoop的主要计算框架,其设计思想源于Google的论文,旨在解决...
在Hadoop源码分析中,我们能看到这些Google技术的影子,例如Chubby和ZooKeeper,GFS和HDFS,BigTable和HBase,MapReduce和Hadoop。通过对比这些技术,学习者可以更容易地把握Hadoop的设计思路。 Hadoop源码复杂且...
这个"Hadoop源码分析视频下载"提供了一种深入理解Hadoop内部工作原理的途径,这对于开发者、系统管理员以及对大数据技术感兴趣的人来说是非常有价值的。接下来,我们将详细探讨Hadoop的核心组件、其设计哲学、源码...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
在Hadoop这个分布式计算框架中,HDFS(Hadoop Distributed File System)和MapReduce是两个核心组件,它们共同构建了大数据处理的基础架构。...对于想要成为Hadoop专家的开发者来说,源码分析是不可或缺的一环。
总的来说,Hadoop源码分析是提升大数据处理技术深度的重要途径,涵盖的内容广泛且深入,包括分布式文件系统的设计原理、并行计算模型的实现、资源管理的优化策略等多个方面。通过学习和研究,你将能够构建起对Hadoop...
《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...
HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。HDFS 的架构主要包括以下几个部分: * Namenode:负责管理文件系统的命名空间...
《Hadoop源码分析 第一章 Hadoop脚本》 Hadoop是大数据处理领域中的一个核心框架,它为海量数据的存储和计算提供了分布式解决方案。本文将深入剖析Hadoop脚本,带你理解其背后的实现机制,这对于理解Hadoop的工作...
本教程将详细讲解如何使用Java编程语言操作Hadoop的MapReduce来计算整数序列中的最大值和最小值,这对于数据分析和处理任务来说是非常基础且实用的技能。 首先,我们需要理解MapReduce的工作原理。MapReduce是一种...
本资源"**Hadoop源码分析.rar**"包含了丰富的资料,旨在帮助学习者更深入地了解Hadoop的工作原理和实现细节。 **MapReduce**是Hadoop的核心计算模型,由两个主要阶段组成:Map阶段和Reduce阶段。Map阶段将输入数据...
在Hadoop框架中,MapReduce是一种分布式计算模型,用于处理和生成大数据集。WordCount是MapReduce中的一个经典示例,它演示了如何利用该框架进行数据处理。在这个例子中,我们将深入理解Hadoop MapReduce的工作原理...
在深入探讨Hadoop源码分析之前,我们先理解Hadoop的核心概念。Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储大规模数据。它的主要组件包括HDFS(Hadoop Distributed File System)和...
Hadoop分析气象数据完整版源代码(含Hadoop的MapReduce代码和SSM框架) 《分布式》布置了一道小作业,这是作业的所有代码,里面包含了Hadoop的MapReduce代码、和SSM框架显示数据的代码
在Hadoop生态系统中,MapReduce是一种分布式计算框架,它允许用户编写并运行处理大量数据的程序。这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及...
最后,《Hadoop源码分析(完整版)》则面向中高级用户,提供了一次深入了解Hadoop内部工作机制的机会。通过源码分析,读者可以理解Hadoop各个组件的设计思想和实现细节,包括数据分片、容错机制、网络通信和数据压缩...
Hadoop 源码分析 HDFS 数据流 Hadoop 的 HDFS(Hadoop Distributed File System)是 Hadoop 项目中最核心的组件之一,它提供了高可靠、高-performance 的分布式文件系统。HDFS 的核心组件包括 Namenode、Datanode、...