Hadoop Source Code Analysis (Part 5): Task Scheduling with TaskScheduler

In Hadoop MapReduce, it is the TaskScheduler that makes jobs run at all: it assigns a job's map and reduce tasks to the TaskTrackers on different DataNodes.

TaskScheduler is the task-scheduler class of MapReduce. After a JobClient submits a job to the JobTracker, the JobTracker receives heartbeats from the TaskTrackers; each heartbeat carries, among other things, the tracker's free-slot counts. The JobTracker then calls TaskScheduler's assignTasks() method to hand tasks to those TaskTrackers whose heartbeats report free slots.
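For orientation, this is roughly the call site inside JobTracker.heartbeat(), simplified from the 1.x source (blacklist checks and error handling omitted):

// Setup/cleanup tasks take precedence; otherwise ask the scheduler.
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null) {
  tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
}
if (tasks != null) {
  for (Task task : tasks) {
    expireLaunchingTasks.addNewTask(task.getTaskID());
    actions.add(new LaunchTaskAction(task)); // returned in the heartbeat response
  }
}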

TaskScheduler is the abstract base class for Hadoop schedulers. The default scheduler that extends it is FIFO, but there are others such as the Capacity and Fair schedulers, and you can write one tailored to your own scenario; extending TaskScheduler is all it takes.
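Extending it takes little code. Below is a hypothetical minimal scheduler (the class name and the trivial method bodies are invented for illustration; assignTasks() and getJobs() are the abstract methods a 1.x subclass must implement). It sits in the org.apache.hadoop.mapred package because TaskScheduler and several of its collaborators are package-private:

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class MyScheduler extends TaskScheduler {

  @Override
  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    // Decide which tasks, if any, this tracker should launch next.
    return Collections.emptyList();
  }

  @Override
  public synchronized Collection<JobInProgress> getJobs(String queueName) {
    // Report the jobs this scheduler manages for the given queue.
    return Collections.emptyList();
  }
}

To activate a scheduler, point the mapred.jobtracker.taskScheduler property at its class name in mapred-site.xml, the same way the Fair and Capacity schedulers are enabled.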
A brief look at the FIFO scheduler follows:

JobQueueTaskScheduler is the implementation class of the FIFO scheduler.
1. First, JobQueueTaskScheduler registers two listener classes (wired up in start(), sketched after the declarations):
JobQueueJobInProgressListener jobQueueJobInProgressListener;
EagerTaskInitializationListener eagerTaskInitializationListener;
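Both listeners are hooked up when the scheduler starts, roughly as in this simplified JobQueueTaskScheduler.start() from the 1.x source:

@Override
public synchronized void start() throws IOException {
  super.start();
  // Keeps the FIFO job queue in sync with job add/remove/update events
  taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
  // The eager listener needs the manager for its initJob() callback
  eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
  eagerTaskInitializationListener.start();
  taskTrackerManager.addJobInProgressListener(eagerTaskInitializationListener);
}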

JobQueueJobInProgressListener maintains the queue of jobs. Its JobSchedulingInfo holds a job's scheduling information (priority, startTime, id), and the listener exposes jobAdded(), jobUpdated(), and the other methods that operate on the job queue.
EagerTaskInitializationListener is the listener that initializes jobs. "Initialization" here does not mean setting up a job's attribute fields; it means initializing jobs already sitting in the job queue that are about to run. Its JobInitManager thread drives this:
class JobInitManager implements Runnable {

    public void run() {
      JobInProgress job = null;
      while (true) {
        try {
          // Block until the listener puts a job into jobInitQueue
          synchronized (jobInitQueue) {
            while (jobInitQueue.isEmpty()) {
              jobInitQueue.wait();
            }
            job = jobInitQueue.remove(0);
          }
          // Initialize the job on a pooled thread so this loop keeps draining
          threadPool.execute(new InitJob(job));
        } catch (InterruptedException t) {
          LOG.info("JobInitManagerThread interrupted.");
          break;
        }
      }
      LOG.info("Shutting down thread pool");
      threadPool.shutdownNow();
    }
  }
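Jobs reach jobInitQueue through the listener's jobAdded() callback, which re-sorts the queue and wakes the JobInitManager thread. Simplified from the 1.x source:

@Override
public void jobAdded(JobInProgress job) {
  synchronized (jobInitQueue) {
    jobInitQueue.add(job);
    resortInitQueue();        // keep priority/startTime order
    jobInitQueue.notifyAll(); // wake JobInitManager.run()
  }
}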


resortInitQueue(): re-sorts the queue by priority and then by start time (a sketch of the ordering follows below).
jobRemoved()
jobUpdated()
jobStateChanged(): when a job's priority or start time changes, resortInitQueue() is called to re-sort the queue.
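The ordering resortInitQueue() applies looks roughly like this; reconstructed from the 1.x source, with Long.compare used for brevity where the original spells the comparison out:

// Higher JobPriority first; earlier startTime breaks ties (FIFO).
Comparator<JobInProgress> cmp = new Comparator<JobInProgress>() {
  public int compare(JobInProgress o1, JobInProgress o2) {
    // JobPriority is declared from most to least urgent, so compareTo()
    // already sorts more urgent jobs ahead.
    int res = o1.getPriority().compareTo(o2.getPriority());
    if (res == 0) {
      res = Long.compare(o1.getStartTime(), o2.getStartTime());
    }
    return res;
  }
};
Collections.sort(jobInitQueue, cmp);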


public EagerTaskInitializationListener(Configuration conf) {
    numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
    threadPool = Executors.newFixedThreadPool(numThreads);
  }

When the JobTracker starts, a pool of mapred.jobinit.threads threads (DEFAULT_NUM_THREADS if unset) is created to watch the job queue; whenever the queue contains a job, one of these threads initializes it.
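If the default pool is too small for a busy JobTracker, the property can be raised in mapred-site.xml; the value below is only an example:

<property>
  <name>mapred.jobinit.threads</name>
  <value>8</value>
</property>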

class InitJob implements Runnable {

    private JobInProgress job;

    public InitJob(JobInProgress job) {
      this.job = job;
    }

    // run() calls back into the TaskTrackerManager (in practice the
    // JobTracker), whose initJob() performs the actual task setup.
    public void run() {
      ttm.initJob(job);
    }
  }


The core logic of the scheduler lives in the assignTasks() method.
Below is an analysis of assignTasks() in FIFO mode:

@Override
  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
      throws IOException {
    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
    // Total number of TaskTrackers in the cluster
    final int numTaskTrackers = clusterStatus.getTaskTrackers();
    // Total map slots in the cluster
    final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
    // Total reduce slots in the cluster
    final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

    Collection<JobInProgress> jobQueue =
      jobQueueJobInProgressListener.getJobQueue();

    //
    // Get map + reduce counts for the current tracker.
    //
    // Map slots on this TaskTracker
    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
    // Reduce slots on this TaskTracker
    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
    // Map tasks currently running on this TaskTracker
    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
    // Reduce tasks currently running on this TaskTracker
    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();

    // Assigned tasks
    List<Task> assignedTasks = new ArrayList<Task>();

    //
    // Compute (running + pending) map and reduce task numbers across pool
    //
    // Reduce tasks still outstanding across all running jobs (cluster-wide,
    // not per tracker)
    int remainingReduceLoad = 0;
    // Map tasks still outstanding across all running jobs (cluster-wide)
    int remainingMapLoad = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() == JobStatus.RUNNING) {
          remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
          if (job.scheduleReduces()) {
            remainingReduceLoad += 
              (job.desiredReduces() - job.finishedReduces());
          }
        }
      }
    }

    // Compute the 'load factor' for maps and reduces:
    //   load factor = outstanding tasks / total cluster slots of that kind
    double mapLoadFactor = 0.0;
    if (clusterMapCapacity > 0) {
      mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
    }
    double reduceLoadFactor = 0.0;
    if (clusterReduceCapacity > 0) {
      reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
    }
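    // Worked example (illustrative numbers): with remainingMapLoad = 20 and
    // clusterMapCapacity = 100, mapLoadFactor = 0.2. A tracker with
    // trackerMapCapacity = 4 is then capped below at trackerCurrentMapCapacity
    // = min(ceil(0.2 * 4), 4) = 1, so pending work is spread across trackers
    // rather than piling onto the first ones to heartbeat.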
        
    //
    // In the below steps, we allocate first map tasks (if appropriate),
    // and then reduce tasks if appropriate.  We go through all jobs
    // in order of job arrival; jobs only get serviced if their 
    // predecessors are serviced, too.
    //

    //
    // We assign tasks to the current taskTracker if the given machine 
    // has a workload that's less than the maximum load of that kind of
    // task.
    // However, if the cluster is close to getting loaded i.e. we don't
    // have enough _padding_ for speculative executions etc., we only 
    // schedule the "highest priority" task i.e. the task from the job 
    // with the highest priority.
    //
    
    final int trackerCurrentMapCapacity = 
      Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity), 
                              trackerMapCapacity);
    int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
    boolean exceededMapPadding = false;
    if (availableMapSlots > 0) {
      exceededMapPadding = 
        exceededPadding(true, clusterStatus, trackerMapCapacity);
    }
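    // Note: exceededPadding() (not shown here) checks whether running plus
    // pending tasks would eat into slack reserved for speculative execution
    // and task retries; if memory serves, the reserved fraction comes from
    // mapred.jobtracker.taskalloc.capacitypad (default 0.01) in this version.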
    
    int numLocalMaps = 0;
    int numNonLocalMaps = 0;
    scheduleMaps:
    for (int i=0; i < availableMapSlots; ++i) {
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING) {
            continue;
          }

          Task t = null;
          
          // Try to schedule a node-local or rack-local Map task
          t = 
            job.obtainNewNodeOrRackLocalMapTask(taskTrackerStatus, 
                numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts());
          if (t != null) {
            assignedTasks.add(t);
            ++numLocalMaps;
            
            // Don't assign map tasks to the hilt!
            // Leave some free slots in the cluster for future task-failures,
            // speculative tasks etc. beyond the highest priority job
            if (exceededMapPadding) {
              break scheduleMaps;
            }
           
            // Try all jobs again for the next Map task 
            break;
          }
          
          // Otherwise, try to schedule a non-local (off-switch) Map task
          t = 
            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                   taskTrackerManager.getNumberOfUniqueHosts());
          
          if (t != null) {
            assignedTasks.add(t);
            ++numNonLocalMaps;
            
            // We assign at most 1 off-switch or speculative task
            // This is to prevent TaskTrackers from stealing local-tasks
            // from other TaskTrackers.
            break scheduleMaps;
          }
        }
      }
    }
    int assignedMaps = assignedTasks.size();

    //
    // Same thing, but for reduce tasks
    // However we _never_ assign more than 1 reduce task per heartbeat
    //
    final int trackerCurrentReduceCapacity = 
      Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity), 
               trackerReduceCapacity);
    final int availableReduceSlots = 
      Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
    boolean exceededReducePadding = false;
    if (availableReduceSlots > 0) {
      exceededReducePadding = exceededPadding(false, clusterStatus, 
                                              trackerReduceCapacity);
      synchronized (jobQueue) {
        for (JobInProgress job : jobQueue) {
          if (job.getStatus().getRunState() != JobStatus.RUNNING ||
              job.numReduceTasks == 0) {
            continue;
          }

          Task t = 
            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, 
                                    taskTrackerManager.getNumberOfUniqueHosts()
                                    );
          if (t != null) {
            assignedTasks.add(t);
            break;
          }
          
          // Don't assign reduce tasks to the hilt!
          // Leave some free slots in the cluster for future task-failures,
          // speculative tasks etc. beyond the highest priority job
          if (exceededReducePadding) {
            break;
          }
        }
      }
    }
    
    if (LOG.isDebugEnabled()) {
      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
                trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
                (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
                assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps + 
                ")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " + 
                trackerCurrentReduceCapacity + "," + trackerRunningReduces + 
                "] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) + 
                ", " + (assignedTasks.size()-assignedMaps) + "]");
    }

    return assignedTasks;
  }


The methods above that actually hand out map tasks are obtainNewNodeOrRackLocalMapTask() and obtainNewNonLocalMapTask().
The next post analyzes these two methods in detail.
