`
nlslzf
  • 浏览: 1054107 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Hadoop的mapred JobTracker端源码概览

阅读更多

http://jiwenke.iteye.com/blog/335093

上一节看到TaskTracker启动新任务的过程,这里接着看看在JobTracker中是怎样响应和调度的,在hadoop中,我们看到采用的是pull的方式拿到任务。 

Java代码 
  1.       
  2. HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,   
  3.                                                             justStarted, askForNewTask,   
  4.                                                               heartbeatResponseId);  

这里是TaskTracker想JobTracker发送heartbeat的地方 - 使用的是RPC,这样我们你就来到JobTracker了: 
Java代码 
  1.     
  2. public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,   
  3.                                                   boolean initialContact, boolean acceptNewTasks, short responseId)   
  4.     throws IOException {  
  5. .............  
  6.     //如果是接受新任务的话,让JotTracker去进行调度,这里会调用taskScheduler的assignTasks  
  7.     if (acceptNewTasks) {  
  8.       TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);  
  9.       if (taskTrackerStatus == null) {  
  10.         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);  
  11.       } else {  
  12.         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);  
  13.     //这里是准备assignTask的地方,由配置的调度器来决定怎样调度  
  14.         if (tasks == null ) {  
  15.          tasks = taskScheduler.assignTasks(taskTrackerStatus);  
  16.         }  
  17.         if (tasks != null) {  
  18.           for (Task task : tasks) {  
  19.             expireLaunchingTasks.addNewTask(task.getTaskID());  
  20.             LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());  
  21.             actions.add(new LaunchTaskAction(task));  
  22.           }  
  23.         }  
  24.       }  
  25.     }  

这个taskScheduler采用的是默认的    
Java代码 
  1. taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass,conf);  

这是在配置文件中指定的,"mapred.jobtracker.taskScheduler",常常是JobQueueTaskScheduler是hadoop的实现,FIFO类型的调度器,让我们看看这个调度器是怎样assignTasks的: 
Java代码 
  1.     
  2. public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)  
  3.       throws IOException {  
  4.   
  5.     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();  
  6.     int numTaskTrackers = clusterStatus.getTaskTrackers();  
  7.   
  8.     Collection<JobInProgress> jobQueue =  
  9.       jobQueueJobInProgressListener.getJobQueue();  
  10.   
  11.     //  
  12.     // Get map + reduce counts for the current tracker.  
  13.     //  
  14.     int maxCurrentMapTasks = taskTracker.getMaxMapTasks();  
  15.     int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks();  
  16.     int numMaps = taskTracker.countMapTasks();  
  17.     int numReduces = taskTracker.countReduceTasks();  
  18.   
  19.     //  
  20.     // Compute average map and reduce task numbers across pool  
  21.     //  
  22.     int remainingReduceLoad = 0;  
  23.     int remainingMapLoad = 0;  
  24.     synchronized (jobQueue) {  
  25.       for (JobInProgress job : jobQueue) {  
  26.         if (job.getStatus().getRunState() == JobStatus.RUNNING) {  
  27.           int totalMapTasks = job.desiredMaps();  
  28.           int totalReduceTasks = job.desiredReduces();  
  29.           remainingMapLoad += (totalMapTasks - job.finishedMaps());  
  30.           remainingReduceLoad += (totalReduceTasks - job.finishedReduces());  
  31.         }  
  32.       }  
  33.     }  
  34.   
  35.     // find out the maximum number of maps or reduces that we are willing  
  36.     // to run on any node.  
  37.     int maxMapLoad = 0;  
  38.     int maxReduceLoad = 0;  
  39.     if (numTaskTrackers > 0) {  
  40.       maxMapLoad = Math.min(maxCurrentMapTasks,  
  41.                             (int) Math.ceil((double) remainingMapLoad /   
  42.                                             numTaskTrackers));  
  43.       maxReduceLoad = Math.min(maxCurrentReduceTasks,  
  44.                                (int) Math.ceil((double) remainingReduceLoad  
  45.                                                / numTaskTrackers));  
  46.     }  
  47.           
  48.     int totalMaps = clusterStatus.getMapTasks();  
  49.     int totalMapTaskCapacity = clusterStatus.getMaxMapTasks();  
  50.     int totalReduces = clusterStatus.getReduceTasks();  
  51.     int totalReduceTaskCapacity = clusterStatus.getMaxReduceTasks();  
  52.   
  53.     //  
  54.     // In the below steps, we allocate first a map task (if appropriate),  
  55.     // and then a reduce task if appropriate.  We go through all jobs  
  56.     // in order of job arrival; jobs only get serviced if their   
  57.     // predecessors are serviced, too.  
  58.     //  
  59.   
  60.     //  
  61.     // We hand a task to the current taskTracker if the given machine   
  62.     // has a workload that's less than the maximum load of that kind of  
  63.     // task.  
  64.     //  
  65.          
  66.     if (numMaps < maxMapLoad) {  
  67.   
  68.       int totalNeededMaps = 0;  
  69.       synchronized (jobQueue) {  
  70.         for (JobInProgress job : jobQueue) {  
  71.           if (job.getStatus().getRunState() != JobStatus.RUNNING) {  
  72.             continue;  
  73.           }  
  74.       //这里是取得Task的地方,需要到job中去取  
  75.           Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers,  
  76.               taskTrackerManager.getNumberOfUniqueHosts());  
  77.           if (t != null) {  
  78.             return Collections.singletonList(t);  
  79.           }  
  80.   
  81.           //  
  82.           // Beyond the highest-priority task, reserve a little   
  83.           // room for failures and speculative executions; don't   
  84.           // schedule tasks to the hilt.  
  85.           //  
  86.           totalNeededMaps += job.desiredMaps();  
  87.           int padding = 0;  
  88.           if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {  
  89.             padding = Math.min(maxCurrentMapTasks,  
  90.                                (int)(totalNeededMaps * padFraction));  
  91.           }  
  92.           if (totalMaps + padding >= totalMapTaskCapacity) {  
  93.             break;  
  94.           }  
  95.         }  
  96.       }  
  97.     }  
  98.   
  99.     //  
  100. font-size: 1em; margin-top:
    分享到:
    评论

相关推荐

    细细品味Hadoop_Hadoop集群(第5期)_Hadoop安装配置

    #### 一、Hadoop概览及集群角色解析 **Hadoop**,作为Apache软件基金会旗下的一款开源分布式计算平台,以其核心组件**Hadoop分布式文件系统**(HDFS)和**MapReduce**而闻名。HDFS提供了一种透明的分布式存储解决方案...

    一个基于Qt Creator(qt,C++)实现中国象棋人机对战

    qt 一个基于Qt Creator(qt,C++)实现中国象棋人机对战.

    热带雨林自驾游自然奇观探索.doc

    热带雨林自驾游自然奇观探索

    冰川湖自驾游冰雪交融景象.doc

    冰川湖自驾游冰雪交融景象

    C51 单片机数码管使用 Keil项目C语言源码

    C51 单片机数码管使用 Keil项目C语言源码

    基于智能算法的无人机路径规划研究 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    前端分析-2023071100789s12

    前端分析-2023071100789s12

    Delphi 12.3控件之Laz-制作了一些窗体和对话框样式.7z

    Laz_制作了一些窗体和对话框样式.7z

    ocaml-docs-4.05.0-6.el7.x64-86.rpm.tar.gz

    1、文件内容:ocaml-docs-4.05.0-6.el7.rpm以及相关依赖 2、文件形式:tar.gz压缩包 3、安装指令: #Step1、解压 tar -zxvf /mnt/data/output/ocaml-docs-4.05.0-6.el7.tar.gz #Step2、进入解压后的目录,执行安装 sudo rpm -ivh *.rpm 4、更多资源/技术支持:公众号禅静编程坊

    学习笔记-沁恒第六讲-米醋

    学习笔记-沁恒第六讲-米醋

    工业机器人技术讲解【36页】.pptx

    工业机器人技术讲解【36页】

    基于CentOS 7和Docker环境下安装和配置Elasticsearch数据库

    内容概要:本文档详细介绍了在 CentOS 7 上利用 Docker 容器化环境来部署和配置 Elasticsearch 数据库的过程。首先概述了 Elasticsearch 的特点及其主要应用场景如全文检索、日志和数据分析等,并强调了其分布式架构带来的高性能与可扩展性。之后针对具体的安装流程进行了讲解,涉及创建所需的工作目录,准备docker-compose.yml文件以及通过docker-compose工具自动化完成镜像下载和服务启动的一系列命令;同时对可能出现的问题提供了应对策略并附带解决了分词功能出现的问题。 适合人群:从事IT运维工作的技术人员或对NoSQL数据库感兴趣的开发者。 使用场景及目标:该教程旨在帮助读者掌握如何在一个Linux系统中使用现代化的应用交付方式搭建企业级搜索引擎解决方案,特别适用于希望深入了解Elastic Stack生态体系的个人研究与团队项目实践中。 阅读建议:建议按照文中给出的具体步骤进行实验验证,尤其是要注意调整相关参数配置适配自身环境。对于初次接触此话题的朋友来说,应该提前熟悉一下Linux操作系统的基础命令行知识和Docker的相关基础知识

    基于CNN和FNN的进化神经元模型的快速响应尖峰神经网络 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    网络小说的类型创新、情节设计与角色塑造.doc

    网络小说的类型创新、情节设计与角色塑造

    毕业设计-基于springboot+vue开发的学生考勤管理系统【源码+sql+可运行】50311.zip

    毕业设计_基于springboot+vue开发的学生考勤管理系统【源码+sql+可运行】【50311】.zip 全部代码均可运行,亲测可用,尽我所能,为你服务; 1.代码压缩包内容 代码:springboo后端代码+vue前端页面代码 脚本:数据库SQL脚本 效果图:运行结果请看资源详情效果图 2.环境准备: - JDK1.8+ - maven3.6+ - nodejs14+ - mysql5.6+ - redis 3.技术栈 - 后台:springboot+mybatisPlus+Shiro - 前台:vue+iview+Vuex+Axios - 开发工具: idea、navicate 4.功能列表 - 系统设置:用户管理、角色管理、资源管理、系统日志 - 业务管理:班级信息、学生信息、课程信息、考勤记录、假期信息、公告信息 3.运行步骤: 步骤一:修改数据库连接信息(ip、port修改) 步骤二:找到启动类xxxApplication启动 4.若不会,可私信博主!!!

    57页-智慧办公园区智能化设计方案.pdf

    在智慧城市建设的大潮中,智慧园区作为其中的璀璨明珠,正以其独特的魅力引领着产业园区的新一轮变革。想象一下,一个集绿色、高端、智能、创新于一体的未来园区,它不仅融合了科技研发、商业居住、办公文创等多种功能,更通过深度应用信息技术,实现了从传统到智慧的华丽转身。 智慧园区通过“四化”建设——即园区运营精细化、园区体验智能化、园区服务专业化和园区设施信息化,彻底颠覆了传统园区的管理模式。在这里,基础设施的数据收集与分析让管理变得更加主动和高效,从温湿度监控到烟雾报警,从消防水箱液位监测到消防栓防盗水装置,每一处细节都彰显着智能的力量。而远程抄表、空调和变配电的智能化管控,更是在节能降耗的同时,极大地提升了园区的运维效率。更令人兴奋的是,通过智慧监控、人流统计和自动访客系统等高科技手段,园区的安全防范能力得到了质的飞跃,让每一位入驻企业和个人都能享受到“拎包入住”般的便捷与安心。 更令人瞩目的是,智慧园区还构建了集信息服务、企业服务、物业服务于一体的综合服务体系。无论是通过园区门户进行信息查询、投诉反馈,还是享受便捷的电商服务、法律咨询和融资支持,亦或是利用云ERP和云OA系统提升企业的管理水平和运营效率,智慧园区都以其全面、专业、高效的服务,为企业的发展插上了腾飞的翅膀。而这一切的背后,是大数据、云计算、人工智能等前沿技术的深度融合与应用,它们如同智慧的大脑,让园区的管理和服务变得更加聪明、更加贴心。走进智慧园区,就像踏入了一个充满无限可能的未来世界,这里不仅有科技的魅力,更有生活的温度,让人不禁对未来充满了无限的憧憬与期待。

    一种欠定盲源分离方法及其在模态识别中的应用 附Matlab代码.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    Matlab实现基于BO贝叶斯优化Transformer结合GRU门控循环单元时间序列预测的详细项目实例(含完整的程序,GUI设计和代码详解)

    内容概要:本文介绍了使用 Matlab 实现基于 BO(贝叶斯优化)的 Transformer 结合 GRU 门控循环单元时间序列预测的具体项目案例。文章首先介绍了时间序列预测的重要性及其现有方法存在的限制,随后深入阐述了该项目的目标、挑战与特色。重点描述了项目中采用的技术手段——结合 Transformer 和 GRU 模型的优点,通过贝叶斯优化进行超参数调整。文中给出了模型的具体实现步骤、代码示例以及完整的项目流程。同时强调了数据预处理、特征提取、窗口化分割、超参数搜索等关键技术点,并讨论了系统的设计部署细节、可视化界面制作等内容。 适合人群:具有一定机器学习基础,尤其是熟悉时间序列预测与深度学习的科研工作者或从业者。 使用场景及目标:适用于金融、医疗、能源等多个行业的高精度时间序列预测。该模型可通过捕捉长时间跨度下的复杂模式,提供更为精准的趋势预判,辅助相关机构作出合理的前瞻规划。 其他说明:此项目还涵盖了从数据采集到模型发布的全流程讲解,以及GUI图形用户界面的设计实现,有助于用户友好性提升和技术应用落地。此外,文档包含了详尽的操作指南和丰富的附录资料,包括完整的程序清单、性能评价指标等,便于读者动手实践。

    漫画与青少年教育关系.doc

    漫画与青少年教育关系

    励志图书的成功案例分享、人生智慧提炼与自我提升策略.doc

    励志图书的成功案例分享、人生智慧提炼与自我提升策略

Global site tag (gtag.js) - Google Analytics