`
ancin
  • 浏览: 52705 次
  • 性别: Icon_minigender_1
文章分类
社区版块
存档分类
最新评论

Hadoop源码解读-JobTracker处理HeartBeat

阅读更多
JobTracker会接受TaskTracker的心跳,并处理。不多说,直接上源码
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
                                                  boolean restarted,
                                                  boolean initialContact,
                                                  boolean acceptNewTasks, 
                                                  short responseId) 


1 首先检查heartbeat是否来自自己的host列表,否则抛出异常。
  如果不再Host列表或者在排除Host列表中,退出心跳处理。
 
  return (inHostsList(status) && !inExcludedHostsList(status));
 

2 判断是否在黑名单、灰名单、默认名单,并从这些名单中删除
  黑名单、灰名单主要是Hadoop的容错机制,在此不做过多解释,可以单写一篇文章。
 
  faultyTrackers.markTrackerHealthy(status.getHost());

3 根据trackerName获取上一个heartbeat response
 
   HeartbeatResponse prevHeartbeatResponse =
      trackerToHeartbeatResponseMap.get(trackerName);

4 如果上一个heartbeat 为null,让Tasktracker重新初始化  如果是第一个 response 从recoveryMap中移除
   
 if (prevHeartbeatResponse == null) {
        // This is the first heartbeat from the old tracker to the newly 
        // started JobTracker
        if (hasRestarted()) {
          addRestartInfo = true;
          // inform the recovery manager about this tracker joining back
          recoveryManager.unMarkTracker(trackerName);
        } else {
          // Jobtracker might have restarted but no recovery is needed
          // otherwise this code should not be reached
          LOG.warn("Serious problem, cannot find record of 'previous' " +
                   "heartbeat for '" + trackerName + 
                   "'; reinitializing the tasktracker");
          return new HeartbeatResponse(responseId, 
              new TaskTrackerAction[] {new ReinitTrackerAction()});
        }

      }

   如果重发的responseId,丢弃掉。
  
  if (prevHeartbeatResponse.getResponseId() != responseId) {
          LOG.info("Ignoring 'duplicate' heartbeat from '" + 
              trackerName + "'; resending the previous 'lost' response");
          return prevHeartbeatResponse;
        }

5 处理heartbeat  首先 updateTaskTrackerStatus 如果是被遗忘的tasktracker 加入队列中;更新任务状态;更新健康节点状态;
  
    private synchronized boolean processHeartbeat(
                                 TaskTrackerStatus trackerStatus, 
                                 boolean initialContact,
                                 long timeStamp) throws UnknownHostException {
   //主要集中在此不分析那么详细了
}

6 检查新Task是否执行,如果没有执行,加入执行队列
 
 
   if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);
      if (taskTrackerStatus == null) {
        LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
      } else {
        List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
        if (tasks == null ) {
          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
        }
//添加Task
        if (tasks != null) {
          for (Task task : tasks) {
            expireLaunchingTasks.addNewTask(task.getTaskID());
            if(LOG.isDebugEnabled()) {
              LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
            }
            actions.add(new LaunchTaskAction(task));
          }
        }
      }
    }

7 检查Task是否杀死
 List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
    if (killTasksList != null) {
      actions.addAll(killTasksList);
    }

8 检查 task 是否cleanup
 
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
    if (killJobsList != null) {
      actions.addAll(killJobsList);
    }

9 检查task 的output是否可以save
  
 List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
    if (commitTasksList != null) {
      actions.addAll(commitTasksList);
    }

10 计算下一次heartbeat的时间间隔
  
    int nextInterval = getNextHeartbeatInterval();
    response.setHeartbeatInterval(nextInterval);
    response.setActions(
                        actions.toArray(new TaskTrackerAction[actions.size()]));

11 更新heartbeatMap,并remove掉Marked已经处理掉的heartbeat
  
// 更新Map
  trackerToHeartbeatResponseMap.put(trackerName, response);

    //清除处理完成的心跳
    removeMarkedTasks(trackerName);


  不对之处欢迎讨论。
=================参考====
hadoop源码。
分享到:
评论

相关推荐

    hadoop-core-0.20.2 源码 hadoop-2.5.1-src.tar.gz 源码 hadoop 源码

    这里我们将深入探讨"Hadoop-core-0.20.2"和"hadoop-2.5.1-src"的源码,以便更好地理解Hadoop的工作原理和内部机制。 **Hadoop Core源码分析** Hadoop-core-0.20.2是Hadoop早期版本的核心组件,它包含了Hadoop的...

    hadoop-eclipse-plugin-3.1.1.tar.gz

    使用Hadoop-Eclipse-Plugin时,建议遵循良好的编程习惯,如合理划分Mapper和Reducer的功能,优化数据处理流程,以及充分利用Hadoop的并行计算能力。同时,及时更新插件至最新版本,以获取最新的功能和修复。 通过...

    hadoop最新版本3.1.1全量jar包

    hadoop-annotations-3.1.1.jar hadoop-common-3.1.1.jar hadoop-mapreduce-client-core-3.1.1.jar hadoop-yarn-api-3.1.1.jar hadoop-auth-3.1.1.jar hadoop-hdfs-3.1.1.jar hadoop-mapreduce-client-hs-3.1.1.jar ...

    hadoop-common-2.6.0-bin-master.zip

    `hadoop-common-2.6.0-bin-master.zip` 是一个针对Hadoop 2.6.0版本的压缩包,特别适用于在Windows环境下进行本地开发和测试。这个版本的Hadoop包含了对Windows系统的优化,比如提供了`winutils.exe`,这是在Windows...

    flink-shaded-hadoop-3-uber-3.1.1.7.1.1.0-565-9.0.jar.tar.gz

    Flink-shaded-hadoop-3-uber-jar通过重新打包和阴影处理(shading)技术,确保Flink能够正确地识别和使用Hadoop 3.x的新特性,同时避免了不同版本库之间的冲突。 Java的阴影处理是一种特殊的类重命名过程,它将特定...

    Hadoop-eclipse-plugin-2.7.2

    《Hadoop-eclipse-plugin-2.7.2:在Eclipse中轻松开发Hadoop应用》 在大数据处理领域,Hadoop作为一个开源的分布式计算框架,因其高效、可扩展的特性而备受青睐。然而,对于开发者而言,有效地集成开发环境至关重要...

    hadoop-eclipse-plugin三个版本的插件都在这里了。

    hadoop-eclipse-plugin-2.7.4.jar和hadoop-eclipse-plugin-2.7.3.jar还有hadoop-eclipse-plugin-2.6.0.jar的插件都在这打包了,都可以用。

    hadoop-eclipse-plugin-3.3.1.jar

    Ubuntu虚拟机HADOOP集群搭建eclipse环境 hadoop-eclipse-plugin-3.3.1.jar

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip

    Apache Flink 是一个流行的开源大数据处理框架,而 `flink-shaded-hadoop-2-uber-2.7.5-10.0.jar.zip` 文件是针对 Flink 优化的一个特殊版本的 Hadoop 库。这个压缩包中的 `flink-shaded-hadoop-2-uber-2.7.5-10.0....

    flink-shaded-hadoop-2-uber-2.6.5-10.0.zip

    综上所述,“flink-shaded-hadoop-2-uber-2.6.5-10.0.zip”是Flink与Hadoop深度整合的关键,它使得开发者能够在不脱离Hadoop生态的前提下,充分利用Flink的流处理优势。这个压缩包的使用,不仅简化了开发流程,也...

    好用的hadoop-eclipse-plugin-2.6.4.jar

    在开发过程中,Hadoop-Eclipse-Plugin提供了一些实用的功能,如JobTracker视图,可以实时监控MapReduce任务的执行状态,包括任务进度、任务日志等信息,帮助开发者快速定位和解决问题。另外,它还支持直接在Eclipse...

    eclipse运行mr插件hadoop-eclipse-plugin-2.6.0.jar

    总之,`hadoop-eclipse-plugin-2.6.0.jar`是Hadoop开发者在Windows环境下利用Eclipse进行高效开发的重要工具,通过它,开发者可以更好地整合开发环境与大数据处理平台,提高开发效率和项目的可维护性。

    hadoop-eclipse-plugin-3.1.3.jar

    hadoop-eclipse-plugin-3.1.3,eclipse版本为eclipse-jee-2020-03

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop-mapreduce-examples-2.7.1.jar

    hadoop-eclipse-plugin-3.1.2.jar

    在eclipse中搭建hadoop环境,需要安装hadoop-eclipse-pulgin的插件,根据hadoop的版本对应jar包的版本,此为hadoop3.1.2版本的插件。

    hadoop2x-eclipse-plugin

    2. 配置集群信息:如果你的Hadoop集群不是本地模式,需要在"Cluster Configuration"中添加集群的配置,包括JobTracker和NameNode的地址。 三、创建Hadoop项目 有了插件支持,创建Hadoop MapReduce项目变得非常简单...

    hadoop-lzo-0.4.21-SNAPSHOT jars

    3. `hadoop-lzo-0.4.21-SNAPSHOT-sources.jar`:这个文件包含了Hadoop-LZO的源代码,对于开发者来说非常有用,因为可以直接查看源码来理解其内部工作原理,也可以方便地进行二次开发或调试。 集成Hadoop-LZO到你的...

    hadoop-lzo-0.4.20.jar

    hadoop2 lzo 文件 ,编译好的64位 hadoop-lzo-0.4.20.jar 文件 ,在mac 系统下编译的,用法:解压后把hadoop-lzo-0.4.20.jar 放到你的hadoop 安装路径下的lib 下,把里面lib/Mac_OS_X-x86_64-64 下的所有文件 拷到 ...

    hadoop-yarn-server-common-2.6.5-API文档-中文版.zip

    赠送jar包:hadoop-yarn-server-common-2.6.5.jar; 赠送原API文档:hadoop-yarn-server-common-2.6.5-javadoc.jar; 赠送源代码:hadoop-yarn-server-common-2.6.5-sources.jar; 赠送Maven依赖信息文件:hadoop-...

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

    flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

Global site tag (gtag.js) - Google Analytics