`
coderplay
  • 浏览: 576894 次
  • 性别: Icon_minigender_1
  • 来自: 广州杭州
社区版块
存档分类
最新评论

hadoop源码分析之MapReduce(二)

阅读更多

 

任务的申请、派发与执行

TaskTracker.run() 连接JobTracker

TaskTracker 的启动过程会初始化一系列参数和服务(另有单独的一节介绍),然后尝试连接JobTracker 服务(即必须实现InterTrackerProtocol 接口),如果连接断开,则会循环尝试连接JobTracker ,并重新初始化所有成员和参数,此过程参见run() 方法。

TaskTracker.offerService() 主循环

如果连接JobTracker 服务成功,TaskTracker 就会调用offerService() 函数进入主执行循环中。这个循环会每隔10 秒与JobTracker 通讯一次,调用transmitHeartBeat() 获得HeartbeatResponse 信息。然后调用HeartbeatResponsegetActions() 函数获得JobTracker 传过来的所有指令即一个TaskTrackerAction 数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction 则调用startNewTask() 函数执行新任务,

如果是 CommitTaskAction

否则加入到tasksToCleanup 队列,交给一个taskCleanupThread 线程来处理,如执行KillJobAction 或者KillTaskAction 等。

TaskTracker.transmitHeartBeat() 获取JobTracker 指令

transmitHeartBeat() 函数处理中,TaskTracker 会创建一个新的TaskTrackerStatus 对象记录目前任务的执行状况,然后通过IPC 接口调用JobTrackerheartbeat() 方法发送过去,并接受新的指令,即返回值TaskTrackerAction 数组。在这个调用之前,TaskTracker 会先检查目前执行的Task 数目以及本地磁盘的空间使用情况等,如果可以接收新的Task 则设置heartbeat()askForNewTask 参数为true 。操作成功后再更新相关的统计信息等。

JobTracker 调度作业第二步:派发任务

JobTracker 接到TaskTrackerheartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。JobTracker 会使用它的调度器taskScheduler 来组装任务到一个任务列表tasks 中。具体实现在taskSchedulerassignTasks() 方法。得到tasks 的数据后,把这些任务封装在一些LanuchTaskAction 中,发回给TaskTracker ,让它去执行任务。此时JobTrackerhearbeat() 结束派发任务。

下面简单分析下 hadoop 默认的作业调度器 JobQueueTaskScheduler 怎么实现以上所说的 assignTasks() 方法。首先它会检查 TaskTracker 端还可以做多少个 map reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask ReduceTask 。产生 Map 任务使用 JobInProgress obtainNewMapTask() 方法,实质上最后调用了 JobInProgress findNewMapTask() 访问 nonRunningMapCache 得到 Map 任务的 TaskInProgress ;而产生 Reduce 任务使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress findNewReduceTask() 访问 nonRuningReduceCache 得到 Reduce 任务的 TaskInProgress

 

6
0
分享到:
评论
13 楼 ski_jugg 2012-03-11  
你好,有一个问题很困扰我。
JobTracker处理心跳采用JobQueueTaskScheduler调度机制给TT分配map任务时,会执行很多次obtainNewLocalMapTask和一次obtainNewNonLocalMapTask。最终使用的都是JobInProgress的findNewMapTask方法,只是传递的level不一样。
可是我不知道为什么obtainNewLocalMapTask会传递maxlevel这个值,而且maxlevel=NetworkTopology.DEFAULT_HOST_LEVEL=2。我很是不理解。
1·难道说obtainNewLocalMapTask只是会从近到远寻找任务,而不见得一定要是本地嘛?
2·maxlevel这个值是代表集群的级别数吗?为什么是2?
12 楼 ytulgr 2011-03-21  
是每当TrakTracker发送heartbeat到TaskTracker才触发任务分配吗?这样是否可认为按照TaskTracker发送heartbeat的先后顺序得到任务执行?
11 楼 riddle_chen 2009-05-05  
jiwenke 写道

coderplay 写道sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?期待您的指点!谢谢!

JobConf.setNumMapTasks(n)是有意义的,结合block size会具体影响到map任务的个数,详见FileInputFormat.getSplits源码。假设没有设置mapred.min.split.size,缺省为1的情况下,针对每个文件会按照min (totalsize[所有文件总大小]/mapnum[jobconf设置的mapnum], blocksize)为大小来拆分,并不是说文件小于block size就不去拆分。
10 楼 coderplay 2009-02-26  
gonggaosheng 写道

你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!

请见
http://wiki.apache.org/hadoop/HowToDebugMapReducePrograms
9 楼 gonggaosheng 2009-02-25  
你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
8 楼 jiwenke 2009-02-17  
coderplay 写道

jiwenke 写道maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?TaskRunner.java中的280行上下.

看到了,谢谢!
看到TaskLauncher的run调用了startNewTasks,但在哪里调用TaskLauncher的run?
在TaskTracker中:
      
if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " + 
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }

但到addToTaskQueue没有看到调用TaskLauncher的run啊?
7 楼 coderplay 2009-02-17  
jiwenke 写道

maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?

TaskRunner.java中的280行上下.
6 楼 jiwenke 2009-02-17  
coderplay 写道

sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.

是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。
还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.
从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
期待您的指点!谢谢!

5 楼 coderplay 2009-02-17  
jiwenke 写道

太感谢你的分析了!我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)* In many "pleasantly" parallel applications, each process/mapper  * processes the same input file (s), but with computations are  * controlled by different parameters.(Referred to as "parameter sweeps"). * One way to achieve this, is to specify a set of parameters  * (one set per line) as input in a control file  * (which is the input path to the map-reduce application, * where as the input dataset is specified  * via a config variable in JobConf.). *  * The NLineInputFormat can be used in such applications, that splits  * the input file such that by default, one line is fed as * a value to one map task, and key is the offset. * i.e. (k,v) is (LongWritable, Text). * The location hints will span the whole mapred cluster.但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!

sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.
至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
4 楼 jiwenke 2009-02-16  
太感谢你的分析了!
我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。
按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)
* In many "pleasantly" parallel applications, each process/mapper
* processes the same input file (s), but with computations are
* controlled by different parameters.(Referred to as "parameter sweeps").
* One way to achieve this, is to specify a set of parameters
* (one set per line) as input in a control file
* (which is the input path to the map-reduce application,
* where as the input dataset is specified
* via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits
* the input file such that by default, one line is fed as
* a value to one map task, and key is the offset.
* i.e. (k,v) is (LongWritable, Text).
* The location hints will span the whole mapred cluster.
但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
3 楼 diddyrock 2009-01-21  
谢谢高人!我争取能把原来的一些程序升级到0.19哈,偷懒了,没去看源码,老版本里面好像连FileInputFormat都没有,希望早点出到1.0版本,那样以后维护就方便了
2 楼 coderplay 2009-01-21  
diddyrock 写道

高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦

FileInputFormat.addInputPath(jobconf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));

另外提醒你JobConf将要被弃用了, Mapper, Reducer等再也不是接口了,而是一个抽象类。OutputCollector和Reporter等都要合进Context里面了。变化挺大的。
1 楼 diddyrock 2009-01-21  
高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。
我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦

相关推荐

    hadoop源码分析-mapreduce部分.doc

    《Hadoop源码分析——MapReduce深度解析》 Hadoop,作为云计算领域的核心组件,以其分布式存储和计算能力,为大数据处理提供了强大的支持。MapReduce是Hadoop的主要计算框架,其设计思想源于Google的论文,旨在解决...

    Hadoop源码分析(完整版)

    在Hadoop源码分析中,我们能看到这些Google技术的影子,例如Chubby和ZooKeeper,GFS和HDFS,BigTable和HBase,MapReduce和Hadoop。通过对比这些技术,学习者可以更容易地把握Hadoop的设计思路。 Hadoop源码复杂且...

    Hadoop源码分析视频下载

    这个"Hadoop源码分析视频下载"提供了一种深入理解Hadoop内部工作原理的途径,这对于开发者、系统管理员以及对大数据技术感兴趣的人来说是非常有价值的。接下来,我们将详细探讨Hadoop的核心组件、其设计哲学、源码...

    Hadoop源码分析 完整版 共55章

    ### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...

    hadoop源码分析-HDFS&MapReduce

    **MapReduce源码分析** 1. **JobTracker与TaskTracker**:MapReduce的作业调度和任务执行由JobTracker和TaskTracker完成。JobTracker负责分配任务,监控任务状态,而TaskTracker在工作节点上执行任务。源码分析可...

    Hadoop源码分析完整版

    总的来说,Hadoop源码分析是提升大数据处理技术深度的重要途径,涵盖的内容广泛且深入,包括分布式文件系统的设计原理、并行计算模型的实现、资源管理的优化策略等多个方面。通过学习和研究,你将能够构建起对Hadoop...

    Hadoop MapReduce Cookbook 源码

    《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...

    Hadoop源代码分析(完整版).pdf

    Hadoop 的源代码分析可以分为三个部分:HDFS、MapReduce 和其他组件。 HDFS HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。...

    Hadoop实战+Hadoop权威指南(第二版)+Hadoop源码分析(完整版)_PDF文件

    这个压缩包包含三本关于Hadoop的重要书籍:《Hadoop实战》、《Hadoop权威指南(第二版)》和《Hadoop源码分析(完整版)》,它们涵盖了从基础到深入的所有关键知识点,对于不同阶段的Hadoop学习者都极具价值。...

    Hadoop源码分析.rar

    本资源"**Hadoop源码分析.rar**"包含了丰富的资料,旨在帮助学习者更深入地了解Hadoop的工作原理和实现细节。 **MapReduce**是Hadoop的核心计算模型,由两个主要阶段组成:Map阶段和Reduce阶段。Map阶段将输入数据...

    Hadoop源码分析

    在深入探讨Hadoop源码分析之前,我们先理解Hadoop的核心概念。Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储大规模数据。它的主要组件包括HDFS(Hadoop Distributed File System)和...

    Hadoop源码分析 第一章 Hadoop脚本

    《Hadoop源码分析 第一章 Hadoop脚本》 Hadoop是大数据处理领域中的一个核心框架,它为海量数据的存储和计算提供了分布式解决方案。本文将深入剖析Hadoop脚本,带你理解其背后的实现机制,这对于理解Hadoop的工作...

    hadoop 框架下 mapreduce源码例子 wordcount

    在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**: Hadoop MapReduce是由两个主要部分组成的:Map阶段和Reduce阶段。Map阶段将...

    Hadoop分析气象数据完整版源代码(含Hadoop的MapReduce代码和SSM框架)

    Hadoop分析气象数据完整版源代码(含Hadoop的MapReduce代码和SSM框架) 《分布式》布置了一道小作业,这是作业的所有代码,里面包含了Hadoop的MapReduce代码、和SSM框架显示数据的代码

    java操作hadoop之mapreduce计算整数的最大值和最小值实战源码

    本教程将详细讲解如何使用Java编程语言操作Hadoop的MapReduce来计算整数序列中的最大值和最小值,这对于数据分析和处理任务来说是非常基础且实用的技能。 首先,我们需要理解MapReduce的工作原理。MapReduce是一种...

    mapred.zip_hadoop_hadoop mapreduce_mapReduce

    这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及进行实际开发是非常宝贵的资源。 MapReduce的核心理念是将大规模数据处理任务分解为两个主要阶段...

    MapReduce 2.0源码分析与编程实

    全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...

Global site tag (gtag.js) - Google Analytics