写完回头看看,发现好像有点儿乱,对不住各位看官啦,限于个人水平,有错误的地方请大家予以回复纠正。
环境:主机WIN7+Eclipse4.3,hadoop源码已导入eclipse,虚拟机中运行Ubuntu13.04,伪分布式模式运行hadoop。在eclipse中调试wordcount,为了防止出现Failed to set permissions of path的异常,将org.apache.hadoop.fs.FileUtil类中checkReturnValue方法中的源码注释掉!注意:在主机eclipse中调试,实际上是使用单机模式跑job,与分布式环境的作业运行流程不完全相同,故本篇所讲内容仅具有参考意义。
入口是job.waitForCompletion方法,进入该方法后,会执行Job类中的submit方法,该方法主要有下面两行代码:
connect();
info = jobClient.submitJobInternal(conf);
我们从这里分两条路,为了方便梳理,将代码按照执行顺序进行编号:
1.先看connect()方法,该方法中会new一个JobClient对象,在JobClient的构造函数中会去调用JobClient中的init方法,该方法的内容如下所示:
public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
tasklogtimeout = conf.getInt(
TASKLOG_PULL_TIMEOUT_KEY, DEFAULT_TASKLOG_TIMEOUT);
this.ugi = UserGroupInformation.getCurrentUser();
if ("local".equals(tracker)) {
conf.setNumMapTasks(1);
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.rpcJobSubmitClient =
createRPCProxy(JobTracker.getAddress(conf), conf);
this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
}
}
因为我这里没有对mapred.job.tracker没有做特别的设置,所以得到的tracker应该是local,这种情况下会去创建一个LocalJobRunner对象。(注意:可以在conf中将mapred.job.tracker设置为hadoop中jobtracker的URL,这种情况会执行else语句块的内容,是会在hadoop中运行该job的,但是也会遇到一些错误,这里先不做介绍啦。)LocalJobRunner对象实现了JobSubmissionProtocol接口,我们知道hadoop中主要使用RPC进行通讯,此接口就是client与JobTracker对象通讯的协议,后面代码中的jobClient对象其实就是通过其内置的jobSubmitClient与JobTracker之间进行交互的。
2.接下来我们再来看jobClient.submitJobInternal(conf)这段代码。该方法里面的代码相对来说比较复杂:
2.1.首先会调用JobSubmissionFiles.getStagingDir()方法,该方法内部主要有两块儿代码,先看第一个:
Path stagingArea = client.getStagingAreaDir();
该方法内部最终调用的实际上是LocalJobRunner类中的getStagingAreaDir方法,代码如下:
Path stagingRootDir =
new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
"/tmp/hadoop/mapred/staging"));
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
String user;
randid = rand.nextInt(Integer.MAX_VALUE);
if (ugi != null) {
user = ugi.getShortUserName() + randid;
} else {
user = "dummy" + randid;
}
return fs.makeQualified(new Path(stagingRootDir, user+"/.staging")).toString();
从这个方法里我们看以看到staging root dir的生成,该目录主要用来存放job的相关资源(比如我们写的代码),由于此处我所使用的是LocalJobRunner,所以此PATH是在我们本地文件系统中生成的(上面最后一行代码),并且我们可以看到,Staging的根目录路径为${mapreduce.jobtracker.staging.root.dir}/${username}随机数/.staging。
我们再来看JobSubmissionFiles.getStagingDir()方法中的另一块代码,这里有一个在windows中跑job的时候常遇到的错误。由于上面代码中的user中含有一个随机数,所以理论上绝大多数情况下我们都需要在staging root dir下面新建一个由用户名和随机数构成的目录,所以,我们会执行
fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION));
最终我们实际上执行的是RowLocalFileSystem类中的mkdirs(Path f, FsPermission permission)方法,该方法中又会去调用setPermission(f, permission)方法,此方法进一步会调用org.apache.hadoop.fs.FileUtil类的setPermission方法,最终又调用了FileUtil类中的checkReturnValue方法,该方法会抛出一个异常:
//if (!rv) {
// throw new IOException("Failed to set permissions of path: " + p +
// " to " + String.format("%04o", permission.toShort()));
//}
我们前面没有提到rv,这个变量是代表staging目录的File试图去设置linux文件系统中的read权限的结果,但是在windows中这样的操作是失败的,于是rv接收到的值就是false,所以一旦进入checkReturnValue,就会抛出上面的异常。由于事先已将该段代码注释掉了,所以程序可以继续运行下去,可以顺利的退出JobSubmissionFiles.getStagingDir()方法。
2.2 JobID jobId = jobSubmitClient.getNewJobId(),这行代码会执行LocalJobRunner类中的getNewJobId方法,该方法中会创建一个JobID对象并返回。接下来执行
copyAndConfigureFiles(jobCopy, submitJobDir);
此方法会将job的配置信息和需要的资源文件(比如files、archives,libjars)拷贝到submitJobDir(staging root dir目录下面一个以JOBID命名的),如果是运行在hadoop环境中,还会把这些文件加入到DistributedCache中。
2.3 接下来会检查输出路径:
if (reduces == 0 ? jobCopy.getUseNewMapper() :
jobCopy.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(),
jobCopy);
output.checkOutputSpecs(context);
} else {
jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
}
2.4 如果输出路径不存在,就会接着往下运行创建inputSplits
FileSystem fs = submitJobDir.getFileSystem(jobCopy);
LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
int maps = writeSplits(context, submitJobDir);
jobCopy.setNumMapTasks(maps);
2.4.1 这段代码中主要来看一下writeSplits方法,该方法内部会去判断是否使用的是新的Mapper,如果是,就进一度调用JobClient中的writeNewSplits方法,否则就调用writeOldSplits。我使用的是新的API,所以就进入了writeNewSplits方法,该方法的代码如下:
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
首先,通过反射得到InputFormat对象,调用其getSplits方法得到输入分片的集合,然后对分片进行排序,SlitComparator是按照分片的大小降序排列的。 之后调用JobSplitWriter.createSplitFiles方法去创建输入分片(位于前面的submitJobDir目录下job.split和job.splitmetainfo以及对应的crc校验文件)。
2.5 这个操作结束后,我们就可以退回到Jobclient类的submitJobInternal 方法中啦。接下来的一行代码
jobCopy.setNumMapTasks(maps);
会把返回的分片的数目设为map task的数目。
2.6 之后会创建一个FSDataOutputStream对象,向jobSubmitFile中写入当前job的所有配置信息。
2.7 接下来就是真正的提交job的操作,具体代码如下
status = jobSubmitClient.submitJob(
jobId, submitJobDir.toString(), jobCopy.getCredentials());
由于我们是在本地运行的job,所以会调用LocalJobRunner类中的submitJob,如果是在hadoop集群中运行,则会调用JobTracker的submitJob方法。相对于JobTracker,LocalJobRunner类中的submitJob方法非常简单
Job job = new Job(jobid, jobSubmitDir);
job.job.setCredentials(credentials);
return job.status;
2.7.1 进入Job的构造函数 ,大部分代码都是比较简单的,先来看下面这几行代码
this.trackerDistributedCacheManager =
new TrackerDistributedCacheManager(conf, taskController);
this.taskDistributedCacheManager =
trackerDistributedCacheManager.newTaskDistributedCacheManager(
jobid, conf);
taskDistributedCacheManager.setupCache(conf, "archive", "archive");
这里先创建了一个TrackerDistributedCacheManager对象,然后该对象又创建了一个TaskDistributedCacheManager对象,DistributedCache的具体工作流程可以参考
这篇bog。之后,会更新(重新写入配置信息)job的xml配置文件。
2.7.2 之后就是启动一个新线程执行Job。也就是会去执行Job类中的run方法,下面我们来分析一下run方法的执行过程。
2.7.2.1 run方法中的代码比较多,挑几个重要地方来看吧。首先看下
Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());
List<MapTaskRunnable> taskRunnables = getMapTaskRunnables(taskSplitMetaInfos,
jobId, mapOutputFiles);
这段代码会创建一个map task线程的集合,然后放入线程池中执行,map的输出结果文件放在mapOutputFiles中。
2.7.2.2 接下来再来看这行代码
TaskAttemptID reduceId =
new TaskAttemptID(new TaskID(jobId, false, 0), 0);
从这行代码开始,下面的就是Reduce的执行部分。TaskAttemptID是什么东东呢,先来看TaskAttempt吧,
TaskAttempt可以理解为一个task,map task或reduce task, 这里对应的是一个reduce的id。
2.7.2.3 然后,创建一个reducetask,并对其进行设置
ReduceTask reduce =
new ReduceTask(systemJobFile.toString(), reduceId, 0, mapIds.size(),
1);
reduce.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
JobConf localConf = new JobConf(job);
localConf.set("mapreduce.jobtracker.address", "local");
然后会将map的输出文件交给reduce,
for (int i = 0; i < mapIds.size(); i++) {
if (!this.isInterrupted()) {
TaskAttemptID mapId = mapIds.get(i);
Path mapOut = mapOutputFiles.get(mapId).getOutputFile();
MapOutputFile localOutputFile = new MapOutputFile();
localOutputFile.setConf(localConf);
Path reduceIn =
localOutputFile.getInputFileForWrite(mapId.getTaskID(),
localFs.getFileStatus(mapOut).getLen());
if (!localFs.mkdirs(reduceIn.getParent())) {
throw new IOException("Mkdirs failed to create "
+ reduceIn.getParent().toString());
}
if (!localFs.rename(mapOut, reduceIn))
throw new IOException("Couldn't rename " + mapOut);
} else {
throw new InterruptedException();
}
}
之后会调用该reduce的run方法运行reducer
reduce.run(localConf, this);
由于在local模式下,reduce的数量只可以是0或1,所以reduce的运行还是比较简单的、具体可以看run方法内部的代码。
分享到:
相关推荐
这个发行版包含了运行Hadoop分布式文件系统(HDFS)和MapReduce计算框架所需的基本组件和服务。Hadoop是大数据处理的核心工具,它提供了一个分布式存储和计算的平台,使得海量数据的处理变得可能。 描述 "解决 ...
你可以通过 `bin/hadoop jar your-jar-file.jar your-class -Dmapred.job.tracker=local` 来启动本地模式。 2. **日志输出**:MapReduce 提供了详细的日志信息,包括 Map 和 Reduce 任务的执行情况。这些日志通常...
2. 调试MapReduce任务:通过"Debug As" -> "Hadoop Job",Eclipse提供了调试功能,可以在Mapper和Reducer中设置断点,逐步执行代码,查看变量状态,帮助找出问题所在。 六、优化与维护 Hadoop2x-eclipse-plugin还...
Python 中的 Hadoop Mapreduce 示例 python 中的几个 Mapreduce 示例以及有关运行它们的文档! 运行代码的步骤 文件夹结构 假定文件存储在 Linux 操作系统中的给定位置。 这只是一个示例说明,实际上位置并不重要。 ...
但在Hadoop 2.x中,这种集中式的依赖管理方式被分成了多个单独的JAR文件,这样的变化提高了系统的灵活性,但也增加了开发人员在构建和部署MapReduce应用程序时的复杂度。 在Hadoop 2.6.0中,运行一个简单的...
### Ubuntu安装Hadoop实现MapReduce里的WordCount #### 核心知识点概述 1. **Ubuntu环境下的基础配置**:包括VMware Tools的安装、JDK的安装与配置。 2. **Hadoop的安装与配置**:包括下载与解压、环境变量配置、...
5. **配置MapReduce**: 修改"Hadoop安装目录"\conf\mapred-site.xml,设置`mapreduce.framework.name`为`local`,以启动本地模式。 6. **格式化NameNode**: 运行`hadoop namenode -format`命令初始化HDFS。 7. **...
实验过程中可能会遇到MapReduce计算错误,解决方法包括自我检查、求助同学和在线搜索解决方案。通过实验,可以更深入地理解Hadoop对本地文件和DataNode上数据的操作,以及如何编写和执行MapReduce程序。这有助于提升...
编辑`mapred-site.xml`,创建并编辑`mapred-site.xml.template`,设置MapReduce的运行模式为本地模式(`mapreduce.framework.name`设为`local`)。 8. **格式化NameNode**: 在命令行中执行`hadoop namenode -...
在这个例子中,`hadoop jar`命令被用来运行包含MapReduce示例的jar包,如`hadoop jar {hadoop-install-dir}/share/hadoop/mapreduce/hadoop-mapreduce-examples-{version}.jar pi 10 10`,这将计算π的近似值,使用...
在`mapred-site.xml`中,指定MapReduce的运行模式(本地或集群)。 完成配置后,初始化HDFS命名空间,格式化NameNode,通过`hadoop namenode -format`命令实现。接着,启动Hadoop的各个服务,包括DataNode、...
这些文件是用于在Linux环境中搭建和运行Hadoop分布式文件系统(HDFS)和MapReduce计算框架的基础。 Hadoop是一个开源框架,由Apache软件基金会维护,它设计用于处理和存储大量数据。Hadoop的核心包括两个主要组件:...
在实际应用中,Hive 常常与其他Hadoop生态组件结合使用,如Hadoop MapReduce、HBase、Spark等,以实现更高效的数据处理和分析。Hive 还可以通过Oozie工作流管理系统自动化复杂的数据处理任务。 总之,Apache Hive ...
Hadoop 2.7.2是一个分布式文件系统(HDFS)和MapReduce计算框架的版本,它提供了高容错性和可扩展性。Spark与Hadoop的集成使得用户可以方便地处理存储在Hadoop HDFS上的大规模数据。 Scala 2.11是这个Spark版本所...
此外,使用`--master local[*]`可以在本地模式下快速测试代码,而无需连接到集群。 10. **社区和文档**: Spark有一个庞大的开发者社区,提供丰富的教程、文档和示例代码。官方文档详细介绍了Spark的各个组件和API,...
总的来说,"spark-2.3.4-bin-hadoop2.7.tgz"是一个包含完整Spark环境的压缩包,适合本地或集群环境中的数据分析和处理。通过pyspark,Python开发者可以充分利用Spark的性能和功能,进行高效的大数据处理任务。
此外,还需要启动Hadoop的相关服务,如namenode、datanode等,然后使用Spark的Local模式或者Hadoop的Mini Cluster模式运行Spark作业。 总之,Hadoop Common是Spark开发中必不可少的依赖,它提供与HDFS交互的接口和...
解压后,将Hadoop目录移动到合适的位置,例如 `/usr/local/hadoop`。 3. **配置环境变量**:在系统环境变量配置文件中(如`~/.bashrc`或`~/.bash_profile`),添加Hadoop路径,并设置HADOOP_HOME。 4. **配置Hadoop*...
根据提供的标题、描述和部分文本内容,我们可以提炼出与Hadoop相关的知识点,特别是关于如何使用HDFS命令行工具操作Hadoop分布式集群。 ### Hadoop基础知识 #### 什么是Hadoop? Hadoop是一个开源软件框架,用于...