`

(转)【Hadoop代码笔记】Hadoop作业提交之JobTracker接收作业提交

 
阅读更多

 

一、概要描述

上一篇博文中主要描述了JobTracker接收作业的几个服务(或功能)模块的初始化过程。本节将介绍这些服务(或功能)是如何接收到提交的job。本来作业的初始化也可以在本节内描述,但是涉及到JobInProgress的初始化过程放在一张图上太拥挤,就分开到下一篇文章中描述。

二、 流程描述     

1. JobClient通过RPC的方式向JobTracker提交作业

2. 调用JobTracker的submitJob方法。该方法是JobTracker向外提供的供调用的提交作业的接口。

3. submit方法中调用JobTracker的addJob方法。

4. 在addJob方法中会把作业加入到集合中供调度,并会触发注册的JobInProgressListener的jobAdded事件。由上篇博文的jobtracker相关服务和功能的初始化的FairScheduler的start方法中看到,这里注册的是两个JobInProgressListener。分别是FairScheduler的内部类JobListener和EagerTaskInitializationListener

5. FairScheduler的内部类JobListener响应jobAdded事件事件

6. EagerTaskInitializationListener响应jobAdded事件事件

三、代码详细

 1. JobClient的submitJob方法,调用submitJobInternal方法。

主要流程:

1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormatcheckOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8) 使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。 JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回 一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个 JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

复制代码
1 public RunningJob submitJobInternal(JobConf job)
 2             throws FileNotFoundException, ClassNotFoundException,
 3             InterruptedException, IOException {
 4 
 5         // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
 6         JobID jobId = jobSubmitClient.getNewJobId();
 7         // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
 8         // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
 9 
10         Path submitJobDir = new Path(getSystemDir(), jobId.toString());
11         Path submitJarFile = new Path(submitJobDir, "job.jar");
12         Path submitSplitFile = new Path(submitJobDir, "job.split");
13         configureCommandLineOptions(job, submitJobDir, submitJarFile);
14         Path submitJobFile = new Path(submitJobDir, "job.xml");
15         int reduces = job.getNumReduceTasks();
16         JobContext context = new JobContext(job, jobId);
17 
18         // Check the output specification
19         // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
20 
21         if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
22             org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
23                     .newInstance(context.getOutputFormatClass(), job);
24             output.checkOutputSpecs(context);
25         } else {
26             job.getOutputFormat().checkOutputSpecs(fs, job);
27         }
28 
29         // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
30         // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
31         // job.split名称。
32         // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
33 
34         // Create the splits for the job
35         LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
36         int maps;
37         if (job.getUseNewMapper()) {
38             maps = writeNewSplits(context, submitSplitFile);
39         } else {
40             maps = writeOldSplits(job, submitSplitFile);
41         }
42         job.set("mapred.job.split.file", submitSplitFile.toString());
43         job.setNumMapTasks(maps);
44 
45         // Write job file to JobTracker's fs
46         FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
47                 new FsPermission(JOB_FILE_PERMISSION));
48 
49         try {
50             job.writeXml(out);
51         } finally {
52             out.close();
53         }
54 
55         // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
56 
57         JobStatus status = jobSubmitClient.submitJob(jobId);
58         if (status != null) {
59             return new NetworkedJob(status);
60         } else {
61             throw new IOException("Could not launch job");
62         }
63     }
复制代码

2. JobTracker的submitJob方法,是JobTracker向外提供的供调用的提交作业的接口。

复制代码
public synchronized JobStatus submitJob(JobID jobId) throws IOException {
if(jobs.containsKey(jobId)) {
      //检查Job已经存在,则仅仅返回其status
      return jobs.get(jobId).getStatus();
}
//不存在,则创建该job 的JobInProgress 实例,
JobInProgress job = new JobInProgress(jobId, this, this.conf);
String queue = job.getProfile().getQueueName();
 new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));    
    }
// check for access
checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
// 检查内存是否够用
checkMemoryRequirements(job);
return addJob(jobId, job);
}
复制代码

3.  JobTracker的addJob方法,把作业加入到集合中供调度。其中jobs 是Map<JobID, JobInProgress>类型,维护着加入进来的JobInProgress job。

复制代码
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
totalSubmissions++;
synchronized (jobs) {
      synchronized (taskScheduler) {
//将job实例加入到Map<JobID, JobInProgress> jobs 集合中,
        jobs.put(job.getProfile().getJobID(), job);
//并触发所有注册的JobInProgressListener,通知其一个新Job添加进来了,让各个Listener响应各自的动作。
        for (JobInProgressListener listener : jobInProgressListeners) {
          try {
            listener.jobAdded(job);
          } catch (IOException ioe) {
            LOG.warn("Failed to add and so skipping the job : "
                + job.getJobID() + ". Exception : " + ioe);
          }
        }
      }
    }
    myInstrumentation.submitJob(job.getJobConf(), jobId);
    return job.getStatus();
  }
复制代码

4.FairScheduler.JobListener的jobAdded方法。jobAdded方法是JobInProgressListener中定义的在JobTracker中响应job变化的方法。在这个方法中,只是为每个加入的Job创建一个用于FairScheduler调度用的JobInfo对象,并将其和job的对应的存储在Map<JobInProgress, JobInfo> infos集合中。

复制代码
 @Override
    public void jobAdded(JobInProgress job) {
      synchronized (FairScheduler.this) {
        poolMgr.addJob(job);
        JobInfo info = new JobInfo();
        infos.put(job, info);
        update();
      }
    }
复制代码

5. EagerTaskInitializationListener的jobAdded方法。 这个方法其实在前面文章中介绍过,在EagerTaskInitializationListener中,jobAdded 只是简单的把job加入到一个List<JobInProgress>类型的 jobInitQueue中。并不直接对其进行初始化,对其中的job的处理由另外线程JobInitManager来做。该线程,一直检查 jobInitQueue是否有作业,有则拿出来从线程池中取一个线程InitJob处理。关于作业的初始化过程专门在下一篇文章中介绍。

复制代码
@Override
  public void jobAdded(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);
      resortInitQueue();
      jobInitQueue.notifyAll();
    }

  }
复制代码

 

 完。

为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_mapreduce_jobadded.html。谢谢!

分享到:
评论

相关推荐

    HADOOP学习笔记

    【HADOOP学习笔记】 Hadoop是Apache基金会开发的一个开源分布式计算框架,是云计算领域的重要组成部分,尤其在大数据处理方面有着广泛的应用。本学习笔记将深入探讨Hadoop的核心组件、架构以及如何搭建云计算平台。...

    hadoop 笔记

    为了克服NameNode和JobTracker的单点故障问题,Hadoop引入了Secondary NameNode和ResourceManager等组件,这些组件可以备份关键数据并在主节点发生故障时接管任务。 **7.3 经验总结** - 在搭建Hadoop集群时,需要...

    Hadoop云计算2.0笔记第一课Hadoop介绍

    MapReduce 的架构中,JobTracker 是作业跟踪器,负责管理所有作业处理,包括将作业分解成一系列任务,将任务指派给 TaskTracker,监控任务状态,决定哪些文件参与,然后切割 task 病分配节点。TaskTracker 是任务...

    大数据云计算技术 Hadoop运维笔记(共21页).pptx

    【大数据云计算技术 Hadoop运维笔记】的PPT涵盖了Hadoop在蓝汛公司的应用实践,以及Cloudera的产品和运维经验。以下是对其中知识点的详细解释: 1. **Hadoop在蓝汛的应用**: - 蓝汛使用了6000台设备,300个集群,...

    hadoop学习笔记

    《Hadoop学习笔记》 Hadoop,作为大数据处理的核心框架,是开源社区的杰作,由Apache软件基金会维护。这份文档旨在深入解析Hadoop的基本概念、架构及其在大数据处理中的应用,帮助读者全面掌握这一重要技术。 一、...

    Hadoop学习笔记AAAAAAAAAAA

    5. **资源调度**:YARN作为独立的资源调度器,解决了Hadoop 1.x中MapReduce和JobTracker的耦合问题。ResourceManager负责接收应用的资源请求,根据节点状态分配容器(container),并监控应用运行状态。NodeManager...

    妳那伊抹微笑_云计算之Hadoop完美笔记2.0

    妳那伊抹微笑自己整理的Hadoop笔记,有以下内容: Day1 搭建伪分布实验环境 Day2 介绍HDFS体系结构及shell、java操作方式 Day3 介绍MapReduce体系结构(1) Day4 介绍MapReduce体系结构(2) Day5 介绍Hadoop集群、...

    hadoop学习笔记(一、hadoop集群环境搭建).docx

    【Hadoop集群环境搭建】 Hadoop是一个开源的分布式计算框架,它允许在大规模集群中运行应用程序,处理海量数据。在本文中,我们将详细介绍如何搭建一个Hadoop集群环境,包括必要的步骤和配置。首先,我们需要准备...

    hadoop- w3c 学习笔记

    JobTracker(在旧版 Hadoop 中)负责作业调度和任务监控,而 TaskTracker 负责任务的执行和心跳报告。然而,这种架构存在单点故障、资源管理不精细以及内存消耗高等问题。 为了解决这些问题,Hadoop 引入了 YARN,...

    hadoop笔记.zip

    2. JobTracker与TaskTracker:在旧版Hadoop中,JobTracker负责任务调度和资源管理,TaskTracker接收并执行任务。每个Map任务和Reduce任务在TaskTracker上运行。 3. YARN(Yet Another Resource Negotiator):为了...

    hadoop完全分布式集群搭建笔记

    ### Hadoop完全分布式集群搭建详解 #### 一、概述 Hadoop是一款开源的大数据处理框架,主要用于处理海量数据。为了实现高效的数据处理能力,Hadoop通常会在多台计算机上搭建分布式集群。本文将详细介绍如何搭建一...

    Hadoop数据分析平台学习笔记

    - **JobTracker**:它是MapReduce的主控节点,负责接收客户端提交的作业,并调度这些作业到TaskTrackers上执行。 - **TaskTracker**:这些节点执行实际的Map和Reduce任务,并将结果返回给JobTracker。 #### 四、...

    hadoop笔记1

    【Hadoop笔记1】 在大数据处理领域,Hadoop是一个至关重要的开源框架,它为分布式存储和计算提供了强大的支持。这篇笔记主要围绕Hadoop的核心组件、工作原理以及如何使用Hadoop进行数据处理进行深入探讨。 一、...

    大数据平台,hadoop集群学习笔记

    第四章可能涉及Hadoop的其他组件,如YARN(Yet Another Resource Negotiator),它是Hadoop 2.x版本引入的资源管理系统,取代了最初的JobTracker,优化了资源调度和作业管理。Zookeeper则是Hadoop生态中的协调服务,...

    VMware下完全分布式Hadoop集群安装笔记

    ### VMware下完全分布式Hadoop集群安装笔记 #### 一、准备工作与环境搭建 **1. 安装VMware** 在开始之前,首先需要一个虚拟化平台来模拟多台计算机之间的交互,这里选择的是VMware。根据您的操作系统选择合适的...

    hadoop和yarn原理笔记.docx

    本篇笔记将深入探讨Hadoop和YARN的基本原理以及它们在大数据处理中的作用。 首先,大数据的特点可以用4V来概括:大量(Volume)、高速(Velocity)、多样(Variety)和低价值密度(Value)。由于数据量巨大,传统的...

Global site tag (gtag.js) - Google Analytics