`
yu06206
  • 浏览: 111390 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

MapReduce运行流程源码分析(一)

阅读更多

   这几天都会看一些hadoop的源代码,开始的时候总会没有任何头绪,不知道从哪开始,经过这几天的对hadoop运行流程的分析和了解,还有从别人那得到的一些启发,再加上看到其他人发表的博客,对hadoop源代码 有了一点点的认识,这篇博客写下一点对hadoop源代码的了解

1.启动hadoop

我们都知道启动hadoop的命令是bin/start-all.sh,通过查看start-all.sh脚本,可以发现运行该脚本之后,Hadoop会配置一系列的环境变量以及其他Hadoop运行所需要的参数,然后在本机运行JobTracker和NameNode。然后通过SSH登录到所有slave机器上,启动TaskTracker和DataNode。

2.启动namenode和JobTracker(这次只分析启动JobTracker)

org.apache.hadoop.mapred.JobTracker类实现了JobTracker启动的实现,我们可以看一下JobTracker这个类,

首先看一下startTracker这个方法

 

public static JobTracker startTracker(JobConf conf) 
  throws IOException, InterruptedException {
    return startTracker(conf, DEFAULT_CLOCK);
  }

  static JobTracker startTracker(JobConf conf, Clock clock) 
  throws IOException, InterruptedException {
    return startTracker(conf, clock, generateNewIdentifier());
  }

  static JobTracker startTracker(JobConf conf, Clock clock, String identifier) 
  throws IOException, InterruptedException {
    JobTracker result = null;
    while (true) {
      try {
        result = new JobTracker(conf, clock, identifier);
        result.taskScheduler.setTaskTrackerManager(result);
        break;
      } catch (VersionMismatch e) {
        throw e;
      } catch (BindException e) {
        throw e;
      } catch (UnknownHostException e) {
        throw e;
      } catch (AccessControlException ace) {
        // in case of jobtracker not having right access
        // bail out
        throw ace;
      } catch (IOException e) {
        LOG.warn("Error starting tracker: " + 
                 StringUtils.stringifyException(e));
      }
      Thread.sleep(1000);
    }
    if (result != null) {
      JobEndNotifier.startNotifier();
    }
    return result;
  }
}

startTracker函数是一个静态方法,是它调用JobTracker的构造函数生成一个JobTracker类的实例,名为result,传入的参数JobConf,进行一系列的配置。然后,进行了一系列初始化活动,包括启动RPC server,启动内置的jetty服务器,检查是否需要重启JobTracker等。

再来看一下offerService方法

 

/**
   * Run forever
   */
  public void offerService() throws InterruptedException, IOException {
    // Prepare for recovery. This is done irrespective of the status of restart
    // flag.
    while (true) {
      try {
        recoveryManager.updateRestartCount();
        break;
      } catch (IOException ioe) {
        LOG.warn("Failed to initialize recovery manager. ", ioe);
        // wait for some time
        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
        LOG.warn("Retrying...");
      }
    }

    taskScheduler.start();
    
    recoveryManager.recover();
    
    // refresh the node list as the recovery manager might have added 
    // disallowed trackers
    refreshHosts();
    
    startExpireTrackersThread();

    expireLaunchingTaskThread.start();

    if (completedJobStatusStore.isActive()) {
      completedJobsStoreThread = new Thread(completedJobStatusStore,
                                            "completedjobsStore-housekeeper");
      completedJobsStoreThread.start();
    }

    // start the inter-tracker server once the jt is ready
    this.interTrackerServer.start();
    
    synchronized (this) {
      state = State.RUNNING;
    }
    LOG.info("Starting RUNNING");
    
    this.interTrackerServer.join();
    LOG.info("Stopped interTrackerServer");
  }

 

 

我们可以看到offerService方法其实及时启了taskScheduler.start(),但是我们接着看TaskScheduler类,我们看到调用的TaskScheduler.start()方法,实际上没有做任何事情,实际上taskScheduler.start()方法执行的是JobQueueTaskScheduler类的start方法。

 

 

 public void start() throws IOException {
    // do nothing
  }

 

我们继续看JobQueueTaskScheduler类

 

public synchronized void start() throws IOException {
    super.start();
    taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
    eagerTaskInitializationListener.start();
    taskTrackerManager.addJobInProgressListener(
        eagerTaskInitializationListener);
  }

JobQueueTaskScheduler类的start方法主要注册了两个非常重要的监听器:jobQueueJobInProgressListener和eagerTaskInitializationListener。前者是JobQueueJobInProgressListener类的一个实例,该类以先进先出的方式维持一个JobInProgress的队列,并且监听各个JobInProgress实例在生命周期中的变化;后者是EagerTaskInitializationListener类的一个实例,该类不断监听jobInitQueue,一旦发现有新的job被提交(即有新的JobInProgress实例被加入),则立即调用该实例的initTasks方法,对job进行初始化。

那个监听的类我们就不看了,我们看一下JobInProgress类,其中的主要方法initTasks()的主要代码


 

 public synchronized void initTasks() 
  throws IOException, KillInterruptedException, UnknownHostException {
    if (tasksInited.get() || isComplete()) {
      return;
    }
    synchronized(jobInitKillStatus){
      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
        return;
      }
      jobInitKillStatus.initStarted = true;
    }

    LOG.info("Initializing " + jobId);

    logSubmissionToJobHistory();
    
    // log the job priority
    setPriority(this.priority);
    
    //
    // generate security keys needed by Tasks
    //
    generateAndStoreTokens();
    
    //
    // read input splits and create a map per a split
    //
    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
    numMapTasks = taskSplitMetaInfo.length;

    checkTaskLimits();

    // Sanity check the locations so we don't create/initialize unnecessary tasks
    for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
      NetUtils.verifyHostnames(split.getLocations());
    }

    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);

    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
    
    if (numMapTasks > 0) { 
      nonRunningMapCache = createCache(taskSplitMetaInfo,
          maxLevel);
    }
        
    // set the launch time
    this.launchTime = JobTracker.getClock().getTime();

    createReduceTasks(jobFile.toString());
    
    // Calculate the minimum number of maps to be complete before 
    // we should start scheduling reduces
    completedMapsForReduceSlowstart = 
      (int)Math.ceil(
          (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
           numMapTasks));
    
    initSetupCleanupTasks(jobFile.toString());
    
    synchronized(jobInitKillStatus){
      jobInitKillStatus.initDone = true;
      if(jobInitKillStatus.killed) {
        //setup not launched so directly terminate
        throw new KillInterruptedException("Job " + jobId + " killed in init");
      }
    }
    
    tasksInited.set(true);
    JobInitedEvent jie = new JobInitedEvent(
        profile.getJobID(),  this.launchTime,
        numMapTasks, numReduceTasks,
        JobStatus.getJobRunState(JobStatus.PREP),
        false);
    
    jobHistory.logEvent(jie, jobId);
   
    // Log the number of map and reduce tasks
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
             + " map tasks and " + numReduceTasks + " reduce tasks.");
  }

 

 在这个方法里面有两个重要的方法:

createMapTasks(jobFile.toString(), taskSplitMetaInfo);

createReduceTasks(jobFile.toString())

其实初始化Tasks的过程应该就是这部分最重要的一步

 

//map task的个数就是input split的个数
numMapTasks = splits.length;
//为每个map tasks生成一个TaskInProgress来处理一个input split
maps = newTaskInProgress[numMapTasks];
for(inti=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
maps[i] =new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}

 

对于map task,将其放入nonRunningMapCache,是一个Map<Node,

List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input

split所在的Node上。在此,Node代表一个datanode或者机架或者数据中

心。nonRunningMapCache将在JobTracker向TaskTracker分配map task的

时候使用。

 

if(numMapTasks > 0) {
nonRunningMapCache = createCache(splits,maxLevel);
}
//创建reduce task
this.reduces = new TaskInProgress[numReduceTasks];
for (int i= 0; i < numReduceTasks; i++) {
reduces[i]= new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this);
/*reducetask放入nonRunningReduces,其将在JobTracker向TaskTracker
分配reduce task的时候使用。*/
nonRunningReduces.add(reduces[i]);
}
//创建两个cleanup task,一个用来清理map,一个用来清理reduce.
cleanup =new TaskInProgress[2];
cleanup[0]= new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();
cleanup[1]= new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this);
cleanup[1].setJobCleanupTask();
//创建两个初始化 task,一个初始化map,一个初始化reduce.
setup =new TaskInProgress[2];
setup[0] =new TaskInProgress(jobId, jobFile, splits[0],
jobtracker,conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();
setup[1] =new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setJobSetupTask();
tasksInited.set(true);//初始化完毕
}

 

 这一部分就结束了,我画一张简单的流程图



启动datanode和TaskTracker(同样我们这里只讲一下TaskTracker)

org.apache.hadoop.mapred.TaskTracker类实现了MapReduce模型中TaskTracker的功能。TaskTracker也是作为一个单独的JVM来运行的,其main函数就是TaskTracker的入口函数,当运行start-all.sh时,脚本就是通过SSH运行该函数来启动TaskTracker的。

我们来看一下TaskTracker类的main函数

 

public static void main(String argv[]) throws Exception {
    StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
    if (argv.length != 0) {
      System.out.println("usage: TaskTracker");
      System.exit(-1);
    }
    try {
      JobConf conf=new JobConf();
      // enable the server to track time spent waiting on locks
      ReflectionUtils.setContentionTracing
        (conf.getBoolean(TT_CONTENTION_TRACKING, false));
      new TaskTracker(conf).run();
    } catch (Throwable e) {
      LOG.error("Can not start task tracker because "+
                StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }

 

 

里面主要的代码就是new TaskTracker(conf).run(),传入配置文件conf,其中run函数主要调用了offerService函数


 

public void run() {
    try {
      startCleanupThreads();
      boolean denied = false;
      while (running && !shuttingDown && !denied) {
        boolean staleState = false;
        try {
          // This while-loop attempts reconnects if we get network errors
          while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception ex) {
              if (!shuttingDown) {
                LOG.info("Lost connection to JobTracker [" +
                         jobTrackAddr + "].  Retrying...", ex);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }
        } finally {
          close();
        }
        if (shuttingDown) { return; }
        LOG.warn("Reinitializing local state");
        initialize();
      }
      if (denied) {
        shutdown();
      }
    } catch (IOException iex) {
      LOG.error("Got fatal exception while reinitializing TaskTracker: " +
                StringUtils.stringifyException(iex));
      return;
    }
    catch (InterruptedException i) {
      LOG.error("Got interrupted while reinitializing TaskTracker: " + 
          i.getMessage());
      return;
    }
  }

 

 

每隔一段时间就向JobTracker发送heartbeat

 

long waitTime = heartbeatInterval - (now - lastHeartbeat);

if (waitTime > 0) { // sleeps for the wait time or // until there are empty slots to schedule tasks synchronized (finishedCount) { if (finishedCount.get() == 0) { finishedCount.wait(waitTime); } finishedCount.set(0); } }

 

发送Heartbeat到JobTracker,得到response

 

 // Send the heartbeat and process the jobtracker's directives
        HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);
//从Response中得到此TaskTracker需要做的事情
 TaskTrackerAction[] actions = heartbeatResponse.getActions();

if (actions != null){ 
          for(TaskTrackerAction action: actions) {
            if (action instanceof LaunchTaskAction) {
              addToTaskQueue((LaunchTaskAction)action);
            } else if (action instanceof CommitTaskAction) {
              CommitTaskAction commitAction = (CommitTaskAction)action;
              if (!commitResponses.contains(commitAction.getTaskID())) {
                LOG.info("Received commit task action for " + 
                          commitAction.getTaskID());
                commitResponses.add(commitAction.getTaskID());
              }
            } else {
              tasksToCleanup.put(action);
            }
          }
        }
        markUnresponsiveTasks();
        killOverflowingTasks();
            

 

 

其中transmitHeartBeat方法的作用就是向JobTracker发送Heartbeat

 

//每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息
booleansendCounters;
if (now> (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}

 

 

报告给JobTracker,此TaskTracker的当前状态

 

if(status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxCurrentMapTasks,
maxCurrentReduceTasks);
}
}


 

当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:

当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数

当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数

 

booleanaskForNewTask;
longlocalMinSpaceStart;
synchronized (this) {
askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
status.countReduceTasks() <maxCurrentReduceTasks)
&& acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}

//向JobTracker发送heartbeat,这是一个RPC调用
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
……
returnheartbeatResponse;
}

 

 这个过程还是很复杂的这涉及到RPC,以及很复杂的通信控制,我在这里比较简单的概括了一下其中过程,希望大家可以自己深入研究,最后还是贴一张我自己画的流程图


 

今天这分析了两步,我自己也要进一步的深入分析,明天继续分析后面的过程,敬请期待!

 

  • 大小: 66.6 KB
  • 大小: 20.2 KB
分享到:
评论

相关推荐

    MapReduce 2.0源码分析与编程实

    全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce2.0的框架结构和源码分析,MapReduce2.0的配置与测试,MapReduce2.0运行流程,MapReduce2.0高级程序设计以及相关特性等内容。《MapReduce...

    MapReduce2.0源码分析与实战编程

    全书分为10章,系统地介绍了HDFS存储系统,Hadoop的文件I/O系统,MapReduce 2.0的框架结构和源码分析,MapReduce 2.0的配置与测试,MapReduce 2.0运行流程,MapReduce 2.0高级程序设计以及相关特性等内容。...

    map reduce 源码分析流程

    map reduce的全部执行流程,源码分析视图

    MapReduce源码分析

    总结,MapReduce的源码分析涵盖了数据分片、Map函数、Shuffle过程、Reduce函数、输入输出格式、任务调度等多个关键部分。理解这些核心组件的工作原理,有助于我们更高效地利用Hadoop MapReduce处理大数据,同时也...

    MapReduce源码流程.pdf

    以下是MapReduce执行流程、Split切片、以及MapTask过程的详细解析。 1. MapReduce执行流程: MapReduce的工作流程主要分为四个步骤:作业提交、任务调度、Map任务执行和Reduce任务执行。首先,客户端将作业提交给...

    Hadoop MapReduce Cookbook 源码

    书中提供的源码对于理解MapReduce的工作流程至关重要,读者可以通过实际运行和修改这些代码,加深对概念的理解。 在阅读和实践过程中,建议读者结合Hadoop官方文档和其他相关资料,以便更全面地学习。同时,不断...

    hadoop 框架下 mapreduce源码例子 wordcount

    在Hadoop框架中,MapReduce是一种分布式计算模型,用于处理和生成大数据集。WordCount是MapReduce中的一个经典示例,它演示了如何利用该框架进行数据处理。在这个例子中,我们将深入理解Hadoop MapReduce的工作原理...

    mapreduce源码

    通过深入分析这些源代码,开发者可以更好地理解Hadoop MapReduce的工作流程,从而定制化处理逻辑、优化性能,甚至开发新的功能。对于大数据处理和分布式计算领域的研究者和工程师来说,这是一个极有价值的学习材料。

    hadoop源码分析-HDFS&MapReduce

    源码分析有助于理解YARN的工作流程和资源分配策略。 通过深入学习HDFS和MapReduce的源码,不仅可以理解它们的工作原理,还能为优化性能、解决故障或开发新的功能提供基础。同时,这也有助于更好地适应Hadoop生态...

    MapReduce分析Youtube数据内含源码以及说明书可以自己运行复现.zip

    在这个"MapReduce分析Youtube数据内含源码以及说明书可以自己运行复现.zip"压缩包中,包含了对YouTube数据进行分析的完整流程。这可能包括用户行为数据、视频信息、评论等。通过MapReduce,我们可以对这些数据进行...

    hadoop源码分析-mapreduce部分.doc

    《Hadoop源码分析——MapReduce深度解析》 Hadoop,作为云计算领域的核心组件,以其分布式存储和计算能力,为大数据处理提供了强大的支持。MapReduce是Hadoop的主要计算框架,其设计思想源于Google的论文,旨在解决...

    SDU-大数据实验&课设(mapreduce)内含源码和说明书(可以直接运行).zip

    2. **源码分析**: - `AudioRecommend`:这部分代码可能是实现音频推荐算法的核心部分,可能涉及到机器学习模型,如协同过滤或基于内容的推荐,通过对用户听歌历史和音乐特征的分析,为用户推荐相似或相关的音频...

    MapReduce保姆级教程源码

    - Hadoop是开源实现MapReduce的框架,它提供了一个运行MapReduce作业的分布式文件系统(HDFS)和一个执行环境(YARN)。 - Hadoop MapReduce通过拆分任务、调度和容错管理,使得大规模数据处理变得可能。 3. **...

    MapReduce Job本地提交过程源码跟踪及分析

    MapReduce是Hadoop生态系统中的核心组件,主要用于处理和存储大规模数据。...通过阅读《Job本地提交过程源码分析及图解》这样的文档,我们可以深入学习MapReduce的工作原理,提升我们的Hadoop编程技能。

Global site tag (gtag.js) - Google Analytics