`
tcxiang
  • 浏览: 87984 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

JobInProgress TaskInProgress TaskAttempt

 
阅读更多

Job 与 Task 是一对多关系:一个 JobInProgress 包含多个 TaskInProgress(map、reduce、setup、cleanup 任务)。

Task 与 TaskAttempt 也是一对多关系:一个 Task 可能同时有多个尝试(attempt)在运行(即推测执行);失败重试也会产生新的 attempt。

 

public class JobInProgress {

  JobProfile profile;
  JobStatus status;
  String jobFile = null;
  Path localJobFile = null;

  TaskInProgress maps[] = new TaskInProgress[0];
  TaskInProgress reduces[] = new TaskInProgress[0];
  TaskInProgress cleanup[] = new TaskInProgress[0];
  TaskInProgress setup[] = new TaskInProgress[0];
  int numMapTasks = 0;
  int numReduceTasks = 0;
  final long memoryPerMap;
  final long memoryPerReduce;
  volatile int numSlotsPerMap = 1;
  volatile int numSlotsPerReduce = 1;
  final int maxTaskFailuresPerTracker;
  
  // Counters to track currently running/finished/failed Map/Reduce task-attempts
  int runningMapTasks = 0;
  int runningReduceTasks = 0;
  int finishedMapTasks = 0;
  int finishedReduceTasks = 0;
  int failedMapTasks = 0; 
  int failedReduceTasks = 0;
  private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L;
  long reduce_input_limit = -1L;
  private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
  int completedMapsForReduceSlowstart = 0;
    
  // runningMapTasks include speculative tasks, so we need to capture 
  // speculative tasks separately 
  int speculativeMapTasks = 0;
  int speculativeReduceTasks = 0;
  
  final int mapFailuresPercent;
  final int reduceFailuresPercent;
  int failedMapTIPs = 0;
  int failedReduceTIPs = 0;
  private volatile boolean launchedCleanup = false;
  private volatile boolean launchedSetup = false;
  private volatile boolean jobKilled = false;
  private volatile boolean jobFailed = false;

  JobPriority priority = JobPriority.NORMAL;
  final JobTracker jobtracker;
  
  protected Credentials tokenStorage;

  // NetworkTopology Node to the set of TIPs
  Map<Node, List<TaskInProgress>> nonRunningMapCache;
  
  // Map of NetworkTopology Node to set of running TIPs
  Map<Node, Set<TaskInProgress>> runningMapCache;

  // A list of non-local, non-running maps
  final List<TaskInProgress> nonLocalMaps;

  // Set of failed, non-running maps sorted by #failures
  final SortedSet<TaskInProgress> failedMaps;

  // A set of non-local running maps
  Set<TaskInProgress> nonLocalRunningMaps;

  // A list of non-running reduce TIPs
  Set<TaskInProgress> nonRunningReduces;

  // A set of running reduce TIPs
  Set<TaskInProgress> runningReduces;
  
  // A list of cleanup tasks for the map task attempts, to be launched
  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
  
  // A list of cleanup tasks for the reduce task attempts, to be launched
  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();

 

public class TaskInProgress {
  static final int MAX_TASK_EXECS = 1;
  int maxTaskAttempts = 4;    
  static final double SPECULATIVE_GAP = 0.2;
  static final long SPECULATIVE_LAG = 60 * 1000;
  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;

  // Defines the TIP
  private String jobFile = null;
  // 在JobInProgress计算好传到这里
  private final TaskSplitMetaInfo splitInfo;
  private int numMaps;
  private int partition;
  private JobTracker jobtracker;
  private TaskID id;
  private JobInProgress job;
  private final int numSlotsRequired;

  // Status of the TIP
  private int successEventNumber = -1;
  private int numTaskFailures = 0;
  private int numKilledTasks = 0;
  private double progress = 0;
  private String state = "";
  private long startTime = 0;
  private long execStartTime = 0;
  private long execFinishTime = 0;
  private int completes = 0;
  private boolean failed = false;
  private boolean killed = false;
  private long maxSkipRecords = 0;
  private FailedRanges failedRanges = new FailedRanges();
  private volatile boolean skipping = false;
  private boolean jobCleanup = false; 
  private boolean jobSetup = false;
   
  // The 'next' usable taskid of this tip (taskAttemptId)
  int nextTaskId = 0;
    
  // The taskid that took this TIP to SUCCESS
  private TaskAttemptID successfulTaskId;

  // The first taskid of this tip
  private TaskAttemptID firstTaskId;
  
  // Map from task Id -> TaskTracker Id, contains tasks that are
  // currently runnings
  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
  // All attempt Ids of this TIP
  private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
  private JobConf conf;
  private Map<TaskAttemptID,List<String>> taskDiagnosticData = new TreeMap<TaskAttemptID,List<String>>();
  /**
   * Map from taskId -> TaskStatus
   */
  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = new TreeMap<TaskAttemptID,TaskStatus>();

  // Map from taskId -> TaskTracker Id, 
  // contains cleanup attempts and where they ran, if any
  private TreeMap<TaskAttemptID, String> cleanupTasks = new TreeMap<TaskAttemptID, String>();

  private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
  
  //list of tasks to kill, <taskid> -> <shouldFail> 
  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
  
  //task to commit, <taskattemptid>  
  private TaskAttemptID taskToCommit;
  
  private Counters counters = new Counters();
  
  private String user;

 

 

分享到:
评论

相关推荐

    hadoop源码分析

    在JobTracker中,JobInProgress和TaskInProgress等类用于管理和监控作业的执行状态和进度。TaskTracker作为工作节点,负责接收和执行JobTracker分配的任务,包括MapTask和ReduceTask。JvmManager则管理TaskTracker上...

    Hadoop运行流程详解

    - JobInProgress:JobTracker接收到JobClient提交的作业后,会创建一个JobInProgress实例,用于跟踪和调度作业,并将其放入作业队列。 - TaskInProgress:JobTracker启动任务时,通过TaskInProgress来launchTask。...

    HadoopAPI使用

    TaskInProgress 是 JobInProgress 创建的一个对象,用于监控和调度 Task 的执行情况。TaskTracker 可以运行在 HDFS 的 DataNode 上,JobTracker 则不需要,可以部署在单独的机器上。 Hadoop API 提供了一组丰富的...

    Hadoop源码分析

    作业提交首先会创建一个JobInProgress对象,记录作业的状态和相关信息。然后,JobTracker会将作业分解为多个Map任务和Reduce任务,依据数据本地性原则分配到合适的TaskTracker节点上。TaskTracker接收到任务后,启动...

    hadoop学习源码学习(二)

    - Map任务的创建通过`JobInProgress`的`obtainNewMapTask()`方法完成,最终调用`JobInProgress`的`findNewMapTask()`从非运行中的Map任务缓存中获取。 - Reduce任务的创建则通过`JobInProgress`的`...

    MapReduce Job集群提交过程源码跟踪及分析

    JobTracker接收到请求后,会创建一个新的JobInProgress对象,用于跟踪作业的整个生命周期。 3. **任务分配**: JobTracker会根据输入数据分片(split)生成多个Map任务,每个任务都会分配给一个空闲的TaskTracker...

    Hadoop MapReduce 入门

    - **任务执行**: 创建 TaskInProgress 来执行具体的 map 和 reduce 任务。 ##### 3.4 任务分配与执行 - **心跳机制**: TaskTracker 通过定期向 JobTracker 发送心跳信息来请求任务。 - **本地化任务**: TaskTracker...

    基于Hama并行计算框架的多层级作业调度算法的研究及实现.pdf

    此外,任务分配策略注册在JobInProgress类中,与作业调度器注册类似。配置文件监听功能允许在配置文件变更时实时更新,以优化调度设置。通过这些机制,该多层级作业调度算法能够提高Hama集群的效率和资源利用率,更...

    Hadoop系统架构 (2).docx

    当作业被提交到作业服务器后,会生成一个JobInProgress对象,作业服务器将这个作业拆分成多个Map任务,并将这些任务分配给不同的任务执行器执行。每个Map任务负责处理一部分输入数据,并生成中间结果。 #### Reduce...

Global site tag (gtag.js) - Google Analytics