一、概要描述
在上一篇博文中主要描述了JobTracker接收作业的几个服务(或功能)模块的初始化过程。本节将介绍这些服务(或功能)是如何接收到提交的job。本来作业的初始化也可以在本节内描述,但是涉及到JobInProgress的初始化过程放在一张图上太拥挤,就分开到下一篇文章中描述。
二、 流程描述
1. JobClient通过RPC的方式向JobTracker提交作业;
2. 调用JobTracker的submitJob方法。该方法是JobTracker向外提供的供调用的提交作业的接口。
3. submit方法中调用JobTracker的addJob方法。
4. 在addJob方法中会把作业加入到集合中供调度,并会触发注册的JobInProgressListener的jobAdded事件。由上篇博文的jobtracker相关服务和功能的初始化的FairScheduler的start方法中看到,这里注册的是两个JobInProgressListener。分别是FairScheduler的内部类JobListener和EagerTaskInitializationListener。
5. FairScheduler的内部类JobListener响应jobAdded事件事件
6. EagerTaskInitializationListener响应jobAdded事件事件
三、代码详细
1. JobClient的submitJob方法,调用submitJobInternal方法。
主要流程:
1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID
2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile
4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException
5)计算作业的输入分片。通过InputFormat的getSplits(job)方法获得作业的split并将split序列化封装为RawSplit。返回split数目,也即代表有多个分片有多少个map。详细参见InputFormat获取Split的方法。
6)writeNewSplits 方法把输入分片写到JobTracker的job目录下。
7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。
8) 使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。 JobTracker作业放入到内存队列中,由作业调度器进行调度。并初始化作业实例。JobTracker创建job成功后会给JobClient传回 一个JobStatus对象 用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个 JobStatus对象创建一个 NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。
1 public RunningJob submitJobInternal(JobConf job) 2 throws FileNotFoundException, ClassNotFoundException, 3 InterruptedException, IOException { 4 5 // 1)通过调用JobTracker的getNewJobId()向jobtracker请求一个新的作业ID 6 JobID jobId = jobSubmitClient.getNewJobId(); 7 // 2)获取job的jar、输入分片、作业描述等几个路径信息,以jobId命名。 8 // 3)其中getSystemDir()是返回jobtracker的系统目录,来放置job相关的文件。包括:mapreduce的jar文件submitJarFile、分片文件submitSplitFile、作业描述文件submitJobFile 9 10 Path submitJobDir = new Path(getSystemDir(), jobId.toString()); 11 Path submitJarFile = new Path(submitJobDir, "job.jar"); 12 Path submitSplitFile = new Path(submitJobDir, "job.split"); 13 configureCommandLineOptions(job, submitJobDir, submitJarFile); 14 Path submitJobFile = new Path(submitJobDir, "job.xml"); 15 int reduces = job.getNumReduceTasks(); 16 JobContext context = new JobContext(job, jobId); 17 18 // Check the output specification 19 // 4)检查作业的输出说明,如果没有指定输出目录或输出目录以及存在,则作业不提交。参照org.apache.hadoop.mapreduce.lib.output.FileOutputFormat的checkOutputSpecs方法。如果没有指定,则抛出InvalidJobConfException,文件已经存在则抛出FileAlreadyExistsException 20 21 if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) { 22 org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils 23 .newInstance(context.getOutputFormatClass(), job); 24 output.checkOutputSpecs(context); 25 } else { 26 job.getOutputFormat().checkOutputSpecs(fs, job); 27 } 28 29 // 5)计算作业的输入分片。详细参见FormatInputFormat获取Split的方法。 30 // 6)writeNewSplits 方法把输入分片写到JobTracker的job目录下,名称是submitSplitFile 31 // job.split名称。 32 // 7)将运行作业所需的资源(包括作业jar文件,配置文件和计算所得的输入分片)复制到jobtracker的文件系统中一个以作业ID命名的目录下。 33 34 // Create the splits for the job 35 LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile)); 36 int maps; 37 if (job.getUseNewMapper()) { 38 maps = writeNewSplits(context, submitSplitFile); 39 } else { 40 maps = writeOldSplits(job, submitSplitFile); 41 } 42 job.set("mapred.job.split.file", submitSplitFile.toString()); 43 job.setNumMapTasks(maps); 44 45 // Write job file to JobTracker's fs 46 FSDataOutputStream out = FileSystem.create(fs, submitJobFile, 47 new FsPermission(JOB_FILE_PERMISSION)); 48 49 try { 50 job.writeXml(out); 51 } finally { 52 out.close(); 53 } 54 55 // 8)使用句柄JobSubmissionProtocol通过RPC远程调用的submitJob()方法,向JobTracker提交作业。JobTracker根据接收到的submitJob()方法调用后,把调用放入到内存队列中,由作业调度器进行调度。并初始化作业实例。 56 57 JobStatus status = jobSubmitClient.submitJob(jobId); 58 if (status != null) { 59 return new NetworkedJob(status); 60 } else { 61 throw new IOException("Could not launch job"); 62 } 63 }
2. JobTracker的submitJob方法,是JobTracker向外提供的供调用的提交作业的接口。
public synchronized JobStatus submitJob(JobID jobId) throws IOException { if(jobs.containsKey(jobId)) { //检查Job已经存在,则仅仅返回其status return jobs.get(jobId).getStatus(); } //不存在,则创建该job 的JobInProgress 实例, JobInProgress job = new JobInProgress(jobId, this, this.conf); String queue = job.getProfile().getQueueName(); new CleanupQueue().addToQueue(conf,getSystemDirectoryForJob(jobId)); } // check for access checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB); // 检查内存是否够用 checkMemoryRequirements(job); return addJob(jobId, job); }
3. JobTracker的addJob方法,把作业加入到集合中供调度。其中jobs 是Map<JobID, JobInProgress>类型,维护着加入进来的JobInProgress job。
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) { totalSubmissions++; synchronized (jobs) { synchronized (taskScheduler) { //将job实例加入到Map<JobID, JobInProgress> jobs 集合中, jobs.put(job.getProfile().getJobID(), job); //并触发所有注册的JobInProgressListener,通知其一个新Job添加进来了,让各个Listener响应各自的动作。 for (JobInProgressListener listener : jobInProgressListeners) { try { listener.jobAdded(job); } catch (IOException ioe) { LOG.warn("Failed to add and so skipping the job : " + job.getJobID() + ". Exception : " + ioe); } } } } myInstrumentation.submitJob(job.getJobConf(), jobId); return job.getStatus(); }
4.FairScheduler.JobListener的jobAdded方法。jobAdded方法是JobInProgressListener中定义的在JobTracker中响应job变化的方法。在这个方法中,只是为每个加入的Job创建一个用于FairScheduler调度用的JobInfo对象,并将其和job的对应的存储在Map<JobInProgress, JobInfo> infos集合中。
@Override public void jobAdded(JobInProgress job) { synchronized (FairScheduler.this) { poolMgr.addJob(job); JobInfo info = new JobInfo(); infos.put(job, info); update(); } }
5. EagerTaskInitializationListener的jobAdded方法。 这个方法其实在前面文章中介绍过,在EagerTaskInitializationListener中,jobAdded 只是简单的把job加入到一个List<JobInProgress>类型的 jobInitQueue中。并不直接对其进行初始化,对其中的job的处理由另外线程JobInitManager来做。该线程,一直检查 jobInitQueue是否有作业,有则拿出来从线程池中取一个线程InitJob处理。关于作业的初始化过程专门在下一篇文章中介绍。
@Override public void jobAdded(JobInProgress job) { synchronized (jobInitQueue) { jobInitQueue.add(job); resortInitQueue(); jobInitQueue.notifyAll(); } }
完。
为了转载内容的一致性、可追溯性和保证及时更新纠错,转载时请注明来自:http://www.cnblogs.com/douba/p/hadoop_mapreduce_jobadded.html。谢谢!
相关推荐
【HADOOP学习笔记】 Hadoop是Apache基金会开发的一个开源分布式计算框架,是云计算领域的重要组成部分,尤其在大数据处理方面有着广泛的应用。本学习笔记将深入探讨Hadoop的核心组件、架构以及如何搭建云计算平台。...
为了克服NameNode和JobTracker的单点故障问题,Hadoop引入了Secondary NameNode和ResourceManager等组件,这些组件可以备份关键数据并在主节点发生故障时接管任务。 **7.3 经验总结** - 在搭建Hadoop集群时,需要...
MapReduce 的架构中,JobTracker 是作业跟踪器,负责管理所有作业处理,包括将作业分解成一系列任务,将任务指派给 TaskTracker,监控任务状态,决定哪些文件参与,然后切割 task 病分配节点。TaskTracker 是任务...
【大数据云计算技术 Hadoop运维笔记】的PPT涵盖了Hadoop在蓝汛公司的应用实践,以及Cloudera的产品和运维经验。以下是对其中知识点的详细解释: 1. **Hadoop在蓝汛的应用**: - 蓝汛使用了6000台设备,300个集群,...
《Hadoop学习笔记》 Hadoop,作为大数据处理的核心框架,是开源社区的杰作,由Apache软件基金会维护。这份文档旨在深入解析Hadoop的基本概念、架构及其在大数据处理中的应用,帮助读者全面掌握这一重要技术。 一、...
5. **资源调度**:YARN作为独立的资源调度器,解决了Hadoop 1.x中MapReduce和JobTracker的耦合问题。ResourceManager负责接收应用的资源请求,根据节点状态分配容器(container),并监控应用运行状态。NodeManager...
妳那伊抹微笑自己整理的Hadoop笔记,有以下内容: Day1 搭建伪分布实验环境 Day2 介绍HDFS体系结构及shell、java操作方式 Day3 介绍MapReduce体系结构(1) Day4 介绍MapReduce体系结构(2) Day5 介绍Hadoop集群、...
【Hadoop集群环境搭建】 Hadoop是一个开源的分布式计算框架,它允许在大规模集群中运行应用程序,处理海量数据。在本文中,我们将详细介绍如何搭建一个Hadoop集群环境,包括必要的步骤和配置。首先,我们需要准备...
JobTracker(在旧版 Hadoop 中)负责作业调度和任务监控,而 TaskTracker 负责任务的执行和心跳报告。然而,这种架构存在单点故障、资源管理不精细以及内存消耗高等问题。 为了解决这些问题,Hadoop 引入了 YARN,...
2. JobTracker与TaskTracker:在旧版Hadoop中,JobTracker负责任务调度和资源管理,TaskTracker接收并执行任务。每个Map任务和Reduce任务在TaskTracker上运行。 3. YARN(Yet Another Resource Negotiator):为了...
### Hadoop完全分布式集群搭建详解 #### 一、概述 Hadoop是一款开源的大数据处理框架,主要用于处理海量数据。为了实现高效的数据处理能力,Hadoop通常会在多台计算机上搭建分布式集群。本文将详细介绍如何搭建一...
- **JobTracker**:它是MapReduce的主控节点,负责接收客户端提交的作业,并调度这些作业到TaskTrackers上执行。 - **TaskTracker**:这些节点执行实际的Map和Reduce任务,并将结果返回给JobTracker。 #### 四、...
【Hadoop笔记1】 在大数据处理领域,Hadoop是一个至关重要的开源框架,它为分布式存储和计算提供了强大的支持。这篇笔记主要围绕Hadoop的核心组件、工作原理以及如何使用Hadoop进行数据处理进行深入探讨。 一、...
第四章可能涉及Hadoop的其他组件,如YARN(Yet Another Resource Negotiator),它是Hadoop 2.x版本引入的资源管理系统,取代了最初的JobTracker,优化了资源调度和作业管理。Zookeeper则是Hadoop生态中的协调服务,...
### VMware下完全分布式Hadoop集群安装笔记 #### 一、准备工作与环境搭建 **1. 安装VMware** 在开始之前,首先需要一个虚拟化平台来模拟多台计算机之间的交互,这里选择的是VMware。根据您的操作系统选择合适的...
本篇笔记将深入探讨Hadoop和YARN的基本原理以及它们在大数据处理中的作用。 首先,大数据的特点可以用4V来概括:大量(Volume)、高速(Velocity)、多样(Variety)和低价值密度(Value)。由于数据量巨大,传统的...