`
luliangy
  • 浏览: 96635 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

hadoop学习笔记(三)——MapTask和ReduceTask

阅读更多

MapTaskReduceTask

 

我们知道每一个Task都对应着一个jvm的执行,但是每一个Task都是在Child进程中执行的!我们来看一下Child类的主方法(源代码)

public static void main(String[] args) throws Throwable {

    LOG.debug("Child starting");

 

final JobConf defaultConf = new JobConf();

获取TaskIP,和端口信息

    String host = args[0];

    int port = Integer.parseInt(args[1]);

final InetSocketAddress address = new InetSocketAddress(host, port);

TaskID

final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);

 

    final String logLocation = args[3];

    final int SLEEP_LONGER_COUNT = 5;

int jvmIdInt = Integer.parseInt(args[4]);

每个任务所分配的JVMID

JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);

判断是MapTask还是ReduceTask

    String prefix = firstTaskid.isMap() ? "MapTask" : "ReduceTask";

 

    工作路径

    cwd = System.getenv().get(TaskRunner.HADOOP_WORK_DIR);

    if (cwd == null) {

      throw new IOException("Environment variable " + 

                             TaskRunner.HADOOP_WORK_DIR + " is not set");

    }

 

   获取环境变量中的值--hadoop中文件系统所在的目录

     String jobTokenFile = 

      System.getenv().get  (UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);

   签订证书

    Credentials credentials = 

      TokenCache.loadTokens(jobTokenFile, defaultConf);

    LOG.debug("loading token. # keys =" +credentials.numberOfSecretKeys() + 

        "; from file=" + jobTokenFile);

    

Token<JobTokenIdentifier> jt = TokenCache.getJobToken(credentials);

设置服务器的地址

    jt.setService(new Text(address.getAddress().getHostAddress() + ":"

        + address.getPort()));

     当前用户组信息

    UserGroupInformation current = UserGroupInformation.getCurrentUser();

current.addToken(jt);

 

    远程的用户

    UserGroupInformation taskOwner 

     = UserGroupInformation.createRemoteUser(firstTaskid.getJobID().toString());

    taskOwner.addToken(jt);

    

    设置证书

    defaultConf.setCredentials(credentials);

    

final TaskUmbilicalProtocol umbilical = 

 执行任务

      taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {

       

        public TaskUmbilicalProtocol run() throws Exception {

          return 

     返回RPC代理出的结果

(TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,

              TaskUmbilicalProtocol.versionID,

              address,

              defaultConf);

        }

    });

    

    int numTasksToExecute = -1; //-1 signifies "no limit"

int numTasksExecuted = 0;

日志记录的代码

     ......

    t.setName("Thread for syncLogs");

    t.setDaemon(true);

    t.start();

    

    String pid = "";

    if (!Shell.WINDOWS) {

      pid = System.getenv().get("JVM_PID");

    }

    JvmContext context = new JvmContext(jvmId, pid);

    int idleLoopCount = 0;

    Task task = null;

    

    UserGroupInformation childUGI = null;

 

    try {

      while (true) {

        taskid = null;

        currentJobSegmented = true;

 

        JvmTask myTask = umbilical.getTask(context);

        if (myTask.shouldDie()) {

          break;

        } else {

          if (myTask.getTask() == null) {

            taskid = null;

            currentJobSegmented = true;

 

            if (++idleLoopCount >= SLEEP_LONGER_COUNT) {

              //we sleep for a bigger interval when we don't receive

              //tasks for a while

              Thread.sleep(1500);

            } else {

              Thread.sleep(500);

            }

            continue;

          }

        }

        idleLoopCount = 0;

        task = myTask.getTask();

        taskid = task.getTaskID();

 

        // Create the JobConf and determine if this job gets segmented task logs

        final JobConf job = new JobConf(task.getJobFile());

        currentJobSegmented = logIsSegmented(job);

 

        isCleanup = task.isTaskCleanupTask();

        重设静态数据

        FileSystem.clearStatistics();

        

        设置job  证书                      job.setCredentials(defaultConf.getCredentials());

        

设置当当前的任务执行完毕的时候即JVM执行完毕之后就关闭本地缓存系统

        job.setBoolean("fs.file.impl.disable.cache"false);

 

        // set the jobTokenFile into task

        task.setJobTokenSecret(JobTokenSecretManager.

            createSecretKey(jt.getPassword()));

 

      

         设置本地的map路径,之后当前child只能从临时的目录读取数据

        TaskRunner.setupChildMapredLocalDirs(task, job);

        

        // setup the child's attempt directories

        设置临时的路径

        localizeTask(task, job, logLocation);

 

       

为分布式缓存系统设置工作路径

        TaskRunner.setupWorkDir(job, new File(cwd));

       

        设置日志文件的位置

        TaskLog.syncLogs

          (logLocation, taskidisCleanuplogIsSegmented(job));

        

        numTasksToExecute = job.getNumTasksToExecutePerJvm();

        assert(numTasksToExecute != 0);

 

        task.setConf(job);

 

        // Initiate Java VM metrics

初始化JVMMetrics(监测系统运行的对象)

        initMetrics(prefix, jvmId.toString(), job.getSessionId());

 

        LOG.debug("Creating remote user to execute task: " + job.get("user.name"));

        childUGI = UserGroupInformation.createRemoteUser(job.get("user.name"));

     

 设置新的标志以便任务的执行

        for(Token<?> token : UserGroupInformation.getCurrentUser().getTokens()) {

          childUGI.addToken(token);

        }

        

        // Create a final reference to the task for the doAs block

创建task执行的块

        final Task taskFinal = task;

        childUGI.doAs(new PrivilegedExceptionAction<Object>() {

          @Override

          public Object run() throws Exception {

            try {

 

        设置工作的路径              

       FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());

执行次方法才开始执行任务

              taskFinal.run(job, umbilical);           

            } finally {

              TaskLog.syncLogs

                (logLocation, taskidisCleanuplogIsSegmented(job));

              TaskLogsTruncater trunc = new TaskLogsTruncater(defaultConf);

              trunc.truncateLogs(new JVMInfo(

                  TaskLog.getAttemptDir(taskFinal.getTaskID(),

                    taskFinal.isTaskCleanupTask()), Arrays.asList(taskFinal)));

            }

 

            return null;

          }

        });

        if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) {

表示任务执行完毕

          break;

        }

      }

    } catch (FSError e) {

      LOG.fatal("FSError from child", e);

      umbilical.fsError(taskid, e.getMessage());

    } catch (Exception exception) {

      LOG.warn("Error running child", exception);

      try {

        if (task != null) {

          // do cleanup for the task

          if(childUGI == null) {

            task.taskCleanup(umbilical);

          } else {

            final Task taskFinal = task;

            childUGI.doAs(new PrivilegedExceptionAction<Object>() {

              @Override

              public Object run() throws Exception {

                taskFinal.taskCleanup(umbilical);

                return null;

              }

            });

          }

        }

      } catch (Exception e) {

        LOG.info("Error cleaning up", e);

      }

      // Report back any failures, for diagnostic purposes

      ByteArrayOutputStream baos = new ByteArrayOutputStream();

      exception.printStackTrace(new PrintStream(baos));

      if (taskid != null) {

        umbilical.reportDiagnosticInfo(taskid, baos.toString());

      }

    } catch (Throwable throwable) {

      LOG.fatal("Error running child : "

                + StringUtils.stringifyException(throwable));

      if (taskid != null) {

        Throwable tCause = throwable.getCause();

        String cause = tCause == null 

                       ? throwable.getMessage() 

                       : StringUtils.stringifyException(tCause);

        umbilical.fatalError(taskid, cause);

      }

finally {

finally里面停止RPC传输和Metrics监测

      RPC.stopProxy(umbilical);

      shutdownMetrics();

      停止日志对象

      LogManager.shutdown();

    }

  }

 

通过对Child类主方法的分析我们发现,每个Child进程通过RPC代理和tasktracker进行通信,系统为我们设计好了一些RPC工具类,方便调用。还有就是这个Metrics类他能任务的运行状况。当前用户和远程的用户签订了一种证书,相当于一种协议,这个证书对于tasktracker的调度有很大的作用。在执行任务之前做了一系列的初始化动作包括设置各种路径以及初始化Metrics对象,任务执行完毕之后要汇报结果以及做一些关闭动作。

 

每一个Task都对应着一个自定义的Run方法;

来看看Maprun方法:

 public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)

    throws IOException, ClassNotFoundException, InterruptedException {

    this.umbilical = umbilical;

 

定期向JobTracker报告的类    

TaskReporter reporter = new TaskReporter(getProgress(), umbilical);

开始通信线程

    reporter.startCommunicationThread();

boolean useNewApi = job.getUseNewMapper();

初始化任务

    initialize(job, getJobID(), reporter, useNewApi);

 

    查看是不是清理任务

    if (jobCleanup) {

      runJobCleanupTask(umbilical, reporter);

      return;

}

建立任务

    if (jobSetup) {

      runJobSetupTask(umbilical, reporter);

      return;

    }

    if (taskCleanup) {

      runTaskCleanupTask(umbilical, reporter);

      return;

}

Map任务的run方法中我们首先要建立一个TaskReporter 对象用于TaskTracker报告结果,然后我们要判断该执行何种任务,是建立任务还是清除的任务,然后在执行。

 

    如果是新的配置下的api就执行新的任务否则执行以前的任务

    if (useNewApi) {

      runNewMapper(job, splitMetaInfo, umbilical, reporter);

    } else {

      runOldMapper(job, splitMetaInfo, umbilical, reporter);

    }

    done(umbilical, reporter);

  }

 

其实我们可以想像在ReduceTaskrun方法中也应该有部分类似的方法

ReduceTask中的run方法:

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)

    throws IOException, InterruptedException, ClassNotFoundException {

    this.umbilical = umbilical;

    job.setBoolean("mapred.skip.on", isSkipping());

 

    Reduce任务分为三个部分copy,sort,reduce

    if (isMapOrReduce()) {

      copyPhase = getProgress().addPhase("copy");

      sortPhase  = getProgress().addPhase("sort");

      reducePhase = getProgress().addPhase("reduce");

    }

    开启一个想TaskTracker汇报的线程;

TaskReporter reporter = new TaskReporter(getProgress(), umbilical);

 

    reporter.startCommunicationThread();

    boolean useNewApi = job.getUseNewReducer();

    initialize(job, getJobID(), reporter, useNewApi);

 

    // check if it is a cleanupJobTask

    if (jobCleanup) {

      runJobCleanupTask(umbilical, reporter);

        清理Job,包括步骤状态设置,更新状态到TaskTracker

      return;

    }

    if (jobSetup) {

      runJobSetupTask(umbilical, reporter);

              清理TaskTracker和上述CleanupTask方法类似。 

      return;

    }

    if (taskCleanup) {

      runTaskCleanupTask(umbilical, reporter);

      return;

    }

    

    // Initialize the codec

    codec = initCodec();

 

 

    boolean isLocal = "local".equals(job.get("mapred.job.tracker""local"));

    if (!isLocal) {

      reduceCopier = new ReduceCopier(umbilical, job, reporter);

      if (!reduceCopier.fetchOutputs()) {

        if(reduceCopier.mergeThrowable instanceof FSError) {

          throw (FSError)reduceCopier.mergeThrowable;

        }

        throw new IOException("Task: " + getTaskID() + 

            " - The reduce copier failed"reduceCopier.mergeThrowable);

      }

}

 

copyPhase.complete();                         

Copy结束后进行sort

    setPhase(TaskStatus.Phase.SORT);

    statusUpdate(umbilical);

 

final FileSystem rfs = FileSystem.getLocal(job).getRaw();

Sort之后进行Merge

    RawKeyValueIterator rIter = isLocal

      ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),

          job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),

          !conf.getKeepFailedTaskFiles(), 

job.getInt("io.sort.factor", 100),(sort的属性设置,缓冲区的大小默认100)

          new Path(getTaskID().toString()), job.getOutputKeyComparator(),

          reporter, spilledRecordsCounternull)

      : reduceCopier.createKVIterator(job, rfs, reporter);

        

清除磁盘的临时文件;

    mapOutputFilesOnDisk.clear();

    Sort结束

sortPhase.complete();   

   

开始执行Reduce程序                   

    setPhase(TaskStatus.Phase.REDUCE); 

statusUpdate(umbilical);

 

    Class keyClass = job.getMapOutputKeyClass();

    Class valueClass = job.getMapOutputValueClass();

    RawComparator comparator = job.getOutputValueGroupingComparator();

 

判断是否执行新的api

    if (useNewApi) {

      runNewReducer(job, umbilical, reporter, rIter, comparator, 

                    keyClass, valueClass);

    } else {

      runOldReducer(job, umbilical, reporter, rIter, comparator, 

                    keyClass, valueClass);

    }

    done(umbilical, reporter);

  }

 

通过对ReduceTask的分析我们可以很清晰的发现ReduceTask执行顺序,copysortReduce,其中sort之后还会做Merge处理,同时copy完了之后会对磁盘上的临时文件做清除处理。然后的执行策略和MapTask基本一致。


以上分析只是自己的一点简单的理解,具体的很多实现我还没有去了解比如Metrics的具体实现以及那个证书具体的东东等等。

 

 

<!--EndFragment-->

  • 大小: 7.6 KB
分享到:
评论

相关推荐

    Hadoop源代码分析(MapTask)

    Hadoop源代码分析(MapTask) Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部...

    Hadoop学习笔记

    Hadoop学习笔记,自己总结的一些Hadoop学习笔记,比较简单。

    hadoop学习笔记.rar

    三、Hadoop学习笔记之三:用MRUnit做单元测试 MRUnit是针对MapReduce任务的单元测试框架,它允许开发者对MapReduce作业进行单元测试,确保每个Mapper和Reducer的功能正确性。通过MRUnit,可以在不实际运行Hadoop集群...

    Hadoop源代码分析(MapTask辅助类,III)

    ### Hadoop源代码分析——MapTask辅助类输出机制详解 #### 概述 本文将深入探讨Hadoop MapReduce框架中的MapTask辅助类中与键值对(Key-Value,简称KV)输出相关的源代码实现细节。这部分内容对于理解Hadoop内部...

    最新Hadoop学习笔记

    综上所述,这份“最新Hadoop学习笔记”涵盖了从环境搭建到项目实践的全过程,旨在帮助读者全面掌握Hadoop的核心技术和应用方式。通过阅读和实践,你将能够熟练地在大规模数据集上进行高效计算和存储。

    Hadoop 学习笔记.md

    Hadoop 学习笔记.md

    hadoop学习笔记(三)

    在本篇"Hadoop学习笔记(三)"中,我们将探讨如何使用Hadoop的MapReduce框架来解决一个常见的问题——从大量数据中找出最大值。这个问题与SQL中的`SELECT MAX(NUMBER) FROM TABLE`查询相似,但在这里我们通过编程...

    3.Hadoop学习笔记.pdf

    MapReduce的核心思想是将计算过程分为Map(映射)和Reduce(归约)两个阶段。Map阶段并行处理输入数据,并输出中间结果;Reduce阶段则将这些中间结果合并成最终输出。 3. YARN(Yet Another Resource Negotiator)...

    Hadoop源代码分析(MapTask辅助类 I)

    MapReduce负责处理大规模数据集的并行运算任务,而MapTask作为MapReduce的核心组成部分之一,其设计与实现对于理解和优化Hadoop系统的性能至关重要。本文将对Hadoop MapTask中的辅助类进行深入分析,特别是针对...

    Hadoop源代码分析(MapTask辅助类,II)

    在Hadoop中,MapTask是MapReduce框架的关键组件,负责执行Mapper阶段的工作。MapTask辅助类,特别是MapOutputBuffer,是Mapper输出数据管理的核心部分。本文将继续深入分析MapOutputBuffer的内部实现,以便理解...

    HADOOP学习笔记

    【HADOOP学习笔记】 Hadoop是Apache基金会开发的一个开源分布式计算框架,是云计算领域的重要组成部分,尤其在大数据处理方面有着广泛的应用。本学习笔记将深入探讨Hadoop的核心组件、架构以及如何搭建云计算平台。...

    Hadoop分布式文件系统——导入和导出数据内含源码以及说明书可以自己运行复现.zip

    在IT行业中,Hadoop分布式文件系统(Hadoop Distributed File System, 简称HDFS)是一种广泛应用于大...这个压缩包提供了一个实践和学习的平台,通过实际操作,你可以更好地理解HDFS的工作机制,提升自己的编程能力。

    Hadoop学习笔记.pdf

    学习Hadoop不仅要关注当前的架构和组件,还需要持续跟踪其发展动态,以便更好地把握和应用这项技术。对于初学者而言,了解Hadoop的基本架构和组件,理解其设计哲学和适用场景,是入门的关键步骤。随着技术的深入学习...

    云计算hadoop学习笔记

    云计算,hadoop,学习笔记, dd

    Hadoop高级编程——构建与实现大数据解决方案.rar

    它分为Map阶段和Reduce阶段,Map阶段将任务分解,Reduce阶段对结果进行聚合。 三、Hadoop高级编程 1. MapReduce优化:包括任务切片优化、数据本地化、减少shuffle数据量等,以提高处理速度和效率。 2. YARN:作为...

    Hadoop课程实验和报告——Hadoop安装实验报告

    Hadoop课程实验和报告——Hadoop安装实验报告 Hadoop是一个开源的大数据处理框架,由Apache基金会开发和维护。它提供了一种可靠、可扩展、可高效的方法来存储和处理大规模数据。在本实验报告中,我们将介绍Hadoop的...

Global site tag (gtag.js) - Google Analytics