- 浏览: 306430 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (165)
- hadoop (47)
- linux (11)
- nutch (7)
- hbase (7)
- solr (4)
- zookeeper (4)
- J2EE (1)
- jquery (3)
- java (17)
- mysql (14)
- perl (2)
- compass (4)
- suse (2)
- memcache (1)
- as (1)
- roller (1)
- web (7)
- MongoDB (8)
- struts2 (3)
- lucene (2)
- 算法 (4)
- 中文分词 (3)
- hive (17)
- noIT (1)
- 中间件 (2)
- maven (2)
- sd (0)
- php (2)
- asdf (0)
- kerberos 安装 (1)
- git (1)
- osgi (1)
- impala (1)
- book (1)
- python 安装 科学计算包 (1)
最新评论
-
dandongsoft:
你写的不好用啊
solr 同义词搜索 -
黎明lm:
meifangzi 写道楼主真厉害 都分析源码了 用了很久. ...
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
meifangzi:
楼主真厉害 都分析源码了
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker -
zhdkn:
顶一个,最近也在学习设计模式,发现一个问题,如果老是看别人的博 ...
Java观察者模式(Observer)详解及应用 -
lvwenwen:
木南飘香 写道
高并发网站的架构
hadoop mapreduce 之所有能够实现job的运行,以及将job分配到不同datanode 上的map和reduce task 是由TaskSchduler 完成的.
TaskScheduler mapreduce的任务调度器类,当jobClient 提交一个job 给JobTracker 的时候.JobTracker 接受taskTracker 的心跳.心跳信息含有空闲的slot信息等.JobTracker 则通过调用TaskScheduler 的assignTasks()方法类给报告心跳信息中含有空闲的slots信息的taskTracker 分布任务、
TaskScheduler 类为hadoop的 调度器的抽象类。默认继承它作为hadoop调度器的方式为FIFO,当然也有Capacity 和Fair等其他调度器,也可以自己编写符合特定场景所需要的调度器.通过继承TaskScheduler 类即可完成该功能、
下面就 FIFO 调度器进行简单的说明:
JobQueueTaskScheduler 类为FIFO 调度器的实现类.
1. 首先JobQueueTaskSchduler 注册两个监听器类:
JobQueueJobInProgressListener jobQueueJobInProgressListener;
EagerTaskInitializationListener eagerTaskInitializationListener;
JobQueueJobInProgressListener 维护一个job的queue ,其中JobSchedulingInfo 中包含job调度的信息:priority,startTime,id.以及 jobAdd update 等操作jobqueue的方法
EagerTaskInitializationListener 初始化job的listener ,这里所谓的初始化不是初始化job的属性信息,而是针对已经存在jobqueue中 即将被执行job的初始化,
resortInitQueue 按照priority 和starttime 来排序
jobRemoved()
jobUpdated()
jobStateChanged()当priority或是starttime被改变的时候则重新调用resortInitQueue()重新排序
在JobTracker 启动的时候 创建 mapred.jobinit.threads 改数量的线程去监控jobqueue.当jobqueue 中含有job的时候 则initjob
调度其中核心逻辑在assignTasks()方法中
下面分析分析 FIFO模式下的 assignTasks()
上面方法中真正执行task的方法为:
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
下一张详细的分析这两个方法
TaskScheduler mapreduce的任务调度器类,当jobClient 提交一个job 给JobTracker 的时候.JobTracker 接受taskTracker 的心跳.心跳信息含有空闲的slot信息等.JobTracker 则通过调用TaskScheduler 的assignTasks()方法类给报告心跳信息中含有空闲的slots信息的taskTracker 分布任务、
TaskScheduler 类为hadoop的 调度器的抽象类。默认继承它作为hadoop调度器的方式为FIFO,当然也有Capacity 和Fair等其他调度器,也可以自己编写符合特定场景所需要的调度器.通过继承TaskScheduler 类即可完成该功能、
下面就 FIFO 调度器进行简单的说明:
JobQueueTaskScheduler 类为FIFO 调度器的实现类.
1. 首先JobQueueTaskSchduler 注册两个监听器类:
JobQueueJobInProgressListener jobQueueJobInProgressListener;
EagerTaskInitializationListener eagerTaskInitializationListener;
JobQueueJobInProgressListener 维护一个job的queue ,其中JobSchedulingInfo 中包含job调度的信息:priority,startTime,id.以及 jobAdd update 等操作jobqueue的方法
EagerTaskInitializationListener 初始化job的listener ,这里所谓的初始化不是初始化job的属性信息,而是针对已经存在jobqueue中 即将被执行job的初始化,
class JobInitManager implements Runnable { public void run() { JobInProgress job = null; while (true) { try { synchronized (jobInitQueue) { while (jobInitQueue.isEmpty()) { jobInitQueue.wait(); } job = jobInitQueue.remove(0); } threadPool.execute(new InitJob(job)); } catch (InterruptedException t) { LOG.info("JobInitManagerThread interrupted."); break; } } LOG.info("Shutting down thread pool"); threadPool.shutdownNow(); } }
resortInitQueue 按照priority 和starttime 来排序
jobRemoved()
jobUpdated()
jobStateChanged()当priority或是starttime被改变的时候则重新调用resortInitQueue()重新排序
public EagerTaskInitializationListener(Configuration conf) { numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS); threadPool = Executors.newFixedThreadPool(numThreads); }
在JobTracker 启动的时候 创建 mapred.jobinit.threads 改数量的线程去监控jobqueue.当jobqueue 中含有job的时候 则initjob
class InitJob implements Runnable { private JobInProgress job; public InitJob(JobInProgress job) { this.job = job; } //调用run方法 回调TaskTrackerManager public void run() { ttm.initJob(job); } }
调度其中核心逻辑在assignTasks()方法中
下面分析分析 FIFO模式下的 assignTasks()
@Override public synchronized List<Task> assignTasks(TaskTracker taskTracker) throws IOException { TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); //获取集群中TaskTracker 总数 final int numTaskTrackers = clusterStatus.getTaskTrackers(); //集群中map slot总数 final int clusterMapCapacity = clusterStatus.getMaxMapTasks(); //集群中reduce slot 总数 final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); // // Get map + reduce counts for the current tracker. // //当前的taskTracker 上map slot 总数 final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots(); //当前的taskTracker 上reduce slot 总数 final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots(); //当前的taskTracker上正在运行的 map数目 final int trackerRunningMaps = taskTrackerStatus.countMapTasks(); //当前的taskTracker上正在运行的 reduce数目 final int trackerRunningReduces = taskTrackerStatus.countReduceTasks(); // Assigned tasks List<Task> assignedTasks = new ArrayList<Task>(); // // Compute (running + pending) map and reduce task numbers across pool // //该taskTracker上剩余的reduce数 int remainingReduceLoad = 0; //该taskTracker 剩余的map数 int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { remainingMapLoad += (job.desiredMaps() - job.finishedMaps()); if (job.scheduleReduces()) { remainingReduceLoad += (job.desiredReduces() - job.finishedReduces()); } } } } // Compute the 'load factor' for maps and reduces //map因子 double mapLoadFactor = 0.0; if (clusterMapCapacity > 0) { mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity; } double reduceLoadFactor = 0.0; if (clusterReduceCapacity > 0) { reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity; } // // In the below steps, we allocate first map tasks (if appropriate), // and then reduce tasks if appropriate. We go through all jobs // in order of job arrival; jobs only get serviced if their // predecessors are serviced, too. // // // We assign tasks to the current taskTracker if the given machine // has a workload that's less than the maximum load of that kind of // task. // However, if the cluster is close to getting loaded i.e. we don't // have enough _padding_ for speculative executions etc., we only // schedule the "highest priority" task i.e. the task from the job // with the highest priority. // final int trackerCurrentMapCapacity = Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), trackerMapCapacity); int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps; boolean exceededMapPadding = false; if (availableMapSlots > 0) { exceededMapPadding = exceededPadding(true, clusterStatus, trackerMapCapacity); } int numLocalMaps = 0; int numNonLocalMaps = 0; scheduleMaps: for (int i=0; i < availableMapSlots; ++i) { synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = null; // Try to schedule a node-local or rack-local Map task t = job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numLocalMaps; // Don't assign map tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededMapPadding) { break scheduleMaps; } // Try all jobs again for the next Map task break; } // Try to schedule a node-local or rack-local Map task t = job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { assignedTasks.add(t); ++numNonLocalMaps; // We assign at most 1 off-switch or speculative task // This is to prevent TaskTrackers from stealing local-tasks // from other TaskTrackers. break scheduleMaps; } } } } int assignedMaps = assignedTasks.size(); // // Same thing, but for reduce tasks // However we _never_ assign more than 1 reduce task per heartbeat // final int trackerCurrentReduceCapacity = Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), trackerReduceCapacity); final int availableReduceSlots = Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1); boolean exceededReducePadding = false; if (availableReduceSlots > 0) { exceededReducePadding = exceededPadding(false, clusterStatus, trackerReduceCapacity); synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts() ); if (t != null) { assignedTasks.add(t); break; } // Don't assign reduce tasks to the hilt! // Leave some free slots in the cluster for future task-failures, // speculative tasks etc. beyond the highest priority job if (exceededReducePadding) { break; } } } } if (LOG.isDebugEnabled()) { LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " + "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + (trackerCurrentMapCapacity - trackerRunningMaps) + ", " + assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + trackerCurrentReduceCapacity + "," + trackerRunningReduces + "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + ", " + (assignedTasks.size()-assignedMaps) + "]"); } return assignedTasks; }
上面方法中真正执行task的方法为:
obtainNewNodeOrRackLocalMapTask 和obtainNewNonLocalMapTask
下一张详细的分析这两个方法
发表评论
-
博客地址变更
2013-08-16 10:29 1220all the guys of visiting the bl ... -
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
2013-04-09 17:36 2750taskTracker 生成map reduce ... -
hadoop 源码分析(六)hadoop taskTracker 生成map 和reduce任务流程
2013-04-09 17:33 0taskTracker 生成map reduce ... -
hadoop 源码分析(四)JobTracker 添加job 到schduler 队列中
2013-03-29 18:37 2882启动 JobTracker 1. 进入main方法: ... -
hadoop 源码分析(三) hadoop RPC 机制
2013-03-28 15:13 2415Hadoop 通信机制采用自己编写的RPC. 相比于 ... -
hadoop 源码分析(二) jobClient 通过RPC 代理提交作业到JobTracker
2013-03-27 12:57 38031.JobClient 客户端类 通过 ... -
hadoop 源码分析(一) jobClient 提交到JobTracker
2013-03-26 13:41 3620Hadoop 用了2年多了.从最初一起创业的 ... -
RHadoop 安装教程
2013-02-01 17:18 1632RHadoop 环境安装 硬件: centos6 ... -
pig
2012-11-16 19:28 1218转自:http://www.hadoopor.c ... -
hadoop与hive的映射
2012-11-15 10:21 2375hadoop与hive的映射 ... -
hadoop distcp
2012-07-31 10:00 2830hadoop distcp 使用:distcp ... -
MapReduce中Mapper类和Reducer类4函数解析
2012-07-20 18:05 2130MapReduce中Mapper类和Reducer类4函数解析 ... -
hadoop metrics 各参数解释
2012-07-17 18:59 1517hadoop metrics 各参数解释 研究使用hadoo ... -
Hbase几种数据入库(load)方式比较
2012-07-17 14:52 13881. 预先生成HFile入库 这个地址有详细的说明http:/ ... -
Hadoop客户端环境配置
2012-05-11 14:59 1777Hadoop客户端环境配置 1. 安装客户端(通过端用户可以 ... -
hadoop 通过distcp进行并行复制
2012-05-02 15:25 2483通过distcp进行并行复制 前面的HDFS访问模型都集中于 ... -
linux crontab 执行hadoop脚本 关于hadoop环境变量引入
2012-04-10 12:11 0crontab问题 crontab的特点:PATH不全和无终 ... -
hadoop fs 命令封装
2012-04-09 09:39 0hadoop fs 命令封装 #!/usr/bin/env ... -
map-reduce编程核心问题
2012-02-22 13:38 12801-How do we break up a large p ... -
Hadoop Archives
2012-02-17 14:25 0Hadoop Archives 什么是Hadoop arch ...
相关推荐
3. JobTracker通过TaskScheduler进行任务调度。 4. TaskTracker周期性地通过heartbeat向JobTracker报告自身状态和任务状态。 5. JobTracker根据调度策略决定哪些任务分配给哪些TaskTracker。 6. TaskTracker接收到新...
<name>mapred.jobtracker.taskScheduler <value>org.apache.hadoop.mapred.FairScheduler ``` 重启集群后,可以通过访问JobTracker的Web UI界面下的`http://<jobtracker URL>/scheduler`来确认公平调度器是否...
其次,论文对Hadoop作业调度流程进行了介绍,包括JobTracker、TaskTracker、TaskScheduler等。作业调度流程主要包括作业提交、任务分配、任务执行、任务监控等步骤。 最后,论文对论文主要研究内容进行了介绍,包括...
3. 调度系统:分析DAGScheduler和TaskScheduler的工作流程,理解如何优化任务执行。 4. 内存管理:讨论Spark如何在内存和磁盘之间进行数据交换,以及如何配置内存参数以提高性能。 5. Shuffle过程:深入理解数据重排...
传统的Hadoop调度算法,如FIFO(先进先出)和Capacity Scheduler,主要关注资源的公平分配和集群的整体利用率。然而,随着大数据应用的复杂性和多样性,这些算法在应对动态变化的工作负载和任务执行延迟问题上表现出...
<name>mapred.jobtracker.taskScheduler <value>org.apache.hadoop.mapred.FairScheduler ``` - 基本参数包括但不限于资源池的设置、最小共享资源的定义等。 - 高级参数则涵盖了资源抢占的具体配置、作业限制...
【Yarn资源调度器】是Hadoop大数据处理框架的核心组件之一,主要负责集群资源的管理和分配,确保高效、公平地运行各种计算任务。本课程详细介绍了Yarn的基本架构、工作机制、调度器及调度算法,以及如何进行实际操作...
- **动态调度器**:使用更先进的调度算法,如FairScheduler或CapacityScheduler,以实现资源的公平分配。 - **优先级设置**:根据不同任务的重要性设置优先级,确保关键任务优先执行。 - **内存与CPU核心数配置**:...
在Hadoop集群中,YARN(Yet Another Resource Negotiator)作为资源管理器,负责调度MapReduce任务的内存和CPU资源。YARN支持基于内存和CPU的两种资源调度策略,以确保集群资源的有效利用。在非默认配置下,合理地...
启动TaskScheduler的目的在于启动SchedulerBackend,并设置定时任务进行检查。 **步骤3**:以TaskScheduler实例为参数创建DAGScheduler实例并启动运行。 ```scala @volatile private[spark] var dagScheduler = new...
在IT行业中,Hadoop是一个广泛使用的开源框架,主要用于大数据处理和分析。这个“Hadoop配置文档”涵盖了关于如何设置和优化Hadoop环境的关键知识点。在深入探讨之前,我们需要明确Hadoop的基本架构,它主要由Hadoop...
5. **yarn.scheduler.minimum-allocation-mb**和`yarn.scheduler.maximum-allocation-mb`:YARN(Yet Another Resource Negotiator)调度器分配给每个容器的最小和最大内存,决定任务的并行度。 博客中可能提供的...
1. YARN(Yet Another Resource Negotiator):Hadoop 2.0引入了YARN作为新的资源管理框架,取代了原有的JobTracker,使得集群资源分配和任务调度更加高效和灵活。 2. HDFS HA(High Availability):为了解决...
例如,DAGScheduler如何将作业拆分为任务,以及TaskScheduler如何将任务分配给Executor。 7. **优化技巧**: Spark的性能优化是重要一环,包括配置调优、内存管理优化、数据序列化、减少shuffle操作等。理解这些...
- **调度器类**:`mapred.jobtracker.taskScheduler`用于指定使用的调度器类,对于公平调度器而言,应设置为`org.apache.hadoop.mapred.FairScheduler`。 - **默认资源池**:`mapred.fairscheduler.default-pool`...
`mapreduce.task.io.sort.mb`控制排序阶段的内存使用,而`mapreduce.job.maps`和`mapreduce.job.reduces`定义了作业的Map和Reduce任务数量。 最后,`yarn-default.xml`文件涉及YARN,它是Hadoop的资源管理和调度器...