JobInProgress在正式为TaskTracker节点分配一个任务之前,它都会先检查自己是否应该给该计算节点分配自己的计算任务。当这个计算节点满足要求时,JobInProgress才会为它分配任务;否则,直接返回。而这个判断的标准主要依据以下两点:
1.TaskTracker节点所在的Host上的所有计算节点执行该作业的任务实例失败的次数;
2.JobInProgress黑名单中的TaskTracker数量;
这里先来解释一下“黑名单”,它是指当一个Host上的TaskTracker节点执行该作业的任务发生失败的次数超过一定阈值时,这个Host和它上面的所有TaskTracker节点就会进入这个作业对应的JobInProgress中的黑名单。不难看出,每一个作业的JobInProgress都有一份自己独有的黑名单。那么,上面的这两条判断标准就很容易理解了,第一点实际上就是判断这个TaskTracker节点在不在黑名单中,第二点是如果上了JobInProgress黑名单的TaskTracker数量过多的话就应该忽略该TaskTracker节点是否在黑名单中都应该可以给它分配任务,因为如果不这样做的话就有可能造成其它的计算节点负载过重以及增大该作业的响应延时,同时上了黑名单的TaskTracker节点执行该作业的任务失败的概率比较的大,但不一定会失败。判断的具体细节如下:
private static final double CLUSTER_BLACKLIST_PERCENT = 0.25; private boolean shouldRunOnTaskTracker(String taskTracker) { int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker); if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) { if (LOG.isDebugEnabled()) { String flakyTracker = convertTrackerNameToHostName(taskTracker); } return false; } return true; }
关于Host及其上的TaskTracker节点上作业黑名单的阈值是由用户在提交作业时设置的,用户对该参数的设置既可以通过JobConf的setMaxTaskFailuresPerTracker()方法也可以作业配置文件中的mapred.max.tracker.failures来设置。这里可能有人会问,JobInProgress究竟是如何知道一个TaskTracker节点执行它的一个任务发生了失败呢?
当一个TaskTracker节点向JobTracker节点发送心跳包的时候,它会向JobTracker节点报告自己正在执行的各个任务状态,JobTracker节点在收到任务状态报告之后就会将它们通知给对应的JobInProgress,之后JobInProgress就知道自己的那些任务实例被该TaskTracker节点执行失败了。另外,JobInProgress还有一个途径可以知道那些计算节点执行它的任务实例失败了。前面讲过,JobTracker节点后台运行着一个检测线程,用来检测那些TaskTracker节点失效了,当它发现一个TaskTracker节点失效时,则该TaskTracker节点当前未完成的任务实例都被看做是执行失败,所以也就会通知这些任务实例所属的JobInProgress关于其任务实例执行失败的情况。JobInProgress对Host及计算节点上黑名单处理细节如下:
void addTrackerTaskFailure(String trackerName) { if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { //解析该TaskTracker节点所在的主机名(IP地址) String trackerHostName = convertTrackerNameToHostName(trackerName); //该主机上的TaskTracker节点执行该作业任务实例已失败的次数 Integer trackerFailures = trackerToFailuresMap.get(trackerHostName); if (trackerFailures == null) { trackerFailures = 0; } trackerToFailuresMap.put(trackerHostName, ++trackerFailures); //该Taskracker节点能否上黑名单 if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) { ++flakyTaskTrackers; LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'"); } } }
从上面的源代码可以看出,一个JobInProgress黑名单中的TaskTracker数量是有限制的,原因前面也说过,不可能让集群中所有的TaskTracker节点上它的黑名单,否则最后谁来执行它呢! 对于计算节点的检测,除了后台检测线程可以发现外,还有一个地方也可以检测到,就是当一个TaskTracker节点向JobTracker发送心跳包时,如果该节点是刚重启或者是重新初始化,同时JobTracker上又有该节点的记录,那么JobTracker就认为该节点以前的一个版本已经失效了。
除了每一个JobInProgress有自己的黑名单之外,JobTracker节点也有一份黑名单,也就是集群黑名单,它们的相同之处在于这份黑名单都是以Host为单位的,即如果一个Host上了黑名单,那么它上面的所有TaskTracker节点也就随之上了黑名单了。如果一个TaskTracker节点上了集群的黑名单,则它不会被分配任何作业的任务,而如果它上了某一个JobInProgress的黑名单,顶多只是该JobInProgress不会给它分配任务而已(前提是该黑名单不大)。因此,对于那些上了集群黑名的TaskTracker节点,它们的计算能力也就不能算作整个集群的计算能力了,这也使得它不得不限制集群黑名单的大小,否则最后就没有计算节点来完成用户提交的作业了。同样的问题,集群中的黑名单是哪儿来的呢?
首先,集群中的黑名单只来源于JobInProgress中的黑名单,而且该JobInProgress必须最终被成功执行,这个原因其实很有意思。如果一个作业最终没有被成功完成的话,那么就不是集群中的计算节点的问题,而很有可能是这个作业本身的问题,也就不能代表该JobInProgress黑名单中的TaskTracker节点丧失了一部分或全部的计算能力。这个集群的黑名单是由FaultyTrackersInfo对象类管理的,它处理一个Host上黑名单的操作很简单:
private int MAX_BLACKLISTS_PER_TRACKER = 4; private double AVERAGE_BLACKLIST_THRESHOLD = 0.50; private static double MAX_BLACKLIST_PERCENT = 0.50; /*判断一个主机是否应该上集群的黑名单*/ private boolean shouldBlacklist(String hostName, int numFaults) { if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) { // calculate avgBlackLists long clusterSize = getClusterStatus().getTaskTrackers(); long sum = 0; for (FaultInfo f : potentiallyFaultyTrackers.values()) { sum += f.getFaultCount(); } double avg = (double) sum / clusterSize; long totalCluster = clusterSize + numBlacklistedTrackers; if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) && numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) { return true; } } return false; } /*更新集群的计算能力及黑名单中的TaskTracker数量*/ private void removeHostCapacity(String hostName) { synchronized (taskTrackers) { // remove the capacity of trackers on this host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { totalMapTaskCapacity -= status.getMaxMapTasks(); totalReduceTaskCapacity -= status.getMaxReduceTasks(); } numBlacklistedTrackers += uniqueHostsMap.remove(hostName); } } void incrementFaults(String hostName) { synchronized (potentiallyFaultyTrackers) { FaultInfo fi = potentiallyFaultyTrackers.get(hostName); if (fi == null) { fi = new FaultInfo(); potentiallyFaultyTrackers.put(hostName, fi); } int numFaults = fi.getFaultCount(); ++numFaults; fi.setFaultCount(numFaults); fi.setLastUpdated(System.currentTimeMillis()); if (!fi.isBlacklisted()) { if (shouldBlacklist(hostName, numFaults)) { removeHostCapacity(hostName); fi.setBlacklist(true); } } } }
在上面的代码中,有三个参数决定了一个Host是否能够上黑名单,其中有两个参数同时还可以通过JobTracker的配置文件来设置:
MAX_BLACKLISTS_PER_TRACKER = conf.getInt("mapred.max.tracker.blacklists", 4); AVERAGE_BLACKLIST_THRESHOLD = conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
当一个Host及其上的TaskTracker节点上了集群的黑名单之后,就很难从该黑名单中消除,除非重启该Host上的计算节点,请注意这里说的是重启而不是重新初始化TaskTracker节点。当JobTracker节点发现一个刚重启并且是第一次发送心跳包时,它会首先从黑名单上清除该几点所在的Host及其上的所有TaskTracker节点,并将这些TaskTracker节点的计算能力加入到集群中,这个过程如下:
private void addHostCapacity(String hostName) { synchronized (taskTrackers) { int numTrackersOnHost = 0; // add the capacity of trackers on the host for (TaskTrackerStatus status : getStatusesOnHost(hostName)) { totalMapTaskCapacity += status.getMaxMapTasks(); totalReduceTaskCapacity += status.getMaxReduceTasks(); numTrackersOnHost++; } uniqueHostsMap.put(hostName, numTrackersOnHost); numBlacklistedTrackers -= numTrackersOnHost; } void markTrackerHealthy1(String hostName) { synchronized (potentiallyFaultyTrackers) { FaultInfo fi = potentiallyFaultyTrackers.remove(hostName); if (fi != null && fi.isBlacklisted()) { addHostCapacity(hostName); } } }
相关推荐
Hadoop动态增加节点与删除节点,本人自己整理。。
Hadoop 是一种分布式计算技术,能够处理大量数据。搭建 Hadoop 环境是学习和使用 Hadoop 的基础。本文将详细介绍如何搭建多节点 Hadoop 环境,包括安装 Ubuntu 操作系统、安装 Hadoop 软件、配置 SSH 无密码登录、...
hadoop 启动时 TaskTracker无法启动 ERROR org.apache.hadoop.mapred.TaskTracker: Can not start task tracker because java.io.IOException: Failed to set permissions of path: \tmp\hadoop-admin \mapred\...
以上便是对Hadoop分布式计算环境配置的详细解析,包括了核心配置文件的设置、masters与slaves文件的编写、Hadoop的安装与启动流程以及如何验证Hadoop是否成功启动等内容。这对于初学者理解和实践Hadoop集群的搭建...
Hadoop动态新增节点
本文档介绍了一种基于多元线性回归模型的Hadoop集群节点性能计算方法,该方法可以对Hadoop集群节点的性能进行准确的评估和优化。 什么是Hadoop集群节点性能计算? Hadoop集群节点性能计算是指对Hadoop集群中每个...
Hadoop 2.8.2 三节点集群安装及 native 编译 本文详细介绍了在 RHEL 6.7 操作系统上安装 Hadoop 2.8.2 三节点集群的步骤,包括集群环境准备、Hadoop 软件包的准备、JDK 的安装、集群成员 SSH 互信、NTP 的安装、...
Hadoop分布式计算可以将任务分配给多个节点,从而提高计算速度和效率。 10. Hadoop应用场景 Hadoop可以应用于大数据处理、数据挖掘、机器学习等领域。 Hadoop分布计算安装需要了解Hadoop架构、HDFS和MapReduce...
“hadoop杀僵尸节点”是一项关键的Hadoop运维实践,通过自动化脚本实现对集群内异常节点的有效管理,避免资源浪费,保障集群稳定高效运行。掌握这一技术,对于从事大数据处理的IT专业人士而言,是提升系统运维能力的...
Hadoop 单节点配置详解 Hadoop 是一个开源的大数据处理框架,由Apache基金会维护。它可以在单节点模式下运行,以便于开发、测试和学习。单节点模式下,Hadoop 可以在一台机器上模拟分布式环境,方便用户快速上手和...
### Hadoop单节点部署指导知识点详解 #### 一、实验目的 - **理解Hadoop原理机制**:深入了解Hadoop的工作原理及其背后的技术架构。 - **熟悉Hadoop集群体系结构**:掌握Hadoop集群中各组成部分的功能及其交互方式...
ubuntu 搭建 Hadoop 单节点 Hadoop 是一个由 Apache 基金会所开发的分布式系统根底架构,用户可以在不了解分布式底层细节的情况下,开发分布式程序。Hadoop 实现了一个分布式文件系统(HDFS),简称 HDFS。HDFS 有...
Hadoop是一款开源的大数据处理框架,主要用于处理海量数据的存储与计算任务。对于初学者来说,掌握Hadoop单节点伪分布式的安装是非常重要的第一步。本文档将详细介绍如何在RHEL6.0操作系统上进行Hadoop单节点伪...
在分布式计算领域,Hadoop是一个不可或缺的名字,它提供了一整套处理大数据的框架。在Hadoop中,远程过程调用(RPC)是核心组件之一,它使得节点间的通信变得高效且可靠。本文将深入探讨Hadoop的RPC机制,解析其工作...
Hadoop作为大数据技术栈中不可或缺的部分,它能够有效地对海量数据进行批处理计算。Hadoop的分布式存储系统HDFS能够存储大量数据,并提供数据的高容错性;而MapReduce编程模型则可以简化并行算法的开发,使得开发者...
在分布式计算领域,Hadoop是一个开源的框架,它允许在廉价硬件上处理大规模数据。对于初学者或小型测试环境,双节点Hadoop安装是一个理想的起点,因为其简单易懂,能快速理解Hadoop的工作原理。本篇文档将详细介绍...
linux入门级资源,对需要配置环境变量的初学者有一定作用,不喜勿喷。谢谢
《深入理解Hadoop分布式计算配置》 Hadoop作为开源的大数据处理框架,其核心在于分布式计算,通过将大规模数据分布在多台服务器上进行并行处理,实现高效的数据处理能力。本文将详细介绍Hadoop的分布式计算配置,...
采用虚拟机的方式,先配置好Hadoop的主节点,然后通过克隆的方式创建Slave节点,实现3节点的HDFS集群 任务二: 实验一:使用任务一搭建的集群,编程实现合并文件MergeFile的功能: 将数据集trec06p\_sample中的...
为了应对这一挑战,研究者们开始探索使用基于Hadoop分布式计算平台的仿真方法。本文将详细阐述基于Hadoop的磁流体动力学模型仿真研究的相关知识点。 Hadoop是Apache基金会开发的分布式存储与计算框架,它允许分布式...