- 浏览: 596279 次
- 性别:
- 来自: 厦门
文章分类
- 全部博客 (669)
- oracle (36)
- java (98)
- spring (48)
- UML (2)
- hibernate (10)
- tomcat (7)
- 高性能 (11)
- mysql (25)
- sql (19)
- web (42)
- 数据库设计 (4)
- Nio (6)
- Netty (8)
- Excel (3)
- File (4)
- AOP (1)
- Jetty (1)
- Log4J (4)
- 链表 (1)
- Spring Junit4 (3)
- Autowired Resource (0)
- Jackson (1)
- Javascript (58)
- Spring Cache (2)
- Spring - CXF (2)
- Spring Inject (2)
- 汉字拼音 (3)
- 代理模式 (3)
- Spring事务 (4)
- ActiveMQ (6)
- XML (3)
- Cglib (2)
- Activiti (15)
- 附件问题 (1)
- javaMail (1)
- Thread (19)
- 算法 (6)
- 正则表达式 (3)
- 国际化 (2)
- Json (3)
- EJB (3)
- Struts2 (1)
- Maven (7)
- Mybatis (7)
- Redis (8)
- DWR (1)
- Lucene (2)
- Linux (73)
- 杂谈 (2)
- CSS (13)
- Linux服务篇 (3)
- Kettle (9)
- android (81)
- protocol (2)
- EasyUI (6)
- nginx (2)
- zookeeper (6)
- Hadoop (41)
- cache (7)
- shiro (3)
- HBase (12)
- Hive (8)
- Spark (15)
- Scala (16)
- YARN (3)
- Kafka (5)
- Sqoop (2)
- Pig (3)
- Vue (6)
- sprint boot (19)
- dubbo (2)
- mongodb (2)
最新评论
一、客户端
Map-Reduce的过程首先是由客户端提交一个任务开始的。
提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:
其中JobClient的submitJob函数实现如下:
二、JobTracker
JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:
* 调用静态函数startTracker(new JobConf())创建一个JobTracker对象
* 调用JobTracker.offerService()函数提供服务
在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。
在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:
* JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
* EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化
EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。
在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:
EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:
三、TaskTracker
TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:
其中transmitHeartBeat主要逻辑如下:
四、JobTracker
当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:
默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:
从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。
五、TaskTracker
在向JobTracker发送heartbeat后,返回的reponse中有分配好的任务LaunchTaskAction,将其加入队列,调用addToTaskQueue,如果是map task则放入mapLancher(类型为TaskLauncher),如果是reduce task则放入reduceLancher(类型为TaskLauncher):
TaskLauncher是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgress tip):
当所有的task运行所需要的资源都拷贝到本地后,则调用launchTaskForJob,其又调用TaskInProgress的launchTask函数:
TaskRunner是一个线程,其run函数如下:
六、Child
真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:
6.1、MapTask
如果task是MapTask,则其run函数如下:
MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理:
结果集全部收集到MapOutputBuffer中,其collect函数如下:
内存buffer的格式如下:
(见几位hadoop大侠的分析http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 以及http://caibinbupt.javaeye.com/)
kvoffsets是为了写入内存前排序使用的。
从上面可知,内存buffer写入硬盘spill文件的函数为sortAndSpill:
当map阶段结束的时候,MapOutputBuffer的flush函数会被调用,其也会调用sortAndSpill将buffer中的写入文件,然后再调用mergeParts来合并写入在硬盘上的多个spill:
6.2、ReduceTask
ReduceTask的run函数如下:
七、总结
Map-Reduce的过程总结如下图:
转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882268.html
Map-Reduce的过程首先是由客户端提交一个任务开始的。
提交任务主要是通过JobClient.runJob(JobConf)静态函数实现的:
public static RunningJob runJob(JobConf job) throws IOException { //首先生成一个JobClient对象 JobClient jc = new JobClient(job); …… //调用submitJob来提交一个任务 running = jc.submitJob(job); JobID jobId = running.getID(); …… while (true) { //while循环中不断得到此任务的状态,并打印到客户端console中 } return running; }
其中JobClient的submitJob函数实现如下:
public RunningJob submitJob(JobConf job) throws FileNotFoundException, InvalidJobConfException, IOException { //从JobTracker得到当前任务的id JobID jobId = jobSubmitClient.getNewJobId(); //准备将任务运行所需要的要素写入HDFS: //任务运行程序所在的jar封装成job.jar //任务所要处理的input split信息写入job.split //任务运行的配置项汇总写入job.xml Path submitJobDir = new Path(getSystemDir(), jobId.toString()); Path submitJarFile = new Path(submitJobDir, "job.jar"); Path submitSplitFile = new Path(submitJobDir, "job.split"); // 此处将-libjars命令行指定的jar上传至HDFS configureCommandLineOptions(job, submitJobDir, submitJarFile); Path submitJobFile = new Path(submitJobDir, "job.xml"); …… //通过input format的格式获得相应的input split,默认类型为FileSplit InputSplit[] splits = job.getInputFormat().getSplits(job, job.getNumMapTasks()); // 生成一个写入流,将input split的信息写入job.split文件 FSDataOutputStream out = FileSystem.create(fs, submitSplitFile, new FsPermission(JOB_FILE_PERMISSION)); try { //写入job.split文件的信息包括:split文件头,split文件版本号,split的个数,接着依次写入每一个input split的信息。 //对于每一个input split写入:split类型名(默认FileSplit),split的大小,split的内容(对于FileSplit,写入文件名,此split在文件中的起始位置),split的location信息(即在那个DataNode上)。 writeSplitsFile(splits, out); } finally { out.close(); } job.set("mapred.job.split.file", submitSplitFile.toString()); //根据split的个数设定map task的个数 job.setNumMapTasks(splits.length); // 写入job的配置信息入job.xml文件 out = FileSystem.create(fs, submitJobFile, new FsPermission(JOB_FILE_PERMISSION)); try { job.writeXml(out); } finally { out.close(); } //真正的调用JobTracker来提交任务 JobStatus status = jobSubmitClient.submitJob(jobId); …… }
二、JobTracker
JobTracker作为一个单独的JVM运行,其运行的main函数主要调用有下面两部分:
* 调用静态函数startTracker(new JobConf())创建一个JobTracker对象
* 调用JobTracker.offerService()函数提供服务
在JobTracker的构造函数中,会生成一个taskScheduler成员变量,来进行Job的调度,默认为JobQueueTaskScheduler,也即按照FIFO的方式调度任务。
在offerService函数中,则调用taskScheduler.start(),在这个函数中,为JobTracker(也即taskScheduler的taskTrackerManager)注册了两个Listener:
* JobQueueJobInProgressListener jobQueueJobInProgressListener用于监控job的运行状态
* EagerTaskInitializationListener eagerTaskInitializationListener用于对Job进行初始化
EagerTaskInitializationListener中有一个线程JobInitThread,不断得到jobInitQueue中的JobInProgress对象,调用JobInProgress对象的initTasks函数对任务进行初始化操作。
在上一节中,客户端调用了JobTracker.submitJob函数,此函数首先生成一个JobInProgress对象,然后调用addJob函数,其中有如下的逻辑:
synchronized (jobs) { synchronized (taskScheduler) { jobs.put(job.getProfile().getJobID(), job); //对JobTracker的每一个listener都调用jobAdded函数 for (JobInProgressListener listener : jobInProgressListeners) { listener.jobAdded(job); } } }
EagerTaskInitializationListener的jobAdded函数就是向jobInitQueue中添加一个JobInProgress对象,于是自然触发了此Job的初始化操作,由JobInProgress得initTasks函数完成:
public synchronized void initTasks() throws IOException { …… //从HDFS中读取job.split文件从而生成input splits String jobFile = profile.getJobFile(); Path sysDir = new Path(this.jobtracker.getSystemDir()); FileSystem fs = sysDir.getFileSystem(conf); DataInputStream splitFile = fs.open(new Path(conf.get("mapred.job.split.file"))); JobClient.RawSplit[] splits; try { splits = JobClient.readSplitFile(splitFile); } finally { splitFile.close(); } //map task的个数就是input split的个数 numMapTasks = splits.length; //为每个map tasks生成一个TaskInProgress来处理一个input split maps = new TaskInProgress[numMapTasks]; for(int i=0; i < numMapTasks; ++i) { inputLength += splits[i].getDataLength(); maps[i] = new TaskInProgress(jobId, jobFile, splits[i], jobtracker, conf, this, i); } //对于map task,将其放入nonRunningMapCache,是一个Map<Node, List<TaskInProgress>>,也即对于map task来讲,其将会被分配到其input split所在的Node上。nonRunningMapCache将在JobTracker向TaskTracker分配map task的时候使用。 if (numMapTasks > 0) { nonRunningMapCache = createCache(splits, maxLevel); } //创建reduce task this.reduces = new TaskInProgress[numReduceTasks]; for (int i = 0; i < numReduceTasks; i++) { reduces[i] = new TaskInProgress(jobId, jobFile, numMapTasks, i, jobtracker, conf, this); //reduce task放入nonRunningReduces,其将在JobTracker向TaskTracker分配reduce task的时候使用。 nonRunningReduces.add(reduces[i]); } //创建两个cleanup task,一个用来清理map,一个用来清理reduce. cleanup = new TaskInProgress[2]; cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], jobtracker, conf, this, numMapTasks); cleanup[0].setJobCleanupTask(); cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks, jobtracker, conf, this); cleanup[1].setJobCleanupTask(); //创建两个初始化 task,一个初始化map,一个初始化reduce. setup = new TaskInProgress[2]; setup[0] = new TaskInProgress(jobId, jobFile, splits[0], jobtracker, conf, this, numMapTasks + 1 ); setup[0].setJobSetupTask(); setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks, numReduceTasks + 1, jobtracker, conf, this); setup[1].setJobSetupTask(); tasksInited.set(true);//初始化完毕 …… }
三、TaskTracker
TaskTracker也是作为一个单独的JVM来运行的,在其main函数中,主要是调用了new TaskTracker(conf).run(),其中run函数主要调用了:
State offerService() throws Exception { long lastHeartbeat = 0; // TaskTracker进程是一直存在的 while (running && !shuttingDown) { …… long now = System.currentTimeMillis(); //每隔一段时间就向JobTracker发送heartbeat long waitTime = heartbeatInterval - (now - lastHeartbeat); if (waitTime > 0) { synchronized(finishedCount) { if (finishedCount[0] == 0) { finishedCount.wait(waitTime); } finishedCount[0] = 0; } } …… //发送Heartbeat到JobTracker,得到response HeartbeatResponse heartbeatResponse = transmitHeartBeat(now); …… //从Response中得到此TaskTracker需要做的事情 TaskTrackerAction[] actions = heartbeatResponse.getActions(); …… if (actions != null){ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { //如果是运行一个新的Task,则将Action添加到任务队列中 addToTaskQueue((LaunchTaskAction)action); } else if (action instanceof CommitTaskAction) { CommitTaskAction commitAction = (CommitTaskAction)action; if (!commitResponses.contains(commitAction.getTaskID())) { commitResponses.add(commitAction.getTaskID()); } } else { tasksToCleanup.put(action); } } } } return State.NORMAL; }
其中transmitHeartBeat主要逻辑如下:
private HeartbeatResponse transmitHeartBeat(long now) throws IOException { //每隔一段时间,在heartbeat中要返回给JobTracker一些统计信息 boolean sendCounters; if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) { sendCounters = true; previousUpdate = now; } else { sendCounters = false; } …… //报告给JobTracker,此TaskTracker的当前状态 if (status == null) { synchronized (this) { status = new TaskTrackerStatus(taskTrackerName, localHostname, httpPort, cloneAndResetRunningTaskStatuses( sendCounters), failures, maxCurrentMapTasks, maxCurrentReduceTasks); } } …… //当满足下面的条件的时候,此TaskTracker请求JobTracker为其分配一个新的Task来运行: //当前TaskTracker正在运行的map task的个数小于可以运行的map task的最大个数 //当前TaskTracker正在运行的reduce task的个数小于可以运行的reduce task的最大个数 boolean askForNewTask; long localMinSpaceStart; synchronized (this) { askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || status.countReduceTasks() < maxCurrentReduceTasks) && acceptNewTasks; localMinSpaceStart = minSpaceStart; } …… //向JobTracker发送heartbeat,这是一个RPC调用 HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, justStarted, askForNewTask, heartbeatResponseId); …… return heartbeatResponse; }
四、JobTracker
当JobTracker被RPC调用来发送heartbeat的时候,JobTracker的heartbeat(TaskTrackerStatus status,boolean initialContact, boolean acceptNewTasks, short responseId)函数被调用:
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, boolean initialContact, boolean acceptNewTasks, short responseId) throws IOException { …… String trackerName = status.getTrackerName(); …… short newResponseId = (short)(responseId + 1); …… HeartbeatResponse response = new HeartbeatResponse(newResponseId, null); List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>(); //如果TaskTracker向JobTracker请求一个task运行 if (acceptNewTasks) { TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName); if (taskTrackerStatus == null) { LOG.warn("Unknown task tracker polling; ignoring: " + trackerName); } else { //setup和cleanup的task优先级最高 List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus); if (tasks == null ) { //任务调度器分配任务 tasks = taskScheduler.assignTasks(taskTrackerStatus); } if (tasks != null) { for (Task task : tasks) { //将任务放入actions列表,返回给TaskTracker expireLaunchingTasks.addNewTask(task.getTaskID()); actions.add(new LaunchTaskAction(task)); } } } } …… int nextInterval = getNextHeartbeatInterval(); response.setHeartbeatInterval(nextInterval); response.setActions( actions.toArray(new TaskTrackerAction[actions.size()])); …… return response; }
默认的任务调度器为JobQueueTaskScheduler,其assignTasks如下:
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException { ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus(); int numTaskTrackers = clusterStatus.getTaskTrackers(); Collection<JobInProgress> jobQueue = jobQueueJobInProgressListener.getJobQueue(); int maxCurrentMapTasks = taskTracker.getMaxMapTasks(); int maxCurrentReduceTasks = taskTracker.getMaxReduceTasks(); int numMaps = taskTracker.countMapTasks(); int numReduces = taskTracker.countReduceTasks(); //计算剩余的map和reduce的工作量:remaining int remainingReduceLoad = 0; int remainingMapLoad = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() == JobStatus.RUNNING) { int totalMapTasks = job.desiredMaps(); int totalReduceTasks = job.desiredReduces(); remainingMapLoad += (totalMapTasks - job.finishedMaps()); remainingReduceLoad += (totalReduceTasks - job.finishedReduces()); } } } //计算平均每个TaskTracker应有的工作量,remaining/numTaskTrackers是剩余的工作量除以TaskTracker的个数。 int maxMapLoad = 0; int maxReduceLoad = 0; if (numTaskTrackers > 0) { maxMapLoad = Math.min(maxCurrentMapTasks, (int) Math.ceil((double) remainingMapLoad / numTaskTrackers)); maxReduceLoad = Math.min(maxCurrentReduceTasks, (int) Math.ceil((double) remainingReduceLoad / numTaskTrackers)); } …… //map优先于reduce,当TaskTracker上运行的map task数目小于平均的工作量,则向其分配map task if (numMaps < maxMapLoad) { int totalNeededMaps = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING) { continue; } Task t = job.obtainNewMapTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { return Collections.singletonList(t); } …… } } } //分配完map task,再分配reduce task if (numReduces < maxReduceLoad) { int totalNeededReduces = 0; synchronized (jobQueue) { for (JobInProgress job : jobQueue) { if (job.getStatus().getRunState() != JobStatus.RUNNING || job.numReduceTasks == 0) { continue; } Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, taskTrackerManager.getNumberOfUniqueHosts()); if (t != null) { return Collections.singletonList(t); } …… } } } return null; }
从上面的代码中我们可以知道,JobInProgress的obtainNewMapTask是用来分配map task的,其主要调用findNewMapTask,根据TaskTracker所在的Node从nonRunningMapCache中查找TaskInProgress。JobInProgress的obtainNewReduceTask是用来分配reduce task的,其主要调用findNewReduceTask,从nonRunningReduces查找TaskInProgress。
五、TaskTracker
在向JobTracker发送heartbeat后,返回的reponse中有分配好的任务LaunchTaskAction,将其加入队列,调用addToTaskQueue,如果是map task则放入mapLancher(类型为TaskLauncher),如果是reduce task则放入reduceLancher(类型为TaskLauncher):
private void addToTaskQueue(LaunchTaskAction action) { if (action.getTask().isMapTask()) { mapLauncher.addToTaskQueue(action); } else { reduceLauncher.addToTaskQueue(action); } }
TaskLauncher是一个线程,其run函数从上面放入的queue中取出一个TaskInProgress,然后调用startNewTask(TaskInProgress tip)来启动一个task,其又主要调用了localizeJob(TaskInProgress tip):
private void localizeJob(TaskInProgress tip) throws IOException { //首先要做的一件事情是有关Task的文件从HDFS拷贝的TaskTracker的本地文件系统中:job.split,job.xml以及job.jar Path localJarFile = null; Task t = tip.getTask(); JobID jobId = t.getJobID(); Path jobFile = new Path(t.getJobFile()); …… Path localJobFile = lDirAlloc.getLocalPathForWrite( getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "job.xml", jobFileSize, fConf); RunningJob rjob = addTaskToJob(jobId, tip); synchronized (rjob) { if (!rjob.localized) { FileSystem localFs = FileSystem.getLocal(fConf); Path jobDir = localJobFile.getParent(); …… //将job.split拷贝到本地 systemFS.copyToLocalFile(jobFile, localJobFile); JobConf localJobConf = new JobConf(localJobFile); Path workDir = lDirAlloc.getLocalPathForWrite( (getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "work"), fConf); if (!localFs.mkdirs(workDir)) { throw new IOException("Mkdirs failed to create " + workDir.toString()); } System.setProperty("job.local.dir", workDir.toString()); localJobConf.set("job.local.dir", workDir.toString()); // copy Jar file to the local FS and unjar it. String jarFile = localJobConf.getJar(); long jarFileSize = -1; if (jarFile != null) { Path jarFilePath = new Path(jarFile); localJarFile = new Path(lDirAlloc.getLocalPathForWrite( getLocalJobDir(jobId.toString()) + Path.SEPARATOR + "jars", 5 * jarFileSize, fConf), "job.jar"); if (!localFs.mkdirs(localJarFile.getParent())) { throw new IOException("Mkdirs failed to create jars directory "); } //将job.jar拷贝到本地 systemFS.copyToLocalFile(jarFilePath, localJarFile); localJobConf.setJar(localJarFile.toString()); //将job得configuration写成job.xml OutputStream out = localFs.create(localJobFile); try { localJobConf.writeXml(out); } finally { out.close(); } // 解压缩job.jar RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile.getParent().toString())); } rjob.localized = true; rjob.jobConf = localJobConf; } } //真正的启动此Task launchTaskForJob(tip, new JobConf(rjob.jobConf)); }
当所有的task运行所需要的资源都拷贝到本地后,则调用launchTaskForJob,其又调用TaskInProgress的launchTask函数:
public synchronized void launchTask() throws IOException { …… //创建task运行目录 localizeTask(task); if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) { this.taskStatus.setRunState(TaskStatus.State.RUNNING); } // 创建并启动TaskRunner,对于MapTask,创建的是MapTaskRunner,对于ReduceTask,创建的是ReduceTaskRunner this.runner = task.createRunner(TaskTracker.this, this); this.runner.start(); this.taskStatus.setStartTime(System.currentTimeMillis()); }
TaskRunner是一个线程,其run函数如下:
public final void run() { …… TaskAttemptID taskid = t.getTaskID(); LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); File jobCacheDir = null; if (conf.getJar() != null) { jobCacheDir = new File( new Path(conf.getJar()).getParent().toString()); } File workDir = new File(lDirAlloc.getLocalPathToRead( TaskTracker.getLocalTaskDir( t.getJobID().toString(), t.getTaskID().toString(), t.isTaskCleanupTask()) + Path.SEPARATOR + MRConstants.WORKDIR, conf). toString()); FileSystem fileSystem; Path localPath; …… //拼写classpath String baseDir; String sep = System.getProperty("path.separator"); StringBuffer classPath = new StringBuffer(); // start with same classpath as parent process classPath.append(System.getProperty("java.class.path")); classPath.append(sep); if (!workDir.mkdirs()) { if (!workDir.isDirectory()) { LOG.fatal("Mkdirs failed to create " + workDir.toString()); } } String jar = conf.getJar(); if (jar != null) { // if jar exists, it into workDir File[] libs = new File(jobCacheDir, "lib").listFiles(); if (libs != null) { for (int i = 0; i < libs.length; i++) { classPath.append(sep); // add libs from jar to classpath classPath.append(libs[i]); } } classPath.append(sep); classPath.append(new File(jobCacheDir, "classes")); classPath.append(sep); classPath.append(jobCacheDir); } …… classPath.append(sep); classPath.append(workDir); //拼写命令行java及其参数 Vector<String> vargs = new Vector<String>(8); File jvm = new File(new File(System.getProperty("java.home"), "bin"), "java"); vargs.add(jvm.toString()); String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m"); javaOpts = javaOpts.replace("@taskid@", taskid.toString()); String [] javaOptsSplit = javaOpts.split(" "); String libraryPath = System.getProperty("java.library.path"); if (libraryPath == null) { libraryPath = workDir.getAbsolutePath(); } else { libraryPath += sep + workDir; } boolean hasUserLDPath = false; for(int i=0; i<javaOptsSplit.length ;i++) { if(javaOptsSplit[i].startsWith("-Djava.library.path=")) { javaOptsSplit[i] += sep + libraryPath; hasUserLDPath = true; break; } } if(!hasUserLDPath) { vargs.add("-Djava.library.path=" + libraryPath); } for (int i = 0; i < javaOptsSplit.length; i++) { vargs.add(javaOptsSplit[i]); } //添加Child进程的临时文件夹 String tmp = conf.get("mapred.child.tmp", "./tmp"); Path tmpDir = new Path(tmp); if (!tmpDir.isAbsolute()) { tmpDir = new Path(workDir.toString(), tmp); } FileSystem localFs = FileSystem.getLocal(conf); if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) { throw new IOException("Mkdirs failed to create " + tmpDir.toString()); } vargs.add("-Djava.io.tmpdir=" + tmpDir.toString()); // Add classpath. vargs.add("-classpath"); vargs.add(classPath.toString()); //log文件夹 long logSize = TaskLog.getTaskLogLength(conf); vargs.add("-Dhadoop.log.dir=" + new File(System.getProperty("hadoop.log.dir") ).getAbsolutePath()); vargs.add("-Dhadoop.root.logger=INFO,TLA"); vargs.add("-Dhadoop.tasklog.taskid=" + taskid); vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize); // 运行map task和reduce task的子进程的main class是Child vargs.add(Child.class.getName()); // main of Child …… //运行子进程 jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, workDir, env, pidFile, conf)); }
六、Child
真正的map task和reduce task都是在Child进程中运行的,Child的main函数的主要逻辑如下:
while (true) { //从TaskTracker通过网络通信得到JvmTask对象 JvmTask myTask = umbilical.getTask(jvmId); …… idleLoopCount = 0; task = myTask.getTask(); taskid = task.getTaskID(); isCleanup = task.isTaskCleanupTask(); JobConf job = new JobConf(task.getJobFile()); TaskRunner.setupWorkDir(job); numTasksToExecute = job.getNumTasksToExecutePerJvm(); task.setConf(job); defaultConf.addResource(new Path(task.getJobFile())); …… //运行task task.run(job, umbilical); // run the task if (numTasksToExecute > 0 && ++numTasksExecuted == numTasksToExecute) { break; } }
6.1、MapTask
如果task是MapTask,则其run函数如下:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException { //用于同TaskTracker进行通信,汇报运行状况 final Reporter reporter = getReporter(umbilical); startCommunicationThread(umbilical); initialize(job, reporter); …… //map task的输出 int numReduceTasks = conf.getNumReduceTasks(); MapOutputCollector collector = null; if (numReduceTasks > 0) { collector = new MapOutputBuffer(umbilical, job, reporter); } else { collector = new DirectMapOutputCollector(umbilical, job, reporter); } //读取input split,按照其中的信息,生成RecordReader来读取数据 instantiatedSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job); DataInputBuffer splitBuffer = new DataInputBuffer(); splitBuffer.reset(split.getBytes(), 0, split.getLength()); instantiatedSplit.readFields(splitBuffer); if (instantiatedSplit instanceof FileSplit) { FileSplit fileSplit = (FileSplit) instantiatedSplit; job.set("map.input.file", fileSplit.getPath().toString()); job.setLong("map.input.start", fileSplit.getStart()); job.setLong("map.input.length", fileSplit.getLength()); } RecordReader rawIn = // open input job.getInputFormat().getRecordReader(instantiatedSplit, job, reporter); RecordReader in = isSkipping() ? new SkippingRecordReader(rawIn, getCounters(), umbilical) : new TrackedRecordReader(rawIn, getCounters()); job.setBoolean("mapred.skip.on", isSkipping()); //对于map task,生成一个MapRunnable,默认是MapRunner MapRunnable runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job); try { //MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理。 runner.run(in, collector, reporter); collector.flush(); } finally { in.close(); // close input collector.close(); } done(umbilical); }
MapRunner的run函数就是依次读取RecordReader中的数据,然后调用Mapper的map函数进行处理:
public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output, Reporter reporter) throws IOException { try { K1 key = input.createKey(); V1 value = input.createValue(); while (input.next(key, value)) { mapper.map(key, value, output, reporter); if(incrProcCount) { reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1); } } } finally { mapper.close(); } }
结果集全部收集到MapOutputBuffer中,其collect函数如下:
public synchronized void collect(K key, V value) throws IOException { reporter.progress(); …… //从此处看,此buffer是一个ring的数据结构 final int kvnext = (kvindex + 1) % kvoffsets.length; spillLock.lock(); try { boolean kvfull; do { //在ring中,如果下一个空闲位置接上起始位置的话,则表示满了 kvfull = kvnext == kvstart; //在ring中计算是否需要将buffer写入硬盘的阈值 final boolean kvsoftlimit = ((kvnext > kvend) ? kvnext - kvend > softRecordLimit : kvend - kvnext <= kvoffsets.length - softRecordLimit); //如果到达阈值,则开始将buffer写入硬盘,写成spill文件。 //startSpill主要是notify一个背后线程SpillThread的run()函数,开始调用sortAndSpill()开始排序,合并,写入硬盘 if (kvstart == kvend && kvsoftlimit) { startSpill(); } //如果buffer满了,则只能等待写入完毕 if (kvfull) { while (kvstart != kvend) { reporter.progress(); spillDone.await(); } } } while (kvfull); } finally { spillLock.unlock(); } try { //如果buffer不满,则将key, value写入buffer int keystart = bufindex; keySerializer.serialize(key); final int valstart = bufindex; valSerializer.serialize(value); int valend = bb.markRecord(); //调用设定的partitioner,根据key, value取得partition id final int partition = partitioner.getPartition(key, value, partitions); mapOutputRecordCounter.increment(1); mapOutputByteCounter.increment(valend >= keystart ? valend - keystart : (bufvoid - keystart) + valend); //将parition id以及key, value在buffer中的偏移量写入索引数组 int ind = kvindex * ACCTSIZE; kvoffsets[kvindex] = ind; kvindices[ind + PARTITION] = partition; kvindices[ind + KEYSTART] = keystart; kvindices[ind + VALSTART] = valstart; kvindex = kvnext; } catch (MapBufferTooSmallException e) { LOG.info("Record too large for in-memory buffer: " + e.getMessage()); spillSingleRecord(key, value); mapOutputRecordCounter.increment(1); return; } }
内存buffer的格式如下:
(见几位hadoop大侠的分析http://blog.csdn.net/HEYUTAO007/archive/2010/07/10/5725379.aspx 以及http://caibinbupt.javaeye.com/)
kvoffsets是为了写入内存前排序使用的。
从上面可知,内存buffer写入硬盘spill文件的函数为sortAndSpill:
private void sortAndSpill() throws IOException { …… FSDataOutputStream out = null; FSDataOutputStream indexOut = null; IFileOutputStream indexChecksumOut = null; //创建硬盘上的spill文件 Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(), numSpills, size); out = rfs.create(filename); …… final int endPosition = (kvend > kvstart) ? kvend : kvoffsets.length + kvend; //按照partition的顺序对buffer中的数据进行排序 sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter); int spindex = kvstart; InMemValBytes value = new InMemValBytes(); //依次一个一个parition的写入文件 for (int i = 0; i < partitions; ++i) { IFile.Writer<K, V> writer = null; long segmentStart = out.getPos(); writer = new Writer<K, V>(job, out, keyClass, valClass, codec); //如果combiner为空,则直接写入文件 if (null == combinerClass) { …… writer.append(key, value); ++spindex; } else { …… //如果combiner不为空,则先combine,调用combiner.reduce(…)函数后再写入文件 combineAndSpill(kvIter, combineInputCounter); } } …… }
当map阶段结束的时候,MapOutputBuffer的flush函数会被调用,其也会调用sortAndSpill将buffer中的写入文件,然后再调用mergeParts来合并写入在硬盘上的多个spill:
private void mergeParts() throws IOException { …… //对于每一个partition for (int parts = 0; parts < partitions; parts++){ //create the segments to be merged List<Segment<K, V>> segmentList = new ArrayList<Segment<K, V>>(numSpills); TaskAttemptID mapId = getTaskID(); //依次从各个spill文件中收集属于当前partition的段 for(int i = 0; i < numSpills; i++) { final IndexRecord indexRecord = getIndexInformation(mapId, i, parts); long segmentOffset = indexRecord.startOffset; long segmentLength = indexRecord.partLength; Segment<K, V> s = new Segment<K, V>(job, rfs, filename[i], segmentOffset, segmentLength, codec, true); segmentList.add(i, s); } //将属于同一个partition的段merge到一起 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, segmentList, job.getInt("io.sort.factor", 100), new Path(getTaskID().toString()), job.getOutputKeyComparator(), reporter); //写入合并后的段到文件 long segmentStart = finalOut.getPos(); Writer<K, V> writer = new Writer<K, V>(job, finalOut, keyClass, valClass, codec); if (null == combinerClass || numSpills < minSpillsForCombine) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combineAndSpill(kvIter, combineInputCounter); } …… } }
6.2、ReduceTask
ReduceTask的run函数如下:
public void run(JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException { job.setBoolean("mapred.skip.on", isSkipping()); //对于reduce,则包含三个步骤:拷贝,排序,Reduce if (isMapOrReduce()) { copyPhase = getProgress().addPhase("copy"); sortPhase = getProgress().addPhase("sort"); reducePhase = getProgress().addPhase("reduce"); } startCommunicationThread(umbilical); final Reporter reporter = getReporter(umbilical); initialize(job, reporter); //copy阶段,主要使用ReduceCopier的fetchOutputs函数获得map的输出。创建多个线程MapOutputCopier,其中copyOutput进行拷贝。 boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local")); if (!isLocal) { reduceCopier = new ReduceCopier(umbilical, job); if (!reduceCopier.fetchOutputs()) { …… } } copyPhase.complete(); //sort阶段,将得到的map输出合并,直到文件数小于io.sort.factor时停止,返回一个Iterator用于访问key-value setPhase(TaskStatus.Phase.SORT); statusUpdate(umbilical); final FileSystem rfs = FileSystem.getLocal(job).getRaw(); RawKeyValueIterator rIter = isLocal ? Merger.merge(job, rfs, job.getMapOutputKeyClass(), job.getMapOutputValueClass(), codec, getMapFiles(rfs, true), !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100), new Path(getTaskID().toString()), job.getOutputKeyComparator(), reporter) : reduceCopier.createKVIterator(job, rfs, reporter); mapOutputFilesOnDisk.clear(); sortPhase.complete(); //reduce阶段 setPhase(TaskStatus.Phase.REDUCE); …… Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job); Class keyClass = job.getMapOutputKeyClass(); Class valClass = job.getMapOutputValueClass(); ReduceValuesIterator values = isSkipping() ? new SkippingReduceValuesIterator(rIter, job.getOutputValueGroupingComparator(), keyClass, valClass, job, reporter, umbilical) : new ReduceValuesIterator(rIter, job.getOutputValueGroupingComparator(), keyClass, valClass, job, reporter); //逐个读出key-value list,然后调用Reducer的reduce函数 while (values.more()) { reduceInputKeyCounter.increment(1); reducer.reduce(values.getKey(), values, collector, reporter); values.nextKey(); values.informReduceProgress(); } reducer.close(); out.close(reporter); done(umbilical); }
七、总结
Map-Reduce的过程总结如下图:
转自:http://www.cnblogs.com/forfuture1978/archive/2010/11/19/1882268.html
发表评论
文章已被作者锁定,不允许评论。
-
Hadoop namenode的fsimage与editlog详解
2017-05-19 10:04 1177Namenode主要维护两个文件,一个是fsimage,一个是 ... -
Hadoop HBase建表时预分区(region)的方法学习
2017-05-15 11:18 1189如果知道Hbase数据表的key的分布情况,就可以在建表的时候 ... -
Hadoop HBase行健(rowkey)设计原则学习
2017-05-15 10:34 1124Hbase是三维有序存储的,通过rowkey(行键),colu ... -
Hadoop HBase中split原理学习
2017-05-12 13:38 2270在Hbase中split是一个很重 ... -
Hadoop HBase中Compaction原理学习
2017-05-12 10:34 994HBase Compaction策略 RegionServer ... -
Hadoop HBase性能优化学习
2017-05-12 09:15 684一、调整参数 入门级的调优可以从调整参数开始。投入小,回报快 ... -
Hadoop 分布式文件系统学习
2017-05-10 15:34 498一. 分布式文件系统 分布式文件系统,在整个分布式系统体系中处 ... -
Hadoop MapReduce处理wordcount代码分析
2017-04-28 14:25 591package org.apache.hadoop.exa ... -
Hadoop YARN完全分布式配置学习
2017-04-26 10:27 572版本及配置简介 Java: J ... -
Hadoop YARN各个组件和流程的学习
2017-04-24 19:04 647一、基本组成结构 * 集 ... -
Hadoop YARN(Yet Another Resource Negotiator)详细解析
2017-04-24 18:30 1153带有 MapReduce 的 Apache Had ... -
Hive 注意事项与扩展特性
2017-04-06 19:31 7451. 使用HIVE注意点 字符集 Hadoop和Hive都 ... -
Hive 元数据和QL基本操作学习整理
2017-04-06 14:36 1017Hive元数据库 Hive将元数据存储在RDBMS 中,一般常 ... -
Hive 文件压缩存储格式(STORED AS)
2017-04-06 09:35 2299Hive文件存储格式包括以下几类: 1.TEXTFILE ... -
Hive SQL自带函数总结
2017-04-05 19:25 1139字符串长度函数:length ... -
Hive 连接查询操作(不支持IN查询)
2017-04-05 19:16 717CREATE EXTERNAL TABLE IF NOT ... -
Hive优化学习(join ,group by,in)
2017-04-05 18:48 1814一、join优化 Join ... -
Hive 基础知识学习(语法)
2017-04-05 15:51 896一.Hive 简介 Hive是基于 Hadoop 分布式文件 ... -
Hive 架构与基本语法(OLAP)
2017-04-05 15:16 1242Hive 是什么 Hive是建立在Hadoop上的数据仓库基础 ... -
Hadoop MapReduce操作Hbase范例学习(TableMapReduceUtil)
2017-03-24 15:37 1208Hbase里的数据量一般都 ...
相关推荐
**源代码解析**:单词计数是Hadoop Map-Reduce中最经典的示例之一,用于演示如何读取文本文件,统计其中每个单词出现的次数。Map函数将文本文件中的每一行分解为单词,为每个单词创建键值对(<单词,1>),Reduce...
### Hadoop MapReduce任务提交与执行流程解析 #### 一、客户端提交任务 在Hadoop MapReduce框架中,客户端的任务提交是整个MapReduce作业启动的关键步骤。这一过程主要由`JobClient`类中的`runJob(JobConf job)`...
为了解决这一问题,Hadoop作为一种开源的Map-Reduce实现框架被广泛应用于存储与处理大规模数据集。然而,Map-Reduce编程模型本身较为底层,开发者需要编写大量的自定义程序来实现特定的功能,这不仅增加了开发的难度...
《Hadoop MapReduce实战指南——基于<hadoop-map-reduce-demo>项目解析》 在大数据处理领域,Hadoop MapReduce作为核心组件,承担着数据分布式计算的任务。本篇将通过一个名为“hadoop-map-reduce-demo”的示例项目...
它将大型任务分解为许多小的Map任务和Reduce任务,这些任务在集群中的节点上并行执行。 3. **网络通信**:Hadoop使用`org.apache.hadoop.net`包中的类来处理网络通信,如`SocketServer`和`NetUtils`,它们负责节点...
本篇文章将深入探讨“远程调用执行Hadoop Map/Reduce”的概念、原理及其实现过程,同时结合标签“源码”和“工具”,我们将涉及到如何通过编程接口与Hadoop集群进行交互。 Hadoop MapReduce是一种编程模型,用于大...
1. **创建MapReduce项目**:安装插件后,Eclipse会新增一个"New" -> "Other" -> "Hadoop Map/Reduce Project"选项,点击即可创建一个MapReduce项目,提供了模板化的Job类和Mapper/Reducer类。 2. **编辑HDFS文件**...
例如,你可以继承org.apache.hadoop.mapred.MapReduceBase类,实现map()和reduce()方法来创建自己的MapReduce任务。同时,通过Configuration对象,可以设置Hadoop运行时的各种参数,如输入输出路径、分区策略等。 ...
MapReduce通过“映射”(Map)和“化简”(Reduce)两个阶段进行大规模数据处理。源码中,`mapred`模块提供了MapReduce任务的生命周期管理,而`mapreduce`模块则实现了新的API和运行时框架。 5. HDFS架构: HDFS...
标题中的"hadoop-training-map-reduce-example-4"表明这是一个关于Hadoop MapReduce的教程实例,很可能是第四个阶段或示例。Hadoop是Apache软件基金会的一个开源项目,它提供了分布式文件系统(HDFS)和MapReduce...
MapReduce则是Hadoop的数据处理模型,通过"map"和"reduce"两个阶段,实现了对大规模数据的并行处理。源码中的`hadoop-mapreduce-project`则涵盖了MapReduce的实现细节。 在Hadoop 3.0.1中,YARN(Yet Another ...
源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...
在Hadoop环境中,我们可以利用MapReduce的并行计算能力,让每个Map任务负责一部分URL的抓取,然后Reduce阶段进行数据整合。 **Eclipse** 是一个流行的Java开发集成环境,支持各种插件,使得开发分布式爬虫变得更加...
源码中可以学习到作业提交、任务调度、数据分片、Shuffle和Reduce过程的实现。 6. **YARN详解** YARN是Hadoop 2.x引入的新资源管理框架,它将资源管理和任务调度分离,提高了集群资源利用率。源码中可以了解到...
本文将深入解析Hadoop MapReduce的工作流程,帮助你更好地理解和使用这个强大的工具。 MapReduce的过程通常分为三个主要阶段:提交阶段、执行阶段和完成阶段。 1. 提交阶段: 这个阶段由客户端开始,客户端通过`...
在MapReduce部分,书中可能详细讲解了MapReduce的编程模型,如何将任务拆分为Map阶段和Reduce阶段,以及shuffle和sort的过程。此外,可能还讨论了YARN(Yet Another Resource Negotiator)资源调度器,它是Hadoop 2....
同时,Snappy的快速解压能力使得在Map和Reduce阶段可以更快地处理数据。 3. **列式存储优化**:对于像HBase这样的列式存储系统,Snappy压缩能有效地压缩列数据,节省存储空间,同时在查询时仅解压需要的列,提高了I...