`

大数据框架hadoop的作业提交过程

阅读更多
    作业提交过程比较简单,它主要为后续作业执行准备环境,主要涉及创建目录、上传文件等操作;而一旦用户提交作业后,JobTracker端便会对作业进行初始化。作业初始化的主要工作是根据输入数据量和作业配置参数将作业分解成若干个Map Task以及Reduce Task,并添加到相关数据结构中,以等待后续被高度执行。总之,可将作业提交与初始化过程分为四个步骤,如下所示:

    步骤一:用户使用Hadoop提供的Shell命令提交作业。

    步骤二:JobClient按照作业配置信息(JobConf)将作业运行需要的全部文件上传到JobTracker文件系统(通常为HDFS)的某个目录下。

步骤三:JobClient调用RPC接口向JobTracker提交作业。

步骤四:JobTracker接收到作业后,将其告知TaskScheduler,由TaskScheduler对作业进行初始化。

1 作业提交过程详解

1.1 执行Shell命令

Shell示例如下:

$HADOOP_HOME/bin/hadoop jar example.jar \

-D mapred.job.name=example \

-D mapred.reduce.tasks=2 \

-files=blacklist.txt,whitelist.txt \

-libjars=third-party.jar \

-archives=dictionary.zip \

-input /test/input \

-output /test/output

当用户输入以上命令后,hadoop脚本根据“jar”命令将作业交给RunJar类处理,相关代码如下:

elif [ "$COMMAND" = "jar" ] ; then

  CLASS=org.apache.hadoop.util.RunJar

  HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS"

用户的MapReduce程序已经配置好了作业运行时需要的各种信息(如Mapper类,Reducer类,Reduce Task个数等),它最终在main函数中调用JobClient.runJob函数(新MapReduce API则使用job.waitForCompletion(true)函数)提交作业,这之后会依次经过下表的调用顺序才会将作业提交到JobTracker端。

调用顺序

方法

1

JobClient

runJob(JobConf job)

2

submitJob(JobConf job)

3

submitJobInternal(final JobConf job)

4

JobTracker

submitJob(JobID jobId, String jobSubmitDir, Credentials ts)

 

1.2 作业文件上传

JobClient将作业提交到JobTracker端之前,需要进行一些初始化工作,包括:获取作业ID,创建HDFS目录,上传作业文件以及生成Split文件等。这些工作由函数JobClient.submitJobInternal(job)实现,具体流程见下表:

序号

调用方法

1

JobTracker.getNewJob()

2

HDFS.mkdirs()

3

HDFS.copyRemoteFiles()

4

HDFS.writeSplits()

5

HDFS.writeXml()

6

JobTracker.submitJob(job)

HDFS.mkdirs()方法相关代码如下:

public class JobTracker {

... ...

// 获取需要创建的工作目录

private String getStagingAreaDirInternal(String user)  {

final Path stagingRootDir =

      new Path(conf.get("mapreduce.jobtracker.staging.root.dir",

            "/tmp/hadoop/mapred/staging"));

    final FileSystem fs = stagingRootDir.getFileSystem(conf);

    return fs.makeQualified(new Path(stagingRootDir,

                              user+"/.staging")).toString();

}

... ...

}

public class JobClient {

... ...

// 创建工作目录

private void copyAndConfigureFiles(JobConf job, Path submitJobDir

      short replication)

... ...

FileSystem.mkdirs(fssubmitJobDirmapredSysPerms);

... ...

}

... ...

}

HDFS.copyRemoteFiles()方法相关代码如下:

public class JobClient {

... ...

// 将客户端的相关文件拷贝到HDFS

private void copyAndConfigureFiles(JobConf job, Path submitJobDirshort replication)

  ... ...

  String files = job.get("tmpfiles");

  String libjars = job.get("tmpjars");

  String archives = job.get("tmparchives");

  if (files != null) {

... ...

// copies a file to the jobtracker filesystem and returns the path where it was copied to

Path newPath = copyRemoteFiles(fs,filesDirtmpjobreplication);

... ...

  }

  if (libjars != null) {

... ...

    Path newPath = copyRemoteFiles(fslibjarsDirtmpjobreplication);

... ...

  }

  if (archives != null) {

... ...

    Path newPath = copyRemoteFiles(fsarchivesDirtmpjobreplication);

... ...

  }

}

... ...

}

 

MapReduce作业文件的上传与下载是由DistributedCache工具完成的。这是Hadoop为方便用户进行应用程序开发而设计的数据分发工具。其整个工作流程对用户而言是透明的,也就是说,用户只需在提交作业时指定文件位置,至于这些文件的分发(需广播到各个TaskTracker上以运行Task),完全由DistricutedCache工具完成,不需要用户参与。

通常而言,对于一个典型的Java MapReduce作业,可能包含以下资源。

n 程序jar包:用户用Java编写的MapReduce应用程序jar包。

n 作业配置文件:描述MapReduce应用程序的配置信息

n 依赖的第三方jar包:应用程序依赖的第三方jar包,提交作业时用参数“-libjars”指定。

n 依赖的归档文件:应用程序中用到多个文件,可直接打包成归档文件,提交作业时用参数“-archives”指定。

n 依赖的普通文件:应用程序中可能用到普通文件,比如文本格式的字典文件,提交作业时用参数“-files”指定。

上述所有文件在JobClient端被提交到HDSF上,涉及的父目录如下表所示:

作业属性

属性值

说明

mapreduce.jobtracker.staging.root.dir

${hadoop.tmp.dir}/mapred/staging

HDFS上作业文件的上传目录,由管理员配置

mapreduce.job.dir

${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/${jobId}

用户${user}的作业${jobId}相关文件存放目录

文件上传完毕后,会将这些目录信息保存到作业配置对象JobConf中,其对应的作业属性如下表所示:

作业属性

说明

mapred.cache.files

作业依赖的普通文件在HDFS上的存放路径

mapred.job.classpath.archives

作业依赖的jar包在HDFS上的存放路径

mapred.cache.archives

作业依赖的压缩文件在HDFS上的存放路径

mapreduce.job.cache.files.visibilities

作业依赖的普通文件的可见性。如果是public可见性,则为true,否则为false

mapreduce.job.cache.archives.visibilities

作业依赖的归档文件的可见性。如果是public级别的可见性,则为true,否则为false

mapred.cache.files.timestamps

作业依赖的普通文件的最后一次修改时间的时间戳

mapred.cache.archives.timestamps

作业依赖的压缩文件的最后一次修改时间的时间戳

mapred.cache.files.filesizes

作业依赖的普通文件的大小

mapred.cache.archives.filesizes

作业依赖的归档文件的大小

mapred.jar

用户应用程序jar路径

 

    作业文件上传到HDFS后,可能会有大量节点同时从HDFS上下载这些文件,进而产生文件访问热点现象,造成性能瓶颈。为此,JobClient上传这些文件时会调高它们的副本数(由参数mapred.submit.replication指定,默认是10)以通过分摊负载方式避免产生访问热点。

1.3 产生InputSplit文件

用户提交MapReduce作业后,JobClient会调用InputFormatgetSplits方法生成InputSplit相关信息。该信息包括两部分:InputSplit元数据信息和原始InputSplit信息。其中,第一部分将被JobTracker使用,用以生成Task本地性相关的数据结构;而第二部分则将被Map Task初始化时使用,用以获取自己要处理的数据。相关代码如下:

public interface InputFormat<K, V> {

  /** 

   * Logically split the set of input files for the job.  

   */

  InputSplit[] getSplits(JobConf jobint numSplitsthrows IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,JobConf job Reporter      reporterthrows IOException;

}

 

这两部分信息分别被保存到目录${mapreduce.jobtracker.staging.root.dir}/${user}/.staging/${JobId}下的文件job.splitjob.splitmetainfo中。

InputSplit相关操作放在包org.apache.hadoop.mapreduce.split中,主要包含三个类JobSplitJobSplitWriterSplitMetaInfoReader

JobSplit封装了读写InputSplit相关的基础类,主要包括以下三个。

n SplitMetaInfo:描述一个InputSplit的元数据信息,包括以下三项内容:

private long startOffset; //InputSplit元信息在job.split文件中的偏移量

private long inputDataLength; //InputSplit的数据长度

private String[] locations;    // InputSplit所在的host列表

所有InputSplit对应的SplitMetaInfo将被保存到文件job.splitmetainfo中。该文件内容组织方式如下表所示,内容依次为:一个用于标识InputSplit元数据文件头的字符串“META-SP,文件版本号splitVersion,作业对应的InputSplit数目length,最后是lengthInputSplit对应的SplitMetaInfo信息。

META-SP

splitVersion

length

SplitMetaInfo

SplitMetaInfo

... ...

 

n TaskSplitMetaInfo:用于保存InputSplit元信息的数据结构,包括以下三项内容:

pivate TaskSplitIndex splitIndex; //Split元信息在job.split文件中的位置

private long inputDataLength;    //InputSplit的数据长度

private String[] locations;        //InputSplit所在的host列表

    这些信息是在作业初始化时,JobTracker从文件job.splitmetainfo中获取的。其中,host列表信息是任务调度器判断任务是否具有本地性的最重要因素,而splitIndex信息保存了新任务需处理的数据位置信息在文件job.split中的索引,TaskTracker收到信息后,便可以从job.split文件中读取InputSplit信息,进而运行一个新任务。

 

n TaskSplitIndexJobTrackerTaskTracker分配新任务时,TaskSplitIndex用于指定新任务待处理数据位置信息在文件job.split中的索引,主要包括两项内容:

private String splitLocation; //job.split文件的位置(目录)

private long startOffset;    // InputSplit信息在job.split文件中的位置

相关代码如下:

public class JobSplit {

   ... ...  

  /**

   * This represents the meta information about the task split.

   */

  public static class SplitMetaInfo implements Writable {

    private long startOffset;

    private long inputDataLength;

    private String[] locations;

  }

  /**

   * This represents the meta information about the task split that the 

   * JobTracker creates

   */

  public static class TaskSplitMetaInfo {

    private TaskSplitIndex splitIndex;

    private long inputDataLength;

    private String[] locations;

  }

  /**

   * This represents the meta information about the task split that the 

   * task gets

   */

  public static class TaskSplitIndex {

    private String splitLocation;

    private long startOffset;

  }

}

1.4 作业提交到JobTracker

JobClient最终调用RPC方法submitJob将作业提交到JobTracker端,在JobTracker.submitJob中,会依次进行以下操作:

1) 为作业创建JobInProgress对象

JobTracker会为每个作业创建一个JobInProgress对象。该对象维护了作业的运行时信息。它在作业运行过程中一直存在,主要用于跟踪正在运行作业的运行状态和进度。相关代码如下:

job = new JobInProgress(thisthis.confjobInfo, 0, ts);

2) 检查用户是否具有指定队列的作业提交权限

    Hadoop以队列为单位管理作业和资源,每个队列分配有一定量的资源,每个用户属于一个或者多个队列且只能使用所属队列中的资源。管理员可为每个队列指定哪些用户具有作业提交权限和管理权限。相关代码如下:

 aclsManager.checkAccess(jobugi, Operation.SUBMIT_JOB);

3) 检查作业配置的内存使用量是否合理

用户提交作业时,可分别用参数mapred.job.map.memory.mbmapred.job.reduce.memory.mb指定Map TaskReduce Task占用的内存量;而管理员可通过参数mapred.cluster.max.map.memory.mbmapred.cluster.max.reduce.memory.mb限制用户配置的任务最大内存使用量,一旦用户配置的内存使用量超过系统限制,则作业提交失败。相关代码如下:

// Check the job if it cannot run in the cluster because of invalid memory

// requirements.

try {

    checkMemoryRequirements(job);

catch (IOException ioe) {

    throw ioe;

}

4) 通知TaskScheduler初始化作业

JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业初始化。相关代码如下:

synchronized (taskScheduler) {

        jobs.put(job.getProfile().getJobID(), job);

        for (JobInProgressListener listener : jobInProgressListeners) {

          listener.jobAdded(job);

        }

}

之所以不选择JobTracker而让调度器初始化,主要考虑到以下两个原因:

l 作业一旦初始化便会占用一定量的内存资源,为了防止大量初始化的作业排队等待调度而占用大量不必要的内存资源,Hadoop按照一定的策略选择性地初始化作业以节省内存资源;

l 任务调度器的职责是根据每个节点的资源使用情况对其分配最合适的任务,而只有经过初始化的作业才有可能得到调度,因而将作业初始化策略嵌到调度器中是一种编辑部合理的设计。

1
0
分享到:
评论
2 楼 seandeng888 2014-12-03  
lvwenwen 写道
哥们在hadoop开发?

    目前没有在hadoop开发,只是因为个人兴趣,所以才好好专研一下hadoop。
1 楼 lvwenwen 2014-12-03  
哥们在hadoop开发?

相关推荐

    大数据-hadoop-mapreduce代码

    在大数据处理领域,Hadoop MapReduce 是一个至关重要的组件,它为海量数据的分布式计算提供了框架。本资源包“大数据-hadoop-mapreduce代码”显然包含了与MapReduce编程相关的实例或示例代码,对于理解并应用Hadoop ...

    大数据中Hadoop Shell介绍

    - `hadoop jar`:用于提交MapReduce作业。 - `hadoop distcp`:用于在两个HDFS之间复制数据。 - `hadoop fsck /`:检查HDFS文件系统的完整性。 总之,Hadoop Shell及其相关的脚本为Hadoop的部署、管理和日常维护...

    java+大数据相关框架实战项目(Hadoop, Spark, Storm, Flink).zip

    这些源码项目不仅能够帮助你掌握Java编程和大数据框架,还能够提升你的问题解决能力,因为实战项目通常涵盖了实际业务场景中的各种挑战。在学习过程中,建议首先了解每个框架的基本概念,然后逐步深入到源码层面,...

    大数据 hadoop mapreduce 词频统计

    4. **运行Job**:配置好MapReduce作业后,提交到Hadoop集群进行执行。集群会自动调度任务,将工作分配给各个节点。 5. **结果收集**:MapReduce完成后,最终的词频统计结果会被写入HDFS,可以进一步进行可视化或...

    大数据处理系统:Hadoop源代码情景分析_大数据_大数据分析_大数据Hadoop_

    本主题将深入解析Hadoop的源代码,帮助理解其内部工作机制,从而更好地运用和优化这个分布式计算框架。 Hadoop的核心由两个主要组件构成:Hadoop Distributed File System (HDFS) 和 MapReduce。HDFS是Hadoop的数据...

    大数据云计算技术 Hadoop应用浅析(共16页).pptx

    在Hadoop的维护和问题处理上,提到了对槽位利用率的监控、作业提交数的统计、死节点的自动重启、权限管理和资源分组、NameNode的镜像备份以及机架感知。这些都是保证Hadoop集群高效稳定运行的重要措施。例如,通过...

    Hadoop大数据开发基础

    Hadoop是Apache软件基金会的一个开源框架,专为处理和存储大量数据而设计。它以其分布式文件系统(HDFS)和MapReduce计算模型为核心,为企业和研究机构提供了处理海量数据的能力。这份名为“Hadoop大数据开发基础”...

    Hadoop集群作业的调度算法

    这种调度器按照作业提交的时间顺序来执行作业,简单易用,但在处理不同类型和优先级的作业时可能会导致资源浪费和不公平的问题。 随着Hadoop的应用场景越来越复杂,社区开发了更加复杂的调度算法来满足多样化的需求...

    分布式计算开源框架Hadoop入门实践.pdf

    【分布式计算开源框架Hadoop入门实践】 Hadoop是Apache开源组织开发的一款分布式计算框架,它在业界得到了广泛应用,尤其在大型互联网公司如亚马逊、Facebook和Yahoo等中扮演着重要角色。Hadoop的核心设计理念是...

    大数据技术之Hadoop(MapReduce&Yarn).docx

    - 编写Driver类:配置和提交MapReduce作业。 MapReduce与YARN结合使用,使得Hadoop能够高效地管理和调度资源,处理大数据的计算需求。然而,对于某些特定类型的计算任务,如实时分析和流式处理,Hadoop可能不是最佳...

    大数据Hadoop3.x全套视频资料

    - **YARN任务提交**:使用YARN提交MapReduce作业,并监控作业执行情况。 - **集群监控与管理**:利用Hadoop自带的Web界面监控集群状态,进行故障排查和性能优化。 #### 六、Hadoop应用案例分析 - **案例1:用户...

    Hadoop大数据开发基础-PPT课件.rar

    第三章“Hadoop基础操作”涵盖了Hadoop命令行工具的使用,如HDFS的文件操作,MapReduce作业的提交等,这些基本操作是进行Hadoop开发的必备技能。 第四章进入MapReduce编程的核心部分,以“MapReduce入门编程”为...

    《Linux系统》期末大作业任务书2019(大数据平台搭建) .doc

    本文档总结了Linux系统的大数据平台搭建步骤,涵盖了Hadoop大数据平台的搭建过程,并附带了经典的Linux系统基本命令操作和shell编程。 Linux大数据平台搭建 大数据平台搭建是基于Linux操作系统的,涵盖了Hadoop...

    hadoop大数据基础学习

    - `hadoop jar &lt;jarfile&gt; &lt;classname&gt; &lt;inputpath&gt; &lt;outputpath&gt;`:提交MapReduce作业。 3. **YARN命令**: - `yarn application -list`:查看正在运行的应用程序列表。 - `yarn application -status ...

    完整版大数据云计算课程 Hadoop数据分析平台系列课程 Hadoop 02 实施Hadoop集群 共41页.pptx

    此外,课程还会教授如何制定数据集成方案,如何向Hadoop提交作业并监控其运行状态,以及如何使用Map-Reduce进行编程。 【Hadoop基础知识】 Hadoop是Apache软件基金会的一个开源项目,它提供了一个分布式文件系统...

    大数据平台构建:YARN中运行Mapreduce程序.pptx

    1. **作业提交**:MapReduce 2.x(即Hadoop 2.x及以后版本)的作业提交使用与MapReduce 1.x相同的用户API。用户编写完成MapReduce程序后,通过Hadoop的客户端提交作业。 2. **获取作业ID**:作业客户端向资源管理器...

    HadoopYARN大数据计算框架及其资源调度机制研究

    Hadoop是一个由Apache软件基金会支持的开源分布式存储和计算框架,它最初是为了处理大规模数据集而设计的。随着技术发展,Hadoop的1.0版本中的MapReduce成为了当时唯一的计算框架。MapReduce能够处理大规模数据集的...

    精选_大数据Hadoop平台2-2、MapReduce_源码打包

    在大数据处理领域,Hadoop是一个不可或缺的开源框架,它提供了分布式存储系统HDFS(Hadoop Distributed File System)和分布式计算模型MapReduce。本教程聚焦于Hadoop MapReduce,特别是源码打包过程,这对于理解...

Global site tag (gtag.js) - Google Analytics