Hadoop 2.0 Code Analysis --- MapReduce

This article is based on the Hadoop source in hadoop-2.0.1-alpha-src.
Reference: http://www.cnblogs.com/biyeymyhjob/archive/2012/08/16/2640733.html

As in the referenced article, we trace how a concrete MR job gets executed, using the following example:

     // Create a new Job
     Job job = new Job(new Configuration());
     job.setJarByClass(MyJob.class);

     // Specify various job-specific parameters
     job.setJobName("myjob");

     // In the new API the input/output paths are set through the
     // FileInputFormat/FileOutputFormat helper classes
     FileInputFormat.addInputPath(job, new Path("in"));
     FileOutputFormat.setOutputPath(job, new Path("out"));

     job.setMapperClass(MyJob.MyMapper.class);
     job.setReducerClass(MyJob.MyReducer.class);

     // Submit the job, then poll for progress until the job is complete
     job.waitForCompletion(true);

Let us first look at the definition of the Job class, which lives in the package org.apache.hadoop.mapreduce. It covers the definition of an MR job, its submission, and queries of its execution status.

public class Job extends JobContextImpl implements JobContext


Job extends JobContextImpl, JobContextImpl implements the JobContext interface, and JobContext in turn extends the MRJobConfig interface. Let us start with MRJobConfig. It mainly holds the names of the configuration parameters related to an MR job, for example:

 // Put all of the attribute names in here so that Job and JobContext are
  // consistent.
  public static final String INPUT_FORMAT_CLASS_ATTR = "mapreduce.job.inputformat.class";

  public static final String MAP_CLASS_ATTR = "mapreduce.job.map.class";

  public static final String COMBINE_CLASS_ATTR = "mapreduce.job.combine.class";

  public static final String REDUCE_CLASS_ATTR = "mapreduce.job.reduce.class";

  public static final String OUTPUT_FORMAT_CLASS_ATTR = "mapreduce.job.outputformat.class";

  public static final String PARTITIONER_CLASS_ATTR = "mapreduce.job.partitioner.class";

  public static final String SETUP_CLEANUP_NEEDED = "mapreduce.job.committer.setup.cleanup.needed";

  public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";

  public static final String JAR = "mapreduce.job.jar";

  public static final String ID = "mapreduce.job.id";

  public static final String JOB_NAME = "mapreduce.job.name";



The JobContext interface extends MRJobConfig and additionally declares a number of getXXX methods, for example:
getConfiguration()
getCredentials()
getJobID()
getNumReduceTasks()
getOutputKeyClass()
getOutputValueClass()
...
So an implementation of JobContext has to provide these methods, which return a job's configuration information.

Next, how does JobContextImpl actually do this? Its constructor:


public JobContextImpl(Configuration conf, JobID jobId) {
    if (conf instanceof JobConf) {
      this.conf = (JobConf)conf;
    } else {
      this.conf = new JobConf(conf);
    }
    this.jobId = jobId;
    this.credentials = this.conf.getCredentials();
    try {
      this.ugi = UserGroupInformation.getCurrentUser();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }



JobContextImpl holds an internal field of type JobConf. JobConf is a subclass of org.apache.hadoop.conf.Configuration, and its constructor takes a Configuration object, so all of the configuration values mentioned above ultimately come from a Configuration instance. Most of the getXXX() methods that JobContextImpl implements from JobContext simply delegate to conf.getXXX().
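
As a small illustration of this delegation (a hedged sketch, assuming a Hadoop 2.x client classpath; the property keys are the same MRJobConfig constants listed above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobContextGetterDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapreduce.job.name", "myjob");      // MRJobConfig.JOB_NAME
    conf.setInt("mapreduce.job.reduces", 3);      // MRJobConfig.NUM_REDUCES

    Job job = new Job(conf);                      // Job extends JobContextImpl
    System.out.println(job.getJobName());         // "myjob", read back through conf
    System.out.println(job.getNumReduceTasks());  // 3, read back through conf
  }
}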

Now let us look at JobContextImpl's subclass, namely our Job class. First, a few important fields.

  private JobState state = JobState.DEFINE;
  private JobStatus status;
  private long statustime;
  private Cluster cluster;


JobState is an enum defined inside the Job class with two values, DEFINE and RUNNING, indicating which stage of submission the job has reached. JobStatus is a separately defined class describing the current execution state of a job. Let us look at what information we can get about a job while it is running.


  private JobID jobid;
  private float mapProgress;
  private float reduceProgress;
  private float cleanupProgress;
  private float setupProgress;
  private State runState;
  private long startTime;
  private String user;
  private String queue;
  private JobPriority priority;
  private String schedulingInfo="NA";
  private String failureInfo = "NA";

  private Map<JobACL, AccessControlList> jobACLs =
      new HashMap<JobACL, AccessControlList>();

  private String jobName;
  private String jobFile;
  private long finishTime;
  private boolean isRetired;
  private String historyFile = "";
  private String trackingUrl ="";
  private int numUsedSlots;
  private int numReservedSlots;
  private int usedMem;
  private int reservedMem;
  private int neededMem;
  private boolean isUber;



Note that a State enum is also defined here to describe how the job is doing, with the values RUNNING, SUCCEEDED, FAILED, PREP and KILLED. The map-task and reduce-task completion percentages that we see on standard output while running an MR program all come from the information held in this JobStatus object. How fields such as mapProgress and reduceProgress are actually computed will be covered later.
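
If we wanted to watch those numbers ourselves instead of relying on monitorAndPrintJob(), a hedged sketch using Job's public polling methods (mapProgress(), reduceProgress(), isComplete()) could look like this; the job is assumed to be configured exactly as in the driver example at the top:

     Job job = new Job(new Configuration());
     // ... set jar, job name, input/output paths, mapper and reducer as above ...
     job.submit();                              // non-blocking, unlike waitForCompletion(true)
     while (!job.isComplete()) {
       System.out.printf("map %.0f%%  reduce %.0f%%%n",
           job.mapProgress() * 100, job.reduceProgress() * 100);
       Thread.sleep(5000);                      // poll every 5 seconds
     }
     System.out.println(job.isSuccessful() ? "job succeeded" : "job failed");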

Another important field in Job is cluster, which represents the cluster the MR program runs on. Let us look at this class more closely.

First, two fields of the Cluster class:

ClientProtocol client;
ClientProtocolProvider clientProtocolProvider;

ClientProtocol is an interface. If you are familiar with Hadoop RPC, you will recognize that client is the RPC proxy used to talk to the JobTracker or to YARN in order to submit jobs and query the current state of the system. Here is how that proxy is created:
//Cluster.java
 private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }


So client is created by a provider. The catch is that ClientProtocolProvider is an abstract class, so where does the provider itself come from? Look at these lines in the Cluster class:

private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);


This becomes clear once you know java.util.ServiceLoader (if not, see the Java API docs first). We can find the corresponding service configuration file:
hadoop-mapreduce-project\hadoop-mapreduce-client\hadoop-mapreduce-client-jobclient\src\main\resources\META-INF\services\org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider

The file lists one ClientProtocolProvider implementation:
org.apache.hadoop.mapred.YarnClientProtocolProvider
As an aside, ClientProtocolProvider actually has two implementation subclasses; the other one, LocalClientProtocolProvider, appears to be used for local/debug runs, and we leave it aside for now.
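
For readers unfamiliar with ServiceLoader, here is a minimal, generic JDK example of the mechanism (nothing Hadoop-specific, and the names are made up for illustration): ServiceLoader.load(X.class) scans every META-INF/services/<fully qualified name of X> file on the classpath, and each line of such a file names an implementation class to instantiate.

import java.util.ServiceLoader;

interface GreetingProvider {                  // plays the role of ClientProtocolProvider
  String greet();
}

class EnglishGreetingProvider implements GreetingProvider {
  public String greet() { return "hello"; }
}

public class ServiceLoaderDemo {
  public static void main(String[] args) {
    // Finds implementations listed in META-INF/services/GreetingProvider,
    // e.g. a resource file containing the single line "EnglishGreetingProvider".
    ServiceLoader<GreetingProvider> loader = ServiceLoader.load(GreetingProvider.class);
    for (GreetingProvider p : loader) {       // iterates every registered provider
      System.out.println(p.getClass().getName() + " -> " + p.greet());
    }
  }
}
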
Let us continue with how YarnClientProtocolProvider creates the client proxy:

  public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }


So if the configured framework name is yarn, a YARNRunner object is created. What is that?

The class-level comment in the code is a single sentence:

This class enables the current JobClient (0.22 hadoop) to run on YARN.

In other words, it is what lets the JobClient run on YARN. YARNRunner implements the ClientProtocol interface; that is all we need to know about it for now.

We have wandered quite far, so let us return to how the original example executes. First, the creation of the Job:

new Job(new Configuration())
=> new Job(JobConf conf)
=> JobContextImpl(Configuration conf, JobID jobId)

Then job.setJarByClass() tells Hadoop which class's jar contains the MR program, job.setJobName() sets the job's name, and the remaining calls set the input and output paths and the classes that define the map and reduce tasks. The key call is the last one: job.waitForCompletion(true), which submits the job and waits for it to finish. How exactly is the job submitted?

The implementation:

 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }


First the current JobState is checked; if it is DEFINE, the job can be submitted, and submit() is called.

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
}

  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }



There are two key pieces in submit(): the call to connect(), and obtaining a JobSubmitter instance and calling its submitJobInternal() method to actually submit the job.

What does connect() do? Note that when the Job was instantiated, job.cluster was still null; connect() is mainly there to materialize this cluster. Let us look at how the Cluster is created.

//Cluster.java
  public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }


We already encountered the initialize() method above.

Next, what is JobSubmitter? As the name says, it is responsible for submitting the job. The whole submission process involves (quoting its documentation):

1. Checking the input and output specifications of the job.
2. Computing the InputSplits for the job.
3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
4. Copying the job's jar and configuration to the MapReduce system directory on the distributed file system.
5. Submitting the job to the JobTracker and optionally monitoring its status.


The code:
 JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);
    
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                     job.getConfiguration());
    //configure the command line options correctly on the submitting dfs
    Configuration conf = job.getConfiguration();
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      copyAndConfigureFiles(job, submitJobDir);
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }


jobStagingArea is the directory where the job's files are staged. Let us see what JobSubmissionFiles.getStagingDir() does:


Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                     job.getConfiguration());

=>

Path stagingArea = cluster.getStagingAreaDir();

=>
cluster.client.getStagingAreaDir()



We already know that Cluster.client is a YARNRunner object, so this takes us to YARNRunner.getStagingAreaDir():

  public String getStagingAreaDir() throws IOException, InterruptedException {
    return resMgrDelegate.getStagingAreaDir();
  }


resMgrDelegate is an object of class ResourceMgrDelegate, which is responsible for communicating with the ResourceManager.

  public String getStagingAreaDir() throws IOException, InterruptedException {
//    Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
    String user = 
      UserGroupInformation.getCurrentUser().getShortUserName();
    Path path = MRApps.getStagingAreaDir(conf, user);
    LOG.debug("getStagingAreaDir: dir=" + path);
    return path.toString();
  }


Now yet another class appears, MRApps. Its documentation describes it as a helper class for MR applications; it extends Apps, which provides helpers for YARN applications in general.
Let us see how the static method MRApps.getStagingAreaDir() computes the directory where jobs are staged:

  private static final String STAGING_CONSTANT = ".staging";
  public static Path getStagingAreaDir(Configuration conf, String user) {
    return new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR,
        MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
        + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
  }


So the final location is the HDFS directory ${yarn.app.mapreduce.am.staging-dir}/[user]/.staging, which defaults to /tmp/hadoop-yarn/staging/[user]/.staging.

This directory is created if it does not already exist. The next step is obtaining a JobID:
submitClient.getNewJobID()
=> YARNRunner.getNewJobID()
=> ResourceMgrDelegate.getNewJobID()

  public JobID getNewJobID() throws IOException, InterruptedException {
    GetNewApplicationRequest request = recordFactory.newRecordInstance(GetNewApplicationRequest.class);
    applicationId = applicationsManager.getNewApplication(request).getApplicationId();
    return TypeConverter.fromYarn(applicationId);
  }


Here is how ResourceMgrDelegate obtains a new JobID: it builds a request for creating a new application, and applicationsManager then creates a new app from that request and returns its ApplicationId. applicationsManager is declared with the type ClientRMProtocol, an interface defining the protocol a client uses to talk to the ResourceManager. Look at the ResourceMgrDelegate constructor:

  /**
   * Delegate responsible for communicating with the Resource Manager's {@link ClientRMProtocol}.
   * @param conf the configuration object.
   */
  public ResourceMgrDelegate(YarnConfiguration conf) {
    this.conf = conf;
    YarnRPC rpc = YarnRPC.create(this.conf);
    this.rmAddress = getRmAddress(conf);
    LOG.debug("Connecting to ResourceManager at " + rmAddress);
    applicationsManager =
        (ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class,
            rmAddress, this.conf);
    LOG.debug("Connected to ResourceManager at " + rmAddress);
  }



From this we can see that applicationsManager is just an RPC proxy object. A proxy lives on the client side, so where is the server-side implementation? Let us jump straight to the server.

  public GetNewApplicationResponse getNewApplication(
      GetNewApplicationRequest request) throws YarnRemoteException {
    GetNewApplicationResponse response = recordFactory
        .newRecordInstance(GetNewApplicationResponse.class);
    response.setApplicationId(getNewApplicationId());
    // Pick up min/max resource from scheduler...
    response.setMinimumResourceCapability(scheduler
        .getMinimumResourceCapability());
    response.setMaximumResourceCapability(scheduler
        .getMaximumResourceCapability());       
    
    return response;
  }

  ApplicationId getNewApplicationId() {
    ApplicationId applicationId = org.apache.hadoop.yarn.util.BuilderUtils
        .newApplicationId(recordFactory, ResourceManager.clusterTimeStamp,
            applicationCounter.incrementAndGet());
    LOG.info("Allocated new applicationId: " + applicationId.getId());
    return applicationId;
  }



ClientRMService handles all RPC requests coming from clients. These two methods make it clear that the new id is simply obtained by incrementing applicationCounter, a server-side AtomicInteger, and is then returned to the submitting client over RPC.

Now that the job submission directory and the JobID are determined, the job's jar and configuration files can be copied to that directory. Then comes an important step, splitting the input, which is done by writeSplits().
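
Before following the code further, here is a hedged sketch of what the new-API branch of writeSplits() boils down to (paraphrased, not the verbatim JobSubmitter code): instantiate the job's InputFormat, ask it for the logical InputSplits, persist them under the submit directory, and use their count as the number of map tasks.

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;

class WriteSplitsSketch {
  static int computeSplits(JobContext job, Path jobSubmitDir) throws Exception {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
        ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    List<InputSplit> splits = input.getSplits(job);
    // The real JobSubmitter additionally sorts the splits (largest first) and
    // writes them as split files under jobSubmitDir via JobSplitWriter.
    return splits.size();   // stored in the config as the number of map tasks
  }
}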

The actual submission is finally done by submitClient.submitJob(). submitClient is a YARNRunner object, so let us see how YARNRunner does it:

 @Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    /* check if we have a hsproxy, if not, no need */
    MRClientProtocol hsProxy = clientCache.getInitializedHSProxy();
    if (hsProxy != null) {
      // JobClient will set this flag if getDelegationToken is called, if so, get
      // the delegation tokens for the HistoryServer also.
      if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, 
          DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) {
        Token hsDT = getDelegationTokenFromHS(hsProxy, new Text( 
                conf.get(JobClient.HS_DELEGATION_TOKEN_RENEWER)));
        ts.addToken(hsDT.getService(), hsDT);
      }
    }

    // Upload only in security mode: TODO
    Path applicationTokensFile =
        new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE);
    try {
      ts.writeTokenStorageFile(applicationTokensFile, conf);
    } catch (IOException e) {
      throw new YarnException(e);
    }

    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
        diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  }

  public ApplicationId submitApplication(
      ApplicationSubmissionContext appContext) 
  throws IOException {
    appContext.setApplicationId(applicationId);
    SubmitApplicationRequest request = 
        recordFactory.newRecordInstance(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
    applicationsManager.submitApplication(request);
    LOG.info("Submitted application " + applicationId + " to ResourceManager" +
    		" at " + rmAddress);
    return applicationId;
  }



Submitting the job goes through two more layers of calls, from resMgrDelegate down to applicationsManager. That is the RPC proxy we just discussed, and its server-side implementation is ClientRMService, so let us look at the implementation there.


 @Override
  public SubmitApplicationResponse submitApplication(
      SubmitApplicationRequest request) throws YarnRemoteException {
    ApplicationSubmissionContext submissionContext = request
        .getApplicationSubmissionContext();
    ApplicationId applicationId = submissionContext.getApplicationId();
    String user = submissionContext.getUser();
    try {
      user = UserGroupInformation.getCurrentUser().getShortUserName();
      if (rmContext.getRMApps().get(applicationId) != null) {
        throw new IOException("Application with id " + applicationId
            + " is already present! Cannot add a duplicate!");
      }

      // Safety 
      submissionContext.setUser(user);

      // This needs to be synchronous as the client can query 
      // immediately following the submission to get the application status.
      // So call handle directly and do not send an event.
      rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext, System
          .currentTimeMillis()));

      LOG.info("Application with id " + applicationId.getId() + 
          " submitted by user " + user);
      RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
          "ClientRMService", applicationId);
    } catch (IOException ie) {
      LOG.info("Exception in submitting application", ie);
      RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST, 
          ie.getMessage(), "ClientRMService",
          "Exception in submitting application", applicationId);
      throw RPCUtil.getRemoteException(ie);
    }
    // ... (the method then builds and returns a SubmitApplicationResponse; remainder elided here)
  }

Now we meet yet another new piece, rmAppManager: it is the object that helps the ResourceManager manage its list of applications. Let us follow its handle() method:

  @Override
  public void handle(RMAppManagerEvent event) {
    ApplicationId applicationId = event.getApplicationId();
    LOG.debug("RMAppManager processing event for " 
        + applicationId + " of type " + event.getType());
    switch(event.getType()) {
      case APP_COMPLETED: 
      {
        finishApplication(applicationId);
        ApplicationSummary.logAppSummary(
            rmContext.getRMApps().get(applicationId));
        checkAppNumCompletedLimit(); 
      } 
      break;
      case APP_SUBMIT:
      {
        ApplicationSubmissionContext submissionContext = 
            ((RMAppManagerSubmitEvent)event).getSubmissionContext();
        long submitTime = ((RMAppManagerSubmitEvent)event).getSubmitTime();
        submitApplication(submissionContext, submitTime);
      }
      break;
      default:
        LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
      }
  }



For now we only look at the APP_SUBMIT branch, which calls the object's own submitApplication() method:
//RMAppManager.java
  @SuppressWarnings("unchecked")
  protected synchronized void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime) {
    ApplicationId applicationId = submissionContext.getApplicationId();
    RMApp application = null;
    try {
      String clientTokenStr = null;
      String user = UserGroupInformation.getCurrentUser().getShortUserName();
      if (UserGroupInformation.isSecurityEnabled()) {
        Token<ClientTokenIdentifier> clientToken = new 
            Token<ClientTokenIdentifier>(
            new ClientTokenIdentifier(applicationId),
            this.clientToAMSecretManager);
        clientTokenStr = clientToken.encodeToUrlString();
        LOG.debug("Sending client token as " + clientTokenStr);
      }
      
      // Sanity checks
      if (submissionContext.getQueue() == null) {
        submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
      }
      if (submissionContext.getApplicationName() == null) {
        submissionContext.setApplicationName(
            YarnConfiguration.DEFAULT_APPLICATION_NAME);
      }

      // Store application for recovery
      ApplicationStore appStore = rmContext.getApplicationsStore()
          .createApplicationStore(submissionContext.getApplicationId(),
          submissionContext);

      // Create RMApp
      application = new RMAppImpl(applicationId, rmContext,
          this.conf, submissionContext.getApplicationName(), user,
          submissionContext.getQueue(), submissionContext, clientTokenStr,
          appStore, this.scheduler,
          this.masterService, submitTime);

      // Sanity check - duplicate?
      if (rmContext.getRMApps().putIfAbsent(applicationId, application) != 
          null) {
        String message = "Application with id " + applicationId
            + " is already present! Cannot add a duplicate!";
        LOG.info(message);
        throw RPCUtil.getRemoteException(message);
      } 

      // Inform the ACLs Manager
      this.applicationACLsManager.addApplication(applicationId,
          submissionContext.getAMContainerSpec().getApplicationACLs());

      // Setup tokens for renewal
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer().addApplication(
            applicationId,parseCredentials(submissionContext),
            submissionContext.getCancelTokensWhenComplete()
            );
      }      
      
      // All done, start the RMApp
      this.rmContext.getDispatcher().getEventHandler().handle(
          new RMAppEvent(applicationId, RMAppEventType.START));
    } catch (IOException ie) {
        LOG.info("RMAppManager submit application exception", ie);
        if (application != null) {
          // Sending APP_REJECTED is fine, since we assume that the 
          // RMApp is in NEW state and thus we havne't yet informed the 
          // Scheduler about the existence of the application
          this.rmContext.getDispatcher().getEventHandler().handle(
              new RMAppRejectedEvent(applicationId, ie.getMessage()));
        }
    }
  }


This code mainly builds an application object of type RMApp, wraps it into an event, and then hands it over via this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.START)). What ends up being called is GenericEventHandler.handle(), which simply adds the event to the eventQueue.
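
GenericEventHandler belongs to YARN's asynchronous dispatcher: the handler only enqueues the event, and a separate dispatcher thread drains the queue and routes each event to the handler registered for its type. A hedged, standalone sketch of that pattern (class and method names here are illustrative, not YARN's):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniDispatcher<E> {
  private final BlockingQueue<E> eventQueue = new LinkedBlockingQueue<E>();

  // What the generic handler essentially does: enqueue the event and return.
  void handle(E event) {
    eventQueue.offer(event);
  }

  // The dispatcher thread's loop: take an event off the queue and dispatch it.
  void dispatchLoop() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      E event = eventQueue.take();
      // The real dispatcher looks up the EventHandler registered for
      // event.getType() and calls its handle(event) method.
      System.out.println("dispatching " + event);
    }
  }
}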

[to be continued...]
