- 浏览: 109280 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (75)
- JVM (22)
- 数据结构 (11)
- java 基础 (16)
- gc (6)
- jmock (1)
- Google (2)
- MapReduce (1)
- Memory (2)
- 算法 (2)
- cglib (1)
- jdk (3)
- 虚拟机 (3)
- 安全 (2)
- 多线程 (1)
- 工作 (1)
- 生活 (1)
- MongoDB (2)
- Hadoop (4)
- HDFS (2)
- cms (2)
- Spring (1)
- 网络协议 (1)
- GitHub (1)
- MYSQL 调优和使用必读(转) (1)
- 分布式 (2)
- Big Data (0)
- 技术Blog (1)
- Hbase (2)
- Zookeeper (1)
- paper (0)
最新评论
-
lzc_java:
Java线程安全兼谈DCL -
select*from爱:
it's nice
IT业薪水大揭秘
转载自 ---- http://www.cnblogs.com/duguguiyu/archive/2009/02/28/1400278.html
二. 分布式计算(Map/Reduce)
分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种分布式计算需求所服务的。我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分布式计算上,我们可以将其视为增加了分布式支持的计算函数。从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件。而从分布式的角度上看,分布式计算的输入文件往往规模巨大,且分布在多个机器上,单机计算完全不可支撑且效率低下,因此Map/Reduce框架需要提供一套机制,将此计算扩展到无限规模的机器集群上进行。依照这样的定义,我们对整个Map/Reduce的理解,也可以分别沿着这两个流程去看。。。
在Map/Reduce框架中,每一次计算请求,被称为作业。在分布式计算Map/Reduce框架中,为了完成这个作业,它进行两步走的战略,首先是将其拆分成若干个Map任务,分配到不同的机器上去执行,每一个Map任务拿输入文件的一部分作为自己的输入,经过一些计算,生成某种格式的中间文件,这种格式,与最终所需的文件格式完全一致,但是仅仅包含一部分数据。因此,等到所有Map任务完成后,它会进入下一个步骤,用以合并这些中间文件获得最后的输出文件。此时,系统会生成若干个Reduce任务,同样也是分配到不同的机器去执行,它的目标,就是将若干个Map任务生成的中间文件为汇总到最后的输出文件中去。当然,这个汇总不总会像1 + 1 = 2那么直接了当,这也就是Reduce任务的价值所在。经过如上步骤,最终,作业完成,所需的目标文件生成。整个算法的关键,就在于增加了一个中间文件生成的流程,大大提高了灵活性,使其分布式扩展性得到了保证。。。
I. 术语对照
和分布式文件系统一样,Google、Hadoop和....我,各执一种方式表述统一概念,为了保证其统一性,特有下表。。。
文中翻译 Hadoop术语 Google术语 相关解释
作业 Job Job 用户的每一个计算请求,就称为一个作业。
作业服务器 JobTracker Master 用户提交作业的服务器,同时,它还负责各个作业任务的分配,管理所有的任务服务器。
任务服务器 TaskTracker Worker 任劳任怨的工蜂,负责执行具体的任务。
任务 Task Task 每一个作业,都需要拆分开了,交由多个服务器来完成,拆分出来的执行单位,就称为任务。
备份任务 Speculative Task Buckup Task 每一个任务,都有可能执行失败或者缓慢,为了降低为此付出的代价,系统会未雨绸缪的实现在另外的任务服务器上执行同样一个任务,这就是备份任务。
II. 基本架构
与分布式文件系统类似,Map/Reduce的集群,也由三类服务器构成。其中作业服务器,在Hadoop中称为Job Tracker,在Google论文中称为Master。前者告诉我们,作业服务器是负责管理运行在此框架下所有作业的,后者告诉我们,它也是为各个作业分配任务的核心。与HDFS的主控服务器类似,它也是作为单点存在的,简化了负责的同步流程。具体的负责执行用户定义操作的,是任务服务器,每一个作业被拆分成很多的任务,包括Map任务和Reduce任务等,任务是具体执行的基本单元,它们都需要分配到合适任务服务器上去执行,任务服务器一边执行一边向作业服务器汇报各个任务的状态,以此来帮助作业服务器了解作业执行的整体情况,分配新的任务等等。。。
除了作业的管理者执行者,还需要有一个任务的提交者,这就是客户端。与分布式文件系统一样,客户端也不是一个单独的进程,而是一组API,用户需要自定义好自己需要的内容,经由客户端相关的代码,将作业及其相关内容和配置,提交到作业服务器去,并时刻监控执行的状况。。。
同作为Hadoop的实现,与HDFS的通信机制相同,Hadoop Map/Reduce也是用了协议接口来进行服务器间的交流。实现者作为RPC服务器,调用者经由RPC的代理进行调用,如此,完成大部分的通信,具体服务器的架构,和其中运行的各个协议状况,参见下图。从图中可以看到,与HDFS相比,相关的协议少了几个,客户端与任务服务器,任务服务器之间,都不再有直接通信关系。这并不意味着客户端就不需要了解具体任务的执行状况,也不意味着,任务服务器之间不需要了解别家任务执行的情形,只不过,由于整个集群各机器的联系比HDFS复杂的多,直接通信过于的难以维系,所以,都统一由作业服务器整理转发。另外,从这幅图可以看到,任务服务器不是一个人在战斗,它会像孙悟空一样招出一群宝宝帮助其具体执行任务。这样做的好处,个人觉得,应该有安全性方面的考虑,毕竟,任务的代码是用户提交的,数据也是用户指定的,这质量自然良莠不齐,万一碰上个搞破坏的,把整个任务服务器进程搞死了,就因小失大了。因此,放在单独的地盘进行,爱咋咋地,也算是权责明确了。。。
与分布式文件系统相比,Map/Reduce框架的还有一个特点,就是可定制性强。文件系统中很多的算法,都是很固定和直观的,不会由于所存储的内容不同而有太多的变化。而作为通用的计算框架,需要面对的问题则要复杂很多,在各种不同的问题、不同的输入、不同的需求之间,很难有一种包治百病的药能够一招鲜吃遍天。作为Map/Reduce框架而言,一方面要尽可能的抽取出公共的一些需求,实现出来。更重要的,是需要提供良好的可扩展机制,满足用户自定义各种算法的需求。Hadoop是由Java来实现的,因此通过反射来实现自定义的扩展,显得比较小菜一碟了。在JobConf类中,定义了大量的接口,这基本上是Hadoop Map/Reduce框架所有可定制内容的一次集中展示。在JobConf中,有大量set接口接受一个Class<? extends xxx>的参数,通常它都有一个默认实现的类,用户如果不满意,则可自定义实现。。。
III. 计算流程
如果一切都按部就班的进行,那么整个作业的计算流程,应该是作业的提交 -> Map任务的分配和执行 -> Reduce任务的分配和执行 -> 作业的完成。而在每个任务的执行中,又包含输入的准备 -> 算法的执行 -> 输出的生成,三个子步骤。沿着这个流程,我们可以很快的整理清晰整个Map/Reduce框架下作业的执行。。。
1、作业的提交
一个作业,在提交之前,需要把所有应该配置的东西都配置好,因为一旦提交到了作业服务器上,就陷入了完全自动化的流程,用户除了观望,最多也就能起一个监督作用,惩治一些不好好工作的任务。。。
基本上,用户在提交代码阶段,需要做的工作主要是这样的:
首先,书写好所有自定的代码,最起码,需要有Map和Reduce的执行代码。在Hadoop中,Map需要派生自Mapper<K1, V1, K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3, V3>接口。这里都是用的泛型,用以支持不同的键值类型。这两个接口都仅有一个方法,一个是map,一个是reduce,这两个方法都直接受四个参数,前两个是输入的键和值相关的数据结构,第三个是作为输出相关的数据结构,最后一个,是一个Reporter类的实例,实现的时候可以利用它来统计一些计数。除了这两个接口,还有大量可以派生的接口,比如分割的Partitioner<K2, V2>接口。。。
然后,需要书写好主函数的代码,其中最主要的内容就是实例化一个JobConf类的对象,然后调用其丰富的setXXX接口,设定好所需的内容,包括输入输出的文件路径,Map和Reduce的类,甚至包括读取写入文件所需的格式支持类,等等。。。
最后,调用JobClient的runJob方法,提交此JobConf对象。runJob方法会先行调用到JobSubmissionProtocol接口所定义的submitJob方法,将此作业,提交给作业服务器。接着,runJob开始循环,不停的调用JobSubmissionProtocol的getTaskCompletionEvents方法,获得TaskCompletionEvent类的对象实例,了解此作业各任务的执行状况。。。
2、Map任务的分配
当一个作业提交到了作业服务器上,作业服务器会生成若干个Map任务,每一个Map任务,负责将一部分的输入转换成格式与最终格式相同的中间文件。通常一个作业的输入都是基于分布式文件系统的文件(当然在单机环境下,文件系统单机的也可以...),因为,它可以很天然的和分布式的计算产生联系。而对于一个Map任务而言,它的输入往往是输入文件的一个数据块,或者是数据块的一部分,但通常,不跨数据块。因为,一旦跨了数据块,就可能涉及到多个服务器,带来了不必要的复杂性。。。
当一个作业,从客户端提交到了作业服务器上,作业服务器会生成一个JobInProgress对象,作为与之对应的标识,用于管理。作业被拆分成若干个Map任务后,会预先挂在作业服务器上的任务服务器拓扑树。这是依照分布式文件数据块的位置来划分的,比如一个Map任务需要用某个数据块,这个数据块有三份备份,那么,在这三台服务器上都会挂上此任务,可以视为是一个预分配。。。
关于任务管理和分配的大部分的真实功能和逻辑的实现,JobInProgress则依托JobInProgressListener和TaskScheduler的子类。TaskScheduler,顾名思义是用于任务分配的策略类(为了简化描述,用它代指所有TaskScheduler的子类...)。它会掌握好所有作业的任务信息,其assignTasks函数,接受一个TaskTrackerStatus作为参数,依照此任务服务器的状态和现有的任务状况,为其分配新的任务。而为了掌握所有作业相关任务的状况,TaskScheduler会将若干个JobInProgressListener注册到JobTracker中去,当有新的作业到达、移除或更新的时候,JobTracker会告知给所有的JobInProgressListener,以便它们做出相应的处理。。。
任务分配是一个重要的环节,所谓任务分配,就是将合适作业的合适任务分配到合适的服务器上。不难看出,里面蕴含了两个步骤,先是选择作业,然后是在此作业中选择任务。和所有分配工作一样,任务分配也是一个复杂的活。不良好的任务分配,可能会导致网络流量增加、某些任务服务器负载过重效率下降,等等。不仅如此,任务分配还是一个无一致模式的问题,不同的业务背景,可能需要不同的算法才能满足需求。因此,在Hadoop中,有很多TaskScheduler的子类,像Facebook,Yahoo,都为其贡献出了自家用的算法。在Hadoop中,默认的任务分配器,是JobQueueTaskScheduler类。它选择作业的基本次序是:Map Clean Up Task(Map任务服务器的清理任务,用于清理相关的过期的文件和环境...) -> Map Setup Task(Map任务服务器的安装任务,负责配置好相关的环境...) -> Map Tasks -> Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在这个前提下,具体到Map任务的分配上来。当一个任务服务器工作的游刃有余,期待获得新的任务的时候,JobQueueTaskScheduler会按照各个作业的优先级,从最高优先级的作业开始分配。每分配一个,还会为其留出余量,已被不时之需。举一个例子:系统目前有优先级3、2、1的三个作业,每个作业都有一个可分配的Map任务,一个任务服务器来申请新的任务,它还有能力承载3个任务的执行,JobQueueTaskScheduler会先从优先级3的作业上取一个任务分配给它,然后再留出一个1任务的余量。此时,系统只能在将优先级2作业的任务分配给此服务器,而不能分配优先级1的任务。这样的策略,基本思路就是一切为高优先级的作业服务,优先分配不说,分配了好保留有余力以备不时之需,如此优待,足以让高优先级的作业喜极而泣,让低优先级的作业感慨既生瑜何生亮,甚至是活活饿死。。。
确定了从哪个作业提取任务后,具体的分配算法,经过一系列的调用,最后实际是由JobInProgress的findNewMapTask函数完成的。它的算法很简单,就是尽全力为此服务器非配且尽可能好的分配任务,也就是说,只要还有可分配的任务,就一定会分给它,而不考虑后来者。作业服务器会从离它最近的服务器开始,看上面是否还挂着未分配的任务(预分配上的),从近到远,如果所有的任务都分配了,那么看有没有开启多次执行,如果开启,考虑把未完成的任务再分配一次(后面有地方详述...)。。。
对于作业服务器来说,把一个任务分配出去了,并不意味着它就彻底解放,可以对此任务可以不管不顾了。因为任务可以在任务服务器上执行失败,可能执行缓慢,这都需要作业服务器帮助它们再来一次。因此在Task中,记录有一个TaskAttemptID,对于任务服务器而言,它们每次跑的,其实都只是一个Attempt而已,Reduce任务只需要采信一个的输出,其他都算白忙乎了。。。
3、Map任务的执行
与HDFS类似,任务服务器是通过心跳消息,向作业服务器汇报此时此刻其上各个任务执行的状况,并向作业服务器申请新的任务的。具体实现,是TaskTracker调用InterTrackerProtocol协议的heartbeat方法来做的。这个方法接受一个TaskTrackerStatus对象作为参数,它描述了此时此任务服务器的状态。当其有余力接受新的任务的时候,它还会传入acceptNewTasks为true的参数,表示希望作业服务器委以重任。JobTracker接收到相关的参数后,经过处理,会返回一个HeartbeatResponse对象。这个对象中,定义了一组TaskTrackerAction,用于指导任务服务器进行下一步的工作。系统中已定义的了一堆其TaskTrackerAction的子类,有的对携带的参数进行了扩充,有的只是标明了下ID,具体不详写了,一看便知。。。
当TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它会开始执行所分配的新的任务。在TaskTracker中,有一个TaskTracker.TaskLauncher线程(确切的说是两个,一个等Map任务,一个等Reduce任务),它们在痴痴的守候着新任务的来到。一旦等到了,会最终调用到Task的createRunner方法,构造出一个TaskRunner对象,新建一个线程来执行。对于一个Map任务,它对应的Runner是TaskRunner的子类MapTaskRunner,不过,核心部分都在TaskRunner的实现内。TaskRunner会先将所需的文件全部下载并拆包好,并记录到一个全局缓存中,这是一个全局的目录,可以供所有此作业的所有任务使用。它会用一些软链接,将一些文件名链接到这个缓存中来。然后,根据不同的参数,配置出一个JVM执行的环境,这个环境与JvmEnv类的对象对应。
接着,TaskRunner会调用JvmManager的launchJvm方法,提交给JvmManager处理。JvmManager用于管理该TaskTracker上所有运行的Task子进程。在目前的实现中,尝试的是池化的方式。有若干个固定的槽,如果槽没有满,那么就启动新的子进程,否则,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。每一个进程都是由JvmRunner来管理的,它也是位于单独线程中的。但是从实现上看,这个机制好像没有部署开,子进程是死循环等待,而不会阻塞在父进程的相关线程上,父线程的变量一直都没有个调整,一旦分配,始终都处在繁忙的状况了。
真实的执行载体,是Child,它包含一个main函数,进程执行,会将相关参数传进来,它会拆解这些参数,并且构造出相关的Task实例,调用其run函数进行执行。每一个子进程,可以执行指定个数量的Task,这就是上面所说的池化的配置。但是,这套机制在我看来,并没有运行起来,每个进程其实都没有机会不死而执行新的任务,只是傻傻的等待进程池满,而被一刀毙命。也许是我老眼昏花,没看出其中实现的端倪。。。
4、Reduce任务的分配与执行
比之Map任务,Reduce的分配及其简单,基本上是所有Map任务完成了,有空闲的任务服务器,来了就给分配一个Job任务。因为Map任务的结果星罗棋布,且变化多端,真要搞一个全局优化的算法,绝对是得不偿失。而Reduce任务的执行进程的构造和分配流程,与Map基本完全的一致,没有啥可说的了。。。
但其实,Reduce任务与Map任务的最大不同,是Map任务的文件都在本地隔着,而Reduce任务需要到处采集。这个流程是作业服务器经由此Reduce任务所处的任务服务器,告诉Reduce任务正在执行的进程,它需要的Map任务执行过的服务器地址,此Reduce任务服务器会于原Map任务服务器联系(当然本地就免了...),通过FTP服务,下载过来。这个隐含的直接数据联系,就是执行Reduce任务与执行Map任务最大的不同了。。。
5、作业的完成
当所有Reduce任务都完成了,所需数据都写到了分布式文件系统上,整个作业才正式完成了。此中,涉及到很多的类,很多的文件,很多的服务器,所以说起来很费劲,话说,一图解千语,说了那么多,我还是画两幅图,彻底表达一下吧。。。
首先,是一个时序图。它模拟了一个由3个Map任务和1个Reduce任务构成的作业执行流程。我们可以看到,在执行的过程中,只要有人太慢,或者失败,就会增加一次尝试,以此换取最快的执行总时间。一旦所有Map任务完成,Reduce开始运作(其实,不一定要这样的...),对于每一个Map任务来说,只有执行到Reduce任务把它上面的数据下载完成,才算成功,否则,都是失败,需要重新进行尝试。。。
而第二副图,不是我画的,就不转载了,参见这里,它描述了整个Map/Reduce的服务器状况图,包括整体流程、所处服务器进程、输入输出等,看清楚这幅图,对Map/Reduce的基本流程应该能完全跑通了。有这几点,可能图中描述的不够清晰需要提及一下,一个是在HDFS中,其实还有日志文件,图中没有标明;另一个是步骤5,其实是由TaskTracker主动去拉取而不是JobTracker推送过来的;还有步骤8和步骤11,创建出来的MapTask和ReduceTask,在Hadoop中都是运行在独立的进程上的。。。
IV. Map任务详请
从上面,可以了解到整个Map和Reduce任务的整体流程,而后面要啰嗦的,是具体执行中的细节。Map任务的输入,是分布式文件系统上的,包含键值对信息的文件。为了给每一个Map任务指定输入,我们需要掌握文件格式把它分切成块,并从每一块中分离出键值信息。在HDFS中,输入的文件格式,是由InputFormat<K, V>类来表示的,在JobConf中,它的默认值是TextInputFormat类(见getInputFormat),此类是特化的FileInputFormat<LongWritable, Text>子类,而FileInputFormat<K, V>正是InputFormat<K, V>的子类。通过这样的关系我们可以很容易的理解,默认的文件格式是文本文件,且键是LongWritable类型(整形数),值是Text类型(字符串)。仅仅知道文件类型是不够的,我们还需要将文件中的每一条数据,分离成键值对,这个工作,是RecordReader<K, V>来做的。在TextInputFormat的getRecordReader方法中我们可以看到,与TextInputFormat默认配套使用的,是LineRecordReader类,是特化的RecordReader<LongWritable, Text>的子类,它将每一行作为一个记录,起始的位置作为键,整行的字符串作为值。有了格式,分出了键值,还需要切开分给每一个Map任务。每一个Map任务的输入用InputSplit接口表示,对于一个文件输入而言,其实现是FileSplit,它包含着文件名、起始位置、长度和存储它的一组服务器地址。。。
当Map任务拿到所属的InputSplit后,就开始一条条读取记录,并调用用于定义的Mapper,进行计算(参见MapRunner<K1, V1, K2, V2>和MapTask的run方法),然后,输出。MapTask会传递给Mapper一个OutputCollector<K, V>对象,作为输出的数据结构。它定义了一个collect的函数,接受一个键值对。在MapTask中,定义了两个OutputCollector的子类,一个是MapTask.DirectMapOutputCollector<K, V>,人如其名,它的实现确实很Direct,直截了当。它会利用一个RecordWriter<K, V>对象,collect一调用,就直接调用RecordWriter<K, V>的write方法,写入本地的文件中去。如果觉着RecordWriter<K, V>出现的很突兀,那么看看上一段提到的RecordReader<K, V>,基本上,数据结构都是对应着的,一个是输入一个是输出。输出很对称也是由RecordWriter<K, V>和OutputFormat<K, V>来协同完成的,其默认实现是LineRecordWriter<K, V>和TextOutputFormat<K, V>,多么的眼熟啊。。。
除了这个非常直接的实现之外,MapTask中还有一个复杂的多的实现,是MapTask.MapOutputBuffer<K extends Object, V extends Object>。有道是简单压倒一切,那为什么有很简单的实现,要琢磨一个复杂的呢。原因在于,看上去很美的往往带着刺,简单的输出实现,每调用一次collect就写一次文件,频繁的硬盘操作很有可能导致此方案的低效。为了解决这个问题,这就有了这个复杂版本,它先开好一段内存做缓存,然后制定一个比例做阈值,开一个线程监控此缓存。collect来的内容,先写到缓存中,当监控线程发现缓存的内容比例超过阈值,挂起所有写入操作,建一个新的文件,把缓存的内容批量刷到此文件中去,清空缓存,重新开放,接受继续collect。。。
为什么说是刷到文件中去呢。因为这不是一个简单的照本宣科简单复制的过程,在写入之前,会先将缓存中的内存,经过排序、合并器(Combiner)统计之后,才会写入。如果你觉得Combiner这个名词听着太陌生,那么考虑一下Reducer,Combiner也就是一个Reducer类,通过JobConf的setCombinerClass进行设置,在常用的配置中,Combiner往往就是用用户为Reduce任务定义的那个Reducer子类。只不过,Combiner只是服务的范围更小一些而已,它在Map任务执行的服务器本地,依照Map处理过的那一小部分数据,先做一次Reduce操作,这样,可以压缩需要传输内容的大小,提高速度。每一次刷缓存,都会开一个新的文件,等此任务所有的输入都处理完成后,就有了若干个有序的、经过合并的输出文件。系统会将这些文件搞在一起,再做一个多路的归并外排,同时使用合并器进行合并,最终,得到了唯一的、有序的、经过合并的中间文件(注:文件数量等同于分类数量,在不考虑分类的时候,简单的视为一个...)。它,就是Reduce任务梦寐以求的输入文件。。。
除了做合并,复杂版本的OutputCollector,还具有分类的功能。分类,是通过Partitioner<K2, V2>来定义的,默认实现是HashPartitioner<K2, V2>,作业提交者可以通过JobConf的setPartitionerClass来自定义。分类的含义是什么呢,简单的说,就是将Map任务的输出,划分到若干个文件中(通常与Reduce任务数目相等),使得每一个Reduce任务,可以处理某一类文件。这样的好处是大大的,举一个例子说明一下。比如有一个作业是进行单词统计的,其Map任务的中间结果应该是以单词为键,以单词数量为值的文件。如果这时候只有一个Reduce任务,那还好说,从全部的Map任务那里收集文件过来,分别统计得到最后的输出文件就好。但是,如果单Reduce任务无法承载此负载或效率太低,就需要多个Reduce任务并行执行。此时,再沿用之前的模式就有了问题。每个Reduce任务从一部分Map任务那里获得输入文件,但最终的输出结果并不正确,因为同一个单词可能在不同的Reduce任务那里都有统计,需要想方法把它们统计在一起才能获得最后结果,这样就没有将Map/Reduce的作用完全发挥出来。这时候,就需要用到分类。如果此时有两个Reduce任务,那么将输出分成两类,一类存放字母表排序较高的单词,一类存放字母表排序低的单词,每一个Reduce任务从所有的Map任务那里获取一类的中间文件,得到自己的输出结果。最终的结果,只需要把各个Reduce任务输出的,拼接在一起就可以了。本质上,这就是将Reduce任务的输入,由垂直分割,变成了水平分割。Partitioner的作用,正是接受一个键值,返回一个分类的序号。它会在从缓存刷到文件之前做这个工作,其实只是多了一个文件名的选择而已,别的逻辑都不需要变化。。。
除了缓存、合并、分类等附加工作之外,复杂版本的OutputCollector还支持错误数据的跳过功能,在后面分布式将排错的时候,还会提及,标记一下,按下不表。。。
V. Reduce任务详情
理论上看,Reduce任务的整个执行流程要比Map任务更为的罗嗦一些,因为,它需要收集输入文件,然后才能进行处理。Reduce任务,主要有这么三个步骤:Copy、Sort、Reduce(参见ReduceTask的run方法)。所谓Copy,就是从执行各个Map任务的服务器那里,收罗到本地来。拷贝的任务,是由ReduceTask.ReduceCopier类来负责,它有一个内嵌类,叫MapOutputCopier,它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。整个拷贝过程是一个动态的过程,也就是说它不是一次给好所有输入信息就不再变化了。它会不停的调用TaskUmbilicalProtocol协议的getMapCompletionEvents方法,向其父TaskTracker询问此作业个Map任务的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。当获取到相关Map任务执行服务器的信息后,都会有一个线程开启,做具体的拷贝工作。同时,还有一个内存Merger线程和一个文件Merger线程在同步工作,它们将新鲜下载过来的文件(可能在内存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为后续的排序工作减负。。。
Sort,排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行,因为虽然同步有做着归并的工作,但可能留着尾巴,没做彻底。经过这一个流程,该彻底的都彻底了,一个崭新的、合并了所有所需Map任务输出文件的新文件,诞生了。而那些千行万苦从其他各个服务器网罗过来的Map任务输出文件,很快的结束了它们的历史使命,被扫地出门一扫而光,全部删除了。。。
所谓好戏在后头,Reduce任务的最后一个阶段,正是Reduce本身。它也会准备一个OutputCollector收集输出,与MapTask不同,这个OutputCollector更为简单,仅仅是打开一个RecordWriter,collect一次,write一次。最大的不同在于,这次传入RecordWriter的文件系统,基本都是分布式文件系统,或者说是HDFS。而在输入方面,ReduceTask会从JobConf那里调用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator等等之类的自定义类,构造出Reducer所需的键类型,和值的迭代类型Iterator(一个键到了这里一般是对应一组值)。具体实现颇为拐弯抹角,建议看一下Merger.MergeQueue,RawKeyValueIterator,ReduceTask.ReduceValuesIterator等等之类的实现。有了输入,有了输出,不断循环调用自定义的Reducer,最终,Reduce阶段完成。。。
VI. 分布式支持
1、服务器正确性保证
Hadoop Map/Reduce服务器状况和HDFS很类似,由此可知,救死扶伤的方法也是大同小异。废话不多说了,直接切正题。同作为客户端,Map/Reduce的客户端只是将作业提交,就开始搬个板凳看戏,没有占茅坑的行动。因此,一旦它挂了,也就挂了,不伤大雅。而任务服务器,也需要随时与作业服务器保持心跳联系,一旦有了问题,作业服务器可以将其上运行的任务,移交给它人完成。作业服务器,作为一个单点,非常类似的是利用还原点(等同于HDFS的镜像)和历史记录(等同于HDFS的日志),来进行恢复。其上,需要持久化用于恢复的内容,包含作业状况、任务状况、各个任务尝试的工作状况等。有了这些内容,再加上任务服务器的动态注册,就算挪了个窝,还是很容易恢复的。JobHistory是历史记录相关的一个静态类,本来,它也就是一个干写日志活的,只是在Hadoop的实现中,对日志的写入做了面向对象的封装,同时又大量用到观察者模式做了些嵌入,使得看起来不是那么直观。本质上,它就是打开若干个日志文件,利用各类接口来往里面写内容。只不过,这些日志,会放在分布式文件系统中,就不需要像HDFS那样,来一个SecondXXX随时候命,由此可见,有巨人在脚下踩着,真好。JobTracker.RecoveryManager类是作业服务器中用于进行恢复相关的事情,当作业服务器启动的时候,会调用其recover方法,恢复日志文件中的内容。其中步骤,注释中写的很清楚,请自行查看。。。
2、任务执行的正确和速度
整个作业流程的执行,秉承着木桶原理。执行的最慢的Map任务和Reduce任务,决定了系统整体执行时间(当然,如果执行时间在整个流程中占比例很小的话,也许就微不足道了...)。因此,尽量加快最慢的任务执行速度,成为提高整体速度关键。所使用的策略,简约而不简单,就是一个任务多次执行。当所有未执行的任务都分配出去了,并且先富起来的那部分任务已经完成了,并还有任务服务器孜孜不倦的索取任务的时候,作业服务器会开始炒剩饭,把那些正在吭哧吭哧在某个服务器上慢慢执行的任务,再把此任务分配到一个新的任务服务器上,同时执行。两个服务器各尽其力,成王败寇,先结束者的结果将被采纳。这样的策略,隐含着一个假设,就是我们相信,输入文件的分割算法是公平的,某个任务执行慢,并不是由于这个任务本身负担太重,而是由于服务器不争气负担太重能力有限或者是即将撒手西去,给它换个新环境,人挪死树挪活事半功倍。。。
当然,肯定有哽咽的任务,不论是在哪个服务器上,都无法顺利完成。这就说明,此问题不在于服务器上,而是任务本身天资有缺憾。缺憾在何处?每个作业,功能代码都是一样的,别的任务成功了,就是这个任务不成功,很显然,问题出在输入那里。输入中有非法的输入条目,导致程序无法辨识,只能挥泪惜别。说到这里,解决策略也浮出水面了,三十六计走位上,惹不起,还是躲得起的。在MapTask中的MapTask.SkippingRecordReader<K, V>和ReduceTask里的ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都是用于干这个事情的。它们的原理很简单,就是在读一条记录前,把当前的位置信息,封装成SortedRanges.Range对象,经由Task的reportNextRecordRange方法提交到TaskTracker上去。TaskTracker会把这些内容,搁在TaskStatus对象中,随着心跳消息,汇报到JobTracker上面。这样,作业服务器就可以随时随刻了解清楚,每个任务正读取在那个位置,一旦出错,再次执行的时候,就在分配的任务信息里面添加一组SortedRanges信息。MapTask或ReduceTask读取的时候,会看一下这些区域,如果当前区域正好处于上述雷区,跳过不读。如此反复,正可谓,道路曲折,前途光明啊。。。
VII. 总结
对于Map/Reduce而言,真正的困难,在于提高其适应能力,打造一款能够包治百病的执行框架。Hadoop已经做得很好了,但只有真正搞清楚了整个流程,你才能帮助它做的更好。。。
二. 分布式计算(Map/Reduce)
分布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架。在Hadoop中,分布式文件系统,很大程度上,是为各种分布式计算需求所服务的。我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分布式计算上,我们可以将其视为增加了分布式支持的计算函数。从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件。而从分布式的角度上看,分布式计算的输入文件往往规模巨大,且分布在多个机器上,单机计算完全不可支撑且效率低下,因此Map/Reduce框架需要提供一套机制,将此计算扩展到无限规模的机器集群上进行。依照这样的定义,我们对整个Map/Reduce的理解,也可以分别沿着这两个流程去看。。。
在Map/Reduce框架中,每一次计算请求,被称为作业。在分布式计算Map/Reduce框架中,为了完成这个作业,它进行两步走的战略,首先是将其拆分成若干个Map任务,分配到不同的机器上去执行,每一个Map任务拿输入文件的一部分作为自己的输入,经过一些计算,生成某种格式的中间文件,这种格式,与最终所需的文件格式完全一致,但是仅仅包含一部分数据。因此,等到所有Map任务完成后,它会进入下一个步骤,用以合并这些中间文件获得最后的输出文件。此时,系统会生成若干个Reduce任务,同样也是分配到不同的机器去执行,它的目标,就是将若干个Map任务生成的中间文件为汇总到最后的输出文件中去。当然,这个汇总不总会像1 + 1 = 2那么直接了当,这也就是Reduce任务的价值所在。经过如上步骤,最终,作业完成,所需的目标文件生成。整个算法的关键,就在于增加了一个中间文件生成的流程,大大提高了灵活性,使其分布式扩展性得到了保证。。。
I. 术语对照
和分布式文件系统一样,Google、Hadoop和....我,各执一种方式表述统一概念,为了保证其统一性,特有下表。。。
文中翻译 Hadoop术语 Google术语 相关解释
作业 Job Job 用户的每一个计算请求,就称为一个作业。
作业服务器 JobTracker Master 用户提交作业的服务器,同时,它还负责各个作业任务的分配,管理所有的任务服务器。
任务服务器 TaskTracker Worker 任劳任怨的工蜂,负责执行具体的任务。
任务 Task Task 每一个作业,都需要拆分开了,交由多个服务器来完成,拆分出来的执行单位,就称为任务。
备份任务 Speculative Task Buckup Task 每一个任务,都有可能执行失败或者缓慢,为了降低为此付出的代价,系统会未雨绸缪的实现在另外的任务服务器上执行同样一个任务,这就是备份任务。
II. 基本架构
与分布式文件系统类似,Map/Reduce的集群,也由三类服务器构成。其中作业服务器,在Hadoop中称为Job Tracker,在Google论文中称为Master。前者告诉我们,作业服务器是负责管理运行在此框架下所有作业的,后者告诉我们,它也是为各个作业分配任务的核心。与HDFS的主控服务器类似,它也是作为单点存在的,简化了负责的同步流程。具体的负责执行用户定义操作的,是任务服务器,每一个作业被拆分成很多的任务,包括Map任务和Reduce任务等,任务是具体执行的基本单元,它们都需要分配到合适任务服务器上去执行,任务服务器一边执行一边向作业服务器汇报各个任务的状态,以此来帮助作业服务器了解作业执行的整体情况,分配新的任务等等。。。
除了作业的管理者执行者,还需要有一个任务的提交者,这就是客户端。与分布式文件系统一样,客户端也不是一个单独的进程,而是一组API,用户需要自定义好自己需要的内容,经由客户端相关的代码,将作业及其相关内容和配置,提交到作业服务器去,并时刻监控执行的状况。。。
同作为Hadoop的实现,与HDFS的通信机制相同,Hadoop Map/Reduce也是用了协议接口来进行服务器间的交流。实现者作为RPC服务器,调用者经由RPC的代理进行调用,如此,完成大部分的通信,具体服务器的架构,和其中运行的各个协议状况,参见下图。从图中可以看到,与HDFS相比,相关的协议少了几个,客户端与任务服务器,任务服务器之间,都不再有直接通信关系。这并不意味着客户端就不需要了解具体任务的执行状况,也不意味着,任务服务器之间不需要了解别家任务执行的情形,只不过,由于整个集群各机器的联系比HDFS复杂的多,直接通信过于的难以维系,所以,都统一由作业服务器整理转发。另外,从这幅图可以看到,任务服务器不是一个人在战斗,它会像孙悟空一样招出一群宝宝帮助其具体执行任务。这样做的好处,个人觉得,应该有安全性方面的考虑,毕竟,任务的代码是用户提交的,数据也是用户指定的,这质量自然良莠不齐,万一碰上个搞破坏的,把整个任务服务器进程搞死了,就因小失大了。因此,放在单独的地盘进行,爱咋咋地,也算是权责明确了。。。
与分布式文件系统相比,Map/Reduce框架的还有一个特点,就是可定制性强。文件系统中很多的算法,都是很固定和直观的,不会由于所存储的内容不同而有太多的变化。而作为通用的计算框架,需要面对的问题则要复杂很多,在各种不同的问题、不同的输入、不同的需求之间,很难有一种包治百病的药能够一招鲜吃遍天。作为Map/Reduce框架而言,一方面要尽可能的抽取出公共的一些需求,实现出来。更重要的,是需要提供良好的可扩展机制,满足用户自定义各种算法的需求。Hadoop是由Java来实现的,因此通过反射来实现自定义的扩展,显得比较小菜一碟了。在JobConf类中,定义了大量的接口,这基本上是Hadoop Map/Reduce框架所有可定制内容的一次集中展示。在JobConf中,有大量set接口接受一个Class<? extends xxx>的参数,通常它都有一个默认实现的类,用户如果不满意,则可自定义实现。。。
III. 计算流程
如果一切都按部就班的进行,那么整个作业的计算流程,应该是作业的提交 -> Map任务的分配和执行 -> Reduce任务的分配和执行 -> 作业的完成。而在每个任务的执行中,又包含输入的准备 -> 算法的执行 -> 输出的生成,三个子步骤。沿着这个流程,我们可以很快的整理清晰整个Map/Reduce框架下作业的执行。。。
1、作业的提交
一个作业,在提交之前,需要把所有应该配置的东西都配置好,因为一旦提交到了作业服务器上,就陷入了完全自动化的流程,用户除了观望,最多也就能起一个监督作用,惩治一些不好好工作的任务。。。
基本上,用户在提交代码阶段,需要做的工作主要是这样的:
首先,书写好所有自定的代码,最起码,需要有Map和Reduce的执行代码。在Hadoop中,Map需要派生自Mapper<K1, V1, K2, V2>接口,Reduce需要派生自Reducer<K2, V2, K3, V3>接口。这里都是用的泛型,用以支持不同的键值类型。这两个接口都仅有一个方法,一个是map,一个是reduce,这两个方法都直接受四个参数,前两个是输入的键和值相关的数据结构,第三个是作为输出相关的数据结构,最后一个,是一个Reporter类的实例,实现的时候可以利用它来统计一些计数。除了这两个接口,还有大量可以派生的接口,比如分割的Partitioner<K2, V2>接口。。。
然后,需要书写好主函数的代码,其中最主要的内容就是实例化一个JobConf类的对象,然后调用其丰富的setXXX接口,设定好所需的内容,包括输入输出的文件路径,Map和Reduce的类,甚至包括读取写入文件所需的格式支持类,等等。。。
最后,调用JobClient的runJob方法,提交此JobConf对象。runJob方法会先行调用到JobSubmissionProtocol接口所定义的submitJob方法,将此作业,提交给作业服务器。接着,runJob开始循环,不停的调用JobSubmissionProtocol的getTaskCompletionEvents方法,获得TaskCompletionEvent类的对象实例,了解此作业各任务的执行状况。。。
2、Map任务的分配
当一个作业提交到了作业服务器上,作业服务器会生成若干个Map任务,每一个Map任务,负责将一部分的输入转换成格式与最终格式相同的中间文件。通常一个作业的输入都是基于分布式文件系统的文件(当然在单机环境下,文件系统单机的也可以...),因为,它可以很天然的和分布式的计算产生联系。而对于一个Map任务而言,它的输入往往是输入文件的一个数据块,或者是数据块的一部分,但通常,不跨数据块。因为,一旦跨了数据块,就可能涉及到多个服务器,带来了不必要的复杂性。。。
当一个作业,从客户端提交到了作业服务器上,作业服务器会生成一个JobInProgress对象,作为与之对应的标识,用于管理。作业被拆分成若干个Map任务后,会预先挂在作业服务器上的任务服务器拓扑树。这是依照分布式文件数据块的位置来划分的,比如一个Map任务需要用某个数据块,这个数据块有三份备份,那么,在这三台服务器上都会挂上此任务,可以视为是一个预分配。。。
关于任务管理和分配的大部分的真实功能和逻辑的实现,JobInProgress则依托JobInProgressListener和TaskScheduler的子类。TaskScheduler,顾名思义是用于任务分配的策略类(为了简化描述,用它代指所有TaskScheduler的子类...)。它会掌握好所有作业的任务信息,其assignTasks函数,接受一个TaskTrackerStatus作为参数,依照此任务服务器的状态和现有的任务状况,为其分配新的任务。而为了掌握所有作业相关任务的状况,TaskScheduler会将若干个JobInProgressListener注册到JobTracker中去,当有新的作业到达、移除或更新的时候,JobTracker会告知给所有的JobInProgressListener,以便它们做出相应的处理。。。
任务分配是一个重要的环节,所谓任务分配,就是将合适作业的合适任务分配到合适的服务器上。不难看出,里面蕴含了两个步骤,先是选择作业,然后是在此作业中选择任务。和所有分配工作一样,任务分配也是一个复杂的活。不良好的任务分配,可能会导致网络流量增加、某些任务服务器负载过重效率下降,等等。不仅如此,任务分配还是一个无一致模式的问题,不同的业务背景,可能需要不同的算法才能满足需求。因此,在Hadoop中,有很多TaskScheduler的子类,像Facebook,Yahoo,都为其贡献出了自家用的算法。在Hadoop中,默认的任务分配器,是JobQueueTaskScheduler类。它选择作业的基本次序是:Map Clean Up Task(Map任务服务器的清理任务,用于清理相关的过期的文件和环境...) -> Map Setup Task(Map任务服务器的安装任务,负责配置好相关的环境...) -> Map Tasks -> Reduce Clean Up Task -> Reduce Setup Task -> Reduce Tasks。在这个前提下,具体到Map任务的分配上来。当一个任务服务器工作的游刃有余,期待获得新的任务的时候,JobQueueTaskScheduler会按照各个作业的优先级,从最高优先级的作业开始分配。每分配一个,还会为其留出余量,已被不时之需。举一个例子:系统目前有优先级3、2、1的三个作业,每个作业都有一个可分配的Map任务,一个任务服务器来申请新的任务,它还有能力承载3个任务的执行,JobQueueTaskScheduler会先从优先级3的作业上取一个任务分配给它,然后再留出一个1任务的余量。此时,系统只能在将优先级2作业的任务分配给此服务器,而不能分配优先级1的任务。这样的策略,基本思路就是一切为高优先级的作业服务,优先分配不说,分配了好保留有余力以备不时之需,如此优待,足以让高优先级的作业喜极而泣,让低优先级的作业感慨既生瑜何生亮,甚至是活活饿死。。。
确定了从哪个作业提取任务后,具体的分配算法,经过一系列的调用,最后实际是由JobInProgress的findNewMapTask函数完成的。它的算法很简单,就是尽全力为此服务器非配且尽可能好的分配任务,也就是说,只要还有可分配的任务,就一定会分给它,而不考虑后来者。作业服务器会从离它最近的服务器开始,看上面是否还挂着未分配的任务(预分配上的),从近到远,如果所有的任务都分配了,那么看有没有开启多次执行,如果开启,考虑把未完成的任务再分配一次(后面有地方详述...)。。。
对于作业服务器来说,把一个任务分配出去了,并不意味着它就彻底解放,可以对此任务可以不管不顾了。因为任务可以在任务服务器上执行失败,可能执行缓慢,这都需要作业服务器帮助它们再来一次。因此在Task中,记录有一个TaskAttemptID,对于任务服务器而言,它们每次跑的,其实都只是一个Attempt而已,Reduce任务只需要采信一个的输出,其他都算白忙乎了。。。
3、Map任务的执行
与HDFS类似,任务服务器是通过心跳消息,向作业服务器汇报此时此刻其上各个任务执行的状况,并向作业服务器申请新的任务的。具体实现,是TaskTracker调用InterTrackerProtocol协议的heartbeat方法来做的。这个方法接受一个TaskTrackerStatus对象作为参数,它描述了此时此任务服务器的状态。当其有余力接受新的任务的时候,它还会传入acceptNewTasks为true的参数,表示希望作业服务器委以重任。JobTracker接收到相关的参数后,经过处理,会返回一个HeartbeatResponse对象。这个对象中,定义了一组TaskTrackerAction,用于指导任务服务器进行下一步的工作。系统中已定义的了一堆其TaskTrackerAction的子类,有的对携带的参数进行了扩充,有的只是标明了下ID,具体不详写了,一看便知。。。
当TaskTracker收到的TaskTrackerAction中,包含了LaunchTaskAction,它会开始执行所分配的新的任务。在TaskTracker中,有一个TaskTracker.TaskLauncher线程(确切的说是两个,一个等Map任务,一个等Reduce任务),它们在痴痴的守候着新任务的来到。一旦等到了,会最终调用到Task的createRunner方法,构造出一个TaskRunner对象,新建一个线程来执行。对于一个Map任务,它对应的Runner是TaskRunner的子类MapTaskRunner,不过,核心部分都在TaskRunner的实现内。TaskRunner会先将所需的文件全部下载并拆包好,并记录到一个全局缓存中,这是一个全局的目录,可以供所有此作业的所有任务使用。它会用一些软链接,将一些文件名链接到这个缓存中来。然后,根据不同的参数,配置出一个JVM执行的环境,这个环境与JvmEnv类的对象对应。
接着,TaskRunner会调用JvmManager的launchJvm方法,提交给JvmManager处理。JvmManager用于管理该TaskTracker上所有运行的Task子进程。在目前的实现中,尝试的是池化的方式。有若干个固定的槽,如果槽没有满,那么就启动新的子进程,否则,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。每一个进程都是由JvmRunner来管理的,它也是位于单独线程中的。但是从实现上看,这个机制好像没有部署开,子进程是死循环等待,而不会阻塞在父进程的相关线程上,父线程的变量一直都没有个调整,一旦分配,始终都处在繁忙的状况了。
真实的执行载体,是Child,它包含一个main函数,进程执行,会将相关参数传进来,它会拆解这些参数,并且构造出相关的Task实例,调用其run函数进行执行。每一个子进程,可以执行指定个数量的Task,这就是上面所说的池化的配置。但是,这套机制在我看来,并没有运行起来,每个进程其实都没有机会不死而执行新的任务,只是傻傻的等待进程池满,而被一刀毙命。也许是我老眼昏花,没看出其中实现的端倪。。。
4、Reduce任务的分配与执行
比之Map任务,Reduce的分配及其简单,基本上是所有Map任务完成了,有空闲的任务服务器,来了就给分配一个Job任务。因为Map任务的结果星罗棋布,且变化多端,真要搞一个全局优化的算法,绝对是得不偿失。而Reduce任务的执行进程的构造和分配流程,与Map基本完全的一致,没有啥可说的了。。。
但其实,Reduce任务与Map任务的最大不同,是Map任务的文件都在本地隔着,而Reduce任务需要到处采集。这个流程是作业服务器经由此Reduce任务所处的任务服务器,告诉Reduce任务正在执行的进程,它需要的Map任务执行过的服务器地址,此Reduce任务服务器会于原Map任务服务器联系(当然本地就免了...),通过FTP服务,下载过来。这个隐含的直接数据联系,就是执行Reduce任务与执行Map任务最大的不同了。。。
5、作业的完成
当所有Reduce任务都完成了,所需数据都写到了分布式文件系统上,整个作业才正式完成了。此中,涉及到很多的类,很多的文件,很多的服务器,所以说起来很费劲,话说,一图解千语,说了那么多,我还是画两幅图,彻底表达一下吧。。。
首先,是一个时序图。它模拟了一个由3个Map任务和1个Reduce任务构成的作业执行流程。我们可以看到,在执行的过程中,只要有人太慢,或者失败,就会增加一次尝试,以此换取最快的执行总时间。一旦所有Map任务完成,Reduce开始运作(其实,不一定要这样的...),对于每一个Map任务来说,只有执行到Reduce任务把它上面的数据下载完成,才算成功,否则,都是失败,需要重新进行尝试。。。
而第二副图,不是我画的,就不转载了,参见这里,它描述了整个Map/Reduce的服务器状况图,包括整体流程、所处服务器进程、输入输出等,看清楚这幅图,对Map/Reduce的基本流程应该能完全跑通了。有这几点,可能图中描述的不够清晰需要提及一下,一个是在HDFS中,其实还有日志文件,图中没有标明;另一个是步骤5,其实是由TaskTracker主动去拉取而不是JobTracker推送过来的;还有步骤8和步骤11,创建出来的MapTask和ReduceTask,在Hadoop中都是运行在独立的进程上的。。。
IV. Map任务详请
从上面,可以了解到整个Map和Reduce任务的整体流程,而后面要啰嗦的,是具体执行中的细节。Map任务的输入,是分布式文件系统上的,包含键值对信息的文件。为了给每一个Map任务指定输入,我们需要掌握文件格式把它分切成块,并从每一块中分离出键值信息。在HDFS中,输入的文件格式,是由InputFormat<K, V>类来表示的,在JobConf中,它的默认值是TextInputFormat类(见getInputFormat),此类是特化的FileInputFormat<LongWritable, Text>子类,而FileInputFormat<K, V>正是InputFormat<K, V>的子类。通过这样的关系我们可以很容易的理解,默认的文件格式是文本文件,且键是LongWritable类型(整形数),值是Text类型(字符串)。仅仅知道文件类型是不够的,我们还需要将文件中的每一条数据,分离成键值对,这个工作,是RecordReader<K, V>来做的。在TextInputFormat的getRecordReader方法中我们可以看到,与TextInputFormat默认配套使用的,是LineRecordReader类,是特化的RecordReader<LongWritable, Text>的子类,它将每一行作为一个记录,起始的位置作为键,整行的字符串作为值。有了格式,分出了键值,还需要切开分给每一个Map任务。每一个Map任务的输入用InputSplit接口表示,对于一个文件输入而言,其实现是FileSplit,它包含着文件名、起始位置、长度和存储它的一组服务器地址。。。
当Map任务拿到所属的InputSplit后,就开始一条条读取记录,并调用用于定义的Mapper,进行计算(参见MapRunner<K1, V1, K2, V2>和MapTask的run方法),然后,输出。MapTask会传递给Mapper一个OutputCollector<K, V>对象,作为输出的数据结构。它定义了一个collect的函数,接受一个键值对。在MapTask中,定义了两个OutputCollector的子类,一个是MapTask.DirectMapOutputCollector<K, V>,人如其名,它的实现确实很Direct,直截了当。它会利用一个RecordWriter<K, V>对象,collect一调用,就直接调用RecordWriter<K, V>的write方法,写入本地的文件中去。如果觉着RecordWriter<K, V>出现的很突兀,那么看看上一段提到的RecordReader<K, V>,基本上,数据结构都是对应着的,一个是输入一个是输出。输出很对称也是由RecordWriter<K, V>和OutputFormat<K, V>来协同完成的,其默认实现是LineRecordWriter<K, V>和TextOutputFormat<K, V>,多么的眼熟啊。。。
除了这个非常直接的实现之外,MapTask中还有一个复杂的多的实现,是MapTask.MapOutputBuffer<K extends Object, V extends Object>。有道是简单压倒一切,那为什么有很简单的实现,要琢磨一个复杂的呢。原因在于,看上去很美的往往带着刺,简单的输出实现,每调用一次collect就写一次文件,频繁的硬盘操作很有可能导致此方案的低效。为了解决这个问题,这就有了这个复杂版本,它先开好一段内存做缓存,然后制定一个比例做阈值,开一个线程监控此缓存。collect来的内容,先写到缓存中,当监控线程发现缓存的内容比例超过阈值,挂起所有写入操作,建一个新的文件,把缓存的内容批量刷到此文件中去,清空缓存,重新开放,接受继续collect。。。
为什么说是刷到文件中去呢。因为这不是一个简单的照本宣科简单复制的过程,在写入之前,会先将缓存中的内存,经过排序、合并器(Combiner)统计之后,才会写入。如果你觉得Combiner这个名词听着太陌生,那么考虑一下Reducer,Combiner也就是一个Reducer类,通过JobConf的setCombinerClass进行设置,在常用的配置中,Combiner往往就是用用户为Reduce任务定义的那个Reducer子类。只不过,Combiner只是服务的范围更小一些而已,它在Map任务执行的服务器本地,依照Map处理过的那一小部分数据,先做一次Reduce操作,这样,可以压缩需要传输内容的大小,提高速度。每一次刷缓存,都会开一个新的文件,等此任务所有的输入都处理完成后,就有了若干个有序的、经过合并的输出文件。系统会将这些文件搞在一起,再做一个多路的归并外排,同时使用合并器进行合并,最终,得到了唯一的、有序的、经过合并的中间文件(注:文件数量等同于分类数量,在不考虑分类的时候,简单的视为一个...)。它,就是Reduce任务梦寐以求的输入文件。。。
除了做合并,复杂版本的OutputCollector,还具有分类的功能。分类,是通过Partitioner<K2, V2>来定义的,默认实现是HashPartitioner<K2, V2>,作业提交者可以通过JobConf的setPartitionerClass来自定义。分类的含义是什么呢,简单的说,就是将Map任务的输出,划分到若干个文件中(通常与Reduce任务数目相等),使得每一个Reduce任务,可以处理某一类文件。这样的好处是大大的,举一个例子说明一下。比如有一个作业是进行单词统计的,其Map任务的中间结果应该是以单词为键,以单词数量为值的文件。如果这时候只有一个Reduce任务,那还好说,从全部的Map任务那里收集文件过来,分别统计得到最后的输出文件就好。但是,如果单Reduce任务无法承载此负载或效率太低,就需要多个Reduce任务并行执行。此时,再沿用之前的模式就有了问题。每个Reduce任务从一部分Map任务那里获得输入文件,但最终的输出结果并不正确,因为同一个单词可能在不同的Reduce任务那里都有统计,需要想方法把它们统计在一起才能获得最后结果,这样就没有将Map/Reduce的作用完全发挥出来。这时候,就需要用到分类。如果此时有两个Reduce任务,那么将输出分成两类,一类存放字母表排序较高的单词,一类存放字母表排序低的单词,每一个Reduce任务从所有的Map任务那里获取一类的中间文件,得到自己的输出结果。最终的结果,只需要把各个Reduce任务输出的,拼接在一起就可以了。本质上,这就是将Reduce任务的输入,由垂直分割,变成了水平分割。Partitioner的作用,正是接受一个键值,返回一个分类的序号。它会在从缓存刷到文件之前做这个工作,其实只是多了一个文件名的选择而已,别的逻辑都不需要变化。。。
除了缓存、合并、分类等附加工作之外,复杂版本的OutputCollector还支持错误数据的跳过功能,在后面分布式将排错的时候,还会提及,标记一下,按下不表。。。
V. Reduce任务详情
理论上看,Reduce任务的整个执行流程要比Map任务更为的罗嗦一些,因为,它需要收集输入文件,然后才能进行处理。Reduce任务,主要有这么三个步骤:Copy、Sort、Reduce(参见ReduceTask的run方法)。所谓Copy,就是从执行各个Map任务的服务器那里,收罗到本地来。拷贝的任务,是由ReduceTask.ReduceCopier类来负责,它有一个内嵌类,叫MapOutputCopier,它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。整个拷贝过程是一个动态的过程,也就是说它不是一次给好所有输入信息就不再变化了。它会不停的调用TaskUmbilicalProtocol协议的getMapCompletionEvents方法,向其父TaskTracker询问此作业个Map任务的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。当获取到相关Map任务执行服务器的信息后,都会有一个线程开启,做具体的拷贝工作。同时,还有一个内存Merger线程和一个文件Merger线程在同步工作,它们将新鲜下载过来的文件(可能在内存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为后续的排序工作减负。。。
Sort,排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行,因为虽然同步有做着归并的工作,但可能留着尾巴,没做彻底。经过这一个流程,该彻底的都彻底了,一个崭新的、合并了所有所需Map任务输出文件的新文件,诞生了。而那些千行万苦从其他各个服务器网罗过来的Map任务输出文件,很快的结束了它们的历史使命,被扫地出门一扫而光,全部删除了。。。
所谓好戏在后头,Reduce任务的最后一个阶段,正是Reduce本身。它也会准备一个OutputCollector收集输出,与MapTask不同,这个OutputCollector更为简单,仅仅是打开一个RecordWriter,collect一次,write一次。最大的不同在于,这次传入RecordWriter的文件系统,基本都是分布式文件系统,或者说是HDFS。而在输入方面,ReduceTask会从JobConf那里调用一堆getMapOutputKeyClass、getMapOutputValueClass、getOutputKeyComparator等等之类的自定义类,构造出Reducer所需的键类型,和值的迭代类型Iterator(一个键到了这里一般是对应一组值)。具体实现颇为拐弯抹角,建议看一下Merger.MergeQueue,RawKeyValueIterator,ReduceTask.ReduceValuesIterator等等之类的实现。有了输入,有了输出,不断循环调用自定义的Reducer,最终,Reduce阶段完成。。。
VI. 分布式支持
1、服务器正确性保证
Hadoop Map/Reduce服务器状况和HDFS很类似,由此可知,救死扶伤的方法也是大同小异。废话不多说了,直接切正题。同作为客户端,Map/Reduce的客户端只是将作业提交,就开始搬个板凳看戏,没有占茅坑的行动。因此,一旦它挂了,也就挂了,不伤大雅。而任务服务器,也需要随时与作业服务器保持心跳联系,一旦有了问题,作业服务器可以将其上运行的任务,移交给它人完成。作业服务器,作为一个单点,非常类似的是利用还原点(等同于HDFS的镜像)和历史记录(等同于HDFS的日志),来进行恢复。其上,需要持久化用于恢复的内容,包含作业状况、任务状况、各个任务尝试的工作状况等。有了这些内容,再加上任务服务器的动态注册,就算挪了个窝,还是很容易恢复的。JobHistory是历史记录相关的一个静态类,本来,它也就是一个干写日志活的,只是在Hadoop的实现中,对日志的写入做了面向对象的封装,同时又大量用到观察者模式做了些嵌入,使得看起来不是那么直观。本质上,它就是打开若干个日志文件,利用各类接口来往里面写内容。只不过,这些日志,会放在分布式文件系统中,就不需要像HDFS那样,来一个SecondXXX随时候命,由此可见,有巨人在脚下踩着,真好。JobTracker.RecoveryManager类是作业服务器中用于进行恢复相关的事情,当作业服务器启动的时候,会调用其recover方法,恢复日志文件中的内容。其中步骤,注释中写的很清楚,请自行查看。。。
2、任务执行的正确和速度
整个作业流程的执行,秉承着木桶原理。执行的最慢的Map任务和Reduce任务,决定了系统整体执行时间(当然,如果执行时间在整个流程中占比例很小的话,也许就微不足道了...)。因此,尽量加快最慢的任务执行速度,成为提高整体速度关键。所使用的策略,简约而不简单,就是一个任务多次执行。当所有未执行的任务都分配出去了,并且先富起来的那部分任务已经完成了,并还有任务服务器孜孜不倦的索取任务的时候,作业服务器会开始炒剩饭,把那些正在吭哧吭哧在某个服务器上慢慢执行的任务,再把此任务分配到一个新的任务服务器上,同时执行。两个服务器各尽其力,成王败寇,先结束者的结果将被采纳。这样的策略,隐含着一个假设,就是我们相信,输入文件的分割算法是公平的,某个任务执行慢,并不是由于这个任务本身负担太重,而是由于服务器不争气负担太重能力有限或者是即将撒手西去,给它换个新环境,人挪死树挪活事半功倍。。。
当然,肯定有哽咽的任务,不论是在哪个服务器上,都无法顺利完成。这就说明,此问题不在于服务器上,而是任务本身天资有缺憾。缺憾在何处?每个作业,功能代码都是一样的,别的任务成功了,就是这个任务不成功,很显然,问题出在输入那里。输入中有非法的输入条目,导致程序无法辨识,只能挥泪惜别。说到这里,解决策略也浮出水面了,三十六计走位上,惹不起,还是躲得起的。在MapTask中的MapTask.SkippingRecordReader<K, V>和ReduceTask里的ReduceTask.SkippingReduceValuesIterator<KEY,VALUE>,都是用于干这个事情的。它们的原理很简单,就是在读一条记录前,把当前的位置信息,封装成SortedRanges.Range对象,经由Task的reportNextRecordRange方法提交到TaskTracker上去。TaskTracker会把这些内容,搁在TaskStatus对象中,随着心跳消息,汇报到JobTracker上面。这样,作业服务器就可以随时随刻了解清楚,每个任务正读取在那个位置,一旦出错,再次执行的时候,就在分配的任务信息里面添加一组SortedRanges信息。MapTask或ReduceTask读取的时候,会看一下这些区域,如果当前区域正好处于上述雷区,跳过不读。如此反复,正可谓,道路曲折,前途光明啊。。。
VII. 总结
对于Map/Reduce而言,真正的困难,在于提高其适应能力,打造一款能够包治百病的执行框架。Hadoop已经做得很好了,但只有真正搞清楚了整个流程,你才能帮助它做的更好。。。
相关推荐
### 云计算基础——分布式计算详解 #### 一、分布式计算概览 分布式计算,作为云计算领域的核心组成部分,指的是将计算任务分散至多个计算机节点上执行的过程。这一概念源于解决单机计算能力有限的问题,通过利用...
下面将详细讨论分布式计算的原理、算法以及相关系统。 ### 原理 1. **资源共享**:分布式计算系统中的各个节点可以共享计算资源、存储资源和网络资源,使得整体系统能够处理更大的数据量和更高的计算需求。 2. **...
Continuous MapReduce则是通过实现作业间的数据传输pipeline,实现了在线聚合和连续查询,允许Reduce在Map产生部分数据后就开始计算,提高了数据处理的实时性。百度提出的Dynamic add input方法则允许在数据未完全...
在大数据处理领域,Hadoop是不可或缺的核心框架,其核心组件MapReduce则是分布式计算的重要实现方式。MapReduce的设计理念源于Google的同名论文,它通过将大规模数据处理任务分解为两个阶段:Map(映射)和Reduce...
### 分布式机器学习与系统设计与实现 #### 核心知识点概述 1. **分布式机器学习的基础概念**:理解什么是分布式机器学习及其在解决大规模机器学习问题中的重要性。 2. **分布式训练的必要性**:探讨面对大规模数据...
2. Reduce阶段:Reduce阶段是MapReduce的第二阶段,负责将Map阶段的输出结果进行聚合和排序。 MapReduce的特点: 1. 高性能:MapReduce可以提供高性能的计算能力,适合大规模数据的计算任务。 2. 高可靠性:...
"分布式系统——从GFS到Hadoop" 本文将围绕分布式系统进行介绍,重点介绍Google的“三驾马车”,包括GFS、MapReduce和BigTable。通过对这些技术的介绍,让读者快速了解分布式系统的总体概念和架构。 分布式系统是...
《HDFS——Hadoop分布式文件系统深度实践》这本书是针对Hadoop分布式文件系统(HDFS)的详尽指南,旨在帮助读者深入理解HDFS的工作原理、设计思想以及在实际应用中的最佳实践。HDFS是Apache Hadoop项目的核心组件之...
### 分布式计算框架——深入理解MapReduce #### 概述 分布式计算框架是指能够将大规模数据处理任务分散到多台计算机(节点)上并行处理的技术体系。它旨在通过利用集群中的多台计算机共同完成复杂的计算任务,提高...
MapReduce将大规模数据处理分解为两个主要步骤——映射(map)和化简(reduce),使得并行计算成为可能。而Spark则在此基础上进一步提升了实时处理能力,通过内存计算降低了数据读写磁盘的开销。 泛型则是编程语言...
而MapReduce是Hadoop的另一个核心组件,它是一种分布式计算模型,通过Map(映射)和Reduce(归约)两个阶段对大规模数据集进行处理。Map阶段将输入数据分割成独立的块,并进行并行处理,产生一系列中间结果。Reduce...
本文将深入探讨Hadoop平台的核心组成部分——HDFS(Hadoop Distributed File System)和Map/Reduce计算框架,并在此基础上提出一种基于Hadoop的云计算模型。 #### Hadoop的云计算架构体系 ##### 1.1 分布式文件...
以上六篇论文详细阐述了Google在分布式计算领域的创新理念和技术实践,对于理解大数据处理的原理和优化方法,以及开发类似系统的工程师来说,具有极高的学习价值。通过对这些论文的深入研究,我们可以掌握如何构建可...
它将计算任务分为两个阶段——Map(映射)和Reduce(归约),使得数据处理变得简单和高效。 11. **云计算**:分布式计算是云计算的基础,大型云服务提供商如Amazon AWS、Google Cloud等都基于分布式计算技术提供...
MOOFS整理收藏资料,个人整理,包含MFS文件系统的组成.doc,分布式基础学习【二】 —— 分布式计算系统(Map-Reduce),mfs权威指南(moosefs)分布式文件系统一站式解决方案(部署,性能测试)不断更新 - 存储备份之家 ...
通过熟练掌握相关的Python库和分布式计算框架,我们可以构建出高效的机器学习系统,应对各种复杂的数据挑战。在这个过程中,数据预处理、模型选择、并行计算策略以及性能评估都是至关重要的环节,值得深入研究和实践...
Hadoop实现了一个分布式计算模型——MapReduce,它将数据处理分为两个阶段:Map(映射)和Reduce(归约)。首先,Map阶段将输入数据转换成一系列中间形式的键值对;然后,Reduce阶段将这些键值对进行汇总,得到最终...
Pig作为一个开源子项目,以Hadoop的Map-Reduce框架为基础,提供了一种类SQL功能的编程语言——Pig Latin。这种命令式编程语言不仅简化了数据处理流程,还提高了数据处理的效率和便捷性。 在Pig平台的系统架构中,它...