Hadoop guarantees that the input to every reducer is sorted by key. The process by which the system performs the sort and transfers the map outputs to the reducers as inputs is known as the shuffle. The process is illustrated by the figure below:

[Figure: the MapReduce shuffle, from map-side buffering and spills to the merged, sorted reducer input]
Each map task has a circular memory buffer that it writes its output to. The buffer is 100 MB by default, a size that can be tuned by changing the io.sort.mb property. When the contents of the buffer reach a certain threshold size (io.sort.spill.percent, which defaults to 0.80, or 80%), a background thread starts to spill the contents to disk. Map outputs continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map blocks until the spill is complete.
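Both thresholds can be tuned per job. A minimal sketch using the old mapred API, assuming the Hadoop 1.x property names used in this article (MapReduce 2 renamed them to mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent):

import org.apache.hadoop.mapred.JobConf;

public class ShuffleTuning {
  public static JobConf tunedConf() {
    // Hypothetical per-job tuning; property names are the Hadoop 1.x ones.
    JobConf conf = new JobConf();
    conf.setInt("io.sort.mb", 200);                // 200 MB buffer instead of the default 100 MB
    conf.setFloat("io.sort.spill.percent", 0.90f); // start spilling at 90% full instead of 80%
    return conf;
  }
}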
// In MapTask.java
public synchronized void flush() throws IOException, ClassNotFoundException,
                                        InterruptedException {
  LOG.info("Starting flush of map output");
  spillLock.lock();
  try {
    while (kvstart != kvend) {
      reporter.progress();
      spillDone.await();
    }
    if (sortSpillException != null) {
      throw (IOException)new IOException("Spill failed"
          ).initCause(sortSpillException);
    }
    if (kvend != kvindex) {
      kvend = kvindex;
      bufend = bufmark;
      sortAndSpill();
    }
  } catch (InterruptedException e) {
    throw (IOException)new IOException(
        "Buffer interrupted while waiting for the writer"
        ).initCause(e);
  } finally {
    spillLock.unlock();
  }
  assert !spillLock.isHeldByCurrentThread();
  // shut down spill thread and wait for it to exit. Since the preceding
  // ensures that it is finished with its work (and sortAndSpill did not
  // throw), we elect to use an interrupt instead of setting a flag.
  // Spilling simultaneously from this thread while the spill thread
  // finishes its work might be both a useful way to extend this and also
  // sufficient motivation for the latter approach.
  try {
    spillThread.interrupt();
    spillThread.join();
  } catch (InterruptedException e) {
    throw (IOException)new IOException("Spill failed"
        ).initCause(e);
  }
  // release sort buffer before the merge
  kvbuffer = null;
  mergeParts();
  Path outputPath = mapOutputFile.getOutputFile();
  fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  long size = (bufend >= bufstart
      ? bufend - bufstart
      : (bufvoid - bufend) + bufstart) +
      partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    final SpillRecord spillRec = new SpillRecord(partitions);
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    out = rfs.create(filename);

    final int endPosition = (kvend > kvstart)
        ? kvend
        : kvoffsets.length + kvend;
    sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
    int spindex = kvstart;
    IndexRecord rec = new IndexRecord();
    InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          while (spindex < endPosition &&
                 kvindices[kvoffsets[spindex % kvoffsets.length]
                           + PARTITION] == i) {
            final int kvoff = kvoffsets[spindex % kvoffsets.length];
            getVBytesForOffset(kvoff, value);
            key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
                      (kvindices[kvoff + VALSTART] -
                       kvindices[kvoff + KEYSTART]));
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < endPosition &&
                 kvindices[kvoffsets[spindex % kvoffsets.length]
                           + PARTITION] == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
                new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }

        // close the writer
        writer.close();

        // record offsets
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        spillRec.putIndex(rec, i);

        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }

    if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
          spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}

Spills are written in round-robin fashion to the directories specified by the mapred.local.dir property, in a job-specific directory.
// In LocalDirAllocator.java
/** Get a path from the local FS. If size is known, we go
 *  round-robin over the set of disks (via the configured dirs) and return
 *  the first complete path which has enough space.
 *
 *  If size is not known, use roulette selection -- pick directories
 *  with probability proportional to their available space.
 */
public synchronized Path getLocalPathForWrite(String pathStr, long size,
    Configuration conf, boolean checkWrite) throws IOException {
  confChanged(conf);
  int numDirs = localDirsPath.length;
  int numDirsSearched = 0;
  //remove the leading slash from the path (to make sure that the uri
  //resolution results in a valid path on the dir being checked)
  if (pathStr.startsWith("/")) {
    pathStr = pathStr.substring(1);
  }
  Path returnPath = null;
  Path path = new Path(pathStr);

  if (size == SIZE_UNKNOWN) {
    //do roulette selection: pick dir with probability
    //proportional to available size
    long[] availableOnDisk = new long[dirDF.length];
    long totalAvailable = 0;

    //build the "roulette wheel"
    for (int i = 0; i < dirDF.length; ++i) {
      availableOnDisk[i] = dirDF[i].getAvailable();
      totalAvailable += availableOnDisk[i];
    }

    // Keep rolling the wheel till we get a valid path
    Random r = new java.util.Random();
    while (numDirsSearched < numDirs && returnPath == null) {
      long randomPosition = Math.abs(r.nextLong()) % totalAvailable;
      int dir = 0;
      while (randomPosition > availableOnDisk[dir]) {
        randomPosition -= availableOnDisk[dir];
        dir++;
      }
      dirNumLastAccessed = dir;
      returnPath = createPath(path, checkWrite);
      if (returnPath == null) {
        totalAvailable -= availableOnDisk[dir];
        availableOnDisk[dir] = 0; // skip this disk
        numDirsSearched++;
      }
    }
  } else {
    while (numDirsSearched < numDirs && returnPath == null) {
      long capacity = dirDF[dirNumLastAccessed].getAvailable();
      if (capacity > size) {
        returnPath = createPath(path, checkWrite);
      }
      dirNumLastAccessed++;
      dirNumLastAccessed = dirNumLastAccessed % numDirs;
      numDirsSearched++;
    }
  }
  if (returnPath != null) {
    return returnPath;
  }

  //no path found
  throw new DiskErrorException("Could not find any valid local " +
      "directory for " + pathStr);
}

As the code above reveals, before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that the data will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort. Running the combiner function makes for a more compact map output, so there is less data to write to local disk and to transfer to the reducer.
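Which partition a record lands in is decided by the job's Partitioner before the sort. For illustration, the default behavior is essentially that of org.apache.hadoop.mapred.lib.HashPartitioner:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

// Essentially the default HashPartitioner: the partition number selects
// the reducer (and thus the spill segment) each record belongs to.
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    // mask off the sign bit so the modulus is non-negative
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}

The combiner, when one is configured, is then invoked per partition on the sorted records, which is the call quoted next: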
combinerRunner.combine(kvIter, combineCollector);

Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record, there could be several spill files.
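The combiner that combinerRunner wraps is an ordinary Reducer registered on the job. A minimal sketch under the old mapred API (SumReducer is a name introduced here for illustration):

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// A count-summing reducer that can double as a combiner (hypothetical
// example, not part of the MapTask code quoted in this article).
public class SumReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output,
                     Reporter reporter) throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();
    }
    output.collect(key, new IntWritable(sum));
  }
}

It would be registered with conf.setCombinerClass(SumReducer.class); reusing reduce logic as a combiner is only safe when the function is associative and commutative, as summation is.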
// spill directly
DataInputBuffer key = new DataInputBuffer();
while (spindex < endPosition &&
       kvindices[kvoffsets[spindex % kvoffsets.length] + PARTITION] == i) {
  final int kvoff = kvoffsets[spindex % kvoffsets.length];
  getVBytesForOffset(kvoff, value);
  key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
            (kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]));
  writer.append(key, value);
  ++spindex;
}

Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; as the merge code below shows, it defaults to 100 here.
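As a rough illustration of what this factor implies (a simplification; Hadoop's actual merge planner is more elaborate), the number of merge rounds grows only logarithmically with the number of spill files:

// Hypothetical back-of-the-envelope sketch, not Hadoop code: with s spill
// files and a merge factor f, each round shrinks the number of remaining
// streams by roughly a factor of f.
public class MergeRounds {
  static int rounds(int spills, int factor) {
    int rounds = 0;
    while (spills > 1) {
      spills = (spills + factor - 1) / factor; // merge up to 'factor' streams per round
      rounds++;
    }
    return rounds;
  }

  public static void main(String[] args) {
    // e.g. 250 spill files with the default factor of 100 need two rounds
    System.out.println(rounds(250, 100)); // prints 2
  }
}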
// In MapTask.java
private void mergeParts() throws IOException, InterruptedException,
                                 ClassNotFoundException {
  // get the approximate size of the final output/index files
  long finalOutFileSize = 0;
  long finalIndexFileSize = 0;
  final Path[] filename = new Path[numSpills];
  final TaskAttemptID mapId = getTaskID();

  for (int i = 0; i < numSpills; i++) {
    filename[i] = mapOutputFile.getSpillFile(i);
    finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
  }
  if (numSpills == 1) { //the spill is the final output
    rfs.rename(filename[0],
        new Path(filename[0].getParent(), "file.out"));
    if (indexCacheList.size() == 0) {
      rfs.rename(mapOutputFile.getSpillIndexFile(0),
          new Path(filename[0].getParent(), "file.out.index"));
    } else {
      indexCacheList.get(0).writeToFile(
          new Path(filename[0].getParent(), "file.out.index"), job);
    }
    return;
  }

  // read in paged indices
  for (int i = indexCacheList.size(); i < numSpills; ++i) {
    Path indexFileName = mapOutputFile.getSpillIndexFile(i);
    indexCacheList.add(new SpillRecord(indexFileName, job, null));
  }

  //make correction in the length to include the sequence file header
  //lengths for each partition
  finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
  finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  Path finalOutputFile =
      mapOutputFile.getOutputFileForWrite(finalOutFileSize);
  Path finalIndexFile =
      mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

  //The output stream for the final single output file
  FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

  if (numSpills == 0) {
    //create dummy files
    IndexRecord rec = new IndexRecord();
    SpillRecord sr = new SpillRecord(partitions);
    try {
      for (int i = 0; i < partitions; i++) {
        long segmentStart = finalOut.getPos();
        Writer<K, V> writer =
            new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
        writer.close();
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength();
        rec.partLength = writer.getCompressedLength();
        sr.putIndex(rec, i);
      }
      sr.writeToFile(finalIndexFile, job);
    } finally {
      finalOut.close();
    }
    return;
  }
  {
    IndexRecord rec = new IndexRecord();
    final SpillRecord spillRec = new SpillRecord(partitions);
    for (int parts = 0; parts < partitions; parts++) {
      //create the segments to be merged
      List<Segment<K,V>> segmentList =
          new ArrayList<Segment<K, V>>(numSpills);
      for (int i = 0; i < numSpills; i++) {
        IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

        Segment<K,V> s =
            new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
                             indexRecord.partLength, codec, true);
        segmentList.add(i, s);

        if (LOG.isDebugEnabled()) {
          LOG.debug("MapId=" + mapId + " Reducer=" + parts +
              "Spill =" + i + "(" + indexRecord.startOffset + "," +
              indexRecord.rawLength + ", " + indexRecord.partLength + ")");
        }
      }

      //merge
      @SuppressWarnings("unchecked")
      RawKeyValueIterator kvIter = Merger.merge(job, rfs,
                     keyClass, valClass, codec,
                     segmentList, job.getInt("io.sort.factor", 100),
                     new Path(mapId.toString()),
                     job.getOutputKeyComparator(), reporter,
                     null, spilledRecordsCounter);

      //write merged output to disk
      long segmentStart = finalOut.getPos();
      Writer<K, V> writer =
          new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
                           spilledRecordsCounter);
      if (combinerRunner == null || numSpills < minSpillsForCombine) {
        Merger.writeFile(kvIter, writer, reporter, job);
      } else {
        combineCollector.setWriter(writer);
        combinerRunner.combine(kvIter, combineCollector);
      }

      //close
      writer.close();

      // record offsets
      rec.startOffset = segmentStart;
      rec.rawLength = writer.getRawLength();
      rec.partLength = writer.getCompressedLength();
      spillRec.putIndex(rec, parts);
    }
    spillRec.writeToFile(finalIndexFile, job);
    finalOut.close();
    for (int i = 0; i < numSpills; i++) {
      rfs.delete(filename[i], true);
    }
  }
} // MapOutputBuffer

If there are at least three spill files (the threshold is set by the min.num.spills.for.combine property), the combiner is run again before the output file is written, provided a combiner is present. If there are only one or two spills, the potential reduction in map output size is not worth the overhead of invoking the combiner.
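A sketch of raising that threshold per job, using the property name quoted above (it is read into minSpillsForCombine in the merge code):

import org.apache.hadoop.mapred.JobConf;

public class CombineThreshold {
  // Hypothetical tuning: only run the combiner during the final merge
  // when at least this many spill files exist (the default is 3).
  public static void raiseCombineThreshold(JobConf conf) {
    conf.setInt("min.num.spills.for.combine", 5);
  }
}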
// Note: we would like to avoid the combiner if we've fewer
// than some threshold of records for a partition
if (spstart != spindex) {
  combineCollector.setWriter(writer);
  RawKeyValueIterator kvIter =
      new MRResultIterator(spstart, spindex);
  combinerRunner.combine(kvIter, combineCollector);
}

The output file's partitions are made available to the reducers over HTTP. The maximum number of worker threads used to serve the file partitions is controlled by the tasktracker.http.threads property; this setting is per tasktracker. The default of 40 may need to be increased for large clusters running large jobs.
// In TaskTracker.java
String httpBindAddress = infoSocAddr.getHostName();
int httpPort = infoSocAddr.getPort();
this.server = new HttpServer("task", httpBindAddress, httpPort,
    httpPort == 0, conf, aclsManager.getAdminsAcl());
workerThreads = conf.getInt("tasktracker.http.threads", 40);
server.setThreads(1, workerThreads);

In MapReduce 2 this property is not applicable, because the maximum number of threads is set automatically based on the number of processors on the machine (MapReduce 2 uses Netty, which by default allows up to twice as many threads as there are processors).