`
younglibin
  • 浏览: 1216381 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MapReduce中Job的初始化

 
阅读更多

原文:http://langyu.iteye.com/blog/909170

 

可参考:

 http://younglibin.iteye.com/blog/1930169

 

MapReduce依赖Hadoop FileSystem存储job执行过程中需要的所有资源文件。这些文件有job的jar文件、job的配置文件、job的mapper需要处理的目标文件(输入文件)以及job的输出结果。MapReduce可以根据配置文件中File System的URI判断当前是使用哪种Hadoop支持的File System,默认是local system。我更关注job在TT上的表现,而TT又是依赖于DN,所以之后所说的File System都是指HDFS。 

        运行在Cluster上的MapReduce job需要关注的配置文件有:mapred-default.xml与mapred-site.xml,它们之间没有太大的区别,从名称上分,site文件中应当配置与Cluster有关的内容,default就可以随便配置了。与它们有关的引文有:How To Configure 和 Cluster setup
 

               

               
         上图表示job的完整执行流程。本篇blog只关注从第一步到第四步的具体实现,当然也会从模拟的例子按步就班叙述。下面开始我们的进程。。。 

         MapReduce自带WordCount的例子,如流程第一步,在设置基本的参数后,启动job
 

Java代码  收藏代码
  1.               Cluster cluster = new Cluster(config);  
  2. Job job = Job.getInstance(cluster);  
  3.   
  4. job.setMapperClass(WordMapper.class);  
  5. job.setReducerClass(WordReducer.class);  
  6. job.setJarByClass(WordCount.class);  
  7. job.setCombinerClass(WordReducer.class);  
  8. job.setOutputKeyClass(Text.class);  
  9. job.setOutputValueClass(IntWritable.class);  
  10.   
  11. FileInputFormat.addInputPath(job, new Path(args[0]));  
  12. FileOutputFormat.setOutputPath(job, new Path(args[1]));  
  13.   
  14. job.submit();  



                       
        注意这里的FileInputFormat.addInputPath(job, path),首先得确定MapReduce的输入文件或目录应该在File system上存在,如果MapReduce依赖于HDFS,就得先将本地的文件上传到HDFS上。MapReduce为了防止一个job的输出结果覆盖之前job的输出结果,要求每个job的输出目录都必须独立与其它job,且这个目录在job初始化时不应该存在,只有job需要时才去创建,否则就会报错。我以为MapReduce会在用户第一次设置输出目录时去检查这个目录的有效性,但事实上它是等做了一大堆事情后才去检查,这点让我很困惑。 
      
        在job提交初期,也如流程中的第二步,client会向JT申请一个jobID来作为job的标识符。jobID的格式如job_201101281410_0001,中间的字符串为JT的标识符,后面是job的序号,从1开始一直递增。

        在得到jobID后,MapReduce就需要将job执行必要的资源文件copy到File system上去。在copy之前,我们得先确定这些资源文件存放在File system的什么地方。JT设置有一个工作目录(Staging area, 也称数据中转站),用来存储与每个job相关的数据。这个目录的前缀由mapreduce.jobtracker.staging.root.dir 参数来指定,默认是/tmp/hadoop/mapred/staging,每个client user可以提交多个job,在这个目录后就得附加user name的信息。所以这个工作目录(Staging area)类似于:/tmp/hadoop/mapred/staging/denny/.staging/。与job相关的资源文件存储的目录是工作目录+jobID:${Staging area}/job_201101281410_0001。 

        这些资源文件存储的情况如下:
 
    

Java代码  收藏代码
  1. stagingArea/job_yyyyMMddHHmm_tttt/job.jar  执行job任务的那个jar文件  
  2. stagingArea/job_yyyyMMddHHmm_tttt/files  存储job的输入文件  
  3. stagingArea/job_yyyyMMddHHmm_tttt/libjars 与job相关的其它jar文件  
  4. stagingArea/job_yyyyMMddHHmm_tttt/archives  job的archives文件  



             如果当前的File system是HDFS,那么对于上面的每个文件,我们会设置它在HDFS的replication,这个值由mapreduce.client.submit.file.replication参数指定,默认是10,比普通HDFS文件的默认幅本数大很多,可能也是考虑到把输入数据放到更多的DT上,尽可能实现本地数据计算。 

         把资源文件上传到File system之后,负责job提交的程序会检查job设置的输出目录(output dir)。如果这个目录没有指定或是目录在File system上存在,就会抛出异常。为啥非要在上传那么多文件后才做这项关键检查呢? 

        接下来才是整个job提交过程中最重要的一步:对输入文件做数据分片(input split)。MapReduce过程中每个mapper怎样知道处理输入文件的哪部分内容呢?理应在mapper执行之前就确定它处理数据的范围吧,那现在的数据分片工作就是干这种事的。更主要的是分片的数量决定map task的数量,它们之间一一对应。这种数据分片(split)只是逻辑分片,记录它应当访问哪个block,及在这个block上的起始index及数据长度的信息。 

        下面我们细说怎样划分数据分片。job可能会有多个输入文件,或许分布在不同的目录下。我们获取输入目录的设置,然后识别得到我们需要处理的那些文件。这里我们可以设置一个PathFilter来过滤那些目录中的文件是否符合我们的要求,自定义的PathFilter类可由mapreduce.input.pathFilter.class属性来设置。对于我们获取的每一个输入文件,根据它的block信息产生数据分片,文件之间不能产生分片。我们可以设置数据分片的数据大小,最小字节数由mapreduce.input.fileinputformat.split.minsize设置,默认是1,最大字节数由mapreduce.input.fileinputformat.split.maxsize设置,默认是Long.MAX_VALUE。由用户定义的分片大小的设置及每个文件block大小的设置,可以计算得分片的大小。计算分片大小的公式是
 

Java代码  收藏代码
  1. splitSize = Math.max(minSize, Math.min(maxSize, blockSize))  


从公式可以看出,如果maxSize设置大于blockSize,那么每个block就是一个分片,否则就会将一个block文件分隔为多个分片,如果block中剩下的一小段数据量小于splitSize,还是认为它是独立的分片。 

        产生分片后我们要把这些数据保存起来,序列化到stagingArea/job_yyyyMMddHHmm_tttt/job.split文件中,之后在map task运行时才可以访问到。同时为每个分片分成一个MetaData信息,这个MetaData信息包含每个分片是放在哪台slave server上,它是由JT访问,且作为有效分发map task到拥有物理文件的那台slave server的依据。MetaData信息保存于stagingArea/job_yyyyMMddHHmm_tttt/job.splitmetainfo文件中。 
       
        至此,job提交所需要准备的数据大都已经就绪,前面一步的分片任务也确定了需要多少个map task,与job相关的配置都已确定。把job的配置文件上传到stagingArea/job_yyyyMMddHHmm_tttt/job.xml文件中,在client端做的任务就完成了。Client尝试与JT通信,然后把job提交到JT。 

分享到:
评论

相关推荐

    MapReduce Job本地提交过程源码跟踪及分析

    1. **初始化Job**:在`Job`类的构造函数中,会创建一个`JobContext`对象,该对象包含了作业配置信息。然后,`submit()`方法会设置作业的输入输出路径、设置Mapper和Reducer等信息。 2. **验证Job**:在提交之前,...

    【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化

    【Debug跟踪Hadoop3.0.0源码之MapReduce Job提交流程】第一节 Configuration和Job对象的初始化前言Configuration和Job对象的初始化后记跳转 前言 不得不说,在此前我对阅读源码这件事是拒绝的,一方面也知道自己非读...

    MapReduce Job集群提交过程源码跟踪及分析

    首先,当用户编写好MapReduce程序并调用Job的submit()方法时,客户端会进行一系列的初始化工作。这包括将程序的JAR包、配置信息以及输入输出路径等打包成一个JobConf对象,然后通过JobClient发送到JobTracker。 1. ...

    用MapReduce实现KMeans算法

    这个过程包括初始化中心点、分配数据点到最近的中心点以及更新中心点等步骤,直到满足停止条件(如中心点不再改变或达到预设迭代次数)。 接下来,我们探讨如何在MapReduce中实现这一过程。Map阶段,每个Mapper接收...

    mapreduce统计度分布

    - **运行说明**:此主函数首先初始化一个Job对象,并对其进行一系列配置,包括指定Mapper、Reducer类等。最后通过调用`job.waitForCompletion(true)`来启动MapReduce任务。 #### 六、总结 通过上述代码和解析,...

    MapReduce开发案例

    1. **设置输入和输出路径**:在MapReduce程序的初始化阶段,需要指定输入数据文件的路径(通常是HDFS中的一个目录)以及预期输出结果的路径。 2. **编写Mapper**:Mapper类包含map()方法,这是处理输入数据的地方。...

    005_hadoop中MapReduce详解_2

    1. `configure()`: 这个方法用于初始化Mapper或Reducer类,设置如输入输出格式、分区器、比较器等配置。 2. `map()` 和 `reduce()`: 这两个方法分别是Mapper和Reducer的核心,用户需要根据业务需求实现它们。`map()...

    MapReduce实现单元最短路径算法.doc

    - 将节点颜色初始化为白色,距离初始化为无穷大。 - 遍历状态列表,根据颜色和距离更新节点k2的状态。 - 如果列表中的某个状态的边权重更优,则更新k2的边信息。 - 输出更新后的k2和状态。 通过多轮MapReduce...

    自定义MapReduce的InputFormat

    5. **配置和使用自定义InputFormat**:在你的MapReduce作业中,通过设置`job.setInputFormatClass()`方法指定你的自定义InputFormat类。同时,如果需要,你还可以在JobConf中添加额外的配置参数来指导InputFormat和...

    单源最短路径算法(MapReduce)源代码

    - 这个包中包含了启动MapReduce作业的主要逻辑,通常包括初始化输入输出路径、配置job参数等。 3. **com.ouyang.graph**:这部分负责图的基本建模。 - 定义了图的基本元素如节点(Node)和边(Edge),以及节点的状态...

    MapReduce2.0源码分析与实战编程

    1. JobClient:提交作业的客户端,负责与ResourceManager通信,初始化ApplicationMaster。 2. ResourceManager:全局资源调度中心,负责分配集群中的资源。 3. NodeManager:运行在每个节点上,管理该节点上的容器,...

    8-在Eclipse中运行MapReduce程序1

    // 配置和初始化Job // 添加输入输出路径,设置Mapper和Reducer类 // 设置输出键值类型,运行Job并等待完成 } public static class TokenizerMapper extends Mapper, Text, Text, IntWritable> { // 实现...

    大数据MapReduce文件分发

    文件分发过程涉及RecordReader的初始化和数据读取。 7. **Shuffle与Reduce阶段**: - Mapper的输出会被排序和分区,然后通过网络传输到Reducer。在这个过程中,数据分发和合并再次发生,保证了数据的正确聚合。 8...

    17_尚硅谷大数据之MapReduce框架原理1

    2. **作业初始化**:JobTracker解析JobConf(配置信息),计算输入数据的切片(split)信息,根据配置决定map任务的数量。 3. **任务分配**:JobTracker将map任务分配给TaskTracker(YARN中的NodeManager)。 4. **...

    Hadoop MapReduce 入门

    - **作业初始化**: JobTracker 负责初始化作业,并进行调度。 - **任务分配**: TaskTracker 定期向 JobTracker 请求任务,并执行分配的任务。 - **任务执行**: TaskTracker 启动 Map 或 Reduce 任务,执行具体的计算...

    理论部分-MapReduce-hadoop1

    - **任务初始化**:JobTracker从HDFS读取Job.split,分配任务给TaskTracker。 - **任务本地化**:TaskTracker将任务所需资源从HDFS复制到本地,包括job.split、job.jar、配置文件等,解压并启动任务。 4. **错误...

    hadoop MapReduce编程教程

    Driver类负责初始化`Job`对象,并设置其属性(如输入路径、输出路径、映射器和化简器类等),然后提交作业进行执行。 #### 四、WordCount实例详解 - **WordCount**:这是一个经典的MapReduce示例程序,用于统计文本...

Global site tag (gtag.js) - Google Analytics