Job 对 task 是 1 对多的关系:一个 Job 会被拆分成多个 map/reduce task。
task 对 taskAttempt 也是 1 对多的关系:一个 task 可能同时有多个尝试(attempt)在运行(推测执行)。
public class JobInProgress { JobProfile profile; JobStatus status; String jobFile = null; Path localJobFile = null; TaskInProgress maps[] = new TaskInProgress[0]; TaskInProgress reduces[] = new TaskInProgress[0]; TaskInProgress cleanup[] = new TaskInProgress[0]; TaskInProgress setup[] = new TaskInProgress[0]; int numMapTasks = 0; int numReduceTasks = 0; final long memoryPerMap; final long memoryPerReduce; volatile int numSlotsPerMap = 1; volatile int numSlotsPerReduce = 1; final int maxTaskFailuresPerTracker; // Counters to track currently running/finished/failed Map/Reduce task-attempts int runningMapTasks = 0; int runningReduceTasks = 0; int finishedMapTasks = 0; int finishedReduceTasks = 0; int failedMapTasks = 0; int failedReduceTasks = 0; private static long DEFAULT_REDUCE_INPUT_LIMIT = -1L; long reduce_input_limit = -1L; private static float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f; int completedMapsForReduceSlowstart = 0; // runningMapTasks include speculative tasks, so we need to capture // speculative tasks separately int speculativeMapTasks = 0; int speculativeReduceTasks = 0; final int mapFailuresPercent; final int reduceFailuresPercent; int failedMapTIPs = 0; int failedReduceTIPs = 0; private volatile boolean launchedCleanup = false; private volatile boolean launchedSetup = false; private volatile boolean jobKilled = false; private volatile boolean jobFailed = false; JobPriority priority = JobPriority.NORMAL; final JobTracker jobtracker; protected Credentials tokenStorage; // NetworkTopology Node to the set of TIPs Map<Node, List<TaskInProgress>> nonRunningMapCache; // Map of NetworkTopology Node to set of running TIPs Map<Node, Set<TaskInProgress>> runningMapCache; // A list of non-local, non-running maps final List<TaskInProgress> nonLocalMaps; // Set of failed, non-running maps sorted by #failures final SortedSet<TaskInProgress> failedMaps; // A set of non-local running maps Set<TaskInProgress> nonLocalRunningMaps; // A list of 
non-running reduce TIPs Set<TaskInProgress> nonRunningReduces; // A set of running reduce TIPs Set<TaskInProgress> runningReduces; // A list of cleanup tasks for the map task attempts, to be launched List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>(); // A list of cleanup tasks for the reduce task attempts, to be launched List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
public class TaskInProgress { static final int MAX_TASK_EXECS = 1; int maxTaskAttempts = 4; static final double SPECULATIVE_GAP = 0.2; static final long SPECULATIVE_LAG = 60 * 1000; private static final int NUM_ATTEMPTS_PER_RESTART = 1000; // Defines the TIP private String jobFile = null; // 在JobInProgress计算好传到这里 private final TaskSplitMetaInfo splitInfo; private int numMaps; private int partition; private JobTracker jobtracker; private TaskID id; private JobInProgress job; private final int numSlotsRequired; // Status of the TIP private int successEventNumber = -1; private int numTaskFailures = 0; private int numKilledTasks = 0; private double progress = 0; private String state = ""; private long startTime = 0; private long execStartTime = 0; private long execFinishTime = 0; private int completes = 0; private boolean failed = false; private boolean killed = false; private long maxSkipRecords = 0; private FailedRanges failedRanges = new FailedRanges(); private volatile boolean skipping = false; private boolean jobCleanup = false; private boolean jobSetup = false; // The 'next' usable taskid of this tip (taskAttemptId) int nextTaskId = 0; // The taskid that took this TIP to SUCCESS private TaskAttemptID successfulTaskId; // The first taskid of this tip private TaskAttemptID firstTaskId; // Map from task Id -> TaskTracker Id, contains tasks that are // currently runnings private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>(); // All attempt Ids of this TIP private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>(); private JobConf conf; private Map<TaskAttemptID,List<String>> taskDiagnosticData = new TreeMap<TaskAttemptID,List<String>>(); /** * Map from taskId -> TaskStatus */ private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = new TreeMap<TaskAttemptID,TaskStatus>(); // Map from taskId -> TaskTracker Id, // contains cleanup attempts and where they ran, if any private TreeMap<TaskAttemptID, String> cleanupTasks = new 
TreeMap<TaskAttemptID, String>(); private TreeSet<String> machinesWhereFailed = new TreeSet<String>(); private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>(); //list of tasks to kill, <taskid> -> <shouldFail> private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>(); //task to commit, <taskattemptid> private TaskAttemptID taskToCommit; private Counters counters = new Counters(); private String user;
相关推荐
在JobTracker中,JobInProgress和TaskInProgress等类用于管理和监控作业的执行状态和进度。TaskTracker作为工作节点,负责接收和执行JobTracker分配的任务,包括MapTask和ReduceTask。JvmManager则管理TaskTracker上...
- JobInProgress:JobTracker接收到JobClient提交的作业后,会创建一个JobInProgress实例,用于跟踪和调度作业,并将其放入作业队列。 - TaskInProgress:JobTracker启动任务时,通过TaskInProgress来launchTask。...
TaskInProgress 是 JobInProgress 创建的一个对象,用于监控和调度 Task 的执行情况。TaskTracker 可以运行在 HDFS 的 DataNode 上,JobTracker 则不需要,可以部署在单独的机器上。 Hadoop API 提供了一组丰富的...
作业提交首先会创建一个JobInProgress对象,记录作业的状态和相关信息。然后,JobTracker会将作业分解为多个Map任务和Reduce任务,依据数据本地性原则分配到合适的TaskTracker节点上。TaskTracker接收到任务后,启动...
- Map任务的创建通过`JobInProgress`的`obtainNewMapTask()`方法完成,最终调用`JobInProgress`的`findNewMapTask()`从非运行中的Map任务缓存中获取。 - Reduce任务的创建则通过`JobInProgress`的`...
JobTracker接收到请求后,会创建一个新的JobInProgress对象,用于跟踪作业的整个生命周期。 3. **任务分配**: JobTracker会根据输入数据分片(split)生成多个Map任务,每个任务都会分配给一个空闲的TaskTracker...
- **任务执行**: 创建 TaskInProgress 来执行具体的 map 和 reduce 任务。 ##### 3.4 任务分配与执行 - **心跳机制**: TaskTracker 通过定期向 JobTracker 发送心跳信息来请求任务。 - **本地化任务**: TaskTracker...
此外,任务分配策略注册在JobInProgress类中,与作业调度器注册类似。配置文件监听功能允许在配置文件变更时实时更新,以优化调度设置。通过这些机制,该多层级作业调度算法能够提高Hama集群的效率和资源利用率,更...
当作业被提交到作业服务器后,会生成一个JobInProgress对象,作业服务器将这个作业拆分成多个Map任务,并将这些任务分配给不同的任务执行器执行。每个Map任务负责处理一部分输入数据,并生成中间结果。 #### Reduce...