`
hongs_yang
  • 浏览: 61022 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

hadoop-mapreduce中reducetask运行分析

阅读更多

ReduceTask的运行

 

Reduce处理程序中需要执行三个类型的处理,

 

1.copy,从各mapcopy数据过来

 

2.sort,对数据进行排序操作。

 

3.reduce,执行业务逻辑的处理。

 

ReduceTask的运行也是通过run方法开始,

 

通过mapreduce.job.reduce.shuffle.consumer.plugin.class配置shuffleplugin,

 

默认是Shuffle实现类。实现ShuffleConsumerPlugin接口。

 

生成Shuffle实例,并执行plugininit函数进行初始化,

 

Class<? extendsShuffleConsumerPlugin> clazz =

 

job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);

 

 

 

shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);

 

LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);

 

 

 

ShuffleConsumerPlugin.Context shuffleContext =

 

newShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,

 

super.lDirAlloc, reporter, codec,

 

combinerClass, combineCollector,

 

spilledRecordsCounter, reduceCombineInputCounter,

 

shuffledMapsCounter,

 

reduceShuffleBytes, failedShuffleCounter,

 

mergedMapOutputsCounter,

 

taskStatus, copyPhase, sortPhase, this,

 

mapOutputFile, localMapFiles);

 

shuffleConsumerPlugin.init(shuffleContext);

 

执行shufflerun函数,得到RawKeyValueIterator的实例。

 

rIter = shuffleConsumerPlugin.run();

 

 

 

Shuffle.run函数定义:

 

.....................................

 

 

 

int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,

 

MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());

 

int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);

 

生成map的完成状态获取线程,并启动此线程,此线程中从am中获取此job中所有完成的mapevent

 

通过ShuffleSchedulerImpl实例把所有的map的完成的maphost,mapid,

 

等记录到mapLocations容器中。此线程每一秒执行一个获取操作。

 

// Start the map-completion events fetcher thread

 

final EventFetcher<K,V> eventFetcher =

 

new EventFetcher<K,V>(reduceId, umbilical, scheduler, this,

 

maxEventsToFetch);

 

eventFetcher.start();

 

下面看看EventFetcher.run函数的执行过程:以下代码中我只保留了代码的主体部分。

 

...................

 

EventFetcher.run:

 

public void run() {

 

int failures = 0;

 

........................

 

int numNewMaps = getMapCompletionEvents();

 

..................................

 

}

 

......................

 

}

 

EventFetcher.getMapCompletionEvents

 

..................................

 

MapTaskCompletionEventsUpdate update =

 

umbilical.getMapCompletionEvents(

 

(org.apache.hadoop.mapred.JobID)reduce.getJobID(),

 

fromEventIdx,

 

maxEventsToFetch,

 

(org.apache.hadoop.mapred.TaskAttemptID)reduce);

 

events = update.getMapTaskCompletionEvents();

 

.....................

 

for (TaskCompletionEvent event : events) {

 

scheduler.resolve(event);

 

if (TaskCompletionEvent.Status.SUCCEEDED == event.getTaskStatus()) {

 

++numNewMaps;

 

}

 

}

 

shecdulerShuffleShedulerImpl的实例。

 

ShuffleShedulerImpl.resolve

 

case SUCCEEDED:

 

URI u = getBaseURI(reduceId, event.getTaskTrackerHttp());

 

addKnownMapOutput(u.getHost() + ":" + u.getPort(),

 

u.toString(),

 

event.getTaskAttemptId());

 

maxMapRuntime = Math.max(maxMapRuntime, event.getTaskRunTime());

 

break;

 

.......

 

ShuffleShedulerImpl.addKnownMapOutput函数:

 

mapid与对应的host添加到mapLocations容器中,

 

MapHost host = mapLocations.get(hostName);

 

if (host == null) {

 

host = new MapHost(hostName, hostUrl);

 

mapLocations.put(hostName, host);

 

}

 

此时会把host的状设置为PENDING

 

host.addKnownMap(mapId);

 

同时把host添加到pendingHosts容器中。notify相关的Fetcher文件copy线程。

 

// Mark the host as pending

 

if (host.getState() == State.PENDING) {

 

pendingHosts.add(host);

 

notifyAll();

 

}

 

.....................

 

 

 

回到ReduceTask.run函数中,接着向下执行

 

// Start the map-output fetcher threads

 

boolean isLocal = localMapFiles != null;

 

通过mapreduce.reduce.shuffle.parallelcopies配置的值,默认为5,生成获取map数据的线程数。

 

生成Fetcher线程实例,并启动相关的线程。

 

通过mapreduce.reduce.shuffle.connect.timeout配置连接超时时间。默认180000

 

通过mapreduce.reduce.shuffle.read.timeout配置读取超时时间,默认为180000

 

finalint numFetchers = isLocal ? 1 :

 

jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);

 

Fetcher<K,V>[] fetchers = new Fetcher[numFetchers];

 

if (isLocal) {

 

fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,

 

merger, reporter, metrics, this, reduceTask.getShuffleSecret(),

 

localMapFiles);

 

fetchers[0].start();

 

} else {

 

for (int i=0; i < numFetchers; ++i) {

 

fetchers[i] = new Fetcher<K,V>(jobConf, reduceId, scheduler, merger,

 

reporter, metrics, this,

 

reduceTask.getShuffleSecret());

 

fetchers[i].start();

 

}

 

}

 

.........................

 

 

 

接下来进行Fetcher线程里面,看看Fetcher.run函数运行流程:

 

..........................

 

MapHost host = null;

 

try {

 

// If merge is on, block

 

merger.waitForResource();

 

ShuffleScheduler中取出一个MapHost实例,

 

// Get a host to shuffle from

 

host = scheduler.getHost();

 

metrics.threadBusy();

 

执行shuffle操作。

 

// Shuffle

 

copyFromHost(host);

 

} finally {

 

if (host != null) {

 

scheduler.freeHost(host);

 

metrics.threadFree();

 

}

 

}

 

接下来看看ShuffleScheduler中的getHost函数:

 

........

 

如果pendingHosts的值没有,先wait住,等待EventFetcher线程去获取数据来notifywait

 

while(pendingHosts.isEmpty()) {

 

wait();

 

}

 

 

 

MapHost host = null;

 

Iterator<MapHost> iter = pendingHosts.iterator();

 

pendingHostsrandom出一个MapHost,并返回给调用程序。

 

int numToPick = random.nextInt(pendingHosts.size());

 

for (int i=0; i <= numToPick; ++i) {

 

host = iter.next();

 

}

 

 

 

pendingHosts.remove(host);

 

........................

 

当得到一个MapHost后,执行copyFromHost来进行数据的copy操作。

 

此时,一个taskhosturl样子基本上是这个样子:

 

host:port/mapOutput?job=xxx&reduce=123(当前reducepartid)&map=

 

copyFromHost的代码部分:

 

.....

 

List<TaskAttemptID> maps = scheduler.getMapsForHost(host);

 

.....

 

Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

 

.....

 

此部分完成后,url样子中map=后面会有很多个mapid,多个用英文的”,”号分开的。

 

URL url = getMapOutputURL(host, maps);

 

此处根据url打开http connection,

 

如果mapreduce.shuffle.ssl.enabled配置为true时,会打开SSL连接。默认为false.

 

openConnection(url);

 

.....

 

设置连接超时时间,header,读取超时时间等值。并打开HttpConnection的连接。

 

// put url hash into http header

 

connection.addRequestProperty(

 

SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);

 

// set the read timeout

 

connection.setReadTimeout(readTimeout);

 

// put shuffle version into http header

 

connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,

 

ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);

 

connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,

 

ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);

 

connect(connection, connectionTimeout);

 

.....

 

执行文件的copy操作。此处是迭代执行,每一个读取一个map的文件。

 

并把remaining中的值去掉一个。直到remaining的值全部读取完成。

 

TaskAttemptID[] failedTasks = null;

 

while (!remaining.isEmpty() && failedTasks == null) {

 

copyMapOutput函数中,每次读取一个mapid,

 

根据MergeManagerImpl中的reserve函数,

 

1.检查map的输出是否超过了mapreduce.reduce.memory.totalbytes配置的大小。

 

此配置的默认值

 

是当前RuntimemaxMemory*mapreduce.reduce.shuffle.input.buffer.percent配置的值。

 

Buffer.percent的默认值为0.90;

 

如果mapoutput超过了此配置的大小时,生成一个OnDiskMapOutput实例。

 

2.如果没有超过此大小,生成一个InMemoryMapOutput实例。

 

failedTasks = copyMapOutput(host, input, remaining);

 

}

 

copyMapOutput函数中首先调用的MergeManagerImpl.reserve函数:

 

if (!canShuffleToMemory(requestedSize)) {

 

.....

 

returnnew OnDiskMapOutput<K,V>(mapId, reduceId, this, requestedSize,

 

jobConf, mapOutputFile, fetcher, true);

 

}

 

.....

 

if (usedMemory > memoryLimit) {

 

.....,当前使用的memory已经超过了配置的内存使用大小,此时返回null

 

host重新添加到shuffleSchedulerpendingHosts队列中。

 

returnnull;

 

}

 

return unconditionalReserve(mapId, requestedSize, true);

 

生成一个 InMemoryMapOutput,并把usedMemory加上此mapoutput的大小。

 

privatesynchronized InMemoryMapOutput<K, V> unconditionalReserve(

 

TaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {

 

usedMemory += requestedSize;

 

returnnew InMemoryMapOutput<K,V>(jobConf, mapId, this, (int)requestedSize,

 

codec, primaryMapOutput);

 

}

 

 

 

下面是当usedMemory使用超过了指定的大小后,的处理部分,重新把host添加到队列中。

 

如下所示:copyMapOutput函数

 

if (mapOutput == null) {

 

LOG.info("fetcher#" + id + " - MergeManager returned status WAIT ...");

 

//Not an error but wait to process data.

 

returnEMPTY_ATTEMPT_ID_ARRAY;

 

}

 

此时host中还有没处理完成的mapoutput,Fetcher.run中,重新添加到队列中把此host

 

if (host != null) {

 

scheduler.freeHost(host);

 

metrics.threadFree();

 

}

 

.........

 

接下来还是在copyMapOutput函数中,

 

通过mapoutput也就是merge.reserve函数返回的实例的shuffle函数。

 

如果mapoutputInMemoryMapOutput,在调用shuffle时,直接把map输出写入到内存。

 

如果是OnDiskMapOutput,在调用shuffle时,直接把map的输出写入到local临时文件中。

 

....

 

最后,执行ShuffleScheduler.copySucceeded完成文件的copy,调用mapout.commit函数。

 

scheduler.copySucceeded(mapId, host, compressedLength,

 

endTime - startTime, mapOutput);

 

并从remaining中移出处理过的mapid,

 

 

 

接下来看看MapOutput.commit函数:

 

a.InMemoryMapOutput.commit函数:

publicvoid commit() throws IOException {

 

merger.closeInMemoryFile(this);

 

}

 

调用MergeManagerImpl.closeInMemoryFile函数:

 

publicsynchronizedvoid closeInMemoryFile(InMemoryMapOutput<K,V> mapOutput) {

 

把此mapOutput实例添加到inMemoryMapOutputs列表中。

 

inMemoryMapOutputs.add(mapOutput);

 

LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()

 

+ ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size()

 

+ ", commitMemory -> " + commitMemory + ", usedMemory ->" + usedMemory);

 

commitMemory的大小增加当前传入的mapoutputsize大小。

 

commitMemory+= mapOutput.getSize();

 

检查是否达到merge的值,

 

此值是mapreduce.reduce.memory.totalbytes配置

 

*mapreduce.reduce.shuffle.merge.percent配置的值,

 

默认是当前Runtimememory*0.90*0.90

 

也就是说,只有有新的mapoutput加入,这个检查条件就肯定会达到

 

// Can hang if mergeThreshold is really low.

 

if (commitMemory >= mergeThreshold) {

 

.......

 

把正在进行mergemapoutput列表添加到一起发起merge操作。

 

inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);

 

inMemoryMergedMapOutputs.clear();

 

inMemoryMerger.startMerge(inMemoryMapOutputs);

 

commitMemory = 0L; // Reset commitMemory.

 

}

 

如果mapreduce.reduce.merge.memtomem.enabled配置为true,默认为false

 

同时inMemoryMapOutputs中的mapoutput个数

 

达到了mapreduce.reduce.merge.memtomem.threshold配置的值,

 

默认值是mapreduce.task.io.sort.factor配置的值,默认为100

 

发起memTomemmerger操作。

 

if (memToMemMerger != null) {

 

if (inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {

 

memToMemMerger.startMerge(inMemoryMapOutputs);

 

}

 

}

 

}

 

 

 

MergemanagerImpl.InMemoryMerger.merger函数操作:

 

在执行inMemoryMerger.startMerge(inMemoryMapOutputs);操作后,会notify此线程,

 

同时执行merger函数:

 

publicvoid merge(List<InMemoryMapOutput<K,V>> inputs) throws IOException {

 

if (inputs == null || inputs.size() == 0) {

 

return;

 

}

 

....................

 

TaskAttemptID mapId = inputs.get(0).getMapId();

 

TaskID mapTaskId = mapId.getTaskID();

 

 

 

List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K, V>>();

 

生成InMemoryReader实例,并把传入的容器清空,把生成好后的segment放到到inmemorysegments中。

 

long mergeOutputSize =

 

createInMemorySegments(inputs, inMemorySegments,0);

 

int noInMemorySegments = inMemorySegments.size();

 

生成一个输出的文件路径,

 

Path outputPath =

 

mapOutputFile.getInputFileForWrite(mapTaskId,

 

mergeOutputSize).suffix(

 

Task.MERGED_OUTPUT_PREFIX);

 

针对输出的临时文件生成一个Write实例。

 

Writer<K,V> writer =

 

new Writer<K,V>(jobConf, rfs, outputPath,

 

(Class<K>) jobConf.getMapOutputKeyClass(),

 

(Class<V>) jobConf.getMapOutputValueClass(),

 

codec, null);

 

 

 

RawKeyValueIterator rIter = null;

 

CompressAwarePath compressAwarePath;

 

try {

 

LOG.info("Initiating in-memory merge with " + noInMemorySegments +

 

" segments...");

 

此部分与map端的输出没什么区别,得到几个segment的文件的一个iterator,

 

此部分是一个优先堆,每一次next都会从所有的segment中读取出最小的一个keyvalue

 

rIter = Merger.merge(jobConf, rfs,

 

(Class<K>)jobConf.getMapOutputKeyClass(),

 

(Class<V>)jobConf.getMapOutputValueClass(),

 

inMemorySegments, inMemorySegments.size(),

 

new Path(reduceId.toString()),

 

(RawComparator<K>)jobConf.getOutputKeyComparator(),

 

reporter, spilledRecordsCounter, null, null);

 

如果没有combiner程序,直接写入到文件,否则,如果有combiner,先执行combiner处理。

 

if (null == combinerClass) {

 

Merger.writeFile(rIter, writer, reporter, jobConf);

 

} else {

 

combineCollector.setWriter(writer);

 

combineAndSpill(rIter, reduceCombineInputCounter);

 

}

 

writer.close();

 

此处与map端的输出不同的地方在这里,这里不写入spillindex文件,

 

而是生成一个 CompressAwarePath,把输出路径,大小写入到此实例中。

 

compressAwarePath = new CompressAwarePath(outputPath,

 

writer.getRawLength(), writer.getCompressedLength());

 

 

 

LOG.info(reduceId +

 

" Merge of the " + noInMemorySegments +

 

" files in-memory complete." +

 

" Local file is " + outputPath + " of size " +

 

localFS.getFileStatus(outputPath).getLen());

 

} catch (IOException e) {

 

//make sure that we delete the ondisk file that we created

 

//earlier when we invoked cloneFileAttributes

 

localFS.delete(outputPath, true);

 

throw e;

 

}

 

此处,把生成的文件添加到onDiskMapOutputs属性中,

 

并检查此容器中的文件是否达到了mapreduce.task.io.sort.factor配置的值,

 

如果是,发起diskmerger操作。

 

// Note the output of the merge

 

closeOnDiskFile(compressAwarePath);

 

}

 

 

 

}

 

上面最后一行的全部定义在下面这里。

 

publicsynchronizedvoid closeOnDiskFile(CompressAwarePath file) {

 

onDiskMapOutputs.add(file);

 

 

 

if (onDiskMapOutputs.size() >= (2 * ioSortFactor - 1)) {

 

onDiskMerger.startMerge(onDiskMapOutputs);

 

}

 

}

 

 

 

b.OnDiskMapOutput.commit函数:

 

tmp文件rename到指定的目录下,生成一个CompressAwarePath实例,调用上面提到的处理程序。

 

publicvoid commit() throws IOException {

 

fs.rename(tmpOutputPath, outputPath);

 

CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,

 

getSize(), this.compressedSize);

 

merger.closeOnDiskFile(compressAwarePath);

 

}

 

 

 

MergeManagerImpl.OnDiskMerger.merger函数:

 

这个函数到现在基本上没有什么可以解说的东西,注意一点就是,

 

merge一个文件后,会把这个merge后的文件路径重新添加到onDiskMapOutputs 容器中。

 

publicvoid merge(List<CompressAwarePath> inputs) throws IOException {

 

// sanity check

 

if (inputs == null || inputs.isEmpty()) {

 

LOG.info("No ondisk files to merge...");

 

return;

 

}

 

 

 

long approxOutputSize = 0;

 

int bytesPerSum =

 

jobConf.getInt("io.bytes.per.checksum", 512);

 

 

 

LOG.info("OnDiskMerger: We have " + inputs.size() +

 

" map outputs on disk. Triggering merge...");

 

 

 

// 1. Prepare the list of files to be merged.

 

for (CompressAwarePath file : inputs) {

 

approxOutputSize += localFS.getFileStatus(file).getLen();

 

}

 

 

 

// add the checksum length

 

approxOutputSize +=

 

ChecksumFileSystem.getChecksumLength(approxOutputSize, bytesPerSum);

 

 

 

// 2. Start the on-disk merge process

 

Path outputPath =

 

localDirAllocator.getLocalPathForWrite(inputs.get(0).toString(),

 

approxOutputSize, jobConf).suffix(Task.MERGED_OUTPUT_PREFIX);

 

Writer<K,V> writer =

 

new Writer<K,V>(jobConf, rfs, outputPath,

 

(Class<K>) jobConf.getMapOutputKeyClass(),

 

(Class<V>) jobConf.getMapOutputValueClass(),

 

codec, null);

 

RawKeyValueIterator iter = null;

 

CompressAwarePath compressAwarePath;

 

Path tmpDir = new Path(reduceId.toString());

 

try {

 

iter = Merger.merge(jobConf, rfs,

 

(Class<K>) jobConf.getMapOutputKeyClass(),

 

(Class<V>) jobConf.getMapOutputValueClass(),

 

codec, inputs.toArray(new Path[inputs.size()]),

 

true, ioSortFactor, tmpDir,

 

(RawComparator<K>) jobConf.getOutputKeyComparator(),

 

reporter, spilledRecordsCounter, null,

 

mergedMapOutputsCounter, null);

 

 

 

Merger.writeFile(iter, writer, reporter, jobConf);

 

writer.close();

 

compressAwarePath = new CompressAwarePath(outputPath,

 

writer.getRawLength(), writer.getCompressedLength());

 

} catch (IOException e) {

 

localFS.delete(outputPath, true);

 

throw e;

 

}

 

 

 

closeOnDiskFile(compressAwarePath);

 

 

 

LOG.info(reduceId +

 

" Finished merging " + inputs.size() +

 

" map output files on disk of total-size " +

 

approxOutputSize + "." +

 

" Local output file is " + outputPath + " of size " +

 

localFS.getFileStatus(outputPath).getLen());

 

}

 

}

 

 

 

ok,现在mapcopy部分执行完成,回到ShuffleConsumerPluginrun方法中,

 

也就是Shufflerun方法中,接着上面的代码向下分析:

 

此处等待所有的copy操作完成,

 

// Wait for shuffle to complete successfully

 

while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {

 

reporter.progress();

 

 

 

synchronized (this) {

 

if (throwable != null) {

 

thrownew ShuffleError("error in shuffle in " + throwingThreadName,

 

throwable);

 

}

 

}

 

}

 

如果执行到这一行时,说明所有的map copy操作已经完成,

 

关闭查找map运行状态的线程与执行copy操作的几个线程。

 

// Stop the event-fetcher thread

 

eventFetcher.shutDown();

 

 

 

// Stop the map-output fetcher threads

 

for (Fetcher<K,V> fetcher : fetchers) {

 

fetcher.shutDown();

 

}

 

 

 

// stop the scheduler

 

scheduler.close();

 

am发送状态,通知AM,此时要执行排序操作。

 

copyPhase.complete(); // copy is already complete

 

taskStatus.setPhase(TaskStatus.Phase.SORT);

 

reduceTask.statusUpdate(umbilical);

 

 

 

执行最后的merge,其实在合并所有文件与memory中的数据时,也同时会进行排序操作。

 

// Finish the on-going merges...

 

RawKeyValueIterator kvIter = null;

 

try {

 

kvIter = merger.close();

 

} catch (Throwable e) {

 

thrownew ShuffleError("Error while doing final merge " , e);

 

}

 

 

 

// Sanity check

 

synchronized (this) {

 

if (throwable != null) {

 

thrownew ShuffleError("error in shuffle in " + throwingThreadName,

 

throwable);

 

}

 

}

 

最后返回这个合并后的iterator实例。

 

return kvIter;

 

 

 

Merger也就是MergeManagerImpl.close函数:

 

public RawKeyValueIterator close() throws Throwable {

 

关闭几个merge的线程,在关闭时会等待现有的merge完成。

 

// Wait for on-going merges to complete

 

if (memToMemMerger != null) {

 

memToMemMerger.close();

 

}

 

inMemoryMerger.close();

 

onDiskMerger.close();

 

 

 

List<InMemoryMapOutput<K, V>> memory =

 

new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);

 

inMemoryMergedMapOutputs.clear();

 

memory.addAll(inMemoryMapOutputs);

 

inMemoryMapOutputs.clear();

 

List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);

 

onDiskMapOutputs.clear();

 

执行最终的merge操作。

 

return finalMerge(jobConf, rfs, memory, disk);

 

}

 

最后的一个merge操作

 

private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,

 

List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,

 

List<CompressAwarePath> onDiskMapOutputs

 

) throws IOException {

 

LOG.info("finalMerge called with " +

 

inMemoryMapOutputs.size() + " in-memory map-outputs and " +

 

onDiskMapOutputs.size() + " on-disk map-outputs");

 

 

 

finalfloat maxRedPer =

 

job.getFloat(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, 0f);

 

if (maxRedPer > 1.0 || maxRedPer < 0.0) {

 

thrownew IOException(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT +

 

maxRedPer);

 

}

 

得到可以cache到内存的大小,比例通过mapreduce.reduce.input.buffer.percent配置,

 

int maxInMemReduce = (int)Math.min(

 

Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);

 

 

 

 

 

// merge configparams

 

Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();

 

Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();

 

boolean keepInputs = job.getKeepFailedTaskFiles();

 

final Path tmpDir = new Path(reduceId.toString());

 

final RawComparator<K> comparator =

 

(RawComparator<K>)job.getOutputKeyComparator();

 

 

 

// segments required to vacate memory

 

List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();

 

long inMemToDiskBytes = 0;

 

boolean mergePhaseFinished = false;

 

if (inMemoryMapOutputs.size() > 0) {

 

TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();

 

这个地方根据可cache到内存的值,把不能cache到内存的部分生成InMemoryReader实例,

 

并添加到memDiskSegments 容器中。

 

inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,

 

memDiskSegments,

 

maxInMemReduce);

 

finalint numMemDiskSegments = memDiskSegments.size();

 

把内存中多于部分的mapoutput数据写入到文件中,并把文件路径添加到onDiskMapOutputs容器中。

 

if (numMemDiskSegments > 0 &&

 

ioSortFactor > onDiskMapOutputs.size()) {

 

...........

 

此部分主要是写入内存中多于的mapoutput到磁盘中去

 

mergePhaseFinished = true;

 

// must spill to disk, but can't retain in-mem for intermediate merge

 

final Path outputPath =

 

mapOutputFile.getInputFileForWrite(mapId,

 

inMemToDiskBytes).suffix(

 

Task.MERGED_OUTPUT_PREFIX);

 

final RawKeyValueIterator rIter = Merger.merge(job, fs,

 

keyClass, valueClass, memDiskSegments, numMemDiskSegments,

 

tmpDir, comparator, reporter, spilledRecordsCounter, null,

 

mergePhase);

 

Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,

 

keyClass, valueClass, codec, null);

 

try {

 

Merger.writeFile(rIter, writer, reporter, job);

 

writer.close();

 

onDiskMapOutputs.add(new CompressAwarePath(outputPath,

 

writer.getRawLength(), writer.getCompressedLength()));

 

writer = null;

 

// add to list of final disk outputs.

 

} catch (IOException e) {

 

if (null != outputPath) {

 

try {

 

fs.delete(outputPath, true);

 

} catch (IOException ie) {

 

// NOTHING

 

}

 

}

 

throw e;

 

} finally {

 

if (null != writer) {

 

writer.close();

 

}

 

}

 

LOG.info("Merged " + numMemDiskSegments + " segments, " +

 

inMemToDiskBytes + " bytes to disk to satisfy " +

 

"reduce memory limit");

 

inMemToDiskBytes = 0;

 

memDiskSegments.clear();

 

} elseif (inMemToDiskBytes != 0) {

 

LOG.info("Keeping " + numMemDiskSegments + " segments, " +

 

inMemToDiskBytes + " bytes in memory for " +

 

"intermediate, on-disk merge");

 

}

 

}

 

 

 

// segments on disk

 

List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();

 

long onDiskBytes = inMemToDiskBytes;

 

long rawBytes = inMemToDiskBytes;

 

生成目前文件中有的所有的mapoutput路径的onDisk数组

 

CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(

 

new CompressAwarePath[onDiskMapOutputs.size()]);

 

for (CompressAwarePath file : onDisk) {

 

long fileLength = fs.getFileStatus(file).getLen();

 

onDiskBytes += fileLength;

 

rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;

 

 

 

LOG.debug("Disk file: " + file + " Length is " + fileLength);

 

把现在reduce端接收过来并存储到文件中的mapoutput生成segment并添加到distSegments容器中

 

diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,

 

(file.toString().endsWith(

 

Task.MERGED_OUTPUT_PREFIX) ?

 

null : mergedMapOutputsCounter), file.getRawDataLength()

 

));

 

}

 

LOG.info("Merging " + onDisk.length + " files, " +

 

onDiskBytes + " bytes from disk");

 

按内容的大小从小到大排序此distSegments容器

 

Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {

 

publicint compare(Segment<K, V> o1, Segment<K, V> o2) {

 

if (o1.getLength() == o2.getLength()) {

 

return 0;

 

}

 

return o1.getLength() < o2.getLength() ? -1 : 1;

 

}

 

});

 

把现在memory中所有的mapoutput内容生成segment并添加到finalSegments容器中。

 

// build final list of segments from merged backed by disk + in-mem

 

List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();

 

long inMemBytes = createInMemorySegments(inMemoryMapOutputs,

 

finalSegments, 0);

 

LOG.info("Merging " + finalSegments.size() + " segments, " +

 

inMemBytes + " bytes from memory into reduce");

 

if (0 != onDiskBytes) {

 

finalint numInMemSegments = memDiskSegments.size();

 

diskSegments.addAll(0, memDiskSegments);

 

memDiskSegments.clear();

 

// Pass mergePhase only if there is a going to be intermediate

 

// merges. See comment where mergePhaseFinished is being set

 

Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;

 

这个部分是把现在磁盘上的mapoutput生成一个iterator,

 

RawKeyValueIterator diskMerge = Merger.merge(

 

job, fs, keyClass, valueClass, codec, diskSegments,

 

ioSortFactor, numInMemSegments, tmpDir, comparator,

 

reporter, false, spilledRecordsCounter, null, thisPhase);

 

diskSegments.clear();

 

if (0 == finalSegments.size()) {

 

return diskMerge;

 

}

 

把现在磁盘上的iterator也同样添加到finalSegments容器中,

 

也就是此时,这个容器中有两个优先堆排序的队列,每next一次,要从内存与磁盘中找出最小的一个kv.

 

finalSegments.add(new Segment<K,V>(

 

new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));

 

}

 

return Merger.merge(job, fs, keyClass, valueClass,

 

finalSegments, finalSegments.size(), tmpDir,

 

comparator, reporter, spilledRecordsCounter, null,

 

null);

 

 

 

}

 

 

 

shuffle部分现在全部执行完成,重新加到ReduceTask.run函数中,接着代码向下分析:

 

rIter = shuffleConsumerPlugin.run();

 

............

 

RawComparator comparator = job.getOutputValueGroupingComparator();

 

if (useNewApi) {

 

runNewReducer(job, umbilical, reporter, rIter, comparator,

 

keyClass, valueClass);

 

} else {

 

runOldReducer........

 

}

 

在以上代码中执行runNewReducer主要是执行reducerun函数,

 

org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

 

new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,

 

getTaskID(), reporter);

 

// make a reducer

 

org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =

 

(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)

 

ReflectionUtils.newInstance(taskContext.getReducerClass(), job);

 

org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =

 

new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);

 

job.setBoolean("mapred.skip.on", isSkipping());

 

job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

 

org.apache.hadoop.mapreduce.Reducer.Context

 

reducerContext = createReduceContext(reducer, job, getTaskID(),

 

rIter, reduceInputKeyCounter,

 

reduceInputValueCounter,

 

trackedRW,

 

committer,

 

reporter, comparator, keyClass,

 

valueClass);

 

try {

 

reducer.run(reducerContext);

 

} finally {

 

trackedRW.close(reducerContext);

 

}

 

 

 

以上代码中创建Reducer运行的Context,并执行reducer.run函数:

 

createReduceContext函数定义部分代码:

 

org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>

 

reduceContext =

 

new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, taskId,

 

rIter,

 

inputKeyCounter,

 

inputValueCounter,

 

output,

 

committer,

 

reporter,

 

comparator,

 

keyClass,

 

valueClass);

 

 

 

org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context

 

reducerContext =

 

new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(

 

reduceContext);

 

ReduceContextImpl主要是执行在RawKeyValueInterator中读取数据的相关操作。

 

Reducer.run函数:

 

public void run(Context context) throws IOException, InterruptedException {

 

setup(context);

 

try {

 

while (context.nextKey()) {

 

reduce(context.getCurrentKey(), context.getValues(), context);

 

// If a back up store is used, reset it

 

Iterator<VALUEIN> iter = context.getValues().iterator();

 

if(iter instanceof ReduceContext.ValueIterator) {

 

((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();

 

}

 

}

 

} finally {

 

cleanup(context);

 

}

 

}

 

run函数中通过context.nextkey来得到下一行的数据,这部分主要在ReduceContextImpl中完成:

 

nextkey调用nextKeyValue函数:

 

public boolean nextKeyValue() throws IOException, InterruptedException {

 

if (!hasMore) {

 

key = null;

 

value = null;

 

returnfalse;

 

}

 

此处用来检查是否是一个key下面的第一个value,如果是第一个value时,此值为false,

 

也就是说,nextKeyIsSame的值是true时,表示现在next的数据与currentkey是一行数据。

 

否则表示已经进行了换行操作。

 

firstValue = !nextKeyIsSame;

 

执行一下RawKeyValueInterator(也就是Merge中的队列),得到当前最小的key

 

DataInputBuffer nextKey = input.getKey();

 

key设置到buffer中,设置到buffer中的目的是为了通过keyDeserializer来读取一个key的值。

 

currentRawKey.set(nextKey.getData(), nextKey.getPosition(),

 

nextKey.getLength() - nextKey.getPosition());

 

buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());

 

buffer中读取key的值,并存储到key中,这个地方要注意一下,

 

下面先看看这部分的定义:

 

.........................

 

生成一个keyDeserializer实例,

 

this.keyDeserializer = serializationFactory.getDeserializer(keyClass);

 

buffer当成keyDeserializerInputStream

 

this.keyDeserializer.open(buffer);

 

Deserializer中执行deserializer函数的定义:

 

此部分定义可以看出,一个key/value只会生成实例,此部分从性能上考虑主要是为了减少对象的生成。

 

每次生成一个数据时,都是通过readFields重新去生成Writable实例中的内容,

 

因此,很多同学在reduce中使用value时,会出现数据引用不对的情况,

 

因为对象还是同一个对象,但值是最后一个,所以会出现数据不对的情况

 

public Writable deserialize(Writable w) throws IOException {

 

Writable writable;

 

if (w == null) {

 

writable

 

= (Writable) ReflectionUtils.newInstance(writableClass, getConf());

 

} else {

 

writable = w;

 

}

 

writable.readFields(dataIn);

 

return writable;

 

}

 

.........................

 

读取key的内容

 

key = keyDeserializer.deserialize(key);

 

key相同的方式,得到当前的value的值,

 

DataInputBuffer nextVal = input.getValue();

 

buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength()

 

- nextVal.getPosition());

 

value = valueDeserializer.deserialize(value);

 

 

 

currentKeyLength = nextKey.getLength() - nextKey.getPosition();

 

currentValueLength = nextVal.getLength() - nextVal.getPosition();

 

 

 

isMarked的值为false,同时backupStore属性为null

 

if (isMarked) {

 

backupStore.write(nextKey, nextVal);

 

}

 

input执行一次next操作,此处会从所有的文件/memory中找到最小的一个kv.

 

hasMore = input.next();

 

if (hasMore) {

 

比较一下,是否与currentkey是同一个key,如果是表示在同一行中。也就是key相同。

 

nextKey = input.getKey();

 

nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,

 

currentRawKey.getLength(),

 

nextKey.getData(),

 

nextKey.getPosition(),

 

nextKey.getLength() - nextKey.getPosition()

 

) == 0;

 

} else {

 

nextKeyIsSame = false;

 

}

 

inputValueCounter.increment(1);

 

returntrue;

 

}

 

 

 

接下来是调用reduce函数,此时会通过context.getValues函数把key对应的所有的value传给reduce.

 

此处的context.getValues如下所示:

 

ReduceContextImpl.getValues()

 

public

 

Iterable<VALUEIN> getValues() throws IOException, InterruptedException {

 

returniterable;

 

}

 

以上代码中直接返回的是iterable的实例,此实例在ReduceContextImpl实例生成时生成。

 

private ValueIterable iterable = new ValueIterable();

 

这个类是ReduceContextImpl中的内部类

 

protected class ValueIterable implements Iterable<VALUEIN> {

 

private ValueIterator iterator = new ValueIterator();

 

@Override

 

public Iterator<VALUEIN> iterator() {

 

returniterator;

 

}

 

}

 

此实例中引用一个ValueIterator类,这也是一个内部类。

 

每次进行执行时,通过此ValueIterator.next来获取一条数据,

 

public VALUEIN next() {

 

inReset的值默认为false.也就是说inReset检查内部的代码不会执行,其实backupStore本身值就是null

 

如果想使用backupStore,需要执行其内部的make函数。

 

if (inReset) {

 

.................里面的代码不分析

 

}

 

如果是key下面的第一个value,firstValue设置为false,因为下一次来时,就不是firstValue.

 

返回当前的value

 

// if this is the first record, we don't need to advance

 

if (firstValue) {

 

firstValue = false;

 

returnvalue;

 

}

 

// if this isn't the first record and the next key is different, they

 

// can't advance it here.

 

if (!nextKeyIsSame) {

 

thrownew NoSuchElementException("iterate past last value");

 

}

 

// otherwise, go to the next key/value pair

 

try {

 

这里表示不是第一个value的时候,也就是firstValue的值为false,执行一下nextKeyValue函数,

 

得到当前的value.返回。

 

nextKeyValue();

 

returnvalue;

 

} catch (IOException ie) {

 

thrownew RuntimeException("next value iterator failed", ie);

 

} catch (InterruptedException ie) {

 

// this is bad, but we can't modify the exception list of java.util

 

thrownew RuntimeException("next value iterator interrupted", ie);

 

}

 

}

 

 

 

reduce执行完成后的输出,跟map端无reduce时的输出一样。直接输出。

 

0
0
分享到:
评论

相关推荐

    Hadoop-MapReduce.docx

    MapReduce 是 Hadoop 生态系统中的核心组件,是一种用于处理和生成大数据的编程模型。它被设计用来在大规模分布式环境下进行数据处理,尤其适合PB级别的离线批量计算任务。MapReduce 的基本理念是将复杂的计算任务...

    hadoop-2.10.0-src.tar.gz

    源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...

    Hadoop-2.2.0源码包

    它包括JobTracker(已废弃,2.x版本中被ResourceManager取代)、TaskTracker(已废弃,由NodeManager取代)和Task(Map任务和Reduce任务)等组件。此外,还有Client API,用于编写MapReduce应用程序。 4. **hadoop-...

    Hadoop技术-MapReduce工作原理.pptx

    **ReduceTask运行流程:** 1. **数据拷贝**:Reduce进程启动数据复制线程,通过HTTP请求MapTask获取分配给自己的数据。 2. **内存到磁盘的合并**:数据被放入内存缓冲区,当达到阈值时,启动内存到磁盘的溢写,...

    hadoop-3.4.0-src.tar.gz

    在源码学习过程中,理解Hadoop的配置体系也至关重要,如`hadoop-common-project/hadoop-common`模块中的配置文件,它们定义了Hadoop系统运行的各种参数,直接影响到系统性能和稳定性。 总的来说,通过对"Hadoop-...

    3大数据技术之Hadoop(MapReduce).doc

    MapReduce是大数据处理的核心技术之一,特别是在Hadoop生态系统中,它为处理海量数据提供了高效的分布式计算框架。MapReduce的设计理念是将复杂的分布式计算任务分解为简单、可并行执行的两个主要阶段:Map和Reduce...

    hadoop mapreduce helloworld 能调试

    3. **使用 DebugFlag**:在提交作业时,可以设置 `-Dmapred.map.task.debug.script` 和 `-Dmapred.reduce.task.debug.script` 参数,使得 Map 或 Reduce 任务在完成时生成一个脚本,用于进一步分析。 4. **使用可视...

    Hadoop_MapReduce云计算技术手册

    2006年,Doug Cutting将这些概念应用于Hadoop项目中,实现了开源版本的MapReduce和HDFS。 - **发展**:2006年1月,Doug Cutting加入Yahoo公司,专职于Hadoop项目的开发。此后,Hadoop得到了快速发展,并逐步成为一个...

    拓思爱诺大数据第五天-mapreduce编程

    MapReduce程序在Hadoop集群中运行时,涉及到以下三个关键组成部分: 1. **MRAppMaster**:作为整个作业的主控者,负责作业的初始化、任务调度与状态协调。 2. **MapTask**:每个MapTask负责处理一个数据块(通常...

    4-0大数据技术之Hadoop(MapReduce) (1)

    3. **ReduceTask**:执行Reduce阶段的数据处理。 【数据序列化类型】 Hadoop提供了多种内置的可序列化类型,如BooleanWritable、IntWritable、Text等,用于在MapReduce中传输和存储数据。 【MapReduce 编程规范】...

    提高hadoop的mapreduce job效率笔记

    除了上述参数,还有其他 Job 配置可以优化,如设置适当的`mapreduce.task.io.sort.mb`(排序缓冲区大小)和`mapreduce.reduce.shuffle.parallelcopies`(并行复制副本数),以及启用压缩以减少中间数据的存储空间。...

    23、hadoop集群中yarn运行mapreduce的内存、CPU分配调度计算与优化

    在Hadoop集群中,YARN(Yet Another Resource Negotiator)作为资源管理器,负责调度MapReduce任务的内存和CPU资源。YARN支持基于内存和CPU的两种资源调度策略,以确保集群资源的有效利用。在非默认配置下,合理地...

    Hadoop MapReduce

    Hadoop MapReduce是一个用于处理大数据集的软件框架,它能够将应用程序以并行方式运行在成千上万的商用硬件节点上,同时保证了高可靠性与容错能力。它适用于处理多个TB级别的数据集。 在这个框架中,一个MapReduce...

    尚硅谷大数据技术之Hadoop(MapReduce)1

    - ReduceTask同样并行运行,但依赖所有MapTask的输出。 - 仅支持一个Map阶段和一个Reduce阶段,复杂逻辑需通过多个MapReduce作业串联。 1.4 MapReduce进程 MapReduce执行过程中涉及的主要进程包括JobTracker、...

    hadoop-xml配置

    MapReduce是Hadoop的数据处理模型,分为Map和Reduce两个阶段。`mapreduce.framework.name`设定运行模式,可以是经典的`local`或YARN(`yarn`)。`mapreduce.job.reduces`控制reduce任务的数量,影响数据的并行度和...

    第7章-MapReduce.pdf

    Task包括MapTask和ReduceTask,分别执行Map和Reduce操作。 MapReduce的具体应用非常广泛,可以处理各种类型的大数据问题,如数据挖掘、日志分析、统计分析等。Hadoop MapReduce是MapReduce的一个开源实现,由Apache...

    Hadoop技术内幕 深入理解MapReduce架构设计与实现原理 高清完整中文版PDF下载

    ### Hadoop技术内幕:深入解析MapReduce架构设计与实现原理 #### 一、Hadoop及其重要性 Hadoop是一个开放源代码的分布式计算框架,它能够处理大量的数据集,并通过集群提供高性能的数据处理能力。随着大数据时代的...

    大数据技术之Hadoop(MapReduce)

    - ReduceTask:处理Reduce阶段,对Map输出进行归约操作。 6. **MapReduce编程规范** - 用户编写Mapper和Reducer类,以及提交Job的Driver程序。 - Mapper接收键值对,处理后输出新的键值对。 - Reducer接收...

Global site tag (gtag.js) - Google Analytics