TaskTracker的工作职责之前已经和大家提过,主要负责维护,申请和监控Task,通过heartbeat和JobTracker进行通信。
TaskTracker的init过程:
1.读取配置文件,解析参数
2.将TaskTraker上原有的用户local files删除并新建新的dir和file
3. Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>(); 清除map
4. this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();记录task的链表
this.runningJobs = new TreeMap<JobID, RunningJob>();记录job的id信息
5.初始化JVMManager:
- mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
- true, tracker);
- reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
- false, tracker);
6.初始化RPC,获取JobTracker client用于heartbeat通信;
7.new一个 后台线程用于监听map完成的事件
- this.mapEventsFetcher = new MapEventsFetcherThread();
- mapEventsFetcher.setDaemon(true);
- mapEventsFetcher.setName(
- "Map-events fetcher for all reduce tasks " + "on " +
- taskTrackerName);
- mapEventsFetcher.start();
后台线程的run方法如下:
- while (running) {
- try {
- List <FetchStatus> fList = null;
- synchronized (runningJobs) {
- while (((fList = reducesInShuffle()).size()) == 0) {
- try {
- runningJobs.wait();
- } catch (InterruptedException e) {
- LOG.info("Shutting down: " + this.getName());
- return;
- }
- }
- }
- // now fetch all the map task events for all the reduce tasks
- // possibly belonging to different jobs
- boolean fetchAgain = false; //flag signifying whether we want to fetch
- //immediately again.
- for (FetchStatus f : fList) {
- long currentTime = System.currentTimeMillis();
- try {
- //the method below will return true when we have not
- //fetched all available events yet
- if (f.fetchMapCompletionEvents(currentTime)) {
- fetchAgain = true;
- }
- } catch (Exception e) {
- LOG.warn(
- "Ignoring exception that fetch for map completion" +
- " events threw for " + f.jobId + " threw: " +
- StringUtils.stringifyException(e));
- }
- if (!running) {
- break;
- }
- }
- synchronized (waitingOn) {
- try {
- if (!fetchAgain) {
- waitingOn.wait(heartbeatInterval);
- }
- } catch (InterruptedException ie) {
- LOG.info("Shutting down: " + this.getName());
- return;
- }
- }
- } catch (Exception e) {
- LOG.info("Ignoring exception " + e.getMessage());
- }
- }
- }
8.initializeMemoryManagement,初始化每个TrackTask的内存设置
9.new一个Map和Reducer的Launcher后台线程
- mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
- reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
- mapLauncher.start();
- reduceLauncher.start();
用于后面创建子JVM来执行map、reduce task
看一下
- TaskLauncher的run方法:
- //before preparing the job localize
- //all the archives
- TaskAttemptID taskid = t.getTaskID();
- final LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
- //simply get the location of the workDir and pass it to the child. The
- //child will do the actual dir creation
- final File workDir =
- new File(new Path(localdirs[rand.nextInt(localdirs.length)],
- TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
- taskid.toString(),
- t.isTaskCleanupTask())).toString());
- String user = tip.getUGI().getUserName();
- // Set up the child task's configuration. After this call, no localization
- // of files should happen in the TaskTracker's process space. Any changes to
- // the conf object after this will NOT be reflected to the child.
- // setupChildTaskConfiguration(lDirAlloc);
- if (!prepare()) {
- return;
- }
- // Accumulates class paths for child.
- List<String> classPaths = getClassPaths(conf, workDir,
- taskDistributedCacheManager);
- long logSize = TaskLog.getTaskLogLength(conf);
- // Build exec child JVM args.
- Vector<String> vargs = getVMArgs(taskid, workDir, classPaths, logSize);
- tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
- // set memory limit using ulimit if feasible and necessary ...
- String setup = getVMSetupCmd();
- // Set up the redirection of the task's stdout and stderr streams
- File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
- File stdout = logFiles[0];
- File stderr = logFiles[1];
- tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout,
- stderr);
- Map<String, String> env = new HashMap<String, String>();
- errorInfo = getVMEnvironment(errorInfo, user, workDir, conf, env, taskid,
- logSize);
- // flatten the env as a set of export commands
- List <String> setupCmds = new ArrayList<String>();
- for(Entry<String, String> entry : env.entrySet()) {
- StringBuffer sb = new StringBuffer();
- sb.append("export ");
- sb.append(entry.getKey());
- sb.append("=\"");
- sb.append(entry.getValue());
- sb.append("\"");
- setupCmds.add(sb.toString());
- }
- setupCmds.add(setup);
- launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
- tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
- if (exitCodeSet) {
- if (!killed && exitCode != 0) {
- if (exitCode == 65) {
- tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
- }
- throw new IOException("Task process exit with nonzero status of " +
- exitCode + ".");
- }
- }
- }
run方法为当前task new一个child JVM,为其设置文件路径,上下文环境,JVM启动参数和启动命令等信息,然后调用TaskControll方法启动新的JVM执行对应的Task工作。
各个类关系图如下所示:
最后以TaskController的launchTask截至
10.然后开始 startHealthMonitor(this.fConf);
再来看看TaskLauncher的run方法,就是不停的循环去获取TaskTracker中新的task,然后调用startNewTask方法
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
- this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
- this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
- localizeTask(task);
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
- }
- setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
- this.runner.start();
- long now = System.currentTimeMillis();
- this.taskStatus.setStartTime(now);
- this.lastProgressReport = now;
TaskTracker的run方法:通过维护心跳和JobTracker通信,以获取、杀掉新的Task,重点看一下heartBeat通信过程:
- synchronized (this) {
- askForNewTask =
- ((status.countOccupiedMapSlots() < maxMapSlots ||
- status.countOccupiedReduceSlots() < maxReduceSlots) &&
- acceptNewTasks);
- localMinSpaceStart = minSpaceStart;
- }
- if (askForNewTask) {
- askForNewTask = enoughFreeSpace(localMinSpaceStart);
- long freeDiskSpace = getFreeSpace();
- long totVmem = getTotalVirtualMemoryOnTT();
- long totPmem = getTotalPhysicalMemoryOnTT();
- long availableVmem = getAvailableVirtualMemoryOnTT();
- long availablePmem = getAvailablePhysicalMemoryOnTT();
- long cumuCpuTime = getCumulativeCpuTimeOnTT();
- long cpuFreq = getCpuFrequencyOnTT();
- int numCpu = getNumProcessorsOnTT();
- float cpuUsage = getCpuUsageOnTT();
- status.getResourceStatus().setAvailableSpace(freeDiskSpace);
- status.getResourceStatus().setTotalVirtualMemory(totVmem);
- status.getResourceStatus().setTotalPhysicalMemory(totPmem);
- status.getResourceStatus().setMapSlotMemorySizeOnTT(
- mapSlotMemorySizeOnTT);
- status.getResourceStatus().setReduceSlotMemorySizeOnTT(
- reduceSlotSizeMemoryOnTT);
- status.getResourceStatus().setAvailableVirtualMemory(availableVmem);
- status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
- status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
- status.getResourceStatus().setCpuFrequency(cpuFreq);
- status.getResourceStatus().setNumProcessors(numCpu);
- status.getResourceStatus().setCpuUsage(cpuUsage);
- }
- //add node health information
- TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
- synchronized (this) {
- if (healthChecker != null) {
- healthChecker.setHealthStatus(healthStatus);
- } else {
- healthStatus.setNodeHealthy(true);
- healthStatus.setLastReported(0L);
- healthStatus.setHealthReport("");
- }
- }
- //
- // Xmit the heartbeat
- //
- HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
- justStarted,
- justInited,
- askForNewTask,
- heartbeatResponseId);
该方法主要将TaskTracker上的各种性能参数信息反馈给JobTraker,调用其heartbeat方法然后解析返回的结果,下篇详细分析heartBeat机制
相关推荐
基于Hadoop的成绩分析系统 本文档介绍了基于Hadoop的成绩分析系统的设计和实现。Hadoop是一个分布式开源计算平台,具有高可靠性、高扩展性、高效性和高容错性等特点。该系统使用Hadoop的分布式文件系统HDFS和...
Hive是建立在Hadoop之上的数据仓库工具,它允许用户使用SQL-like语言进行数据查询和分析,简化了大数据分析的过程。Hive将SQL语句转换为MapReduce任务运行在Hadoop集群上,提供了一种更易用的接口来处理Hadoop中的大...
如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop进行数据分析.zip 如何使用hadoop...
【基于Hadoop豆瓣电影数据分析实验报告】 在大数据时代,对海量信息进行高效处理和分析是企业决策的关键。Hadoop作为一款强大的分布式计算框架,自2006年诞生以来,已经在多个领域展现了其卓越的数据处理能力。本...
,Hadoop 技术已经在互联网领域得到了广泛的应用。...同样也得到了许多公司的青睐,如百度主要将Hadoop 应用于日志分析和网页数据库的数据 挖掘;阿里巴巴则将Hadoop 用于商业数据的排序和搜索引擎的优化等。
针对本次实验,我们需要用到Hadoop集群作为模拟大数据的分析软件,集群环境必须要包括,hdfs,hbase,hive,flume,sqoop等插件,最后结合分析出来的数据进行可视化展示,需要用到Python(爬取数据集,可视化展示)...
本主题将深入探讨Hadoop在数据分析中的应用及其生态系统的关键技术。 首先,我们需要理解“大数据”的概念。大数据指的是无法用传统数据库软件工具捕获、管理和处理的大规模数据集。这些数据集通常具有三个关键特征...
HDFS 是 Hadoop 的核心组件之一,是一个分布式文件系统。HDFS 的主要功能是提供一个高可靠、高可扩展的文件系统,可以存储大量的数据。HDFS 的架构主要包括以下几个部分: * Namenode:负责管理文件系统的命名空间...
【基于Hadoop的电影影评数据分析】是一项大数据课程的大作业,旨在利用Hadoop的分布式处理能力来分析电影影评数据。Hadoop是一个由Apache软件基金会开发的开源框架,专为处理和存储大规模数据而设计。它由四个核心...
Hadoop源代码分析完整版.pdf
Hadoop豆瓣电影数据分析(Hadoop)操作源码
**基于Hadoop平台的数据仓库可行性分析报告** **1. 引言** 在信息化时代,企业对数据处理的需求日益增长,传统的数据仓库系统由于其规模、性能和灵活性的限制,已经无法满足现代企业对大数据处理的需求。Hadoop作为...
使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop进行数据分析天气数据分析.zip使用hadoop...
《Hadoop源代码分析》是一本深入探讨Hadoop核心组件MapReduce的专著。Hadoop是Apache软件基金会的一个开源项目,旨在提供分布式存储和计算框架,以处理和存储大量数据。MapReduce是Hadoop的核心计算模型,它通过将大...
基于Hadoop网站流量日志数据分析系统 1、典型的离线流数据分析系统 2、技术分析 - Hadoop - nginx - flume - hive - mysql - springboot + mybatisplus+vcharts nginx + lua 日志文件埋点的 基于Hadoop网站流量...
深入云计算:Hadoop源代码分析(修订版)
### 深入云计算 Hadoop源代码分析 #### 一、引言 随着大数据时代的到来,数据处理成为了各个领域中的关键技术之一。Hadoop作为一个开源的大数据处理框架,因其优秀的分布式计算能力,在业界得到了广泛的应用。...
Hadoop源码分析是深入理解Hadoop分布式计算平台原理的起点,通过源码分析,可以更好地掌握Hadoop的工作机制、关键组件的实现方式和内部通信流程。Hadoop项目包括了多个子项目,其中最核心的是HDFS和MapReduce,这两...
本文将深入探讨“Hadoop之外卖订单数据分析系统”,并介绍如何利用Hadoop进行大规模数据处理,以及如何将分析结果通过可视化手段进行展示。 首先,我们需要理解Hadoop的核心组件:HDFS(Hadoop Distributed File ...