
Hadoop: Source Code Walkthrough of Job Submission in YARNRunner, AM Creation, and Job Processing (Part 2)

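Continuing from the previous article: at that point the AM allocation request has already been created, and it waits in the scheduler until an NM heartbeat reveals a NodeManager with enough free resources, where the AM can then be launched. Suppose an NM sends a heartbeat. It drives the RMNodeEventType.STATUS_UPDATE event through the RMNodeImpl state machine. Here is the transition definition: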
  private static final StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent> stateMachineFactory 
                 = new StateMachineFactory<RMNodeImpl,
                                           NodeState,
                                           RMNodeEventType,
                                           RMNodeEvent>(NodeState.NEW)
  
     //Transitions from NEW state
     .addTransition(NodeState.NEW, NodeState.RUNNING, 
         RMNodeEventType.STARTED, new AddNodeTransition())
     .addTransition(NodeState.NEW, NodeState.NEW,
         RMNodeEventType.RESOURCE_UPDATE, 
         new UpdateNodeResourceWhenUnusableTransition())

     //Transitions from RUNNING state
     .addTransition(NodeState.RUNNING,
         EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
         RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
		 
		 //STATUS_UPDATE in the RUNNING state invokes StatusUpdateWhenHealthyTransition
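The addTransition(...) chain above is essentially a lookup table keyed by (current state, event type). Below is a minimal sketch of that idea under simplifying assumptions: MiniStateMachine, NodeState and NodeEvent are hypothetical names, this is not Hadoop's StateMachineFactory API, and a multiple-arc transition is modeled as a function that picks the target state at runtime (the way StatusUpdateWhenHealthyTransition chooses between RUNNING and UNHEALTHY).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of a state machine built from a transition table.
// This is NOT Hadoop's StateMachineFactory; it only illustrates the lookup idea.
final class MiniStateMachine<S extends Enum<S>, E extends Enum<E>> {
  // Key: "STATE/EVENT"; value: function that computes the next state.
  private final Map<String, Function<S, S>> table = new HashMap<>();
  private S current;

  MiniStateMachine(S initial) {
    this.current = initial;
  }

  // Single-arc transition: always goes to a fixed target state.
  MiniStateMachine<S, E> addTransition(S from, S to, E event) {
    table.put(key(from, event), s -> to);
    return this;
  }

  // Multiple-arc transition: the hook decides the target state at runtime.
  MiniStateMachine<S, E> addTransition(S from, E event, Function<S, S> hook) {
    table.put(key(from, event), hook);
    return this;
  }

  S doTransition(E event) {
    Function<S, S> hook = table.get(key(current, event));
    if (hook == null) {
      throw new IllegalStateException("Invalid event " + event + " at " + current);
    }
    current = hook.apply(current);
    return current;
  }

  S getState() {
    return current;
  }

  private String key(S state, E event) {
    return state.name() + "/" + event.name();
  }
}

enum NodeState { NEW, RUNNING, UNHEALTHY }

enum NodeEvent { STARTED, STATUS_UPDATE }
```

Registering NEW to RUNNING on STARTED, plus a RUNNING/STATUS_UPDATE hook that returns RUNNING or UNHEALTHY, reproduces the two transitions quoted above. An event with no registered entry throws, roughly analogous to how YARN rejects an event that has no transition for the current state.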


So next we turn to StatusUpdateWhenHealthyTransition; see the comments:
public static class StatusUpdateWhenHealthyTransition implements
      MultipleArcTransition<RMNodeImpl, RMNodeEvent, NodeState> {
    @Override
    public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {

      RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;

	  //Record the basic heartbeat information
      // Switch the last heartbeatresponse.
      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();

      NodeHealthStatus remoteNodeHealthStatus = 
          statusEvent.getNodeHealthStatus();
      rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
      rmNode.setLastHealthReportTime(
          remoteNodeHealthStatus.getLastHealthReportTime());
		  
		  //This branch is normally not taken; it only runs when the NM reports itself unhealthy
      if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
        LOG.info("Node " + rmNode.nodeId + " reported UNHEALTHY with details: "
            + remoteNodeHealthStatus.getHealthReport());
        rmNode.nodeUpdateQueue.clear();
        // Inform the scheduler
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeRemovedSchedulerEvent(rmNode));
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodesListManagerEvent(
                NodesListManagerEventType.NODE_UNUSABLE, rmNode));
        // Update metrics
        rmNode.updateMetricsForDeactivatedNode(rmNode.getState(),
            NodeState.UNHEALTHY);
        return NodeState.UNHEALTHY;
      }

      rmNode.handleContainerStatus(statusEvent.getContainers());

	  //This is what the heartbeat actually does: via the dispatcher it notifies the scheduler with a NodeUpdateSchedulerEvent, i.e. SchedulerEventType.NODE_UPDATE
	  //So next we look at how FifoScheduler handles NODE_UPDATE
      if(rmNode.nextHeartBeat) {
        rmNode.nextHeartBeat = false;
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode));
      }

      // Update DTRenewer in secure mode to keep these apps alive. Today this is
      // needed for log-aggregation to finish long after the apps are gone.
      if (UserGroupInformation.isSecurityEnabled()) {
        rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
          statusEvent.getKeepAliveAppIds());
      }

      return NodeState.RUNNING;
    }
  }
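The nextHeartBeat flag keeps the RM from queuing a new NODE_UPDATE for a node while the scheduler has not yet consumed the previous one. As far as I can tell from the Hadoop 2.x source, RMNodeImpl.pullContainerUpdates() (called from the scheduler's nodeUpdate, shown next) sets the flag back to true. The sketch below models only that coalescing behaviour; HeartbeatCoalescer and its methods are hypothetical, and container updates are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how RMNodeImpl coalesces heartbeats into NODE_UPDATE events.
final class HeartbeatCoalescer {
  private final List<String> pendingContainerUpdates = new ArrayList<>();
  private boolean nextHeartBeat = true;
  private int nodeUpdateEventsSent = 0;

  // Called for every NM heartbeat (the STATUS_UPDATE transition).
  synchronized void onHeartbeat(List<String> containerUpdates) {
    // Updates are always queued, even if no scheduler event is sent this time.
    pendingContainerUpdates.addAll(containerUpdates);
    if (nextHeartBeat) {
      nextHeartBeat = false;
      nodeUpdateEventsSent++; // stands in for dispatching NodeUpdateSchedulerEvent
    }
  }

  // Called by the scheduler while handling NODE_UPDATE (like pullContainerUpdates()).
  synchronized List<String> pullContainerUpdates() {
    List<String> drained = new ArrayList<>(pendingContainerUpdates);
    pendingContainerUpdates.clear();
    nextHeartBeat = true; // the next heartbeat may trigger a new NODE_UPDATE
    return drained;
  }

  synchronized int getNodeUpdateEventsSent() {
    return nodeUpdateEventsSent;
  }
}
```

Two heartbeats that arrive before the scheduler pulls produce a single NODE_UPDATE, but no container update is lost: the scheduler drains both on its next pull.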


Next comes the NODE_UPDATE event in FifoScheduler. Here is how it is handled:
    case NODE_UPDATE:
    {
      NodeUpdateSchedulerEvent nodeUpdatedEvent = 
      (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
    }
    break;


This calls the nodeUpdate method:
  private synchronized void nodeUpdate(RMNode rmNode) {
    FiCaSchedulerNode node = getNode(rmNode.getNodeID());
    
	//Pull the container updates the NM has reported
    List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    for(UpdatedContainerInfo containerInfo : containerInfoList) {
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      completedContainers.addAll(containerInfo.getCompletedContainers());
    }
	
	//Handle containers reported as newly launched
    // Processing the newly launched containers
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

	//Handle containers reported as completed
    // Process completed containers
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      completedContainer(getRMContainer(containerId), 
          completedContainer, RMContainerEventType.FINISHED);
    }


    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }

	//Check whether the node has enough free resources to serve an allocation; our AM request is one such request waiting in the scheduler
	//If this node can satisfy it, we enter this branch
    if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
            node.getAvailableResource(),minimumAllocation)) {
      LOG.debug("Node heartbeat " + rmNode.getNodeID() + 
          " available resource = " + node.getAvailableResource());

		  //Allocate containers
      assignContainers(node);

      LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
          + node.getAvailableResource());
    }

    updateAvailableResourcesMetrics();
  }
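The guard before assignContainers only asks whether the node's free resource is at least minimumAllocation. As far as I know, FifoScheduler compares resources with DefaultResourceCalculator, which looks at memory only and ignores vcores. The sketch below mimics that comparison with a hypothetical SimpleResource class; it is not the YARN Resource or Resources API.

```java
// Hypothetical stand-in for org.apache.hadoop.yarn.api.records.Resource.
final class SimpleResource {
  final int memoryMb;
  final int vcores;

  SimpleResource(int memoryMb, int vcores) {
    this.memoryMb = memoryMb;
    this.vcores = vcores;
  }
}

// Mimics a memory-only calculator: vcores are deliberately ignored.
final class MemoryOnlyCalculator {
  static int compare(SimpleResource lhs, SimpleResource rhs) {
    return Integer.compare(lhs.memoryMb, rhs.memoryMb);
  }

  static boolean greaterThanOrEqual(SimpleResource lhs, SimpleResource rhs) {
    return compare(lhs, rhs) >= 0;
  }

  // The nodeUpdate() guard: is there room for at least one minimum-sized container?
  static boolean canAssign(SimpleResource available, SimpleResource minimumAllocation) {
    return greaterThanOrEqual(available, minimumAllocation);
  }
}
```

With a 1024 MB minimum, a node with 2048 MB free but 0 vcores still passes the check, while a node with 512 MB and 8 free vcores does not.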



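So when the heartbeat comes from an NM with enough free resources, assignContainers is called. Its job is to allocate containers on this node, which in our case includes the AM container. Let's look at it: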
private void assignContainers(FiCaSchedulerNode node) {
    LOG.debug("assignContainers:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " #applications=" + applications.size());

    // Try to assign containers to applications in fifo order
    for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
        .entrySet()) {
		
		//FIFO: take the current attempt of each application in order
      FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
      if (application == null) {
        continue;
      }

      LOG.debug("pre-assignContainers");
      application.showRequests();
      synchronized (application) {
        // Check if this resource is on the blacklist
        if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
          continue;
        }
        
		//Assign in order of priority
        for (Priority priority : application.getPriorities()) {
          int maxContainers = 
            getMaxAllocatableContainers(application, priority, node, 
                NodeType.OFF_SWITCH); 
          // Ensure the application needs containers of this priority
		  
		  //This priority still has containers to assign
          if (maxContainers > 0) {
		  
		  //Assign them on this NM; see assignContainersOnNode below
            int assignedContainers = 
              assignContainersOnNode(node, application, priority);
            // Do not assign out of order w.r.t priorities
            if (assignedContainers == 0) {
              break;
            }
          }
        }
      }
      
      LOG.debug("post-assignContainers");
      application.showRequests();

      // Done
      if (Resources.lessThan(resourceCalculator, clusterResource,
              node.getAvailableResource(), minimumAllocation)) {
        break;
      }
    }

    // Update the applications' headroom to correctly take into
    // account the containers assigned in this update.
    for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
      FiCaSchedulerApp attempt =
          (FiCaSchedulerApp) application.getCurrentAppAttempt();
      if (attempt == null) {
        continue;
      }
      updateAppHeadRoom(attempt);
    }
  }
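The control flow of assignContainers can be modeled in a few lines. This sketch makes several simplifying assumptions: resources are memory-only, every container has the same size, applications live in a TreeMap keyed by an integer id to stand in for the FIFO ordering of the applications map, and each application's priorities are visited in the insertion order of a LinkedHashMap. FifoAssignSketch is hypothetical; it only reproduces the two break conditions in the code above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, memory-only model of FifoScheduler#assignContainers.
final class FifoAssignSketch {
  // apps: app id -> (priority label -> containers still wanted), priorities in visiting order.
  // Returns "app<id>/<priority>" for every container placed on the node.
  static List<String> assign(TreeMap<Integer, LinkedHashMap<String, Integer>> apps,
                             int nodeFreeMb, int containerMb, int minimumMb) {
    List<String> assigned = new ArrayList<>();
    int free = nodeFreeMb;
    for (Map.Entry<Integer, LinkedHashMap<String, Integer>> app : apps.entrySet()) {
      for (Map.Entry<String, Integer> request : app.getValue().entrySet()) {
        int wanted = request.getValue();
        if (wanted <= 0) {
          continue; // this priority needs nothing
        }
        int fits = Math.min(wanted, free / containerMb);
        if (fits == 0) {
          break; // do not serve lower priorities ahead of a higher one
        }
        for (int i = 0; i < fits; i++) {
          assigned.add("app" + app.getKey() + "/" + request.getKey());
        }
        free -= fits * containerMb;
        request.setValue(wanted - fits);
      }
      if (free < minimumMb) {
        break; // node is full: stop visiting later applications
      }
    }
    return assigned;
  }
}
```

With 2048 MB free and 1024 MB containers, the first application gets its one container, the second gets only one of the two it asked for, and the loop stops because the node is full.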


Next, the assignContainersOnNode method:

 private int assignContainersOnNode(FiCaSchedulerNode node, 
      FiCaSchedulerApp application, Priority priority 
  ) {
    // Data-local
	
	//Each locality level has its own method, but they all call assignContainer(),
	//which ends up in FiCaSchedulerApp#allocate; that method fires the RMContainerEventType.START event
	//We won't go further into the details here
    int nodeLocalContainers = 
      assignNodeLocalContainers(node, application, priority); 

    // Rack-local
    int rackLocalContainers = 
      assignRackLocalContainers(node, application, priority);

    // Off-switch
    int offSwitchContainers =
      assignOffSwitchContainers(node, application, priority);


    LOG.debug("assignContainersOnNode:" +
        " node=" + node.getRMNode().getNodeAddress() + 
        " application=" + application.getApplicationId().getId() +
        " priority=" + priority.getPriority() + 
        " #assigned=" + 
        (nodeLocalContainers + rackLocalContainers + offSwitchContainers));


    return (nodeLocalContainers + rackLocalContainers + offSwitchContainers);
  }
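The three calls above form a locality cascade: node-local asks are served first, then rack-local, then off-switch (ResourceRequest.ANY, whose value is "*"). The sketch below is a simplified, hypothetical model; LocalitySketch is not a YARN class, requests are plain counts keyed by host, rack or "*", and "slots" is an assumed number of containers the node can still hold. An allocation at one level also consumes the broader asks, which, as far as I know, matches how the scheduler's bookkeeping accounts for node-local and rack-local allocations.

```java
import java.util.Map;

// Hypothetical model of the locality cascade in assignContainersOnNode.
final class LocalitySketch {
  static final String ANY = "*"; // same wildcard value as ResourceRequest.ANY

  // requests: resource name (host, rack or "*") -> containers still wanted.
  // Returns {nodeLocal, rackLocal, offSwitch} and updates the request counts.
  static int[] assignOnNode(Map<String, Integer> requests, String host, String rack, int slots) {
    int nodeLocal = take(requests, slots, host, rack, ANY);
    int rackLocal = take(requests, slots - nodeLocal, rack, ANY);
    int offSwitch = take(requests, slots - nodeLocal - rackLocal, ANY);
    return new int[] {nodeLocal, rackLocal, offSwitch};
  }

  // Assigns as many containers as every listed level still wants (bounded by slots),
  // then decrements all of those levels: a node-local allocation also uses up
  // the matching rack-level and ANY-level asks.
  private static int take(Map<String, Integer> requests, int slots, String... levels) {
    int n = Math.max(slots, 0);
    for (String level : levels) {
      n = Math.min(n, requests.getOrDefault(level, 0));
    }
    for (String level : levels) {
      if (requests.containsKey(level)) {
        requests.put(level, requests.get(level) - n);
      }
    }
    return n;
  }
}
```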


Once RMContainerEventType.START is fired, the RMContainerImpl state machine executes the corresponding transition:
 stateMachineFactory = new StateMachineFactory<RMContainerImpl, 
       RMContainerState, RMContainerEventType, RMContainerEvent>(
      RMContainerState.NEW)

    // Transitions from NEW state
    .addTransition(RMContainerState.NEW, RMContainerState.ALLOCATED,
        RMContainerEventType.START, new ContainerStartedTransition())
    .addTransition(RMContainerState.NEW, RMContainerState.KILLED,
        RMContainerEventType.KILL)


This runs ContainerStartedTransition:
  private static final class ContainerStartedTransition extends
      BaseTransition {

    @Override
    public void transition(RMContainerImpl container, RMContainerEvent event) {
	
	//Notify the app attempt that its container has been allocated; RMAppAttemptContainerAllocatedEvent carries RMAppAttemptEventType.CONTAINER_ALLOCATED
      container.eventHandler.handle(new RMAppAttemptContainerAllocatedEvent(
          container.appAttemptId));
    }
  }


Back in RMAppAttemptImpl, let's see what RMAppAttemptEventType.CONTAINER_ALLOCATED triggers:
.addTransition(RMAppAttemptState.SCHEDULED,
          EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,
            RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.CONTAINER_ALLOCATED,
          new AMContainerAllocatedTransition())


This runs AMContainerAllocatedTransition:
  private static final class AMContainerAllocatedTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
      // Acquire the AM container from the scheduler.
	  
	  //Fetch the AM container allocation from the scheduler
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
            EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,
            null);
      // There must be at least one container allocated, because a
      // CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
      // and is put in SchedulerApplication#newlyAllocatedContainers.

      // Note that YarnScheduler#allocate is not guaranteed to be able to
      // fetch it since container may not be fetchable for some reason like
      // DNS unavailable causing container token not generated. As such, we
      // return to the previous state and keep retry until am container is
      // fetched.
      if (amContainerAllocation.getContainers().size() == 0) {
        appAttempt.retryFetchingAMContainer(appAttempt);
        return RMAppAttemptState.SCHEDULED;
      }

	  //Set up the AM container properties
      // Set the masterContainer
      appAttempt.setMasterContainer(amContainerAllocation.getContainers()
          .get(0));
		  
		  //Get the RMContainer object
      RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler
          .getRMContainer(appAttempt.getMasterContainer().getId());
      rmMasterContainer.setAMContainer(true);
      // The node set in NMTokenSecrentManager is used for marking whether the
      // NMToken has been issued for this node to the AM.
      // When AM container was allocated to RM itself, the node which allocates
      // this AM container was marked as the NMToken already sent. Thus,
      // clear this node set so that the following allocate requests from AM are
      // able to retrieve the corresponding NMToken.
      appAttempt.rmContext.getNMTokenSecretManager()
        .clearNodeSetForAttempt(appAttempt.applicationAttemptId);
      appAttempt.getSubmissionContext().setResource(
        appAttempt.getMasterContainer().getResource());
		
		//Call storeAttempt() on the attempt; covered next
      appAttempt.storeAttempt();
      return RMAppAttemptState.ALLOCATED_SAVING;
    }
  }
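AMContainerAllocatedTransition is a multiple-arc transition: the same CONTAINER_ALLOCATED event leads either back to SCHEDULED (nothing could be fetched yet, so retry) or forward to ALLOCATED_SAVING. A minimal sketch of just that decision, with a hypothetical class AmAllocatedSketch and container IDs as plain strings:

```java
import java.util.List;

// Hypothetical model of AMContainerAllocatedTransition's two outcomes.
final class AmAllocatedSketch {
  enum AttemptState { SCHEDULED, ALLOCATED_SAVING }

  private String masterContainer;
  private int retries;

  // fetched: containers returned by the scheduler's allocate() call.
  AttemptState onContainerAllocated(List<String> fetched) {
    if (fetched.isEmpty()) {
      retries++; // stands in for retryFetchingAMContainer(): try again later
      return AttemptState.SCHEDULED;
    }
    masterContainer = fetched.get(0); // the first container becomes the AM container
    // In the real code storeAttempt() is called at this point.
    return AttemptState.ALLOCATED_SAVING;
  }

  String getMasterContainer() {
    return masterContainer;
  }

  int getRetries() {
    return retries;
  }
}
```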


That brings us to appAttempt.storeAttempt():
  private void storeAttempt() {
    // store attempt data in a non-blocking manner to prevent dispatcher
    // thread starvation and wait for state to be saved
    LOG.info("Storing attempt: AppId: " + 
              getAppAttemptId().getApplicationId() 
              + " AttemptId: " + 
              getAppAttemptId()
              + " MasterContainer: " + masterContainer);
			  
			  //Save the attempt in the ResourceManager's state store
    rmContext.getStateStore().storeNewApplicationAttempt(this);
  }


Here is storeNewApplicationAttempt:
 public synchronized void storeNewApplicationAttempt(RMAppAttempt appAttempt) {
    Credentials credentials = getCredentialsFromAppAttempt(appAttempt);

    AggregateAppResourceUsage resUsage =
        appAttempt.getRMAppAttemptMetrics().getAggregateAppResourceUsage();
    ApplicationAttemptState attemptState =
        new ApplicationAttemptState(appAttempt.getAppAttemptId(),
          appAttempt.getMasterContainer(), credentials,
          appAttempt.getStartTime(), resUsage.getMemorySeconds(),
          resUsage.getVcoreSeconds());

		  //The dispatcher delivers an RMStateStoreAppAttemptEvent, i.e. an RMStateStoreEventType.STORE_APP_ATTEMPT event,
		  //which is processed by the RMStateStore state machine
    dispatcher.getEventHandler().handle(
      new RMStateStoreAppAttemptEvent(attemptState));
  }


So let's see what this event triggers in the state store:
private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.DEFAULT)
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.UPDATE_APP, new UpdateAppTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.REMOVE_APP, new RemoveAppTransition())
		  
		  //This is the one to follow: StoreAppAttemptTransition
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.STORE_APP_ATTEMPT, new StoreAppAttemptTransition())
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.UPDATE_APP_ATTEMPT, new UpdateAppAttemptTransition());


Next, the StoreAppAttemptTransition class:
  private static class StoreAppAttemptTransition implements
      SingleArcTransition<RMStateStore, RMStateStoreEvent> {
    @Override
    public void transition(RMStateStore store, RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppAttemptEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return;
      }
	  
	  //attempt state
      ApplicationAttemptState attemptState =
          ((RMStateStoreAppAttemptEvent) event).getAppAttemptState();
      try {
	  
	  //Build the attempt state data
        ApplicationAttemptStateData attemptStateData = 
            ApplicationAttemptStateData.newInstance(attemptState);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
        }
		//Persist it
        store.storeApplicationAttemptStateInternal(attemptState.getAttemptId(),
            attemptStateData);
			
			//Notify the event handler so that it processes the RMAppAttemptEventType.ATTEMPT_NEW_SAVED event
        store.notifyApplicationAttempt(new RMAppAttemptEvent
               (attemptState.getAttemptId(),
               RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing appAttempt: " + attemptState.getAttemptId(), e);
        store.notifyStoreOperationFailed(e);
      }
    };
  }
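The important ordering in StoreAppAttemptTransition is "persist first, notify afterwards": the attempt only receives ATTEMPT_NEW_SAVED after the write has succeeded, and a failed write is reported instead. Below is a minimal sketch of that ordering; StateStoreSketch, its Backend interface and the string notifications are hypothetical, and the in-memory backend is only loosely comparable to a memory-based state store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of StoreAppAttemptTransition: persist first, then notify.
final class StateStoreSketch {
  interface Backend {
    void store(String attemptId, String data) throws Exception;
  }

  private final Backend backend;
  final List<String> notifications = new ArrayList<>();

  StateStoreSketch(Backend backend) {
    this.backend = backend;
  }

  void storeAttempt(String attemptId, String data) {
    try {
      backend.store(attemptId, data);
      // Only after a successful write does the attempt get ATTEMPT_NEW_SAVED.
      notifications.add(attemptId + ":ATTEMPT_NEW_SAVED");
    } catch (Exception e) {
      // Mirrors notifyStoreOperationFailed(e): the failure is reported, not swallowed.
      notifications.add(attemptId + ":STORE_FAILED");
    }
  }

  // A simple in-memory backend for illustration.
  static Backend inMemory(Map<String, String> map) {
    return (id, data) -> map.put(id, data);
  }
}
```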


The transition defined for the RMAppAttemptEventType.ATTEMPT_NEW_SAVED event:
 .addTransition(RMAppAttemptState.ALLOCATED_SAVING, 
          RMAppAttemptState.ALLOCATED,
          RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())


It moves the attempt to RMAppAttemptState.ALLOCATED and runs AttemptStoredTransition:

  private static final class AttemptStoredTransition extends BaseTransition {
    @Override
    public void transition(RMAppAttemptImpl appAttempt,
                                                    RMAppAttemptEvent event) {

//Now that the attempt has been saved, the next step is to launch it; this is the attempt that runs the AM
      appAttempt.launchAttempt();
    }
  }


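launchAttempt() sends an AMLauncherEvent of type AMLauncherEventType.LAUNCH to the dispatcher, and that event is handled by ApplicationMasterLauncher#handle: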
  public synchronized void  handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
	//The AMLauncherEventType.LAUNCH event is caught here, in ApplicationMasterLauncher
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
    default:
      break;
    }
  }


handle() then calls launch(), which does the following:
  private void launch(RMAppAttempt application) {
  
  //Create a Runnable AMLauncher and put it on the masterEvents queue
    Runnable launcher = createRunnableLauncher(application, 
        AMLauncherEventType.LAUNCH);
    masterEvents.add(launcher);
  }
  
    
  protected Runnable createRunnableLauncher(RMAppAttempt application, 
      AMLauncherEventType event) {
    Runnable launcher =
        new AMLauncher(context, application, event, getConfig());
    return launcher;
  }


ApplicationMasterLauncher also owns a dedicated thread, which is started together with the service:
this.launcherHandlingThread = new LauncherThread();

This LauncherThread takes the queued Runnables off masterEvents one at a time and hands each one to the launcherPool thread pool, which calls its run() method:
private class LauncherThread extends Thread {
    
    public LauncherThread() {
      super("ApplicationMaster Launcher");
    }

    @Override
    public void run() {
      while (!this.isInterrupted()) {
        Runnable toLaunch;
        try {
          toLaunch = masterEvents.take();
          launcherPool.execute(toLaunch);
        } catch (InterruptedException e) {
          LOG.warn(this.getClass().getName() + " interrupted. Returning.");
          return;
        }
      }
    }
  }    
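This is a classic producer/consumer hand-off: the dispatcher thread only enqueues, so it never blocks on a slow RPC to a NodeManager, and a single consumer thread feeds a pool that does the actual work. A minimal JDK-only sketch of the same pattern follows; LauncherSketch is hypothetical, and the pool size of 2 is an arbitrary assumption (in YARN the pool size is configurable).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model of ApplicationMasterLauncher: handle() only enqueues,
// a dedicated thread takes Runnables off the queue and hands them to a pool.
final class LauncherSketch {
  private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
  private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
  private final Thread launcherThread = new Thread(this::drain, "ApplicationMaster Launcher");

  void start() {
    launcherThread.start();
  }

  // Called from the dispatcher thread; never waits for the actual launch.
  void submit(Runnable launcher) {
    masterEvents.add(launcher);
  }

  private void drain() {
    while (!Thread.currentThread().isInterrupted()) {
      try {
        Runnable toLaunch = masterEvents.take(); // blocks until work arrives
        launcherPool.execute(toLaunch);
      } catch (InterruptedException e) {
        return; // stop() interrupts this thread
      }
    }
  }

  void stop() throws InterruptedException {
    launcherThread.interrupt();
    launcherThread.join();
    launcherPool.shutdown(); // already-submitted launches still run to completion
  }
}
```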


So next we look at AMLauncher's run method:
public void run() {
    switch (eventType) {
	
	//AMLauncher.run performs either LAUNCH or CLEANUP, depending on the event type passed in
	//Here we follow the LAUNCH path
    case LAUNCH:
      try {
        LOG.info("Launching master" + application.getAppAttemptId());
        launch();
        handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
            RMAppAttemptEventType.LAUNCHED));
      } catch(Exception ie) {
        String message = "Error launching " + application.getAppAttemptId()
            + ". Got exception: " + StringUtils.stringifyException(ie);
        LOG.info(message);
        handler.handle(new RMAppAttemptLaunchFailedEvent(application
            .getAppAttemptId(), message));
      }
      break;
    case CLEANUP:
      try {
        LOG.info("Cleaning master " + application.getAppAttemptId());
        cleanup();
      } catch(IOException ie) {
        LOG.info("Error cleaning master ", ie);
      } catch (YarnException e) {
        StringBuilder sb = new StringBuilder("Container ");
        sb.append(masterContainer.getId().toString());
        sb.append(" is not handled by this NodeManager");
        if (!e.getMessage().contains(sb.toString())) {
          // Ignoring if container is already killed by Node Manager.
          LOG.info("Error cleaning master ", e);          
        }
      }
      break;
    default:
      LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
      break;
    }
  }


The LAUNCH branch simply calls launch() and then notifies the event handler to process an RMAppAttemptEventType.LAUNCHED event.

As the name suggests, this launch() method is what actually starts the AM container. Let's see how:
  private void launch() throws IOException, YarnException {
    connect();
    ContainerId masterContainerID = masterContainer.getId();
	
	//This is the object into which the launch command was assembled and stored back at job submission; it is about to be used
    ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
    LOG.info("Setting up container " + masterContainer
        + " for AM " + application.getAppAttemptId());  
		
		//Create the ContainerLaunchContext from the submission context
    ContainerLaunchContext launchContext =
        createAMContainerLaunchContext(applicationContext, masterContainerID);

		//Wrap the LaunchContext in a StartContainerRequest
    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
	
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);

		//Start the AM container
    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(masterContainerID)) {
      Throwable t =
          response.getFailedRequests().get(masterContainerID).deSerialize();
      parseAndThrowException(t);
    } else {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
    }
  }


containerMgrProxy.startContainers(allRequests) is an RPC call to the target NodeManager, where ContainerManagerImpl handles the incoming requests:
  public StartContainersResponse
      startContainers(StartContainersRequest requests) throws YarnException,
          IOException {
    
	...
		
		//Go through the request list and start every container in it
    for (StartContainerRequest request : requests.getStartContainerRequests()) {
      ContainerId containerId = null;
      try {
        ContainerTokenIdentifier containerTokenIdentifier =
            BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
          containerTokenIdentifier);
        containerId = containerTokenIdentifier.getContainerID();
		
		//Start the container
        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
          request);
        succeededContainers.add(containerId);
      } catch (YarnException e) {
        failedContainers.put(containerId, SerializedException.newInstance(e));
      } catch (InvalidToken ie) {
        failedContainers.put(containerId, SerializedException.newInstance(ie));
        throw ie;
      } catch (IOException e) {
        throw RPCUtil.getRemoteException(e);
      }
    }

    return StartContainersResponse.newInstance(getAuxServiceMetaData(),
      succeededContainers, failedContainers);
  }
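Note the error-handling shape of this loop: a YarnException for one request is recorded in failedContainers and the loop moves on, so a single bad request does not fail the whole batch, and AMLauncher#launch later checks whether its own container ID is in the failed map. The sketch below models only that per-request path; BatchStartSketch is hypothetical, RuntimeException stands in for YarnException, and the InvalidToken/IOException paths, which abort the call, are not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of the loop in ContainerManagerImpl#startContainers:
// one bad request does not fail the whole batch; it is recorded per container.
final class BatchStartSketch {
  final List<String> succeeded = new ArrayList<>();
  final Map<String, String> failed = new LinkedHashMap<>();

  BatchStartSketch startAll(List<String> containerIds, Consumer<String> startOne) {
    for (String id : containerIds) {
      try {
        startOne.accept(id);
        succeeded.add(id);
      } catch (RuntimeException e) {
        // Stands in for failedContainers.put(containerId, SerializedException...).
        failed.put(id, e.getMessage());
      }
    }
    return this;
  }
}
```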


The container is started through startContainerInternal:
 private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
      ContainerTokenIdentifier containerTokenIdentifier,
      StartContainerRequest request) throws YarnException, IOException {
	  
	  ...
	  //Get the launchContext set earlier; it contains the launch command
	  ContainerLaunchContext launchContext = request.getContainerLaunchContext();
	  
	  ...
	  
	  //Create the Container object
	  Container container =
        new ContainerImpl(getConfig(), this.dispatcher,
            context.getNMStateStore(), launchContext,
          credentials, metrics, containerTokenIdentifier);
	  	  
	  ...
	  
	  //Initialize the container: the ApplicationContainerInitEvent is handled by ApplicationImpl, which eventually fires ContainerEventType.INIT_CONTAINER on the ContainerImpl state machine
	  dispatcher.getEventHandler().handle(
          new ApplicationContainerInitEvent(container));
	  
	  
	  ...
	  
	    
	  }


Here is the state machine entry for the ContainerEventType.INIT_CONTAINER event:
  private static StateMachineFactory
           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
        stateMachineFactory =
      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
    // From NEW State
    .addTransition(ContainerState.NEW,
        EnumSet.of(ContainerState.LOCALIZING,
            ContainerState.LOCALIZED,
            ContainerState.LOCALIZATION_FAILED,
            ContainerState.DONE),
        ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())


This runs RequestResourcesTransition, which, as the name says, requests the resources the container needs. Here is the relevant part:
static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
	  
	  ...
	  
	  //Get the launch context, which holds the launch command
	   final ContainerLaunchContext ctxt = container.launchContext;
      container.metrics.initingContainer();

	  ...
	  
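	  //Launch directly when there is nothing to localize; otherwise the container goes to LOCALIZING first and sendLaunchEvent() is called once localization completes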
	  container.sendLaunchEvent();
        container.metrics.endInitingContainer();
	  
	  }

In the end, sendLaunchEvent() triggers the actual launch:
  private void sendLaunchEvent() {
    ContainersLauncherEventType launcherEvent =
        ContainersLauncherEventType.LAUNCH_CONTAINER;
    if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
      // try to recover a container that was previously launched
      launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
    }
	
	//Fire the ContainersLauncherEventType.LAUNCH_CONTAINER event,
	//which is handled in ContainersLauncher#handle
    dispatcher.getEventHandler().handle(
        new ContainersLauncherEvent(this, launcherEvent));
  }
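The only decision sendLaunchEvent makes is whether to start the container fresh or re-attach to one that was already running before an NM restart. A tiny hypothetical mirror of that choice follows; the enums are simplified copies for illustration, not the NodeManager's own types.

```java
// Hypothetical mirror of the choice made in ContainerImpl#sendLaunchEvent.
final class LaunchEventChooser {
  enum RecoveredStatus { REQUESTED, LAUNCHED, COMPLETED }

  enum LauncherEventType { LAUNCH_CONTAINER, RECOVER_CONTAINER }

  static LauncherEventType choose(RecoveredStatus recoveredStatus) {
    // A container recorded as already launched is recovered, not started a second time.
    return recoveredStatus == RecoveredStatus.LAUNCHED
        ? LauncherEventType.RECOVER_CONTAINER
        : LauncherEventType.LAUNCH_CONTAINER;
  }
}
```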


Next, ContainersLauncher's handle method:
public void handle(ContainersLauncherEvent event) {
    // TODO: ContainersLauncher launches containers one by one!!
    Container container = event.getContainer();
    ContainerId containerId = container.getContainerId();
    switch (event.getType()) {
      case LAUNCH_CONTAINER:
        Application app =
          context.getApplications().get(
              containerId.getApplicationAttemptId().getApplicationId());

        ContainerLaunch launch =
            new ContainerLaunch(context, getConfig(), dispatcher, exec, app,
              event.getContainer(), dirsHandler, containerManager);
			  
			  //Submit it directly to containerLauncher
			  //containerLauncher is an ExecutorService; once submitted, it runs ContainerLaunch#call, which we look at next
        containerLauncher.submit(launch);
        running.put(containerId, launch);
        break;
		
     ...
	 
    }
  }


ContainerLaunch#call is long, so we only look at the key parts:

public Integer call() {

	//The launchContext that holds the launch command
	final ContainerLaunchContext launchContext = container.getLaunchContext();

	...
	//Get the launch commands
	final List<String> command = launchContext.getCommands();
	
	...
	//Expand the log-directory variable in each command, collect the results in a new list, and set it back on the launchContext
	List<String> newCmds = new ArrayList<String>(command.size());
	
	for (String str : command) {
        // TODO: Should we instead work via symlinks without this grammar?
        newCmds.add(expandEnvironment(str, containerLogDir));
      }
      launchContext.setCommands(newCmds);
	
	...
	
	//Notify that the container has been launched so its state is updated
	dispatcher.getEventHandler().handle(new ContainerEvent(
            containerID,
            ContainerEventType.CONTAINER_LAUNCHED));
      context.getNMStateStore().storeContainerLaunched(containerID);

      // Check if the container is signalled to be killed.
      if (!shouldLaunchContainer.compareAndSet(false, true)) {
        LOG.info("Container " + containerIdStr + " not launched as "
            + "cleanup already called");
        ret = ExitCode.TERMINATED.getExitCode();
      }
      else {
        exec.activateContainer(containerID, pidFilePath);
		//Launch the container; the returned value is the process exit code
        ret = exec.launchContainer(container, nmPrivateContainerScriptPath,
                nmPrivateTokensPath, user, appIdStr, containerWorkDir,
                localDirs, logDirs);
      }
	
	...
	
}
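The expandEnvironment step is what turns the log-directory placeholder in the AM command (for example the stdout/stderr redirections built at submission time) into the container's real log directory. A minimal sketch of the "<LOG_DIR>" part only, assuming the placeholder text of ApplicationConstants.LOG_DIR_EXPANSION_VAR in Hadoop 2.x; the real method also rewrites a few other markers, and CommandExpansionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring only the <LOG_DIR> part of ContainerLaunch#expandEnvironment.
final class CommandExpansionSketch {
  // Placeholder text assumed to match ApplicationConstants.LOG_DIR_EXPANSION_VAR.
  static final String LOG_DIR_EXPANSION_VAR = "<LOG_DIR>";

  static List<String> expandCommands(List<String> commands, String containerLogDir) {
    List<String> newCmds = new ArrayList<>(commands.size());
    for (String cmd : commands) {
      // Every occurrence of the placeholder is replaced with the container's log dir.
      newCmds.add(cmd.replace(LOG_DIR_EXPANSION_VAR, containerLogDir));
    }
    return newCmds;
  }
}
```

For instance, "... 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr" becomes "... 1>/var/log/nm/container_01/stdout 2>/var/log/nm/container_01/stderr" when the log directory is /var/log/nm/container_01 (an illustrative path).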

exec.launchContainer runs through whichever ContainerExecutor implementation is configured, for example LinuxContainerExecutor.

That class executes:
shExec = new ShellCommandExecutor(commandArray, null, // NM's cwd
            container.getLaunchContext().getEnvironment());

shExec.execute();


execute() finally runs the command we set up at the very beginning, that is, the MRAppMaster main method.
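Stripped of Hadoop's Shell utilities, running the container boils down to starting an OS process from a command array, inheriting the NM's working directory when none is given, with the container's environment variables added. A JDK-only sketch of that setup, using java.lang.ProcessBuilder; ShellLaunchSketch is hypothetical, and the command array in the example is illustrative, not the exact arguments the executor passes.

```java
import java.io.File;
import java.util.Arrays;
import java.util.Map;

// Hypothetical, JDK-only illustration of the process setup behind ShellCommandExecutor:
// a command array, an optional working directory, and extra environment variables.
final class ShellLaunchSketch {
  static ProcessBuilder build(String[] commandArray, File workDir, Map<String, String> env) {
    ProcessBuilder pb = new ProcessBuilder(Arrays.asList(commandArray));
    if (workDir != null) {
      pb.directory(workDir); // a null directory keeps the parent's cwd (the NM's cwd)
    }
    if (env != null) {
      pb.environment().putAll(env); // container environment from the launch context
    }
    return pb; // calling start() and waiting for the exit code is left to the caller
  }
}
```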

At this point the AM container has been launched. Next we will look at what MRAppMaster's main method does.