- 浏览: 578027 次
- 性别:
- 来自: 广州杭州
文章分类
最新评论
-
bohc:
谢谢,搞了两天了,现在才算是找到问题所在,解决了。
文件在使用FileChannel.map后不能被删除(Windows上) -
zhang0000jun:
在jdk1.8中执行正好和楼主的结果相反,请指教
从Java视角理解CPU缓存(CPU Cache) -
在世界的中心呼喚愛:
forenroll 写道请问楼主的那个分析工具cachemis ...
从Java视角理解CPU缓存(CPU Cache) -
xgj1988:
我这里打出的结果是: 0 L1-dcache-load-mis ...
从Java视角理解CPU缓存(CPU Cache) -
thebye85:
请教下大神,为什么频繁的park会导致大量context sw ...
从Java视角理解CPU上下文切换(Context Switch)
任务的申请、派发与执行
TaskTracker.run() 连接JobTracker
TaskTracker 的启动过程会初始化一系列参数和服务(另有单独的一节介绍),然后尝试连接JobTracker 服务(即必须实现InterTrackerProtocol 接口),如果连接断开,则会循环尝试连接JobTracker ,并重新初始化所有成员和参数,此过程参见run() 方法。
TaskTracker.offerService() 主循环
如果连接JobTracker 服务成功,TaskTracker 就会调用offerService() 函数进入主执行循环中。这个循环会每隔10 秒与JobTracker 通讯一次,调用transmitHeartBeat() 获得HeartbeatResponse 信息。然后调用HeartbeatResponse 的getActions() 函数获得JobTracker 传过来的所有指令即一个TaskTrackerAction 数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction 则调用startNewTask() 函数执行新任务,
如果是 CommitTaskAction
否则加入到tasksToCleanup 队列,交给一个taskCleanupThread 线程来处理,如执行KillJobAction 或者KillTaskAction 等。
TaskTracker.transmitHeartBeat() 获取JobTracker 指令
在transmitHeartBeat() 函数处理中,TaskTracker 会创建一个新的TaskTrackerStatus 对象记录目前任务的执行状况,然后通过IPC 接口调用JobTracker 的heartbeat() 方法发送过去,并接受新的指令,即返回值TaskTrackerAction 数组。在这个调用之前,TaskTracker 会先检查目前执行的Task 数目以及本地磁盘的空间使用情况等,如果可以接收新的Task 则设置heartbeat() 的askForNewTask 参数为true 。操作成功后再更新相关的统计信息等。
JobTracker 调度作业第二步:派发任务
JobTracker 接到TaskTracker 的heartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。JobTracker 会使用它的调度器taskScheduler 来组装任务到一个任务列表tasks 中。具体实现在taskScheduler 的assignTasks() 方法。得到tasks 的数据后,把这些任务封装在一些LanuchTaskAction 中,发回给TaskTracker ,让它去执行任务。此时JobTracker 的hearbeat() 结束派发任务。
下面简单分析下 hadoop 默认的作业调度器 JobQueueTaskScheduler 怎么实现以上所说的 assignTasks() 方法。首先它会检查 TaskTracker 端还可以做多少个 map 和 reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask 或 ReduceTask 。产生 Map 任务使用 JobInProgress 的 obtainNewMapTask() 方法,实质上最后调用了 JobInProgress 的 findNewMapTask() 访问 nonRunningMapCache 得到 Map 任务的 TaskInProgress ;而产生 Reduce 任务使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress 的 findNewReduceTask() 访问 nonRuningReduceCache 得到 Reduce 任务的 TaskInProgress 。
评论
JobTracker处理心跳采用JobQueueTaskScheduler调度机制给TT分配map任务时,会执行很多次obtainNewLocalMapTask和一次obtainNewNonLocalMapTask。最终使用的都是JobInProgress的findNewMapTask方法,只是传递的level不一样。
可是我不知道为什么obtainNewLocalMapTask会传递maxlevel这个值,而且maxlevel=NetworkTopology.DEFAULT_HOST_LEVEL=2。我很是不理解。
1·难道说obtainNewLocalMapTask只是会从近到远寻找任务,而不见得一定要是本地嘛?
2·maxlevel这个值是代表集群的级别数吗?为什么是2?
coderplay 写道sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?期待您的指点!谢谢!
JobConf.setNumMapTasks(n)是有意义的,结合block size会具体影响到map任务的个数,详见FileInputFormat.getSplits源码。假设没有设置mapred.min.split.size,缺省为1的情况下,针对每个文件会按照min (totalsize[所有文件总大小]/mapnum[jobconf设置的mapnum], blocksize)为大小来拆分,并不是说文件小于block size就不去拆分。
你好,我想请问一下有没有什么方法可以调试hadoop呢?现在我可以使用eclipse或者
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
请见
http://wiki.apache.org/hadoop/HowToDebugMapReducePrograms
jdb调试一个mapreduce任务,比如wordcount。但问题是我现在还不能调试JobTracker任务,比如说是我在运行wordcount例子的时候想要调试JobTracker,
看看它具体的执行情况,有没有什么好的建议呢?非常感谢!
jiwenke 写道maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?TaskRunner.java中的280行上下.
看到了,谢谢!
看到TaskLauncher的run调用了startNewTasks,但在哪里调用TaskLauncher的run?
在TaskTracker中:
if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { LOG.info("Received commit task action for " + commitAction.getTaskID()); commitResponses.add(commitAction.getTaskID()); }
但到addToTaskQueue没有看到调用TaskLauncher的run啊?
maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
TaskRunner.java中的280行上下.
sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
是我没有写清楚,我想控制总的mapper任务数,比如生成5个任务,每台机器跑一个这样的需求。我同意这是由splits决定的,所以这个问题就转换为怎样生成splits的问题,我看了代码,发现splits的生成个数是和文件的个数有关的 - 如果每个文件小于block size,那就应该splits的个数等于文件个数。这样的话,如果我想控制总的任务数,就需要有那么多个文件,而我想只用一个文件。不知道现在的hadoop里面是不是支持。
还有,我看到你的分析,在mapred中,TaskTracker通过heartbeat得到任务指令,然后去startNewTask(),但这里面我就迷失了.
从后面往前推,maptask的启动是在Child.run()中启动的,而Child.run()的启动似乎是在JVMManager中启动的,但我就找不到startNewTask()是怎样启动JVMManager的?
期待您的指点!谢谢!
太感谢你的分析了!我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)* In many "pleasantly" parallel applications, each process/mapper * processes the same input file (s), but with computations are * controlled by different parameters.(Referred to as "parameter sweeps"). * One way to achieve this, is to specify a set of parameters * (one set per line) as input in a control file * (which is the input path to the map-reduce application, * where as the input dataset is specified * via a config variable in JobConf.). * * The NLineInputFormat can be used in such applications, that splits * the input file such that by default, one line is fed as * a value to one map task, and key is the offset. * i.e. (k,v) is (LongWritable, Text). * The location hints will span the whole mapred cluster.但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
sorry,理解能力太差, 我不是很明白你的意思.如果你是需要控制单个节点同时在执行的的mapper/reducer数.你可以通过修改slave节点配置中的mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum.
至于总的mapper任务数,是由splits决定的,当然可以通过JobConf.setNumMapTasks(n)来增大它,但没多大意义.
我在使用的时候遇到一个关于任务切分的问题,应为我的需求比较简单,只需要控制并行的任务数(处理的数据和控制是分离的),所以我使用了NLineInputFormat,然后写了一个通知文件来给mapper配参数。
按我的理解,应该是在该文件中的一行输入对应一个可以在一个node里执行的任务(我参考的是源码的注释)
* In many "pleasantly" parallel applications, each process/mapper
* processes the same input file (s), but with computations are
* controlled by different parameters.(Referred to as "parameter sweeps").
* One way to achieve this, is to specify a set of parameters
* (one set per line) as input in a control file
* (which is the input path to the map-reduce application,
* where as the input dataset is specified
* via a config variable in JobConf.).
*
* The NLineInputFormat can be used in such applications, that splits
* the input file such that by default, one line is fed as
* a value to one map task, and key is the offset.
* i.e. (k,v) is (LongWritable, Text).
* The location hints will span the whole mapred cluster.
但很奇怪的是只生成了一个任务,这个任务里有几个mapper - 而不是几个任务。而后来我发现,生成的任务和splits有关,而splits的生成个数在现在的hadoop里似乎只和input里面的文件个数有关。我想用一个简单的输入文件就能控制任务的生成,不知道按现在的hadoop实现能不能完成?非常感谢你的指教!!谢谢!
高人,问你一个问题啊,现在0.17的wiki里面说:通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦
FileInputFormat.addInputPath(jobconf, new Path(args[0]));
FileOutputFormat.setOutputPath(jobconf, new Path(args[1]));
另外提醒你JobConf将要被弃用了, Mapper, Reducer等再也不是接口了,而是一个抽象类。OutputCollector和Reporter等都要合进Context里面了。变化挺大的。
我的问题是0.17里面这个setInputPaths,addInputPath怎么调用啊,老版本的0.12里面是直接通过jobConf.addInputPaths,但是0.17里面这个API没有了~谢啦
发表评论
-
抛砖引玉, 淘宝统一离线数据分析平台设计
2011-11-03 22:58 8206把这个拿出来的目的, 是想得到更多的反馈意见, 请邮件至zho ... -
NameNode优化笔记 (二)
2011-01-13 15:03 0事情发生在11月初至11月中旬, 云梯用户不断反映作业运行得慢 ... -
NameNode优化笔记 (一)
2011-01-12 10:32 6199很久没有发博客了, 最 ... -
我在Hadoop云计算会议的演讲
2010-10-26 14:59 9116点击下载演讲稿 由中科院计算所主办的“Hadoop ... -
分布式online与offline设计 slides
2010-08-25 00:24 4224花了两个小时简单了做了一个ppt,给兄弟公司相关人员讲解off ... -
演讲: Hadoop与数据分析
2010-05-29 20:35 7459前些天受金山软件公司西山居朋友的邀请, 去了趟珠海与金山的朋友 ... -
Hadoop的Mapper是怎么从HDFS上读取TextInputFormat数据的
2010-05-29 11:46 7979LineRecordReader.next(LongWri ... -
Anthill: 一种基于MapReduce的分布式DBMS
2010-05-11 22:47 3777MapReduce is a parallel computi ... -
HDFS的追加/刷新/读设计
2010-01-26 00:26 3281hdfs将在0.21版(尚未发布),把DFSOutputStr ... -
TFile, SequenceFile与gz,lzo压缩的测试
2010-01-07 22:47 5030先记一记,以后解释 :) $hadoop jar tf ... -
hive权限控制
2009-09-07 14:35 5126对hive的元数据表结构要作以下调整: hive用户不与表 ... -
avro编译
2009-07-04 00:36 3774avro是doug cutting主持的rpc ... -
Hive的一些问题
2009-06-01 16:51 3842偏激了一点. 总体来说H ... -
hive的编译模块设计
2009-05-22 15:39 3748很少在博客里写翻译的东西, 这次例外. 原文在这儿 . 译文 ... -
HIVE问答, 某天的hadoop群聊天记录
2009-05-07 17:10 10903某天晚上在hadoop群里一时兴起, 回答了一些hive相关的 ... -
暨南大学并行计算实验室MapReduce研究现状
2009-05-04 21:20 51424月份在学校花了半小时做的一个ppt, 内容是我们在应用ha ... -
hadoop上最多到底能放多少个文件?
2009-02-11 18:25 4346这主要取决于NameNode的内存。因为DFS集群运行时,文件 ... -
hadoop改进方面的胡思乱想
2009-02-04 10:57 44401. 我做数据挖掘的时候, 经常需要只对key分组,不必排序。 ... -
hadoop源码分析
2008-12-26 15:37 4950blog挺难贴图的, 我已经建了一个开源的项目, 用来存放文 ... -
hadoop源码分析之MapReduce(一)
2008-12-16 13:08 19361hadoop的源码已 ...
相关推荐
《Hadoop源码分析——MapReduce深度解析》 Hadoop,作为云计算领域的核心组件,以其分布式存储和计算能力,为大数据处理提供了强大的支持。MapReduce是Hadoop的主要计算框架,其设计思想源于Google的论文,旨在解决...
在Hadoop源码分析中,我们能看到这些Google技术的影子,例如Chubby和ZooKeeper,GFS和HDFS,BigTable和HBase,MapReduce和Hadoop。通过对比这些技术,学习者可以更容易地把握Hadoop的设计思路。 Hadoop源码复杂且...
这个"Hadoop源码分析视频下载"提供了一种深入理解Hadoop内部工作原理的途径,这对于开发者、系统管理员以及对大数据技术感兴趣的人来说是非常有价值的。接下来,我们将详细探讨Hadoop的核心组件、其设计哲学、源码...
### Hadoop源码分析知识点概览 #### 一、Hadoop概述与背景 - **Google核心技术**:Hadoop的设计理念很大程度上受到了Google一系列核心技术的影响,包括Google File System (GFS)、BigTable以及MapReduce等。这些...
**MapReduce源码分析** 1. **JobTracker与TaskTracker**:MapReduce的作业调度和任务执行由JobTracker和TaskTracker完成。JobTracker负责分配任务,监控任务状态,而TaskTracker在工作节点上执行任务。源码分析可...
总的来说,Hadoop源码分析是提升大数据处理技术深度的重要途径,涵盖的内容广泛且深入,包括分布式文件系统的设计原理、并行计算模型的实现、资源管理的优化策略等多个方面。通过学习和研究,你将能够构建起对Hadoop...
《Hadoop MapReduce Cookbook 源码》是一本专注于实战的书籍,旨在帮助读者通过具体的例子深入理解并掌握Hadoop MapReduce技术。MapReduce是大数据处理领域中的核心组件,尤其在处理大规模分布式数据集时,它的重要...
Hadoop 的源代码分析可以分为三个部分:HDFS、MapReduce 和其他组件。 HDFS HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。...
这个压缩包包含三本关于Hadoop的重要书籍:《Hadoop实战》、《Hadoop权威指南(第二版)》和《Hadoop源码分析(完整版)》,它们涵盖了从基础到深入的所有关键知识点,对于不同阶段的Hadoop学习者都极具价值。...
本资源"**Hadoop源码分析.rar**"包含了丰富的资料,旨在帮助学习者更深入地了解Hadoop的工作原理和实现细节。 **MapReduce**是Hadoop的核心计算模型,由两个主要阶段组成:Map阶段和Reduce阶段。Map阶段将输入数据...
在深入探讨Hadoop源码分析之前,我们先理解Hadoop的核心概念。Hadoop是一个开源的分布式计算框架,由Apache基金会开发,主要用于处理和存储大规模数据。它的主要组件包括HDFS(Hadoop Distributed File System)和...
《Hadoop源码分析 第一章 Hadoop脚本》 Hadoop是大数据处理领域中的一个核心框架,它为海量数据的存储和计算提供了分布式解决方案。本文将深入剖析Hadoop脚本,带你理解其背后的实现机制,这对于理解Hadoop的工作...
在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**: Hadoop MapReduce是由两个主要部分组成的:Map阶段和Reduce阶段。Map阶段将...
Hadoop分析气象数据完整版源代码(含Hadoop的MapReduce代码和SSM框架) 《分布式》布置了一道小作业,这是作业的所有代码,里面包含了Hadoop的MapReduce代码、和SSM框架显示数据的代码
本教程将详细讲解如何使用Java编程语言操作Hadoop的MapReduce来计算整数序列中的最大值和最小值,这对于数据分析和处理任务来说是非常基础且实用的技能。 首先,我们需要理解MapReduce的工作原理。MapReduce是一种...
这个"mapred.zip"文件显然包含了与Hadoop MapReduce相关的测试样例、文档和源码,这对于理解MapReduce的工作原理以及进行实际开发是非常宝贵的资源。 MapReduce的核心理念是将大规模数据处理任务分解为两个主要阶段...
全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...