1. 概要描述
仅仅描述向Hadoop提交作业的第一步,即调用Jobclient的submitJob方法,向Hadoop提交作业。
2. 详细描述
Jobclient使用内置的JobSubmissionProtocol 实例jobSubmitClient 和JobTracker交互,最主要是提交作业、获取作业执行信息等。
在JobClient中作业提交的主要过程如下:
1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8) 使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。 JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回 一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个 JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
引用下Hadoop: The Definitive Guide, Second Edition中的一张经典图。这里仅仅描述上图中的左上角第一个框部分内容,即本步骤的最终输出仅仅是将作业提交到JobTracker。其他后续文章会继续描述。
3. 涉及主要类介绍:
Jobclient :JobClient是向JobTracker提交作业的接口,可以理解为Hadoop的Mapreduce作业框架向用户开放的作业提交入口。可以提交作业,监视作业状态等
JobSubmissionProtocol(为什么0.20.1的javadoc中找不到这个接口,虽然0.20.1 0.20.2代码中都是相同的用法,知道2.2.0貌似重命名为被ClientProtocol替换):JobClient和JobTracker进行通 信的一个协议。JobClient实际上是用这个句柄来提交锁业并且监视作业的执行状况。
这个接口有两个实现:LocalJobRunner(conf)当mapred-site.xml中的mapred.job.tracker值为local是为此对象。表示在单机上执行;如果为一个地址的话则是 JobTracker的对象,表示分布式执 行。
详细可参照JobClient中 的初始化代码:
/** *如果是非local的就会 连接到指定的JobTracker */ public void init(JobConf conf) throws IOException { String tracker = conf.get("mapred.job.tracker", "local"); if ("local".equals(tracker)) { this.jobSubmitClient = new LocalJobRunner(conf); } else { this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); } } /* * RPC不是本次主题重点,可参照后续发表的专题内容 */ private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, JobSubmissionProtocol.versionID, addr, getUGI(conf), conf, NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); }
InputFormat 重要,但暂不展开(此处会有链接)
Split 重要,但暂不展开(此处会有链接)
RowSplit 重要,但暂不展开(此处会有链接)
4. 主要代码
通过代码来了解流程,了解如何调用JobClient向Hadoop集群提交作业。
public RunningJob submitJob(JobConf job) throws FileNotFoundException, IOException { try { return submitJobInternal(job); } catch (InterruptedException ie) { throw new IOException("interrupted", ie); } catch (ClassNotFoundException cnfe) { throw new IOException("class not found", cnfe); } }
实际方法的执行是submitJobInternal方法。着重看下这个方法的内部执行。主要的逻辑部分比较详细的进行了注释。(有些想继续展开,感觉太细了,后面的文章中部分重要的会有涉及,不想深度遍历了,到时会回过头来互相链接)
1 public RunningJob submitJobInternal(JobConf job) 2 throws FileNotFoundException, ClassNotFoundException, 3 InterruptedException, IOException { 4 5 // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID 6 JobID jobId = jobSubmitClient.getNewJobId(); 7 // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。 8 // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile 9 10 Path submitJobDir = new Path(getSystemDir(), jobId.toString()); 11 Path submitJarFile = new Path(submitJobDir, "job.jar"); 12 Path submitSplitFile = new Path(submitJobDir, "job.split"); 13 configureCommandLineOptions(job, submitJobDir, submitJarFile); 14 Path submitJobFile = new Path(submitJobDir, "job.xml"); 15 int reduces = job.getNumReduceTasks(); 16 JobContext context = new JobContext(job, jobId); 17 18 // Check the output specification 19 // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException 20 21 if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) { 22 org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils 23 .newInstance(context.getOutputFormatClass(), job); 24 output.checkOutputSpecs(context); 25 } else { 26 job.getOutputFormat().checkOutputSpecs(fs, job); 27 } 28 29 // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。 30 // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile 31 // job.split名称。 32 // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。 33 34 // Create the splits for the job 35 LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile)); 36 int maps; 37 if (job.getUseNewMapper()) { 38 maps = writeNewSplits(context, submitSplitFile); 39 } else { 40 maps = writeOldSplits(job, submitSplitFile); 41 } 42 job.set("mapred.job.split.file", submitSplitFile.toString()); 43 job.setNumMapTasks(maps); 44 45 // Write job file to JobTracker's fs 46 FSDataOutputStream out = FileSystem.create(fs, submitJobFile, 47 new FsPermission(JOB_FILE_PERMISSION)); 48 49 try { 50 job.writeXml(out); 51 } finally { 52 out.close(); 53 } 54 55 // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。 56 57 JobStatus status = jobSubmitClient.submitJob(jobId); 58 if (status != null) { 59 return new NetworkedJob(status); 60 } else { 61 throw new IOException("Could not launch job"); 62 } 63 }
/** * JobTracker.submitJob() kicks off a new job. * * Create a 'JobInProgress' object, which contains both JobProfile * and JobStatus. Those two sub-objects are sometimes shipped outside * of the JobTracker. But JobInProgress adds info that's useful for * the JobTracker alone. */ public synchronized JobStatus submitJob(JobID jobId) throws IOException { if(jobs.containsKey(jobId)) { //job already running, don't start twice return jobs.get(jobId).getStatus(); } JobInProgress job = new JobInProgress(jobId, this, this.conf); String queue = job.getProfile().getQueueName(); if(!(queueManager.getQueues().contains(queue))) { new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId)); throw new IOException("Queue \"" + queue + "\" does not exist"); } // check for access try { checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB); } catch (IOException ioe) { LOG.warn("Access denied for user " + job.getJobConf().getUser() + ". Ignoring job " + jobId, ioe); new CleanupQueue().addToQueue(conf, getSystemDirectoryForJob(jobId)); throw ioe; } return addJob(jobId, job); }
为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_jobclient_submit.html。谢谢!
相关推荐
Hadoop的源码项目结构主要包括hadoop-common-project、hadoop-hdfs-project、hadoop-mapreduce-project、hadoop-yarn-project等,每个项目下又有多个子项目,包含了Hadoop运行所需的各个组件和客户端等。 在实际...
它特别适合于存储非结构化和半结构化数据,并且能够存储和运行在廉价硬件之上。Hadoop具有高可靠性、高扩展性和高吞吐率的特点,因此它成为了处理大数据的理想平台。 Hadoop的核心组成部分包括: 1. HDFS(Hadoop ...
Hadoop 3.x 笔记 Hadoop 是一个基于分布式存储的大数据处理框架,本文档将详细介绍 Hadoop 3.x 的配置和底层原理,从零搭建集群以及解决遇到的问题,通过图形化的方式更好地理解 Hadoop 的作用。 一、HDFS 组成 ...
### Hadoop基础知识与实战应用详解 #### 一、Hadoop概览 **1.1 什么是Hadoop?** Hadoop是由Apache Software Foundation所维护的一个开源软件框架,它为大规模数据处理提供了高效、可靠且可扩展的支持。Hadoop的...
10. **测试与监控**:通过写入和读取数据到HDFS,以及提交MapReduce作业,验证HA功能是否正常。同时,应定期检查日志和监控系统,确保NameNode和ResourceManager的健康状态。 在提供的压缩包文件中,"HDP HAģʽ...
Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍 Hadoop 云计算 2.0 笔记第一课 Hadoop 介绍中,我们可以了解到 Hadoop 的生态系统特点、Hadoop 生态系统概况、Hadoop 生态系统版本衍化、下一代 Hadoop、Hadoop 学习...
Hadoop 详细笔记 本知识点总结了 Hadoop 的基本概念、特征、架构和组件,以及 HDFS 的设计和读写流程。 大数据的四大特征 1. Volume(大容量):指的是数据量的规模,通常在 10TB 以上。 2. Variety(多样化):...
本笔记将深入探讨Hadoop的相关知识点,包括其设计理念、核心组件、工作原理以及实际应用。 一、Hadoop设计理念 Hadoop的设计源于Google的MapReduce论文和GFS(Google文件系统)。它遵循“廉价硬件”和“容错性”的...
标题和描述中提到的是“传智黑马赵星老师hadoop七天课程资料笔记-第二天(全)”,这表明这是一个关于Hadoop技术的深度学习资源,主要聚焦于赵星老师的Hadoop教学课程中的第二天内容。通常,这样的课程会涵盖Hadoop的...
Hadoop 上课笔记之 Java 编程和 HBase Hadoop 是一个分布式计算框架,HBase 是基于 Hadoop 的一个分布式数据库系统。下面是 Java 编程和 HBase 相关的知识点: 一、HBase 主要类和接口 1. Admin 类:用于建立...
### IT十八掌徐培成HADOOP笔记解析 #### Hadoop概述 Hadoop是一个开源的分布式计算框架,它能够高效地处理大型数据集,并通过在集群中的多台计算机上分配数据来实现高度的并行性。Hadoop的核心组成部分包括HDFS...
【大数据与Hadoop简介】 大数据是指在传统数据处理技术无法有效处理的海量、高增长率和多样性的信息资产。随着互联网的迅速发展,人们在日常生活、工作中产生的数据量呈现出爆炸式增长,这使得需要新的技术来管理和...
1. **分布式文件系统(HDFS)**:Hadoop的核心组件之一,HDFS为大数据存储提供了基础。它将大文件分割成块并分布在多台机器上,确保高可用性和容错性。在"读取数据流程.png"中,我们可以看到数据从客户端经过...
【标题】"Hadoop之HBase学习笔记"主要聚焦于Hadoop生态中的分布式数据库HBase。HBase是一个基于Google Bigtable理念设计的开源NoSQL数据库,它运行在Hadoop之上,提供高性能、高可靠性以及可水平扩展的数据存储能力...
【Hadoop学习笔记】 Hadoop 是一个开源框架,主要用于处理和存储大数据。它源自于解决互联网公司面临的海量数据处理问题,特别是Google发布的三篇技术论文,即GFS(Google File System)、MapReduce以及BigTable。...
- **JobTracker**:它是MapReduce的主控节点,负责接收客户端提交的作业,并调度这些作业到TaskTrackers上执行。 - **TaskTracker**:这些节点执行实际的Map和Reduce任务,并将结果返回给JobTracker。 #### 四、...
【Hadoop笔记1】 在大数据处理领域,Hadoop是一个至关重要的开源框架,它为分布式存储和计算提供了强大的支持。这篇笔记主要围绕Hadoop的核心组件、工作原理以及如何使用Hadoop进行数据处理进行深入探讨。 一、...
1. **HDFS(Hadoop Distributed File System)**:用于存储海量数据,采用Master/Slave架构,其中NameNode作为主节点管理文件系统的命名空间和客户端对文件的访问,DataNodes作为从节点负责存储实际的数据块。...