`
langyu
  • 浏览: 888328 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

MapReduce: Job提交过程

阅读更多
      
        初学Hadoop,准备用几篇日志来陈述MapReduce job的生命周期中job提交、task分发和task执行,以及JT scheduling策略,job性能参考等方面的知识。通过代码及参考资料想了解job执行的大致细节,期望在以后job性能调优时有所依据。与细节相关的代码参考于Hadoop-0.21.0版本。

        MapReduce依赖Hadoop FileSystem存储job执行过程中需要的所有资源文件。这些文件有job的jar文件、job的配置文件、job的mapper需要处理的目标文件(输入文件)以及job的输出结果。MapReduce可以根据配置文件中File System的URI判断当前是使用哪种Hadoop支持的File System,默认是local system。我更关注job在TT上的表现,而TT又是依赖于DN,所以之后所说的File System都是指HDFS。

        运行在Cluster上的MapReduce job需要关注的配置文件有:mapred-default.xml与mapred-site.xml,它们之间没有太大的区别,从名称上分,site文件中应当配置与Cluster有关的内容,default就可以随便配置了。与它们有关的引文有:How To ConfigureCluster setup


             

              
         上图表示job的完整执行流程。本篇blog只关注从第一步到第四步的具体实现,当然也会从模拟的例子按步就班叙述。下面开始我们的进程。。。

         MapReduce自带WordCount的例子,如流程第一步,在设置基本的参数后,启动job

                Cluster cluster = new Cluster(config);
		Job job = Job.getInstance(cluster);
		
		job.setMapperClass(WordMapper.class);
		job.setReducerClass(WordReducer.class);
		job.setJarByClass(WordCount.class);
		job.setCombinerClass(WordReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.submit();


                      
        注意这里的FileInputFormat.addInputPath(job, path),首先得确定MapReduce的输入文件或目录应该在File system上存在,如果MapReduce依赖于HDFS,就得先将本地的文件上传到HDFS上。MapReduce为了防止一个job的输出结果覆盖之前job的输出结果,要求每个job的输出目录都必须独立与其它job,且这个目录在job初始化时不应该存在,只有job需要时才去创建,否则就会报错。我以为MapReduce会在用户第一次设置输出目录时去检查这个目录的有效性,但事实上它是等做了一大堆事情后才去检查,这点让我很困惑。
     
        在job提交初期,也如流程中的第二步,client会向JT申请一个jobID来作为job的标识符。jobID的格式如job_201101281410_0001,中间的字符串为JT的标识符,后面是job的序号,从1开始一直递增。

        在得到jobID后,MapReduce就需要将job执行必要的资源文件copy到File system上去。在copy之前,我们得先确定这些资源文件存放在File system的什么地方。JT设置有一个工作目录(Staging area, 也称数据中转站),用来存储与每个job相关的数据。这个目录的前缀由mapreduce.jobtracker.staging.root.dir 参数来指定,默认是/tmp/hadoop/mapred/staging,每个client user可以提交多个job,在这个目录后就得附加user name的信息。所以这个工作目录(Staging area)类似于:/tmp/hadoop/mapred/staging/denny/.staging/。与job相关的资源文件存储的目录是工作目录+jobID:${Staging area}/job_201101281410_0001。

        这些资源文件存储的情况如下:

    
 
      stagingArea/job_yyyyMMddHHmm_tttt/job.jar  执行job任务的那个jar文件
      stagingArea/job_yyyyMMddHHmm_tttt/files  存储job的输入文件
      stagingArea/job_yyyyMMddHHmm_tttt/libjars 与job相关的其它jar文件
      stagingArea/job_yyyyMMddHHmm_tttt/archives  job的archives文件


             如果当前的File system是HDFS,那么对于上面的每个文件,我们会设置它在HDFS的replication,这个值由mapreduce.client.submit.file.replication参数指定,默认是10,比普通HDFS文件的默认幅本数大很多,可能也是考虑到把输入数据放到更多的DT上,尽可能实现本地数据计算。

         把资源文件上传到File system之后,负责job提交的程序会检查job设置的输出目录(output dir)。如果这个目录没有指定或是目录在File system上存在,就会抛出异常。为啥非要在上传那么多文件后才做这项关键检查呢?

        接下来才是整个job提交过程中最重要的一步:对输入文件做数据分片(input split)。MapReduce过程中每个mapper怎样知道处理输入文件的哪部分内容呢?理应在mapper执行之前就确定它处理数据的范围吧,那现在的数据分片工作就是干这种事的。更主要的是分片的数量决定map task的数量,它们之间一一对应。这种数据分片(split)只是逻辑分片,记录它应当访问哪个block,及在这个block上的起始index及数据长度的信息。

        下面我们细说怎样划分数据分片。job可能会有多个输入文件,或许分布在不同的目录下。我们获取输入目录的设置,然后识别得到我们需要处理的那些文件。这里我们可以设置一个PathFilter来过滤那些目录中的文件是否符合我们的要求,自定义的PathFilter类可由mapreduce.input.pathFilter.class属性来设置。对于我们获取的每一个输入文件,根据它的block信息产生数据分片,文件之间不能产生分片。我们可以设置数据分片的数据大小,最小字节数由mapreduce.input.fileinputformat.split.minsize设置,默认是1,最大字节数由mapreduce.input.fileinputformat.split.maxsize设置,默认是Long.MAX_VALUE。由用户定义的分片大小的设置及每个文件block大小的设置,可以计算得分片的大小。计算分片大小的公式是

splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

从公式可以看出,如果maxSize设置大于blockSize,那么每个block就是一个分片,否则就会将一个block文件分隔为多个分片,如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片。

        产生分片后我们要把这些数据保存起来,序列化到stagingArea/job_yyyyMMddHHmm_tttt/job.split文件中,之后在map task运行时才可以访问到。同时为每个分片分成一个MetaData信息,这个MetaData信息包含每个分片是放在哪台slave server上,它是由JT访问,且作为有效分发map task到拥有物理文件的那台slave server的依据。MetaData信息保存于stagingArea/job_yyyyMMddHHmm_tttt/job.splitmetainfo文件中。
      
        至此,job提交所需要准备的数据大都已经就绪,前面一步的分片任务也确定了需要多少个map task,与job相关的配置都已确定。把job的配置文件上传到stagingArea/job_yyyyMMddHHmm_tttt/job.xml文件中,在client端做的任务就完成了。Client尝试与JT通信,然后把job提交到JT。

        提交到JT后的工作与job初始化的细节,下一节再说。

2
1
分享到:
评论
1 楼 sjtufighter 2014-04-24  
[color=red][/color]
您好,考虑到hdfs上面的文件有多个副本,请问在getsplits获取分片的时候是如何决定获取某一个block的哪一个副本的分片信息的。 还有就是以我的对您的文章的理解,获取分片的工作是在向jobtracker提交之前就做好了的,您说这个任务是由提交任务的client完成的,那么要获取位置信息的话,client是需要和namenode来获取元数据信息的,是吧?  谢谢。

相关推荐

    MapReduce Job本地提交过程源码跟踪及分析

    Job本地提交过程是MapReduce执行任务的一个重要环节,特别是在开发和调试阶段,理解这一过程对于优化性能和解决潜在问题至关重要。本文将深入源码层面,分析MapReduce Job在本地提交的详细步骤。 首先,我们来了解...

    MapReduce Job集群提交过程源码跟踪及分析

    在Hadoop MapReduce框架中,Job的提交过程是整个分布式计算流程中的关键步骤。这个过程涉及到客户端、JobTracker(在Hadoop 2.x版本中被ResourceManager替代)和TaskTracker(在Hadoop 2.x版本中被NodeManager替代)...

    第一个Mapreduce程序.pdf

    文中没有详细说明如何运行作业,但在Hadoop MapReduce中,一般通过Hadoop命令行工具来提交作业,命令通常类似于“hadoop jar wc3.jar”。 整个过程大致如下: 1. 搭建Hadoop环境(以CDH5为基础)。 2. 编写或获取...

    MapReduceV1:Job提交流程之JobClient端分析

    我们基于Hadoop1.2.1源码分析MapReduceV1的处理流程。MapReduceV1实现中,主要存在3个主要的...在编写好MapReduce程序以后,需要将Job提交给JobTracker,那么我们就需要了解在提交Job的过程中,在JobClient端都做了哪

    实验项目 MapReduce 编程

    此外,通过Shell命令如`mapred job -status id`,可以在运行过程中和结束后跟踪作业状态,这有利于理解MapReduce的执行流程。 实验的总结与思考部分,强调了实验的目标在于理解和掌握MapReduce编程思想,了解...

    seminario-mapreduce:用于 Ciemat-UEX Hadoop 会议的资源

    4. **Java编程**:掌握编写MapReduce作业的Java代码,包括设置Job配置,定义Mapper和Reducer类,以及提交作业到集群。 5. **实战案例**:可能有实际的数据处理例子,如日志分析、网页排名计算等,帮助理解MapReduce...

    MapReduce:使用Hadoop Java API在Map Reduce中进行练习

    5. **`Job`**:这是配置和提交MapReduce作业的主要类,你可以设置各种参数,如输入和输出路径,Mapper和Reducer类等。 **MapReduce实战** 在"MapReduce-master"这个项目中,你可能看到的是一系列示例代码,展示...

    hadoop2.x集群搭建.txt(hdfs和yarn貌似正常,但mapreduce 提交job执行失败,请看我的另一个资源,另一个搭建是成功的)

    #### 六、MapReduce Job提交失败问题排查 从提供的信息来看,HDFS和YARN部分似乎运行正常,但是遇到了MapReduce提交Job执行失败的问题。这可能是由以下原因导致的: 1. **配置错误**:检查`mapred-site.xml`和`...

    mapreduce:在HortonWorks Sandbox上测试并执行的工作地图简化示例

    - 主类中设置Job配置,包括输入输出路径、Mapper和Reducer类等,并提交作业。 3. **上传数据到HDFS**: - 使用`hdfs dfs -put`命令将本地文件系统中的数据上传到HDFS,作为MapReduce作业的输入。 4. **编译和...

    win7下hadoop job提交

    以上就是在Windows 7环境下提交Hadoop Job的全过程,涉及到Java环境配置、Hadoop安装与配置、MapReduce编程、Job提交和监控等多个环节。通过这个过程,你可以更好地理解Hadoop的工作原理和分布式计算的基本概念。

    mapreduce运行过程(个人见解如有错误希望大神指导).pdf

    其运行过程可以大致分为map阶段和reduce阶段,而MapReduce编程模型允许将复杂的业务逻辑分解为多个MapReduce程序的串行运行。下面将详细阐述MapReduce的运行过程以及它的编程模型。 首先,MapReduce的运行过程大致...

    大数据 hadoop mapreduce 词频统计

    4. **运行Job**:配置好MapReduce作业后,提交到Hadoop集群进行执行。集群会自动调度任务,将工作分配给各个节点。 5. **结果收集**:MapReduce完成后,最终的词频统计结果会被写入HDFS,可以进一步进行可视化或...

    MapReduce的实现细节

    在Hadoop MapReduce中,服务器间的通信主要依赖于远程过程调用(RPC)机制。具体来说: - 客户端通过RPC接口向作业服务器提交作业。 - 作业服务器通过RPC接口分配任务给任务服务器。 - 任务服务器通过RPC接口向作业...

    Hadoop MapReduce入门

    MapReduce模型的核心思想是将复杂的、运行在大规模集群上的并行计算过程高度抽象到了两个函数:Map和Reduce。 Map函数:它主要负责处理输入数据,将数据解析为一系列中间形式的键值对(Key-Value pair)。Map阶段...

    Hadoop应用系列2--MapReduce原理浅析(上)

    Hadoop提供了丰富的工具来支持MapReduce作业的管理和调试,包括`hadoop jar`命令用于提交作业,`hadoop fs`用于文件系统操作,以及`job`和`task`命令用于查看作业和任务的状态。 总结,MapReduce是Hadoop处理大数据...

    大数据Hadoop核心之MapReduce详解

    * 整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。 Hadoop序列化: 1. 为什么要序列化?序列化可以存储活的对象,可以将活的对象发送到远程计算机。 2. 什么是序列化?序列化就是...

    Hadoop MapReduce

    4. Job Submission and Monitoring:负责提交作业到集群并提供监控作业执行过程的功能。 5. Job Input:处理作业的输入数据,通常是读取HDFS中的数据。 6. Job Output:处理作业的输出数据,通常是将结果写回到HDFS...

    【MapReduce篇07】MapReduce之数据清洗ETL1

    在main方法中,我们首先获取了Job信息,然后加载了Mapper代码,设置了输入和输出路径,并提交了作业。 小结 本文详细介绍了MapReduce之数据清洗ETL的实现过程,包括Extract、Transform和Load三个阶段。我们通过...

    Hadoop-mapreduce过程.doc

    《深入理解Hadoop MapReduce执行过程》 MapReduce是Apache Hadoop的核心组件之一,它为大规模数据处理提供了分布式计算框架。本文将从客户端、JobTracker、TaskTracker和Child四个角度,详细阐述MapReduce的工作...

Global site tag (gtag.js) - Google Analytics