`
kelvinliu117
  • 浏览: 20132 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop之TaskTraker分析(转)

 
阅读更多

TaskTracker的工作职责之前已经和大家提过,主要负责维护,申请和监控Task,通过heartbeat和JobTracker进行通信。

     TaskTracker的init过程:

     1.读取配置文件,解析参数

     2.将TaskTraker上原有的用户local files删除并新建新的dir和file

     3. Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map

     4.    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();记录task的链表
            this.runningJobs = new TreeMap<JobID, RunningJob>();记录job的id信息

     5.初始化JVMManager:

  1. mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),   
  2.       true, tracker);  
  3.   reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),  
  4.       false, tracker);  

     6.初始化RPC,获取JobTracker client用于heartbeat通信;

 

     7.new一个 后台线程用于监听map完成的事件

  1. this.mapEventsFetcher = new MapEventsFetcherThread();  
  2. mapEventsFetcher.setDaemon(true);  
  3. mapEventsFetcher.setName(  
  4.                          "Map-events fetcher for all reduce tasks " + "on " +   
  5.                          taskTrackerName);  
  6. mapEventsFetcher.start();  

    后台线程的run方法如下:

 

 

  1. while (running) {  
  2.        try {  
  3.          List <FetchStatus> fList = null;  
  4.          synchronized (runningJobs) {  
  5.            while (((fList = reducesInShuffle()).size()) == 0) {  
  6.              try {  
  7.                runningJobs.wait();  
  8.              } catch (InterruptedException e) {  
  9.                LOG.info("Shutting down: " + this.getName());  
  10.                return;  
  11.              }  
  12.            }  
  13.          }  
  14.          // now fetch all the map task events for all the reduce tasks  
  15.          // possibly belonging to different jobs  
  16.          boolean fetchAgain = false//flag signifying whether we want to fetch  
  17.                                      //immediately again.  
  18.          for (FetchStatus f : fList) {  
  19.            long currentTime = System.currentTimeMillis();  
  20.            try {  
  21.              //the method below will return true when we have not   
  22.              //fetched all available events yet  
  23.              if (f.fetchMapCompletionEvents(currentTime)) {  
  24.                fetchAgain = true;  
  25.              }  
  26.            } catch (Exception e) {  
  27.              LOG.warn(  
  28.                       "Ignoring exception that fetch for map completion" +  
  29.                       " events threw for " + f.jobId + " threw: " +  
  30.                       StringUtils.stringifyException(e));   
  31.            }  
  32.            if (!running) {  
  33.              break;  
  34.            }  
  35.          }  
  36.          synchronized (waitingOn) {  
  37.            try {  
  38.              if (!fetchAgain) {  
  39.                waitingOn.wait(heartbeatInterval);  
  40.              }  
  41.            } catch (InterruptedException ie) {  
  42.              LOG.info("Shutting down: " + this.getName());  
  43.              return;  
  44.            }  
  45.          }  
  46.        } catch (Exception e) {  
  47.          LOG.info("Ignoring exception "  + e.getMessage());  
  48.        }  
  49.      }  
  50.    }   

8.initializeMemoryManagement,初始化每个TrackTask的内存设置

 

9.new一个Map和Reducer的Launcher后台线程

 

  1. mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);  
  2.  reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);  
  3.  mapLauncher.start();  
  4.  reduceLauncher.start();  

用于后面创建子JVM来执行map、reduce task

 

看一下

  1. TaskLauncher的run方法:  
  2.  //before preparing the job localize   
  3.       //all the archives  
  4.       TaskAttemptID taskid = t.getTaskID();  
  5.       final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");  
  6.       //simply get the location of the workDir and pass it to the child. The  
  7.       //child will do the actual dir creation  
  8.       final File workDir =  
  9.       new File(new Path(localdirs[rand.nextInt(localdirs.length)],   
  10.           TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),   
  11.           taskid.toString(),  
  12.           t.isTaskCleanupTask())).toString());  
  13.         
  14.       String user = tip.getUGI().getUserName();  
  15.         
  16.       // Set up the child task's configuration. After this call, no localization  
  17.       // of files should happen in the TaskTracker's process space. Any changes to  
  18.       // the conf object after this will NOT be reflected to the child.  
  19.       // setupChildTaskConfiguration(lDirAlloc);  
  20.   
  21.       if (!prepare()) {  
  22.         return;  
  23.       }  
  24.         
  25.       // Accumulates class paths for child.  
  26.       List<String> classPaths = getClassPaths(conf, workDir,  
  27.                                               taskDistributedCacheManager);  
  28.   
  29.       long logSize = TaskLog.getTaskLogLength(conf);  
  30.         
  31.       //  Build exec child JVM args.  
  32.       Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);  
  33.         
  34.       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);  
  35.   
  36.       // set memory limit using ulimit if feasible and necessary ...  
  37.       String setup = getVMSetupCmd();  
  38.       // Set up the redirection of the task's stdout and stderr streams  
  39.       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());  
  40.       File stdout = logFiles[0];  
  41.       File stderr = logFiles[1];  
  42.       tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,  
  43.                  stderr);  
  44.         
  45.       Map<String, String> env = new HashMap<String, String>();  
  46.       errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,  
  47.                                    logSize);  
  48.         
  49.       // flatten the env as a set of export commands  
  50.       List <String> setupCmds = new ArrayList<String>();  
  51.       for(Entry<String, String> entry : env.entrySet()) {  
  52.         StringBuffer sb = new StringBuffer();  
  53.         sb.append("export ");  
  54.         sb.append(entry.getKey());  
  55.         sb.append("=\"");  
  56.         sb.append(entry.getValue());  
  57.         sb.append("\"");  
  58.         setupCmds.add(sb.toString());  
  59.       }  
  60.       setupCmds.add(setup);  
  61.         
  62.       launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);  
  63.       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());  
  64.       if (exitCodeSet) {  
  65.         if (!killed && exitCode != 0) {  
  66.           if (exitCode == 65) {  
  67.             tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());  
  68.           }  
  69.           throw new IOException("Task process exit with nonzero status of " +  
  70.               exitCode + ".");  
  71.         }  
  72.       }  
  73.     }  

run方法为当前task new一个child JVM,为其设置文件路径,上下文环境,JVM启动参数和启动命令等信息,然后调用TaskControll方法启动新的JVM执行对应的Task工作。

 

各个类关系图如下所示:

最后以TaskController的launchTask截至

10.然后开始  startHealthMonitor(this.fConf);

 

 

再来看看TaskLauncher的run方法,就是不停的循环去获取TaskTracker中新的task,然后调用startNewTask方法

 

  1. if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||  
  2.          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||  
  3.          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {  
  4.        localizeTask(task);  
  5.        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {  
  6.          this.taskStatus.setRunState(TaskStatus.State.RUNNING);  
  7.        }  
  8.        setTaskRunner(task.createRunner(TaskTracker.thisthis, rjob));  
  9.        this.runner.start();  
  10.        long now = System.currentTimeMillis();  
  11.        this.taskStatus.setStartTime(now);  
  12.        this.lastProgressReport = now;  

TaskTracker的run方法:通过维护心跳和JobTracker通信,以获取、杀掉新的Task,重点看一下heartBeat通信过程:

 

 

  1. synchronized (this) {  
  2.      askForNewTask =   
  3.        ((status.countOccupiedMapSlots() < maxMapSlots ||   
  4.          status.countOccupiedReduceSlots() < maxReduceSlots) &&   
  5.         acceptNewTasks);   
  6.      localMinSpaceStart = minSpaceStart;  
  7.    }  
  8.    if (askForNewTask) {  
  9.      askForNewTask = enoughFreeSpace(localMinSpaceStart);  
  10.      long freeDiskSpace = getFreeSpace();  
  11.      long totVmem = getTotalVirtualMemoryOnTT();  
  12.      long totPmem = getTotalPhysicalMemoryOnTT();  
  13.      long availableVmem = getAvailableVirtualMemoryOnTT();  
  14.      long availablePmem = getAvailablePhysicalMemoryOnTT();  
  15.      long cumuCpuTime = getCumulativeCpuTimeOnTT();  
  16.      long cpuFreq = getCpuFrequencyOnTT();  
  17.      int numCpu = getNumProcessorsOnTT();  
  18.      float cpuUsage = getCpuUsageOnTT();  
  19.   
  20.      status.getResourceStatus().setAvailableSpace(freeDiskSpace);  
  21.      status.getResourceStatus().setTotalVirtualMemory(totVmem);  
  22.      status.getResourceStatus().setTotalPhysicalMemory(totPmem);  
  23.      status.getResourceStatus().setMapSlotMemorySizeOnTT(  
  24.          mapSlotMemorySizeOnTT);  
  25.      status.getResourceStatus().setReduceSlotMemorySizeOnTT(  
  26.          reduceSlotSizeMemoryOnTT);  
  27.      status.getResourceStatus().setAvailableVirtualMemory(availableVmem);   
  28.      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);  
  29.      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);  
  30.      status.getResourceStatus().setCpuFrequency(cpuFreq);  
  31.      status.getResourceStatus().setNumProcessors(numCpu);  
  32.      status.getResourceStatus().setCpuUsage(cpuUsage);  
  33.    }  
  34.    //add node health information  
  35.      
  36.    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();  
  37.    synchronized (this) {  
  38.      if (healthChecker != null) {  
  39.        healthChecker.setHealthStatus(healthStatus);  
  40.      } else {  
  41.        healthStatus.setNodeHealthy(true);  
  42.        healthStatus.setLastReported(0L);  
  43.        healthStatus.setHealthReport("");  
  44.      }  
  45.    }  
  46.    //  
  47.    // Xmit the heartbeat  
  48.    //  
  49.    HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
  50.                                                              justStarted,  
  51.                                                              justInited,  
  52.                                                              askForNewTask,   
  53.                                                              heartbeatResponseId);  


该方法主要将TaskTracker上的各种性能参数信息反馈给JobTraker,调用其heartbeat方法然后解析返回的结果,下篇详细分析heartBeat机制

分享到:
评论

相关推荐

    基于Hadoop的成绩分析系统.docx

    基于Hadoop的成绩分析系统 本文档介绍了基于Hadoop的成绩分析系统的设计和实现。Hadoop是一个分布式开源计算平台,具有高可靠性、高扩展性、高效性和高容错性等特点。该系统使用Hadoop的分布式文件系统HDFS和...

    基于Hadoop数据分析系统设计(需求分析).docx

    Hive是建立在Hadoop之上的数据仓库工具,它允许用户使用SQL-like语言进行数据查询和分析,简化了大数据分析的过程。Hive将SQL语句转换为MapReduce任务运行在Hadoop集群上,提供了一种更易用的接口来处理Hadoop中的大...

    如何使用hadoop进行数据分析.zip

    如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...

    基于Hadoop豆瓣电影数据分析实验报告

    【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...

    Hadoop应用案例分析:雅虎、eBay、百度、Facebook.pdf

    ,Hadoop 技术已经在互联网领域得到了广泛的应用。...同样也得到了许多公司的青睐,如百度主要将Hadoop 应用于日志分析和网页数据库的数据 挖掘;阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。

    Hadoop豆瓣电影分析可视化源码

    针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...

    Hadoop数据分析_大数据_hadoop_数据分析_

    本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。大数据指的是无法用传统数据库软件工具捕获、管理和处理的大规模数据集。这些数据集通常具有三个关键特征...

    Hadoop源代码分析(完整版).pdf

    HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。HDFS 的架构主要包括以下几个部分: * Namenode:负责管理文件系统的命名空间...

    基于Hadoop的电影影评数据分析

    【基于Hadoop的电影影评数据分析】是一项大数据课程的大作业,旨在利用Hadoop的分布式处理能力来分析电影影评数据。Hadoop是一个由Apache软件基金会开发的开源框架,专为处理和存储大规模数据而设计。它由四个核心...

    Hadoop源代码分析完整版.pdf

    Hadoop源代码分析完整版.pdf

    Hadoop豆瓣电影数据分析(Hadoop)操作源码

    Hadoop豆瓣电影数据分析(Hadoop)操作源码

    构建企业级数仓-Hadoop可行性分析报告.docx

    **基于Hadoop平台的数据仓库可行性分析报告** **1. 引言** 在信息化时代,企业对数据处理的需求日益增长,传统的数据仓库系统由于其规模、性能和灵活性的限制,已经无法满足现代企业对大数据处理的需求。Hadoop作为...

    使用hadoop进行天气数据分析.zip

    使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...

    Hadoop源代码分析

    《Hadoop源代码分析》是一本深入探讨Hadoop核心组件MapReduce的专著。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算框架,以处理和存储大量数据。MapReduce是Hadoop的核心计算模型,它通过将大...

    基于Hadoop网站流量日志数据分析系统.zip

    基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...

    深入云计算:Hadoop源代码分析(修订版)

    深入云计算:Hadoop源代码分析(修订版)

    深入云计算 Hadoop源代码分析

    ### 深入云计算 Hadoop源代码分析 #### 一、引言 随着大数据时代的到来,数据处理成为了各个领域中的关键技术之一。Hadoop作为一个开源的大数据处理框架,因其优秀的分布式计算能力,在业界得到了广泛的应用。...

    Hadoop源码分析(完整版)

    Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...

    Hadoop之外卖订单数据分析系统

    本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...

Global site tag (gtag.js) - Google Analytics