`

(转)【Hadoop代码笔记】Hadoop作业提交之客户端作业提交

 
阅读更多

1.      概要描述
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。

2.      详细描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。

在JobClient中作业提交的主要过程如下:

1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormatcheckOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8) 使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。 JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回 一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个 JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。

  

引用下Hadoop: The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。

3.      涉及主要类介绍:

Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等

JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通 信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。

这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。

详细可参照JobClient中 的初始化代码:

复制代码
  /**
   *如果是非local的就会 连接到指定的JobTracker  
   */
  public void init(JobConf conf) throws IOException {
    String tracker = conf.get("mapred.job.tracker", "local");
    if ("local".equals(tracker)) {
      this.jobSubmitClient = new LocalJobRunner(conf);
    } else {
      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
    }        
  }

 /*
  * RPC不是本次主题重点,可参照后续发表的专题内容
  */
  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
      Configuration conf) throws IOException {
    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
        JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
  }
复制代码

 

InputFormat 重要,但暂不展开(此处会有链接)

Split  重要,但暂不展开(此处会有链接)

RowSplit 重要,但暂不展开(此处会有链接)

 4.     主要代码
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。

复制代码
  public RunningJob submitJob(JobConf job) throws FileNotFoundException,
                                                  IOException {
    try {
      return submitJobInternal(job);
    } catch (InterruptedException ie) {
      throw new IOException("interrupted", ie);
    } catch (ClassNotFoundException cnfe) {
      throw new IOException("class not found", cnfe);
    }
  }
复制代码

实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接) 

复制代码
 1 public RunningJob submitJobInternal(JobConf job)
 2             throws FileNotFoundException, ClassNotFoundException,
 3             InterruptedException, IOException {
 4 
 5         // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
 6         JobID jobId = jobSubmitClient.getNewJobId();
 7         // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
 8         // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
 9 
10         Path submitJobDir = new Path(getSystemDir(), jobId.toString());
11         Path submitJarFile = new Path(submitJobDir, "job.jar");
12         Path submitSplitFile = new Path(submitJobDir, "job.split");
13         configureCommandLineOptions(job, submitJobDir, submitJarFile);
14         Path submitJobFile = new Path(submitJobDir, "job.xml");
15         int reduces = job.getNumReduceTasks();
16         JobContext context = new JobContext(job, jobId);
17 
18         // Check the output specification
19         // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
20 
21         if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
22             org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
23                     .newInstance(context.getOutputFormatClass(), job);
24             output.checkOutputSpecs(context);
25         } else {
26             job.getOutputFormat().checkOutputSpecs(fs, job);
27         }
28 
29         // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。
30         // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile
31         // job.split名称。
32         // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
33 
34         // Create the splits for the job
35         LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
36         int maps;
37         if (job.getUseNewMapper()) {
38             maps = writeNewSplits(context, submitSplitFile);
39         } else {
40             maps = writeOldSplits(job, submitSplitFile);
41         }
42         job.set("mapred.job.split.file", submitSplitFile.toString());
43         job.setNumMapTasks(maps);
44 
45         // Write job file to JobTracker's fs
46         FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
47                 new FsPermission(JOB_FILE_PERMISSION));
48 
49         try {
50             job.writeXml(out);
51         } finally {
52             out.close();
53         }
54 
55         // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。
56 
57         JobStatus status = jobSubmitClient.submitJob(jobId);
58         if (status != null) {
59             return new NetworkedJob(status);
60         } else {
61             throw new IOException("Could not launch job");
62         }
63     }
复制代码

 

复制代码
 /**
   * JobTracker.submitJob() kicks off a new job.  
   *
   * Create a 'JobInProgress' object, which contains both JobProfile
   * and JobStatus.  Those two sub-objects are sometimes shipped outside
   * of the JobTracker.  But JobInProgress adds info that's useful for
   * the JobTracker alone.
   */
  public synchronized JobStatus submitJob(JobID jobId) throws IOException {
    if(jobs.containsKey(jobId)) {
      //job already running, don't start twice
      return jobs.get(jobId).getStatus();
    }
    
    JobInProgress job = new JobInProgress(jobId, this, this.conf);
    
    String queue = job.getProfile().getQueueName();
    if(!(queueManager.getQueues().contains(queue))) {      
      new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId));
      throw new IOException("Queue \"" + queue + "\" does not exist");        
    }

    // check for access
    try {
      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
    } catch (IOException ioe) {
       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
                + ". Ignoring job " + jobId, ioe);
      new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId));
      throw ioe;
    }

   return addJob(jobId, job); 
  }
复制代码

 

为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html。谢谢!

分享到:
评论

相关推荐

    Hadoop学习笔记.pdf

    Hadoop的源码项目结构主要包括hadoop-common-project、hadoop-hdfs-project、hadoop-mapreduce-project、hadoop-yarn-project等,每个项目下又有多个子项目,包含了Hadoop运行所需的各个组件和客户端等。 在实际...

    3.Hadoop学习笔记.pdf

    它特别适合于存储非结构化和半结构化数据,并且能够存储和运行在廉价硬件之上。Hadoop具有高可靠性、高扩展性和高吞吐率的特点,因此它成为了处理大数据的理想平台。 Hadoop的核心组成部分包括: 1. HDFS(Hadoop ...

    hadoop3.x笔记.docx

    Hadoop 3.x 笔记 Hadoop 是一个基于分布式存储的大数据处理框架,本文档将详细介绍 Hadoop 3.x 的配置和底层原理,从零搭建集群以及解决遇到的问题,通过图形化的方式更好地理解 Hadoop 的作用。 一、HDFS 组成 ...

    hadoop 笔记

    ### Hadoop基础知识与实战应用详解 #### 一、Hadoop概览 **1.1 什么是Hadoop?** Hadoop是由Apache Software Foundation所维护的一个开源软件框架,它为大规模数据处理提供了高效、可靠且可扩展的支持。Hadoop的...

    Hadoop HA搭建笔记和配置文件

    10. **测试与监控**:通过写入和读取数据到HDFS,以及提交MapReduce作业,验证HA功能是否正常。同时,应定期检查日志和监控系统,确保NameNode和ResourceManager的健康状态。 在提供的压缩包文件中,"HDP HAģʽ...

    Hadoop云计算2.0笔记第一课Hadoop介绍

    Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍 Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍中,我们可以了解到 Hadoop 的生态系统特点、Hadoop 生态系统概况、Hadoop 生态系统版本衍化、下一代 Hadoop、Hadoop 学习...

    大数据知识点Hadoop详细笔记

    Hadoop 详细笔记 本知识点总结了 Hadoop 的基本概念、特征、架构和组件,以及 HDFS 的设计和读写流程。 大数据的四大特征 1. Volume(大容量):指的是数据量的规模,通常在 10TB 以上。 2. Variety(多样化):...

    hadoop笔记

    本笔记将深入探讨Hadoop的相关知识点,包括其设计理念、核心组件、工作原理以及实际应用。 一、Hadoop设计理念 Hadoop的设计源于Google的MapReduce论文和GFS(Google文件系统)。它遵循“廉价硬件”和“容错性”的...

    传智黑马赵星老师hadoop七天课程资料笔记-第二天(全)

    标题和描述中提到的是“传智黑马赵星老师hadoop七天课程资料笔记-第二天(全)”,这表明这是一个关于Hadoop技术的深度学习资源,主要聚焦于赵星老师的Hadoop教学课程中的第二天内容。通常,这样的课程会涵盖Hadoop的...

    03.hadoop上课笔记之java编程和hbase

    Hadoop 上课笔记之 Java 编程和 HBase Hadoop 是一个分布式计算框架,HBase 是基于 Hadoop 的一个分布式数据库系统。下面是 Java 编程和 HBase 相关的知识点: 一、HBase 主要类和接口 1. Admin 类:用于建立...

    IT十八掌徐培成HADOOP笔记

    ### IT十八掌徐培成HADOOP笔记解析 #### Hadoop概述 Hadoop是一个开源的分布式计算框架,它能够高效地处理大型数据集,并通过在集群中的多台计算机上分配数据来实现高度的并行性。Hadoop的核心组成部分包括HDFS...

    hadoop学习笔记.pdf

    【大数据与Hadoop简介】 大数据是指在传统数据处理技术无法有效处理的海量、高增长率和多样性的信息资产。随着互联网的迅速发展,人们在日常生活、工作中产生的数据量呈现出爆炸式增长,这使得需要新的技术来管理和...

    传智黑马赵星老师hadoop七天课程资料笔记-第四天(全)

    1. **分布式文件系统(HDFS)**:Hadoop的核心组件之一,HDFS为大数据存储提供了基础。它将大文件分割成块并分布在多台机器上,确保高可用性和容错性。在"读取数据流程.png"中,我们可以看到数据从客户端经过...

    Hadoop之HBase学习笔记

    【标题】"Hadoop之HBase学习笔记"主要聚焦于Hadoop生态中的分布式数据库HBase。HBase是一个基于Google Bigtable理念设计的开源NoSQL数据库,它运行在Hadoop之上,提供高性能、高可靠性以及可水平扩展的数据存储能力...

    Hadoop学习笔记

    【Hadoop学习笔记】 Hadoop 是一个开源框架,主要用于处理和存储大数据。它源自于解决互联网公司面临的海量数据处理问题,特别是Google发布的三篇技术论文,即GFS(Google File System)、MapReduce以及BigTable。...

    Hadoop数据分析平台学习笔记

    - **JobTracker**:它是MapReduce的主控节点,负责接收客户端提交的作业,并调度这些作业到TaskTrackers上执行。 - **TaskTracker**:这些节点执行实际的Map和Reduce任务,并将结果返回给JobTracker。 #### 四、...

    hadoop笔记1

    【Hadoop笔记1】 在大数据处理领域,Hadoop是一个至关重要的开源框架,它为分布式存储和计算提供了强大的支持。这篇笔记主要围绕Hadoop的核心组件、工作原理以及如何使用Hadoop进行数据处理进行深入探讨。 一、...

    Hadoop阶段初识学习笔记

    1. **HDFS(Hadoop Distributed File System)**:用于存储海量数据,采用Master/Slave架构,其中NameNode作为主节点管理文件系统的命名空间和客户端对文件的访问,DataNodes作为从节点负责存储实际的数据块。...

Global site tag (gtag.js) - Google Analytics