`
hongs_yang
  • 浏览: 60991 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

hadoop-mapreduce中maptask运行分析

阅读更多

MapTask运行通过执行.run方法:

 

1.生成TaskAttemptContextImpl实例,此实例中的Configuration就是job本身。

 

2.得到用户定义的Mapper实现类,也就是map函数的类。

 

3.得到InputFormat实现类。

 

4.得到当前task对应的InputSplit.

 

5.通过InputFormat,得到对应的RecordReader

 

6.生成RecordWriter实例,

 

如果reduce个数为0,生成为MapTask.NewDirectOutputCollector

 

如果reduce个数不为0,但肯定是一个大于0的数,生成MapTask.NewOutputCollector

 

如果是有reduce的情况,在collector中会生成一个buffercollector用来进行内存排序。

 

通过mapreduce.job.map.output.collector.class配置,默认为MapTask.MapOutputBuffer

 

MapOutputBuffer中:

 

通过mapreduce.map.sort.spill.percent配置内存flush的比值,默认为0.8

 

spill的中文意思是溢出。

 

通过mapreduce.task.io.sort.mb配置内存bufer的大小,默认是100mb

 

通过mapreduce.task.index.cache.limit.bytes配置(还不知道是做什么的),默认为1024*1024

 

提示,这个配置是用来cache进行spill操作的index的大小。当spillindex达到此值的时候,

 

需要写入spillindex的文件。

 

通过map.sort.class配置排序实现类,默认为QuickSort,快速排序

 

通过mapreduce.map.output.compress.codec配置map的输出的压缩处理程序。

 

通过mapreduce.map.output.compress配置map输出是否启用压缩。默认为false.

 

MapOutputBuffer实例生成部分结束。

 

 

 

在生成MapTask.NewOutputCollector同时,会

 

检查是否用户有定义的Partitioner,默认是HashPartitioner

 

如果生成的实例为MapTask.NewDirectOutputCollector,也就是没有Reduce的情况下,

 

不执行排序操作也不执行buffer的缓冲操作,直接写入到output的文件中。

 

通过OutputFormatRecordWriter

 

 

 

 

 

 

 

 

 

以下是mapper.run方法的执行代码:

 

publicvoid run(Context context) throws IOException, InterruptedException {

 

setup(context);

 

try {

 

while (context.nextKeyValue()) {

 

map(context.getCurrentKey(), context.getCurrentValue(), context);

 

}

 

} finally {

 

cleanup(context);

 

}

 

}

 

由上面的代码可以看出,map运行时,会执行一次setup函数,完成时会执行一次cleanup函数。

 

中间只要有值就会调用map函数。

 

其中run中传入的context生成由来:

 

if (job.getNumReduceTasks() == 0) {

 

output =

 

newNewDirectOutputCollector(taskContext, job, umbilical, reporter);

 

} else {

 

output = newNewOutputCollector(taskContext, job, umbilical, reporter);

 

}

 

MapContextImpl实例,包含input(RecordReader)output,也就是上面提到的collector.

 

org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>

 

mapContext =

 

new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(),

 

input, output,

 

committer,

 

reporter, split);

 

WrappedMapper.Context实例。包含MapContextImpl实例。

 

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context

 

mapperContext =

 

new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(

 

mapContext);

 

 

 

 

 

接着看mapper.run中的context.nextKeyValue()函数:

 

调用WrappedMapper.Context.nextKeyValue()函数,-->

 

调用MapContextImpl.nextKeyValue函数,-->

 

调用RecordReader.nextKeyValue函数,RecordReader不在说明。

 

 

 

map函数对过程处理完成后,会通过context.write写入分析的数据,

 

context.write(word, one);

 

 

 

看看此部分是如何执行的:

 

调用WrappedMapper.Context.write-->

 

调用MapContextImpl.write-->TaskInputOutputContextImpl.write-->

 

MapTask.NewOutputCollector.write/MapTask.NewDirectOutputCollector.write

 

 

 

MapTask.NewDirectOutputCollector.write:

 

这个里面没什么可以说的,直接写入到输出文件中。

 

NewDirectOutputCollector(MRJobConfig jobContext,

 

JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)

 

throws IOException, ClassNotFoundException, InterruptedException {

 

............................................

 

out = outputFormat.getRecordWriter(taskContext);

 

............................................

 

}

 

 

 

写入函数的定义

 

publicvoid write(K key, V value)

 

throws IOException, InterruptedException {

 

reporter.progress();

 

long bytesOutPrev = getOutputBytes(fsStats);

 

直接写入文件。

 

out.write(key, value);

 

long bytesOutCurr = getOutputBytes(fsStats);

 

fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);

 

mapOutputRecordCounter.increment(1);

 

}

 

 

 

 

 

重点来看看MapTask.NewOutputCollector.write这部分的实现:

 

通过Partitioner来生成reducepartition值,调用MapOutputBuffer.collect函数。

 

也就是写入到buffer中。

 

publicvoid write(K key, V value) throws IOException, InterruptedException {

 

collector.collect(key, value,

 

partitioner.getPartition(key, value, partitions));

 

}

 

 

 

 

 

MapOutputBuffer.collector:

 

 

 

public synchronized void collect(K key, V value, final int partition

 

) throws IOException {

 

reporter.progress();

 

检查传入的key的类型是否是jobMapOutputKeyClass的值

 

if (key.getClass() != keyClass) {

 

thrownew IOException("Type mismatch in key from map: expected "

 

+ keyClass.getName() + ", received "

 

+ key.getClass().getName());

 

}

 

检查传入的value的类型是否是jobMapOutputValueClass的值。

 

if (value.getClass() != valClass) {

 

thrownew IOException("Type mismatch in value from map: expected "

 

+ valClass.getName() + ", received "

 

+ value.getClass().getName());

 

}

 

检查partition是否在指定的范围内。

 

if (partition < 0 || partition >= partitions) {

 

thrownew IOException("Illegal partition for " + key + " (" +

 

partition + ")");

 

}

 

检查sortSpillException的值是否为空,如果不为空,表示有spill错误,throw ioexception

 

checkSpillException();

 

把可写入的buffer的剩余部分减去一个固定的值,并检查可用的buffer是否达到了sortspill的值

 

默认是buffer0.8的大小,如果buffer0.8METASIZE取于不等于0时,

 

得到的值可能会比0.8METASIZE这么一点。

 

bufferRemaining -= METASIZE;

 

if (bufferRemaining <= 0) {

 

执行spill操作,这部分等下再进行分析

 

// start spill if the thread is not running and the soft limit has been

 

// reached

 

spillLock.lock();

 

try {

 

......................此部分代码先不看

 

} finally {

 

spillLock.unlock();

 

}

 

}

 

 

 

try {

 

第一次进入时,bufindex的值为0,以后的每一次是key.len+1+value.len+1的值增加。

 

// serialize key bytes into buffer

 

int keystart = bufindex;

 

key写入到此实例中的一个BlockingBuffer类型的属性bb中。这是一个buffer.

 

在写入时把bufferRemaining的值减去key.length的长度。这里面也会检查buffer是否够用

 

key写入到kvbuffer中,同时把bufindex的值加上key.lengthKvbuffer就是具体的buffer.

 

在执行写入key/value时,首先是先把bufferRemaining的值减去key.length/value.length的长度。

 

同时检查此时bufferRemaining的值是否会小于或等于0,如果是需要先做spill操作。

 

否则把数据写入kvbuffer中,并把bufindex的值加上key.length/value.length

 

具体的写入操作请查看MapTask.Buffer中的write函数。

 

keySerializer.serialize(key);

 

这个地方有可能会出现,为什么呢,因为buffer是不停在重复使用,当使用到后面时,

 

前面可能会已经执行了spill操作。因此到bufindex达到最后的时候,会回到开始位置接着写。

 

if (bufindex < keystart) {

 

// wrapped the key; must make contiguous

 

bb.shiftBufferedKey();

 

keystart = 0;

 

}

 

此时的valstart的值为key结束后的下一个下标值。按key同样的方式写入value

 

// serialize value bytes into buffer

 

finalint valstart = bufindex;

 

valSerializer.serialize(value);

 

下面这一行是一个长度为0byte array,不做操作。

 

// It's possible for records to have zero length, i.e. the serializer

 

// will perform no writes. To ensure that the boundary conditions are

 

// checked and that the kvindex invariant is maintained, perform a

 

// zero-length write into the buffer. The logic monitoring this could be

 

// moved into collect, but this is cleaner and inexpensive. For now, it

 

// is acceptable.

 

bb.write(b0, 0, 0);

 

通过bufmark属性标记下bufindex的值。并返回bufindex的值。此时bufindex的值是val结束的下标。

 

// the record must be marked after the preceding write, as the metadata

 

// for this record are not yet written

 

int valend = bb.markRecord();

 

 

 

mapOutputRecordCounter.increment(1);

 

mapOutputByteCounter.increment(

 

distanceTo(keystart, valend, bufvoid));

 

记录kvmeta信息,此处是一个IntBuffer的缓冲区,每次向kvmeta中写入4个下标的值,

 

第一次时,kvindex0,第二次是kvindex的值为kvmeta.capacity()-4的值。

 

也就是说第一次是从前面开始写,从第二次开始都是从后面向前面开始写。

 

partition的值写入到meta的第2个下标,把keystart写入到第一个下标,

 

valstart的值写入到meta的第0个下标,把value的长度写入到第三个下标。

 

Kvmetabuffer是如下图例的样子

 

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

4byte

VALSTART(0)

KEYSTART(1)

PARTITION(2)

VALLEN(3)

 

 

 

 

VALSTART(0)

KEYSTART(1)

PARTITION(2)

VALLEN(3)

 

 

 

// write accounting info

 

kvmeta.put(kvindex + PARTITION, partition);

 

kvmeta.put(kvindex + KEYSTART, keystart);

 

kvmeta.put(kvindex + VALSTART, valstart);

 

kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));

 

// advance kvindex

 

kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();

 

} catch (MapBufferTooSmallException e) {

 

LOG.info("Record too large for in-memory buffer: " + e.getMessage());

 

spillSingleRecord(key, value, partition);

 

mapOutputRecordCounter.increment(1);

 

return;

 

}

 

}

 

 

 

写入数据到buffer中的实现:

 

通过MapTask.MapOutputBuffer.Buffer.write方法

 

public void write(byte b[], int off, int len)

 

throws IOException {

 

// must always verify the invariant that at least METASIZE bytes are

 

// available beyond kvindex, even when len == 0

 

bufferRemaining -= len;

 

if (bufferRemaining <= 0) {

 

...............................................

 

}

 

此处检查bufindex(kvbuffer中现在的下标值)+len是否达到了bufvoid(默认是kvbuffer的最后)

 

如果执行过spill操作,buffer写入到下标的最后时,重新开始从0开始写入后,

 

bufvoid的值是上一次写入完成的bufmark的值(最后一次完成写入的下标)

 

也就是说现在写入已经达到buffer的最后位置,但是要写入的数据装不完,

 

如:要写入数据是5byte,但现在kvbuffer最后端只能写入3byte,

 

此时会把于下的2byte写入到kvbuffer的开始位置。这就是环行buffer

 

// here, we know that we have sufficient space to write

 

if (bufindex + len > bufvoid) {

 

finalint gaplen = bufvoid - bufindex;

 

System.arraycopy(b, off, kvbuffer, bufindex, gaplen);

 

len -= gaplen;

 

off += gaplen;

 

bufindex = 0;

 

}

 

System.arraycopy(b, off, kvbuffer, bufindex, len);

 

bufindex += len;

 

}

 

}

 

 

 

关于当bufindex的值小于keystart时,也就是环行部分重新开始写入时,执行的shiftBufferedKey

 

这个部分主要是把buffer中要写入的数据超过了buffer能在最后写入的值时:

 

write后示例值:

 

要写入的byte array [1,2,3,4,5]

 

执行写入后buffer的内容如下:最后只能存储3byte,这里把123写入到最后,

 

同时把45写入到最前面部分

 

4

5

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

1

2

3

 

 

 

执行shiftBufferedKey以后,此时buffer的内容变成如下:

 

1

2

3

4

5

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

protected void shiftBufferedKey() throws IOException {

 

此时的bufmark是上一次完成写入的bufindex的下标值,得到最后写入的几个byte的长度。

 

比如上面提到的,写5个,但最后只有3byte的长度,那么这里得到的就是3.

 

// spillLock unnecessary; both kvend and kvindex are current

 

int headbytelen = bufvoidbufmark;

 

bufvoid也就是可写入的最后下标的值修改成上一次完成写入的最后一个下标值。

 

bufvoid = bufmark;

 

finalint kvbidx = 4 * kvindex;

 

finalint kvbend = 4 * kvend;

 

finalint avail =

 

Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));

 

if (bufindex + headbytelen < avail) {

 

把环行部分,如上面最后从头开始写入的2byte向后移动3byte

 

System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);

 

bufer最后部分的3byte写入到开始部分。

 

System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);

 

bufindex向后移动几个byte,并重新计算可用的空间

 

bufindex += headbytelen;

 

bufferRemaining -= kvbuffer.length - bufvoid;

 

} else {

 

byte[] keytmp = newbyte[bufindex];

 

System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);

 

bufindex = 0;

 

out.write(kvbuffer, bufmark, headbytelen);

 

out.write(keytmp);

 

}

 

}

 

}

 

 

 

数据达到bufferlimit时,执行的spill操作:

 

if (bufferRemaining <= 0) {

 

// start spill if the thread is not running and the soft limit has been

 

// reached

 

spillLock.lock();

 

try {

 

do {

 

如果spillInProgress的值为true时,表示spill操作正在进行。

 

if (!spillInProgress) {

 

finalint kvbidx = 4 * kvindex;

 

第一次执行时kvend的值为0,第二次时是kvindex的上一个值(kvend)*4,

 

kvend表示已经完成的kvmeta的下标值,kvindex表示现在准备使用的下标值

 

finalint kvbend = 4 * kvend;

 

得到已经使用的字节数

 

// serialized, unspilled bytes always lie between kvindex and

 

// bufindex, crossing the equator. Note that any void space

 

// created by a reset must be included in "used" bytes

 

finalint bUsed = distanceTo(kvbidx, bufindex);

 

得到已经使用的字节数是否已经达到spill的配置大小,也就是buffer0.8默认。

 

finalboolean bufsoftlimit = bUsed >= softLimit;

 

这里表示spill完成,回收空间,

 

if ((kvbend + METASIZE) % kvbuffer.length !=

 

equator - (equator % METASIZE)) {

 

// spill finished, reclaim space

 

resetSpill();

 

bufferRemaining = Math.min(

 

distanceTo(bufindex, kvbidx) - 2 * METASIZE,

 

softLimit - bUsed) - METASIZE;

 

continue;

 

} elseif (bufsoftlimit && kvindex != kvend) {

 

发起spill操作

 

// spill records, if any collected; check latter, as it may

 

// be possible for metadata alignment to hit spill pcnt

 

startSpill();

 

finalint avgRec = (int)

 

(mapOutputByteCounter.getCounter() /

 

mapOutputRecordCounter.getCounter());

 

// leave at least half the split buffer for serialization data

 

// ensure that kvindex >= bufindex

 

finalint distkvi = distanceTo(bufindex, kvbidx);

 

finalint newPos = (bufindex +

 

Math.max(2 * METASIZE - 1,

 

Math.min(distkvi / 2,

 

distkvi / (METASIZE + avgRec) * METASIZE)))

 

% kvbuffer.length;

 

setEquator(newPos);

 

bufmark = bufindex = newPos;

 

finalint serBound = 4 * kvend;

 

// bytes remaining before the lock must be held and limits

 

// checked is the minimum of three arcs: the metadata space, the

 

// serialization space, and the soft limit

 

bufferRemaining = Math.min(

 

// metadata max

 

distanceTo(bufend, newPos),

 

Math.min(

 

// serialization max

 

distanceTo(newPos, serBound),

 

// soft limit

 

softLimit)) - 2 * METASIZE;

 

}

 

}

 

} while (false);

 

} finally {

 

spillLock.unlock();

 

}

 

}

 

 

 

发起startSpill操作

 

private void startSpill() {

 

assert !spillInProgress;

 

记录住最后一个完成的kvindex的下标。

 

kvend = (kvindex + NMETA) % kvmeta.capacity();

 

记录住标记住的最后一个完成的kv写入在kvbuffer中的下标

 

bufend = bufmark;

 

设置spill操作正在进行

 

spillInProgress = true;

 

LOG.info("Spilling map output");

 

LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

 

"; bufvoid = " + bufvoid);

 

LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

 

"); kvend = " + kvend + "(" + (kvend * 4) +

 

"); length = " + (distanceTo(kvend, kvstart,

 

kvmeta.capacity()) + 1) + "/" + maxRec);

 

此处使用了java新的线程通信的方法,notify线程的等待。

 

spillReady.signal();

 

}

 

 

 

此时,MapTask.MapOutputBuffer.SpillThread线程接收到signal命令:

 

public void run() {

 

spillLock.lock();

 

spillThreadRunning = true;

 

try {

 

while (true) {

 

spillDone.signal();

 

while (!spillInProgress) {

 

如果线程发现spillInProgress的值是false时,等待,

 

buffer中的数据达到sortlimit的值时,通过spillReady.signalnotify此线程。

 

spillReady.await();

 

}

 

try {

 

spillLock.unlock();

 

sortAndSpill();

 

} catch (Throwable t) {

 

sortSpillException = t;

 

} finally {

 

spillLock.lock();

 

if (bufend < bufstart) {

 

bufvoid = kvbuffer.length;

 

}

 

kvstart = kvend;

 

bufstart = bufend;

 

spillInProgress = false;

 

}

 

}

 

} catch (InterruptedException e) {

 

Thread.currentThread().interrupt();

 

} finally {

 

spillLock.unlock();

 

spillThreadRunning = false;

 

}

 

}

 

}

 

 

 

执行排序与spill操作:

 

调用MapTask.MapOutputBuffer.sortAndSpill函数:

 

 

 

private void sortAndSpill() throws IOException, ClassNotFoundException,

 

InterruptedException {

 

//approximate the length of the output file to be the length of the

 

//buffer + header lengths for the partitions

 

finallong size = (bufend >= bufstart

 

? bufend - bufstart

 

: (bufvoid - bufend) + bufstart) +

 

partitions * APPROX_HEADER_LENGTH;

 

FSDataOutputStream out = null;

 

try {

 

生成写入文件,路径通过在mapreduce.cluster.local.dir配置中写入localmr路径

 

在路径下生成(attempid)_spill_(numspills).out或者output/spill(numspills).out文件。

 

// create spill file

 

final SpillRecord spillRec = new SpillRecord(partitions);

 

final Path filename =

 

mapOutputFile.getSpillFileForWrite(numSpills, size);

 

out = rfs.create(filename);

 

 

 

finalint mstart = kvend / NMETA;

 

finalint mend = 1 + // kvend is a valid record

 

(kvstart >= kvend

 

? kvstart

 

: kvmeta.capacity() + kvstart) / NMETA;

 

执行排序操作。把buffer中的数据进行排序。排序的比较器通过MapOutputBuffer.compare,

 

默认是通过key进行排序。

 

sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);

 

int spindex = mstart;

 

final IndexRecord rec = new IndexRecord();

 

final InMemValBytes value = new InMemValBytes();

 

for (int i = 0; i < partitions; ++i) {

 

IFile.Writer<K, V> writer = null;

 

try {

 

long segmentStart = out.getPos();

 

writer = new Writer<K, V>(job, out, keyClass, valClass, codec,

 

spilledRecordsCounter);

 

检查是否有combiner处理程序,如果没有,直接把buffer中排序后的数据写入到spill文件中。

 

注意,写入时,数据是按partition从小到大写入。

 

if (combinerRunner == null) {

 

// spill directly

 

DataInputBuffer key = new DataInputBuffer();

 

while (spindex < mend &&

 

kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {

 

finalint kvoff = offsetFor(spindex % maxRec);

 

int keystart = kvmeta.get(kvoff + KEYSTART);

 

int valstart = kvmeta.get(kvoff + VALSTART);

 

key.reset(kvbuffer, keystart, valstart - keystart);

 

getVBytesForOffset(kvoff, value);

 

writer.append(key, value);

 

++spindex;

 

}

 

} else {

 

此时表示配置有combiner处理程序,通过执行combiner中的reduce程序,把数据进行处理后写入。

 

int spstart = spindex;

 

while (spindex < mend &&

 

kvmeta.get(offsetFor(spindex % maxRec)

 

+ PARTITION) == i) {

 

++spindex;

 

}

 

// Note: we would like to avoid the combiner if we've fewer

 

// than some threshold of records for a partition

 

if (spstart != spindex) {

 

combineCollector.setWriter(writer);

 

RawKeyValueIterator kvIter =

 

new MRResultIterator(spstart, spindex);

 

combinerRunner.combine(kvIter, combineCollector);

 

}

 

}

 

 

 

// close the writer

 

writer.close();

 

此处每写入一个partition的数据后,

 

生成一个针对此partition在文件中的开始位置,写入此partition的长度。

 

并添加到spillindex中。

 

// record offsets

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

spillRec.putIndex(rec, i);

 

 

 

writer = null;

 

} finally {

 

if (null != writer) writer.close();

 

}

 

}

 

如果splitindex中的cache的数据大于了配置的值,把新生成的spillindex写入index文件。

 

如果spillindex没有达到配置的值时,所有的spillindex文件存储到内存中,

 

如果达到了配置的值以后生成的spillindex文件不进行cache,直接写入到文件中,

 

后期在读取时通过numSpills的值来从文件中读取,

 

示例代码:for (int i = indexCacheList.size(); i < numSpills; ++i)

 

如上代码就是从indexCacheList.size开始,因为此时超过了cachespillindex直接写入到了文件。

 

把于下的spillindex从文件中读取出来。

 

 

 

if (totalIndexCacheMemory >= indexCacheMemoryLimit) {

 

// create spill index file

 

Path indexFilename =

 

mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions

 

* MAP_OUTPUT_INDEX_RECORD_LENGTH);

 

spillRec.writeToFile(indexFilename, job);

 

} else {

 

否则把spillindex添加到index cache中,并把长度累加起来。

 

indexCacheList.add(spillRec);

 

totalIndexCacheMemory +=

 

spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;

 

}

 

LOG.info("Finished spill " + numSpills);

 

++numSpills;

 

} finally {

 

if (out != null) out.close();

 

}

 

}

 

 

 

map输出的IFile - spill文件格式:

 

partition1

partition2

keylen(4)

vallen(4)

key

value

keylen(4)

vallen(4)

key

value

 

 

 

 

 

Mapspill-index文件格式:

 

partition1

partition2

SegmentStart(

partition的开始位置)

rawlen(总长度)

CompressedLength(总长度)

SegmentStart(

partition的开始位置)

rawlen(总长度)

CompressedLength(总长度)

 

 

 

 

 

此时,mapper.run函数完成,执行如下操作:output.close操作。

 

try {

 

input.initialize(split, mapperContext);

 

mapper.run(mapperContext);

 

mapPhase.complete();

 

setPhase(TaskStatus.Phase.SORT);

 

statusUpdate(umbilical);

 

input.close();

 

input = null;

 

output.close(mapperContext);

 

output = null;

 

} finally {

 

closeQuietly(input);

 

closeQuietly(output, mapperContext);

 

}

 

 

 

此处分析output.close主要分析有reduce的情况,如果没有reduce是直接关闭输出文件。

 

MapTask.NewOutputCollector.close

 

调用MapTask.MapOutputBuffer.flush把于下的数据spill到文件,等待SpillThread线程完成。

 

执行mergeParts函数合并小的spill文件。

 

public void close(TaskAttemptContext context

 

) throws IOException,InterruptedException {

 

try {

 

collector.flush();

 

} catch (ClassNotFoundException cnf) {

 

thrownew IOException("can't find class ", cnf);

 

}

 

collector.close();

 

}

 

}

 

 

 

MapOutputBuffer.flush函数操作

 

public void flush() throws IOException, ClassNotFoundException,

 

InterruptedException {

 

先把buffer中的数据执行sortspill操作。

 

LOG.info("Starting flush of map output");

 

spillLock.lock();

 

try {

 

while (spillInProgress) {

 

reporter.progress();

 

spillDone.await();

 

}

 

checkSpillException();

 

 

 

finalint kvbend = 4 * kvend;

 

if ((kvbend + METASIZE) % kvbuffer.length !=

 

equator - (equator % METASIZE)) {

 

// spill finished

 

resetSpill();

 

}

 

if (kvindex != kvend) {

 

kvend = (kvindex + NMETA) % kvmeta.capacity();

 

bufend = bufmark;

 

LOG.info("Spilling map output");

 

LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +

 

"; bufvoid = " + bufvoid);

 

LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +

 

"); kvend = " + kvend + "(" + (kvend * 4) +

 

"); length = " + (distanceTo(kvend, kvstart,

 

kvmeta.capacity()) + 1) + "/" + maxRec);

 

sortAndSpill();

 

}

 

} catch (InterruptedException e) {

 

thrownew IOException("Interrupted while waiting for the writer", e);

 

} finally {

 

spillLock.unlock();

 

}

 

assert !spillLock.isHeldByCurrentThread();

 

// shut down spill thread and wait for it to exit. Since the preceding

 

// ensures that it is finished with its work (and sortAndSpill did not

 

// throw), we elect to use an interrupt instead of setting a flag.

 

// Spilling simultaneously from this thread while the spill thread

 

// finishes its work might be both a useful way to extend this and also

 

// sufficient motivation for the latter approach.

 

Try {

 

等待spillthread操作完成。

 

spillThread.interrupt();

 

spillThread.join();

 

} catch (InterruptedException e) {

 

thrownew IOException("Spill failed", e);

 

}

 

// release sort buffer before the merge

 

kvbuffer = null;

 

合并所有的spill小文件。

 

mergeParts();

 

Path outputPath = mapOutputFile.getOutputFile();

 

fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());

 

}

 

 

 

mergeParts函数:

 

 

 

private void mergeParts() throws IOException, InterruptedException,

 

ClassNotFoundException {

 

// get the approximate size of the final output/index files

 

long finalOutFileSize = 0;

 

long finalIndexFileSize = 0;

 

final Path[] filename = new Path[numSpills];

 

final TaskAttemptID mapId = getTaskID();

 

首先得到所有的spill的数据文件。

 

for(int i = 0; i < numSpills; i++) {

 

filename[i] = mapOutputFile.getSpillFile(i);

 

finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();

 

}

 

如果只有一个spill文件,直接把生成的spill数据文件与索引文件生成为map的输出文件

 

说得坦白点就是把spill文件直接rename到目标mapoutput路径下

 

if (numSpills == 1) { //the spill is the final output

 

sameVolRename(filename[0],

 

mapOutputFile.getOutputFileForWriteInVolume(filename[0]));

 

if (indexCacheList.size() == 0) {

 

sameVolRename(mapOutputFile.getSpillIndexFile(0),

 

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));

 

} else {

 

indexCacheList.get(0).writeToFile(

 

mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);

 

}

 

sortPhase.complete();

 

return;

 

}

 

如果包含多个spill文件,先读取未被cache到内存部分的索引文件(spillindex)

 

// read in paged indices

 

for (int i = indexCacheList.size(); i < numSpills; ++i) {

 

Path indexFileName = mapOutputFile.getSpillIndexFile(i);

 

indexCacheList.add(new SpillRecord(indexFileName, job));

 

}

 

 

 

//make correction in the length to include the sequence file header

 

//lengths for each partition

 

finalOutFileSize += partitions * APPROX_HEADER_LENGTH;

 

finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;

 

生成对应的output文件与output index文件。Index中记录有partition的开始位置与长度

 

job中的attempid目录下生成一个file.out文件是数据文件的输出

 

job中的attempid目录下生成一个file.out.index文件是数据索引文件

 

Path finalOutputFile =

 

mapOutputFile.getOutputFileForWrite(finalOutFileSize);

 

Path finalIndexFile =

 

mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);

 

 

 

//The output stream for the final single output file

 

FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);

 

如果numSpills个数为0表示没有生成输出文件,此时生成一个空的数据文件,并生成一个索引文件,

 

此索引文件中每一个partition的索引都为0

 

if (numSpills == 0) {

 

//create dummy files

 

IndexRecord rec = new IndexRecord();

 

SpillRecord sr = new SpillRecord(partitions);

 

try {

 

for (int i = 0; i < partitions; i++) {

 

long segmentStart = finalOut.getPos();

 

Writer<K, V> writer =

 

new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);

 

writer.close();

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

sr.putIndex(rec, i);

 

}

 

sr.writeToFile(finalIndexFile, job);

 

} finally {

 

finalOut.close();

 

}

 

sortPhase.complete();

 

return;

 

}

 

{

 

sortPhase.addPhases(partitions); // Divide sort phase into sub-phases

 

 

 

IndexRecord rec = new IndexRecord();

 

final SpillRecord spillRec = new SpillRecord(partitions);

 

此时,从最小的partition开始合并所有的小的spill文件

 

for (int parts = 0; parts < partitions; parts++) {

 

//create the segments to be merged

 

List<Segment<K,V>> segmentList =

 

new ArrayList<Segment<K, V>>(numSpills);

 

此处开始迭代所有的spill数据文件,得到spill文件中对应的partitionsegment,

 

添加到一个集合容器中(此时通过每一个spill文件对应的index可以拿到segment在文件中的位置)

 

for(int i = 0; i < numSpills; i++) {

 

IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);

 

 

 

Segment<K,V> s =

 

new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,

 

indexRecord.partLength, codec, true);

 

segmentList.add(i, s);

 

 

 

if (LOG.isDebugEnabled()) {

 

LOG.debug("MapId=" + mapId + " Reducer=" + parts +

 

"Spill =" + i + "(" + indexRecord.startOffset + "," +

 

indexRecord.rawLength + ", " + indexRecord.partLength + ")");

 

}

 

}

 

读取merge因子,通过mapreduce.task.io.sort.factor配置。默认为100

 

int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);

 

// sort the segments only if there are intermediate merges

 

boolean sortSegments = segmentList.size() > mergeFactor;

 

//merge

 

@SuppressWarnings("unchecked")

 

生成一个Merger.MergeQueue队列,根据所有此partition中的segments,

 

如果当前的spill文件个数超过了配置的merge因子的个数,把segment按文件大小从小到大排序。

 

RawKeyValueIterator kvIter = Merger.merge(job, rfs,

 

keyClass, valClass, codec,

 

segmentList, mergeFactor,

 

new Path(mapId.toString()),

 

job.getOutputKeyComparator(), reporter, sortSegments,

 

null, spilledRecordsCounter, sortPhase.phase(),

 

TaskType.MAP);

 

生成Writer实例,

 

//write merged output to disk

 

long segmentStart = finalOut.getPos();

 

Writer<K, V> writer =

 

new Writer<K, V>(job, finalOut, keyClass, valClass, codec,

 

spilledRecordsCounter);

 

如果combiner没有配置,

 

或者spill文件的个数还不达到mapreduce.map.combine.minspills配置的个数,默认为3

 

不执行combiner操作。直接写入文件。

 

if (combinerRunner == null || numSpills < minSpillsForCombine) {

 

Merger.writeFile(kvIter, writer, reporter, job);

 

} else {

 

否则执行combiner操作并写入文件。combiner其实可以理解为没有shufflereduce

 

combineCollector.setWriter(writer);

 

combinerRunner.combine(kvIter, combineCollector);

 

}

 

提示:Merger.MergeQueue队列中每next去读取一条记录,

 

就会从所有的segment中读取出最小的一个kv,并写入此kv的值,

 

去执行next操作把所有的segment都放到一个优先级堆中,通过优先堆排序取出最小的一个kv.

 

//close

 

writer.close();

 

 

 

sortPhase.startNextPhase();

 

记录当前partition的索引信息。

 

// record offsets

 

rec.startOffset = segmentStart;

 

rec.rawLength = writer.getRawLength();

 

rec.partLength = writer.getCompressedLength();

 

spillRec.putIndex(rec, parts);

 

}

 

所有partition合并完成后,写入索引文件。并删除spill的小数据文件。

 

spillRec.writeToFile(finalIndexFile, job);

 

finalOut.close();

 

for(int i = 0; i < numSpills; i++) {

 

rfs.delete(filename[i],true);

 

}

 

}

 

}

 

 

 

 

 

结束语:每个spill文件写入时会执行快速排序(内存中)combiner操作,

 

最后多个spill合并时使用外部排序(磁盘)来对文件进行比较并取出最小的kv,写入文件,

 

此时如果spill文件的个数超过配置的值时,会再做一次combiner操作。

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    03-Hadoop-MapReduce.docx

    MapReduce工作流程包括JobTracker(在Hadoop 2.x中被ResourceManager替代)、TaskTracker(被NodeManager替代)以及MapTask和ReduceTask。JobTracker负责任务调度,TaskTracker执行实际的任务。 **1.5 WordCount...

    Hadoop-MapReduce.docx

    - MapTask:独立运行,负责Map阶段的数据处理。 - ReduceTask:负责Reduce阶段的数据处理,从MapTask获取处理后的数据并执行reduce()方法。 6. **MapReduce 编程规范** - Mapper:继承自定义父类,输入输出为...

    hadoop-2.10.0-src.tar.gz

    源码中,MapTask和ReduceTask的执行流程值得深入分析。 四、源码学习价值 阅读Hadoop 2.10.0的源码,可以帮助我们: 1. 理解Hadoop的内部工作机制,提升问题排查能力。 2. 学习分布式系统的设计与实现,为自定义...

    hadoop-src源代码

    1. `hadoop-mapreduce`:包含JobTracker、TaskTracker和Task等组件。JobTracker负责作业调度,TaskTracker在节点上执行任务,Task则表示一个具体的计算任务。 2. `MapReduceRuntime`:负责Map和Reduce任务的生命...

    Hadoop源代码分析(MapTask)

    Hadoop的MapTask类是Hadoop MapReduce框架中的一部分,负责执行Map任务。MapTask类继承自Task类,是MapReduce框架中的一个重要组件。本文将对MapTask类的源代码进行分析,了解其内部机制和实现细节。 MapTask类的...

    Hadoop-2.2.0源码包

    它包括JobTracker(已废弃,2.x版本中被ResourceManager取代)、TaskTracker(已废弃,由NodeManager取代)和Task(Map任务和Reduce任务)等组件。此外,还有Client API,用于编写MapReduce应用程序。 4. **hadoop-...

    hadoop-2.8.1-src.tar.gz

    Hadoop是Apache软件基金会开发的一个开源分布式计算框架,它允许在大规模集群中高效处理和存储海量数据。Hadoop 2.8.1是Hadoop发展中的一个重要版本,提供了许多改进和新特性,使得大数据处理更加稳定和高效。在这个...

    Hadoop技术-MapReduce工作原理.pptx

    **MapTask运行流程:** 1. **数据输入**:InputFormat类负责解析输入数据,如默认的TextInputFormat将文件逻辑地切割成多个split,每个split对应一个MapTask。split通常基于数据块(block)进行,以优化I/O操作。 2....

    hadoop-3.4.0-src.tar.gz

    在源码学习过程中,理解Hadoop的配置体系也至关重要,如`hadoop-common-project/hadoop-common`模块中的配置文件,它们定义了Hadoop系统运行的各种参数,直接影响到系统性能和稳定性。 总的来说,通过对"Hadoop-...

    Hadoop应用系列2--MapReduce原理浅析(上)

    `MapTask`和`ReduceTask`类则分别负责Map和Reduce阶段的具体执行。此外,`Partitioner`控制分区逻辑,`OutputFormat`和`InputFormat`处理输入输出格式。 六、工具支持 Hadoop提供了丰富的工具来支持MapReduce作业...

    Hadoop.MapReduce.分析

    ### Hadoop.MapReduce 分析 #### 一、概述 Hadoop.MapReduce 是一种分布式计算模型,主要用于处理大规模数据集。其基本思想源自Google提出的MapReduce论文。本文将深入解析Hadoop.MapReduce的工作原理、核心组件...

    3大数据技术之Hadoop(MapReduce).doc

    MapReduce是大数据处理的核心技术之一,特别是在Hadoop生态系统中,它为处理海量数据提供了高效的分布式计算框架。MapReduce的设计理念是将复杂的分布式计算任务分解为简单、可并行执行的两个主要阶段:Map和Reduce...

    hadoop mapreduce helloworld 能调试

    3. **使用 DebugFlag**:在提交作业时,可以设置 `-Dmapred.map.task.debug.script` 和 `-Dmapred.reduce.task.debug.script` 参数,使得 Map 或 Reduce 任务在完成时生成一个脚本,用于进一步分析。 4. **使用可视...

    拓思爱诺大数据第五天-mapreduce编程

    MapReduce程序在Hadoop集群中运行时,涉及到以下三个关键组成部分: 1. **MRAppMaster**:作为整个作业的主控者,负责作业的初始化、任务调度与状态协调。 2. **MapTask**:每个MapTask负责处理一个数据块(通常...

    Hadoop_MapReduce云计算技术手册

    2006年,Doug Cutting将这些概念应用于Hadoop项目中,实现了开源版本的MapReduce和HDFS。 - **发展**:2006年1月,Doug Cutting加入Yahoo公司,专职于Hadoop项目的开发。此后,Hadoop得到了快速发展,并逐步成为一个...

    提高hadoop的mapreduce job效率笔记

    当某些 Task 运行缓慢时,Hadoop 可以启动额外的 Task 实例来尝试替换它们。这有助于减少整体作业时间,但也可能浪费资源。因此,合理配置推测执行阈值是必要的。 8. **Shuffle阶段优化**: 优化 Shuffle 阶段的...

    4-0大数据技术之Hadoop(MapReduce) (1)

    2. **MapTask**:执行Map阶段的数据处理。 3. **ReduceTask**:执行Reduce阶段的数据处理。 【数据序列化类型】 Hadoop提供了多种内置的可序列化类型,如BooleanWritable、IntWritable、Text等,用于在MapReduce中...

Global site tag (gtag.js) - Google Analytics