
Hadoop source walkthrough: from job submission in YARNRunner and AM creation to job processing (Part 2)

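With the National Day holiday in between, it took me a while to finally understand how MRAppMaster launches the other containers and how Tasks get run on the NodeManager.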

Last time we saw that, at the end of the AM launch, what actually runs is MRAppMaster's main method. So let's start there and see how it launches the other containers. First, main:
 public static void main(String[] args) {
    try {
      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
	  
	  //There is a lot of code here, but you can mostly skip it: it just reads settings from the environment
	  //(containerId, NM host, ports, etc.) and rebuilds the corresponding objects
      String containerIdStr =
          System.getenv(Environment.CONTAINER_ID.name());
      String nodeHostString = System.getenv(Environment.NM_HOST.name());
      String nodePortString = System.getenv(Environment.NM_PORT.name());
      String nodeHttpPortString =
          System.getenv(Environment.NM_HTTP_PORT.name());
      String appSubmitTimeStr =
          System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
      
      validateInputParam(containerIdStr,
          Environment.CONTAINER_ID.name());
      validateInputParam(nodeHostString, Environment.NM_HOST.name());
      validateInputParam(nodePortString, Environment.NM_PORT.name());
      validateInputParam(nodeHttpPortString,
          Environment.NM_HTTP_PORT.name());
      validateInputParam(appSubmitTimeStr,
          ApplicationConstants.APP_SUBMIT_TIME_ENV);

      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
      ApplicationAttemptId applicationAttemptId =
          containerId.getApplicationAttemptId();
      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
      
      
	  //Create the appMaster from the settings gathered above
      MRAppMaster appMaster =
          new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
              Integer.parseInt(nodePortString),
              Integer.parseInt(nodeHttpPortString), appSubmitTime);
      ShutdownHookManager.get().addShutdownHook(
        new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
      JobConf conf = new JobConf(new YarnConfiguration());
      conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
      
      MRWebAppUtil.initialize(conf);
      String jobUserName = System
          .getenv(ApplicationConstants.Environment.USER.name());
      conf.set(MRJobConfig.USER_NAME, jobUserName);
      // Do not automatically close FileSystem objects so that in case of
      // SIGTERM I have a chance to write out the job history. I'll be closing
      // the objects myself.
      conf.setBoolean("fs.automatic.close", false);
	  
	  //This is the important part: it ends up calling serviceInit and serviceStart
      initAndStartAppMaster(appMaster, conf, jobUserName);
    } catch (Throwable t) {
      LOG.fatal("Error starting MRAppMaster", t);
      ExitUtil.terminate(1, t);
    }
  }



Now look at initAndStartAppMaster:
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
      final JobConf conf, String jobUserName) throws IOException,
      InterruptedException {
	  ...
	  
	  //This is where MRAppMaster's init (serviceInit) and start (serviceStart) are invoked
	  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        appMaster.init(conf);
        appMaster.start();
        if(appMaster.errorHappenedShutDown) {
          throw new IOException("Was asked to shut down.");
        }
        return null;
      }
    });
	}



serviceInit mostly initializes a pile of objects, so let's go straight to serviceStart:

protected void serviceStart() throws Exception {

...

//Create the Job
job = createJob(getConfig(), forcedState, shutDownMessage);

...

//Starts all child services; the important ones here are containerAllocator and containerLauncher.
//containerAllocator is inited and then started. RMContainerAllocator is (indirectly) a subclass of AbstractService,
//so its init and start end up in serviceInit and serviceStart, and serviceStart starts eventHandlingThread and allocatorThread.
//eventHandlingThread processes events; allocatorThread runs the heartbeat that reports status to the RM and requests/releases containers.
super.serviceStart();


  // finally set the job classloader
    MRApps.setClassLoader(jobClassLoader, getConfig());

    if (initFailed) {
      JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
      jobEventDispatcher.handle(initFailedEvent);
    } else {
      // Once everything is started, start the Job
      startJobs();
    }
	
	
	}
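The init/start delegation described above can be sketched without Hadoop on the classpath. This is a minimal sketch under a much simpler lifecycle than the real one. `SimpleService`, `SimpleCompositeService` and `LoggingService` are hypothetical names. `init()` takes no `Configuration`, and there is no STOPPED state. It only shows how `init()`/`start()` delegate to `serviceInit()`/`serviceStart()` and how a composite service inits and starts its children in order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for Hadoop's AbstractService:
// init()/start() are final entry points that delegate to overridable hooks.
abstract class SimpleService {
    enum State { NOTINITED, INITED, STARTED }

    private State state = State.NOTINITED;
    protected final List<String> log;

    SimpleService(List<String> log) { this.log = log; }

    final void init() {
        if (state != State.NOTINITED) throw new IllegalStateException("already inited");
        serviceInit();
        state = State.INITED;
    }

    final void start() {
        if (state != State.INITED) throw new IllegalStateException("not inited");
        serviceStart();
        state = State.STARTED;
    }

    State getState() { return state; }

    protected void serviceInit() {}
    protected void serviceStart() {}
}

// A composite inits/starts its children in registration order, similar in spirit
// to how super.serviceStart() starts containerAllocator, containerLauncher, ...
class SimpleCompositeService extends SimpleService {
    private final List<SimpleService> children = new ArrayList<>();

    SimpleCompositeService(List<String> log) { super(log); }

    void addService(SimpleService s) { children.add(s); }

    @Override protected void serviceInit() { for (SimpleService s : children) s.init(); }
    @Override protected void serviceStart() { for (SimpleService s : children) s.start(); }
}

// Leaf service that just records which hook ran.
class LoggingService extends SimpleService {
    private final String name;

    LoggingService(String name, List<String> log) { super(log); this.name = name; }

    @Override protected void serviceInit() { log.add(name + ".serviceInit"); }
    @Override protected void serviceStart() { log.add(name + ".serviceStart"); }
}
```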


Once all services are up, let's see what startJobs does:
  protected void startJobs() {
    /** create a job-start event to get this ball rolling */
    JobEvent startJobEvent = new JobStartEvent(job.getID(),
        recoveredJobStartTime);
    /** send the job-start event. this triggers the job execution. */
	
	//It just creates a JobStartEvent, which triggers the JobEventType.JOB_START transition in JobImpl
    dispatcher.getEventHandler().handle(startJobEvent);
  }
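`dispatcher.getEventHandler().handle(...)` does not run the transition in the caller's thread. As the StartTransition Javadoc notes, the transition executes in the event-dispatcher thread. Below is a minimal sketch of that queue-plus-thread idea. It uses string event types and a poison-pill `STOP` event, both hypothetical. Hadoop's `AsyncDispatcher` routes by event-type enum class and has its own shutdown logic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical, simplified dispatcher: handle() only enqueues; a single
// dispatcher thread takes events off the queue and routes them by type.
class MiniDispatcher {
    static final class Event {
        final String type;
        final String payload;
        Event(String type, String payload) { this.type = type; this.payload = payload; }
    }

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Map<String, Consumer<Event>> handlers = new HashMap<>();
    private Thread thread;

    synchronized void register(String type, Consumer<Event> handler) { handlers.put(type, handler); }

    // Called by producers such as startJobs(); returns immediately.
    void handle(Event e) { queue.add(e); }

    void start() {
        thread = new Thread(() -> {
            try {
                while (true) {
                    Event e = queue.take();
                    if (e.type.equals("STOP")) return;   // poison pill ends the loop
                    Consumer<Event> h;
                    synchronized (this) { h = handlers.get(e.type); }
                    if (h != null) h.accept(e);          // handler runs on the dispatcher thread
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }, "dispatcher");
        thread.start();
    }

    // Drains everything queued before the pill, then waits for the thread to exit.
    void stopAndJoin() {
        queue.add(new Event("STOP", ""));
        try {
            thread.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
```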


So we need to look at how JobImpl's state machine is defined:
protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
       stateMachineFactory
     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
              (JobStateInternal.NEW)

          // Transitions from NEW state
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          
		  ...
			  
			  
			  //This is where StartTransition gets executed
          .addTransition(JobStateInternal.INITED, JobStateInternal.SETUP,
              JobEventType.JOB_START,
              new StartTransition())
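The `addTransition(from, to, eventType, hook)` calls boil down to a lookup table keyed by (current state, event type). Below is a cut-down, hypothetical model that covers only the JobImpl arcs discussed here. Hooks are omitted, and the class and method names are my own, not Hadoop's `StateMachineFactory` API.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical model of a few JobImpl arcs: (current state, event) -> next state.
class JobStateMachineSketch {
    enum State { NEW, INITED, SETUP, RUNNING }
    enum EventType { JOB_INIT, JOB_START, JOB_SETUP_COMPLETED }

    private final Map<State, Map<EventType, State>> table = new EnumMap<>(State.class);
    private State current = State.NEW;

    JobStateMachineSketch() {
        addTransition(State.NEW, State.INITED, EventType.JOB_INIT);
        addTransition(State.INITED, State.SETUP, EventType.JOB_START);             // StartTransition
        addTransition(State.SETUP, State.RUNNING, EventType.JOB_SETUP_COMPLETED);  // SetupCompletedTransition
    }

    private void addTransition(State from, State to, EventType on) {
        table.computeIfAbsent(from, k -> new EnumMap<EventType, State>(EventType.class)).put(on, to);
    }

    // Looks up the arc for the current state; an unknown (state, event) pair is an error.
    State doTransition(EventType event) {
        Map<EventType, State> row = table.get(current);
        State next = (row == null) ? null : row.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " at " + current);
        }
        current = next;
        return current;
    }
}
```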


Next, what does StartTransition do? (So far everything happens inside the AM's own container, and nothing has involved the RM yet.)
  public static class StartTransition
  implements SingleArcTransition<JobImpl, JobEvent> {
    /**
     * This transition executes in the event-dispatcher thread, though it's
     * triggered in MRAppMaster's startJobs() method.
     */
    @Override
    public void transition(JobImpl job, JobEvent event) {
      JobStartEvent jse = (JobStartEvent) event;
      if (jse.getRecoveredJobStartTime() != 0) {
        job.startTime = jse.getRecoveredJobStartTime();
      } else {
        job.startTime = job.clock.getTime();
      }
      JobInitedEvent jie =
        new JobInitedEvent(job.oldJobId,
             job.startTime,
             job.numMapTasks, job.numReduceTasks,
             job.getState().toString(),
             job.isUber());
			 
			 
			 //Fire some JobHistory-related events
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
          job.appSubmitTime, job.startTime);
      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
	  //Increment the running-jobs metric
      job.metrics.runningJob(job);

	  //Fire the JOB_SETUP event handled by CommitterEventHandler
      job.eventHandler.handle(new CommitterJobSetupEvent(
              job.jobId, job.jobContext));
    }
  }


Let's see what CommitterEventHandler does for JOB_SETUP:
    public void run() {
      LOG.info("Processing the event " + event.toString());
      switch (event.getType()) {
      case JOB_SETUP:
	  //JOB_SETUP calls handleJobSetup, shown next
        handleJobSetup((CommitterJobSetupEvent) event);
        break;
      case JOB_COMMIT:
        handleJobCommit((CommitterJobCommitEvent) event);
        break;
      case JOB_ABORT:
        handleJobAbort((CommitterJobAbortEvent) event);
        break;
      case TASK_ABORT:
        handleTaskAbort((CommitterTaskAbortEvent) event);
        break;
      default:
        throw new YarnRuntimeException("Unexpected committer event "
            + event.toString());
      }
    }
	
	//handleJobSetup
	protected void handleJobSetup(CommitterJobSetupEvent event) {
      try {
	  
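	  //Call the OutputCommitter's setupJob. OutputCommitter is an abstract class, and which implementation runs depends on the job.
	  //For FileOutputCommitter, setupJob just creates the job's temporary working directory; after that the job counts as set up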
        committer.setupJob(event.getJobContext());
		
		//Then fire JobImpl's JOB_SETUP_COMPLETED event
        context.getEventHandler().handle(
            new JobSetupCompletedEvent(event.getJobID()));
      } catch (Exception e) {
        LOG.warn("Job setup failed", e);
        context.getEventHandler().handle(new JobSetupFailedEvent(
            event.getJobID(), StringUtils.stringifyException(e)));
      }
    }
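The effect of `setupJob` for `FileOutputCommitter` can be approximated on the local filesystem. This sketch uses `java.nio` instead of Hadoop's `FileSystem`. The `_temporary/<appAttemptId>` layout is an assumption modeled on `FileOutputCommitter`, and `LocalCommitterSketch` is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Local-filesystem sketch of a FileOutputCommitter-style setupJob:
// create a per-attempt scratch directory under the job's output path.
class LocalCommitterSketch {
    // Assumed name of the pending/temporary directory.
    static final String PENDING_DIR_NAME = "_temporary";

    static Path setupJob(Path outputDir, int appAttemptId) {
        Path jobAttemptPath = outputDir.resolve(PENDING_DIR_NAME).resolve(Integer.toString(appAttemptId));
        try {
            // createDirectories also creates missing parents and is a no-op if the directory exists
            Files.createDirectories(jobAttemptPath);
        } catch (IOException e) {
            // In the AM, an exception here is what becomes a JobSetupFailedEvent
            throw new UncheckedIOException(e);
        }
        return jobAttemptPath;
    }
}
```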


So back to JobImpl's state machine definition:
//Runs SetupCompletedTransition and moves the Job from SETUP to RUNNING
.addTransition(JobStateInternal.SETUP, JobStateInternal.RUNNING,
              JobEventType.JOB_SETUP_COMPLETED,
              new SetupCompletedTransition())


Now look at SetupCompletedTransition:
  private static class SetupCompletedTransition
      implements SingleArcTransition<JobImpl, JobEvent> {
    @Override
    public void transition(JobImpl job, JobEvent event) {
      job.setupProgress = 1.0f;
	  
	  //Schedule the map tasks. I only follow the map side here, otherwise this gets far too long; once the map side is clear, the reduce side is not hard
      job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
	  
	  //Schedule the reduce tasks
      job.scheduleTasks(job.reduceTasks, true);

      // If we have no tasks, just transition to job completed
      if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
        job.eventHandler.handle(new JobEvent(job.jobId,
            JobEventType.JOB_COMPLETED));
      }
    }
  }


Let's see what scheduleTasks actually does:
  protected void scheduleTasks(Set<TaskId> taskIDs,
      boolean recoverTaskOutput) {
	  
	  //The tasks were already determined when the Job was submitted; here a T_SCHEDULE event is created for each one
    for (TaskId taskID : taskIDs) {
      TaskInfo taskInfo = completedTasksFromPreviousRun.remove(taskID);
	  
	  //Non-null means this task already completed in a previous run (recovery). We only consider a freshly submitted job, so go straight to else
      if (taskInfo != null) {
        eventHandler.handle(new TaskRecoverEvent(taskID, taskInfo,
            committer, recoverTaskOutput));
      } else {
	  //Fire TaskImpl's T_SCHEDULE event for each task
        eventHandler.handle(new TaskEvent(taskID, TaskEventType.T_SCHEDULE));
      }
    }
  }
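Here is the recover-or-schedule decision in `scheduleTasks` in isolation. Strings stand in for `TaskId`, `TaskInfo` and the events, and `ScheduleTasksSketch` is a hypothetical name. The string `T_RECOVER` is just a label for the `TaskRecoverEvent` branch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: tasks that completed in a previous AM attempt are
// recovered, everything else is scheduled from scratch.
class ScheduleTasksSketch {
    static List<String> scheduleTasks(Set<String> taskIds, Map<String, String> completedFromPreviousRun) {
        List<String> events = new ArrayList<>();
        for (String taskId : taskIds) {
            // remove(): each previously completed task is consumed exactly once
            String info = completedFromPreviousRun.remove(taskId);
            if (info != null) {
                events.add("T_RECOVER " + taskId);
            } else {
                events.add("T_SCHEDULE " + taskId);
            }
        }
        return events;
    }
}
```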


Likewise, we need to see how TaskImpl defines its state machine and how it handles T_SCHEDULE:
  private static final StateMachineFactory
               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
            stateMachineFactory 
           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
               (TaskStateInternal.NEW)

    // define the state machine of Task

    // Transitions from NEW state
	//Runs InitialScheduleTransition and moves the task to SCHEDULED
    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
        TaskEventType.T_SCHEDULE, new InitialScheduleTransition())


Now look at InitialScheduleTransition:
 
  private static class InitialScheduleTransition
    implements SingleArcTransition<TaskImpl, TaskEvent> {

    @Override
    public void transition(TaskImpl task, TaskEvent event) {
	
	//Here we start scheduling an attempt to run this task
      task.addAndScheduleAttempt(Avataar.VIRGIN);
      task.scheduledTime = task.clock.getTime();
      task.sendTaskStartedEvent();
    }
  }
  
    private void addAndScheduleAttempt(Avataar avataar) {
	
	//Create a TaskAttempt
    TaskAttempt attempt = addAttempt(avataar);
    inProgressAttempts.add(attempt.getID());
    //schedule the nextAttemptNumber
	//If there are failed attempts, reschedule; again, we only consider a brand-new task attempt
    if (failedAttempts.size() > 0) {
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_RESCHEDULE));
    } else {
	
	//Fire TaskAttemptImpl's TA_SCHEDULE
      eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
          TaskAttemptEventType.TA_SCHEDULE));
    }
  }


So now into TaskAttemptImpl's state machine:
 private static final StateMachineFactory
        <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
        stateMachineFactory
    = new StateMachineFactory
             <TaskAttemptImpl, TaskAttemptStateInternal, TaskAttemptEventType, TaskAttemptEvent>
           (TaskAttemptStateInternal.NEW)

     // Transitions from the NEW state.
	 //Runs RequestContainerTransition and moves the state to UNASSIGNED
     .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition(false))


Look at RequestContainerTransition:
  static class RequestContainerTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    private final boolean rescheduled;
    public RequestContainerTransition(boolean rescheduled) {
      this.rescheduled = rescheduled;
    }
    @SuppressWarnings("unchecked")
    @Override
    public void transition(TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      // Tell the speculator that this task now needs a container (+1)
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), +1));
      //request for container
	  //Ignore the reschedule case and go straight to else
      if (rescheduled) {
        taskAttempt.eventHandler.handle(
            ContainerRequestEvent.createContainerRequestEventForFailedContainer(
                taskAttempt.attemptId, 
                taskAttempt.resourceCapability));
      } else {
	  //Create a ContainerRequestEvent for this task; it is handled by RMContainerAllocator as a CONTAINER_REQ event.
	  //RMContainerAllocator already exists once the AppMaster is created, so next let's see how it handles CONTAINER_REQ
        taskAttempt.eventHandler.handle(new ContainerRequestEvent(
            taskAttempt.attemptId, taskAttempt.resourceCapability,
            taskAttempt.dataLocalHosts.toArray(
                new String[taskAttempt.dataLocalHosts.size()]),
            taskAttempt.dataLocalRacks.toArray(
                new String[taskAttempt.dataLocalRacks.size()])));
      }
    }
  }


Over to RMContainerAllocator:
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
    recalculateReduceSchedule = true;
    if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
	
	...
	
	//For a map task, this branch runs
	if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
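		//A lot is omitted here; mainly it checks whether the cluster can provide the resources the map asks for. If not, the task is killed; otherwise it does the following
		//addMap is what actually schedules the map task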
		scheduledRequests.addMap(reqEvent);
	}
	
	}
	
	...
	
	}


So let's see what addMap does:
    void addMap(ContainerRequestEvent event) {
      ContainerRequest request = null;
      
	  //We have only just started, so nothing has failed yet; go to else
      if (event.getEarlierAttemptFailed()) {
        earlierFailedMaps.add(event.getAttemptID());
        request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
        LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
      } else {
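	  //Index this attempt by the hosts and racks stored in the event (where its input data lives); this is how Hadoop prefers to launch the container on a machine that holds the task's data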
        for (String host : event.getHosts()) {
          LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
          if (list == null) {
            list = new LinkedList<TaskAttemptId>();
            mapsHostMapping.put(host, list);
          }
          list.add(event.getAttemptID());
          if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to host " + host);
          }
       }
       for (String rack: event.getRacks()) {
         LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
         if (list == null) {
           list = new LinkedList<TaskAttemptId>();
           mapsRackMapping.put(rack, list);
         }
         list.add(event.getAttemptID());
         if (LOG.isDebugEnabled()) {
            LOG.debug("Added attempt req to rack " + rack);
         }
       }
       request = new ContainerRequest(event, PRIORITY_MAP);
      }
      maps.put(event.getAttemptID(), request);
	  
	  //Call addContainerReq to actually request the container
      addContainerReq(request);
    }
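The two indexes that `addMap` builds are plain multimaps. Each maps a host (or rack) to the attempts whose input data lives there. Below is a simplified sketch with strings in place of `TaskAttemptId`. `MapRequestIndex` is a hypothetical name, and `computeIfAbsent` replaces the get/null-check/put sequence with the same effect.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Hypothetical sketch of mapsHostMapping / mapsRackMapping:
// host (or rack) -> attempts that would like a container there, in request order.
class MapRequestIndex {
    final Map<String, LinkedList<String>> mapsHostMapping = new HashMap<>();
    final Map<String, LinkedList<String>> mapsRackMapping = new HashMap<>();

    void addMap(String attemptId, String[] hosts, String[] racks) {
        for (String host : hosts) {
            mapsHostMapping.computeIfAbsent(host, h -> new LinkedList<>()).add(attemptId);
        }
        for (String rack : racks) {
            mapsRackMapping.computeIfAbsent(rack, r -> new LinkedList<>()).add(attemptId);
        }
    }
}
```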
    


Now the addContainerReq method:
  protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
	
	//It just calls addResourceRequest for every (non-blacklisted) host, every rack, and ANY, queuing resource requests for the RM
    for (String host : req.hosts) {
      // Data-local
      if (!isNodeBlacklisted(host)) {
        addResourceRequest(req.priority, host, req.capability);
      }      
    }

    // Nothing Rack-local for now
    for (String rack : req.racks) {
      addResourceRequest(req.priority, rack, req.capability);
    }

    // Off-switch
    addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
  }
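For one map request, `addContainerReq` produces asks at three levels. The sketch below shows just the resource names it generates, using strings for hosts and racks. `ContainerReqSketch` is hypothetical, while `"*"` is the value of `ResourceRequest.ANY`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the resource names asked for on behalf of one map:
// each non-blacklisted host, each rack, and finally ANY.
class ContainerReqSketch {
    static final String ANY = "*";   // same value as ResourceRequest.ANY

    static List<String> resourceNames(String[] hosts, String[] racks, Set<String> blacklistedHosts) {
        List<String> names = new ArrayList<>();
        for (String host : hosts) {
            if (!blacklistedHosts.contains(host)) {   // data-local
                names.add(host);
            }
        }
        for (String rack : racks) {                   // rack-local; racks are not filtered here
            names.add(rack);
        }
        names.add(ANY);                               // off-switch: any node in the cluster
        return names;
    }
}
```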


Inside addResourceRequest:
 private void addResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    
	//Lots of code here too. In short, it creates (or reuses) a remoteRequest and puts it into the ask set, which the AM sends to the RM on the next heartbeat to get resources
	
	...
	
    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
    if (reqMap == null) {
      reqMap = new HashMap<Resource, ResourceRequest>();
      remoteRequests.put(resourceName, reqMap);
    }
	
	 ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
      remoteRequest.setPriority(priority);
      remoteRequest.setResourceName(resourceName);
      remoteRequest.setCapability(capability);
      remoteRequest.setNumContainers(0);
      reqMap.put(capability, remoteRequest);
    }
	
	//Add it to ask, so the container is requested the next time the AM talks to the RM
	addResourceRequestToAsk(remoteRequest);
	
	}
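`addResourceRequest` aggregates requests per resource name and capability, so ten maps on the same host become one ask with a container count of ten. This sketch makes three assumptions. Capability is reduced to a memory size, priority is ignored, and the `numContainers` increment is assumed to sit in the part elided by `...` above. `AskTable` and `Req` are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: one aggregate request per (resourceName, capability),
// queued in "ask" until the next heartbeat sends it to the RM.
class AskTable {
    static final class Req {
        final String resourceName;
        final int memoryMb;
        int numContainers;
        Req(String resourceName, int memoryMb) { this.resourceName = resourceName; this.memoryMb = memoryMb; }
    }

    private final Map<String, Map<Integer, Req>> remoteRequests = new HashMap<>();
    final Set<Req> ask = new LinkedHashSet<>();

    void addResourceRequest(String resourceName, int memoryMb) {
        Map<Integer, Req> reqMap = remoteRequests.computeIfAbsent(resourceName, k -> new HashMap<>());
        Req req = reqMap.computeIfAbsent(memoryMb, m -> new Req(resourceName, memoryMb));
        req.numContainers++;   // assumed: one more container wanted at this location/size
        ask.add(req);          // identity-based set: the same aggregate is queued only once
    }

    int pending(String resourceName, int memoryMb) {
        Map<Integer, Req> reqMap = remoteRequests.get(resourceName);
        if (reqMap == null) return 0;
        Req r = reqMap.get(memoryMb);
        return r == null ? 0 : r.numContainers;
    }
}
```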


As mentioned at the beginning, the heartbeat is done in RMContainerAllocator's heartbeat() method. Let's see how it requests containers:
protected synchronized void heartbeat() throws Exception {
    scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
	
	//The entry point for talking to the RM and getting containers is getResources, covered next
    List<Container> allocatedContainers = getResources();
    if (allocatedContainers != null && allocatedContainers.size() > 0) {
	
	//If containers were allocated, assign them to tasks so they can run on the NMs
      scheduledRequests.assign(allocatedContainers);
    }

    int completedMaps = getJob().getCompletedMaps();
    int completedTasks = completedMaps + getJob().getCompletedReduces();
    if ((lastCompletedTasks != completedTasks) ||
          (scheduledRequests.maps.size() > 0)) {
      lastCompletedTasks = completedTasks;
      recalculateReduceSchedule = true;
    }

    if (recalculateReduceSchedule) {
      preemptReducesIfNeeded();
      scheduleReduces(
          getJob().getTotalMaps(), completedMaps,
          scheduledRequests.maps.size(), scheduledRequests.reduces.size(), 
          assignedRequests.maps.size(), assignedRequests.reduces.size(),
          mapResourceRequest, reduceResourceRequest,
          pendingReduces.size(), 
          maxReduceRampupLimit, reduceSlowStart);
      recalculateReduceSchedule = false;
    }

    scheduleStats.updateAndLogIfChanged("After Scheduling: ");
  }


First, the getResources method:
 private List<Container> getResources() throws Exception {
    // will be null the first time
    Resource headRoom =
        getAvailableResources() == null ? Resources.none() :
            Resources.clone(getAvailableResources());
    AllocateResponse response;
    /*
     * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
     * milliseconds before aborting. During this interval, AM will still try
     * to contact the RM.
     */
    try {
	//This is where containers are requested from the RM
      response = makeRemoteRequest();
      // Reset retry count if no exception occurred.
      retrystartTime = System.currentTimeMillis();
    }
	
	...
	
	//Take the newly allocated containers out of the RM's response into a local list
	List<Container> newContainers = response.getAllocatedContainers();
	
	...
	
	//Return them to heartbeat for assignment
	return newContainers;
	
	
	}
	
	
	
	protected AllocateResponse makeRemoteRequest() throws YarnException,
      IOException {
    ResourceBlacklistRequest blacklistRequest =
        ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
            new ArrayList<String>(blacklistRemovals));
    AllocateRequest allocateRequest =
        AllocateRequest.newInstance(lastResponseID,
          super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
          new ArrayList<ContainerId>(release), blacklistRequest);
		  
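		  //This is where containers are requested from the RM. scheduler is a proxy to the RM; the call ends up in the RM's scheduler
		  //(FifoScheduler in the setup traced here; the actual class is set by yarn.resourcemanager.scheduler.class), which allocates containers and returns them
		  //scheduler = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
		  //So this is where the AM talks to the RM's scheduler and gets the containers allocated to it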
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
	
	
	...
	
	//Return the response
	return allocateResponse;
	
	}
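Here is one allocate round trip, seen from the AM. In this hypothetical sketch the RM is just a function, and the request and response carry only a response id, the asks and the allocated containers. I assume that the AM remembers the returned response id and clears `ask` after a successful call, relying on the RM keeping the last value sent for each request. The class and field names are illustrative, not Hadoop's.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of the AM side of one allocate call.
class HeartbeatSketch {
    static final class AllocateRequest {
        final int responseId;
        final List<String> ask;
        AllocateRequest(int responseId, List<String> ask) { this.responseId = responseId; this.ask = ask; }
    }

    static final class AllocateResponse {
        final int responseId;
        final List<String> allocatedContainers;
        AllocateResponse(int responseId, List<String> allocatedContainers) {
            this.responseId = responseId;
            this.allocatedContainers = allocatedContainers;
        }
    }

    final Set<String> ask = new LinkedHashSet<>();
    int lastResponseID = 0;
    private final Function<AllocateRequest, AllocateResponse> scheduler;   // stands in for the RM proxy

    HeartbeatSketch(Function<AllocateRequest, AllocateResponse> scheduler) { this.scheduler = scheduler; }

    List<String> heartbeat() {
        // Snapshot the pending asks into the request, like new ArrayList<ResourceRequest>(ask)
        AllocateResponse resp = scheduler.apply(new AllocateRequest(lastResponseID, new ArrayList<>(ask)));
        lastResponseID = resp.responseId;   // remember where we are in the exchange
        ask.clear();                        // assumed: only changed requests need to be re-sent
        return resp.allocatedContainers;
    }
}
```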


Next, how the assign step works:
 private void assign(List<Container> allocatedContainers) {
 Iterator<Container> it = allocatedContainers.iterator();
 ...
 
 while (it.hasNext()) {
 //A series of checks on whether each container can be assigned; skipped here
 }
 
 ...
 
 //Start assigning the containers
 assignContainers(allocatedContainers);
 
 ...
 
 }


Look at assignContainers:
    private void assignContainers(List<Container> allocatedContainers) {
      Iterator<Container> it = allocatedContainers.iterator();
      while (it.hasNext()) {
        Container allocated = it.next();
		
		//assignWithoutLocality only handles PRIORITY_FAST_FAIL_MAP and PRIORITY_REDUCE requests. Ours is PRIORITY_MAP, so nothing is assigned here and we fall through to assignMapsWithLocality
        ContainerRequest assigned = assignWithoutLocality(allocated);
        if (assigned != null) {
          containerAssigned(allocated, assigned);
          it.remove();
        }
      }

	  //Call assignMapsWithLocality
      assignMapsWithLocality(allocatedContainers);
    }
	
	
	 private void assignMapsWithLocality(List<Container> allocatedContainers) {
      // try to assign to all nodes first to match node local
	  
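	  //The code below is long, but it simply tries host-local first and assigns directly on a match; otherwise it tries rack-local, and only if that also fails does it give the container to any remaining map
	  //This shows how the MapReduce AM matches the containers YARN gives it to tasks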
      Iterator<Container> it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();        
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        // "if (maps.containsKey(tId))" below should be almost always true.
        // hence this while loop would almost always have O(1) complexity
        String host = allocated.getNodeId().getHost();
        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
        while (list != null && list.size() > 0) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Host matched to the request list " + host);
          }
          TaskAttemptId tId = list.removeFirst();
          if (maps.containsKey(tId)) {
            ContainerRequest assigned = maps.remove(tId);
			
			//The actual container assignment happens in this method
            containerAssigned(allocated, assigned);
            it.remove();
            JobCounterUpdateEvent jce =
              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
            jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
            eventHandler.handle(jce);
            hostLocalAssigned++;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Assigned based on host match " + host);
            }
            break;
          }
        }
      }
      
      // try to match all rack local
      it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        // "if (maps.containsKey(tId))" below should be almost always true.
        // hence this while loop would almost always have O(1) complexity
        String host = allocated.getNodeId().getHost();
        String rack = RackResolver.resolve(host).getNetworkLocation();
        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
        while (list != null && list.size() > 0) {
          TaskAttemptId tId = list.removeFirst();
          if (maps.containsKey(tId)) {
            ContainerRequest assigned = maps.remove(tId);
            containerAssigned(allocated, assigned);
            it.remove();
            JobCounterUpdateEvent jce =
              new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
            eventHandler.handle(jce);
            rackLocalAssigned++;
            if (LOG.isDebugEnabled()) {
              LOG.debug("Assigned based on rack match " + rack);
            }
            break;
          }
        }
      }
      
      // assign remaining
      it = allocatedContainers.iterator();
      while(it.hasNext() && maps.size() > 0){
        Container allocated = it.next();
        Priority priority = allocated.getPriority();
        assert PRIORITY_MAP.equals(priority);
        TaskAttemptId tId = maps.keySet().iterator().next();
        ContainerRequest assigned = maps.remove(tId);
        containerAssigned(allocated, assigned);
        it.remove();
        JobCounterUpdateEvent jce =
          new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
        eventHandler.handle(jce);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assigned based on * match");
        }
      }
    }
  }
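The three passes of `assignMapsWithLocality` can be reproduced with plain collections. In this simplified, hypothetical sketch a container is represented only by its host, and `hostToRack` stands in for `RackResolver`. The counter name recorded for each assignment mirrors `DATA_LOCAL_MAPS`, `RACK_LOCAL_MAPS` or `OTHER_LOCAL_MAPS`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: node-local pass, then rack-local pass, then "anything left".
class LocalityAssigner {
    final Map<String, String> maps = new LinkedHashMap<>();   // pending: attemptId -> request
    final Map<String, LinkedList<String>> mapsHostMapping = new HashMap<>();
    final Map<String, LinkedList<String>> mapsRackMapping = new HashMap<>();

    void addMap(String attemptId, String host, String rack) {
        maps.put(attemptId, "request-" + attemptId);
        mapsHostMapping.computeIfAbsent(host, k -> new LinkedList<>()).add(attemptId);
        mapsRackMapping.computeIfAbsent(rack, k -> new LinkedList<>()).add(attemptId);
    }

    /** Each container is given by its host; returns "attempt@host:COUNTER" per assignment. */
    List<String> assign(List<String> containerHosts, Map<String, String> hostToRack) {
        List<String> containers = new ArrayList<>(containerHosts);
        List<String> assigned = new ArrayList<>();
        matchPass(containers, assigned, mapsHostMapping, host -> host, "DATA_LOCAL_MAPS");
        matchPass(containers, assigned, mapsRackMapping, hostToRack::get, "RACK_LOCAL_MAPS");
        // Last pass: remaining containers go to any pending map
        Iterator<String> it = containers.iterator();
        while (it.hasNext() && !maps.isEmpty()) {
            String host = it.next();
            String tId = maps.keySet().iterator().next();
            maps.remove(tId);
            it.remove();
            assigned.add(tId + "@" + host + ":OTHER_LOCAL_MAPS");
        }
        return assigned;
    }

    private void matchPass(List<String> containers, List<String> assigned,
                           Map<String, LinkedList<String>> index,
                           Function<String, String> keyOf, String counter) {
        Iterator<String> it = containers.iterator();
        while (it.hasNext() && !maps.isEmpty()) {
            String host = it.next();
            LinkedList<String> list = index.get(keyOf.apply(host));
            while (list != null && !list.isEmpty()) {
                String tId = list.removeFirst();
                // Entries can be stale: the attempt may already be assigned via another index
                if (maps.remove(tId) != null) {
                    it.remove();
                    assigned.add(tId + "@" + host + ":" + counter);
                    break;
                }
            }
        }
    }
}
```

Checking `maps.remove(tId) != null` skips stale entries left in the host/rack lists by an earlier pass. The `maps.containsKey(tId)` check does the same job in the original.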


In other words: try host-local first, then rack-local, and only then hand the container to any remaining map. That is how containers are matched to tasks. Now let's look at the method that actually assigns a container, containerAssigned:

    private void containerAssigned(Container allocated, 
                                    ContainerRequest assigned) {
      // Update resource requests
      decContainerReq(assigned);

      // send the container-assigned event to task attempt
	  //Fire the TaskAttempt's TA_ASSIGNED event
      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
          assigned.attemptID, allocated, applicationACLs));

      assignedRequests.add(allocated, assigned.attemptID);

      if (LOG.isDebugEnabled()) {
        LOG.info("Assigned container (" + allocated + ") "
            + " to task " + assigned.attemptID + " on node "
            + allocated.getNodeId().toString());
      }
    }


Back to the state machine definition in TaskAttemptImpl:
     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
         TaskAttemptStateInternal.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
         new ContainerAssignedTransition())

This runs ContainerAssignedTransition and moves the TaskAttempt to ASSIGNED. Here is what ContainerAssignedTransition does:

  private static class ContainerAssignedTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    @SuppressWarnings({ "unchecked" })
    @Override
    public void transition(final TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      final TaskAttemptContainerAssignedEvent cEvent = 
        (TaskAttemptContainerAssignedEvent) event;
		
		//Take the allocated container from the event
      Container container = cEvent.getContainer();
      taskAttempt.container = container;
      // this is a _real_ Task (classic Hadoop mapred flavor):
      taskAttempt.remoteTask = taskAttempt.createRemoteTask();
      taskAttempt.jvmID =
          new WrappedJvmID(taskAttempt.remoteTask.getTaskID().getJobID(),
              taskAttempt.remoteTask.isMapTask(),
              taskAttempt.container.getId().getContainerId());
      taskAttempt.taskAttemptListener.registerPendingTask(
          taskAttempt.remoteTask, taskAttempt.jvmID);

      taskAttempt.computeRackAndLocality();
      
      //launch the container
      //create the container object to be launched for a given Task attempt
	  
	  //Looks familiar, right? launchContext is essentially the launch script; I'll come back to which class it launches later
      ContainerLaunchContext launchContext = createContainerLaunchContext(
          cEvent.getApplicationACLs(), taskAttempt.conf, taskAttempt.jobToken,
          taskAttempt.remoteTask, taskAttempt.oldJobId, taskAttempt.jvmID,
          taskAttempt.taskAttemptListener, taskAttempt.credentials);
		  
		  //CONTAINER_REMOTE_LAUNCH is handled by ContainerLauncherImpl's handle method, which then performs the launch
      taskAttempt.eventHandler
        .handle(new ContainerRemoteLaunchEvent(taskAttempt.attemptId,
          launchContext, container, taskAttempt.remoteTask));

      // send event to speculator that our container needs are satisfied
      taskAttempt.eventHandler.handle
          (new SpeculatorEvent(taskAttempt.getID().getTaskId(), -1));
    }
  }
  
  

In ContainerLauncherImpl, each launcher event is processed by an EventProcessor:
   class EventProcessor implements Runnable {
    private ContainerLauncherEvent event;

    EventProcessor(ContainerLauncherEvent event) {
      this.event = event;
    }

    @Override
    public void run() {
      LOG.info("Processing the event " + event.toString());

      // Load ContainerManager tokens before creating a connection.
      // TODO: Do it only once per NodeManager.
      ContainerId containerID = event.getContainerID();

      Container c = getContainer(event);
      switch(event.getType()) {

	  //This is where the remote launch happens
      case CONTAINER_REMOTE_LAUNCH:
        ContainerRemoteLaunchEvent launchEvent
            = (ContainerRemoteLaunchEvent) event;
			
			//Launch remotely. Note that the RM only allocates containers; it is the AM that actually controls (launches and kills) them
        c.launch(launchEvent);
        break;

      case CONTAINER_REMOTE_CLEANUP:
        c.kill();
        break;
      }
      removeContainerIfDone(containerID);
    }
  }
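`EventProcessor` is a `Runnable`, so each launch or cleanup runs on a thread pool rather than in the thread that handles events. Below is a hypothetical sketch of that pattern. It uses a fixed-size pool and strings instead of `ContainerLauncherEvent`, and the real `ContainerLauncherImpl` also puts events on a queue before they reach the pool. `LauncherSketch` is not a Hadoop class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one Runnable per launcher event, executed on a pool so a slow
// NodeManager RPC for one container does not hold up launches on other nodes.
class LauncherSketch {
    enum Type { CONTAINER_REMOTE_LAUNCH, CONTAINER_REMOTE_CLEANUP }

    final List<String> actions = new CopyOnWriteArrayList<>();
    private final ExecutorService launcherPool = Executors.newFixedThreadPool(4);

    void handle(Type type, String containerId) {
        launcherPool.execute(() -> {
            switch (type) {
                case CONTAINER_REMOTE_LAUNCH:
                    actions.add("launch " + containerId);   // stands in for c.launch(launchEvent)
                    break;
                case CONTAINER_REMOTE_CLEANUP:
                    actions.add("kill " + containerId);     // stands in for c.kill()
                    break;
            }
        });
    }

    // Stops accepting events and waits for queued ones to finish.
    boolean shutdownAndWait(long seconds) {
        launcherPool.shutdown();
        try {
            return launcherPool.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```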


Now let's see how launch actually starts the container:
public synchronized void launch(ContainerRemoteLaunchEvent event) {

...

//An RPC proxy to the ContainerManager on the container's NodeManager
ContainerManagementProtocolProxyData proxy = null;

proxy = getCMProxy(containerMgrAddress, containerID);

//Build the start request for the container
StartContainerRequest startRequest =
            StartContainerRequest.newInstance(containerLaunchContext,
              event.getContainerToken());
        List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
        list.add(startRequest);
        StartContainersRequest requestList = StartContainersRequest.newInstance(list);
		
		//Start the container remotely via ContainerManagerImpl,
		//i.e. the container manager running on the target NM
        StartContainersResponse response =
            proxy.getContainerManagementProtocol().startContainers(requestList);

...

}


Now over to ContainerManagerImpl to see what startContainers actually does:
  public StartContainersResponse
      startContainers(StartContainersRequest requests) throws YarnException,
          IOException {
    if (blockNewContainerRequests.get()) {
      throw new NMNotYetReadyException(
        "Rejecting new containers as NodeManager has not"
            + " yet connected with ResourceManager");
    }
	
	//Identify the caller and authorize it via its NM token
    UserGroupInformation remoteUgi = getRemoteUgi();
    NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
    authorizeUser(remoteUgi,nmTokenIdentifier);
    List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
    Map<ContainerId, SerializedException> failedContainers =
        new HashMap<ContainerId, SerializedException>();
    for (StartContainerRequest request : requests.getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
	  
	  //More tokens: verify the container token
        ContainerTokenIdentifier containerTokenIdentifier =
            BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
          containerTokenIdentifier);
        containerId = containerTokenIdentifier.getContainerID();
		
		//Start the container on this NM
        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
          request);
        succeededContainers.add(containerId);
      } catch (YarnException e) {
        failedContainers.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        failedContainers.put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }

    return StartContainersResponse.newInstance(getAuxServiceMetaData(),
      succeededContainers, failedContainers);
  }
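`startContainers` processes a batch and reports per-container results instead of failing the whole batch at the first bad request. Below is a hypothetical sketch of that shape, in which `IllegalArgumentException` stands in for `YarnException`. The rethrow of `InvalidToken` and the conversion of `IOException` seen above are left out, and `BatchStartSketch` is not a Hadoop class.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: try every request, collect successes and per-container failures.
class BatchStartSketch {
    static final class Result {
        final List<String> succeeded = new ArrayList<>();
        final Map<String, String> failed = new LinkedHashMap<>();
    }

    static Result startContainers(List<String> containerIds, Consumer<String> startOne) {
        Result r = new Result();
        for (String id : containerIds) {
            try {
                startOne.accept(id);                   // stands in for startContainerInternal(...)
                r.succeeded.add(id);
            } catch (IllegalArgumentException e) {     // stands in for YarnException
                r.failed.put(id, e.getMessage());      // record the failure and keep going
            }
        }
        return r;
    }
}
```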


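Starting the container on this NM is done by startContainerInternal. This method came up before (it is what starts a container), so I won't go into it in detail. In short, it fires application-level events. Once the application finishes initializing and becomes RUNNING, AppInitDoneTransition runs and fires a ContainerEventType.INIT_CONTAINER event for each container: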
  static class AppInitDoneTransition implements
      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
    @Override
    public void transition(ApplicationImpl app, ApplicationEvent event) {
      // Start all the containers waiting for ApplicationInit
      for (Container container : app.containers.values()) {
        app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
              container.getContainerId()));
      }
    }
  }


This is where container initialization starts. Let's see what the state machine does on INIT_CONTAINER:
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.LOCALIZED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
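An addTransition call that takes an EnumSet declares a multi-arc transition. For (NEW, INIT_CONTAINER), the transition hook itself decides the next state, but only from the listed set. Here is a minimal sketch of such a transition table. The names (TinyStateMachine, Arc, CState, CEvent) are hypothetical and are not Hadoop's StateMachineFactory API:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

enum CState { NEW, LOCALIZING, LOCALIZED, LOCALIZATION_FAILED, DONE }

enum CEvent { INIT_CONTAINER }

class TinyStateMachine {
  // One multi-arc transition: the hook computes the next state, which must be in `allowed`.
  record Arc(Set<CState> allowed, Supplier<CState> hook) {}

  private final Map<CState, Map<CEvent, Arc>> table = new EnumMap<>(CState.class);
  private CState current;

  TinyStateMachine(CState initial) { this.current = initial; }

  TinyStateMachine addTransition(CState from, Set<CState> allowed, CEvent event,
                                 Supplier<CState> hook) {
    table.computeIfAbsent(from, k -> new EnumMap<CEvent, Arc>(CEvent.class))
        .put(event, new Arc(allowed, hook));
    return this; // builder-style chaining, like the factory above
  }

  CState doTransition(CEvent event) {
    Map<CEvent, Arc> arcs = table.get(current);
    Arc arc = (arcs == null) ? null : arcs.get(event);
    if (arc == null) {
      throw new IllegalStateException("invalid event " + event + " in state " + current);
    }
    CState next = arc.hook().get();
    if (!arc.allowed().contains(next)) {
      throw new IllegalStateException("hook returned unexpected state " + next);
    }
    current = next;
    return next;
  }

  // Mirrors the NEW --INIT_CONTAINER--> {LOCALIZING, LOCALIZED, LOCALIZATION_FAILED, DONE} arc.
  static TinyStateMachine containerMachine(Supplier<CState> hook) {
    return new TinyStateMachine(CState.NEW).addTransition(CState.NEW,
        EnumSet.of(CState.LOCALIZING, CState.LOCALIZED,
                   CState.LOCALIZATION_FAILED, CState.DONE),
        CEvent.INIT_CONTAINER, hook);
  }
}
```

The point of the allowed set is that a buggy hook cannot silently move the container into a state the table never declared.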


As you can see, this triggers RequestResourcesTransition, which moves the container into one of those states depending on the situation. Let's continue with RequestResourcesTransition:
  static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      ...

      // Get the launch context, which carries the launch command
      final ContainerLaunchContext ctxt = container.launchContext;

      ...

      // Initially this map is not empty
      Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
      if (!cntrRsrc.isEmpty()) {
        // From here on, the local resources are fetched step by step

        ...

        // Further events keep firing until ContainerEventType.RESOURCE_LOCALIZED,
        // which triggers LocalizedTransition: all resources are in place and the
        // container can be launched. We look at LocalizedTransition next.
        container.dispatcher.getEventHandler().handle(
            new ContainerLocalizationRequestEvent(container, req));

      } else {
        container.sendLaunchEvent();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZED;
      }
    }
  }
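With the elided parts removed, the transition above is a two-way branch. If there are no local resources, the container is launched immediately and ends up in LOCALIZED. Otherwise a localization request is sent and the container waits in LOCALIZING, one of the targets the NEW arc allows. Here is a minimal sketch of that decision. LaunchPlanner, LocState and the two Runnable callbacks are hypothetical names. The map stands in for ctxt.getLocalResources():

```java
import java.util.Map;

enum LocState { LOCALIZING, LOCALIZED }

class LaunchPlanner {
  // Picks the next state from the container's local resources.
  // requestLocalization / sendLaunch are hypothetical callbacks standing in for
  // the ContainerLocalizationRequestEvent dispatch and sendLaunchEvent().
  static LocState onInit(Map<String, String> localResources,
                         Runnable requestLocalization, Runnable sendLaunch) {
    if (!localResources.isEmpty()) {
      requestLocalization.run(); // resources must be downloaded first
      return LocState.LOCALIZING;
    }
    sendLaunch.run(); // nothing to fetch: launch right away
    return LocState.LOCALIZED;
  }
}
```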



Now let's look at LocalizedTransition:
  static class LocalizedTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      // Mark this resource as localized; once nothing is pending any more
      // (and no unknown resource was reported), launch the container directly.
      ContainerResourceLocalizedEvent rsrcEvent = (ContainerResourceLocalizedEvent) event;
      List<String> syms =
          container.pendingResources.remove(rsrcEvent.getResource());
      if (null == syms) {
        LOG.warn("Localized unknown resource " + rsrcEvent.getResource() +
                 " for container " + container.containerId);
        assert false;
        // fail container?
        return ContainerState.LOCALIZING;
      }
      container.localizedResources.put(rsrcEvent.getLocation(), syms);
      if (!container.pendingResources.isEmpty()) {
        return ContainerState.LOCALIZING;
      }

      container.dispatcher.getEventHandler().handle(
          new ContainerLocalizationEvent(LocalizationEventType.
              CONTAINER_RESOURCES_LOCALIZED, container));

      // This is where the container gets launched
      container.sendLaunchEvent();
      container.metrics.endInitingContainer();
      return ContainerState.LOCALIZED;
    }
  }
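LocalizedTransition is effectively a countdown over pendingResources. Each RESOURCE_LOCALIZED event removes one entry, and only the event that empties the map triggers sendLaunchEvent. Here is a minimal sketch of that bookkeeping. It assumes a hypothetical PendingTracker class, with plain strings for resources, local paths and symlink names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

enum Phase { LOCALIZING, LOCALIZED }

class PendingTracker {
  // resource -> symlink names, still waiting to be localized
  private final Map<String, List<String>> pending = new HashMap<>();
  // local path -> symlink names, already localized
  private final Map<String, List<String>> localized = new HashMap<>();
  private int launches = 0;

  PendingTracker(Map<String, List<String>> resources) {
    pending.putAll(resources);
  }

  // Called once per "resource localized" event.
  Phase onLocalized(String resource, String localPath) {
    List<String> syms = pending.remove(resource);
    if (syms == null) {
      // Unknown resource: keep waiting, like the warning branch above.
      return Phase.LOCALIZING;
    }
    localized.put(localPath, syms);
    if (!pending.isEmpty()) {
      return Phase.LOCALIZING; // other resources are still outstanding
    }
    launches++; // stands in for sendLaunchEvent()
    return Phase.LOCALIZED;
  }

  int launches() { return launches; }
}
```

Because the launch happens only on the transition from "one pending" to "none pending", the container is launched exactly once no matter how many resources it has.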


So next we need to look at sendLaunchEvent:
  private void sendLaunchEvent() {
    ContainersLauncherEventType launcherEvent =
        ContainersLauncherEventType.LAUNCH_CONTAINER;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    }
    // Fires a ContainersLauncherEvent of type ContainersLauncherEventType.LAUNCH_CONTAINER
    // (RECOVER_CONTAINER when recovering). It is handled by `case LAUNCH_CONTAINER`
    // in ContainersLauncher.handle.
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }
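sendLaunchEvent only decides which event type to post. RECOVER_CONTAINER is used for a container that was already launched before the NodeManager restarted, and LAUNCH_CONTAINER otherwise. ContainersLauncher.handle then switches on that type. Here is a minimal sketch of the pair. LauncherEventType and LauncherSketch are hypothetical names, the enum only mirrors the constants used above, and strings stand in for the real actions:

```java
// Hypothetical enum mirroring the launcher event types used above.
enum LauncherEventType { LAUNCH_CONTAINER, RECOVER_CONTAINER, CLEANUP_CONTAINER }

class LauncherSketch {
  // Mirrors sendLaunchEvent(): recover a container that was already launched
  // before the NodeManager restarted, otherwise launch it fresh.
  static LauncherEventType chooseEvent(boolean recoveredAsLaunched) {
    return recoveredAsLaunched ? LauncherEventType.RECOVER_CONTAINER
                               : LauncherEventType.LAUNCH_CONTAINER;
  }

  // Mirrors the shape of ContainersLauncher.handle(): one case per event type.
  static String handle(LauncherEventType type, String containerId) {
    switch (type) {
      case LAUNCH_CONTAINER:
        return "launch " + containerId;
      case RECOVER_CONTAINER:
        return "recover " + containerId;
      case CLEANUP_CONTAINER:
        return "cleanup " + containerId;
      default:
        throw new IllegalArgumentException("unknown event " + type);
    }
  }
}
```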


Over in ContainersLauncher, here is the LAUNCH_CONTAINER branch of handle:
      case LAUNCH_CONTAINER:
        Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());

        // Create a ContainerLaunch object. Its call() method runs the launch
        // script prepared earlier, which in turn runs the task class's main method.
        ContainerLaunch launch =
            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);

        // containerLauncher is an ExecutorService: after submit(), a pool thread
        // invokes launch.call(). From here on the container, and with it this
        // map task, is started and running.
        containerLauncher.submit(launch);
        running.put(containerId, launch);
        break;
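The mechanism here is plain java.util.concurrent. ContainerLaunch is a Callable, and submitting it to the containerLauncher ExecutorService makes a pool thread invoke call(). submit() itself returns at once, with a Future for the result. Here is a minimal sketch. It uses a hypothetical FakeLaunch callable whose call() just returns exit code 0 instead of running a launch script. The cached thread pool and the MiniLauncher class are also assumptions:

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for ContainerLaunch: a real call() would write and run
// the launch script, block until the process exits, and return its exit code.
class FakeLaunch implements Callable<Integer> {
  private final String containerId;

  FakeLaunch(String containerId) { this.containerId = containerId; }

  @Override
  public Integer call() {
    // Simulated successful run of container `containerId`.
    return 0;
  }
}

class MiniLauncher implements AutoCloseable {
  private final ExecutorService containerLauncher = Executors.newCachedThreadPool();
  private final Map<String, Future<Integer>> running = new ConcurrentHashMap<>();

  // submit() returns immediately; a pool thread runs call() asynchronously.
  Future<Integer> launch(String containerId) {
    Future<Integer> future = containerLauncher.submit(new FakeLaunch(containerId));
    running.put(containerId, future);
    return future;
  }

  boolean isTracked(String containerId) { return running.containsKey(containerId); }

  @Override
  public void close() { containerLauncher.shutdown(); }
}
```

Unlike the original, which stores the ContainerLaunch object in `running`, this sketch stores the Future, simply so the caller can wait for the exit code.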


Now that it is running, which class is actually being run? It is YarnChild. Its class name is put into the launch command when createContainerLaunchContext is called, specifically here:
List<String> commands = MapReduceChildJVM.getVMCommand(
        taskAttemptListener.getAddress(), remoteTask, jvmID);


Inside getVMCommand, YarnChild's class name is added to the command. When the container is started, this command is executed, which runs YarnChild's main method.
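The idea is a round trip. On the AM side, the main class name and the arguments the child needs are serialized into one command line: the TaskAttemptListener (umbilical) host and port, the task attempt id and the JVM id. On the child side, YarnChild.main reads them back from `args`. Here is a minimal sketch of that idea. ChildCommand and ChildArgs are hypothetical names. The argument order (host, port, attempt id, JVM id) reflects my reading of Hadoop 2.x, and the real getVMCommand also adds JVM options, log properties and stdout/stderr redirection, all omitted here:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for what the child JVM needs to reach the AM and identify itself.
record ChildArgs(String host, int port, String attemptId, long jvmId) {}

class ChildCommand {
  // AM side: build a simplified command line (no JVM options or log redirects).
  static List<String> build(String javaHome, String mainClass, ChildArgs a) {
    List<String> vargs = new ArrayList<>();
    vargs.add(javaHome + "/bin/java");
    vargs.add(mainClass); // e.g. org.apache.hadoop.mapred.YarnChild
    vargs.add(a.host());
    vargs.add(Integer.toString(a.port()));
    vargs.add(a.attemptId());
    vargs.add(Long.toString(a.jvmId()));
    return vargs;
  }

  // Child side: parse what main(String[] args) receives, i.e. everything after the class name.
  static ChildArgs parse(String[] args) {
    if (args.length < 4) {
      throw new IllegalArgumentException("expected: host port attemptId jvmId");
    }
    return new ChildArgs(args[0], Integer.parseInt(args[1]), args[2], Long.parseLong(args[3]));
  }
}
```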

YarnChild's main method contains this line:
taskFinal.run(job, umbilical);

So this is where the task really starts running.

So far we have seen how the AM asks the RM for containers, how the AM then launches those containers remotely, and how the launch script is executed to run the task.

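If I find time later, I'll look at how the shuffle after the map phase works. I won't cover the reduce side: the overall flow should be much the same as for a map task. The AM requests a container from the RM, launches it, and then executes the launch script to run the task.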