1 JobTracker启动过程
1.1 各种线程功能
函数offerService()会启动JobTracker内部几个比较重要的后台服务进程,分别是expireTrackersThread、retireJobsThread、expireLaunchingTaskThread和completedJobsStoreThread。相关代码如下:
public class JobTracker { ... ... ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks(); Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks, "expireLaunchingTasks"); ... ... public void offerService() throws InterruptedException, IOException { ... ... // expireTrackersThread后台服务进程。 this.expireTrackersThread = new Thread(this.expireTrackers, "expireTrackers"); this.expireTrackersThread.start();
// retireJobsThread后台服务进程。 this.retireJobsThread = new Thread(this.retireJobs, "retireJobs"); this.retireJobsThread.start();
// expireLaunchingTaskThread后台服务进程。 expireLaunchingTaskThread.start();
// completedJobsStoreThread后台服务进程。 if (completedJobStatusStore.isActive()) { completedJobsStoreThread = new Thread(completedJobStatusStore, "completedjobsStore-housekeeper"); completedJobsStoreThread.start(); } ... ... } } |
下面分别介绍这几个服务线程。
1) expireTrackersThread线程
该线程主要用于发现和清理死掉的TaskTracker。每个TaskTracker会周期性地通过心跳向JobTracker汇报信息,而JobTracker会记录每个TaskTracker最近的汇报心跳时间。如果某个TaskTracker在10分钟内未汇报心跳,则JobTracker认为它已死掉,并将经的相关信息从数据结构trackToJobsToCleanup、trackerToTasksToCleanup、trackerToMarkedTasksMap中清除,同时将正在运行的任务状态标注为KILLED_UNCLEAN。
2) retireJobsThread线程
该线程主要用于清理长时间驻留在内存中的已经运行完成的作业信息。JobTracker会将已经运行完成的作业信息存放到内存中,以便外部查询,但随着完成的作业越来越多,势必会占用JobTracker的大量内存,为此,JobTracker通过该线程清理驻留在内存中较长时间的已经运行完成的作业信息。
当一个作业满足如下条件1、2或者条件1、3时,将被从数据结构jobs转移到过期作业队列中。
条件1 作业已经运行完成,即运行状态为SUCCESSED、FAILED或KILLED。
条件2 作业完成时间距现在已经超过24小时(可通过参数mapred.jobtracker.retirejob.interval配置)。
条件3 作业拥有者已经完成作业总数超过100(可通过参数mapred.jobtracker.completeuserjobs.maximum配置)个。
过期作业被统一保存到过期队列中。当过期作业超过1000个(可通过参数mapred.job.tracker.retiredjobs.cache.size配置)时,将会从内存中彻底删除。
3) expireLaunchingTaskThread线程
该线程用于发现已经被分配给某个TaskTracker但一直未汇报信息的任务。当JobTracker将某个任务分配给TaskTracker后,如果该任务在10分钟内未汇报进度,则JobTracker认为该任务分配失败,并将其状态标注为FAILED。
4) completedJobsStoreThread线程
该线程将已经运行完成的作业运行信息保存到HDFS上,并提供了一套存取这些信息的API。该线程能够解决以下两个问题。
n 用户无法获取很久之前的作业运行信息:前面提到线程retireJobsThread会清除长时间驻留在内存中的完成作业,这会导致用户无法查询很久之前某个作业的运行信息。
n JobTracker重启后作业运行信息丢失:当JobTracker因故障重启后,所有原本保存到内存中的作业信息将会全部丢失。
该线程通过保存作业运行日志的方式,使得用户可以查询任意时间提交的作业和还原作业的运行信息。
默认情况下,该线程不会启用,可以通过下表所示的几个参数配置并启用该线程。
配置参数 |
参数含义 |
mapred.job.tracker.persist.jobstatus.active |
是否启用该线程 |
mapred.job.tracker.persist.jobstatus.hours |
作业运行信息保存时间 |
mapred.job.tracker.persist.jobstatus.dir |
作业运行信息保存路径 |
1.2 作业恢复
在MapReduce中,JobTracker存在单点故障问题。如果它因异常退出后重启,那么所有正在运行的作业运行时信息将丢失。如果不采用适当的作业恢复机制对作业信息进行恢复,则所有作业需重新提交,且已经计算完成的任务需重新计算。这势必造成资源浪费。
为了解决JobTracker面临的单点故障问题,Hadoop设计了作业恢复机制,过程如下:作业从提交到运行结束的整个过程中,JobTracker会为一些关键事件记录日志(由JobHistory类完成)。对于作业而言,关键事件包括作业提交、作业创建、作业开始运行、作业运行完成、作业运行失败、作业被杀死等;对于任务而言,关键事件包括任务创建、任务开始运行、任务运行结束、任务运行失败、任务被杀死等。当JobTracker因故障重启后(重启过程中,所有TaskTracker仍然活着),如果管理员启用了作业恢复功能(将参数mapred.jobtracker.restart.recover置为true),则JobTracker会检查是否存在需要恢复运行状态的作业,如果有,则通过日志恢复这些作业的运行状态(由RecoveryManager类完成),并重新调度那些未运行完成的任务(包括产生部分结果的任务)。
2 心跳接收与应答
心跳是沟通TaskTracker和JobTracker的桥梁,它实际上是一个RPC函数。TaskTracker周期性地调用该函数汇报节点和任务状态信息,从而形成心跳。在Hadoop中,心跳主要有三个作用:
n 判断TaskTracker是否活着。
n 及时让JobTracker获取各个节点上的资源使用情况和任务运行状态。
n 为TaskTracker分配任务。
TaskTracker周期性地调用RPC函数heartbeat向JobTracker汇报信息和领取任务。该函数定义如下:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted,boolean initialContact,boolean acceptNewTasks, short responseId) |
该函数的各个参数含义如下。
status |
该参数封装了TaskTracker上的各种状态信息。包括: String trackerName;//TaskTracker名称 String host;//TaskTracker主机名 int httpPort;//TaskTracker对外的HTTP端口号 int failures;//该TaskTracker上已经失败的任务总数 List<TaskStatus> taskReports;//正在运行的各个任务运行状态 volatile long lastSeen;//上次汇报心跳的时间 private int maxMapTasks;/*Map slot总数,即允许同时运行的Map Task总数,由参数mapred.tasktracker.map.tasks.maximum设定*/ private int maxReduceTasks;//Reduce slot总数 private TaskTrackerHealthStatus healthStatus;//TaskTracker健康状态 private ResourceStatus resStatus;//TaskTracker资源(内存,CPU等)信息 |
restarted |
表示TaskTracker是否刚刚重新启动。 |
initialContact |
表示TaskTracker是否初次连接JobTracker |
acceptNewTasks |
表示TaskTracker是否可以接收新任务,这通常取决于slot是否有剩余和节点健康状态等。 |
responseId |
表示心跳响应编号,用于防止重复发送心跳。每接收一次心跳后,该值加1。 |
该函数的返回值为一个HeartbeatResponse对象,该对象主要封装了JobTracker向TaskTracker下达的命令,具体如下:
class HeartbeatResponse implements Writable, Configurable { ... ... short responseId; // 心跳响应编号 int heartbeatInterval; // 下次心跳的发送间隔 TaskTrackerAction[] actions; // 来自JobTracker的命令,可能包括杀死作业等 Set<JobID> recoveredJobs = new HashSet<JobID>(); // 恢复完成的作业列表。 ... ... } |
该函数的内部实现逻辑主要分为两个步骤:更新状态和下达命令。JobTracker首先将TaskTracker汇报的最新任务运行状态保存到相应数据结构中,然后根据这些状态信息和外界需求为其下达相应的命令。
2.1 更新状态
函数heartbeat首先会更新TaskTracker/Job/Task的状态信息。相关代码如下:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... /* Make sure heartbeat is from a tasktracker allowed by the jobtracker. 当一个TaskTracker在host list(由参数mapred.hosts指定)中,但不在exclude list(由参数mapred.hosts.exclude指定)中时,可接入到JobTracker */ if (!acceptTaskTracker(status)) { throw new DisallowedTaskTrackerException(status); }
// 如果该TaskTracker被重启了,则将之标注为健康的TaskTracker,并从黑名单或者灰名单中清除,否则,启动TaskTracker容错机制以检查它是否处于健康状态。 if (restarted) { faultyTrackers.markTrackerHealthy(status.getHost()); } else { faultyTrackers.checkTrackerFaultTimeout(status.getHost(), now); } ...... // Process this heartbeat short newResponseId = (short)(responseId + 1); // 记录心跳发送时间,以发现在一定时间内未发送心跳的TaskTracker,交将其标注为死亡的TaskTracker,此后不可再向其分配新任务。 status.setLastSeen(now); if (!processHeartbeat(status, initialContact, now)) { // 处理心跳 ... ... } ... ... } |
接下来,跟踪进入函数processHeartbeat内部。该函数首先进行一系列异常情况检查,然后更新TaskTracker/Job/Task的状态信息。相关代码如下:
private synchronized boolean processHeartbeat( TaskTrackerStatus trackerStatus, boolean initialContact, long timeStamp) throws UnknownHostException { ... ... updateTaskStatuses(trackerStatus); // 更新Task状态信息 updateNodeHealthStatus(trackerStatus, timeStamp); // 更新节点健康状态 ... ... } |
2.2 下达命令
更新完状态信息后,JobTracker要为TaskTracker构造一个HeartbeatResponse对象作为心跳应答。该对象主要有两部分内容:下达给TaskTracker的命令和下次汇报心跳的时间间隔。下面分别对它们进行介绍:
1. 下达命令
JobTracker将下达给TaskTracker的命令封装成TaskTrackerAction类,主要包括ReinitTrackerAction(重新初始化)、LauchTaskAction(运行新任务)、KillTaskAction(杀死任务)、KillJobAction(杀死作业)和CommitTaskAction(提交任务)五种。下面依次对这几个命令进行介绍。
1) ReinitTrackerAction
JobTracker收到TaskTracker发送过来的心跳信息后,首先要进行一致性检查,如果发现异常情况,则会要求TaskTracker重新对自己进行初始化,以恢复到一致的状态。当出现以下两种不一致情况时,JobTracker会向TaskTracker下达ReinitTrackerAction命令。
n 丢失上次心跳应答信息:JobTracker会保存向每个TaskTracker发送的最近心跳应答信息,如果JobTracker未刚刚重启且一个TaskTracker并非初次连接JobTracker(initialContact!=true),而最近的心跳应答信息丢失了,则这是一种不一致状态。
n 丢失TaskTracker状态信息:JobTracker接收到任何一个心跳信息后,会将TaskTracker状态(封装在类TaskTrackerStatus中)信息保存起来。如果一个TaskTracker非初次连接JobTracker但状态信息却不存在,则也是一种不一致状态。
相关代码如下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... return new HeartbeatResponse(responseId, new TaskTrackerAction[] {new ReinitTrackerAction()}); ... ... } } |
2) LauchTaskAction
该类封装了TaskTracker新分配的任务。TaskTracker接收到该命令后会启动一个子进程运行该任务。Hadoop将一个作业分解后的任务分成两大类:计算型任务和辅助型任务。其中,计算型任务是处理实际数据的任务,包括Map Task和Reduce Task两种(对应TaskType类中的MAP和REDUCE两种类型),由专门的任务调度器对它们进行调度;而辅助型任务则不会处理实际的数据,通常用于同步计算型任务或者清理磁盘上无用的目录,包括job-setup task、job-cleanup task和task-cleanup task三种(对应TaskType类中的JOB_SETUP,JOB_CLEANUP和TASK_CLEANUP三种类型),其中,job-setup task和job-cleanup task分别用作计算型任务开始运行同步标识和结束运行同步标识,而task-cleanup task则用于清理失败的计算型任务已经写到磁盘上的部分结果,这种任务由JobTracker负责调度,且运行优先级高于计算型任务。
如果一个正常(不在黑名单中)的TaskTracker尚有空闲slot(acceptNewTasks为true),则JobTracker会为该TaskTracker分配新任务,任务选择顺序是:先辅助型任务,再计算型任务。而对于辅助型任务,选择顺序依次为job-cleanup task、task-cleanup task和job-setup task,具体代码如下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); // 如果没有辅助型任务,则选择计算型任务 if (tasks == null ) { // 由任务调度器选择一个或多个计算型任务 tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName)); } if (tasks != null) { for (Task task : tasks) { expireLaunchingTasks.addNewTask(task.getTaskID()); if(LOG.isDebugEnabled()) { LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID()); } // 将分配的任务封装成LauchTAskAction actions.add(new LaunchTaskAction(task)); } ... ... } ... ... } |
3) KillTaskAction
该类封装了TaskTracker需杀死的任务。TaskTracker收到该命令后会杀掉对应任务、清理工作目录和释放slot。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:
n 用户使用命令“bin/hadoop job -kill-task”或者“bin/hadoop job -fail-task”杀死一个任务或者使一个任务失败。
n 启用推测执行机制后,同一份数据可能同时由两个Task Attempt处理。当其中一个Task Attempt执行成功后,另外一个处理相同数据的Task Attempt将被杀掉。
n 某个作业运行失败,它的所有任务将被杀掉。
n TaskTracker在一定时间内未汇报心跳,则JobTracker认为其死掉,它上面的所有Task均被标注为死亡。
相关代码如下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks to be killed List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); if (killTasksList != null) { actions.addAll(killTasksList); } ... ... } } |
4) KillJobAciton
该类封装了TaskTracker待清理的作业。TaskTracker接收到该命令后,会清理作业的临时目录。导致JobTracker向TaskTracker发送该命令的原因有很多,主要包括以下几个场景:
n 用户使用命令“”或者“”杀死一个作业或者是使一个作业失败。
n 作业运行完成,通知TaskTracker清理该作业的工作目录。
n 作业运行失败,即同一个作业失败的Task数目超过一定比例。
相关代码如下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for jobs to be killed/cleanedup List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName); if (killJobsList != null) { actions.addAll(killJobsList); } ... ... } } |
5) CommitTaskAction
该类封装了TaskTracker需提交的任务。为了防止同一个TaskInProgress的两个同时运行的Task Attempt(比如打开推测执行功能,一个任务可能存在备份任务)同时打开一个文件或者往一个文件中写数据而产生冲突,Hadoop让每个Task Attempt写到单独一个文件(以TaskAttemptID命名,比如attempt_201412031706_0008_r_000000_0)中。通常而言,Hadoop让每个Task Attempt成功运行完成后,再将运算结果转移到最终目录${mapred.output.dir}中。Hadoop将一个成功运行完成的Task Attempt结果文件从临时目录“提升”至最终目录的过程,称为“任务提交”。当TaskInProgress中一个任务被提交后,其他任务将被杀死,同时意味着该TaskInProgress运行完成。相关代码如下:
public class JobTracker { ... ... public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // Check for tasks whose outputs can be saved List<TaskTrackerAction> commitTasksList = getTasksToSave(status); if (commitTasksList != null) { actions.addAll(commitTasksList); } ... ... } } |
2. 调整心跳间隔
TaskTracker心跳时间间隔大小应该适度,如果太小,则JobTracker需要处理高并发的心跳连接请求,必然产生不小的并发压力;如果太大,空闲的资源不能及时汇报给JobTracker(进而为之分配新的Task),造成资源空闲,进而降低系统吞吐率。
TaskTracker汇报心跳的时间间隔并不是一成不变的,它会随着集群规模的动态调整(比如节点死掉或者用户动态添加新节点)而变化,以便能够合理利用JobTracker的并发处理能力。在Hadoop MapReduce中,只有JobTracker知道某一时刻集群的规模,因此由JobTracker为每个TaskTracker计算下一次汇报心跳的时间间隔,并通过心跳机制告诉TaskTracker。
JobTracker允许用户通过参数配置心跳的时间间隔加速比,即每增加mapred.heartbeats.in.second(默认是100,最小是1)个节点,心跳时间间隔增加mapreduce.jobtracker.heartbeats.scaling.factor(默认是1,最小是0.01)秒。同时,为了防止用户参数设置不合理而对JobTracker产生较大负载,JobTracker要示心跳时间间隔至少为3秒。具体计算方法如下:
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol, RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol, JobTrackerMXBean { ... ... /** * Calculates next heartbeat interval using cluster size. * Heartbeat interval is incremented by 1 second for every 100 nodes by default. * @return next heartbeat interval. */ public int getNextHeartbeatInterval() { // get the no of task trackers int clusterSize = getClusterStatus().getTaskTrackers(); int heartbeatInterval = Math.max( (int)(1000 * HEARTBEATS_SCALING_FACTOR * Math.ceil((double)clusterSize /NUM_HEARTBEATS_IN_SECOND)), HEARTBEAT_INTERVAL_MIN) ; return heartbeatInterval; } public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean restarted, boolean initialContact, boolean acceptNewTasks, short responseId) { ... ... // calculate next heartbeat interval and put in heartbeat response int nextInterval = getNextHeartbeatInterval(); response.setHeartbeatInterval(nextInterval); ... ... } ... ... } |
相关推荐
在大数据处理领域,Hadoop MapReduce 是一个至关重要的组件,它为海量数据的分布式计算提供了框架。本资源包“大数据-hadoop-mapreduce代码”显然包含了与MapReduce编程相关的实例或示例代码,对于理解并应用Hadoop ...
《大数据之Hadoop精讲》高清视频教程涵盖了Hadoop生态系统中的关键技术和概念,旨在帮助学习者深入了解和掌握大数据处理的核心工具。Hadoop是Apache软件基金会开发的一个开源框架,专门用于处理和存储海量数据,尤其...
Hadoop Shell是用户与Hadoop交互的主要方式之一。它提供了一组丰富的命令,用于执行各种操作,包括但不限于文件管理、数据传输、集群状态查询等。 - **文件管理**: - `hadoop fs -ls /`:列出根目录下的所有文件...
在大数据领域,Hadoop是一个广泛使用的开源框架,用于存储和处理海量数据。本文主要讨论了在Hadoop环境中常用的启动和停止命令,这对于管理和维护Hadoop集群至关重要。Hadoop的组件主要包括HDFS(Hadoop分布式文件...
大数据框架是现代信息技术领域的重要组成部分,它涉及到一系列用于处理海量数据的工具和平台。本文将对其中的关键组件进行深入解析。 首先,我们关注Hadoop Distributed File System (HDFS)。HDFS是大数据处理的...
根据提供的标题、描述、标签及部分内容链接,我们可以推断出这是一个关于大数据技术栈的培训课程,涉及的技术包括Hadoop、HBase、Zookeeper、Spark、Kafka、Scala以及Ambari。下面将针对这些技术进行详细的介绍和...
Hadoop,作为一个开源框架,最初由Apache软件基金会推出,旨在解决大数据的存储和计算问题。Hadoop的核心组件包括Common、HDFS、MapReduce和Yarn。 Common模块提供了诸如文件系统、远程过程调用(RPC)和序列化库等...
**大数据技术:Hadoop 框架详细介绍** 在当今信息化社会,海量的数据正在不断涌现,传统的数据处理方式已经无法满足需求。此时,大数据技术应运而生,其中Hadoop框架作为开源的分布式计算平台,成为了大数据处理的...
Hadoop作为开源的大数据处理框架,提供了三种运行模式,分别是本地模式、伪分布式模式和完全分布式模式。这三种模式分别针对不同的应用场景和学习阶段,为用户提供了灵活的选择。 **4.1 本地运行模式** 本地运行...
YARN是Hadoop 2.x引入的重要改进,它将原本在JobTracker中的资源管理和作业调度功能分离出来,形成了单独的ResourceManager和NodeManager。ResourceManager全局协调整个集群的资源,而NodeManager则在每个节点上管理...
在大数据领域,Hadoop 2.0 是一个关键的分布式计算框架,它为海量数据处理提供了强大支持。本文将深入探讨Hadoop 2.0的主要组件、架构、以及其相较于Hadoop 1.0的改进。 一、Hadoop 2.0概述 Hadoop 2.0是Apache软件...
### Hadoop大数据实战手册知识点概览 #### 一、Hadoop概述与发展历程 - **Hadoop定位**:Hadoop是一个由Apache基金会开发的开源分布式系统基础架构,它为用户提供了一个无需深入了解分布式底层细节就能开发分布式...
Hadoop大数据技术在当今的数据处理领域占据着重要的地位,其核心组件包括HDFS(Hadoop Distributed File System)和MapReduce,它们共同构成了高效的大数据处理框架。本文将围绕Hadoop大数据期末考试的重点内容进行...
Hadoop的核心组件主要包括分布式文件系统(HDFS)和MapReduce编程环境。HDFS是一种分布式存储系统,能够存储大量数据,适合进行大数据分析。而MapReduce则提供了一个编程模型,用于处理和生成大数据集,通过分布式的...
### 大数据之路选择Hadoop还是MaxCompute? #### 一、Hadoop与MaxCompute概述 ##### 1.1 Hadoop介绍与发展历程 Hadoop是由Apache软件基金会开发的一个开源分布式计算平台,采用Java语言编写,旨在支持大规模数据...
在大数据技术领域,Hadoop 是一个关键的分布式计算框架,由Apache基金会开发,主要用于处理和存储海量数据。本文将深入探讨Hadoop的核心组件、工作原理以及面试中常遇到的相关问题。 1. **HDFS(Hadoop Distributed...
在本课程“云计算分布式大数据Hadoop实战之路--从零开始(第1-10讲)”中,我们将深入探讨Hadoop这一核心的云计算技术在大数据处理中的应用。Hadoop是Apache软件基金会的一个开源项目,其设计目标是允许在廉价硬件上...
作为一个开源框架,Hadoop为海量数据的存储、处理和分析提供了高效且可扩展的解决方案。本文将深入探讨“Hadoop高级编程——构建与实现大数据解决方案”这一主题,旨在帮助读者掌握如何利用Hadoop构建实际的大数据...
Hadoop作为大数据处理的核心框架,是应对大数据挑战的关键。Hadoop生态系统由一系列相互协作的组件构成,如HDFS(Hadoop Distributed File System)用于分布式存储,MapReduce用于分布式计算,YARN(Yet Another ...
### 大数据技术分享:Hadoop运行原理分析 #### 一、概论 Hadoop作为一个开源框架,主要用于处理大规模的数据集。它通过提供一个高效、可靠、可扩展的基础架构来支持分布式数据处理任务。Hadoop的核心组件包括HDFS...