`
younglibin
  • 浏览: 1214078 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

执行MapReduce-源码分析

阅读更多

job提交:

 



 

 

  public void submit() throws IOException, InterruptedException, 
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    
    // Connect to the JobTracker and submit the job
    connect();
    info =  jobClient.submitJobInternal(conf);;
    super.setJobID(info.getID());
    state = JobState.RUNNING;
   }

//创建一个client链接
  /**
   * Lazily creates the {@code Cluster} client used by this job.
   *
   * <p>Returns immediately when {@code cluster} is already set. Otherwise a
   * new {@code Cluster} is built from {@code getConfiguration()} inside
   * {@code ugi.doAs(...)} and stored in {@code cluster}.
   *
   * @throws IOException declared; propagated from {@code doAs}
   * @throws InterruptedException declared; propagated from {@code doAs}
   * @throws ClassNotFoundException declared; propagated from {@code doAs}
   */
  private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster != null) {
      return;
    }
    PrivilegedExceptionAction<Cluster> clusterFactory =
        new PrivilegedExceptionAction<Cluster>() {
          public Cluster run()
              throws IOException, InterruptedException,
                     ClassNotFoundException {
            return new Cluster(getConfiguration());
          }
        };
    cluster = ugi.doAs(clusterFactory);
  }

// 初始化个两个client
  /**
   * Creates a cluster client.
   *
   * <p>Stores the configuration, captures the current user via
   * {@code UserGroupInformation.getCurrentUser()} and then calls
   * {@code initialize} with the same address and configuration.
   *
   * @param jobTrackAddr address forwarded unchanged to {@code initialize}
   * @param conf configuration kept on this instance and forwarded to
   *        {@code initialize}
   * @throws IOException declared; propagated from the calls made here
   */
  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    // Must run after the fields above are assigned.
    initialize(jobTrackAddr, conf);
  }

//返回 client,这里判断了 是执行的是本地模式,还是RPC模式
/**
 * Selects the client protocol implementation for this cluster.
 *
 * <p>NOTE(review): this excerpt is abridged. {@code provider} is not declared
 * here and the {@code break} plus the two trailing closing braces have no
 * matching opener, so the original presumably wraps this body in a loop over
 * candidate providers (and possibly one more enclosing block) — confirm
 * against the full source.
 *
 * @param jobTrackAddr when {@code null}, the provider is asked to create a
 *        client from the configuration alone; otherwise the address is
 *        passed as well
 * @param conf configuration handed to the provider
 * @throws IOException declared; propagated from the calls made here
 */
private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

          // Ask the current provider for a client, with or without an address.
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          // A non-null result means this provider is the one to use:
          // remember it and stop looking.
          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
         
      }
    }

 

 

job提交 jobClient.submitJobInternal(conf)

 

1.获得job运行时临时文件的地址,在hdfs上构造,之后会将一些运行时的信息写在这个文件中,

默认值是:/tmp/hadoop/mapred/staging  一般在使用的是配置中的:mapreduce.jobtracker.staging.root.dir

原码如下:

 

LocalJobRunner implements ClientProtocol 

 

RunningJob submitJobInternal(final JobConf job
                               ) 

        Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
            jobCopy);



  /**
   * Returns the qualified staging directory for the current user.
   *
   * <p>The root comes from {@code mapreduce.jobtracker.staging.root.dir}
   * (default {@code /tmp/hadoop/mapred/staging}). A fresh random id is drawn
   * and stored in {@code randid} on every call; the directory is
   * {@code <root>/<shortUserName><randid>/.staging}, with {@code "dummy"}
   * standing in for the user name when no current user is available.
   *
   * @return the staging directory, qualified by {@code fs}, as a string
   * @throws IOException declared; propagated from the calls made here
   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
   */
  public String getStagingAreaDir() throws IOException {
    String configuredRoot = conf.get("mapreduce.jobtracker.staging.root.dir",
        "/tmp/hadoop/mapred/staging");
    Path stagingRootDir = new Path(configuredRoot);
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

    randid = rand.nextInt(Integer.MAX_VALUE);
    String user = (ugi != null ? ugi.getShortUserName() : "dummy") + randid;

    Path stagingDir = new Path(stagingRootDir, user + "/.staging");
    return fs.makeQualified(stagingDir).toString();
  }

 

 

2.获得一个新的jobID,

本地文件+随机数+jobid:

 

 

  /**
   * Allocates the next job ID.
   *
   * <p>Increments the {@code jobid} counter and pairs the new value with the
   * identifier {@code "local" + randid}.
   *
   * @return a new {@code JobID} built from the identifier and the counter
   */
  public synchronized JobID getNewJobId() {
    jobid += 1;
    String identifier = "local" + randid;
    return new JobID(identifier, jobid);
  }

 

 

 

 3.构造  submitJobDir  使用的    1中返回的目录拼接jobid,并将这个值设置给当前job运行目录地址:

 

 

Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
        JobStatus status = null;

 

 

4.添加认证和密钥信息

     a.从配置文件中读取token信息,如果没有则只添加密钥信息即可

        在这里 将token和secret信息初始化到jobconf中了

 

populateTokenCache(jobCopy, jobCopy.getCredentials());

 5.拷贝client文件到hdfs

 

将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入文件)复制到一个以作业ID命名的目录下jobtracker的文件系统。

 包含: -libjars, -files, -archives 三种类型的文件

这里有一个副本数量 默认是10 ,可以配置,

 

 

copyAndConfigureFiles(jobCopy, submitJobDir);

* configure the jobconf of the user with the command line options of 
   * -libjars, -files, -archives

/**
 * Delegates to the three-argument overload using the replication factor
 * read from {@code mapred.submit.replication} (default 10), then makes sure
 * the job has a working directory.
 *
 * @param job job configuration to read from and update
 * @param jobSubmitDir submit directory forwarded to the overload
 * @throws IOException declared; propagated from the calls made here
 * @throws InterruptedException declared; propagated from the calls made here
 */
private void copyAndConfigureFiles(JobConf job, Path jobSubmitDir) 
  throws IOException, InterruptedException {
    copyAndConfigureFiles(job, jobSubmitDir,
        (short) job.getInt("mapred.submit.replication", 10));

    // Keep a working directory the job already has.
    if (job.getWorkingDirectory() != null) {
      return;
    }
    // Otherwise fall back to the file system's working directory.
    job.setWorkingDirectory(fs.getWorkingDirectory());
  }

 

 

 

6.通过namenode获得token

 

TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                              new Path [] {submitJobDir},
                                              jobCopy);

 

 

7.初始化job执行时需要的文件路径信息 ,并将这些信息存放在 conf中

 

Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          int reduces = jobCopy.getNumReduceTasks();
          InetAddress ip = InetAddress.getLocalHost();
          if (ip != null) {
            job.setJobSubmitHostAddress(ip.getHostAddress());
            job.setJobSubmitHostName(ip.getHostName());
          }
          JobContext context = new JobContext(jobCopy, jobId);

 

8.检查输出文件信息,在这里我们会看到,如果输出目录不做设置或者输出目录已经存在的话就会报错了,系统就会退出

 

// Check the output specification
          if (reduces == 0 ? jobCopy.getUseNewMapper() : 
            jobCopy.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
              ReflectionUtils.newInstance(context.getOutputFormatClass(),
                  jobCopy);
            output.checkOutputSpecs(context);
          } else {
            jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
          }



  /**
   * Validates the job's output directory.
   *
   * <p>The directory returned by {@code getOutputPath} must be set and must
   * not exist yet. Between the two checks, tokens are obtained for the
   * directory through {@code TokenCache.obtainTokensForNamenodes}.
   *
   * @param job job context supplying the output path, credentials and
   *        configuration
   * @throws InvalidJobConfException if no output directory is set
   * @throws FileAlreadyExistsException if the output directory already exists
   * @throws IOException declared; propagated from the calls made here
   */
  public void checkOutputSpecs(JobContext job
                               ) throws FileAlreadyExistsException, IOException{
    Path outDir = getOutputPath(job);
    if (outDir == null) {
      throw new InvalidJobConfException("Output directory not set.");
    }

    // get delegation token for outDir's file system
    Path[] tokenPaths = {outDir};
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
                                        tokenPaths,
                                        job.getConfiguration());

    boolean alreadyExists =
        outDir.getFileSystem(job.getConfiguration()).exists(outDir);
    if (alreadyExists) {
      String message = "Output directory " + outDir + " already exists";
      throw new FileAlreadyExistsException(message);
    }
  }

 

9.开始对输入文件做分片处理:

   这里需要说明一下,其中 writeNewSplits 的主要功能是调用了 List<InputSplit> splits = input.getSplits(job); 这里就是我们在看 wordcount 时 FileInputFormat 中 getSplits(conf) 被调用的地方,可以看到 map 的数量就是由分片的数量决定的,具体分片操作参考:

http://younglibin.iteye.com/blog/1929255

http://younglibin.iteye.com/blog/1929278

 

 // Create the splits for the job
          FileSystem fs = submitJobDir.getFileSystem(jobCopy);
          LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
          int maps = writeSplits(context, submitJobDir);
          jobCopy.setNumMapTasks(maps);

  /**
   * Writes the input splits for the job and reports how many there are.
   *
   * <p>Dispatches to {@code writeNewSplits} when the job configuration says
   * the new mapper API is in use, otherwise to {@code writeOldSplits}.
   *
   * @param job job context whose configuration is a {@code JobConf}
   * @param jobSubmitDir submit directory forwarded to the chosen writer
   * @return the value returned by the chosen split writer
   * @throws IOException declared; propagated from the split writer
   * @throws InterruptedException declared; propagated from the split writer
   * @throws ClassNotFoundException declared; propagated from the split writer
   */
  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf) job.getConfiguration();
    return jConf.getUseNewMapper()
        ? writeNewSplits(job, jobSubmitDir)
        : writeOldSplits(jConf, jobSubmitDir);
  }

  /**
   * Computes the splits with the job's input format, sorts them and writes
   * the split files into the submit directory.
   *
   * @param job job context supplying the configuration and input format class
   * @param jobSubmitDir directory the split files are created in
   * @return the number of splits
   * @throws IOException declared; propagated from the calls made here
   * @throws InterruptedException declared; propagated from the calls made here
   * @throws ClassNotFoundException declared; propagated from the calls made here
   */
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration configuration = job.getConfiguration();
    InputFormat<?, ?> inputFormat =
      ReflectionUtils.newInstance(job.getInputFormatClass(), configuration);

    List<InputSplit> splitList = inputFormat.getSplits(job);
    T[] orderedSplits =
        (T[]) splitList.toArray(new InputSplit[splitList.size()]);

    // Order the splits by size so that the biggest ones go first.
    Arrays.sort(orderedSplits, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, configuration,
        jobSubmitDir.getFileSystem(configuration), orderedSplits);
    return orderedSplits.length;
  }

 

 

10.将将要执行的任务队列提交 到管理队列中 

 

// write "queue admins of the queue to which job is being submitted"
          // to job file.
          String queue = jobCopy.getQueueName();
          AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
          jobCopy.set(QueueManager.toFullPropertyName(queue,
              QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

 

 

11.将这些文件的信息提交给job,在job执行时根据这些配置来获取文件内容

 

// Write job file to JobTracker's fs        
          FSDataOutputStream out = 
            FileSystem.create(fs, submitJobFile,
                new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

 

12. 将这些配置信息输出到文件中,我们可以在job运行的临时目录下看到有个job.xml文件,这个文件中存放了关于这个job的所有配置信息,也可以通过50030端口查看到这个文件;

 

jobCopy.writeXml(out);

 

job的初始化完成了,接下来就是job的执行了

 

13.终于开始提交job任务了

 

 

status = jobSubmitClient.submitJob(
              jobId, submitJobDir.toString(), jobCopy.getCredentials());

  /**
   * Creates a {@code Job} for the given ID and submit directory, attaches the
   * supplied credentials to it and returns its status.
   *
   * <p>NOTE(review): the earlier {@code @see} tag here pointed at
   * {@code JobSubmissionProtocol#getStagingAreaDir()}, which looks like a
   * copy-paste from another method; presumably this method implements
   * {@code JobSubmissionProtocol#submitJob} — confirm.
   *
   * @param jobid ID passed to the {@code Job} constructor
   * @param jobSubmitDir submit directory passed to the {@code Job} constructor
   * @param credentials credentials set on the new job's {@code job} field
   * @return the {@code status} field of the newly created {@code Job}
   * @throws IOException declared; propagated from the {@code Job} constructor
   */
public JobStatus submitJob(JobID jobid, String jobSubmitDir, 
                             Credentials credentials) 
  throws IOException {
    Job job = new Job(jobid, jobSubmitDir);
    job.job.setCredentials(credentials);
    return job.status;
  }

//

 

以上实现使用的是 local 方式,Job 是 LocalJobRunner 的一个内部类,
这个类继承了 Thread,在单独的线程中运行:

 

 

 private class Job extends Thread implements TaskUmbilicalProtocol {



public Job(JobID jobid, String jobSubmitDir) throws IOException {
     
      
      profile = new JobProfile(job.getUser(), id, systemJobFile.toString(), 
                               "http://localhost:8080/", job.getJobName());
      status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING, 
          profile.getUser(), profile.getJobName(), profile.getJobFile(), 
          profile.getURL().toString());
      jobs.put(id, this);
      this.start();
    }
 @Override
 public void run() {
      ...............
        List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
            jobId, mapOutputFiles);
        ExecutorService mapService = createMapExecutor(taskRunnables.size());

        // Start populating the executor with work units.
        // They may begin running immediately (in other threads).
        for (Runnable r : taskRunnables) {
          mapService.submit(r);
        }
.........................
              reduce.setJobFile(localJobFile.toString());
              localConf.setUser(reduce.getUser());
              reduce.localizeConfiguration(localConf);
              reduce.setConf(localConf);
              reduce_tasks += 1;
              myMetrics.launchReduce(reduce.getTaskID());
              reduce.run(localConf, this);
              myMetrics.completeReduce(reduce.getTaskID());
              reduce_tasks -= 1;
          
          }
        
    }
 

 

job中调用 Map 和reduce 

 

map:

 我们看到在job线程中执行了 mapService.submit(r); 其中的 r 是 MapTaskRunnable 对象,所以这里真正提交了 map 任务执行

protected class MapTaskRunnable implements Runnable {
      public void run() {
            map_tasks.getAndIncrement();
            myMetrics.launchMap(mapId);
            map.run(localConf, Job.this);
            myMetrics.completeMap(mapId);
          
    }

 

 

 

 

我们看到上边方法调用了 MapTask类 的 run

 

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    ..............................................
    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }


/**
 * Runs a map task through the new-API mapper.
 *
 * <p>NOTE(review): this is an abridged excerpt. The setup steps are present
 * only as placeholder comments, so {@code input}, {@code split},
 * {@code mapper}, {@code mapperContext} and {@code output} are not declared
 * in the visible code.
 *
 * @param job job configuration (unused in the visible part)
 * @param splitIndex split index (unused in the visible part)
 * @param umbilical passed to {@code statusUpdate} after the map phase
 * @param reporter task reporter (unused in the visible part)
 * @throws IOException declared; propagated from the calls made here
 * @throws ClassNotFoundException declared; propagated from the calls made here
 * @throws InterruptedException declared; propagated from the calls made here
 */
@SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // Setup elided in this excerpt:
    // make a task context so we can get the classes
    // make a mapper
    // make the input format
    // rebuild the input split
    // get an output object

    // Prepare the input for this split, then let the mapper run over it.
    input.initialize(split, mapperContext);
    mapper.run(mapperContext);
    // Map phase is done: switch to SORT and send a status update.
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    // Close the input first, then the output.
    input.close();
    output.close(mapperContext);
  }

input.initialize(split, mapperContext);

调用的是: LineRecordReader
我们知道 FileInputFormat 的子类默认使用了 TextInputFormat,在 TextInputFormat 中我们构造了 return new LineRecordReader(recordDelimiterBytes);

所以我们在读取数据的时候使用的是: LineRecordReader

 

 

以上代码有一段是 mapper.run(mapperContext); 在这里我们终于知道谁调用了这个run方法了吧,到这里,一个本地运行的mapreduce就可以串起来了

/**
 * Drives the mapper over its input.
 *
 * <p>Calls {@code setup} once, then {@code map} for every key/value pair the
 * context yields, and finally {@code cleanup}. {@code cleanup} is not in a
 * {@code finally} block, so it is skipped if {@code setup} or {@code map}
 * throws.
 *
 * @param context supplies the key/value pairs and is passed to every callback
 * @throws IOException declared; propagated from the calls made here
 * @throws InterruptedException declared; propagated from the calls made here
 */
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // nextKeyValue() advances the context; stop when it returns false.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
  }

 

 reduce:

reduce.run(localConf, this);

 

/**
 * Runs the reduce task, choosing the reducer API by {@code useNewApi}, and
 * then calls {@code done}.
 *
 * <p>NOTE(review): this is an abridged excerpt. {@code reporter},
 * {@code rIter}, {@code comparator}, {@code keyClass} and {@code valueClass}
 * are not declared in the visible code, so their setup has been left out.
 *
 * @param job job configuration forwarded to the chosen reducer runner
 * @param umbilical forwarded to the reducer runner and to {@code done}
 * @throws IOException declared; propagated from the calls made here
 * @throws InterruptedException declared; propagated from the calls made here
 * @throws ClassNotFoundException declared; propagated from the calls made here
 */
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
  
    // Both branches receive the same arguments; only the API differs.
    if (useNewApi) {
      runNewReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    } else {
      runOldReducer(job, umbilical, reporter, rIter, comparator, 
                    keyClass, valueClass);
    }
    done(umbilical, reporter);
  }

/**
 * Runs the new-API reducer and then closes {@code trackedRW}.
 *
 * <p>NOTE(review): this is an abridged excerpt. {@code reducer},
 * {@code reducerContext} and {@code trackedRW} are not declared in the
 * visible code, and none of the parameters is used in the part shown.
 *
 * @param job job configuration
 * @param umbilical task umbilical protocol
 * @param reporter task reporter
 * @param rIter raw key/value iterator
 * @param comparator raw comparator for the input keys
 * @param keyClass input key class
 * @param valueClass input value class
 * @throws IOException declared; propagated from the calls made here
 * @throws InterruptedException declared; propagated from the calls made here
 * @throws ClassNotFoundException declared; propagated from the calls made here
 */
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewReducer(JobConf job,
                     final TaskUmbilicalProtocol umbilical,
                     final TaskReporter reporter,
                     RawKeyValueIterator rIter,
                     RawComparator<INKEY> comparator,
                     Class<INKEY> keyClass,
                     Class<INVALUE> valueClass
                     ) throws IOException,InterruptedException, 
                              ClassNotFoundException {
    // This is where the user's reducer is invoked.
    reducer.run(reducerContext);
    trackedRW.close(reducerContext);
  }

 

 

在上边我们也看到了 reduce 调用 reducer.run 的地方,终于把一个流程串起来了。

 

 

 

  • 大小: 41 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics