samuschen

Hadoop job execution: selected source code


1. Client

The Map-Reduce process begins when a client submits a job.

Job submission is done mainly through the static method JobClient.runJob(JobConf):

public static RunningJob runJob(JobConf job) throws IOException {
  // First, create a JobClient object
  JobClient jc = new JobClient(job);
  // ...
  // Call submitJob to submit the job
  running = jc.submitJob(job);
  JobID jobId = running.getID();
  // ...
  while (true) {
     // Keep fetching the job's status and printing it to the client console until the job completes
  }
  return running;
}
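The while (true) loop in runJob only polls the job's status. Below is a minimal sketch of that polling pattern. The BooleanSupplier stands in for RunningJob.isComplete(); the class name and the sleep interval are assumptions, and this is not Hadoop code.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the polling loop in runJob(): check the job's status,
// stop once it reports completion, otherwise sleep and try again.
// BooleanSupplier stands in for RunningJob.isComplete(); printing is omitted.
class JobPollingSketch {
    /** Returns the number of status checks made, or -1 if interrupted. */
    static int waitForCompletion(BooleanSupplier isComplete, long intervalMillis) {
        int polls = 0;
        while (true) {
            polls++;
            if (isComplete.getAsBoolean()) {
                return polls;               // job finished: leave the loop
            }
            // the real client prints map/reduce progress to the console here
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }
}
```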

JobClient's submitJob method is implemented as follows:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                InvalidJobConfException, IOException {
  // Get a new job ID from the JobTracker
  JobID jobId = jobSubmitClient.getNewJobId();
  // Prepare the files the job needs and write them to HDFS:
  //   the jar containing the job's code, as job.jar
  //   the input split information, as job.split
  //   the job's configuration, as job.xml
  Path submitJobDir = new Path(getSystemDir(), jobId.toString());
  Path submitJarFile = new Path(submitJobDir, "job.jar");
  Path submitSplitFile = new Path(submitJobDir, "job.split");
  // Upload the jars given with the -libjars command-line option to HDFS
  configureCommandLineOptions(job, submitJobDir, submitJarFile);
  Path submitJobFile = new Path(submitJobDir, "job.xml");
  // ...
  // Get the input splits from the job's InputFormat; the default split type is FileSplit
  InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());

  // Open an output stream and write the input split information to job.split
  FSDataOutputStream out = FileSystem.create(fs,
      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));
  try {
    // job.split contains: a file header, the split file version, the number of splits,
    // and then each input split in turn.
    // For each input split it writes: the split class name (FileSplit by default), the split size,
    // the split contents (for a FileSplit: the file name and the split's start offset in the file),
    // and the split's locations (the DataNodes that hold the data).
    writeSplitsFile(splits, out);
  } finally {
    out.close();
  }
  job.set("mapred.job.split.file", submitSplitFile.toString());
  // The number of map tasks is set to the number of splits
  job.setNumMapTasks(splits.length);
  // Write the job configuration to job.xml
  out = FileSystem.create(fs, submitJobFile,
      new FsPermission(JOB_FILE_PERMISSION));
  try {
    job.writeXml(out);
  } finally {
    out.close();
  }
  // Actually submit the job to the JobTracker
  JobStatus status = jobSubmitClient.submitJob(jobId);
  // ...
}
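To make the job.split layout described in the comments concrete, here is a simplified round-trip sketch. It follows the field order given above. The magic bytes, the version number, and the plain DataOutputStream encoding are all assumptions. Hadoop writes this file with its own Writable encodings, so this is not a reader for real job.split files.

```java
import java.io.*;
import java.util.*;

// Simplified sketch of a job.split-style file: header, version, split count,
// then per split: type name, size, contents (file name + start offset) and
// DataNode locations. Magic bytes, version and encoding are assumptions.
class SplitFileSketch {
    static final byte[] HEADER = {'S', 'P', 'L'};  // assumed magic bytes
    static final int VERSION = 1;                  // assumed version number

    static final class FileSplitInfo {
        final String path; final long start; final long length; final String[] hosts;
        FileSplitInfo(String path, long start, long length, String[] hosts) {
            this.path = path; this.start = start; this.length = length; this.hosts = hosts;
        }
    }

    static byte[] write(List<FileSplitInfo> splits) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.write(HEADER);
            out.writeInt(VERSION);
            out.writeInt(splits.size());
            for (FileSplitInfo s : splits) {
                out.writeUTF("FileSplit");       // split type name
                out.writeLong(s.length);         // split size
                out.writeUTF(s.path);            // contents: file name...
                out.writeLong(s.start);          // ...and start offset
                out.writeInt(s.hosts.length);    // locations
                for (String host : s.hosts) out.writeUTF(host);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static List<FileSplitInfo> read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            byte[] header = new byte[HEADER.length];
            in.readFully(header);
            if (!Arrays.equals(header, HEADER) || in.readInt() != VERSION) {
                throw new IOException("not a split file");
            }
            int count = in.readInt();
            List<FileSplitInfo> splits = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                in.readUTF();                    // type name (only FileSplit here)
                long length = in.readLong();
                String path = in.readUTF();
                long start = in.readLong();
                String[] hosts = new String[in.readInt()];
                for (int j = 0; j < hosts.length; j++) hosts[j] = in.readUTF();
                splits.add(new FileSplitInfo(path, start, length, hosts));
            }
            return splits;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```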

 

2. JobTracker

The JobTracker runs in its own JVM. Its main function does two main things:

  • Calls the static method startTracker(new JobConf()) to create a JobTracker object
  • Calls JobTracker.offerService() to start serving requests

The JobTracker constructor creates a taskScheduler member variable that schedules jobs. The default is JobQueueTaskScheduler, which schedules jobs in FIFO order.

offerService calls taskScheduler.start(), which registers two listeners with the JobTracker (the JobTracker is the taskScheduler's taskTrackerManager):

  • JobQueueJobInProgressListener jobQueueJobInProgressListener, which tracks the run state of jobs
  • EagerTaskInitializationListener eagerTaskInitializationListener, which initializes jobs

EagerTaskInitializationListener runs a thread, JobInitThread, that repeatedly takes JobInProgress objects from jobInitQueue and calls their initTasks method to initialize the job.

In the previous section, the client called JobTracker.submitJob. That method first creates a JobInProgress object and then calls addJob, which contains the following logic:

synchronized (jobs) {
  synchronized (taskScheduler) {
    jobs.put(job.getProfile().getJobID(), job);
    // Call jobAdded on every listener registered with the JobTracker
    for (JobInProgressListener listener : jobInProgressListeners) {
      listener.jobAdded(job);
    }
  }
}
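The listener fan-out in addJob, combined with the EagerTaskInitializationListener thread described earlier, can be sketched in plain Java. All class names below are hypothetical stand-ins, and "initialization" is reduced to recording the job ID.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch of the listener fan-out in addJob(): every registered
// listener is told about a new job. A FIFO listener records submission order;
// an "eager init" listener hands the job to a background thread, standing in
// for JobInitThread calling initTasks().
class ListenerSketch {
    interface JobListener { void jobAdded(String jobId); }

    static final class FifoQueueListener implements JobListener {
        final List<String> queue = Collections.synchronizedList(new ArrayList<>());
        public void jobAdded(String jobId) { queue.add(jobId); }
    }

    static final class EagerInitListener implements JobListener {
        final BlockingQueue<String> jobInitQueue = new LinkedBlockingQueue<>();
        final Set<String> initialized = ConcurrentHashMap.newKeySet();
        final CountDownLatch done;

        EagerInitListener(int expectedJobs) {
            done = new CountDownLatch(expectedJobs);
            Thread initThread = new Thread(() -> {
                try {
                    while (true) {
                        String jobId = jobInitQueue.take();  // blocks until a job is queued
                        initialized.add(jobId);              // placeholder for initTasks()
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            initThread.setDaemon(true);
            initThread.start();
        }

        public void jobAdded(String jobId) { jobInitQueue.add(jobId); }
    }

    final Map<String, String> jobs = new HashMap<>();
    final List<JobListener> listeners = new ArrayList<>();

    synchronized void addJob(String jobId) {
        jobs.put(jobId, jobId);
        for (JobListener listener : listeners) {
            listener.jobAdded(jobId);                        // notify every listener
        }
    }
}
```

Because the init listener only enqueues, addJob returns quickly even when initialization (which reads job.split from HDFS) is slow.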

 

EagerTaskInitializationListener's jobAdded simply adds the JobInProgress object to jobInitQueue, which triggers the job's initialization. The initialization itself is done by JobInProgress.initTasks:

 

public synchronized void initTasks() throws IOException {
  // ...
  // Read job.split from HDFS to rebuild the input splits
  String jobFile = profile.getJobFile();
  Path sysDir = new Path(this.jobtracker.getSystemDir());
  FileSystem fs = sysDir.getFileSystem(conf);
  DataInputStream splitFile =
    fs.open(new Path(conf.get("mapred.job.split.file")));
  JobClient.RawSplit[] splits;
  try {
    splits = JobClient.readSplitFile(splitFile);
  } finally {
    splitFile.close();
  }
  // The number of map tasks equals the number of input splits
  numMapTasks = splits.length;
  // Create one TaskInProgress per map task; each handles one input split
  maps = new TaskInProgress[numMapTasks];
  for(int i=0; i < numMapTasks; ++i) {
    inputLength += splits[i].getDataLength();
    maps[i] = new TaskInProgress(jobId, jobFile,
                                 splits[i],
                                 jobtracker, conf, this, i);
  }
  // Put the map tasks into nonRunningMapCache, a Map<Node, List<TaskInProgress>>, so that a map
  // task can be assigned to a node that holds its input split. nonRunningMapCache is used when
  // the JobTracker assigns map tasks to TaskTrackers.
  if (numMapTasks > 0) {
    nonRunningMapCache = createCache(splits, maxLevel);
  }

  // Create the reduce tasks
  this.reduces = new TaskInProgress[numReduceTasks];
  for (int i = 0; i < numReduceTasks; i++) {
    reduces[i] = new TaskInProgress(jobId, jobFile,
                                    numMapTasks, i,
                                    jobtracker, conf, this);
    // Put each reduce task into nonRunningReduces, which is used when the JobTracker
    // assigns reduce tasks to TaskTrackers
    nonRunningReduces.add(reduces[i]);
  }

  // Create two job-cleanup tasks: one that runs in a map slot and one that runs in a reduce slot
  cleanup = new TaskInProgress[2];
  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks);
  cleanup[0].setJobCleanupTask();
  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks, jobtracker, conf, this);
  cleanup[1].setJobCleanupTask();
  // Create two job-setup tasks, likewise one for a map slot and one for a reduce slot
  setup = new TaskInProgress[2];
  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
          jobtracker, conf, this, numMapTasks + 1 );
  setup[0].setJobSetupTask();
  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                     numReduceTasks + 1, jobtracker, conf, this);
  setup[1].setJobSetupTask();
  tasksInited.set(true); // initialization complete
  // ...
}
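The locality cache is the part of initTasks that later drives scheduling. Here is a host-level sketch, assuming each split is described only by the hosts that store it. Hadoop's createCache keys by network-topology Node and also builds rack-level entries up to maxLevel; this sketch omits both.

```java
import java.util.*;

// Simplified sketch of the locality cache built in initTasks(): map task i
// handles split i, and every host that stores split i gets task i added to
// its pending list. Keys are plain host names (an assumption).
class MapCacheSketch {
    static Map<String, List<Integer>> createCache(List<String[]> splitHosts) {
        Map<String, List<Integer>> cache = new HashMap<>();
        for (int task = 0; task < splitHosts.size(); task++) {
            for (String host : splitHosts.get(task)) {
                cache.computeIfAbsent(host, h -> new ArrayList<>()).add(task);
            }
        }
        return cache;
    }
}
```

For example, splits stored on {dn1, dn2}, {dn2} and {dn3} produce dn2 -> [0, 1], so a TaskTracker on dn2 can be offered either of the first two map tasks.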

 

3. TaskTracker

The TaskTracker also runs in its own JVM. Its main function mainly calls new TaskTracker(conf).run(), and run() in turn mainly calls offerService:

 

State offerService() throws Exception {
  long lastHeartbeat = 0;
  // The TaskTracker keeps looping until it is stopped or shut down
  while (running && !shuttingDown) {
      // ...
      long now = System.currentTimeMillis();
      // Send a heartbeat to the JobTracker once per heartbeat interval:
      // wait out the rest of the interval, but wake up early if a task finishes
      long waitTime = heartbeatInterval - (now - lastHeartbeat);
      if (waitTime > 0) {
        synchronized(finishedCount) {
          if (finishedCount[0] == 0) {
            finishedCount.wait(waitTime);
          }
          finishedCount[0] = 0;
        }
      }
      // ...
      // Send the heartbeat to the JobTracker and get the response
      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
      // ...
      // Get the actions this TaskTracker has to perform from the response
      TaskTrackerAction[] actions = heartbeatResponse.getActions();
      // ...
      if (actions != null){
        for(TaskTrackerAction action: actions) {
          if (action instanceof LaunchTaskAction) {
            // To run a new task, add the action to the task queue
            addToTaskQueue((LaunchTaskAction)action);
          } else if (action instanceof CommitTaskAction) {
            CommitTaskAction commitAction = (CommitTaskAction)action;
            if (!commitResponses.contains(commitAction.getTaskID())) {
              commitResponses.add(commitAction.getTaskID());
            }
          } else {
            tasksToCleanup.put(action);
          }
        }
      }
  }
  return State.NORMAL;
}
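The dispatch at the end of offerService is a type switch over the returned actions. The sketch below uses simplified stand-in action classes, not Hadoop's own types. It shows the three destinations: the launch queue, the de-duplicated commit list, and the cleanup queue.

```java
import java.util.*;

// Sketch of how offerService() sorts heartbeat actions into the TaskTracker's
// local queues. The action classes are hypothetical stand-ins.
class ActionDispatchSketch {
    static abstract class Action { final String taskId; Action(String id) { taskId = id; } }
    static final class LaunchTask extends Action { LaunchTask(String id) { super(id); } }
    static final class CommitTask extends Action { CommitTask(String id) { super(id); } }
    static final class KillTask extends Action { KillTask(String id) { super(id); } }

    final Deque<Action> taskQueue = new ArrayDeque<>();
    final List<String> commitResponses = new ArrayList<>();
    final Deque<Action> tasksToCleanup = new ArrayDeque<>();

    void handle(Action[] actions) {
        if (actions == null) return;
        for (Action action : actions) {
            if (action instanceof LaunchTask) {
                taskQueue.add(action);                       // start a new task
            } else if (action instanceof CommitTask) {
                if (!commitResponses.contains(action.taskId)) {
                    commitResponses.add(action.taskId);      // record each commit once
                }
            } else {
                tasksToCleanup.add(action);                  // kill/cleanup actions
            }
        }
    }
}
```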

The main logic of transmitHeartBeat is as follows:

 

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
  // Include counter statistics in the heartbeat only once every COUNTER_UPDATE_INTERVAL
  boolean sendCounters;
  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
    sendCounters = true;
    previousUpdate = now;
  } else {
    sendCounters = false;
  }
  // ...
  // Report this TaskTracker's current status to the JobTracker
  if (status == null) {
    synchronized (this) {
      status = new TaskTrackerStatus(taskTrackerName, localHostname,
                                     httpPort,
                                     cloneAndResetRunningTaskStatuses(
                                       sendCounters),
                                     failures,
                                     maxCurrentMapTasks,
                                     maxCurrentReduceTasks);
    }
  }
  // ...
  // The TaskTracker asks the JobTracker for a new task when it accepts new tasks and either:
  //   it runs fewer map tasks than its maximum number of map tasks, or
  //   it runs fewer reduce tasks than its maximum number of reduce tasks
  boolean askForNewTask;
  long localMinSpaceStart;
  synchronized (this) {
    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
                     status.countReduceTasks() < maxCurrentReduceTasks) &&
                    acceptNewTasks;
    localMinSpaceStart = minSpaceStart;
  }
  // ...
  // Send the heartbeat to the JobTracker; this is an RPC call
  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
                                                            justStarted, askForNewTask,
                                                            heartbeatResponseId);
  // ...
  return heartbeatResponse;
}
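The two decisions transmitHeartBeat makes before the RPC can be written as pure functions. The method names are hypothetical; the conditions mirror the excerpt above.

```java
// Sketch of the two checks transmitHeartBeat() performs before calling the
// JobTracker. Method names are hypothetical; parameters mirror the excerpt.
class HeartbeatDecisionSketch {
    /** Counters are piggybacked only once every counterUpdateInterval ms. */
    static boolean shouldSendCounters(long now, long previousUpdate, long counterUpdateInterval) {
        return now > previousUpdate + counterUpdateInterval;
    }

    /** Ask for work if a map slot or a reduce slot is free and new tasks are accepted. */
    static boolean askForNewTask(int runningMaps, int maxMaps,
                                 int runningReduces, int maxReduces,
                                 boolean acceptNewTasks) {
        return (runningMaps < maxMaps || runningReduces < maxReduces) && acceptNewTasks;
    }
}
```

For example, a tracker whose map slots are all busy but which has a free reduce slot still asks for work.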

 

4. JobTracker

When a TaskTracker sends a heartbeat over RPC, the JobTracker's heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) method is invoked:

 

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
                                                boolean initialContact, boolean acceptNewTasks, short responseId)
  throws IOException {
  // ...
  String trackerName = status.getTrackerName();
  // ...
  short newResponseId = (short)(responseId + 1);
  // ...
  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
  // If the TaskTracker is asking the JobTracker for a task to run
  if (acceptNewTasks) {
    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
    if (taskTrackerStatus == null) {
      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
    } else {
      // Setup and cleanup tasks have the highest priority
      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
      if (tasks == null ) {
        // Otherwise, let the task scheduler assign tasks
        tasks = taskScheduler.assignTasks(taskTrackerStatus);
      }
      if (tasks != null) {
        for (Task task : tasks) {
          // Add each task to the actions list returned to the TaskTracker
          expireLaunchingTasks.addNewTask(task.getTaskID());
          actions.add(new LaunchTaskAction(task));
        }
      }
    }
  }
  // ...
  int nextInterval = getNextHeartbeatInterval();
  response.setHeartbeatInterval(nextInterval);
  response.setActions(
                      actions.toArray(new TaskTrackerAction[actions.size()]));
  // ...
  return response;
}

The default task scheduler is JobQueueTaskScheduler. Its assignTasks method is:

 

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
    throws IOException {
  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
  int numTaskTrackers = clusterStatus.getTaskTrackers();
  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();
  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();
  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();
  int numMaps = taskTracker.countMapTasks();
  int numReduces = taskTracker.countReduceTasks();
  // Compute the remaining map and reduce workload
  int remainingReduceLoad = 0;
  int remainingMapLoad = 0;
  synchronized (jobQueue) {
    for (JobInProgress job : jobQueue) {
      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
        int totalMapTasks = job.desiredMaps();
        int totalReduceTasks = job.desiredReduces();
        remainingMapLoad += (totalMapTasks - job.finishedMaps());
        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
      }
    }
  }
  // Compute each TaskTracker's share of the work: the remaining workload divided by the
  // number of TaskTrackers, rounded up and capped at this tracker's slot count
  int maxMapLoad = 0;
  int maxReduceLoad = 0;
  if (numTaskTrackers > 0) {
    maxMapLoad = Math.min(maxCurrentMapTasks,
                          (int) Math.ceil((double) remainingMapLoad /
                                          numTaskTrackers));
    maxReduceLoad = Math.min(maxCurrentReduceTasks,
                             (int) Math.ceil((double) remainingReduceLoad
                                             / numTaskTrackers));
  }
  // ...

  // Maps take priority over reduces: if the TaskTracker runs fewer map tasks than its share,
  // assign it a map task
  if (numMaps < maxMapLoad) {
    int totalNeededMaps = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING) {
          continue;
        }
        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        // ...
      }
    }
  }
  // After maps, assign reduce tasks
  if (numReduces < maxReduceLoad) {
    int totalNeededReduces = 0;
    synchronized (jobQueue) {
      for (JobInProgress job : jobQueue) {
        if (job.getStatus().getRunState() != JobStatus.RUNNING ||
            job.numReduceTasks == 0) {
          continue;
        }
        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
            taskTrackerManager.getNumberOfUniqueHosts());
        if (t != null) {
          return Collections.singletonList(t);
        }
        // ...
      }
    }
  }
  return null;
}
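The per-tracker cap computed in assignTasks is maxLoad = min(slots on this tracker, ceil(remaining tasks / number of TaskTrackers)). The sketch below covers that formula and the map-before-reduce order. It ignores the case where no running job has a map task to hand out; in that case the real scheduler falls through to the reduce branch.

```java
// Sketch of the load cap used by JobQueueTaskScheduler.assignTasks():
//   maxLoad = min(slotsOnTracker, ceil(remainingTasks / numTaskTrackers))
// Method names are hypothetical.
class LoadCapSketch {
    static int maxLoad(int slotsOnTracker, int remainingTasks, int numTaskTrackers) {
        if (numTaskTrackers <= 0) return 0;
        int fairShare = (int) Math.ceil((double) remainingTasks / numTaskTrackers);
        return Math.min(slotsOnTracker, fairShare);
    }

    /** Maps first: offer a map below the map cap, else a reduce below the reduce cap. */
    static String nextTaskType(int runningMaps, int maxMapLoad,
                               int runningReduces, int maxReduceLoad) {
        if (runningMaps < maxMapLoad) return "MAP";
        if (runningReduces < maxReduceLoad) return "REDUCE";
        return "NONE";
    }
}
```

For example, 10 remaining maps over 4 TaskTrackers gives ceil(2.5) = 3, so a tracker with 2 map slots is capped at 2 and one with 4 slots at 3.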

From the code above, JobInProgress.obtainNewMapTask assigns map tasks. It mainly calls findNewMapTask, which looks up a TaskInProgress in nonRunningMapCache based on the node the TaskTracker runs on. JobInProgress.obtainNewReduceTask assigns reduce tasks. It mainly calls findNewReduceTask, which looks up a TaskInProgress in nonRunningReduces.
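Below is a minimal sketch of the locality lookup. It prefers a pending task whose split is on the tracker's host and otherwise falls back to any pending task. This is a simplification: Hadoop's findNewMapTask also searches rack levels and handles speculative execution, and neither is modeled here. The class and field names are hypothetical.

```java
import java.util.*;

// Hypothetical sketch of locality-aware map selection: prefer a pending task
// whose split lives on the tracker's host, otherwise take any pending task.
class FindMapTaskSketch {
    final Map<String, List<Integer>> nonRunningMapCache;  // host -> candidate map tasks
    final Set<Integer> pending;                            // map tasks not yet started

    FindMapTaskSketch(Map<String, List<Integer>> cache, Set<Integer> pending) {
        this.nonRunningMapCache = cache;
        this.pending = pending;
    }

    /** Returns a task index, or -1 if nothing is pending. */
    int obtainNewMapTask(String trackerHost) {
        List<Integer> local = nonRunningMapCache.getOrDefault(trackerHost, List.of());
        for (Integer task : local) {
            if (pending.remove(task)) return task;  // data-local task found
        }
        Iterator<Integer> it = pending.iterator();
        if (it.hasNext()) {
            int task = it.next();
            it.remove();
            return task;                             // non-local fallback
        }
        return -1;
    }
}
```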

 

5. TaskTracker

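After the TaskTracker sends a heartbeat to the JobTracker, the returned response contains the assigned tasks as LaunchTaskAction objects. Each one is queued by addToTaskQueue: a map task goes to mapLauncher and a reduce task goes to reduceLauncher (both of type TaskLauncher):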

private void addToTaskQueue(LaunchTaskAction action) {
  if (action.getTask().isMapTask()) {
    mapLauncher.addToTaskQueue(action);
  } else {
    reduceLauncher.addToTaskQueue(action);
  }
}

TaskLauncher is a thread. Its run method takes a TaskInProgress from the queue filled above and calls startNewTask(TaskInProgress tip) to start the task, which in turn mainly calls localizeJob(TaskInProgress tip):

 

private void localizeJob(TaskInProgress tip) throws IOException {
  // First, copy the job's files from HDFS to the TaskTracker's local file system
  // (in the code below: job.xml and job.jar)
  Path localJarFile = null;
  Task t = tip.getTask();
  JobID jobId = t.getJobID();
  Path jobFile = new Path(t.getJobFile());
  // ...
  Path localJobFile = lDirAlloc.getLocalPathForWrite(
                                  getLocalJobDir(jobId.toString())
                                  + Path.SEPARATOR + "job.xml",
                                  jobFileSize, fConf);
  RunningJob rjob = addTaskToJob(jobId, tip);
  synchronized (rjob) {
    if (!rjob.localized) {
      FileSystem localFs = FileSystem.getLocal(fConf);
      Path jobDir = localJobFile.getParent();
      // ...
      // Copy job.xml to the local file system
      systemFS.copyToLocalFile(jobFile, localJobFile);
      JobConf localJobConf = new JobConf(localJobFile);
      Path workDir = lDirAlloc.getLocalPathForWrite(
                       (getLocalJobDir(jobId.toString())
                       + Path.SEPARATOR + "work"), fConf);
      if (!localFs.mkdirs(workDir)) {
        throw new IOException("Mkdirs failed to create "
                    + workDir.toString());
      }
      System.setProperty("job.local.dir", workDir.toString());
      localJobConf.set("job.local.dir", workDir.toString());
      // copy Jar file to the local FS and unjar it.
      String jarFile = localJobConf.getJar();
      long jarFileSize = -1;
      if (jarFile != null) {
        Path jarFilePath = new Path(jarFile);
        localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
                                   getLocalJobDir(jobId.toString())
                                   + Path.SEPARATOR + "jars",
                                   5 * jarFileSize, fConf), "job.jar");
        if (!localFs.mkdirs(localJarFile.getParent())) {
          throw new IOException("Mkdirs failed to create jars directory ");
        }
        // Copy job.jar to the local file system
        systemFS.copyToLocalFile(jarFilePath, localJarFile);
        localJobConf.setJar(localJarFile.toString());
        // Write the job's configuration, now pointing at the local jar, to the local job.xml
        OutputStream out = localFs.create(localJobFile);
        try {
          localJobConf.writeXml(out);
        } finally {
          out.close();
        }
        // Unpack job.jar
        RunJar.unJar(new File(localJarFile.toString()),
                     new File(localJarFile.getParent().toString()));
      }
      rjob.localized = true;
      rjob.jobConf = localJobConf;
    }
  }
  // Actually launch this task
  launchTaskForJob(tip, new JobConf(rjob.jobConf));
}
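The rjob.localized flag, checked under synchronized (rjob), ensures that a job's files are copied only once per TaskTracker, even when several of its tasks arrive together. Here is a sketch of that pattern. copyJobFiles is a hypothetical stand-in for the job.xml/job.jar copy and unjar steps, and the other names are simplified.

```java
import java.util.*;

// Sketch of the "localize each job once" pattern in localizeJob(): tasks of
// the same job share one RunningJob record, and the first task to lock it does
// the copying. copyJobFiles() is a hypothetical placeholder.
class LocalizeOnceSketch {
    static final class RunningJob {
        boolean localized;
        final List<String> tasks = new ArrayList<>();
    }

    final Map<String, RunningJob> runningJobs = new HashMap<>();
    int copies;  // how often the expensive copy actually ran

    RunningJob addTaskToJob(String jobId, String taskId) {
        synchronized (runningJobs) {
            RunningJob rjob = runningJobs.computeIfAbsent(jobId, id -> new RunningJob());
            rjob.tasks.add(taskId);
            return rjob;
        }
    }

    void localizeJob(String jobId, String taskId) {
        RunningJob rjob = addTaskToJob(jobId, taskId);
        synchronized (rjob) {               // per-job lock, as in the excerpt
            if (!rjob.localized) {
                copyJobFiles(jobId);
                rjob.localized = true;
            }
        }
    }

    private synchronized void copyJobFiles(String jobId) {
        copies++;                           // stands in for copying job.xml/job.jar
    }
}
```

Locking per job rather than per TaskTracker lets different jobs localize in parallel.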

Once all the resources the task needs have been copied to the local machine, launchTaskForJob is called, which in turn calls TaskInProgress.launchTask:

public synchronized void launchTask() throws IOException {
    // ...
    // Create the task's local working directory
    localizeTask(task);
    if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
      this.taskStatus.setRunState(TaskStatus.State.RUNNING);
    }
    // Create and start a TaskRunner: a MapTaskRunner for a MapTask,
    // a ReduceTaskRunner for a ReduceTask
    this.runner = task.createRunner(TaskTracker.this, this);
    this.runner.start();
    this.taskStatus.setStartTime(System.currentTimeMillis());
}

TaskRunner is a thread. Its run method is:

 

public final void run() {
    // ...
    TaskAttemptID taskid = t.getTaskID();
    LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
    File jobCacheDir = null;
    if (conf.getJar() != null) {
      jobCacheDir = new File(
                        new Path(conf.getJar()).getParent().toString());
    }
    File workDir = new File(lDirAlloc.getLocalPathToRead(
                              TaskTracker.getLocalTaskDir(
                                t.getJobID().toString(),
                                t.getTaskID().toString(),
                                t.isTaskCleanupTask())
                              + Path.SEPARATOR + MRConstants.WORKDIR,
                              conf).toString());
    FileSystem fileSystem;
    Path localPath;
    // ...
    // Build the classpath
    String baseDir;
    String sep = System.getProperty("path.separator");
    StringBuffer classPath = new StringBuffer();
    // start with same classpath as parent process
    classPath.append(System.getProperty("java.class.path"));
    classPath.append(sep);
    if (!workDir.mkdirs()) {
      if (!workDir.isDirectory()) {
        LOG.fatal("Mkdirs failed to create " + workDir.toString());
      }
    }
    String jar = conf.getJar();
    if (jar != null) {
      // If a job jar exists, add its lib/ jars, its classes/ directory and
      // the directory it was unpacked into to the classpath
      File[] libs = new File(jobCacheDir, "lib").listFiles();
      if (libs != null) {
        for (int i = 0; i < libs.length; i++) {
          classPath.append(sep);            // add libs from jar to classpath
          classPath.append(libs[i]);
        }
      }
      classPath.append(sep);
      classPath.append(new File(jobCacheDir, "classes"));
      classPath.append(sep);
      classPath.append(jobCacheDir);
    }
    // ...
    classPath.append(sep);
    classPath.append(workDir);
    // Build the java command line and its arguments
    Vector<String> vargs = new Vector<String>(8);
    File jvm =
      new File(new File(System.getProperty("java.home"), "bin"), "java");
    vargs.add(jvm.toString());
    String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
    javaOpts = javaOpts.replace("@taskid@", taskid.toString());
    String [] javaOptsSplit = javaOpts.split(" ");
    String libraryPath = System.getProperty("java.library.path");
    if (libraryPath == null) {
      libraryPath = workDir.getAbsolutePath();
    } else {
      libraryPath += sep + workDir;
    }
    boolean hasUserLDPath = false;
    for(int i=0; i<javaOptsSplit.length ;i++) {
      if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
        javaOptsSplit[i] += sep + libraryPath;
        hasUserLDPath = true;
        break;
      }
    }
    if(!hasUserLDPath) {
      vargs.add("-Djava.library.path=" + libraryPath);
    }
    for (int i = 0; i < javaOptsSplit.length; i++) {
      vargs.add(javaOptsSplit[i]);
    }
    // Add the child process's temporary directory
    String tmp = conf.get("mapred.child.tmp", "./tmp");
    Path tmpDir = new Path(tmp);
    if (!tmpDir.isAbsolute()) {
      tmpDir = new Path(workDir.toString(), tmp);
    }
    FileSystem localFs = FileSystem.getLocal(conf);
    if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
      throw new IOException("Mkdirs failed to create " + tmpDir.toString());
    }
    vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());
    // Add classpath.
    vargs.add("-classpath");
    vargs.add(classPath.toString());
    // Log directory and task log settings
    long logSize = TaskLog.getTaskLogLength(conf);
    vargs.add("-Dhadoop.log.dir=" +
        new File(System.getProperty("hadoop.log.dir")
        ).getAbsolutePath());
    vargs.add("-Dhadoop.root.logger=INFO,TLA");
    vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
    vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);
    // The main class of the child process that runs map and reduce tasks is Child
    vargs.add(Child.class.getName());  // main of Child
    // ...
    // Launch the child process
    jvmManager.launchJvm(this,
        jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
            workDir, env, pidFile, conf));
}
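The handling of mapred.child.java.opts above can be isolated into a small function. It substitutes @taskid@, then either extends a user-supplied -Djava.library.path or adds one. This sketch covers only that logic. The method name is hypothetical, and the plain string "java" replaces the full path to the JVM binary.

```java
import java.util.*;

// Sketch of how TaskRunner.run() turns mapred.child.java.opts into JVM
// arguments. The class and method names are hypothetical.
class ChildJvmArgsSketch {
    static List<String> buildArgs(String javaOpts, String taskId,
                                  String libraryPath, String sep) {
        String[] opts = javaOpts.replace("@taskid@", taskId).split(" ");
        boolean hasUserLDPath = false;
        for (int i = 0; i < opts.length; i++) {
            if (opts[i].startsWith("-Djava.library.path=")) {
                opts[i] += sep + libraryPath;      // extend the user's library path
                hasUserLDPath = true;
                break;
            }
        }
        List<String> vargs = new ArrayList<>();
        vargs.add("java");                         // stands in for $JAVA_HOME/bin/java
        if (!hasUserLDPath) {
            vargs.add("-Djava.library.path=" + libraryPath);
        }
        vargs.addAll(Arrays.asList(opts));
        return vargs;
    }
}
```

For example, "-Xmx200m -Xloggc:/tmp/@taskid@.gc" becomes a per-attempt GC log path, which is why the @taskid@ placeholder exists.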

 

6. Child

The actual map and reduce tasks run in the Child process. The main logic of Child's main function is:

 

while (true) {
  // Get a JvmTask object from the TaskTracker over the network (the umbilical protocol)
  JvmTask myTask = umbilical.getTask(jvmId);
  // ...
  idleLoopCount = 0;
  task = myTask.getTask();
  taskid = task.getTaskID();
  isCleanup = task.isTaskCleanupTask();
  JobConf job = new JobConf(task.getJobFile());
  TaskRunner.setupWorkDir(job);
  numTasksToExecute = job.getNumTasksToExecutePerJvm();
  task.setConf(job);
  defaultConf.addResource(new Path(task.getJobFile()));
  // ...
  // Run the task
  task.run(job, umbilical);             // run the task
  if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {
    break;
  }
}
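This loop implements JVM reuse: one Child process may run several tasks of the same job. The sketch below assumes a Supplier in place of umbilical.getTask(jvmId) that returns null when there is no more work; the real Child keeps polling instead. A limit of zero or less means "no limit", matching the numTasksToExecute > 0 test above.

```java
import java.util.function.Supplier;

// Sketch of the Child main loop with JVM reuse. The Supplier is a hypothetical
// stand-in for umbilical.getTask(jvmId); null means "no more work".
class ChildLoopSketch {
    static int runTasks(Supplier<Runnable> getTask, int numTasksToExecute) {
        int numTasksExecuted = 0;
        while (true) {
            Runnable task = getTask.get();
            if (task == null) {
                break;                       // no more work for this JVM
            }
            task.run();                      // task.run(job, umbilical) in Child
            numTasksExecuted++;
            if (numTasksToExecute > 0 && numTasksExecuted == numTasksToExecute) {
                break;                       // per-JVM task limit reached
            }
        }
        return numTasksExecuted;
    }
}
```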

6.1 MapTask

If the task is a MapTask, its run method is:

 

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  // Used to communicate with the TaskTracker and report progress
  final Reporter reporter = getReporter(umbilical);
  startCommunicationThread(umbilical);
  initialize(job, reporter);
  // ...
  // Set up the map task's output collector: with reduces, output goes to the in-memory
  // sort buffer; with no reduces, it is written directly as the job output
  int numReduceTasks = conf.getNumReduceTasks();
  MapOutputCollector collector = null;
  if (numReduceTasks > 0) {
    collector = new MapOutputBuffer(umbilical, job, reporter);
  } else {
    collector = new DirectMapOutputCollector(umbilical, job, reporter);
  }
  // Deserialize the input split and, based on it, create a RecordReader to read the data
  instantiatedSplit = (InputSplit)
      ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  DataInputBuffer splitBuffer = new DataInputBuffer();
  splitBuffer.reset(split.getBytes(), 0, split.getLength());
  instantiatedSplit.readFields(splitBuffer);
  if (instantiatedSplit instanceof FileSplit) {
    FileSplit fileSplit = (FileSplit) instantiatedSplit;
    job.set("map.input.file", fileSplit.getPath().toString());
    job.setLong("map.input.start", fileSplit.getStart());
    job.setLong("map.input.length", fileSplit.getLength());
  }
  RecordReader rawIn =                  // open input
    job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter);
  RecordReader in = isSkipping() ?
      new SkippingRecordReader(rawIn, getCounters(), umbilical) :
      new TrackedRecordReader(rawIn, getCounters());
  job.setBoolean("mapred.skip.on", isSkipping());
  // For a map task, create a MapRunnable; the default is MapRunner
  MapRunnable runner =
    ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  try {
    // MapRunner.run reads records from the RecordReader one by one and calls
    // the Mapper's map function on each
    runner.run(in, collector, reporter);
    collector.flush();
  } finally {
    in.close();                               // close input
    collector.close();
  }
  done(umbilical);
}

MapRunner.run reads the records from the RecordReader one by one and calls the Mapper's map function on each:

public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
                Reporter reporter)
  throws IOException {
  try {
    K1 key = input.createKey();
    V1 value = input.createValue();
    while (input.next(key, value)) {
      mapper.map(key, value, output, reporter);
      if(incrProcCount) {
        reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
            SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
      }
    }
  } finally {
    mapper.close();
  }
}
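Here is a self-contained version of the same loop. It uses simplified Reader and Mapper interfaces (not Hadoop's) and a word-count mapper as an illustrative example. As in MapRunner, the key and value holders are created once and refilled for every record.

```java
import java.util.*;
import java.util.function.*;

// Sketch of MapRunner.run() with hypothetical stand-in interfaces.
class MapRunnerSketch {
    interface Reader { boolean next(long[] key, StringBuilder value); }
    interface Mapper { void map(long key, String value, BiConsumer<String, Integer> output); }

    static void run(Reader input, Mapper mapper, BiConsumer<String, Integer> output) {
        long[] key = new long[1];                   // like input.createKey()
        StringBuilder value = new StringBuilder();  // like input.createValue()
        while (input.next(key, value)) {            // refills key/value in place
            mapper.map(key[0], value.toString(), output);
        }
    }

    /** Reader over in-memory lines; the key is the line's character offset. */
    static Reader linesReader(List<String> lines) {
        Iterator<String> it = lines.iterator();
        long[] offset = {0};
        return (key, value) -> {
            if (!it.hasNext()) return false;
            String line = it.next();
            key[0] = offset[0];
            value.setLength(0);
            value.append(line);
            offset[0] += line.length() + 1;         // +1 for the newline
            return true;
        };
    }

    /** Word count: emit (word, 1) for every whitespace-separated word. */
    static final Mapper WORD_COUNT = (key, line, output) -> {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) output.accept(word, 1);
        }
    };
}
```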

All map output is collected into MapOutputBuffer, whose collect method is:

 

public synchronized void collect(K key, V value)
    throws IOException {
  reporter.progress();
  // ...
  // This line shows that the index buffer is a ring (circular) structure
  final int kvnext = (kvindex + 1) % kvoffsets.length;
  spillLock.lock();
  try {
    boolean kvfull;
    do {
      // In the ring, the buffer is full when the next free slot would reach the start position
      kvfull = kvnext == kvstart;
      // Check whether the ring has passed the soft threshold for writing the buffer to disk
      final boolean kvsoftlimit = ((kvnext > kvend)
          ? kvnext - kvend > softRecordLimit
          : kvend - kvnext <= kvoffsets.length - softRecordLimit);
      // When the threshold is reached, start writing the buffer to disk as a spill file.
      // startSpill mainly notifies the background SpillThread, whose run() calls
      // sortAndSpill() to sort, combine, and write the data to disk
      if (kvstart == kvend && kvsoftlimit) {
        startSpill();
      }
      // If the buffer is full, wait until the spill has finished
      if (kvfull) {
        while (kvstart != kvend) {
          reporter.progress();
          spillDone.await();
        }
      }
    } while (kvfull);
  } finally {
    spillLock.unlock();
  }
  try {
    // Now that the buffer is not full, serialize the key and value into it
    int keystart = bufindex;
    keySerializer.serialize(key);
    final int valstart = bufindex;
    valSerializer.serialize(value);
    int valend = bb.markRecord();
    // Call the configured partitioner to get the partition ID for this key/value
    final int partition = partitioner.getPartition(key, value, partitions);
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(valend >= keystart
        ? valend - keystart
        : (bufvoid - keystart) + valend);
    // Record the partition ID and the key/value offsets in the buffer in the index arrays
    int ind = kvindex * ACCTSIZE;
    kvoffsets[kvindex] = ind;
    kvindices[ind + PARTITION] = partition;
    kvindices[ind + KEYSTART] = keystart;
    kvindices[ind + VALSTART] = valstart;
    kvindex = kvnext;
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
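The index arithmetic in collect can be tried out on a toy ring. This sketch keeps only partition IDs and runs the "spill" synchronously in the caller. Hadoop instead hands the spill to SpillThread and blocks on spillDone when the ring is full. The soft-limit test is also simplified to a count of unspilled records. partition() uses the formula of Hadoop's default HashPartitioner; everything else is hypothetical.

```java
// Simplified sketch of the ring-index logic in MapOutputBuffer.collect().
// Slots from kvstart up to kvindex hold unspilled records; the ring is full
// when (kvindex + 1) % length == kvstart; reaching softLimit triggers a spill.
class RingBufferSketch {
    final int[] kvpartition;   // per-slot partition id (stands in for kvindices)
    final int softLimit;
    final int numReduces;
    int kvstart, kvindex;      // ring bounds of unspilled records
    int spills, spilledRecords;

    RingBufferSketch(int slots, int softLimit, int numReduces) {
        this.kvpartition = new int[slots];
        this.softLimit = softLimit;
        this.numReduces = numReduces;
    }

    /** Same formula as Hadoop's HashPartitioner: non-negative hash mod #reduces. */
    static int partition(Object key, int numReduces) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduces;
    }

    int unspilled() {
        return (kvindex - kvstart + kvpartition.length) % kvpartition.length;
    }

    void collect(Object key) {
        int kvnext = (kvindex + 1) % kvpartition.length;
        if (kvnext == kvstart) {
            spill();                    // full: Hadoop waits on spillDone instead
        }
        kvpartition[kvindex] = partition(key, numReduces);
        kvindex = kvnext;
        if (unspilled() >= softLimit) {
            spill();                    // soft limit: Hadoop calls startSpill()
        }
    }

    private void spill() {
        spilledRecords += unspilled();  // Hadoop: sortAndSpill() on [kvstart, kvend)
        kvstart = kvindex;              // those slots can be reused
        spills++;
    }
}
```

One slot always stays empty, so a ring of n slots holds at most n - 1 records. That is why fullness is tested as kvnext == kvstart rather than kvindex == kvstart.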

The in-memory buffer is laid out as follows

(see the analyses by other Hadoop experts at http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx and http://caibinbupt.iteye.com/ ):

(Figure: layout of the in-memory map output buffer)

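kvoffsets is used to sort the records before the buffer is spilled to disk: the sort rearranges the entries of kvoffsets instead of moving the serialized records.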

As shown above, the function that writes the in-memory buffer to a spill file on disk is sortAndSpill:

 

 

private void sortAndSpill() throws IOException {
  // ...
  FSDataOutputStream out = null;
  FSDataOutputStream indexOut = null;
  IFileOutputStream indexChecksumOut = null;
  // Create the spill file on disk
  Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
                                  numSpills, size);
  out = rfs.create(filename);
  // ...
  final int endPosition = (kvend > kvstart)
    ? kvend
    : kvoffsets.length + kvend;
  // Sort the buffered records by partition, and by key within each partition
  sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  int spindex = kvstart;
  InMemValBytes value = new InMemValBytes();
  // Write the partitions to the file one after another
  for (int i = 0; i < partitions; ++i) {
    IFile.Writer<K, V> writer = null;
    long segmentStart = out.getPos();
    writer = new Writer<K, V>(job, out, keyClass, valClass, codec);
    if (null == combinerClass) {
      // Without a combiner, write the records straight to the file
      // ...
      writer.append(key, value);
      ++spindex;
    } else {
      // ...
      // With a combiner, combine first (calling combiner.reduce(...)), then write to the file
      combineAndSpill(kvIter, combineInputCounter);
    }
  }
  // ...
}
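The key idea in sortAndSpill is that the serialized records are not moved. Only the index array is sorted, first by partition and then by key, and the spill walks it once, cutting one segment per partition. Here is a sketch with String keys (an assumption), using Arrays.sort in place of Hadoop's own sorter.

```java
import java.util.*;

// Sketch of the kvoffsets idea behind sortAndSpill(): sort an index array by
// (partition, key), then emit one segment per partition in a single pass.
// Empty partitions still get an (empty) segment, as in the spill index.
class SortAndSpillSketch {
    static List<List<String>> sortAndSpill(String[] keys, int[] partitionOf, int numPartitions) {
        Integer[] offsets = new Integer[keys.length];
        for (int i = 0; i < offsets.length; i++) {
            offsets[i] = i;                              // like kvoffsets: points at record i
        }
        Arrays.sort(offsets, Comparator
                .comparingInt((Integer i) -> partitionOf[i])
                .thenComparing(i -> keys[i]));
        List<List<String>> segments = new ArrayList<>();
        int spindex = 0;
        for (int p = 0; p < numPartitions; p++) {
            List<String> segment = new ArrayList<>();
            while (spindex < offsets.length && partitionOf[offsets[spindex]] == p) {
                segment.add(keys[offsets[spindex++]]);
            }
            segments.add(segment);
        }
        return segments;
    }
}
```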

When the map phase ends, MapOutputBuffer.flush is called. It also calls sortAndSpill to write whatever remains in the buffer to disk, and then calls mergeParts to merge the spill files on disk:

 

private void mergeParts() throws IOException {
    // ...
    // For each partition
    for (int parts = 0; parts < partitions; parts++){
      //create the segments to be merged
      List<Segment<K, V>> segmentList =
        new ArrayList<Segment<K, V>>(numSpills);
      TaskAttemptID mapId = getTaskID();
      // Collect this partition's segment from each spill file in turn
      for(int i = 0; i < numSpills; i++) {
        final IndexRecord indexRecord =
          getIndexInformation(mapId, i, parts);
        long segmentOffset = indexRecord.startOffset;
        long segmentLength = indexRecord.partLength;
        Segment<K, V> s =
          new Segment<K, V>(job, rfs, filename[i], segmentOffset,
                            segmentLength, codec, true);
        segmentList.add(i, s);
      }
      // Merge the segments that belong to the same partition
      RawKeyValueIterator kvIter =
        Merger.merge(job, rfs,
                     keyClass, valClass,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(getTaskID().toString()),
                     job.getOutputKeyComparator(), reporter);
      // Write the merged segment to the final output file
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec);
      if (null == combinerClass || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combineAndSpill(kvIter, combineInputCounter);
      }
      // ...
    }
}
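For each partition, mergeParts performs a k-way merge of the sorted segments from all spills. The sketch below uses a PriorityQueue of segment heads. Hadoop's Merger additionally limits the fan-in to io.sort.factor and may merge in several passes; this single-pass sketch does neither.

```java
import java.util.*;

// Sketch of a one-pass k-way merge of sorted segments, as mergeParts() does
// per partition. Queue entries are {segmentIndex, positionInSegment}.
class SegmentMergeSketch {
    static List<String> merge(List<List<String>> segments) {
        PriorityQueue<int[]> heads = new PriorityQueue<>(
            Comparator.comparing((int[] h) -> segments.get(h[0]).get(h[1])));
        for (int s = 0; s < segments.size(); s++) {
            if (!segments.get(s).isEmpty()) heads.add(new int[]{s, 0});
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] h = heads.poll();                      // smallest current head
            List<String> seg = segments.get(h[0]);
            merged.add(seg.get(h[1]));
            if (h[1] + 1 < seg.size()) {
                heads.add(new int[]{h[0], h[1] + 1});   // advance that segment
            }
        }
        return merged;
    }
}
```

Each output record costs O(log k) for k segments, which is why the fan-in (io.sort.factor) is the knob that trades memory and open files against the number of merge passes.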

6.2 ReduceTask

ReduceTask's run method is:

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException {
  job.setBoolean("mapred.skip.on", isSkipping());
  // A reduce task has three phases: copy, sort, and reduce
  if (isMapOrReduce()) {
    copyPhase = getProgress().addPhase("copy");
    sortPhase  = getProgress().addPhase("sort");
    reducePhase = getProgress().addPhase("reduce");
  }
  startCommunicationThread(umbilical);
  final Reporter reporter = getReporter(umbilical);
  initialize(job, reporter);
  // Copy phase: ReduceCopier.fetchOutputs fetches the map outputs, using several
  // MapOutputCopier threads whose copyOutput method does the actual copying
  boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  if (!isLocal) {
    reduceCopier = new ReduceCopier(umbilical, job);
    if (!reduceCopier.fetchOutputs()) {
        // ...
    }
  }
  copyPhase.complete();
  // Sort phase: merge the fetched map outputs until fewer than io.sort.factor files remain,
  // then return an iterator over the key-value pairs
  setPhase(TaskStatus.Phase.SORT);
  statusUpdate(umbilical);
  final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  RawKeyValueIterator rIter = isLocal
    ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
        job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
        !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
        new Path(getTaskID().toString()), job.getOutputKeyComparator(),
        reporter)
    : reduceCopier.createKVIterator(job, rfs, reporter);
  mapOutputFilesOnDisk.clear();
  sortPhase.complete();
  // Reduce phase
  setPhase(TaskStatus.Phase.REDUCE);
  // ...
  Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
  Class keyClass = job.getMapOutputKeyClass();
  Class valClass = job.getMapOutputValueClass();
  ReduceValuesIterator values = isSkipping() ?
     new SkippingReduceValuesIterator(rIter,
          job.getOutputValueGroupingComparator(), keyClass, valClass,
          job, reporter, umbilical) :
      new ReduceValuesIterator(rIter,
      job.getOutputValueGroupingComparator(), keyClass, valClass,
      job, reporter);
  // Read each key with its list of values and call the Reducer's reduce function
  while (values.more()) {
    reduceInputKeyCounter.increment(1);
    reducer.reduce(values.getKey(), values, collector, reporter);
    values.nextKey();
    values.informReduceProgress();
  }
  reducer.close();
  out.close(reporter);
  done(umbilical);
}
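The reduce loop relies on its input being sorted: consecutive records with equal keys form one group, and reduce is called once per group. Here is a sketch with String keys and a summing reducer, both assumptions. Plain String equality stands in for the grouping comparator.

```java
import java.util.*;

// Sketch of the reduce phase over a key-sorted stream: group consecutive
// records with equal keys and call the (hypothetical) sum reducer once per group.
class ReduceGroupingSketch {
    static Map<String, Integer> reduceAll(List<Map.Entry<String, Integer>> sorted) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sorted.size()) {                       // like values.more()
            String key = sorted.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                values.add(sorted.get(i++).getValue());   // gather one key's values
            }
            output.put(key, sumReducer(values));          // reducer.reduce(key, values, ...)
        }
        return output;
    }

    static int sumReducer(List<Integer> values) {
        int sum = 0;
        for (int v : values) sum += v;
        return sum;
    }
}
```

Grouping only compares each record with the current key, so the reduce side never needs to hold more than one key's values in memory; that is what the copy and sort phases buy.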

 

7. Summary

The Map-Reduce process is summarized in the figure below:

(Figure: overview of the Map-Reduce process)
