- 浏览: 74530 次
-
参考了一篇文章, 才看懂了Yarnrunner的整个流程:
http://blog.csdn.net/caodaoxi/article/details/12970993
网上很多文章写的不是很清楚, 有些到AM的启动的时候就错了, 结合最近俩星期看的Yarnrunner部分的源码 我把我的理解写下来, 打算分三部分
上: SubmitJob到schduler为ApplicationMaster申请Container
中:AM到NodeManager启动container 至在NM端执行APPMaster的脚本
下:启动container去执行Job的过程
可能中间也会有一些理解的错误, 导致文章并不对, 希望有人能指出来, 毕竟写个博客本身就是为了记录自己对大数据各种工具的学习过程
先借用上面文章的一部分, 作为一些需要先了解的内容, 不然完全不知道代码在讲什么:
(1)RMApp:每个application对应一个RMApp对象,保存该application的各种信息。
(2)RMAppAttempt:每个RMApp可能会对应多个RMAppAttempt对象,这取决于前面的RMAppAttempt是否执行成功,如果不成功,会启动另外一个,直到运行成功。RMAppAttempt对象称为“application执行尝试”,这RMApp与RMAppAttempt关系类似于MapReduce中的task与taskAttempt的关系。
(3)RMNode:保存各个节点的信息。
(4)RMContainer:保存各个container的信息。
2. 事件调度器
(1)AsyncDispatcher
中央事件调度器,各个状态机的事件调度器会在中央事件调度器中注册,注册方式信息包括:<事件,事件调度器>。该调度器维护了一个事件队列,它会不断扫描整个队列,取出一个事件,检查事件类型,并交给相应的事件调度器处理。
(2)各个子事件调度器
事件类型 状态机 事件处理器
RMAppEvent RMApp ApplicationEventDispatcher
RMAppAttemptEvent RMAppAttempt ApplicationAttemptEventDispatcher
RMNodeEvent RMNode NodeEventDispatcher
SchedulerEvent — SchedulerEventDispatcher
AMLauncherEvent — ApplicationMasterLauncher
好了, 首先我们得从SubmitJob来讲起, 入口是job.waitForCompletion方法, 深入的话会看到最后是调用JobSubmitter类的submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); 来提交的。 submitClient的申明是ClientProtocol接口, 一共有两个实现类:
LocalRunner
YarnRunner
我之前有一篇文章写得是从LocalRunner这边进入, 然后最后是怎么执行Job的。
Yarn的话就是就是通过YarnRunner执行的
我们看一下YarnRunner的submitJob方法:
主要就是创建了ApplicationSubmissionContext对象, 以及通过ResourceMgrDelegate去提交ApplicationSubmissionContext
上面讲到了ApplicationSubmissionContext里面存储了启动Application Master的类的脚本, 那么就稍微看一下:
主要里面都是拼接各种变量和命令的, 所以就略去了大部分, 就留下了最重要的那句:
ApplicationSubmissionContext只需要记住amContainer的启动脚本在里面, 后面会用到。 那么继续看一下ResourceMgrDelegate的submitApplication:
他是通过client来submit的, 这个client实在ResourceMgrDelegate的构造函数里面创建的, 其实就是一个YarnClientImpl对象:
到目前为止, 所有的内容都还是在提交Job的那台Client机器上, 还没有到ResourceManger那边。
我们看一下YarnClientImpl的submitApplication:
看一下rmClient也就是ClientRMService的submitApplication:
来看一下rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);的具体内容:
在文章的开头有写“事件调度器”, 在resourcemanager那边会有AsyncDispatcher来调度所有事件, 这里的话会通过ApplicationEventDispatcher去做RmAppImpl的transition方法, 看一下RmAppImpl类的初始化的时候的各种event和transition:
我们看一下RMAppNewlySavingTransition里面做了什么:
只做了storeNewApplication(app)这个动作:
RMStateStore的状态机transition定义:
继续看下去StoreAppTransition:
在RMAppImpl中我们可以看到RMAppEventType.APP_NEW_SAVED会触发AddApplicationToSchedulerTransition方法
看一下AddApplicationToSchedulerTransition:
我们到FifoScheduler的APP_ADDED事件看看:
会去调用scheduler的addApplication方法, 看一下:
那么我们就要再回到RMAPPImpl去看看RMAppEventType.APP_ACCEPTED做了什么:
会去执行StartAppAttemptTransition方法, 这个方法其实就是尝试去启动一次Application, 如果失败 还会尝试, 直到尝试的次数到达最大尝试次数为止
看一下StartAppAttemptTransition方法:
这里就会去RMAPPAttemptImpl 里面去触发RMAppAttemptEventType.START事件, 看一下这个状态机里面的transition是怎么样的:
看一下本类的AttemptStartedTransition方法:
我们回到FifoScheduler里面看一下APP_ATTEMPT_ADDED做了什么:
那么接下来就又要回到MRAPPAttemptImpl去看RMAppAttemptEventType.ATTEMPT_ADDED事件了:
会去执行ScheduleTransition方法, 完成这一步后 这次尝试就会变成scheduled状态, 等着scheduler去assignContainer到nodemanager去了:
到目前为止AM的Allocation以及生成了, 接下去就是通过Nodemanager去分配container, 然后在NM上面启动Container (执行刚刚开始设置的脚本)
这部分会在第二篇文章里面再写, 这篇就到这里吧
http://blog.csdn.net/caodaoxi/article/details/12970993
网上很多文章写的不是很清楚, 有些到AM的启动的时候就错了, 结合最近俩星期看的Yarnrunner部分的源码 我把我的理解写下来, 打算分三部分
上: SubmitJob到schduler为ApplicationMaster申请Container
中:AM到NodeManager启动container 至在NM端执行APPMaster的脚本
下:启动container去执行Job的过程
可能中间也会有一些理解的错误, 导致文章并不对, 希望有人能指出来, 毕竟写个博客本身就是为了记录自己对大数据各种工具的学习过程
先借用上面文章的一部分, 作为一些需要先了解的内容, 不然完全不知道代码在讲什么:
(1)RMApp:每个application对应一个RMApp对象,保存该application的各种信息。
(2)RMAppAttempt:每个RMApp可能会对应多个RMAppAttempt对象,这取决于前面的RMAppAttempt是否执行成功,如果不成功,会启动另外一个,直到运行成功。RMAppAttempt对象称为“application执行尝试”,这RMApp与RMAppAttempt关系类似于MapReduce中的task与taskAttempt的关系。
(3)RMNode:保存各个节点的信息。
(4)RMContainer:保存各个container的信息。
2. 事件调度器
(1)AsyncDispatcher
中央事件调度器,各个状态机的事件调度器会在中央事件调度器中注册,注册方式信息包括:<事件,事件调度器>。该调度器维护了一个事件队列,它会不断扫描整个队列,取出一个事件,检查事件类型,并交给相应的事件调度器处理。
(2)各个子事件调度器
事件类型 状态机 事件处理器
RMAppEvent RMApp ApplicationEventDispatcher
RMAppAttemptEvent RMAppAttempt ApplicationAttemptEventDispatcher
RMNodeEvent RMNode NodeEventDispatcher
SchedulerEvent — SchedulerEventDispatcher
AMLauncherEvent — ApplicationMasterLauncher
好了, 首先我们得从SubmitJob来讲起, 入口是job.waitForCompletion方法, 深入的话会看到最后是调用JobSubmitter类的submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); 来提交的。 submitClient的申明是ClientProtocol接口, 一共有两个实现类:
LocalRunner
YarnRunner
我之前有一篇文章写得是从LocalRunner这边进入, 然后最后是怎么执行Job的。
Yarn的话就是就是通过YarnRunner执行的
我们看一下YarnRunner的submitJob方法:
主要就是创建了ApplicationSubmissionContext对象, 以及通过ResourceMgrDelegate去提交ApplicationSubmissionContext
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { addHistoryToken(ts); // Construct necessary information to start the MR AM //这个appContext很重要, 里面拼接了各种环境变量, 以及启动App Master的脚本 这个对象会一直贯穿于各个类之间, 直到AM启动 ApplicationSubmissionContext appContext = createApplicationSubmissionContext(conf, jobSubmitDir, ts); // Submit to ResourceManager try { //通过ResourceMgrDelegate来sumbit这个appContext, ResourceMgrDelegate类是用来和Resource Manager在通讯的 ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); //这个appMaster并不是我们说的ApplicationMaster对象, 这样的命名刚开始也把我迷惑了。。。 ApplicationReport appMaster = resMgrDelegate .getApplicationReport(applicationId); String diagnostics = (appMaster == null ? "application report is null" : appMaster.getDiagnostics()); if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) { throw new IOException("Failed to run job : " + diagnostics); } return clientCache.getClient(jobId).getJobStatus(jobId); } catch (YarnException e) { throw new IOException(e); } }
上面讲到了ApplicationSubmissionContext里面存储了启动Application Master的类的脚本, 那么就稍微看一下:
主要里面都是拼接各种变量和命令的, 所以就略去了大部分, 就留下了最重要的那句:
public ApplicationSubmissionContext createApplicationSubmissionContext( Configuration jobConf, String jobSubmitDir, Credentials ts) throws IOException { ... //这里才是设定Appmaster类的地方, MRJobConfig.APPLICATION_MASTER_CLASS = org.apache.hadoop.mapreduce.v2.app.MRAppMaster //所以最后通过命令在nodemanager那边执行的其实是MRAppMaster类的main方法 vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); Vector<String> vargsFinal = new Vector<String>(8); // Final command StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { mergedCommand.append(str).append(" "); } vargsFinal.add(mergedCommand.toString()); ... // Setup ContainerLaunchContext for AM container //根据前面的拼接的命令生成AM的container 在后面会通过这个对象来启动container 从而启动MRAppMaster ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(localResources, environment, vargsFinal, null, securityTokens, acls); ... //设置AMContainer appContext.setAMContainerSpec(amContainer); ... return appContext; }
ApplicationSubmissionContext只需要记住amContainer的启动脚本在里面, 后面会用到。 那么继续看一下ResourceMgrDelegate的submitApplication:
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { return client.submitApplication(appContext); }
他是通过client来submit的, 这个client实在ResourceMgrDelegate的构造函数里面创建的, 其实就是一个YarnClientImpl对象:
public ResourceMgrDelegate(YarnConfiguration conf) { super(ResourceMgrDelegate.class.getName()); this.conf = conf; this.client = YarnClient.createYarnClient(); init(conf); start(); }
到目前为止, 所有的内容都还是在提交Job的那台Client机器上, 还没有到ResourceManger那边。
我们看一下YarnClientImpl的submitApplication:
public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException { ApplicationId applicationId = appContext.getApplicationId(); if (applicationId == null) { throw new ApplicationIdNotProvidedException( "ApplicationId is not provided in ApplicationSubmissionContext"); } //将appContext设置到一个request里面 SubmitApplicationRequest request = Records.newRecord(SubmitApplicationRequest.class); request.setApplicationSubmissionContext(appContext); // Automatically add the timeline DT into the CLC // Only when the security and the timeline service are both enabled if (isSecurityEnabled() && timelineServiceEnabled) { addTimelineDelegationToken(appContext.getAMContainerSpec()); } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. //通过rmClient提交request, 这个rmClient其实就是ClientRMService类, 是用来和Resource Manager做RPC的call, 通过这个类, 可以直接和RM对话 rmClient.submitApplication(request); int pollCount = 0; long startTime = System.currentTimeMillis(); //一直循环, 直到状态变为NEW为止, 如果长时间状态没变, 那么就timeout while (true) { try { YarnApplicationState state = getApplicationReport(applicationId).getYarnApplicationState(); if (!state.equals(YarnApplicationState.NEW) && !state.equals(YarnApplicationState.NEW_SAVING)) { LOG.info("Submitted application " + applicationId); break; } long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { throw new YarnException("Timed out while waiting for application " + applicationId + " to be submitted successfully"); } // Notify the client through the log every 10 poll, in case the client // is blocked here too long. if (++pollCount % 10 == 0) { LOG.info("Application submission is not finished, " + "submitted application " + applicationId + " is still in " + state); } try { Thread.sleep(submitPollIntervalMillis); } catch (InterruptedException ie) { LOG.error("Interrupted while waiting for application " + applicationId + " to be successfully submitted."); } } catch (ApplicationNotFoundException ex) { // FailOver or RM restart happens before RMStateStore saves // ApplicationState LOG.info("Re-submit application " + applicationId + "with the " + "same ApplicationSubmissionContext"); rmClient.submitApplication(request); } } return applicationId; }
看一下rmClient也就是ClientRMService的submitApplication:
public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnException { ApplicationSubmissionContext submissionContext = request .getApplicationSubmissionContext(); ApplicationId applicationId = submissionContext.getApplicationId(); // ApplicationSubmissionContext needs to be validated for safety - only // those fields that are independent of the RM's configuration will be // checked here, those that are dependent on RM configuration are validated // in RMAppManager. //开始各种验证 一不开心就不让干活 String user = null; try { // Safety user = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ie) { LOG.warn("Unable to get the current user.", ie); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, ie.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw RPCUtil.getRemoteException(ie); } //各种验证 // Check whether app has already been put into rmContext, // If it is, simply return the response if (rmContext.getRMApps().get(applicationId) != null) { LOG.info("This is an earlier submitted application: " + applicationId); return SubmitApplicationResponse.newInstance(); } //继续验证 if (submissionContext.getQueue() == null) { submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); } if (submissionContext.getApplicationName() == null) { submissionContext.setApplicationName( YarnConfiguration.DEFAULT_APPLICATION_NAME); } if (submissionContext.getApplicationType() == null) { submissionContext .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE); } else { if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) { submissionContext.setApplicationType(submissionContext .getApplicationType().substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH)); } } try { // call RMAppManager to submit application directly //干活 通过rmAppManager提交 rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user); LOG.info("Application with id " + applicationId.getId() + " submitted by user " + user); RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST, "ClientRMService", applicationId); } catch (YarnException e) { LOG.info("Exception in submitting application with id " + applicationId.getId(), e); RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId); throw e; } SubmitApplicationResponse response = recordFactory .newRecordInstance(SubmitApplicationResponse.class); return response; }
来看一下rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);的具体内容:
protected void submitApplication( ApplicationSubmissionContext submissionContext, long submitTime, String user) throws YarnException { ApplicationId applicationId = submissionContext.getApplicationId(); //创建一个RMAppImpl对象 其实就是启动RMApp状态机 以及执行RMAppEvent RMAppImpl application = createAndPopulateNewRMApp(submissionContext, submitTime, user, false); ApplicationId appId = submissionContext.getApplicationId(); //如果有安全认证enable的话会走这里, 比如kerberos啥的 我就不这么麻烦了 以看懂为主, 直接到else if (UserGroupInformation.isSecurityEnabled()) { try { this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId, parseCredentials(submissionContext), submissionContext.getCancelTokensWhenComplete(), application.getUser()); } catch (Exception e) { LOG.warn("Unable to parse credentials.", e); // Sending APP_REJECTED is fine, since we assume that the // RMApp is in NEW state and thus we haven't yet informed the // scheduler about the existence of the application assert application.getState() == RMAppState.NEW; this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppRejectedEvent(applicationId, e.getMessage())); throw RPCUtil.getRemoteException(e); } } else { // Dispatcher is not yet started at this time, so these START events // enqueued should be guaranteed to be first processed when dispatcher // gets started. //启动RMApp的状态机, 这里rmContext其实是resourceManager的Client代理, 这一步就是让去RM端的dispatcher去处理RMAppEventType.START事件 this.rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.START)); } }
在文章的开头有写“事件调度器”, 在resourcemanager那边会有AsyncDispatcher来调度所有事件, 这里的话会通过ApplicationEventDispatcher去做RmAppImpl的transition方法, 看一下RmAppImpl类的初始化的时候的各种event和transition:
private static final StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent> stateMachineFactory = new StateMachineFactory<RMAppImpl, RMAppState, RMAppEventType, RMAppEvent>(RMAppState.NEW) // Transitions from NEW state .addTransition(RMAppState.NEW, RMAppState.NEW, RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppNewlySavingTransition()) ... //实在太长了, 到这里我们可以看到上面已经捕捉了RMAppEventType.START事件, 会把RMApp的状态从NEW变成NEW_SAVING, 调用RMAppNewlySavingTransition方法
我们看一下RMAppNewlySavingTransition里面做了什么:
private static final class RMAppNewlySavingTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { // If recovery is enabled then store the application information in a // non-blocking call so make sure that RM has stored the information // needed to restart the AM after RM restart without further client // communication LOG.info("Storing application with id " + app.applicationId); app.rmContext.getStateStore().storeNewApplication(app); } }
只做了storeNewApplication(app)这个动作:
public synchronized void storeNewApplication(RMApp app) { ApplicationSubmissionContext context = app .getApplicationSubmissionContext(); assert context instanceof ApplicationSubmissionContextPBImpl; ApplicationState appState = new ApplicationState(app.getSubmitTime(), app.getStartTime(), context, app.getUser()); //触发了RMStateStore的STORE_APP事件 dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState)); }
RMStateStore的状态机transition定义:
private static final StateMachineFactory<RMStateStore, RMStateStoreState, RMStateStoreEventType, RMStateStoreEvent> stateMachineFactory = new StateMachineFactory<RMStateStore, RMStateStoreState, RMStateStoreEventType, RMStateStoreEvent>( RMStateStoreState.DEFAULT) .addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT, RMStateStoreEventType.STORE_APP, new StoreAppTransition()) ... //看到这里执行StoreAppTransition的方法
继续看下去StoreAppTransition:
private static class StoreAppTransition implements SingleArcTransition<RMStateStore, RMStateStoreEvent> { @Override public void transition(RMStateStore store, RMStateStoreEvent event) { if (!(event instanceof RMStateStoreAppEvent)) { // should never happen LOG.error("Illegal event type: " + event.getClass()); return; } ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState(); ApplicationId appId = appState.getAppId(); ApplicationStateData appStateData = ApplicationStateData .newInstance(appState); LOG.info("Storing info for app: " + appId); try { //1. store application state store.storeApplicationStateInternal(appId, appStateData); // 2.更改状态至RMAppEventType.APP_NEW_SAVED store.notifyApplication(new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED)); } catch (Exception e) { LOG.error("Error storing app: " + appId, e); store.notifyStoreOperationFailed(e); } }; } private void notifyApplication(RMAppEvent event) { //调用dispatcher触发RMAppEventType.APP_NEW_SAVED rmDispatcher.getEventHandler().handle(event); }
在RMAppImpl中我们可以看到RMAppEventType.APP_NEW_SAVED会触发AddApplicationToSchedulerTransition方法
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED, RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
看一下AddApplicationToSchedulerTransition:
private static final class AddApplicationToSchedulerTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { //这里会去调用scheduler的APP_ADDED scheduler类有好几个, 比如说FifoScheduler或者FairScheduler //我们就看一下FifoScheduler的APP_ADDED事件吧 app.handler.handle(new AppAddedSchedulerEvent(app.applicationId, app.submissionContext.getQueue(), app.user, app.submissionContext.getReservationID())); } }
我们到FifoScheduler的APP_ADDED事件看看:
case APP_ADDED: { AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; addApplication(appAddedEvent.getApplicationId(), appAddedEvent.getQueue(), appAddedEvent.getUser(), appAddedEvent.getIsAppRecovering()); }
会去调用scheduler的addApplication方法, 看一下:
public synchronized void addApplication(ApplicationId applicationId, String queue, String user, boolean isAppRecovering) { //创建一个SchedulerApplication SchedulerApplication<FiCaSchedulerApp> application = new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user); //Application放到Scheduler里面 applications.put(applicationId, application); metrics.submitApp(user); LOG.info("Accepted application " + applicationId + " from user: " + user + ", currently num of applications: " + applications.size()); if (isAppRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED"); } } else { //通过resourceManager的dispatcher去触发RMAppEventType.APP_ACCEPTED rmContext.getDispatcher().getEventHandler() .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED)); } }
那么我们就要再回到RMAPPImpl去看看RMAppEventType.APP_ACCEPTED做了什么:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED, RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
会去执行StartAppAttemptTransition方法, 这个方法其实就是尝试去启动一次Application, 如果失败 还会尝试, 直到尝试的次数到达最大尝试次数为止
看一下StartAppAttemptTransition方法:
private static final class StartAppAttemptTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { //创建启动attempt app.createAndStartNewAttempt(false); }; } //在createAndStartNewAttempt中创建了RMAppAttempt, 然后去触发RMAppAttemptEventType.START事件 private void createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) { createNewAttempt(); handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(), transferStateFromPreviousAttempt)); } private void createNewAttempt() { ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1); RMAppAttempt attempt = new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService, submissionContext, conf, // The newly created attempt maybe last attempt if (number of // previously failed attempts(which should not include Preempted, // hardware error and NM resync) + 1) equal to the max-attempt // limit. //每次尝试 +1 知道最大次数到了为止 maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq); attempts.put(appAttemptId, attempt); currentAttempt = attempt; }
这里就会去RMAPPAttemptImpl 里面去触发RMAppAttemptEventType.START事件, 看一下这个状态机里面的transition是怎么样的:
private static final StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent> stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl, RMAppAttemptState, RMAppAttemptEventType, RMAppAttemptEvent>(RMAppAttemptState.NEW) // Transitions from NEW State .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED, RMAppAttemptEventType.START, new AttemptStartedTransition()) //回去调用本类的AttemptStartedTransition方法
看一下本类的AttemptStartedTransition方法:
private static final class AttemptStartedTransition extends BaseTransition { @Override public void transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { boolean transferStateFromPreviousAttempt = false; if (event instanceof RMAppStartAttemptEvent) { transferStateFromPreviousAttempt = ((RMAppStartAttemptEvent) event) .getTransferStateFromPreviousAttempt(); } appAttempt.startTime = System.currentTimeMillis(); // Register with the ApplicationMasterService appAttempt.masterService .registerAppAttempt(appAttempt.applicationAttemptId); if (UserGroupInformation.isSecurityEnabled()) { appAttempt.clientTokenMasterKey = appAttempt.rmContext.getClientToAMTokenSecretManager() .createMasterKey(appAttempt.applicationAttemptId); } // Add the applicationAttempt to the scheduler and inform the scheduler // whether to transfer the state from previous attempt. //将applicationAttempt触发scheduler的APP_ATTEMPT_ADDED事件, 就是FifoScheduler的APP_ATTEMPT_ADDED事件 appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent( appAttempt.applicationAttemptId, transferStateFromPreviousAttempt)); } }
我们回到FifoScheduler里面看一下APP_ATTEMPT_ADDED做了什么:
case APP_ATTEMPT_ADDED: { AppAttemptAddedSchedulerEvent appAttemptAddedEvent = (AppAttemptAddedSchedulerEvent) event; //执行addApplicationAttempt方法 addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(), appAttemptAddedEvent.getTransferStateFromPreviousAttempt(), appAttemptAddedEvent.getIsAttemptRecovering()); } public synchronized void addApplicationAttempt(ApplicationAttemptId appAttemptId, boolean transferStateFromPreviousAttempt, boolean isAttemptRecovering) { //创建SchedulerApplication SchedulerApplication<FiCaSchedulerApp> application = applications.get(appAttemptId.getApplicationId()); String user = application.getUser(); // TODO: Fix store //创建FiCaSchedulerApp FiCaSchedulerApp schedulerApp = new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager, this.rmContext); if (transferStateFromPreviousAttempt) { schedulerApp.transferStateFromPreviousAttempt(application .getCurrentAppAttempt()); } //设置当前attempt application.setCurrentAppAttempt(schedulerApp); //Submit metrics.submitAppAttempt(user); LOG.info("Added Application Attempt " + appAttemptId + " to scheduler from user " + application.getUser()); if (isAttemptRecovering) { if (LOG.isDebugEnabled()) { LOG.debug(appAttemptId + " is recovering. Skipping notifying ATTEMPT_ADDED"); } } else { //回去触发RMAppAttemptEventType.ATTEMPT_ADDED事件 rmContext.getDispatcher().getEventHandler().handle( new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.ATTEMPT_ADDED)); } }
那么接下来就又要回到MRAPPAttemptImpl去看RMAppAttemptEventType.ATTEMPT_ADDED事件了:
.addTransition(RMAppAttemptState.SUBMITTED, EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING, RMAppAttemptState.SCHEDULED), RMAppAttemptEventType.ATTEMPT_ADDED, new ScheduleTransition())
会去执行ScheduleTransition方法, 完成这一步后 这次尝试就会变成scheduled状态, 等着scheduler去assignContainer到nodemanager去了:
public static final class ScheduleTransition implements MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { //这个就是刚刚开始的时候我们创建的包含container启动脚本的地方 ApplicationSubmissionContext subCtx = appAttempt.submissionContext; if (!subCtx.getUnmanagedAM()) { // Need reset #containers before create new attempt, because this request // will be passed to scheduler, and scheduler will deduct the number after // AM container allocated // Currently, following fields are all hard code, // TODO: change these fields when we want to support // priority/resource-name/relax-locality specification for AM containers // allocation. //一个App Master 一个container 设置一些container的属性 appAttempt.amReq.setNumContainers(1); appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY); appAttempt.amReq.setResourceName(ResourceRequest.ANY); appAttempt.amReq.setRelaxLocality(true); // AM resource has been checked when submission //去scheduler里面执行allocate 然后会返回一个Allocation对象, 会等NodeManager去heartBeat的时候, ResourceManager发现这个NM还有资源, 然后就assign这个Allocation到这个NM上面, 再去Launch AM Allocation amContainerAllocation = appAttempt.scheduler.allocate(appAttempt.applicationAttemptId, Collections.singletonList(appAttempt.amReq), EMPTY_CONTAINER_RELEASE_LIST, null, null); if (amContainerAllocation != null && amContainerAllocation.getContainers() != null) { assert (amContainerAllocation.getContainers().size() == 0); } return RMAppAttemptState.SCHEDULED; } else { // save state and then go to LAUNCHED state appAttempt.storeAttempt(); return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING; } } }
到目前为止AM的Allocation以及生成了, 接下去就是通过Nodemanager去分配container, 然后在NM上面启动Container (执行刚刚开始设置的脚本)
这部分会在第二篇文章里面再写, 这篇就到这里吧
发表评论
-
kafka + flume + hdfs + zookeeper + spark 测试环境搭建
2017-07-20 11:28 1119最近由于项目需要, 搭建了一个类似线上环境的处理流数据的环境 ... -
YARNRunner的运行原理总结
2016-10-25 17:52 1153之前看了那么些源码, 大致对整个Yarn的运行过程有了一个了解 ... -
Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析(下)
2016-10-11 13:53 2523中间隔了国庆, 好不容易才看明白了MRAppMaster如何启 ... -
Hadoop中Yarnrunner里面submit Job以及AM生成 至Job处理过程源码解析 (中)
2016-09-27 13:25 1653继续上一篇文章, 那时候AM Allocation已经生成, ... -
Hadoop MapReduce Job执行过程源码跟踪
2016-09-07 15:07 3012前面一片文章写了MR怎么写, 然后添加的主要功能怎么用, 像p ... -
Hadoop的Map端sort, partition, combiner以及Group
2016-09-05 15:15 1522Mapreduce在执行的时候首先会解析成KV键值对传送到Ma ... -
Hadoop 的WordCount
2016-08-30 19:41 649之前花了点时间玩spark, 现在开始学一下hadoop 前 ...
相关推荐
风光储直流微电网Simulink仿真模型:光伏发电、风力发电与混合储能系统的协同运作及并网逆变器VSR的研究,风光储直流微电网Simulink仿真模型:MPPT控制、混合储能系统、VSR并网逆变器的设计与实现,风光储、风光储并网直流微电网simulink仿真模型。 系统由光伏发电系统、风力发电系统、混合储能系统(可单独储能系统)、逆变器VSR?大电网构成。 光伏系统采用扰动观察法实现mppt控制,经过boost电路并入母线; 风机采用最佳叶尖速比实现mppt控制,风力发电系统中pmsg采用零d轴控制实现功率输出,通过三相电压型pwm变器整流并入母线; 混合储能由蓄电池和超级电容构成,通过双向DCDC变器并入母线,并采用低通滤波器实现功率分配,超级电容响应高频功率分量,蓄电池响应低频功率分量,有限抑制系统中功率波动,且符合储能的各自特性。 并网逆变器VSR采用PQ控制实现功率入网。 ,风光储; 直流微电网; simulink仿真模型; 光伏发电系统; 最佳叶尖速比控制; MPPT控制; Boost电路; 三相电压型PWM变换器;
以下是针对初学者的 **51单片机入门教程**,内容涵盖基础概念、开发环境搭建、编程实践及常见应用示例,帮助你快速上手。
【Python毕设】根据你提供的课程代码,自动排出可行课表,适用于西工大选课_pgj
【毕业设计】[零食商贩]-基于vue全家桶+koa2+sequelize+mysql搭建的移动商城应用
电动汽车充电背景下的微电网谐波抑制策略与风力发电系统仿真研究,电动汽车充电微电网的谐波抑制策略与风力发电系统仿真研究,基于电动汽车充电的微电网谐波抑制策略研究,包括电动汽车充电负 载模型,风电模型,光伏发现系统,储能系统,以及谐波处理模块 风力发电系统仿真 ,电动汽车充电负载模型; 风电模型; 光伏发现系统; 储能系统; 谐波处理模块; 风力发电系统仿真,电动汽车充电微电网的谐波抑制策略研究:整合负载模型、风电模型与光伏储能系统
Vscode部署本地Deepseek的continue插件windows版本
内容概要:本文详细介绍了滤波器的两个关键参数——截止频率(F0)和品质因素(Q),并探讨了不同类型的滤波器(包括低通、高通、带通和带阻滤波器)的设计方法及其特性。文章首先明确了F0和Q的基本概念及其在滤波器性能中的作用,接着通过数学推导和图形展示的方式,解释了不同Q值对滤波器频率响应的影响。文中特别指出,通过调整Q值可以控制滤波器的峰谷效果和滚降速度,进而优化系统的滤波性能。此外,还讨论了不同类型滤波器的具体应用场景,如低通滤波器适用于消除高频噪声,高通滤波器用于去除直流分量和低频干扰,而带通滤波器和带阻滤波器分别用于选取特定频段信号和排除不需要的频段。最后,通过对具体案例的解析,帮助读者更好地理解和应用相关理论。 适合人群:电子工程及相关领域的技术人员、研究人员以及高校学生,特别是那些需要深入了解滤波器设计原理的人群。 使用场景及目标:适用于从事模拟电路设计的专业人士,尤其是希望掌握滤波器设计细节和技术的应用场合。目标是让读者能够灵活运用Q值和F0来优化滤波器设计,提升系统的信噪比和选择性,确保信号的纯净性和完整性。
内容概要:本文主要讲述了利用QUARTUSⅡ进行电子设计自动化的具体步骤和实例操作,详细介绍了如何利用EDA技术在QUARTUSⅡ环境中设计并模拟下降沿D触发器的工作过程,重点探讨了系统规格设计、功能描述、设计处理、器件编译和测试四个步骤及相关的设计验证流程,如功能仿真、逻辑综合及时序仿真等内容,并通过具体的操作指南展示了电路设计的实际操作方法。此外还强调了QUARTUSⅡ作为一款集成了多种功能的综合平台的优势及其对于提高工作效率的重要性。 适用人群:电子工程、自动化等相关专业的学生或者工程师,尤其适用于初次接触EDA技术和QuartusⅡ的用户。 使用场景及目标:旨在帮助用户理解和掌握使用QUARTUSⅡ这一先进的EDA工具软件进行从概念设计到最后成品制作整个电路设计过程的方法和技巧。目标是在实际工作中能够熟练运用QUARTUSⅡ完成各类复杂电子系统的高效设计。 其他说明:文中通过具体的案例让读者更直观理解EDA设计理念和技术特点的同时也为进一步探索EDA领域的前沿课题打下了良好基础。此外它还提到了未来可能的发展方向,比如EDA工具的功能增强趋势等。
Simulink建模下的光储系统与IEEE33节点配电网的协同并网运行:光照强度变化下的储能系统优化策略与输出性能分析,Simulink模型下的光伏微网系统:光储协同,实现380v电压等级下的恒定功率并网与平抑波动,Simulink含光伏的IEEE33节点配电网模型 微网,光储系统并网运行 光照强度发生改变时,储能可以有效配合光伏进行恒定功率并网,平抑波动,实现削峰填谷。 总的输出有功为270kw(图23) 无功为0 检验可以并网到电压等级为380v的电网上 逆变侧输出电压电流稳定(图4) ,Simulink; 含光伏; 配电网模型; 微网; 光储系统; 储能配合; 恒定功率并网; 电压等级; 逆变侧输出。,Simulink光伏微网模型:光储协同并网运行,实现功率稳定输出
基于Andres ELeon新法的双馈风机次同步振荡抑制策略:附加阻尼控制(SDC)的实践与应用,双馈风机次同步振荡的抑制策略研究:基于转子侧附加阻尼控制(SDC)的应用与效能分析,双馈风机次同步振荡抑制策略(一) 含 基于转子侧附加阻尼控制(SDC)的双馈风机次同步振荡抑制,不懂就问, 附加阻尼控制 (SDC)被添加到 RSC 内部控制器的q轴输出中。 这种方法是由Andres ELeon在2016年提出的。 该方法由增益、超前滞后补偿器和带通滤波器组成。 采用实测的有功功率作为输入信号。 有关更多信息,你可以阅读 Andres ELeon 的lunwen。 附lunwen ,关键词:双馈风机、次同步振荡、抑制策略;转子侧附加阻尼控制(SDC);RSC内部控制器;Andres ELeon;增益;超前滞后补偿器;带通滤波器;实测有功功率。,双馈风机次同步振荡抑制技术:基于SDC与RSCq轴控制的策略研究
springboot疫情防控期间某村外出务工人员信息管理系统--
高效光伏并网发电系统MATLAB Simulink仿真设计与MPPT技术应用及PI调节闭环控制,光伏并网发电系统MATLAB Simulink仿真设计:涵盖电池、BOOST电路、逆变电路及MPPT技术效率提升,光伏并网发电系统MATLAB Simulink仿真设计。 该仿真包括电池,BOOST升压电路,单相全桥逆变电路,电压电流双闭环控制部分;应用MPPT技术,提高光伏发电的利用效率。 采用PI调节方式进行闭环控制,SPWM调制,采用定步长扰动观测法,对最大功率点进行跟踪,可以很好的提高发电效率和实现并网要求。 ,光伏并网发电系统; MATLAB Simulink仿真设计; 电池; BOOST升压电路; 单相全桥逆变电路; 电压电流双闭环控制; MPPT技术; PI调节方式; SPWM调制; 定步长扰动观测法。,光伏并网发电系统Simulink仿真设计:高效MPPT与PI调节控制策略
PFC 6.0高效循环加载系统:支持半正弦、半余弦及多级变荷载功能,PFC 6.0循环加载代码:支持半正弦、半余弦及多级变荷载的强大功能,PFC6.0循环加载代码,支持半正弦,半余弦函数加载,中间变荷载等。 多级加载 ,PFC6.0; 循环加载代码; 半正弦/半余弦函数加载; 中间变荷载; 多级加载,PFC6.0多级半正弦半余弦循环加载系统
某站1K的校园跑腿小程序 多校园版二手市场校园圈子失物招领 食堂/快递代拿代买跑腿 多校版本,多模块,适合跑腿,外卖,表白,二手,快递等校园服务 需要自己准备好后台的服务器,已认证的小程序,备案的域名!
【Python毕设】根据你提供的课程代码,自动排出可行课表,适用于西工大选课
COMSOL锂枝晶模型:五合一的相场、浓度场与电场模拟研究,涵盖单枝晶定向生长、多枝晶生长及无序生长等多元现象的探索,COMSOL锂枝晶模型深度解析:五合一技术揭示单枝晶至雪花枝晶的生长机制与物理场影响,comsol锂枝晶模型 五合一 单枝晶定向生长、多枝晶定向生长、多枝晶随机生长、无序生长随机形核以及雪花枝晶,包含相场、浓度场和电场三种物理场(雪花枝晶除外),其中单枝晶定向生长另外包含对应的参考文献。 ,comsol锂枝晶模型; 五合一模型; 单枝晶定向生长; 多枝晶定向生长; 多枝晶随机生长; 无序生长随机形核; 雪花枝晶; 相场、浓度场、电场物理场; 参考文献,COMSOL锂枝晶模型:多场景定向生长与相场电场分析
嵌入式大学生 点阵代码
那个有delphi12 tedgebrowser 使用的dll
基于DQN算法的微网储能优化调度与能量管理:深度强化学习的应用与实践,基于DQN算法的微网储能优化调度与能量管理:深度强化学习的应用与实践,基于DQN算法的微网储能运行优化与能量管理 关键词:微网 优化调度 储能优化 深度强化学习 DQN 编程语言:python 参考文献:《Explainable AI Deep Reinforcement Learning Agents for Residential Demand Side Cost Savings in Smart Grids》 内容简介: 受深层强化学习(RL)最新进展的激励,我们开发了一个RL代理来管理家庭中存储设备的操作,旨在最大限度地节省需求侧的成本。 所提出的技术是数据驱动的,并且RL代理从头开始学习如何在可变费率结构下有效地使用能量存储设备,即收缩“黑匣子”的概念,其中代理所学的技术被忽略。 我们解释了RL-agent的学习过程,以及基于存储设备容量的策略。 ,微网; 优化调度; 储能优化; 深度强化学习; DQN; 家庭存储设备; 需求侧成本节省; 智能电网; RL代理; 能量存储设备。,基于DQN算法的微网储
内容概要:该文档为FM17580的原理图设计文件,重点介绍了这款非接触式IC卡读写芯片的电路设计细节。文档详细列出了各个元器件及其连接方式、引脚分配及具体值设定。特别值得注意的是,为了确保性能和可靠性,在PCB布局时强调了GND线需要尽量以最短路径连回FM175xx芯片的TVSS引脚附近,并且靠近电源输入端(TVDD)。同时明确了FM17580只兼容SPI通讯协议,其他如IIC或UART选项则不在支持范围内。此外还提供了关于降低能耗的选择——移除不必要的ADC检测电路,这对于一些特定应用场景非常有用。 适合人群:具备硬件开发经验和RFID/NFC领域基础知识的技术人员或研究人员。 使用场景及目标:适用于需要详细了解FM17580内部结构和技术特性的项目团队;旨在帮助工程师们快速上手搭建实验平台并测试FM17580的功能特性。主要目的是为实际应用开发提供技术支持和参考。 其他说明:文档最后附带了一些附加信息,包括设计师名字、公司名称以及审查流程的相关内容,但具体内容并未公开。此外还提到该文档是针对FM17580评估板(即FM17580Demo)的设计图纸。文中出现多次类似表格可能是不同版本之间的对比或者记录修改历史的部分内容。