`

如何编写Hadoop调度器

 
阅读更多

中国第一个在线Hadoop教育平台—小象学院,推荐给Hadoop初学者和实践者,网址是:

本博客微信公共账号:hadoop123(微信号为:hadoop-123),分享hadoop技术内幕,hadoop最新技术进展,发布hadoop相关职位和求职信息,hadoop技术交流聚会、讲座以及会议等。二维码如下:


1. 编写目的

 

在Hadoop中,调度器是一个可插拔的模块,用户可以根据自己的实际应用要求设计调度器,然后在配置文件中指定相应的调度器,这样,当Hadoop集群启动时,便会加载该调度器。当前Hadoop自带了几种调度器,分别是FIFO(默认调度器),Capacity Scheduler和FairScheduler,通常境况下,这些调度器很难满足公司复杂的应用需求,因而往往需要开发自己的调度器。本文介绍了Hadoop调度器的基本编写方法。

2. Hadoop调度框架

Hadoop的调度器是在JobTracker中加载和调用的,用户可以在配置文件mapred-site.xml中的mapred.jobtracker.taskScheduler属性中指定调度器。本节分析了Hadoop调度器的调度框架,实际上分析了两个重要类:TaskScheduler和JobTracker的关系。

(1) TaskScheduler

如果用户要编写自己的调度器,需要继承抽象类TaskScheduler,该类的接口如下:

abstract class TaskScheduler implements Configurable {

protected Configuration conf; //配置文件

protected TaskTrackerManager taskTrackerManager; //一般会设为JobTracker

public Configuration getConf() {

  return conf;

}

public void setConf(Configuration conf) {

  this.conf = conf;

}

public synchronized void setTaskTrackerManager(

TaskTrackerManager taskTrackerManager) {

  this.taskTrackerManager = taskTrackerManager;

}

public void start() throws IOException { //初始化函数,如加载配置文件等

  // do nothing

}

public void terminate() throws IOException { //结束函数

// do nothing

}

//最重要的函数,为该taskTracker分配合适的task

public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)

throws IOException;

  //根据队列名字获job列表

public abstract Collection<JobInProgress> getJobs(String queueName);

}

(2) JobTracker

JobTracker是Hadoop最核心的组件,它监控整个集群中的作业运行情况并对资源进行管理和调度。

每个TaskTracker每个3s(默认值,可配置)通过heartbeat向JobTracker汇报自己管理的机器的一些基本信息,包括内存使用量,内存剩余量,正在运行的task,空闲的slot数目等,一旦JobTracker发现该TaskTracker出现了空闲的slot,便会调用调度器中的AssignTasks方法为该TaskTracker分配task。

下面分析JobTracker调用TaskScheduler的具体流程:

……

private final TaskScheduler taskScheduler; //声明调度器对象

……

public static JobTracker startTracker(JobConf conf, String identifier) {

  …….

  result = new JobTracker(conf, identifier);

  result.taskScheduler.setTaskTrackerManager(result); //设置调度器的manager

  ……

}

//创建调度器

JobTracker(JobConf conf, String identifier) {

  ……

  // Create the scheduler

  Class<? extends TaskScheduler> schedulerClass

  = conf.getClass("mapred.jobtracker.taskScheduler",

    JobQueueTaskScheduler.class, TaskScheduler.class);

  taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

  …..

}

//run forever

public void offerService() {

  ……

  taskScheduler.start(); //启动调度器

  ……

}

。。。。。

HeartbeatResponse heartbeat(TaskTrackerStatus status,

boolean restarted,

boolean initialContact,

boolean acceptNewTasks,

short responseId) {

  …….

  // Check for new tasks to be executed on the tasktracker

  if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {

    ……

    //使用调度器,为该taskTracker分配作业

    tasks = taskScheduler.assignTasks(taskTrackerStatus);

    ……

  }

}

从上面的分析可以知道,Scheduler和JobTracker之间会相互包含(实际上是组合模式),Scheduler中要包含JobTracker(实际上就是TaskTrackerManager)对象,以便获取整个Hadoop集群的一些信息,如slot总数,QueueManager对象,添加JobInProgressListener以便增加或删除job时,通知Scheduler;JobTracker中要包含Scheduler对象,以便可以对每个TaskTracker分配task。

3. 编写Hadoop调度器

假设我们要编写一个新的调度器,为MyHadoopScheduler,需要进行以下工作:

(1) 用户需要自己实现的类

@ MyHadoopSchedulerConf:配置文件管理类,读取你自己的配置文件,并保存到合适的数据结构中,一般而言,这个类应该支持动态加载配置文件。

@ MyHadoopSchedulerListener:编写自己的JobInProgressListener,并调用JobTracker的addJobInProgressListener(),将之加到系统的Listener队列中,以便系统中添加或删除job后,JobTracker可立刻告诉调度器。

@ MyHadoopScheduler:调度器的核心实现算法

(2) 用户要用到的系统类

@ JobTracker:JobTracker在startTracker函数中,会将MyHadoopScheduler的taskTrackerManager赋值为JobTracker对象,这样,在MyHadoopScheduler中,可调用Jobracker中的所有public方法和成员变量,常用的有:

$ getClusterStatus():获取集群的状态,如tasktracker列表,map slot总数,reduce slot总数,当前正在运行的map/reduce task总数等

$ getQueueManager():如果MyHadoopScheduler支持多队列,那么需要使用该方法获取QueueManager对象,通过该对象,会用可以获取系统的所有队列名称,每个队列的ACL(Access Control List),具体参考:http://hadoop.apache.org/common/docs/current/service_level_auth.html

$ killJob:可以调用该函数杀死某个job

$ killTask:如果调度器支持资源抢占,可调用该函数 杀死某个task以便进行资源抢占。

@ JobInprogress:用户向Hadoop中提交一个job后,Hadoop会为该job创建一个叫JobInProgress的对象,该对象中包含了job相关的基本信息,且它会伴随某个job的一生(与job共存亡)。该对象中包含的job信息有:该job包含的所有task的信息(如:正在运行的task列表,已经完成的task列表,尚未运行的task列表等),作业的优先级,作业的提交时间,开始运行时间,运行结束时间等信息。

在JobInprogress的task列表中,每个task以对象TaskInProgress的形式保存,该对象中包含了每个task的基本信息,包括:task要处理的数据split,task创建时间,task开始执行时间,task结束时间等信息。这些信息肯定会在调度器中使用。

@ JobConf

每个作业的运行参数和配置选项被保存到一个JobConf对象中,该对象包含了配置文件mapred-site.xml,core-site.xml和hdfs-site.xml设置的选项和该作业的特有属性(用户名,InputFormat,Mapper等),一般是以key/value的形式保存,比如:想获取当前用户名,可以这样:

JobConf conf;

…….

String username = conf.get("user.name");

用户也可以通过该对象传递一些自己定义的全局属性,如用户自己定义了一个属性叫mapred.job.deadline(作业的deadline时间),用户可以在提交作业时设定该值:

hadoop jar hadoop-examples.jar wordcount -files cachefile.txt \

-D mapred.job.deadline=100000 \

input output

然后在调度器中这样获取该属性的值:

JobConf conf;

…….

int deadline=conf.getInt("mapred.job.deadline", -1); //获取mapred.job.deadline属性,如果没有设置,则返回-1

4. 总结

调度器是Hadoop的中枢,其重要性可想而知。用户如果要设计Hadoop调度器,需要对Hadoop的整个框架有比较深入的理解,同时需阅读一些很重要的类(如JobTracker和JobInprogress等)的源码,以便利用这些类完成你的调度算法。

Hadoop目前自带了三个比较常用的调度器,分别为JobQueueTaskScheduler (FIFO,但队列调度器),Capacity Scheduler(多队列多用户调度器)和Fair Scheduler(多队列多用户调度器),它们是你学习Hadoop调度器的最好资料。

5. 参考资料

(1) Hadoop-0.20.2源代码

原创文章,转载请注明: 转载自董的博客

本文链接地址: http://dongxicheng.org/mapreduce/how-to-write-hadoop-schedulers/

作者:Dong,作者介绍:http://dongxicheng.org/about/

本博客的文章集合:

 

分享到:
评论

相关推荐

    Hadoop任务调度器

    如果要编写自己的Hadoop调度器,需要深入了解Hadoop调度器的接口和扩展机制。开发者可以继承原有的调度器类,重写其调度逻辑,或者添加新的调度策略和规则。在Hadoop社区,编写自定义调度器是一个较为高级的开发任务...

    用python编写nagios hadoop监控脚本

    3. `check_hadoop_tasktracker.py`:在旧版Hadoop中(Hadoop 1.x),TaskTracker是MapReduce框架的一部分,负责任务调度和执行。此脚本可能用于检测TaskTracker的服务状态,包括任务进度、资源利用率、是否有故障的...

    hadoop3.3.0-winutils所有bin文件

    Java是编写Hadoop和Spark程序的常用语言,而Scala则提供了一种更高级、更简洁的方式来编写Spark应用。通过结合使用这些工具和技术,开发者可以构建复杂的数据处理管道,高效地处理PB级别的数据。 在Windows环境中...

    Hadoop3.1.3.rar

    3. **YARN**:Yet Another Resource Negotiator (YARN)是Hadoop的资源管理器,负责调度集群中的计算资源。在3.1.3版本中,YARN的性能得到了优化,减少了调度延迟,提升了整体处理速度。 4. **MapReduce**:在Hadoop...

    hadoop几个实例

    7. **编程接口**:Hadoop提供了Java API来编写MapReduce程序,但也有如Hadoop Streaming这样的接口,允许使用其他语言(如Python、Perl)编写Mapper和Reducer。 8. **数据处理范式**:MapReduce遵循“批处理”处理...

    Hadoop权威指南中文版(第二版)+Hadoop in Action

    6. **YARN详解**:资源调度器(CapacityScheduler和FairScheduler)、ResourceManager、NodeManager和ApplicationMaster的角色和交互。 7. **Hadoop生态扩展**:Zookeeper(分布式协调服务)、Oozie(工作流调度...

    hadoop-3.3.0.tar.gz

    1. **YARN增强**:YARN(Yet Another Resource Negotiator)作为Hadoop的资源管理系统,得到了进一步优化,提高了调度器的效率和资源利用率。例如,增加了对公平调度器的改进,以及对多租户环境的支持。 2. **HDFS...

    Hadoop技术详解.Hadoop Operation

    此外,还会涉及YARN(Yet Another Resource Negotiator),它是Hadoop的资源管理系统,负责调度任务和管理集群资源。 Hadoop生态系统的其他组件如HBase、Hive、Pig、Spark等也可能在书中有所涉及。HBase是一个...

    hadoop2.6.0版本hadoop.dll和winutils.exe

    6. **MapReduce编程模型**: Hadoop的核心计算模型MapReduce,允许用户编写并行处理大规模数据的程序。在Hadoop 2.6.0中,MapReduce v2 (MRv2)即YARN作为资源管理层,负责任务调度和容器管理,提高了系统的整体效率。...

    hadoop-2.6.4 windows版本,bin目录包括hadoop.dll winutils.exe hadoop-2.6.0\bin

    它为Java编写的Hadoop应用程序提供了与Windows操作系统交互的接口。 winutils.exe是Hadoop在Windows环境下运行的实用工具集,它提供了一系列与Hadoop生态系统相关的命令行工具。例如,winutils.exe可以用来设置...

    hadoop面试题分解.pdf

    下面是 Hadoop 面试题的解析,涵盖了 Hadoop 的安装、配置、核心组件、调度器、MapReduce 编程等方面。 1. Hadoop 安装配置 Hadoop 安装配置需要完成以下步骤: 1. 使用 root 账户登录 2. 修改 IP 和主机名 3. ...

    hadoop-3.1.4.tar.zip

    Hadoop 3.x系列引入了一些重要改进,比如增强的YARN调度器、HDFS的Erasure Coding功能(用于提高数据容错性和存储效率)、支持跨数据中心的复制等。此外,Hadoop还与其他大数据处理框架,如Spark、Flink等紧密集成,...

    hadoop课后题带答案

    18. Azkaban:Azkaban是一个批量工作流任务调度器,支持工作流定义和依赖管理,提供UI界面。 19. Azkaban特点:所有任务资源文件需打包上传,工作流管理器由Relational Database、AzkabanWebServer和...

    Hadoop构建数据仓库实践1_hadoop_

    8. 性能优化:Hadoop通过诸如YARN(Yet Another Resource Negotiator)的资源管理器来优化集群资源的分配,确保数据处理的高效运行。同时,可以通过优化数据分区、数据压缩等手段进一步提升性能。 9. 安全与隐私:...

    实战hadoop中的源码

    1. **Hadoop架构**:理解Hadoop的分布式文件系统(HDFS)和MapReduce计算模型,以及YARN资源调度器的工作原理。 2. **HDFS**:研究HDFS的数据块、副本策略、NameNode与DataNode之间的通信,以及数据读写流程。 3. ...

    hadoop2lib.tar.gz

    此外,YARN(Yet Another Resource Negotiator)作为Hadoop 2.x的新特性,是资源管理和调度的中心,确保集群资源的有效分配和利用。 对于Java开发人员来说,Hadoop2lib提供了与Hadoop交互的API,使得开发者可以在...

    hadoop的安装

    - mapred-site.xml:配置MapReduce框架相关信息,如任务调度器等。 - 在配置文件设置完成之后,需要更新环境变量,让Hadoop的安装路径生效。这同样需要编辑profile文件,添加Hadoop的安装路径到PATH变量中,并执行...

    hadoop-eclipse插件各版本合集

    - 例如,从2.2.0到2.7.x,Hadoop进行了大量的优化,增强了稳定性,支持更多高级特性,如YARN(Yet Another Resource Negotiator)资源调度器的引入,提高了资源管理和任务调度的效率。 5. **安装与使用**:用户...

    大数据之路选择Hadoop还是MaxCompute?Hadoop开源与MaxCompute对比材料

    Hadoop是由Apache软件基金会开发的一个开源分布式计算平台,采用Java语言编写,旨在支持大规模数据集的分布式处理。Hadoop的核心组件包括Hadoop Distributed File System (HDFS) 和 MapReduce。其中: - **HDFS**:...

Global site tag (gtag.js) - Google Analytics