The previous post covered how to write a MapReduce job and how to use the main add-ons such as the partitioner and combiner. This week I traced what happens when a job is submitted and its tasks are run, so this post records the source-code path step by step. There is far too much code to explain in full, so I only follow the calls one after another and briefly note what each step does; for the rest you will have to read the surrounding code yourself. A job is submitted with job.waitForCompletion(true), which internally calls submit():
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();   // the actual submission
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
      Job.getCompletionPollInterval(cluster.getConf());
    // poll in a loop until the job finishes
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
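For context, waitForCompletion is reached from an ordinary driver program. A minimal sketch of such a driver is shown below; the WordCount-style class names are placeholders for illustration, not code from this article:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);     // placeholder mapper class
    job.setReducerClass(WordCountReducer.class);   // placeholder reducer class
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // this call is the entry point traced in this post
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}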
submit() hands the job to submitter.submitJobInternal(Job.this, cluster). Because we are using the API under the mapreduce package, setUseNewAPI() sets a few configuration properties that are used later: the old API and the new API execute tasks through different methods:
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();   // use the new API (the mapreduce package)
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
    ClassNotFoundException {
      // submit the job
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  // after submit() the job state becomes RUNNING
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
submitJobInternal then submits via submitJob. Along the way it sets a large number of properties, copies the job's files to the JobTracker machine (the staging directory), and splits the input according to the configured InputFormat class into multiple splits:
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  // the staging directory for this job
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  // host name and address of the submitting (JobTracker-side) machine
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // generate the job ID
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // the submit dir is named after the job ID
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  // set a pile of properties
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // this is where the secure-access setup comes into play:
    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    // copy the job jar, files, archives, etc. into the submit dir
    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Compute the map-side splits. Tracing into writeSplits eventually reaches
    // List<InputSplit> splits = input.getSplits(job), which returns all splits;
    // with KeyValueTextInputFormat.class it is that class' split logic that runs.
    // FileInputFormat keeps splitting while
    //   ((double) bytesRemaining) / splitSize > SPLIT_SLOP   // SPLIT_SLOP defaults to 1.1
    // so even with all-default settings the last split may exceed the split size
    // (by less than 1.1x). The split info is also written under submitJobDir.
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // write job.xml with the full configuration
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    // =====================================================================
    // Submit the job. submitClient has two implementations: YARNRunner and
    // LocalJobRunner. LocalJobRunner is simpler, so we follow that path here
    // and revisit the YARN path after learning YARN.
    // =====================================================================
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
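To make the SPLIT_SLOP comment above concrete, here is a simplified sketch of the splitting loop inside FileInputFormat.getSplits; variable and helper names are abbreviated, so treat it as an illustration rather than the exact source:

// simplified: one file, locality/host handling omitted
long blockSize = file.getBlockSize();
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // usually the block size
long bytesRemaining = file.getLen();
final double SPLIT_SLOP = 1.1;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
  splits.add(makeSplit(path, file.getLen() - bytesRemaining, splitSize, hosts));
  bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
  // the tail split can be up to 1.1 * splitSize, which is why a split may end up
  // slightly larger than the block size
  splits.add(makeSplit(path, file.getLen() - bytesRemaining, bytesRemaining, hosts));
}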
The submitJob method in LocalJobRunner:
public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
    Credentials credentials) throws IOException {
  // build a Job from the files copied over earlier; its constructor starts
  // the thread that plays the JobTracker role locally
  Job job = new Job(JobID.downgrade(jobid), jobSubmitDir);
  job.job.setCredentials(credentials);
  return job.status;
}
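As an aside, whether the submit client is LocalJobRunner or YARNRunner is decided by the mapreduce.framework.name property; a hedged example of forcing the local path that this article follows:

Configuration conf = new Configuration();
// "local" selects LocalJobRunner (the path traced here); "yarn" selects YARNRunner
conf.set("mapreduce.framework.name", "local");
Job job = Job.getInstance(conf, "local test");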
Now let's see what the Job constructor (new Job) does:
public Job(JobID jobid, String jobSubmitDir) throws IOException {
  this.systemJobDir = new Path(jobSubmitDir);
  this.systemJobFile = new Path(systemJobDir, "job.xml");
  this.id = jobid;
  // build a JobConf from the job.xml generated earlier
  JobConf conf = new JobConf(systemJobFile);
  this.localFs = FileSystem.getLocal(conf);
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  this.localJobDir = localFs.makeQualified(new Path(
      new Path(conf.getLocalPath(jobDir), user), jobid.toString()));
  this.localJobFile = new Path(this.localJobDir, id + ".xml");

  // Manage the distributed cache. If there are files to be copied,
  // this will trigger localFile to be re-written again.
  localDistributedCacheManager = new LocalDistributedCacheManager();
  localDistributedCacheManager.setup(conf);

  // Write out configuration file. Instead of copying it from
  // systemJobFile, we re-write it, since setup(), above, may have
  // updated it.
  OutputStream out = localFs.create(localJobFile);
  try {
    conf.writeXml(out);
  } finally {
    out.close();
  }
  this.job = new JobConf(localJobFile);

  // Job (the current object) is a Thread, so we wrap its class loader.
  if (localDistributedCacheManager.hasLocalClasspaths()) {
    setContextClassLoader(localDistributedCacheManager.makeClassLoader(
        getContextClassLoader()));
  }

  // create the job profile
  profile = new JobProfile(job.getUser(), id, systemJobFile.toString(),
      "http://localhost:8080/", job.getJobName());
  // create the JobStatus
  status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING,
      profile.getUser(), profile.getJobName(), profile.getJobFile(),
      profile.getURL().toString());

  // register this job as started
  jobs.put(id, this);

  // =================================================
  // start this Job thread; its run() method does the real work
  this.start();
}
So the next thing to look at is what the run() method does:
public void run() {
  JobID jobId = profile.getJobID();
  JobContext jContext = new JobContextImpl(job, jobId);

  org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null;
  try {
    outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf);
  } catch (Exception e) {
    LOG.info("Failed to createOutputCommitter", e);
    return;
  }

  try {
    // read the split info written earlier under systemJobDir into
    // TaskSplitMetaInfo, in preparation for launching the map threads;
    // resources permitting, one map thread is started per split
    TaskSplitMetaInfo[] taskSplitMetaInfos =
      SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);

    // number of reduce tasks configured
    int numReduceTasks = job.getNumReduceTasks();
    outputCommitter.setupJob(jContext);
    status.setSetupProgress(1.0f);

    // output files for the map tasks
    Map<TaskAttemptID, MapOutputFile> mapOutputFiles =
        Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>());

    // build the runnables whose run() methods execute the actual maps;
    // one thread per runnable, i.e. as many runnables as map tasks
    List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables(
        taskSplitMetaInfos, jobId, mapOutputFiles);

    // initialize the counters
    initCounters(mapRunnables.size(), numReduceTasks);
    // mapService is just a thread pool that hands a thread to each map
    ExecutorService mapService = createMapExecutor();
    // execute the run() method of every map runnable
    runTasks(mapRunnables, mapService, "map");

    try {
      if (numReduceTasks > 0) {
        List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables(
            jobId, mapOutputFiles);
        // likewise, a thread pool for the reducers
        ExecutorService reduceService = createReduceExecutor();
        // execute the reduce runnables
        runTasks(reduceRunnables, reduceService, "reduce");
      }
    } finally {
      for (MapOutputFile output : mapOutputFiles.values()) {
        output.removeAll();
      }
    }
    // delete the temporary directory in output directory
    outputCommitter.commitJob(jContext);
    status.setCleanupProgress(1.0f);

    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.SUCCEEDED);
    }

    JobEndNotifier.localRunnerNotification(job, status);
  } catch (Throwable t) {
    try {
      outputCommitter.abortJob(jContext,
        org.apache.hadoop.mapreduce.JobStatus.State.FAILED);
    } catch (IOException ioe) {
      LOG.info("Error cleaning up job:" + id);
    }
    status.setCleanupProgress(1.0f);
    if (killed) {
      this.status.setRunState(JobStatus.KILLED);
    } else {
      this.status.setRunState(JobStatus.FAILED);
    }
    LOG.warn(id, t);

    JobEndNotifier.localRunnerNotification(job, status);
  } finally {
    try {
      fs.delete(systemJobFile.getParent(), true);  // delete submit dir
      localFs.delete(localJobFile, true);          // delete local copy
      // Cleanup distributed cache
      localDistributedCacheManager.close();
    } catch (IOException e) {
      LOG.warn("Error cleaning up "+id+": "+e);
    }
  }
}
The two most important pieces here are the runnables (getMapTaskRunnables / getReduceTaskRunnables) and runTasks. Let's see what runTasks actually does:
private void runTasks(List<RunnableWithThrowable> runnables,
    ExecutorService service, String taskType) throws Exception {
  // Start populating the executor with work units.
  // They may begin running immediately (in other threads).
  // Submitting to the pool means each runnable's run() starts as soon as a
  // thread is free, so the next thing to read is the map runnable's run().
  for (Runnable r : runnables) {
    service.submit(r);
  }
  ...
}
The run() method of MapTaskRunnable mainly just creates a MapTask and calls MapTask.run():
public void run() {
  try {
    TaskAttemptID mapId = new TaskAttemptID(new TaskID(
        jobId, TaskType.MAP, taskId), 0);
    LOG.info("Starting task: " + mapId);
    mapIds.add(mapId);
    // create the MapTask
    MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
      info.getSplitIndex(), 1);
    map.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
    setupChildMapredLocalDirs(map, localConf);

    MapOutputFile mapOutput = new MROutputFiles();
    mapOutput.setConf(localConf);
    mapOutputFiles.put(mapId, mapOutput);

    map.setJobFile(localJobFile.toString());
    localConf.setUser(map.getUser());
    map.localizeConfiguration(localConf);
    map.setConf(localConf);
    try {
      map_tasks.getAndIncrement();
      myMetrics.launchMap(mapId);
      // call MapTask.run()
      map.run(localConf, Job.this);
      myMetrics.completeMap(mapId);
    } finally {
      map_tasks.getAndDecrement();
    }

    LOG.info("Finishing task: " + mapId);
  } catch (Throwable e) {
    this.storedException = e;
  }
}
So next we look at MapTask's run() method:
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;
  ...
  // setUseNewAPI() ran at submit time, so we take the runNewMapper branch
  if (useNewApi) {
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
Inside runNewMapper, mapper.run() is called, which effectively loops over the input and executes the map() method we wrote in our MR code. At the end, all the (k, v) output is turned into (k, iterable-of-v) groups:
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  // make a task context so we can get the classes
  org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
    new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
                                                                getTaskID(),
                                                                reporter);
  // instantiate the Mapper we configured with job.setMapperClass
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
    (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
      ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  // instantiate the InputFormat configured with job.setInputFormatClass
  org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
    (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
      ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  // rebuild the InputSplit from the split info obtained earlier
  org.apache.hadoop.mapreduce.InputSplit split = null;
  split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
      splitIndex.getStartOffset());
  LOG.info("Processing split: " + split);

  // the RecordReader used to read the input records
  org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
    new NewTrackingRecordReader<INKEY,INVALUE>
      (split, inputFormat, reporter, taskContext);

  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
  org.apache.hadoop.mapreduce.RecordWriter output = null;

  // the output class used when context.write is called;
  // with reducers present this is NewOutputCollector
  if (job.getNumReduceTasks() == 0) {
    output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
  } else {
    output = new NewOutputCollector(taskContext, job, umbilical, reporter);
  }

  org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
  mapContext =
    new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),
        input, output,
        committer,
        reporter, split);

  // create the Context handed to our Mapper
  org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
      mapperContext =
        new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
            mapContext);

  try {
    // initialize the reader
    input.initialize(split, mapperContext);
    // run the Mapper: as long as there is input, keep calling the map()
    // method written in our MR code
    mapper.run(mapperContext);
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    // *******************************************************
    // close the output; this is also where the map-side sort/merge happens,
    // turning the (k, v) pairs into (k, iterable-of-v)
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}
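For reference, the new-API Mapper.run() invoked here is only a few lines; simplified, it is essentially the following loop (based on the Mapper base class):

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    // keep pulling records from the RecordReader and call our map() for each one
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}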
When we call context.write(), it actually goes through mapperContext.write, which indirectly calls mapContext.write, which ends up in the write method of TaskInputOutputContextImpl, which in turn triggers the write method of NewOutputCollector. So let's see what happens when write is called:
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}

public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
As you can see, the partitioner class is already applied inside write() to decide which partition each record goes to.
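Note from the constructor above that a custom partitioner is only consulted when there is more than one reduce task. A minimal custom Partitioner (a hypothetical example, assuming Text/IntWritable map output) could look like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // route keys by their first character so related keys land on the same reducer
    return (key.charAt(0) & Integer.MAX_VALUE) % numPartitions;
  }
}

It is plugged in with job.setPartitionerClass(FirstCharPartitioner.class), which is exactly what jobContext.getPartitionerClass() returns in the constructor above.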
The collector is created by createSortingCollector(job, reporter); stepping into it you find Class<?>[] collectorClasses = job.getClasses(JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class); so the default is MapOutputBuffer.class. Let's look at MapOutputBuffer.collect.
What collect does is write the data into an in-memory buffer and spill it to disk when the buffer fills up. Records are stored by partition index plus the key/value offsets that point at the actual bytes; most of the code is bookkeeping for computing storage positions and managing the buffer, so skim it as you like. The overall flow is that startSpill() triggers a spill while data keeps being computed and placed into the buffer, and the SpillThread started in this class's init() does the actual spilling. Let's see what that thread does:
public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      while (!spillInProgress) {
        spillReady.await();
      }
      try {
        spillLock.unlock();
        // sort the buffered records, run the combiner if configured,
        // and spill them to disk
        sortAndSpill();
      } catch (Throwable t) {
        sortSpillException = t;
      } finally {
        spillLock.lock();
        if (bufend < bufstart) {
          bufvoid = kvbuffer.length;
        }
        kvstart = kvend;
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}
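The buffer that collect() writes into and the threshold at which startSpill() wakes this thread are both configurable. A hedged example using the standard Hadoop 2.x property names:

Configuration conf = new Configuration();
// size in MB of the circular in-memory buffer (kvbuffer); default 100
conf.setInt("mapreduce.task.io.sort.mb", 256);
// fraction of the buffer that triggers a background spill; default 0.80
conf.setFloat("mapreduce.map.sort.spill.percent", 0.80f);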
The work is mostly done by sortAndSpill(), which sorts the buffered records and runs the combiner configured with job.setCombinerClass:
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    final int mstart = kvend / NMETA;
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        // if a combiner (job.setCombinerClass) is configured it runs first;
        // otherwise the sorted records are spilled directly
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            // build a (key, iterator-of-values) view as the combiner's input
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            // run the combiner
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
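For completeness, the combinerRunner above wraps whatever Reducer class was registered with job.setCombinerClass. A typical sum-style combiner (a generic example, not code from this article) looks like this:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class IntSumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();   // summing is associative and commutative, so it is combiner-safe
    }
    result.set(sum);
    context.write(key, result);
  }
}
// in the driver: job.setCombinerClass(IntSumCombiner.class);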
When the map is done, control returns to runNewMapper, which calls output.close(); as mentioned above, close() triggers a flush:
@Override
public void close(TaskAttemptContext context
                  ) throws IOException, InterruptedException {
  try {
    collector.flush();
  } catch (ClassNotFoundException cnf) {
    throw new IOException("can't find class ", cnf);
  }
  collector.close();
}
The collector's flush() merges the results and sorts them, using the comparator configured with job.setSortComparatorClass, to produce the final map-side output:
public void flush() throws IOException, ClassNotFoundException,
       InterruptedException {
  LOG.info("Starting flush of map output");
  spillLock.lock();
  try {
    while (spillInProgress) {
      reporter.progress();
      spillDone.await();
    }
    checkSpillException();

    final int kvbend = 4 * kvend;
    if ((kvbend + METASIZE) % kvbuffer.length !=
        equator - (equator % METASIZE)) {
      // spill finished
      resetSpill();
    }
    if (kvindex != kvend) {
      kvend = (kvindex + NMETA) % kvmeta.capacity();
      bufend = bufmark;
      LOG.info("Spilling map output");
      LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
               "; bufvoid = " + bufvoid);
      LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
               "); kvend = " + kvend + "(" + (kvend * 4) +
               "); length = " + (distanceTo(kvend, kvstart,
                     kvmeta.capacity()) + 1) + "/" + maxRec);
      // if there is still buffered data that has not been spilled,
      // spill it one last time
      sortAndSpill();
    }
  } catch (InterruptedException e) {
    throw new IOException("Interrupted while waiting for the writer", e);
  } finally {
    spillLock.unlock();
  }
  assert !spillLock.isHeldByCurrentThread();
  // shut down spill thread and wait for it to exit. Since the preceding
  // ensures that it is finished with its work (and sortAndSpill did not
  // throw), we elect to use an interrupt instead of setting a flag.
  // Spilling simultaneously from this thread while the spill thread
  // finishes its work might be both a useful way to extend this and also
  // sufficient motivation for the latter approach.
  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw new IOException("Spill failed", e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  // merge all the spill output into one sorted map output
  mergeParts();
  Path outputPath = mapOutputFile.getOutputFile();
  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
The most important method here is mergeParts(): it merges everything into a single file, sorted with the comparator class we configured. Let's see what it does:
private void mergeParts() throws IOException, InterruptedException,
                                 ClassNotFoundException {
  ...
  {
    ...
      int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
      // sort the segments only if there are intermediate merges
      boolean sortSegments = segmentList.size() > mergeFactor;
      //merge
      @SuppressWarnings("unchecked")
      // merge all spill segments into one sorted (key, values) stream
      RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                     keyClass, valClass, codec,
                     segmentList, mergeFactor,
                     new Path(mapId.toString()),
                     job.getOutputKeyComparator(), reporter, sortSegments,
                     null, spilledRecordsCounter, sortPhase.phase(),
                     TaskType.MAP);

      //write merged output to disk
      long segmentStart = finalOut.getPos();
      FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
      Writer<K, V> writer =
          new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                           spilledRecordsCounter);
      if (combinerRunner == null || numSpills < minSpillsForCombine) {
        // without a combiner the merged result can be written out directly
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        // otherwise run the combiner once more, since records for the same key
        // may be spread across several spill files
        combineCollector.setWriter(writer);
        combinerRunner.combine(kvIter, combineCollector);
      }

      //close
      writer.close();
      sortPhase.startNextPhase();

      // record offsets
      rec.startOffset = segmentStart;
      rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
      rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
      spillRec.putIndex(rec, parts);
    }
    spillRec.writeToFile(finalIndexFile, job);
    finalOut.close();
    for (int i = 0; i < numSpills; i++) {
      rfs.delete(filename[i], true);
    }
  }
}
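The comparator passed to Merger.merge above comes from job.getOutputKeyComparator(), i.e. whatever was registered with job.setSortComparatorClass. A minimal custom comparator (an illustrative sketch for Text keys, not code from this article) might be:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// sort keys in descending order instead of the default ascending order
public class DescendingTextComparator extends WritableComparator {
  public DescendingTextComparator() {
    super(Text.class, true);   // true = instantiate keys for comparison
  }
  @Override
  @SuppressWarnings({"rawtypes", "unchecked"})
  public int compare(WritableComparable a, WritableComparable b) {
    return -a.compareTo(b);
  }
}
// in the driver: job.setSortComparatorClass(DescendingTextComparator.class);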
That basically covers the map side. I'll look at the reducer side when I have time; its entry points are the same runTasks call plus the reduce runnables.