- 浏览: 52864 次
- 性别:
文章分类
最新评论
首先看看Hadoop ssh 脚本
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
任务递交。
WordCount 里面有一句话:
1.job.waitForCompletion:一般情况下我们提交一个job都是通过job.waitForCompletion方法提交,该方法内部会调用job.submit()方法
2.job.submit():在submit中会调用setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的api还是旧的api,之后会调用connect()方法,该方法主要是实例化jobClient,然后会调用jobClient.submitJobInternal(conf)这个方法进行job的提交
3.jobClient.submitJobInternal():这个方法会将job运行时所需的所有文件上传到jobTarcker文件系统(一般是hdfs)中,同时进行备份(备份数默认是10,通过mapred.submit.replication变量可以设置),这个方法需要深入进行解读。
4.JobSubmissionFiles.getStagingDir:这个方法是在jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个job提交的根目录,主要是通过Path stagingArea = client.getStagingAreaDir();方法获得,这个方法最终会调用jobTracker.getStagingAreaDirInternal()方法,代码如下:
在获取了stagingDir之后会执行JobID jobId = jobSubmitClient.getNewJobId();为job获取一个jobId,然后执行Path submitJobDir = new Path(jobStagingArea, jobId.toString());获得该job提交的路径,也就是在stagingDir目录下建一个以jobId为文件名的目录。有了submitJobDir之后就可以将job运行所需的全部文件上传到对应的目录下了,具体是调用jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。
5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication);这个方法实现文件上传。
6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行job的时候所指定的-libjars, -files, -archives文件,对应的conf配置参数是tmpfiles tmpjars tmparchives,这个过程是在ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到hdfs上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)
7.jobClient处理tmpfiles:该方法会将tmpfiles参数值按‘,’分割,然后将每一个文件上传到hdfs,其中如何文件的路径本身就在hdfs中,那么将不进行上传操作,上传操作只针对文件不在hdfs中的文件。调用的方法是:
Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),
该方法内部使用的是FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至hdfs,注意此处的remoteFs和jtFs,remoteFs就是需上传文件的原始文件系统,jtFs则是jobTracker的文件系统(hdfs)。
在文件上传至hdfs之后,会执行DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是tmpfiles和tmparchives都会创建别名,而tmpjars则不会,个人认为tmpjars则jar文件,不是用户在job运行期间调用,所以不需要别名,而tmpfiles和tmparchives则在job运行期间用户可能会调用,所以使用别名可以方便用户调用
8.将这三个参数指定的文件上传到hdfs之后,需要将job的jar文件上传到hdfs,名称为submitJobDir/job.jar,使用fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。
到这里jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())
三个方法,这三个方法是进行一些cached archives and files的校验和保存其时间戳和权限内容
9.继续我们的jobClient.submitJobInternal()方法,这之后会根据我们设置的outputFormat类执行output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。
10.jobClient.writeSplits():这个方法内部会根据我们之前判断的使用new-api还是old-api分别进行分片操作,我们只看new-api的分片操作。
11.jobClient.writeNewSplits():这个方法主要是根据我们设置的inputFormat.class通过反射获得inputFormat对象,然后调用inputFormat对象的getSplits方法,当获得分片信息之后调用JobSplitWriter.createSplitFiles方法将分片的信息写入到submitJobDir/job.split文件中。
12.JobSplitWriter.createSplitFiles:这个方法的作用就是讲分片信息写入到submitJobDir/job.split文件中,方法内部调用
JobSplitWriter.writeNewSplits进行写操作
13.JobSplitWriter.writeNewSplits:该方法具体对每一个InputSplit对象进行序列化写入到输出流中,具体每个InputSplit对象写入的信息包括:
split.getClass().getName(),serializer.serialize(split)将整个对象序列化。然后将InputSplit对象的locations信息放入SplitMetaInfo对象中,同时还包括InputSpilt元信息在job.split文件中的偏移量,该InputSplit的长度,再将SplitMetaInfo对象。
然后调用JobSplitWriter.writeJobSplitMetaInfo()方法将SplitMetaInfo对象写入submitJobDir/job.splitmetainfo文件中。
14.JobSplitWriter.writeJobSplitMetaInfo():
将SplitMetaInfo对象写入submitJobDir/job.splitmetainfo文件中,
具体写入的信息包括:JobSplit.META_SPLIT_FILE_HEADER,splitVersion,allSplitMetaInfo.length(SplitMetaInfo对象的个数,一个split对应一个SplitMetaInfo),然后分别将所有的SplitMetaInfo对象序列化到输出流中,
到此文件的分片工作完成。
15.继续回头看jobClient.submitJobInternal()方法:在上一步进行分片操作之后,或返回切片的数目,据此设定map的数量,所以在job中设置的map数量是没有用的。
16.继续往下走:
这三句话是获得job对应的任务队列信息,这里涉及到hadoop的作业调度内容,就不深入研究了
17.继续:下面就是讲job的配置文件信息(jobConf对象)写入到xml文件中,以便用户查看,具体文件是:submitJobDir/job.xml,通过jobCopy.writeXml(out)方法,
方法比较简单,就是写xml文件。下面就进入到jobTracker提交任务环节了,status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()),
就到这吧,后面下次再慢慢研究。
总结下:在用户提交job之后,第一步主要是jobClient对job进行一些必要的文件上传操作
,主要包括:
1)为job生成一个jobId,然后获得job提交的stagingDir,根据jobId获得submitJobDir,之后所有的job运行时文件豆浆保存在此目录下
2)将用户在命令行通过-libjars, -files, -archives指定的文件上传到jobTracker的文件系统中,并将job.jar上传到hdfs中
3)校验输出路径
4)进行输入文件的分片操作,并将分片信息写入submitJobDir下的相应文件中,有两个文件:job.split以及job.splitmetainfo
5)将job的配置参数(jobConf对象)写入到job.xml文件中。
18. jobTracker进行job的提交过程,还有一个JobSubmissionProtocol的实现是LocalJobRunner,这是本地执行的时候使用的,真正集群运行Job还是使用的jobTracker,所以只看jobTracker类的submitJob
18.1.jobTracker.submitJob():第一句就是checkJobTrackerState()这个是检查jobTracker状态,是否运行中,这里说一句,jobTracker是在hadoop集群启动的时候启动的,也就是在执行start-all或者start-mapred的时候启动,启动的时候会调用JobTracker的main方法,然后在jps的时候就可以看见一个jobTracker的进程了。下面来看一下JobTracker.main()方法。
18.2.JobTracker.main():第一句是JobTracker tracker = startTracker(new JobConf()),这是实例化一个jobTracke实例。
18.3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个jobTracker对象,在实例化的时候会做很多事,所以还是进去瞅瞅。
18.4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化taskScheduler的内容:
这两句就是根据配置文件设置的taskScheduler类名,
通过反射获得对应的taskScheduler对象,
在实例化的时候虽然不同的TaskScheduler具体操作不一样,但是统一的都会初始化一个JobListener对象,
这个对象就是后面将要监听job的listener。剩下的内容就不说了。
回到JobTracker.startTracker()方法。
5.JobTracker.JobTracker():
在实例化jobTracker之后,
会执行result.taskScheduler.setTaskTrackerManager(result)
,这个就是将jobTracker对象设置给taskScheduler。后面就什么了,现在可以回到main方法了
.JobTracker.main():在实例化jobTracker之后,会调用tracker.offerService()方法,
之后main方法就没什么了,下面看看tracker.offerService()这个方法
只看taskScheduler.start()这个方法,因为这里只是想分析下JobTracker提交job的过程.
18.5 taskScheduler.start():这个方法就是启动TaskScheduler,这个方法不同taskScheduler也不同,但是统一的还是会有一个taskTrackerManager.addJobInProgressListener(jobListener)这个操作,taskTrackerManager就是jobTracker(第5步),这句的意思是为jobTracker添加jobListener,用来监听job的。这句的内部就是调用jobTracker的jobInProgressListeners集合的add(listener)方法。
到这里可以说看完了整个JobTracker的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看job的提交过程,也就是jobTracker的submit()方法。
jobTracker的submit()
一, 第一步是checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。
然后执行
jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个jobInfo对象,jobInfo主要保存job的id,user,jobSubmitDir(也就是job的任务目录,上一篇文章提到)。
接着是判断job是否可被recovered(job失败的时候尝试再次执行),
如果允许的话(默认允许),则将jobInfo对象序列化到job-info文件中。
接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),
为job实例化一个JobInProgress对象,这个对象将会对job以后的所有情况进行负责,如初始化,执行等。下面看看JobInProgress对象的初始化操作。
二, 这里看下将job.xml下载到本地的操作。然后就是job的队列信息,
默认的队列名是default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据hadoop所使用的taskScheduler有关,具体不研究。剩下的是一些参数的初始化,如map的数目,reduce的数目等。这里还有个设置job的优先级的,默认是normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority);
还有检查taskLimit的操作,就是检查map+reduce的任务数是否超出mapred.jobtracker.maxtasks.per.job设置的值,默认是-1,就是没有限制的意思。回到jobTracker.submit()方法
三, submit():实例化JobInProgress之后,会根据jobProfile获取job的队列信息,
并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况checkMemoryRequirements(job),再调用taskScheduler的taskScheduler.checkJobSubmission(job)
检查任务提交情况(具体是啥玩意,不太情况)。
接下来就是执行
status = addJob(jobId, job),
为Job设置listener。
addJob():前面说过,在初始化jobTracker的时候会实例化taskScheduler,
然后调用taskScheduler的start()方法,为jobTracker添加JobListener对象,
所以这里的JobInProgressListener对象就是相应的taskScheduler的JobListener,
这里为job添加了JobListener。
==================== JobTracker init job=================
JobTracker.initJob():主要调用job.initTasks(),下面进入到JobInProgress.initTasks()。
JobInProgress.initTasks():为job对象设置优先级setPriority(this.priority),
接着读取分片信息文件获取分片信息,SplitMetaInfoReader.readSplitMetaInfo()这个方就是jobInPorgress用来读取分分片信息的,读取过程与写入过程相对应,
具体还是较简单的。读取了分片信息之后,根据分片数量创建相应数量的mapTask(TaskInProgress对象),接下来会执行nonRunningMapCache = createCache(splits, maxLevel),这个方法是根据每个分片的location信息,
然后根据location的host判断每个host上所有的job,并放入cache中。
接着根据设置的reduce数量新建对应的reduceTask(TaskInProgress对象),
并加入到nonRunningReduces队列中,
并根据mapred.reduce.slowstart.completed.maps(百分比,默认是5%)参数的值计算completedMapsForReduceSlowstart(
分别对应map和reduce task。到此initTask完成,
initTask完成JobTracker的initJob也就差不多完成了,
===================== 参考=========================
1. CSDN blog
2. hadoop官网
3. 董西城的书《Haoop技术内幕》
elif [ "$COMMAND" = "jar" ] ; then
CLASS=org.apache.hadoop.util.RunJar
任务递交。
WordCount 里面有一句话:
System.exit(job.waitForCompletion(true) ? 0 : 1);
1.job.waitForCompletion:一般情况下我们提交一个job都是通过job.waitForCompletion方法提交,该方法内部会调用job.submit()方法
public boolean waitForCompletion(boolean verbose ) throws IOException, InterruptedException, ClassNotFoundException { if (state == JobState.DEFINE) { submit(); } if (verbose) { jobClient.monitorAndPrintJob(conf, info); } else { info.waitForCompletion(); } return isSuccessful(); }
2.job.submit():在submit中会调用setUseNewAPI(),setUseNewAPI()这个方法主要是判断是使用新的api还是旧的api,之后会调用connect()方法,该方法主要是实例化jobClient,然后会调用jobClient.submitJobInternal(conf)这个方法进行job的提交
public void submit() throws IOException, InterruptedException, ClassNotFoundException { ensureState(JobState.DEFINE); setUseNewAPI(); // Connect to the JobTracker and submit the job connect(); info = jobClient.submitJobInternal(conf); super.setJobID(info.getID()); state = JobState.RUNNING; }
3.jobClient.submitJobInternal():这个方法会将job运行时所需的所有文件上传到jobTarcker文件系统(一般是hdfs)中,同时进行备份(备份数默认是10,通过mapred.submit.replication变量可以设置),这个方法需要深入进行解读。
4.JobSubmissionFiles.getStagingDir:这个方法是在jobClient.submitJobInternal()最先调用的,这个方法主要是获取一个job提交的根目录,主要是通过Path stagingArea = client.getStagingAreaDir();方法获得,这个方法最终会调用jobTracker.getStagingAreaDirInternal()方法,代码如下:
private String getStagingAreaDirInternal(String user) throws IOException { final Path stagingRootDir = new Path(conf.get("mapreduce.jobtracker.staging.root.dir", "/tmp/hadoop/mapred/staging")); final FileSystem fs = stagingRootDir.getFileSystem(conf); return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString(); }
在获取了stagingDir之后会执行JobID jobId = jobSubmitClient.getNewJobId();为job获取一个jobId,然后执行Path submitJobDir = new Path(jobStagingArea, jobId.toString());获得该job提交的路径,也就是在stagingDir目录下建一个以jobId为文件名的目录。有了submitJobDir之后就可以将job运行所需的全部文件上传到对应的目录下了,具体是调用jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)这个方法。
5.jobClient.copyAndConfigureFiles(jobCopy, submitJobDir):这个方法最终调用jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication);这个方法实现文件上传。
6.jobClient.copyAndConfigureFiles(job, jobSubmitDir, replication):这个方法首先获取用户在使用命令执行job的时候所指定的-libjars, -files, -archives文件,对应的conf配置参数是tmpfiles tmpjars tmparchives,这个过程是在ToolRunner.run()的时候进行解析的,当用户指定了这三个参数之后,会将这三个参数对应的文件都上传到hdfs上,下面我们具体看一个参数的处理:tmpfiles(其他两个基本相同)
7.jobClient处理tmpfiles:该方法会将tmpfiles参数值按‘,’分割,然后将每一个文件上传到hdfs,其中如何文件的路径本身就在hdfs中,那么将不进行上传操作,上传操作只针对文件不在hdfs中的文件。调用的方法是:
Path newPath = copyRemoteFiles(fs,filesDir, tmp, job, replication),
该方法内部使用的是FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, job)方法将文件上传至hdfs,注意此处的remoteFs和jtFs,remoteFs就是需上传文件的原始文件系统,jtFs则是jobTracker的文件系统(hdfs)。
在文件上传至hdfs之后,会执行DistributedCache.createSymlink(job)这个方法,这个方法是创建一个别名(好像是这么个名字),这里需要注意的是tmpfiles和tmparchives都会创建别名,而tmpjars则不会,个人认为tmpjars则jar文件,不是用户在job运行期间调用,所以不需要别名,而tmpfiles和tmparchives则在job运行期间用户可能会调用,所以使用别名可以方便用户调用
8.将这三个参数指定的文件上传到hdfs之后,需要将job的jar文件上传到hdfs,名称为submitJobDir/job.jar,使用fs.copyFromLocalFile(originalJarFile, submitJarFile)上传即可。
到这里jobClient.copyAndConfigureFiles(jobCopy, submitJobDir)方法就完成了,期间丢了jobClient.copyAndConfigureFiles(jobCopy, submitJobDir),TrackerDistributedCacheManager.determineTimestampsAndCacheVisibilities(job),TrackerDistributedCacheManager.getDelegationTokens(job, job.getCredentials())
三个方法,这三个方法是进行一些cached archives and files的校验和保存其时间戳和权限内容
9.继续我们的jobClient.submitJobInternal()方法,这之后会根据我们设置的outputFormat类执行output.checkOutputSpecs(context),进行输出路径的检验,主要是保证输出路径不存在,存在会抛出异常。这之后就是对输入文件进行分片操作了,writeSplits(context, submitJobDir)。
10.jobClient.writeSplits():这个方法内部会根据我们之前判断的使用new-api还是old-api分别进行分片操作,我们只看new-api的分片操作。
private int writeSplits(org.apache.hadoop.mapreduce.JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { JobConf jConf = (JobConf)job.getConfiguration(); int maps; if (jConf.getUseNewMapper()) { maps = writeNewSplits(job, jobSubmitDir); } else { maps = writeOldSplits(jConf, jobSubmitDir); } return maps; }
11.jobClient.writeNewSplits():这个方法主要是根据我们设置的inputFormat.class通过反射获得inputFormat对象,然后调用inputFormat对象的getSplits方法,当获得分片信息之后调用JobSplitWriter.createSplitFiles方法将分片的信息写入到submitJobDir/job.split文件中。
private <T extends InputSplit> int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = job.getConfiguration(); InputFormat<?, ?> input = ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job); T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest // go first Arrays.sort(array, new SplitComparator()); JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array); return array.length; }
12.JobSplitWriter.createSplitFiles:这个方法的作用就是讲分片信息写入到submitJobDir/job.split文件中,方法内部调用
JobSplitWriter.writeNewSplits进行写操作
13.JobSplitWriter.writeNewSplits:该方法具体对每一个InputSplit对象进行序列化写入到输出流中,具体每个InputSplit对象写入的信息包括:
split.getClass().getName(),serializer.serialize(split)将整个对象序列化。然后将InputSplit对象的locations信息放入SplitMetaInfo对象中,同时还包括InputSpilt元信息在job.split文件中的偏移量,该InputSplit的长度,再将SplitMetaInfo对象。
然后调用JobSplitWriter.writeJobSplitMetaInfo()方法将SplitMetaInfo对象写入submitJobDir/job.splitmetainfo文件中。
14.JobSplitWriter.writeJobSplitMetaInfo():
将SplitMetaInfo对象写入submitJobDir/job.splitmetainfo文件中,
具体写入的信息包括:JobSplit.META_SPLIT_FILE_HEADER,splitVersion,allSplitMetaInfo.length(SplitMetaInfo对象的个数,一个split对应一个SplitMetaInfo),然后分别将所有的SplitMetaInfo对象序列化到输出流中,
到此文件的分片工作完成。
15.继续回头看jobClient.submitJobInternal()方法:在上一步进行分片操作之后,或返回切片的数目,据此设定map的数量,所以在job中设置的map数量是没有用的。
16.继续往下走:
String queue = jobCopy.getQueueName(); AccessControlList acl = jobSubmitClient.getQueueAdmins(queue); jobCopy.set(QueueManager.toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());
这三句话是获得job对应的任务队列信息,这里涉及到hadoop的作业调度内容,就不深入研究了
17.继续:下面就是讲job的配置文件信息(jobConf对象)写入到xml文件中,以便用户查看,具体文件是:submitJobDir/job.xml,通过jobCopy.writeXml(out)方法,
方法比较简单,就是写xml文件。下面就进入到jobTracker提交任务环节了,status = jobSubmitClient.submitJob(jobId, submitJobDir.toString(), jobCopy.getCredentials()),
就到这吧,后面下次再慢慢研究。
总结下:在用户提交job之后,第一步主要是jobClient对job进行一些必要的文件上传操作
,主要包括:
1)为job生成一个jobId,然后获得job提交的stagingDir,根据jobId获得submitJobDir,之后所有的job运行时文件豆浆保存在此目录下
2)将用户在命令行通过-libjars, -files, -archives指定的文件上传到jobTracker的文件系统中,并将job.jar上传到hdfs中
3)校验输出路径
4)进行输入文件的分片操作,并将分片信息写入submitJobDir下的相应文件中,有两个文件:job.split以及job.splitmetainfo
5)将job的配置参数(jobConf对象)写入到job.xml文件中。
18. jobTracker进行job的提交过程,还有一个JobSubmissionProtocol的实现是LocalJobRunner,这是本地执行的时候使用的,真正集群运行Job还是使用的jobTracker,所以只看jobTracker类的submitJob
18.1.jobTracker.submitJob():第一句就是checkJobTrackerState()这个是检查jobTracker状态,是否运行中,这里说一句,jobTracker是在hadoop集群启动的时候启动的,也就是在执行start-all或者start-mapred的时候启动,启动的时候会调用JobTracker的main方法,然后在jps的时候就可以看见一个jobTracker的进程了。下面来看一下JobTracker.main()方法。
18.2.JobTracker.main():第一句是JobTracker tracker = startTracker(new JobConf()),这是实例化一个jobTracke实例。
18.3.JobTracker.startTracker():result = new JobTracker(conf, identifier),实例化一个jobTracker对象,在实例化的时候会做很多事,所以还是进去瞅瞅。
18.4.JobTracker.JobTracker():实例化的时候会初始化很多参数,记也记不住,主要看下实例化taskScheduler的内容:
Class<? extends TaskScheduler> schedulerClass = conf.getClass("mapred.jobtracker.taskScheduler",JobQueueTaskScheduler.class, TaskScheduler.class); taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf),
这两句就是根据配置文件设置的taskScheduler类名,
通过反射获得对应的taskScheduler对象,
在实例化的时候虽然不同的TaskScheduler具体操作不一样,但是统一的都会初始化一个JobListener对象,
这个对象就是后面将要监听job的listener。剩下的内容就不说了。
回到JobTracker.startTracker()方法。
5.JobTracker.JobTracker():
在实例化jobTracker之后,
会执行result.taskScheduler.setTaskTrackerManager(result)
,这个就是将jobTracker对象设置给taskScheduler。后面就什么了,现在可以回到main方法了
public static JobTracker startTracker(JobConf conf, String identifier, boolean initialize) throws IOException, InterruptedException { DefaultMetricsSystem.initialize("JobTracker"); JobTracker result = null; while (true) { try { result = new JobTracker(conf, identifier); result.taskScheduler.setTaskTrackerManager(result); break; } catch (VersionMismatch e) { throw e; } catch (BindException e) { throw e; } catch (UnknownHostException e) { throw e; } catch (AccessControlException ace) { // in case of jobtracker not having right access // bail out throw ace; } catch (IOException e) { LOG.warn("Error starting tracker: " + StringUtils.stringifyException(e)); } Thread.sleep(1000); } if (result != null) { JobEndNotifier.startNotifier(); MBeans.register("JobTracker", "JobTrackerInfo", result); if(initialize == true) { result.setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER); result.initializeFilesystem(); result.setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE); result.initialize(); } } return result; }
.JobTracker.main():在实例化jobTracker之后,会调用tracker.offerService()方法,
之后main方法就没什么了,下面看看tracker.offerService()这个方法
public static void main(String argv[] ) throws IOException, InterruptedException { StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG); try { if(argv.length == 0) { JobTracker tracker = startTracker(new JobConf()); tracker.offerService(); } else { if ("-dumpConfiguration".equals(argv[0]) && argv.length == 1) { dumpConfiguration(new PrintWriter(System.out)); } else { System.out.println("usage: JobTracker [-dumpConfiguration]"); System.exit(-1); } } } catch (Throwable e) { LOG.fatal(StringUtils.stringifyException(e)); System.exit(-1); } }
只看taskScheduler.start()这个方法,因为这里只是想分析下JobTracker提交job的过程.
18.5 taskScheduler.start():这个方法就是启动TaskScheduler,这个方法不同taskScheduler也不同,但是统一的还是会有一个taskTrackerManager.addJobInProgressListener(jobListener)这个操作,taskTrackerManager就是jobTracker(第5步),这句的意思是为jobTracker添加jobListener,用来监听job的。这句的内部就是调用jobTracker的jobInProgressListeners集合的add(listener)方法。
到这里可以说看完了整个JobTracker的启动过程,虽然很浅显,但是对于后面将要分析的内容,这些就够了。下面来看看job的提交过程,也就是jobTracker的submit()方法。
jobTracker的submit()
一, 第一步是checkSafeMode(),检查是否在安全模式,在安全模式则抛出异常。
然后执行
jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),new Path(jobSubmitDir),生成一个jobInfo对象,jobInfo主要保存job的id,user,jobSubmitDir(也就是job的任务目录,上一篇文章提到)。
接着是判断job是否可被recovered(job失败的时候尝试再次执行),
如果允许的话(默认允许),则将jobInfo对象序列化到job-info文件中。
接着到达最关键的地方,job = new JobInProgress(this, this.conf, jobInfo, 0, ts),
为job实例化一个JobInProgress对象,这个对象将会对job以后的所有情况进行负责,如初始化,执行等。下面看看JobInProgress对象的初始化操作。
二, 这里看下将job.xml下载到本地的操作。然后就是job的队列信息,
默认的队列名是default,Queue queue = this.jobtracker.getQueueManager().getQueue(queueName),这个主要是根据hadoop所使用的taskScheduler有关,具体不研究。剩下的是一些参数的初始化,如map的数目,reduce的数目等。这里还有个设置job的优先级的,默认是normal。this.priority = conf.getJobPriority();this.status.setJobPriority(this.priority);
还有检查taskLimit的操作,就是检查map+reduce的任务数是否超出mapred.jobtracker.maxtasks.per.job设置的值,默认是-1,就是没有限制的意思。回到jobTracker.submit()方法
this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR +"/"+jobId + ".xml"); Path jobFilePath = JobSubmissionFiles.getJobConfPath(jobSubmitDir); jobFile = jobFilePath.toString(); fs.copyToLocalFile(jobFilePath, localJobFile); conf = new JobConf(localJobFile);
三, submit():实例化JobInProgress之后,会根据jobProfile获取job的队列信息,
并判断相应的队列是否在运行中,不在则任务失败。然后检查内存情况checkMemoryRequirements(job),再调用taskScheduler的taskScheduler.checkJobSubmission(job)
检查任务提交情况(具体是啥玩意,不太情况)。
接下来就是执行
status = addJob(jobId, job),
为Job设置listener。
addJob():前面说过,在初始化jobTracker的时候会实例化taskScheduler,
然后调用taskScheduler的start()方法,为jobTracker添加JobListener对象,
所以这里的JobInProgressListener对象就是相应的taskScheduler的JobListener,
这里为job添加了JobListener。
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) throws IOException { totalSubmissions++; synchronized (jobs) { synchronized (taskScheduler) { jobs.put(job.getProfile().getJobID(), job); for (JobInProgressListener listener : jobInProgressListeners) { listener.jobAdded(job); } } } myInstrumentation.submitJob(job.getJobConf(), jobId); job.getQueueMetrics().submitJob(job.getJobConf(), jobId); LOG.info("Job " + jobId + " added successfully for user '" + job.getJobConf().getUser() + "' to queue '" + job.getJobConf().getQueueName() + "'"); AuditLogger.logSuccess(job.getUser(), Operation.SUBMIT_JOB.name(), jobId.toString()); return job.getStatus(); }
==================== JobTracker init job=================
JobTracker.initJob():主要调用job.initTasks(),下面进入到JobInProgress.initTasks()。
JobInProgress.initTasks():为job对象设置优先级setPriority(this.priority),
接着读取分片信息文件获取分片信息,SplitMetaInfoReader.readSplitMetaInfo()这个方就是jobInPorgress用来读取分分片信息的,读取过程与写入过程相对应,
具体还是较简单的。读取了分片信息之后,根据分片数量创建相应数量的mapTask(TaskInProgress对象),接下来会执行nonRunningMapCache = createCache(splits, maxLevel),这个方法是根据每个分片的location信息,
然后根据location的host判断每个host上所有的job,并放入cache中。
接着根据设置的reduce数量新建对应的reduceTask(TaskInProgress对象),
并加入到nonRunningReduces队列中,
并根据mapred.reduce.slowstart.completed.maps(百分比,默认是5%)参数的值计算completedMapsForReduceSlowstart(
分别对应map和reduce task。到此initTask完成,
initTask完成JobTracker的initJob也就差不多完成了,
public synchronized void initTasks() throws IOException, KillInterruptedException, UnknownHostException { // log the job priority setPriority(this.priority); //........ numMapTasks = splits.length; // 添加Map 和 reduce jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks); jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks); //.... //create map; maps = new TaskInProgress[numMapTasks]; //.... //create reduce this.reduces = new TaskInProgress[numReduceTasks]; //下面是cleanup 和log //.... }
===================== 参考=========================
1. CSDN blog
2. hadoop官网
3. 董西城的书《Haoop技术内幕》
相关推荐
在ReduceTask方面,初始化过程包括`Initialize()`和`Init(shuffleContext)`。`totalMaps`变量设置为Job中的Map任务数量,`merger`对象负责合并来自不同MapTask的数据。ReduceTask的输入来自多个溢写文件,这些文件在...
4. 初始化HDFS:使用`hadoop namenode -format`命令格式化名称节点。 5. 启动Hadoop服务:运行`start-dfs.sh`和`start-yarn.sh`启动HDFS和YARN。 现在,Eclipse已经准备好连接到Hadoop集群。在Eclipse中,执行以下...
02-hadoop中的序列化机制.avi 03-流量求和mr程序开发.avi 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi 06-shuffle机制.avi 07-mr程序的组件全貌.avi 08-textinputformat对切片规划的源码...
【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化前言Configuration和Job对象的初始化后记跳转 前言 不得不说,在此前我对阅读源码这件事是拒绝的,一方面也知道自己非读...
第一天 hadoop的基本概念 伪分布式hadoop集群安装 hdfs mapreduce 演示 ... 02-hadoop中的序列化机制.avi 03-流量求和mr程序开发.avi 04-hadoop的自定义排序实现.avi 05-mr程序中自定义分组的实现.avi
2. 下载Hadoop:从Apache官网获取最新版本的Hadoop源码或者二进制包。 3. 解压配置:将Hadoop解压到指定目录,并进行基本的配置,包括修改`core-site.xml`、`hdfs-site.xml`、`yarn-site.xml`和`mapred-site.xml`等...
- 初始化HDFS文件系统,执行`hdfs namenode -format`。 10. **启动Hadoop服务**: - 启动DataNode、NameNode、ResourceManager、NodeManager和Secondary NameNode等服务,使用`start-dfs.sh`和`start-yarn.sh`...
MapReduce是Hadoop生态系统中的核心组件,主要用于处理和存储大规模数据。...通过阅读《Job本地提交过程源码分析及图解》这样的文档,我们可以深入学习MapReduce的工作原理,提升我们的Hadoop编程技能。
1. **格式化NameNode**:使用`hadoop namenode -format`命令初始化NameNode的元数据。 2. **启动HDFS**:使用`start-dfs.sh`脚本来启动HDFS的所有组件。 3. **启动YARN**:使用`start-yarn.sh`脚本来启动YARN服务。 ...
SparkContext是用户程序和Spark集群之间的桥梁,负责初始化Spark应用运行所需的各种参数和环境。在SparkShell中,用户提交的代码会首先被解析为Driver Program,然后利用SparkContext实例来创建和执行各个任务。 ...
在Hadoop MapReduce框架中,Job的提交过程是整个分布式计算流程中的关键步骤。这个过程涉及到客户端、JobTracker(在Hadoop 2.x版本中被ResourceManager替代)和TaskTracker(在Hadoop 2.x版本中被NodeManager替代)...
1. `setup(Context context)`:初始化Mapper,加载任何必要的资源。 2. `map(LongWritable key, Text value, Context context)`:处理输入行,分词并输出键值对。 3. `cleanup(Context context)`:清理Mapper的资源...
完成以上配置后,你需要对HDFS进行初始化,即“格式化”NameNode。在命令行中,切换到Hadoop安装目录,执行以下命令: ``` $ cd /opt/hadoop $ source conf/hadoop-env.sh $ hadoop namenode -format ``` 这一步会...
这些文件用于告诉Hadoop集群哪些节点应该运行特定的服务,是集群初始化和扩展的关键部分。 核心配置文件有三个,分别是`core-site.xml`、`hdfs-site.xml`和`mapred-site.xml`。`core-site.xml`主要设置Hadoop的基本...
同时启动TaskScheduler,这是整个初始化过程中非常关键的一步。 ```scala private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master, appName) taskScheduler.start() ``` 启动Task...
如`org.pentaho.di.core.KettleClientEnvironment`负责环境初始化,`org.pentaho.di.core.database.DatabaseMeta`用于数据库连接元数据,以及`org.pentaho.di.trans.Trans`和`org.pentaho.di.job.Job`代表数据转换和...
1. `configure()`: 这个方法用于初始化Mapper或Reducer类,设置如输入输出格式、分区器、比较器等配置。 2. `map()` 和 `reduce()`: 这两个方法分别是Mapper和Reducer的核心,用户需要根据业务需求实现它们。`map()...
// 初始化DBInputFormat DBInputFormat.configureDB(conf); ``` 在这个示例中,`MyMapper.class`是自定义的Mapper类,该类需要实现特定的逻辑来处理来自数据库的数据。 ### MapReduce中多文件输出的使用 #### 多...
1. **从Main函数开始**: 分布式程序的解析往往需要从系统的初始化部分入手,理解系统如何启动以及各组件之间的交互。 2. **关注核心组件**: 如Master和Worker的启动过程、Driver与Executor的工作原理等。 3. **...