- 浏览: 141951 次
- 性别:
- 来自: 上海
最新评论
-
xueyinv86:
你这个增强版的wordcount是在哪个版本的hadoop上跑 ...
MapReduce入门程序WordCount增强版 -
chenjianjx:
很不错的收集!
几篇关于Hadoop+Hive数据仓库的入门文章 -
canedy:
import org.apache.hadoop.hbase. ...
使用HBase的一个典型例子,涉及了HBase中很多概念 -
天籁の圁:
你的图全部挂了啊
基于Eclipse的Hadoop应用开发环境的配置 -
landyer:
happinesss 写道你是做java开发的吗我是杂货铺,什 ...
MongoDB1.8安装、分布式自动分片(Auto-Sharding)配置备忘
原文地址:http://www.cnblogs.com/end/archive/2011/04/26/2029499.html
转者注:本来想在Hadoop学习总结系列详细解析HDFS以及Map-Reduce的,然而查找资料的时候,发现了这篇文章,并且发现caibinbupt已经对Hadoop的源代码已经进行了详细的分析,推荐大家阅读。 转自http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 1 caibinbupt的源代码分析http://caibinbupt.javaeye.com/ http://coderplay.javaeye.com/blog/295097 http://coderplay.javaeye.com/blog/318602 http://www.cppblog.com/javenstudio/articles/43073.html Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Google工程师设计并实现的,Google已经将它完整的MapReduce论 文公开发布了。其中对它的定义是,Map/Reduce是一个编程模型(programming model),是一个用于处理和生成大规模数据集(processing and generating large data sets)的相关的实现。用户定义一个map函数来处理一个key/value对以生成一批中间的key/value对,再定义一个reduce函数将所 有这些中间的有着相同key的values合并起来。很多现实世界中的任务都可用这个模型来表达。 Map- Reduce框架的运作完全基于<key,value>对,即数据的输入是一批<key,value>对,生成的结果也是一 批<key,value>对,只是有时候它们的类型不一样而已。Key和value的类由于需要支持被序列化(serialize)操作,所 以它们必须要实现Writable接口,而且key的类还必须实现WritableComparable接口,使得可以让框架对数据集的执行排序操作。 一个Map-Reduce任务的执行过程以及数据输入输出的类型如下所示: Map:<k1,v1> ->list<k2,v2> Reduce:<k2,list<v2>> -><k3,v3> 下面通过一个的例子来详细说明这个过程。 WordCount是Hadoop自带的一个例子,目标是统计文本文件中单词的个数。假设有如下的两个文本文件来运行WorkCount程序: Hello World Bye World Hello Hadoop GoodBye Hadoop Hadoop针对文本文件缺省使用LineRecordReader类来实现读取,一行一个key/value对,key取偏移量,value为行内容。 如下是map1的输入数据: Key1 Value1 0 Hello World Bye World 如下是map2的输入数据: Key1 Value1 0 Hello Hadoop GoodBye Hadoop 如下是map1的输出结果 Key2 Value2 Hello 1 World 1 Bye 1 World 1 如下是map2的输出结果 Key2 Value2 Hello 1 Hadoop 1 GoodBye 1 Hadoop 1 Combiner类实现将相同key的值合并起来,它也是一个Reducer的实现。 如下是combine1的输出 Key2 Value2 Hello 1 World 2 Bye 1 如下是combine2的输出 Key2 Value2 Hello 1 Hadoop 2 GoodBye 1 Reducer类实现将相同key的值合并起来。 如下是reduce的输出 Key2 Value2 Hello 2 World 2 Bye 1 Hadoop 2 GoodBye 1 JobTracker是一个master服务, JobTracker负责调度job的每一个子任务task运行于TaskTracker上,并监控它们,如果发现有失败的task就重新运行它。一般情况应该把JobTracker部署在单独的机器上。 TaskTracker是运行于多个节点上的slaver服务。TaskTracker则负责直接执行每一个task。TaskTracker都需要运行在HDFS的DataNode上, 每一个job都会在用户端通过JobClient类将应用程序以及配置参数打包成jar文件存储在HDFS,并把路径提交到JobTracker,然后由JobTracker创建每一个Task(即MapTask和ReduceTask)并将它们分发到各个TaskTracker服务中去执行。 运行于Hadoop的MapReduce应用程序最基本的组成部分包括一个Mapper和一个Reducer类,以及一个创建JobConf的执行程序,在一些应用中还可以包括一个Combiner类,它实际也是Reducer的实现。 JobClient提交job后,JobTracker会创建一个JobInProgress来跟踪和调度这个job,并把它添加到job队列里。JobInProgress会根据提交的job jar中定义的输入数据集(已分解成FileSplit)创建对应的一批TaskInProgress用于监控和调度MapTask,同时在创建指定数目的TaskInProgress用于监控和调度ReduceTask,缺省为1个ReduceTask。 JobTracker启动任务时通过每一个TaskInProgress来launchTask,这时会把Task对象(即MapTask和ReduceTask)序列化写入相应的TaskTracker服务中,TaskTracker收到后会创建对应的TaskInProgress(此TaskInProgress实现非JobTracker中使用的TaskInProgress,作用类似)用于监控和调度该Task。启动具体的Task进程是通过TaskInProgress管理的TaskRunner对象来运行的。TaskRunner会自动装载job jar,并设置好环境变量后启动一个独立的java child进程来执行Task,即MapTask或者ReduceTask,但它们不一定运行在同一个TaskTracker中。 一个完整的job会自动依次执行Mapper、Combiner(在JobConf指定了Combiner时执行)和Reducer,其中Mapper和Combiner是由MapTask调用执行,Reducer则由ReduceTask调用,Combiner实际也是Reducer接口类的实现。Mapper会根据job jar中定义的输入数据集按<key1,value1>对读入,处理完成生成临时的<key2,value2>对,如果定义了Combiner,MapTask会在Mapper完成调用该Combiner将相同key的值做合并处理,以减少输出结果集。MapTask的任务全完成即交给ReduceTask进程调用Reducer处理,生成最终结果<key3,value3>对。这个过程在下一部分再详细介绍。 下图描述了Map/Reduce框架中主要组成和它们之间的关系: 一 道MapRedcue作业是通过JobClient.rubJob(job)向master节点的JobTracker提交的, JobTracker接到JobClient的请求后把其加入作业队列中。JobTracker一直在等待JobClient通过RPC提交作业,而 TaskTracker一直通过RPC向 JobTracker发送心跳heartbeat询问有没有任务可做,如果有,让其派发任务给它执行。如果JobTracker的作业队列不为空, 则TaskTracker发送的心跳将会获得JobTracker给它派发的任务。这是一道pull过程。slave节点的TaskTracker接到任 务后在其本地发起Task,执行任务。以下是简略示意图: 下面详细介绍一下Map/Reduce处理一个工作的流程。 在编写MapReduce程序时通常是上是这样写的: Configuration conf = new Configuration(); // 读取hadoop配置 Job job = new Job(conf, "作业名称"); // 实例化一道作业 job.setMapperClass(Mapper类型); job.setCombinerClass(Combiner类型); job.setReducerClass(Reducer类型); job.setOutputKeyClass(输出Key的类型); job.setOutputValueClass(输出Value的类型); FileInputFormat.addInputPath(job, new Path(输入hdfs路径)); FileOutputFormat.setOutputPath(job, new Path(输出hdfs路径)); // 其它初始化配置 JobClient.runJob(job); JobConf是用户描述一个job的接口。下面的信息是MapReduce过程中一些较关键的定制信息: 一个MapReduce的Job会通过JobClient类根据用户在JobConf类中定义的InputFormat实现类来将输入的数据集分解成一批小的数据集,每一个小数据集会对应创建一个MapTask来处理。JobClient会使用缺省的FileInputFormat类调用FileInputFormat.getSplits()方法生成小数据集,如果判断数据文件是isSplitable()的话,会将大的文件分解成小的FileSplit,当然只是记录文件在HDFS里的路径及偏移量和Split大小。这些信息会统一打包到jobFile的jar中。 JobClient 然后使用submitJob(job)方法向 master提交作业。submitJob(job)内部是通过submitJobInternal(job)方法完成实质性的作业提交。 submitJobInternal(job)方法首先会向hadoop分布系统文件系统hdfs依次上传三个文件: job.jar, job.split和job.xml。 jobFile的提交过程是通过RPC模块(有单独一章来详细介绍)来实现的。大致过程是,JobClient类中通过RPC实现的Proxy接口调用JobTracker的submitJob()方法,而JobTracker必须实现JobSubmissionProtocol接口。 JobTracker创建job成功后会给JobClient传回一个JobStatus对象用于记录job的状态信息,如执行时间、Map和Reduce任务完成的比例等。JobClient会根据这个JobStatus对象创建一个NetworkedJob的RunningJob对象,用于定时从JobTracker获得执行过程的统计数据来监控并打印到用户的控制台。 与创建Job过程相关的类和方法如下图所示 上面已经提到,job是统一由JobTracker来调度的,具体的Task分发给各个TaskTracker节点来执行。下面来详细解析执行过程,首先先从JobTracker收到JobClient的提交请求开始。 当JobTracker接收到新的job请求(即submitJob()函数被调用)后,会创建一个JobInProgress对象并通过它来管理和调度任务。JobInProgress在创建的时候会初始化一系列与任务有关的参数,调用到FileSystem,把在JobClient端上传的所有任务文件下载到本地的文件系统中的临时目录里。这其中包括上传的*.jar文件包、记录配置信息的xml、记录分割信息的文件。 JobTracker 中的监听器类EagerTaskInitializationListener负责任务Task的初始化。JobTracker使用jobAdded(job)加入job到EagerTaskInitializationListener中一个专门管理需要初始化的队列里,即一个list成员变量jobInitQueue里。resortInitQueue方法根据作业的优先级排序。然后调用notifyAll()函数,会唤起一个用于初始化job的线程JobInitThread来处理。JobInitThread收到信号后即取出最靠前的job,即优先级别最高的job,调用TaskTrackerManager的initJob最终调用JobInProgress.initTasks()执行真正的初始化工作。 任务Task分两种: MapTask 和reduceTask,它们的管理对象都是TaskInProgress 。 首先JobInProgress会创建Map的监控对象。在initTasks()函数里通过调用JobClient的readSplitFile()获得已分解的输入数据的RawSplit列表,然后根据这个列表创建对应数目的Map执行管理对象TaskInProgress。在这个过程中,还会记录该RawSplit块对应的所有在HDFS里的blocks所在的DataNode节点的host,这个会在RawSplit创建时通过FileSplit的getLocations()函数获取,该函数会调用DistributedFileSystem的getFileCacheHints()获得(这个细节会在HDFS中讲解)。当然如果是存储在本地文件系统中,即使用LocalFileSystem时当然只有一个location即“localhost”了。 创建这些TaskInProgress对象完毕后,initTasks()方法会通 过createCache()方法为这些TaskInProgress对象产生一个未执行任务的Map缓存nonRunningMapCache。slave端的 TaskTracker向master发送心跳时,就可以直接从这个cache中取任务去执行。 其次JobInProgress会创建Reduce的监控对象,这个比较简单,根据JobConf里指定的Reduce数目创建,缺省只创建1个Reduce任务。监控和调度Reduce任务的是TaskInProgress类,不过构造方法有所不同,TaskInProgress会根据不同参数分别创建具体的MapTask或者ReduceTask。同样地,initTasks()也会通过createCache()方法产生nonRunningReduceCache成员。 JobInProgress创建完TaskInProgress后,最后构造JobStatus并记录job正在执行中,然后再调用JobHistory.JobInfo.logStarted()记录job的执行日志。到这里JobTracker里初始化job的过程全部结束。 hadoop默认的调度器是FIFO策略的JobQueueTaskScheduler,它有两个成员变量 jobQueueJobInProgressListener与上面说的eagerTaskInitializationListener。JobQueueJobInProgressListener是JobTracker的另一个监听器类,它包含了一个映射,用来管理和调度所有的JobInProgress。jobAdded(job)同时会加入job到JobQueueJobInProgressListener中的映射。 JobQueueTaskScheduler最 重要的方法是assignTasks ,他实现了工作调度。具体实现:JobTracker 接到TaskTracker 的heartbeat() 调用后,首先会检查上一个心跳响应是否完成,是没要求启动或重启任务,如果一切正常,则会处理心跳。首先它会检查 TaskTracker 端还可以做多少个 map 和 reduce 任务,将要派发的任务数是否超出这个数,是否超出集群的任务平均剩余可负载数。如果都没超出,则为此 TaskTracker 分配一个 MapTask 或 ReduceTask 。产生 Map 任务使用 JobInProgress 的 obtainNewMapTask() 方法,实质上最后调用了 JobInProgress 的 findNewMapTask() 访问 nonRunningMapCache 。 上面讲解任务初始化时说过,createCache()方法会在网络拓扑结构上挂上需要执行的 TaskInProgress。findNewMapTask()从近到远一层一层地寻找,首先是同一节点,然后在寻找同一机柜上的节点,接着寻找相同数 据中心下的节点,直到找了maxLevel层结束。这样的话,在JobTracker给TaskTracker派发任务的时候,可以迅速找到最近的 TaskTracker,让它执行任务。 最终生成一个Task类对象,该对象被封装在一个LanuchTaskAction 中,发回给TaskTracker,让它去执行任务。 产生 Reduce 任务过程类似,使用 JobInProgress.obtainNewReduceTask() 方法,实质上最后调用了 JobInProgress 的 findNewReduceTask() 访问 nonRuningReduceCache。 Task的执行实际是由TaskTracker发起的,TaskTracker会定期(缺省为10秒钟,参见MRConstants类中定义的HEARTBEAT_INTERVAL变量)与JobTracker进行一次通信,报告自己Task的执行状态,接收JobTracker的指令等。如果发现有自己需要执行的新任务也会在这时启动,即是在TaskTracker调用JobTracker的heartbeat()方法时进行,此调用底层是通过IPC层调用Proxy接口实现。下面一一简单介绍下每个步骤。 TaskTracker的启动过程会初始化一系列参数和服务,然后尝试连接JobTracker(即必须实现InterTrackerProtocol接口),如果连接断开,则会循环尝试连接JobTracker,并重新初始化所有成员和参数。 如果连接JobTracker服务成功,TaskTracker就会调用offerService()函数进入主执行循环中。这个循环会每隔10秒与JobTracker通讯一次,调用transmitHeartBeat(),获得HeartbeatResponse信息。然后调用HeartbeatResponse的getActions()函数获得JobTracker传过来的所有指令即一个TaskTrackerAction数组。再遍历这个数组,如果是一个新任务指令即LaunchTaskAction则调用调用addToTaskQueue加入到待执行队列,否则加入到tasksToCleanup队列,交给一个taskCleanupThread线程来处理,如执行KillJobAction或者KillTaskAction等。 在transmitHeartBeat()函数处理中,TaskTracker会创建一个新的TaskTrackerStatus对象记录目前任务的执行状况,检查目前执行的Task数目以及本地磁盘的空间使用情况等,如果可以接收新的Task则设置heartbeat()的askForNewTask参数为true。然后通过IPC接口调用JobTracker的heartbeat()方法发送过去,heartbeat()返回值TaskTrackerAction数组。 TaskLauncher 是用来处理新任务的线程类,包含了一个待运行任务的队列 tasksToLaunch。TaskTracker.addToTaskQueue会调用TaskTracker的registerTask,创建 TaskInProgress对象来调度和监控任务,并把它加入到runningTasks队列中。同时将这个TaskInProgress加到 tasksToLaunch 中,并notifyAll()唤醒一个线程运行,该线程从队列tasksToLaunch取出一个待运行任务,调用TaskTracker的 startNewTask运行任务。 调用localizeJob()真正初始化Task并开始执行。 此 函数主要任务是初始化工作目录workDir,再将job jar包从HDFS复制到本地文件系统中,调用RunJar.unJar()将包解压到工作目录。然后创建一个RunningJob并调用 addTaskToJob()函数将它添加到runningJobs监控队列中。addTaskToJob方法把一个任务加入到该任务属于的 runningJob的tasks列表中。如果该任务属于的runningJob不存在,先新建,加到runningJobs中。完成后即调用 launchTaskForJob()开始执行Task。 启动Task的工作实际是调用TaskTracker$TaskInProgress的launchTask()函数来执行的。 执行任务前先调用localizeTask()更新一下jobConf文件并写入到本地目录中。然后通过调用Task的createRunner()方法创建TaskRunner对象并调用其start()方法最后启动Task独立的java执行子进程。 Task有两个实现版本,即MapTask和ReduceTask,它们分别用于创建Map和Reduce任务。MapTask会创建MapTaskRunner来启动Task子进程,而ReduceTask则创建ReduceTaskRunner来启动。 TaskRunner 负责将一个任务放到一个进程里面来执行。它会调用run()函数来处理,主要的工作就是初始化启动java子进程的一系列环境变量,包括设定工作目录 workDir,设置CLASSPATH环境变量等。然后装载job jar包。JvmManager用于管理该TaskTracker上所有运行的Task子进程。每一个进程都是由JvmRunner来管理的,它也是位于 单独线程中的。JvmManager的launchJvm方法,根据任务是map还是reduce,生成对应的JvmRunner并放到对应 JvmManagerForType的进程容器中进行管理。JvmManagerForType的reapJvm() 分配一个新的JVM 进程。如果JvmManagerForType槽满,就寻找idle的进程,如果是同Job的直接放进去,否则杀死这个进程,用一个新的进程代替。如果槽 没有满,那么就启动新的子进程。生成新的进程使用spawnNewJvm方法。spawnNewJvm使用JvmRunner线程的run方法,run方 法用于生成一个新的进程并运行它,具体实现是调用runChild。 真实的执行载体,是Child,它包含一个 main函数,进程执行,会将相关参数传进来,它会拆解这些参数,通过getTask(jvmId)向父进程索取任务,并且构造出相关的Task实例,然后使用Task的run()启动任务。 2.1 run 方 法相当简单,配置完系统的TaskReporter后,就根据情况执行 runJobCleanupTask,runJobSetupTask,runTaskCleanupTask或执行Mapper。由于 MapReduce现在有两套API,MapTask需要支持这两套API,使得MapTask执行Mapper分为runNewMapper和 runOldMapper,我们分析runOldMapper。 2.2 runOldMapper runOldMapper 最开始部分是构造Mapper处理的InputSplit,然后就开始创建Mapper的RecordReader,最终得到map的输入。之后构造 Mapper的输出,是通过MapOutputCollector进行的,也分两种情况,如果没有Reducer,那么,用 DirectMapOutputCollector,否则,用MapOutputBuffer。 构造完Mapper的输入输出,通过构造 配置文件中配置的MapRunnable,就可以执行Mapper了。目前系统有两个MapRunnable:MapRunner和 MultithreadedMapRunner。MapRunner是单线程执行器,比较简单,他会使用反射机制生成用户定义的Mapper接口实现类, 作为他的一个成员。 2.3 MapRunner的run方法 会先创建对应的key,value对象,然后,对 InputSplit的每一对<key,value>,调用用户实现的Mapper接口实现类的map方法,每处理一个数据对,就要使用 OutputCollector收集每次处理kv对后得到的新的kv对,把他们spill到文件或者放到内存,以做进一步的处理,比如排 序,combine等。 2.4 OutputCollector OutputCollector的作用是收集每次调用map后得到的新的kv对,宁把他们spill到文件或者放到内存,以做进一步的处理,比如排序,combine等。 MapOutputCollector 有两个子类:MapOutputBuffer和DirectMapOutputCollector。 DirectMapOutputCollector用在不需要Reduce阶段的时候。如果Mapper后续有reduce任务,系统会使用 MapOutputBuffer做为输出, MapOutputBuffer使用了一个缓冲区对map的处理结果进行缓存,放在内存中,又使用几个数组对这个缓冲区进行管理。 在适当的时机,缓冲区中的数据会被spill到硬盘中。 向硬盘中写数据的时机: (1)当内存缓冲区不能容下一个太大的kv对时。spillSingleRecord方法。 (2)内存缓冲区已满时。SpillThread线程。 (3)Mapper的结果都已经collect了,需要对缓冲区做最后的清理。Flush方法。 2.5 spillThread线程:将缓冲区中的数据spill到硬盘中。 (1)需要spill时调用函数sortAndSpill,按照partition和key做排序。默认使用的是快速排序QuickSort。 (2)如果没有combiner,则直接输出记录,否则,调用CombinerRunner的combine,先做combin然后输出。 ReduceTask.run 方法开始和MapTask类似,包括initialize()初始化 ,runJobCleanupTask(),runJobSetupTask(),runTaskCleanupTask()。之后进入正式的工作,主要 有这么三个步骤:Copy、Sort、Reduce。 3.1 Copy 就是从执行各个Map任务的服务器那里,收罗到map的输出文件。拷贝的任务,是由ReduceTask.ReduceCopier 类来负责。 3.1.1 类图: 3.1.2 流程: 使用ReduceCopier.fetchOutputs开始 (1) 索取任务。使用GetMapEventsThread线程。该线程的run方法不停的调用getMapCompletionEvents方法,该方法又使 用RPC调用TaskUmbilicalProtocol协议的getMapCompletionEvents,方法使用所属的jobID向其父 TaskTracker询问此作业个Map任务的完成状况(TaskTracker要向JobTracker询问后再转告给它...)。返回一个数组 TaskCompletionEvent events[]。TaskCompletionEvent包含taskid和ip地址之类的信息。 (2)当获取到相关Map任务执行服务器的信息后,有一个线程MapOutputCopier开启,做具体的拷贝工作。 它会在一个单独的线程内,负责某个Map任务服务器上文件的拷贝工作。MapOutputCopier的run循环调用 copyOutput,copyOutput又调用getMapOutput,使用HTTP远程拷贝。 (3)getMapOutput远程拷贝过来的内容(当然也可以是本地了...),作为MapOutput对象存在,它可以在内存中也可以序列化在磁盘上,这个根据内存使用状况来自动调节。 (4) 同时,还有一个内存Merger线程InMemFSMergeThread和一个文件Merger线程LocalFSMerger在同步工作,它们将下载 过来的文件(可能在内存中,简单的统称为文件...),做着归并排序,以此,节约时间,降低输入文件的数量,为后续的排序工作减 负。InMemFSMergeThread的run循环调用doInMemMerge, 该方法使用工具类Merger实现归并,如果需要combine,则combinerRunner.combine。 3.2 Sort 排序工作,就相当于上述排序工作的一个延续。它会在所有的文件都拷贝完毕后进行。使用工具类Merger归并所有的文件。经过这一个流程,一个合并了所有所需Map任务输出文件的新文件产生了。而那些从其他各个服务器网罗过来的 Map任务输出文件,全部删除了。 3.3Reduce Reduce 任务的最后一个阶段。他会准备好 keyClass("mapred.output.key.class"或"mapred.mapoutput.key.class"), valueClass("mapred.mapoutput.value.class"或"mapred.output.value.class")和 Comparator(“mapred.output.value.groupfn.class”或 “mapred.output.key.comparator.class”)。最后调用runOldReducer方法。(也是两套API,我们分析 runOldReducer) 3.3.1 runOldReducer (1)输出方面。 它会准备一个 OutputCollector收集输出,与MapTask不同,这个OutputCollector更为简单,仅仅是打开一个 RecordWriter,collect一次,write一次。最大的不同在于,这次传入RecordWriter的文件系统,基本都是分布式文件系 统, 或者说是HDFS。 (2)输入方面,ReduceTask会用准备好的KeyClass、ValueClass、KeyComparator等等之类的自定义类,构造出Reducer所需的键类型, 和值的迭代类型Iterator(一个键到了这里一般是对应一组值)。 (3)有了输入,有了输出,不断循环调用自定义的Reducer,最终,Reduce阶段完成。参考:
3 Javen-Studio 咖啡小屋
一 MapReduce概述
二 MapReduce工作原理
1 map数据输入
2 map输出/combine输入
3 combine输出
4 reduce输出
三 MapReduce框架结构
1 角色
1.1 JobTracker
1.2 TaskTracker
1.3 JobClient
2 数据结构
2.1 Mapper和Reducer
2.2 JobInProgress
2.3 TaskInProgress
2.4 MapTask和ReduceTask
3 流程
四JobClient
1 配置Job
2 JobClient.runJob():运行Job并分解输入数据集
job.xml: 作业配置,例如Mapper, Combiner, Reducer的类型,输入输出格式的类型等。
job.jar: jar包,里面包含了执行此任务需要的各种类,比如 Mapper,Reducer等实现。
job.split: 文件分块的相关信息,比如有数据分多少个块,块的大小(默认64m)等。
这 三个文件在hdfs上的路径由hadoop-default.xml文件中的mapreduce系统路径mapred.system.dir属性 + jobid决定。mapred.system.dir属性默认是/tmp/hadoop-user_name/mapred/system。写完这三个文 件之后, 此方法会通过RPC调用master节点上的JobTracker.submitJob(job)方法,此时作业已经提交完成。3 提交Job
五 JobTracker
1 JobTracker初始化Job
1.1 JobTracker.submitJob() 收到请求
1.2 JobTracker.JobInitThread 通知初始化线程
1.3 JobInProgress.initTasks() 初始化TaskInProgress
2 JobTracker调度Job
六 TaskTracker
1 TaskTracker加载Task到子进程
1.1 TaskTracker.run() 连接JobTracker
1.2 TaskTracker.offerService() 主循环
1.3 TaskTracker.transmitHeartBeat() 获取JobTracker指令
1.4 TaskTracker.addToTaskQueue,交给TaskLauncher处理
1.5 TaskTracker.startNewTask() 启动新任务
1.6 TaskTracker.localizeJob() 初始化job目录等
1.7 TaskTracker.launchTaskForJob() 执行任务
1.8 TaskTracker$TaskInProgress.launchTask() 执行任务
1.9 Task.createRunner() 创建启动Runner对象
1.10 TaskRunner.start() 启动子进程
2 子进程执行MapTask
3 子进程执行ReduceTask
发表评论
-
thrift安装资料集合
2011-06-22 14:19 1142http://www.buywine168.com/index ... -
在Ubuntu下编译安装Thrift(支持php和c++)
2011-06-22 14:16 1643原文地址:http://www.coder4.com/arch ... -
HBase Thrift 0.5.0 + PHP 5 安裝設定
2011-06-22 14:13 1873原文地址: http://blog.kfchph.com ... -
Hadoop+hbase+thrift H.H.T环境部署
2011-06-21 12:58 1095原文地址:http://blog.sina.com.cn/s/ ... -
php操作hbase例子
2011-06-21 10:59 23241 $GLOBALS['THRIFT_ROOT'] = '/h ... -
HBase技术介绍
2011-06-21 10:54 1087原文地址:http://www.searc ... -
详细讲解Hadoop中的一个简单数据库HBase
2011-06-08 17:29 940原文地址:http://databas ... -
hive sql语法解读
2011-06-08 16:52 1350版权声明:转载时请以超链接形式标明文章原始出处和作者信息及 ... -
Hive 的启动方式
2011-06-08 16:49 2503Hive 的启动方式 hive 命令行 ... -
Hive环境搭建与入门
2011-06-08 16:47 1527原文地址:http://www.jiacheo.org ... -
Hbase入门6 -白话MySQL(RDBMS)与HBase之间
2011-06-08 15:38 1559作者: H.E. | 您可以转载, ... -
Apache Hive入门3–Hive与HBase的整合
2011-06-08 10:03 1565作者: H.E. | 您可以转载, ... -
Apache Hive入门2
2011-06-08 10:00 1338我的偏见: 对于互联 ... -
Apache Hive入门1
2011-06-08 09:59 2541作者: H.E. | 您可以转载, ... -
hbase分布安装部署
2011-06-07 22:37 18091、 hbase安装部署#cd /hadoop# ... -
使用HBase的一个典型例子,涉及了HBase中很多概念
2011-06-07 15:38 1582一个使用HBase的例子,如下。 import java ... -
HBase入门篇4–存储
2011-06-06 21:22 1591作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇3
2011-06-06 21:21 1852作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇2-Java操作HBase例子
2011-06-06 21:18 2399作者: H.E. | 您可以转载, 但必须以超链接形式标明文章 ... -
HBase入门篇
2011-06-06 21:17 2387作者: H.E. | 您可以转载, ...
相关推荐
MapReduce是一种分布式计算模型,由Google...在实际应用中,MapReduce广泛应用于日志分析、搜索引擎索引构建、数据挖掘等多个领域。理解并掌握MapReduce的工作原理和框架结构对于开发大规模数据处理系统至关重要。
总结,MapReduce的源码分析涵盖了数据分片、Map函数、Shuffle过程、Reduce函数、输入输出格式、任务调度等多个关键部分。理解这些核心组件的工作原理,有助于我们更高效地利用Hadoop MapReduce处理大数据,同时也...
以下是对MapReduce源码的一些关键知识点的详细阐述: 1. **MapReduce架构**:MapReduce将大型数据集分解为小块,通过“Map”阶段并行处理这些块,然后在“Reduce”阶段汇总结果。这种分而治之的策略使得处理海量...
在这个例子中,我们将深入理解Hadoop MapReduce的工作原理以及如何在Eclipse环境下实现WordCount源码。 1. **Hadoop MapReduce概述**: Hadoop MapReduce是由两个主要部分组成的:Map阶段和Reduce阶段。Map阶段将...
在大数据处理领域,K-Means算法是一种广泛应用的聚类分析方法,用于将数据集划分为不同的类别或簇。在单机环境下,K-Means的执行效率可能受限于数据量的大小。然而,借助Hadoop这样的分布式计算框架,我们可以实现K-...
总结,MapReduce通过Split切片将大文件分隔,然后在多个MapTask中并行处理这些小块数据。MapTask的read阶段逐行读取数据,map阶段执行用户自定义的映射逻辑,最后通过collect、spill和combine阶段准备数据供Reduce...
本文将基于“Hadoop学习总结和源码分析”这一主题,结合提供的文档资源,深入探讨Hadoop的核心组件HDFS(Hadoop Distributed File System)和MapReduce。 首先,我们从“Hadoop学习总结之一:HDFS简介.doc”开始,...
### Hadoop.MapReduce 分析 #### 一、概述 Hadoop.MapReduce 是一种分布式计算模型,主要用于处理大规模数据集。其基本思想源自Google提出的MapReduce论文。本文将深入解析Hadoop.MapReduce的工作原理、核心组件...
### Storm源码分析 #### 一、Storm简介与应用场景 Apache Storm是一款开源的分布式实时计算系统,它提供了简单而强大的API来定义数据流处理逻辑,同时保证了消息处理的容错性和高性能。Storm的设计目标是成为实时...
总结,MapReduce是Hadoop处理大数据的核心工具,通过简单的编程模型实现了大规模数据的高效并行处理。了解并掌握MapReduce的工作原理、优缺点以及编程规范,对于大数据开发人员来说至关重要。在实际项目中,可以通过...
本主题将深入探讨如何使用Hadoop MapReduce来实现MatrixMultiply,即矩阵相乘,这是一个基础且重要的数学运算,尤其在数据分析、机器学习以及高性能计算中有着广泛应用。 首先,理解矩阵相乘的基本原理至关重要。在...
源码分析对于理解MapReduce的工作机制至关重要,它能帮助我们更好地控制数据处理的每个细节。同时,工具的使用也很重要,比如Hadoop提供的命令行工具用于提交和监控Job,或者使用Hadoop的API在Java代码中直接操作。 ...
### Map-Reduce原理体系架构和工作机制 #### 一、Map-Reduce原理概述 Map-Reduce是一种编程模型,用于...在实际应用中,Map-Reduce已经被广泛应用于搜索引擎索引构建、社交网络数据分析、金融交易记录分析等领域。