`

Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析 (上)

阅读更多
参考了一篇文章, 才看懂了Yarnrunner的整个流程:
http://blog.csdn.net/caodaoxi/article/details/12970993

网上很多文章写的不是很清楚, 有些到AM的启动的时候就错了, 结合最近俩星期看的Yarnrunner部分的源码 我把我的理解写下来, 打算分三部分
上: SubmitJob到schduler为ApplicationMaster申请Container
中:AM到NodeManager启动container 至在NM端执行APPMaster的脚本
下:启动container去执行Job的过程

可能中间也会有一些理解的错误, 导致文章并不对, 希望有人能指出来, 毕竟写个博客本身就是为了记录自己对大数据各种工具的学习过程


先借用上面文章的一部分, 作为一些需要先了解的内容, 不然完全不知道代码在讲什么:
(1)RMApp:每个application对应一个RMApp对象,保存该application的各种信息。

(2)RMAppAttempt:每个RMApp可能会对应多个RMAppAttempt对象,这取决于前面的RMAppAttempt是否执行成功,如果不成功,会启动另外一个,直到运行成功。RMAppAttempt对象称为“application执行尝试”,这RMApp与RMAppAttempt关系类似于MapReduce中的task与taskAttempt的关系。

(3)RMNode:保存各个节点的信息。

(4)RMContainer:保存各个container的信息。

2.    事件调度器

(1)AsyncDispatcher

中央事件调度器,各个状态机的事件调度器会在中央事件调度器中注册,注册方式信息包括:<事件,事件调度器>。该调度器维护了一个事件队列,它会不断扫描整个队列,取出一个事件,检查事件类型,并交给相应的事件调度器处理。

(2)各个子事件调度器
事件类型                       状态机             事件处理器
RMAppEvent               RMApp ApplicationEventDispatcher
RMAppAttemptEvent    RMAppAttempt ApplicationAttemptEventDispatcher
RMNodeEvent               RMNode NodeEventDispatcher
SchedulerEvent                — SchedulerEventDispatcher
AMLauncherEvent        — ApplicationMasterLauncher



好了, 首先我们得从SubmitJob来讲起, 入口是job.waitForCompletion方法, 深入的话会看到最后是调用JobSubmitter类的submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials()); 来提交的。 submitClient的申明是ClientProtocol接口, 一共有两个实现类:
LocalRunner
YarnRunner

我之前有一篇文章写得是从LocalRunner这边进入, 然后最后是怎么执行Job的。
Yarn的话就是就是通过YarnRunner执行的
我们看一下YarnRunner的submitJob方法:

主要就是创建了ApplicationSubmissionContext对象, 以及通过ResourceMgrDelegate去提交ApplicationSubmissionContext

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
	//这个appContext很重要, 里面拼接了各种环境变量, 以及启动App Master的脚本  这个对象会一直贯穿于各个类之间, 直到AM启动
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
	
	//通过ResourceMgrDelegate来sumbit这个appContext,  ResourceMgrDelegate类是用来和Resource Manager在通讯的
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

		  //这个appMaster并不是我们说的ApplicationMaster对象, 这样的命名刚开始也把我迷惑了。。。
      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }



上面讲到了ApplicationSubmissionContext里面存储了启动Application Master的类的脚本, 那么就稍微看一下:
主要里面都是拼接各种变量和命令的, 所以就略去了大部分, 就留下了最重要的那句:
 public ApplicationSubmissionContext createApplicationSubmissionContext(
      Configuration jobConf,
      String jobSubmitDir, Credentials ts) throws IOException {
   
   ...
   
   //这里才是设定Appmaster类的地方, MRJobConfig.APPLICATION_MASTER_CLASS = org.apache.hadoop.mapreduce.v2.app.MRAppMaster
   //所以最后通过命令在nodemanager那边执行的其实是MRAppMaster类的main方法
   vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
    vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDOUT);
    vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
        Path.SEPARATOR + ApplicationConstants.STDERR);
   
   Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
      mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());
	
	...
	
	// Setup ContainerLaunchContext for AM container
	//根据前面的拼接的命令生成AM的container 在后面会通过这个对象来启动container 从而启动MRAppMaster
    ContainerLaunchContext amContainer =
        ContainerLaunchContext.newInstance(localResources, environment,
          vargsFinal, null, securityTokens, acls);
		  
	
   ...
   //设置AMContainer
   appContext.setAMContainerSpec(amContainer); 
   
   ...

    return appContext;
  }


ApplicationSubmissionContext只需要记住amContainer的启动脚本在里面, 后面会用到。 那么继续看一下ResourceMgrDelegate的submitApplication:
  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    return client.submitApplication(appContext);
  }

他是通过client来submit的, 这个client实在ResourceMgrDelegate的构造函数里面创建的, 其实就是一个YarnClientImpl对象:
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient();
    init(conf);
    start();
  }


到目前为止, 所有的内容都还是在提交Job的那台Client机器上, 还没有到ResourceManger那边。

我们看一下YarnClientImpl的submitApplication:
 public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
	
	//将appContext设置到一个request里面
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
	
	
	//通过rmClient提交request, 这个rmClient其实就是ClientRMService类, 是用来和Resource Manager做RPC的call, 通过这个类, 可以直接和RM对话
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();

	
	//一直循环, 直到状态变为NEW为止, 如果长时间状态没变, 那么就timeout
    while (true) {
      try {
        YarnApplicationState state =
            getApplicationReport(applicationId).getYarnApplicationState();
        if (!state.equals(YarnApplicationState.NEW) &&
            !state.equals(YarnApplicationState.NEW_SAVING)) {
          LOG.info("Submitted application " + applicationId);
          break;
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          LOG.error("Interrupted while waiting for application "
              + applicationId
              + " to be successfully submitted.");
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }

    return applicationId;
  }


看一下rmClient也就是ClientRMService的submitApplication:
 public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();

    // ApplicationSubmissionContext needs to be validated for safety - only
    // those fields that are independent of the RM's configuration will be
    // checked here, those that are dependent on RM configuration are validated
    // in RMAppManager.

	
	//开始各种验证 一不开心就不让干活
    String user = null;
    try {
      // Safety
      user = UserGroupInformation.getCurrentUser().getShortUserName();
    } catch (IOException ie) {
      LOG.warn("Unable to get the current user.", ie);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          ie.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw RPCUtil.getRemoteException(ie);
    }

	//各种验证
    // Check whether app has already been put into rmContext,
    // If it is, simply return the response
    if (rmContext.getRMApps().get(applicationId) != null) {
      LOG.info("This is an earlier submitted application: " + applicationId);
      return SubmitApplicationResponse.newInstance();
    }

	//继续验证
    if (submissionContext.getQueue() == null) {
      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    if (submissionContext.getApplicationName() == null) {
      submissionContext.setApplicationName(
          YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }
    if (submissionContext.getApplicationType() == null) {
      submissionContext
        .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
    } else {
      if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
        submissionContext.setApplicationType(submissionContext
          .getApplicationType().substring(0,
            YarnConfiguration.APPLICATION_TYPE_LENGTH));
      }
    }

    try {
      // call RMAppManager to submit application directly
	  //干活  通过rmAppManager提交
      rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), user);

      LOG.info("Application with id " + applicationId.getId() + 
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (YarnException e) {
      LOG.info("Exception in submitting application with id " +
          applicationId.getId(), e);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw e;
    }

    SubmitApplicationResponse response = recordFactory
        .newRecordInstance(SubmitApplicationResponse.class);
    return response;
  }


来看一下rmAppManager.submitApplication(submissionContext,
          System.currentTimeMillis(), user);的具体内容:
protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();

	//创建一个RMAppImpl对象 其实就是启动RMApp状态机 以及执行RMAppEvent
    RMAppImpl application =
        createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
    ApplicationId appId = submissionContext.getApplicationId();

	//如果有安全认证enable的话会走这里, 比如kerberos啥的 我就不这么麻烦了 以看懂为主, 直接到else
    if (UserGroupInformation.isSecurityEnabled()) {
      try {
        this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
            parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete(),
            application.getUser());
      } catch (Exception e) {
        LOG.warn("Unable to parse credentials.", e);
        // Sending APP_REJECTED is fine, since we assume that the
        // RMApp is in NEW state and thus we haven't yet informed the
        // scheduler about the existence of the application
        assert application.getState() == RMAppState.NEW;
        this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
        throw RPCUtil.getRemoteException(e);
      }
    } else {
      // Dispatcher is not yet started at this time, so these START events
      // enqueued should be guaranteed to be first processed when dispatcher
      // gets started.
	  
	  //启动RMApp的状态机, 这里rmContext其实是resourceManager的Client代理, 这一步就是让去RM端的dispatcher去处理RMAppEventType.START事件
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.START));
    }
  }


在文章的开头有写“事件调度器”, 在resourcemanager那边会有AsyncDispatcher来调度所有事件, 这里的话会通过ApplicationEventDispatcher去做RmAppImpl的transition方法, 看一下RmAppImpl类的初始化的时候的各种event和transition:
private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

...

 //实在太长了, 到这里我们可以看到上面已经捕捉了RMAppEventType.START事件, 会把RMApp的状态从NEW变成NEW_SAVING, 调用RMAppNewlySavingTransition方法



我们看一下RMAppNewlySavingTransition里面做了什么:
  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }


只做了storeNewApplication(app)这个动作:
  public synchronized void storeNewApplication(RMApp app) {
    ApplicationSubmissionContext context = app
                                            .getApplicationSubmissionContext();
    assert context instanceof ApplicationSubmissionContextPBImpl;
    ApplicationState appState =
        new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
          app.getUser());

//触发了RMStateStore的STORE_APP事件
    dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
  }


RMStateStore的状态机transition定义:
  private static final StateMachineFactory<RMStateStore,
                                           RMStateStoreState,
                                           RMStateStoreEventType, 
                                           RMStateStoreEvent>
      stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.DEFAULT)
      .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())

...

//看到这里执行StoreAppTransition的方法


继续看下去StoreAppTransition:
  private static class StoreAppTransition
      implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
    @Override
    public void transition(RMStateStore store, RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return;
      }
      ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
      ApplicationId appId = appState.getAppId();
      ApplicationStateData appStateData = ApplicationStateData
          .newInstance(appState);
      LOG.info("Storing info for app: " + appId);
      try {
	  
	  //1. store application state
        store.storeApplicationStateInternal(appId, appStateData);
		
	  // 2.更改状态至RMAppEventType.APP_NEW_SAVED
        store.notifyApplication(new RMAppEvent(appId,
               RMAppEventType.APP_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        store.notifyStoreOperationFailed(e);
      }
    };
  }

  private void notifyApplication(RMAppEvent event) {

//调用dispatcher触发RMAppEventType.APP_NEW_SAVED
    rmDispatcher.getEventHandler().handle(event);
  }



在RMAppImpl中我们可以看到RMAppEventType.APP_NEW_SAVED会触发AddApplicationToSchedulerTransition方法

    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())


看一下AddApplicationToSchedulerTransition:
  private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
	
	//这里会去调用scheduler的APP_ADDED scheduler类有好几个, 比如说FifoScheduler或者FairScheduler
	//我们就看一下FifoScheduler的APP_ADDED事件吧
      app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user,
        app.submissionContext.getReservationID()));
    }
  }


我们到FifoScheduler的APP_ADDED事件看看:
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser(),
        appAddedEvent.getIsAppRecovering());
    }


会去调用scheduler的addApplication方法, 看一下:
  public synchronized void addApplication(ApplicationId applicationId,
      String queue, String user, boolean isAppRecovering) {
	  
	  //创建一个SchedulerApplication
    SchedulerApplication<FiCaSchedulerApp> application =
        new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
		
	//Application放到Scheduler里面
    applications.put(applicationId, application);
    metrics.submitApp(user);
    LOG.info("Accepted application " + applicationId + " from user: " + user
        + ", currently num of applications: " + applications.size());
    if (isAppRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
      }
    } else {
	
	//通过resourceManager的dispatcher去触发RMAppEventType.APP_ACCEPTED 
      rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    }
  }


那么我们就要再回到RMAPPImpl去看看RMAppEventType.APP_ACCEPTED做了什么:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())


会去执行StartAppAttemptTransition方法, 这个方法其实就是尝试去启动一次Application, 如果失败 还会尝试, 直到尝试的次数到达最大尝试次数为止

看一下StartAppAttemptTransition方法:
  private static final class StartAppAttemptTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
	
	//创建启动attempt
      app.createAndStartNewAttempt(false);
    };
  }
  
  //在createAndStartNewAttempt中创建了RMAppAttempt, 然后去触发RMAppAttemptEventType.START事件
    private void
      createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
    createNewAttempt();
    handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
      transferStateFromPreviousAttempt));
  }
  
  
    private void createNewAttempt() {
    ApplicationAttemptId appAttemptId =
        ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
    RMAppAttempt attempt =
        new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
          submissionContext, conf,
          // The newly created attempt maybe last attempt if (number of
          // previously failed attempts(which should not include Preempted,
          // hardware error and NM resync) + 1) equal to the max-attempt
          // limit.
		  
		  //每次尝试 +1 知道最大次数到了为止
          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
    attempts.put(appAttemptId, attempt);
    currentAttempt = attempt;
  }


这里就会去RMAPPAttemptImpl 里面去触发RMAppAttemptEventType.START事件, 看一下这个状态机里面的transition是怎么样的:
  private static final StateMachineFactory<RMAppAttemptImpl,
                                           RMAppAttemptState,
                                           RMAppAttemptEventType,
                                           RMAppAttemptEvent>
       stateMachineFactory  = new StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
                                            RMAppAttemptEventType,
                                     RMAppAttemptEvent>(RMAppAttemptState.NEW)

       // Transitions from NEW State
      .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
          RMAppAttemptEventType.START, new AttemptStartedTransition())
		  
		  //回去调用本类的AttemptStartedTransition方法


看一下本类的AttemptStartedTransition方法:
  private static final class AttemptStartedTransition extends BaseTransition {
	@Override
    public void transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {

	    boolean transferStateFromPreviousAttempt = false;
      if (event instanceof RMAppStartAttemptEvent) {
        transferStateFromPreviousAttempt =
            ((RMAppStartAttemptEvent) event)
              .getTransferStateFromPreviousAttempt();
      }
      appAttempt.startTime = System.currentTimeMillis();

      // Register with the ApplicationMasterService
      appAttempt.masterService
          .registerAppAttempt(appAttempt.applicationAttemptId);

      if (UserGroupInformation.isSecurityEnabled()) {
        appAttempt.clientTokenMasterKey =
            appAttempt.rmContext.getClientToAMTokenSecretManager()
              .createMasterKey(appAttempt.applicationAttemptId);
      }

      // Add the applicationAttempt to the scheduler and inform the scheduler
      // whether to transfer the state from previous attempt.
	  
	  //将applicationAttempt触发scheduler的APP_ATTEMPT_ADDED事件, 就是FifoScheduler的APP_ATTEMPT_ADDED事件
	  
      appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
        appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
    }
  }


我们回到FifoScheduler里面看一下APP_ATTEMPT_ADDED做了什么:
    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
		  
		  //执行addApplicationAttempt方法
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
    }
	
	
	  public synchronized void
      addApplicationAttempt(ApplicationAttemptId appAttemptId,
          boolean transferStateFromPreviousAttempt,
          boolean isAttemptRecovering) {
		  
		  //创建SchedulerApplication
    SchedulerApplication<FiCaSchedulerApp> application =
        applications.get(appAttemptId.getApplicationId());
    String user = application.getUser();
    // TODO: Fix store
	
	//创建FiCaSchedulerApp
    FiCaSchedulerApp schedulerApp =
        new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
          activeUsersManager, this.rmContext);

    if (transferStateFromPreviousAttempt) {
      schedulerApp.transferStateFromPreviousAttempt(application
        .getCurrentAppAttempt());
    }
	
	//设置当前attempt
    application.setCurrentAppAttempt(schedulerApp);

	//Submit
    metrics.submitAppAttempt(user);
    LOG.info("Added Application Attempt " + appAttemptId
        + " to scheduler from user " + application.getUser());
    if (isAttemptRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(appAttemptId
            + " is recovering. Skipping notifying ATTEMPT_ADDED");
      }
    } else {
	
	//回去触发RMAppAttemptEventType.ATTEMPT_ADDED事件
      rmContext.getDispatcher().getEventHandler().handle(
        new RMAppAttemptEvent(appAttemptId,
            RMAppAttemptEventType.ATTEMPT_ADDED));
    }
  }


那么接下来就又要回到MRAPPAttemptImpl去看RMAppAttemptEventType.ATTEMPT_ADDED事件了:
.addTransition(RMAppAttemptState.SUBMITTED, 
          EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
                     RMAppAttemptState.SCHEDULED),
          RMAppAttemptEventType.ATTEMPT_ADDED,
          new ScheduleTransition())


会去执行ScheduleTransition方法, 完成这一步后 这次尝试就会变成scheduled状态, 等着scheduler去assignContainer到nodemanager去了:
public static final class ScheduleTransition
      implements
      MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
    @Override
    public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
        RMAppAttemptEvent event) {
		
		//这个就是刚刚开始的时候我们创建的包含container启动脚本的地方
      ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
      if (!subCtx.getUnmanagedAM()) {
        // Need reset #containers before create new attempt, because this request
        // will be passed to scheduler, and scheduler will deduct the number after
        // AM container allocated
        
        // Currently, following fields are all hard code,
        // TODO: change these fields when we want to support
        // priority/resource-name/relax-locality specification for AM containers
        // allocation.
		
		//一个App Master 一个container 设置一些container的属性
        appAttempt.amReq.setNumContainers(1);
        appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
        appAttempt.amReq.setResourceName(ResourceRequest.ANY);
        appAttempt.amReq.setRelaxLocality(true);
        
        // AM resource has been checked when submission
		
		//去scheduler里面执行allocate 然后会返回一个Allocation对象,  会等NodeManager去heartBeat的时候, ResourceManager发现这个NM还有资源, 然后就assign这个Allocation到这个NM上面, 再去Launch AM
        Allocation amContainerAllocation =
            appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
                Collections.singletonList(appAttempt.amReq),
                EMPTY_CONTAINER_RELEASE_LIST, null, null);
        if (amContainerAllocation != null
            && amContainerAllocation.getContainers() != null) {
          assert (amContainerAllocation.getContainers().size() == 0);
        }
        return RMAppAttemptState.SCHEDULED;
      } else {
        // save state and then go to LAUNCHED state
        appAttempt.storeAttempt();
        return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
      }
    }
  }


到目前为止AM的Allocation以及生成了, 接下去就是通过Nodemanager去分配container, 然后在NM上面启动Container (执行刚刚开始设置的脚本)

这部分会在第二篇文章里面再写, 这篇就到这里吧
0
0
分享到:
评论

相关推荐

    hadoop 源码解析_yarn源码解析

    Hadoop 源码解析_Yarn 源码解析 Hadoop 是一个基于 Java 的大数据处理框架,Yarn 是 Hadoop 的资源管理器,负责资源分配、任务调度和集群管理。下面是 Yarn 源码解析的知识点: 1. MR 程序提交 MR(MapReduce)...

    hadoop源码解析-Job提交.pdf

    通过理解这些源码细节,我们可以更好地掌握Hadoop作业的生命周期,从Job提交到MapTask和ReduceTask的执行,这对于优化大数据处理性能和调试Hadoop应用程序至关重要。这些深入的源码分析对于开发人员和系统管理员来说...

    MapReduce Job本地提交过程源码跟踪及分析

    Job本地提交过程是MapReduce执行任务的一个重要环节,特别是在开发和调试阶段,理解这一过程对于优化性能和解决潜在问题至关重要。本文将深入源码层面,分析MapReduce Job在本地提交的详细步骤。 首先,我们来了解...

    Hadoop多Job并行处理的实例详解

    在Hadoop环境中,多Job并行处理是一种优化大数据处理效率的关键技术。通过对多个Job的并发执行,可以在集群中更有效地利用资源,缩短整体作业的执行时间。本实例将详细讲解如何实现Hadoop多Job并行处理,并提供相关...

    php hadoop源码

    在源码中,可能会有一个`Job.php`文件,包含了`configure()`和`submit()`等方法。 4. **监控和调试**:在处理大数据时,监控作业状态和日志至关重要。PHP库可能提供了获取作业状态、查看日志、异常处理等功能。在...

    MapReduce Job集群提交过程源码跟踪及分析

    在Hadoop MapReduce框架中,Job的提交过程是整个分布式计算流程中的关键步骤。这个过程涉及到客户端、JobTracker(在Hadoop 2.x版本中被ResourceManager替代)和TaskTracker(在Hadoop 2.x版本中被NodeManager替代)...

    Hadoop命令手册

    由于提供的内容中并没有具体的知识点描述,而只是对于“Hadoop命令手册”的标题和描述,以及一些特殊的符号,显然这些符号并不能转换为有价值的IT知识点。但是既然您需要详细的知识点,那么接下来,我会根据标题...

    eclipse的hadoop2.7插件以及hadoop-common编译文件

    在IT行业中,Eclipse是一款广泛使用的Java集成开发环境(IDE),而Hadoop是Apache软件基金会的一个开源项目,主要处理大数据的分布式存储和计算。本文将详细介绍如何使用Eclipse的Hadoop2.7插件以及Hadoop-common...

    hadoop-2.6.0-hadoop.dll-winutils.exe

    windows上eclipse运行hadoop程序报NullPointerException错 log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j ...

    hadoop eclips 的插件 和实例程序

    Hadoop是大数据处理领域的重要工具,它提供了一个分布式文件系统(HDFS)和MapReduce计算框架,使得在大规模数据集上进行并行处理成为可能。Eclipse是一款流行的Java集成开发环境(IDE),对于Hadoop开发者来说,...

    java web程序调用hadoop2.6

    Java Web程序调用Hadoop 2.6是一个关键的技术整合,它允许Web应用程序与Hadoop分布式文件系统(HDFS)和MapReduce框架交互,以处理大规模数据。在本示例中,我们将深入探讨如何实现这一集成,以及涉及的关键概念和...

    hadoop命令手册

    用法:hadoop job [GENERIC_OPTIONS] [-submit &lt;job-file&gt;] | [-status &lt;job-id&gt;] | [-counter &lt;job-id&gt; &lt;group-name&gt; ] | [-kill &lt;job-id&gt;] | [-events &lt;job-id&gt; &lt;from-event-#&gt; ] | [-history [all] ] | [-list ...

    hadoop 面试题大全

    在Job提交流程中,waitForCompletion()方法会调用submit(),然后连接到JobTracker(YARN中的ResourceManager),初始化并提交Job。submitJobInternal()创建Stage,进行数据分片和任务分配。在MapReduce过程中,...

    hadoop大数据常用命令

    - **用法**: `hadoop job [GENERIC_OPTIONS] [-submit &lt;job-file&gt;] | [-status &lt;job-id&gt;] | [-counter &lt;job-id&gt; &lt;group-name&gt; ] | [-kill &lt;job-id&gt;] | [-events &lt;job-id&gt; &lt;from-event-#&gt; ] | [-history [all] ] | ...

    hadoop大数据实战培训教材 spark 培训教材

    在大数据处理领域,Hadoop和Spark是两个至关重要的框架,它们在存储和处理海量数据方面发挥着核心作用。本套培训教材旨在为学习者提供深入理解及实践这两个框架的全面指导。 Hadoop,由Apache软件基金会开发,是...

    第02节:hadoop精讲之map reduce原理及代码.rar

    最后,通过Job的submit()方法提交作业到Hadoop集群。 在Hadoop生态系统中,MapReduce可以与其他组件如HDFS(Hadoop Distributed File System)和YARN(Yet Another Resource Negotiator)紧密配合,实现高效的数据...

    MapReduce - WordCount案例 - 含各种部署方式源码

    本文将详细解析MapReduce在实现WordCount案例中的原理、步骤以及如何通过Java进行编程,并涵盖本地提交和远程调用的不同部署方式。 1. **MapReduce基本原理** MapReduce分为两个主要阶段:Map阶段和Reduce阶段。...

    Hadoop&Spark安装、环境配置、使用教程.pdf

    ### Hadoop & Spark 安装、环境配置及使用...通过上述步骤,我们可以完成Hadoop与Spark的基本安装配置,并掌握它们的基本使用方法以及进行简单的数据处理示例。这些基础技能对于进一步深入学习大数据处理技术非常关键。

    hadoop命令指南

    在深入探讨Hadoop命令的具体用法之前,我们先来了解一下Hadoop的基本概念以及如何调用这些命令。 **1.1 泛用选项** Hadoop提供了一套泛用选项,这些选项被多个命令支持。以下是常见的泛用选项: - `-conf ...

Global site tag (gtag.js) - Google Analytics