`
landyer
  • 浏览: 144352 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Hadoop学习总结:Map-Reduce的过程解析

阅读更多

原文地址:http://www.cnblogs.com/end/archive/2011/04/26/2029496.html

 

 

一、客户端

Map-Reduce的过程首先是由客户端提交一个任务开始的。

提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:

public static RunningJob runJob(JobConf job) throws IOException {

  //首先生成一个JobClient对象

  JobClient jc = new JobClient(job);

  ……

  //调用submitJob来提交一个任务

  running = jc.submitJob(job);

  JobID jobId = running.getID();

  ……

  while (true) {

     //while循环中不断得到此任务的状态,并打印到客户端console中

  }

  return running;

}

其中JobClient的submitJob函数实现如下:

public RunningJob submitJob(JobConf job) throws FileNotFoundException,

                                InvalidJobConfException, IOException {

  //从JobTracker得到当前任务的id

  JobID jobId = jobSubmitClient.getNewJobId();

  //准备将任务运行所需要的要素写入HDFS:

  //任务运行程序所在的jar封装成job.jar

  //任务所要处理的input split信息写入job.split

  //任务运行的配置项汇总写入job.xml

  Path submitJobDir = new Path(getSystemDir(), jobId.toString());

  Path submitJarFile = new Path(submitJobDir, "job.jar");

  Path submitSplitFile = new Path(submitJobDir, "job.split");

  //此处将-libjars命令行指定的jar上传至HDFS

  configureCommandLineOptions(job, submitJobDir, submitJarFile);

  Path submitJobFile = new Path(submitJobDir, "job.xml");

  ……

  //通过input format的格式获得相应的input split,默认类型为FileSplit

  InputSplit[] splits =

    job.getInputFormat().getSplits(job, job.getNumMapTasks());

 

  // 生成一个写入流,将input split得信息写入job.split文件

  FSDataOutputStream out = FileSystem.create(fs,

      submitSplitFile, new FsPermission(JOB_FILE_PERMISSION));

  try {

    //写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。

    //对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split 在文件中的起始位置),split的location信息(即在那个DataNode上)。

    writeSplitsFile(splits, out);

  } finally {

    out.close();

  }

  job.set("mapred.job.split.file", submitSplitFile.toString());

  //根据split的个数设定map task的个数

  job.setNumMapTasks(splits.length);

  // 写入job的配置信息入job.xml文件      

  out = FileSystem.create(fs, submitJobFile,

      new FsPermission(JOB_FILE_PERMISSION));

  try {

    job.writeXml(out);

  } finally {

    out.close();

  }

  //真正的调用JobTracker来提交任务

  JobStatus status = jobSubmitClient.submitJob(jobId);

  ……

}

 

二、JobTracker

JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:

  • 调用静态函数startTracker(new JobConf())创建一个JobTracker对象
  • 调用JobTracker.offerService()函数提供服务

在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。

在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:

  • JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
  • EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化

EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。

在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:

synchronized (jobs) {

  synchronized (taskScheduler) {

    jobs.put(job.getProfile().getJobID(), job);

    //对JobTracker的每一个listener都调用jobAdded函数

    for (JobInProgressListener listener : jobInProgressListeners) {

      listener.jobAdded(job);

    }

  }

}

 

EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:

 

public synchronized void initTasks() throws IOException {

  ……

  //从HDFS中读取job.split文件从而生成input splits

  String jobFile = profile.getJobFile();

  Path sysDir = new Path(this.jobtracker.getSystemDir());

  FileSystem fs = sysDir.getFileSystem(conf);

  DataInputStream splitFile =

    fs.open(new Path(conf.get("mapred.job.split.file")));

  JobClient.RawSplit[] splits;

  try {

    splits = JobClient.readSplitFile(splitFile);

  } finally {

    splitFile.close();

  }

  //map task的个数就是input split的个数

  numMapTasks = splits.length;

  //为每个map tasks生成一个TaskInProgress来处理一个input split

  maps = new TaskInProgress[numMapTasks];

  for(int i=0; i < numMapTasks; ++i) {

    inputLength += splits[i].getDataLength();

    maps[i] = new TaskInProgress(jobId, jobFile,

                                 splits[i],

                                 jobtracker, conf, this, i);

  }

  //对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。

  if (numMapTasks > 0) { 
    nonRunningMapCache = createCache(splits, maxLevel);
  }

 

  //创建reduce task

  this.reduces = new TaskInProgress[numReduceTasks];

  for (int i = 0; i < numReduceTasks; i++) {

    reduces[i] = new TaskInProgress(jobId, jobFile,

                                    numMapTasks, i,

                                    jobtracker, conf, this);

    //reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。

    nonRunningReduces.add(reduces[i]);

  }

 

  //创建两个cleanup task,一个用来清理map,一个用来清理reduce.

  cleanup = new TaskInProgress[2];

  cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],

          jobtracker, conf, this, numMapTasks);

  cleanup[0].setJobCleanupTask();

  cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                     numReduceTasks, jobtracker, conf, this);

  cleanup[1].setJobCleanupTask();

  //创建两个初始化 task,一个初始化map,一个初始化reduce.

  setup = new TaskInProgress[2];

  setup[0] = new TaskInProgress(jobId, jobFile, splits[0],

          jobtracker, conf, this, numMapTasks + 1 );

  setup[0].setJobSetupTask();

  setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,

                     numReduceTasks + 1, jobtracker, conf, this);

  setup[1].setJobSetupTask();

  tasksInited.set(true);//初始化完毕

  ……

}

 

三、TaskTracker

TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:

 

State offerService() throws Exception {

  long lastHeartbeat = 0;

  //TaskTracker进行是一直存在的

  while (running && !shuttingDown) {

      ……

      long now = System.currentTimeMillis();

      //每隔一段时间就向JobTracker发送heartbeat

      long waitTime = heartbeatInterval - (now - lastHeartbeat);

      if (waitTime > 0) {

        synchronized(finishedCount) {

          if (finishedCount[0] == 0) {

            finishedCount.wait(waitTime);

          }

          finishedCount[0] = 0;

        }

      }

      ……

      //发送Heartbeat到JobTracker,得到response

      HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

      ……

     //从Response中得到此TaskTracker需要做的事情

      TaskTrackerAction[] actions = heartbeatResponse.getActions();

      ……

      if (actions != null){

        for(TaskTrackerAction action: actions) {

          if (action instanceof LaunchTaskAction) {

            //如果是运行一个新的Task,则将Action添加到任务队列中

            addToTaskQueue((LaunchTaskAction)action);

          } else if (action instanceof CommitTaskAction) {

            CommitTaskAction commitAction = (CommitTaskAction)action;

            if (!commitResponses.contains(commitAction.getTaskID())) {

              commitResponses.add(commitAction.getTaskID());

            }

          } else {

            tasksToCleanup.put(action);

          }

        }

      }

  }

  return State.NORMAL;

}

其中transmitHeartBeat主要逻辑如下:

 

private HeartbeatResponse transmitHeartBeat(long now) throws IOException {

  //每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息

  boolean sendCounters;

  if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {

    sendCounters = true;

    previousUpdate = now;

  }

  else {

    sendCounters = false;

  }

  ……

  //报告给JobTracker,此TaskTracker的当前状态

  if (status == null) {

    synchronized (this) {

      status = new TaskTrackerStatus(taskTrackerName, localHostname,

                                     httpPort,

                                     cloneAndResetRunningTaskStatuses(

                                       sendCounters),

                                     failures,

                                     maxCurrentMapTasks,

                                     maxCurrentReduceTasks);

    }

  }

  ……

  //当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行:

  //当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数

  //当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数

  boolean askForNewTask;

  long localMinSpaceStart;

  synchronized (this) {

    askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||

                     status.countReduceTasks() < maxCurrentReduceTasks) &&

                    acceptNewTasks;

    localMinSpaceStart = minSpaceStart;

  }

  ……

  //向JobTracker发送heartbeat,这是一个RPC调用

  HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,

                                                            justStarted, askForNewTask,

                                                            heartbeatResponseId);

  ……

  return heartbeatResponse;

}

 

四、JobTracker

当 JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:

 

public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,

                                                boolean initialContact, boolean acceptNewTasks, short responseId)

  throws IOException {

  ……

  String trackerName = status.getTrackerName();

  ……

  short newResponseId = (short)(responseId + 1);

  ……

  HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);

  List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

  //如果TaskTracker向JobTracker请求一个task运行

  if (acceptNewTasks) {

    TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);

    if (taskTrackerStatus == null) {

      LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);

    } else {

      //setup和cleanup的task优先级最高

      List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);

      if (tasks == null ) {

        //任务调度器分配任务

        tasks = taskScheduler.assignTasks(taskTrackerStatus);

      }

      if (tasks != null) {

        for (Task task : tasks) {

          //将任务放入actions列表,返回给TaskTracker

          expireLaunchingTasks.addNewTask(task.getTaskID());

          actions.add(new LaunchTaskAction(task));

        }

      }

    }

  }

  ……

  int nextInterval = getNextHeartbeatInterval();

  response.setHeartbeatInterval(nextInterval);

  response.setActions(

                      actions.toArray(new TaskTrackerAction[actions.size()]));

  ……

  return response;

}

默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:

 

public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)

    throws IOException {

  ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();

  int numTaskTrackers = clusterStatus.getTaskTrackers();

  Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue();

  int maxCurrentMapTasks = taskTracker.getMaxMapTasks();

  int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();

  int numMaps = taskTracker.countMapTasks();

  int numReduces = taskTracker.countReduceTasks();

  //计算剩余的map和reduce的工作量:remaining

  int remainingReduceLoad = 0;

  int remainingMapLoad = 0;

  synchronized (jobQueue) {

    for (JobInProgress job : jobQueue) {

      if (job.getStatus().getRunState() == JobStatus.RUNNING) {

        int totalMapTasks = job.desiredMaps();

        int totalReduceTasks = job.desiredReduces();

        remainingMapLoad += (totalMapTasks - job.finishedMaps());

        remainingReduceLoad += (totalReduceTasks - job.finishedReduces());

      }

    }

  }

  //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。

  int maxMapLoad = 0;

  int maxReduceLoad = 0;

  if (numTaskTrackers > 0) {

    maxMapLoad = Math.min(maxCurrentMapTasks,

                          (int) Math.ceil((double) remainingMapLoad /

                                          numTaskTrackers));

    maxReduceLoad = Math.min(maxCurrentReduceTasks,

                             (int) Math.ceil((double) remainingReduceLoad

                                             / numTaskTrackers));

  }

  ……

 

  //map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task

  if (numMaps < maxMapLoad) {

    int totalNeededMaps = 0;

    synchronized (jobQueue) {

      for (JobInProgress job : jobQueue) {

        if (job.getStatus().getRunState() != JobStatus.RUNNING) {

          continue;

        }

        Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,

            taskTrackerManager.getNumberOfUniqueHosts());

        if (t != null) {

          return Collections.singletonList(t);

        }

        ……

      }

    }

  }

  //分配完map task,再分配reduce task

  if (numReduces < maxReduceLoad) {

    int totalNeededReduces = 0;

    synchronized (jobQueue) {

      for (JobInProgress job : jobQueue) {

        if (job.getStatus().getRunState() != JobStatus.RUNNING ||

            job.numReduceTasks == 0) {

          continue;

        }

        Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,

            taskTrackerManager.getNumberOfUniqueHosts());

        if (t != null) {

          return Collections.singletonList(t);

        }

        ……

      }

    }

  }

  return null;

}

从 上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找 TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。

 

五、TaskTracker

在 向JobTracker发送heartbeat后,返回的reponse中有分配好的任务LaunchTaskAction,将其加入队列,调用 addToTaskQueue,如果是map task则放入mapLancher(类型为TaskLauncher),如果是reduce task则放入reduceLancher(类型为TaskLauncher):

private void addToTaskQueue(LaunchTaskAction action) {

  if (action.getTask().isMapTask()) {

font-size: 13px; margin-top: 5px; margin-right: auto; margin-bottom: 5px; margin-left: auto; text-indent:

分享到:
评论

相关推荐

    Hadoop学习总结之四:Map-Reduce过程解析

    ### Hadoop MapReduce任务提交与执行流程解析 #### 一、客户端提交任务 在Hadoop MapReduce框架中,客户端的任务提交是整个MapReduce作业启动的关键步骤。这一过程主要由`JobClient`类中的`runJob(JobConf job)`...

    hadoop map-reduce turorial

    **源代码解析**:单词计数是Hadoop Map-Reduce中最经典的示例之一,用于演示如何读取文本文件,统计其中每个单词出现的次数。Map函数将文本文件中的每一行分解为单词,为每个单词创建键值对(&lt;单词,1&gt;),Reduce...

    Hadoop Map-Reduce

    Hadoop Map-Reduce Map-Reduce 是 Hadoop 框架中的一种核心组件,用于处理大规模数据。Map-Reduce 依靠两大步骤来完成数据处理:Map 和 Reduce。 Map 阶段的主要任务是将输入数据拆分成小块,并将其转换成 key-...

    Hive - A Warehousing Solution Over a Map-Reduce.pdf

    为了解决这一问题,Hadoop作为一种开源的Map-Reduce实现框架被广泛应用于存储与处理大规模数据集。然而,Map-Reduce编程模型本身较为底层,开发者需要编写大量的自定义程序来实现特定的功能,这不仅增加了开发的难度...

    Hadoop学习总结和源码分析

    “Hadoop学习总结之四:Map-Reduce的过程解析.doc”进一步深入MapReduce的工作流程。Map任务在多个DataNode上并行执行,Reduce任务则根据需要的数量进行划分。Shuffle和Sort阶段在Map和Reduce之间起到关键作用,它们...

    hadoop-training-map-reduce-example-4

    6. **日志和调试**:学习如何查看和解析Hadoop作业的日志,以及如何调试分布式环境中的问题。 7. **性能优化**:了解如何通过调整参数、使用Combiner和Partitioner等方式提升MapReduce作业的性能。 8. **实战应用**...

    hadoop-map-reduce-demo

    总结,`hadoop-map-reduce-demo`项目是一个理想的起点,通过它,你可以深入学习和掌握Hadoop MapReduce的核心概念和编程技巧。同时,Java作为主要的编程语言,对于理解分布式计算的实现细节至关重要。不断实践和探索...

    基于Eclipse的hadoop-eclipse-plugin-2.0.0插件

    1. **创建MapReduce项目**:安装插件后,Eclipse会新增一个"New" -&gt; "Other" -&gt; "Hadoop Map/Reduce Project"选项,点击即可创建一个MapReduce项目,提供了模板化的Job类和Mapper/Reducer类。 2. **编辑HDFS文件**...

    Hadoop学习总结

    总结来说,Hadoop的学习涵盖了HDFS的基础概念、数据读写流程,以及Map-Reduce模型的理解和应用。掌握这些知识点,不仅能够帮助你理解和操作Hadoop系统,也为进一步探索大数据处理和分析打下坚实基础。在实践中不断...

    Hadoop技术内幕 深入理解MapReduce架构设计与实现原理 高清完整中文版PDF下载

    ### Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 #### 一、Hadoop及其重要性 Hadoop是一个开放源代码的分布式计算框架,它能够处理大量的数据集,并通过集群提供高性能的数据处理能力。随着大数据时代的...

    hadoop-2.10.0-src.tar.gz

    源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...

    hadoop,hive,hbase学习资料

    2. **Hadoop学习总结之一:HDFS简介.doc**、**Hadoop学习总结之四:Map-Reduce的过程解析.doc**、**Hadoop学习总结之五:Hadoop的运行痕迹.doc**、**Hadoop学习总结之二:HDFS读写过程解析.doc**:这些文档详细介绍...

    hadoop2x-eclipse-plugin-original

    "Map"阶段将输入数据拆分成键值对,"Reduce"阶段则聚合这些键值对的结果。 5. **Hadoop-Eclipse插件功能**:该插件提供HDFS的浏览和文件操作功能,可以在Eclipse内创建、编辑和运行MapReduce程序,同时还可以直接...

    Hadoop学习总结.doc

    ### Hadoop 学习总结 #### 一、HDFS简介 **1.1 数据块(Block)** HDFS(Hadoop Distributed File System)是Hadoop的核心组件之一,它主要用于存储大规模的数据集。HDFS默认的基本存储单位是64MB的数据块。与...

    Hadoop技术内幕 深入解析MapReduce架构设计与实现原理[董西成][带书签].pdf 百度网盘下载

    根据提供的文件信息,本文将深入解析《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》这本书中的关键知识点,主要包括Hadoop的核心组件——MapReduce的设计理念、架构组成及其具体的实现原理。 ### Hadoop...

    Map_Reduce_Hadoop:实施map-reduce程序来执行等值连接

    在Hadoop MapReduce中,我们通常需要将数据分片、映射和规约来实现这一过程。以下是一个详细的步骤概述: 1. **数据预处理**: 在MapReduce程序开始之前,确保输入数据已经被正确格式化,并存储在HDFS(Hadoop...

    Hadoop (十五)Hadoop-MR编程 -- 【使用hadoop计算网页之间的PageRank值----编程】

    在本篇中,我们将深入探讨如何使用Hadoop MapReduce编程模型来计算网页之间的PageRank值。...这个过程展示了MapReduce如何通过分布式计算处理复杂的数据分析任务,同时也揭示了Hadoop在大数据处理中的强大能力。

    Map-Reduce-Iris-Flower:这些Map Reduce程序的目标是从著名的鸢尾花数据集中计算出萼片长度,萼片宽度,花瓣长度和花瓣宽度的最大值,最小值和平均值。

    这个项目,"Map-Reduce-Iris-Flower",是利用MapReduce技术对鸢尾花(Iris flower)数据集进行分析的一个实例。鸢尾花数据集是机器学习领域中的经典数据集,包含三种不同种类的鸢尾花,每种花有四个特征:萼片长度、...

    map-reduce详细.pdf

    总结来说,MapReduce通过将大任务分解为小的Map任务,然后并行处理,再通过Reduce阶段聚合结果,实现了大数据处理的高效性和可扩展性。在Hadoop中,这个过程被巧妙地设计和实现,使得开发者能够专注于业务逻辑,而...

Global site tag (gtag.js) - Google Analytics