`

Hadoop Map/Reduce执行全流程关键代码

 
阅读更多
Hadoop Map/Reduce 执行流程关键代码

JobClient.runJob(conf) | 运行job
|-->JobClient jc = new JobClient(job);
|-->RunningJob rj = jc.submitJob(job);
	|-->submitJobInternal(job);
		|-->int reduces = job.getNumReduceTasks();
		|-->JobContext context = new JobContext(job, jobId);
		|-->maps = writeOldSplits(job, submitSplitFile);
		|-->job.setNumMapTasks(maps);
		|-->job.writeXml(out);
		|-->JobStatus status = jobSubmitClient.submitJob(jobId);

JobTracker.submitJob(JobId) |提交job
|-->JobInProgress job = new JobInProgress(jobId, this, this.conf);
|-->checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);  |检查权限
|-->checkMemoryRequirements(job);  |检查内存需求
|-->addJob(jobId, job);  |添加至job队列
	|-->jobs.put(job.getProfile().getJobID(), job);
	|--> for (JobInProgressListener listener : jobInProgressListeners) |添加至监听器,供调度使用
		|-->listener.jobAdded(job);

JobTracker.heartbeat()  |JobTracker启动后供TaskTracker以RPC方式来调用,返回Response集合
|-->List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
|-->tasks = taskScheduler.assignTasks(taskTrackerStatus);  |通过调度器选择合适的tasks
|-->for (Task task : tasks)
	|-->expireLaunchingTasks.addNewTask(task.getTaskID());
	|-->actions.add(new LaunchTaskAction(task));  |实际actions还会添加commmitTask等
|-->response.setHeartbeatInterval(nextInterval);
|-->response.setActions(actions.toArray(new TaskTrackerAction[actions.size()]));
|-->return response;


TaskTracker.offerService |TaskTracker启动后通过offerservice()不断发心跳至JobTracker中
|-->transmitHeartBeat()
	|-->HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, justInited,askForNewTask, heartbeatResponseId);
|-->TaskTrackerAction[] actions = heartbeatResponse.getActions();
|-->for(TaskTrackerAction action: actions)
	|-->if (action instanceof LaunchTaskAction)
		|-->addToTaskQueue((LaunchTaskAction)action);  |添加至执行Queue,根据map/reduce task分别添加
			|-->if (action.getTask().isMapTask()) {
				|-->mapLauncher.addToTaskQueue(action);
					|-->TaskInProgress tip = registerTask(action, this);
					|-->tasksToLaunch.add(tip);
					|-->tasksToLaunch.notifyAll();  |唤醒阻塞进程
			|-->else 
				|-->reduceLauncher.addToTaskQueue(action);

TaskLauncher.run()
|--> while (tasksToLaunch.isEmpty()) 
             |-->tasksToLaunch.wait();
|-->tip = tasksToLaunch.remove(0);
|-->startNewTask(tip);
	|-->localizeJob(tip);
		|-->launchTaskForJob(tip, new JobConf(rjob.jobConf)); 
			|-->tip.setJobConf(jobConf);
			|-->tip.launchTask();  |TaskInProgress.launchTask()
				|-->this.runner = task.createRunner(TaskTracker.this, this); |区分map/reduce
				|-->this.runner.start();
MapTaskRunner.run()  |执行MapTask
|-->File workDir = new File(lDirAlloc.getLocalPathToRead()  |准备执行路径
|-->String jar = conf.getJar();  |准备jar包
|-->File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java");  |获取jvm
|-->vargs.add(Child.class.getName());  |添加参数,Child类作为main主函数启动
|-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);  |添加至内存管理
|-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,  |统一纳入jvm管理器当中并启动
				workDir, env, pidFile, conf));
		|-->mapJvmManager.reapJvm(t, env);  |区分map/reduce操作

JvmManager.reapJvm()  |
|--> while (jvmIter.hasNext())
	|-->JvmRunner jvmRunner = jvmIter.next().getValue();
	|-->JobID jId = jvmRunner.jvmId.getJobId();
	|-->setRunningTaskForJvm(jvmRunner.jvmId, t);
|-->spawnNewJvm(jobId, env, t);
	|-->JvmRunner jvmRunner = new JvmRunner(env,jobId);
        |-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
	|-->jvmRunner.start();   |执行JvmRunner的run()方法
		|-->jvmRunner.run()
			|-->runChild(env);
				|-->List<String> wrappedCommand =  TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
						 env.logSize, env.pidFile);  |选取main函数
				|-->shexec.execute();  |执行
				|-->int exitCode = shexec.getExitCode(); |获取执行状态值
				|--> updateOnJvmExit(jvmId, exitCode, killed); |更新Jvm状态

Child.main() 执行Task(map/reduce)
|-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
|-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
		TaskUmbilicalProtocol.versionID, address, defaultConf);
|--> while (true) 
	|-->JvmTask myTask = umbilical.getTask(jvmId);
	|-->task = myTask.getTask();
	|-->taskid = task.getTaskID();
	|-->TaskRunner.setupWorkDir(job);
	|-->task.run(job, umbilical);   |以maptask为例
		|-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
		|-->if (useNewApi)
			|-->runNewMapper(job, split, umbilical, reporter);
		|-->else
			|-->runOldMapper(job, split, umbilical, reporter);
				|-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
				|-->MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =  ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
				|-->runner.run(in, new OldOutputCollector(collector, conf), reporter);

MapRunner.run()
|--> K1 key = input.createKey();
|-->V1 value = input.createValue();
|-->while (input.next(key, value)) 
	|-->mapper.map(key, value, output, reporter);
	|--> if(incrProcCount) 
		|-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
                |-->SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
|-->mapper.close();

    

 

分享到:
评论

相关推荐

    远程调用执行Hadoop Map/Reduce

    例如,`org.apache.hadoop.mapred.MapTask`和`org.apache.hadoop.mapreduce.ReduceTask`分别对应Map和Reduce任务的实现,开发者可以通过阅读这些源码了解任务执行的详细流程。 7. **工具集成**:有许多开源工具可以...

    hadoop中map/reduce

    JobTracker负责调度和监控所有的Map和Reduce任务,确保任务的正确执行和资源的有效分配。然而,在Hadoop 2.x版本中,JobTracker被YARN(Yet Another Resource Negotiator)取代,YARN成为资源管理和任务调度的中心,...

    Windows平台下Hadoop的Map/Reduce开发

    通过以上步骤,初学者可以逐步熟悉Windows环境下Hadoop的安装、配置和开发流程,从而顺利入门Map/Reduce编程。随着经验的增长,可以进一步探索更复杂的数据处理任务和优化策略,如Combiner的使用、Shuffle和Sort过程...

    hadoop map-reduce turorial

    **作业配置**:作业配置是控制Map-Reduce作业行为的关键,包括但不限于数据输入输出路径、Map和Reduce函数的类名、缓存文件、日志级别等,通过这些配置,用户可以灵活地调整作业的执行细节。 **任务执行与环境**:...

    基于Map/Reduce的分布式搜索引擎研究

    本文将基于Map/Reduce算法,探讨如何利用开源框架Hadoop来设计和实现一种高容错、高性能的分布式搜索引擎。 #### 2. Map/Reduce算法 ##### 2.1 Map/Reduce算法概述 Map/Reduce是一种用于处理大规模数据集的编程...

    Hadoop Map-Reduce教程

    ### Hadoop Map-Reduce 教程 #### 一、Hadoop Map-Reduce 概述 Hadoop Map-Reduce 是一种编程模型,用于处理大规模数据集(通常为TB级或以上)。这种模型支持分布式计算,可以在成百上千台计算机上运行。Map-...

    hadoop伪分布式环境搭建

    本文将介绍如何在Windows和Linux平台上搭建Hadoop伪分布式环境,包括下载安装Hadoop、配置Eclipse、搭建Map/Reduce环境、编写Java代码等步骤。 一、下载安装Hadoop 下载Hadoop插件jar包“hadoop-eclipse-plugin-...

    hadoop教程

    Hadoop 的 JobTracker 是 Map/Reduce 框架中的一个核心组件,负责调度和监控作业的执行,并重新执行已经失败的任务。 TaskTracker 是 Map/Reduce 框架中的一个组件,负责执行 JobTracker 指派的任务,并将执行结果...

    Hadoop Map-Reduce

    Hadoop Map-Reduce Map-Reduce 是 Hadoop 框架中的一种核心组件,用于处理大规模数据。Map-Reduce 依靠两大步骤来完成数据处理:Map 和 Reduce。 Map 阶段的主要任务是将输入数据拆分成小块,并将其转换成 key-...

    Hadoop教程.pdf

    本文档提供了Hadoop Map/Reduce教程的详细介绍,从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。下面是其中的重要知识点: 1. Hadoop是一个分布式的文件系统,可以将多台机器组装成一台超级...

    在solr文献检索中用map/reduce

    标题中的“在solr文献检索中用map/reduce”指的是使用Apache Solr,一个流行的开源搜索引擎,结合Hadoop的MapReduce框架来处理大规模的分布式搜索任务。MapReduce是一种编程模型,用于处理和生成大型数据集,它将...

    map/reduce template

    标题中的“map/reduce template”指的是MapReduce编程模型的一个模板或框架,它是Apache Hadoop项目的核心部分,用于处理和生成大数据集。MapReduce的工作原理分为两个主要阶段:Map阶段和Reduce阶段,它允许程序员...

    基于Eclipse的Hadoop应用开发环境配置

    在 Eclipse 中配置 Hadoop Installation Directory,打开 Window--&gt;Preferences,发现 Hadoop Map/Reduce 选项,在这个选项里需要配置 Hadoop Installation Directory。 5. 配置 Map/Reduce Locations 在 Window--...

    hadoop权威指南part2

    hadoop权威指南,hadoop map/reduce 分布式计算

    hadoop权威指南.part1

    hadoop权威指南,hadoop map/reduce 分布式计算

    Hadoop Map Reduce教程

    该框架将任务分解为一系列较小的任务(Map 和 Reduce),并在集群中的多台计算机上并行执行这些任务。 - **应用场景**:适用于大数据分析、搜索引擎索引构建、日志文件分析等场景。 #### 二、MapReduce 工作原理 1...

    hadoop map reduce 中文教程

    每个案例都详细列出了实践步骤,包括如何编写 Map 和 Reduce 函数、如何配置 Hadoop 环境、如何运行 MapReduce 任务等。 #### 六、总结 Hadoop MapReduce 是一种非常强大的分布式数据处理工具,它通过简单的编程...

    hadoop 1.2.1 api 最新chm 伪中文版

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。 一个Map/Reduce 作业(job) 通常会把输入的...

    基于Map_Reduce的分布式搜索引擎研究

    在对Map/Reduce算法进行分析的基础上,利用开源Hadoop软件设计出高容错高性能的分布式搜索引擎,以面对搜索引擎对海量数据的处理和存储问题

Global site tag (gtag.js) - Google Analytics