This article is based on Hadoop version hadoop-2.0.1-alpha-src.
Reference: http://www.cnblogs.com/biyeymyhjob/archive/2012/08/16/2640733.html
As in the referenced article, we walk through how one concrete MR job actually gets executed. The example is as follows:
// Create a new Job
Job job = new Job(new Configuration());
job.setJarByClass(MyJob.class);

// Specify various job-specific parameters
job.setJobName("myjob");
FileInputFormat.addInputPath(job, new Path("in"));     // Job itself has no setInputPath/setOutputPath;
FileOutputFormat.setOutputPath(job, new Path("out"));  // the FileInputFormat/FileOutputFormat helpers are the usual way
job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
job.waitForCompletion(true);

Let's start with the definition of the Job class, which lives in the package org.apache.hadoop.mapreduce. It essentially covers the definition of an MR job, its submission, and queries of its execution status.

public class Job extends JobContextImpl implements JobContext

Job extends JobContextImpl, JobContextImpl implements the JobContext interface, and JobContext in turn extends the MRJobConfig interface. Let's begin with MRJobConfig. It mainly contains the names of the configuration parameters related to an MR job, for example:

// Put all of the attribute names in here so that Job and JobContext are
// consistent.
public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";
public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";
public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";
public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";
public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";
public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";
public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";
public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
public static final String JAR = "mapreduce.job.jar";
public static final String ID = "mapreduce.job.id";
public static final String JOB_NAME = "mapreduce.job.name";
The JobContext interface extends MRJobConfig and goes on to declare a number of getXXX methods, for example:
getConfiguration()
getCredentials()
getJobId()
getNumReduceTasks()
getOutputKeyClass()
getOutputValueClass()
...
So an implementation of the JobContext interface has to provide these methods, which are used to read a job's configuration.
Next, let's see how JobContextImpl does this. Its constructor:
public JobContextImpl(Configuration conf, JobID jobId) {
  if (conf instanceof JobConf) {
    this.conf = (JobConf) conf;
  } else {
    this.conf = new JobConf(conf);
  }
  this.jobId = jobId;
  this.credentials = this.conf.getCredentials();
  try {
    this.ugi = UserGroupInformation.getCurrentUser();
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
As you can see, JobContextImpl holds an internal field of type JobConf, and JobConf is a subclass of org.apache.hadoop.conf.Configuration whose constructor takes a Configuration object. So the configuration information mentioned above ultimately comes from a Configuration instance, and most of the getXXX() methods that JobContextImpl implements from JobContext are simply backed by conf.getXXX().
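For instance, such a delegating getter is typically nothing more than a forward to the wrapped JobConf. A paraphrased sketch (not copied from the Hadoop source; the two methods below only illustrate the pattern):

// Paraphrased sketch: JobContextImpl-style getters just forward to the wrapped JobConf.
public int getNumReduceTasks() {
  return conf.getNumReduceTasks();   // ultimately reads the corresponding mapreduce.job.* property
}

public Class<?> getOutputKeyClass() {
  return conf.getOutputKeyClass();
}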
Now let's look at the subclass of JobContextImpl, our Job class. First, a few important fields:
private JobState state = JobState.DEFINE;
private JobStatus status;
private long statustime;
private Cluster cluster;
JobState is an enum defined inside the Job class with two values, DEFINE and RUNNING, indicating which stage the job has reached after submission. JobStatus is a separate class describing the current execution state of a job; it shows what information we can obtain about a job while it is running:
private JobID jobid;
private float mapProgress;
private float reduceProgress;
private float cleanupProgress;
private float setupProgress;
private State runState;
private long startTime;
private String user;
private String queue;
private JobPriority priority;
private String schedulingInfo = "NA";
private String failureInfo = "NA";
private Map<JobACL, AccessControlList> jobACLs = new HashMap<JobACL, AccessControlList>();
private String jobName;
private String jobFile;
private long finishTime;
private boolean isRetired;
private String historyFile = "";
private String trackingUrl = "";
private int numUsedSlots;
private int numReservedSlots;
private int usedMem;
private int reservedMem;
private int neededMem;
private boolean isUber;
Note that another enum, State, is defined here to describe how the execution is going, with the values RUNNING, SUCCEEDED, FAILED, PREP and KILLED. When we run an MR program, the map-task and reduce-task completion percentages printed on standard output are read from this JobStatus object. How fields such as mapProgress and reduceProgress are actually computed will be discussed later.
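As a side note, client code can read the same numbers through the public Job API. A small illustrative sketch of a polling loop (the loop itself is invented here; the real blocking variant, waitForCompletion(), is shown further below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ProgressWatcher {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration());
    // ... set jar, mapper, reducer and paths as in the example above ...
    job.submit();                        // submit without blocking
    while (!job.isComplete()) {          // backed by the JobStatus fields shown above
      System.out.printf("map %.0f%%  reduce %.0f%%%n",
          job.mapProgress() * 100, job.reduceProgress() * 100);
      Thread.sleep(5000);
    }
    System.out.println("succeeded: " + job.isSuccessful());
  }
}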
Another important field in Job is cluster, which represents the cluster the MR program runs on. Let's take a closer look at this class.
First, two fields of the Cluster class:
ClientProtocol client;
ClientProtocolProvider provider;
ClientProtocol is an interface. If you are familiar with Hadoop RPC, you will recognize that client is the RPC proxy used to talk to the JobTracker or to YARN, i.e. to submit jobs and to query the current state of the system. Here is how that proxy gets created:

// Cluster.java
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}
So client is created by a provider. But ClientProtocolProvider is an abstract class, so where does the concrete provider come from? A few lines from the Cluster class:
private static ServiceLoader<ClientProtocolProvider> frameworkLoader = ServiceLoader.load(ClientProtocolProvider.class);
If you understand java.util.ServiceLoader, this will be clear (if not, read up on it in the Java API docs first). The service configuration file is
hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-jobclient\src\main\resources\META-INF\services\org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider,
and it lists one implementation of ClientProtocolProvider:
org.apache.hadoop.mapred.YarnClientProtocolProvider
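For readers unfamiliar with the mechanism, here is a minimal, self-contained sketch of how ServiceLoader works in general. The Greeter interface and the demo class are invented purely for illustration and have nothing to do with Hadoop:

import java.util.ServiceLoader;

// A provider interface. Implementations are listed, one fully-qualified class name
// per line, in a classpath resource named META-INF/services/<fully-qualified interface name>.
interface Greeter {
  String greet();
}

public class ServiceLoaderDemo {
  public static void main(String[] args) {
    // ServiceLoader reads the META-INF/services entry and instantiates each listed
    // implementation lazily while we iterate -- exactly how Cluster.initialize()
    // iterates over frameworkLoader to pick a ClientProtocolProvider.
    for (Greeter g : ServiceLoader.load(Greeter.class)) {
      System.out.println(g.greet());   // prints nothing if no provider is registered
    }
  }
}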
One side note: ClientProtocolProvider appears to have two implementations; the other one is LocalClientProtocolProvider, which seems to be used for local/debug runs. We will leave it aside for now.
Let's continue and see how YarnClientProtocolProvider creates this client proxy:
public ClientProtocol create(Configuration conf) throws IOException {
  if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
    return new YARNRunner(conf);
  }
  return null;
}
In other words, if the configured framework name is yarn, a YARNRunner object is created. So what is that?
The source describes this class in a single sentence:
This class enables the current JobClient (0.22 hadoop) to run on YARN.
So it is the glue that lets the JobClient run on YARN. YARNRunner implements the ClientProtocol interface; that is all we need to know about it for now.
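Putting the two pieces together, which ClientProtocol implementation a job ends up with is driven entirely by the mapreduce.framework.name setting. A small hedged illustration (it assumes the Hadoop client jars on the classpath and a reachable cluster configuration; the printed line is just a marker):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.MRConfig;

public class PickFramework {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // "yarn" makes YarnClientProtocolProvider return a YARNRunner; other values
    // leave the decision to the remaining providers (e.g. the local one).
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
    Cluster cluster = new Cluster(conf);   // the constructor runs initialize(), shown above
    System.out.println("Cluster initialized against framework "
        + conf.get(MRConfig.FRAMEWORK_NAME));
  }
}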
We have digressed quite a bit; let's get back to how the original example executes. First, the Job is created:
new Job(new Configuration())
=> new Job(JobConf conf)
=> JobContextImpl(Configuration conf, JobID jobId)
Then job.setJarByClass() sets the class whose jar contains the MR program, job.setJobName() sets the job's name, and the following calls configure the input/output paths and the mapper and reducer classes. The crucial call is the last one, job.waitForCompletion(true), which submits the job and waits for it to complete. How exactly is it submitted?
The implementation:
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
It first checks the current JobState: if it is DEFINE, the job may be submitted and submit() is called.
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}

private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster = ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
      public Cluster run() throws IOException, InterruptedException, ClassNotFoundException {
        return new Cluster(getConfiguration());
      }
    });
  }
}
There are two key steps in submit(): the call to connect(), and obtaining a JobSubmitter instance whose submitJobInternal() does the actual submission.
What does connect() do? Recall that when the Job was instantiated, job.cluster was still null; connect() is what materializes this cluster. Here is how the Cluster gets created:
// Cluster.java
public Cluster(Configuration conf) throws IOException {
  this(null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}
And we have already seen the initialize() method above.
Next, what is JobSubmitter? As the name suggests, it is responsible for submitting the job. According to its documentation, the whole submission process involves:
1. Checking the input and output specifications of the job.
2. Computing the InputSplits for the job.
3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
4. Copying the job's jar and configuration to the MapReduce system directory on the distributed file system (HDFS).
5. Submitting the job to the JobTracker and optionally monitoring its status.
Now the implementation:

JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration());
  //configure the command line options correctly on the submitting dfs
  Configuration conf = job.getConfiguration();
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()),
        acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
jobStagingArea should be the directory where the job files are staged. Let's see what JobSubmissionFiles.getStagingDir() does:

Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, job.getConfiguration());
  => Path stagingArea = cluster.getStagingAreaDir();
  => cluster.client.getStagingAreaDir()
We already know that Cluster.client is a YARNRunner object, so we move on to YARNRunner.getStagingAreaDir():
public String getStagingAreaDir() throws IOException, InterruptedException {
  return resMgrDelegate.getStagingAreaDir();
}
resMgrDelegate is a ResourceMgrDelegate object, responsible for communicating with the ResourceManager.
public String getStagingAreaDir() throws IOException, InterruptedException {
  // Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path path = MRApps.getStagingAreaDir(conf, user);
  LOG.debug("getStagingAreaDir: dir=" + path);
  return path.toString();
}
Here another class shows up, MRApps. The documentation describes it as a helper class for MR applications; it extends Apps, which groups YARN-application-related utilities.
Now let's see how the static method MRApps.getStagingAreaDir() determines the staging directory:
private static final String STAGING_CONSTANT = ".staging";

public static Path getStagingAreaDir(Configuration conf, String user) {
  return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
      MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
      + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
}
So the final location is the HDFS directory yarn.app.mapreduce.am.staging-dir/[user]/.staging, which defaults to /tmp/hadoop-yarn/staging/[user]/.staging; for a user named alice, for example, that is /tmp/hadoop-yarn/staging/alice/.staging.
The directory is created if it does not exist yet. The next step is to obtain a JobID:
submitClient.getNewJobID()
=> YARNRunner.getNewJobID()
=> ResourceMgrDelegate.getNewJobID()
public JobID getNewJobID() throws IOException, InterruptedException {
  GetNewApplicationRequest request =
      recordFactory.newRecordInstance(GetNewApplicationRequest.class);
  applicationId = applicationsManager.getNewApplication(request).getApplicationId();
  return TypeConverter.fromYarn(applicationId);
}
Here is how ResourceMgrDelegate obtains a new JobID: it first builds a GetNewApplicationRequest, then applicationsManager creates a new application from that request and returns its ApplicationId. applicationsManager is declared as a ClientRMProtocol, the interface defining the protocol a client uses to talk to the ResourceManager. Look at ResourceMgrDelegate's constructor:
/**
* Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
* @param conf the configuration object.
*/
public ResourceMgrDelegate(YarnConfiguration conf) {
this.conf = conf;
YarnRPC rpc = YarnRPC.create(this.conf);
this.rmAddress = getRmAddress(conf);
LOG.debug("Connecting to ResourceManager at " + rmAddress);
applicationsManager =
(ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
rmAddress, this.conf);
LOG.debug("Connected to ResourceManager at " + rmAddress);
}
From this we can see that applicationsManager is just an RPC proxy object. Being a proxy, it lives on the client side; where is the server-side implementation? Let's jump straight to the server.
public GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request)
    throws YarnRemoteException {
  GetNewApplicationResponse response =
      recordFactory.newRecordInstance(GetNewApplicationResponse.class);
  response.setApplicationId(getNewApplicationId());
  // Pick up min/max resource from scheduler...
  response.setMinimumResourceCapability(scheduler.getMinimumResourceCapability());
  response.setMaximumResourceCapability(scheduler.getMaximumResourceCapability());
  return response;
}

ApplicationId getNewApplicationId() {
  ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
      .newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
          applicationCounter.incrementAndGet());
  LOG.info("Allocated new applicationId: " + applicationId.getId());
  return applicationId;
}
ClientRMService handles all RPC requests coming from clients. After reading these two methods it should be clear that the id is simply obtained by incrementing a server-side AtomicInteger called applicationCounter, and is then returned over RPC to the client submitting the job.
Now that the submit directory and the Job ID are determined, the job jar and the configuration file can be copied to that directory. Then comes an important step: splitting the input, which is done by writeSplits().
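writeSplits() itself is not quoted here. Roughly, for the new API it asks the job's InputFormat for the splits, persists the split metadata into the submit directory, and the number of splits becomes the number of map tasks. A paraphrased sketch under those assumptions (simplified from memory, not the literal source; sorting, error handling and the old-API branch are omitted, and the exact JobSplitWriter overload is approximate):

// Paraphrased sketch of the new-API path inside JobSubmitter.writeSplits().
private int writeNewSplitsSketch(JobContext job, Path jobSubmitDir)
    throws IOException, InterruptedException, ClassNotFoundException {
  Configuration conf = job.getConfiguration();
  InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
  List<InputSplit> splits = input.getSplits(job);   // e.g. one split per HDFS block
  // The split metadata is written under jobSubmitDir (job.split / job.splitmetainfo)
  // so the ApplicationMaster can later hand one split to each map task.
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf),
      splits.toArray(new InputSplit[splits.size()]));
  return splits.size();   // stored as MRJobConfig.NUM_MAPS, i.e. mapreduce.job.maps
}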
The real submission finally happens in submitClient.submitJob(). submitClient is the YARNRunner object, so let's see what YARNRunner does:
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {

  /* check if we have a hsproxy, if not, no need */
  MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
  if (hsProxy != null) {
    // JobClient will set this flag if getDelegationToken is called, if so, get
    // the delegation tokens for the HistoryServer also.
    if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED,
        DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
      Token hsDT = getDelegationTokenFromHS(hsProxy, new Text(
          conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
      ts.addToken(hsDT.getService(), hsDT);
    }
  }

  // Upload only in security mode: TODO
  Path applicationTokensFile =
      new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
  try {
    ts.writeTokenStorageFile(applicationTokensFile, conf);
  } catch (IOException e) {
    throw new YarnException(e);
  }

  // Construct necessary information to start the MR AM
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

  ApplicationReport appMaster = resMgrDelegate.getApplicationReport(applicationId);
  String diagnostics = (appMaster == null ?
      "application report is null" : appMaster.getDiagnostics());
  if (appMaster == null
      || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
      || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
    throw new IOException("Failed to run job : " + diagnostics);
  }
  return clientCache.getClient(jobId).getJobStatus(jobId);
}

public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws IOException {
  appContext.setApplicationId(applicationId);
  SubmitApplicationRequest request =
      recordFactory.newRecordInstance(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  applicationsManager.submitApplication(request);
  LOG.info("Submitted application " + applicationId + " to ResourceManager"
      + " at " + rmAddress);
  return applicationId;
}
Submitting the job thus goes through two more layers: via resMgrDelegate the call lands on applicationsManager, which, as noted above, is an RPC proxy whose server-side implementation is ClientRMService. Let's look at that class:
@Override
public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
    throws YarnRemoteException {
  ApplicationSubmissionContext submissionContext =
      request.getApplicationSubmissionContext();
  ApplicationId applicationId = submissionContext.getApplicationId();
  String user = submissionContext.getUser();
  try {
    user = UserGroupInformation.getCurrentUser().getShortUserName();
    if (rmContext.getRMApps().get(applicationId) != null) {
      throw new IOException("Application with id " + applicationId
          + " is already present! Cannot add a duplicate!");
    }
    // Safety
    submissionContext.setUser(user);

    // This needs to be synchronous as the client can query
    // immediately following the submission to get the application status.
    // So call handle directly and do not send an event.
    rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext,
        System.currentTimeMillis()));

    LOG.info("Application with id " + applicationId.getId()
        + " submitted by user " + user);
    RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
        "ClientRMService", applicationId);
  } catch (IOException ie) {
    LOG.info("Exception in submitting application", ie);
    RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
        ie.getMessage(), "ClientRMService",
        "Exception in submitting application", applicationId);
    throw RPCUtil.getRemoteException(ie);
  }
OK, here is yet another new player, rmAppManager. In short, it is the object that helps the ResourceManager manage the list of applications. Let's follow its handle() method:
@Override
public void handle(RMAppManagerEvent event) {
  ApplicationId applicationId = event.getApplicationId();
  LOG.debug("RMAppManager processing event for " + applicationId
      + " of type " + event.getType());
  switch (event.getType()) {
  case APP_COMPLETED:
    {
      finishApplication(applicationId);
      ApplicationSummary.logAppSummary(rmContext.getRMApps().get(applicationId));
      checkAppNumCompletedLimit();
    }
    break;
  case APP_SUBMIT:
    {
      ApplicationSubmissionContext submissionContext =
          ((RMAppManagerSubmitEvent) event).getSubmissionContext();
      long submitTime = ((RMAppManagerSubmitEvent) event).getSubmitTime();
      submitApplication(submissionContext, submitTime);
    }
    break;
  default:
    LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
  }
}
For now we only care about the APP_SUBMIT branch, which in turn calls the manager's own submitApplication() method:
// RMAppManager.java
@SuppressWarnings("unchecked")
protected synchronized void submitApplication(
    ApplicationSubmissionContext submissionContext, long submitTime) {
  ApplicationId applicationId = submissionContext.getApplicationId();
  RMApp application = null;
  try {
    String clientTokenStr = null;
    String user = UserGroupInformation.getCurrentUser().getShortUserName();
    if (UserGroupInformation.isSecurityEnabled()) {
      Token<ClientTokenIdentifier> clientToken =
          new Token<ClientTokenIdentifier>(
              new ClientTokenIdentifier(applicationId),
              this.clientToAMSecretManager);
      clientTokenStr = clientToken.encodeToUrlString();
      LOG.debug("Sending client token as " + clientTokenStr);
    }

    // Sanity checks
    if (submissionContext.getQueue() == null) {
      submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
    }
    if (submissionContext.getApplicationName() == null) {
      submissionContext.setApplicationName(YarnConfiguration.DEFAULT_APPLICATION_NAME);
    }

    // Store application for recovery
    ApplicationStore appStore = rmContext.getApplicationsStore()
        .createApplicationStore(submissionContext.getApplicationId(), submissionContext);

    // Create RMApp
    application = new RMAppImpl(applicationId, rmContext, this.conf,
        submissionContext.getApplicationName(), user,
        submissionContext.getQueue(), submissionContext, clientTokenStr,
        appStore, this.scheduler, this.masterService, submitTime);

    // Sanity check - duplicate?
    if (rmContext.getRMApps().putIfAbsent(applicationId, application) != null) {
      String message = "Application with id " + applicationId
          + " is already present! Cannot add a duplicate!";
      LOG.info(message);
      throw RPCUtil.getRemoteException(message);
    }

    // Inform the ACLs Manager
    this.applicationACLsManager.addApplication(applicationId,
        submissionContext.getAMContainerSpec().getApplicationACLs());

    // Setup tokens for renewal
    if (UserGroupInformation.isSecurityEnabled()) {
      this.rmContext.getDelegationTokenRenewer().addApplication(
          applicationId, parseCredentials(submissionContext),
          submissionContext.getCancelTokensWhenComplete());
    }

    // All done, start the RMApp
    this.rmContext.getDispatcher().getEventHandler().handle(
        new RMAppEvent(applicationId, RMAppEventType.START));
  } catch (IOException ie) {
    LOG.info("RMAppManager submit application exception", ie);
    if (application != null) {
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // Scheduler about the existence of the application
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppRejectedEvent(applicationId, ie.getMessage()));
    }
  }
}
This code mainly creates an application object of type RMApp, wraps it into an event, and then calls this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.START)) to process it. The call ends up in GenericEventHandler.handle(), which simply adds the event to the eventQueue.
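GenericEventHandler is the handler returned by YARN's asynchronous dispatcher; conceptually, handle() only enqueues the event, and a dedicated dispatcher thread consumes the queue and routes each event to its registered handler. A much simplified, hand-written illustration of that producer/consumer pattern (not the YARN source):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hand-written illustration of the AsyncDispatcher idea: handle() only enqueues,
// a dispatcher thread dequeues and routes each event to the registered handler.
public class MiniDispatcher {
  private final BlockingQueue<Object> eventQueue = new LinkedBlockingQueue<Object>();

  /** Essentially what GenericEventHandler.handle() does: put the event on the queue. */
  public void handle(Object event) {
    eventQueue.offer(event);
  }

  /** The dispatcher thread drains the queue; real code looks up a handler by event type. */
  public void startDispatching() {
    Thread t = new Thread(new Runnable() {
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Object event = eventQueue.take();          // blocks until an event arrives
            System.out.println("dispatching " + event);
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    t.setDaemon(true);
    t.start();
  }
}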
[to be continued...]