
sources study-part 7-summary


Here is my summary from reading the sources. Given my ability and the complexity of Hadoop, and since I have not read all of the sources yet, there may be some illogical statements below; if anything in them feels off to you, tell me your ideas :)

 

1. Concepts
Map (the Mapper class) is a single map task; one InputSplit produces one map task. (Reduce is similar, except that the number of reduce tasks depends on what the client configures before submitting a job.)
note: with MultithreadedMapper, one split is handled by multiple mapper threads, each processing part of the split in parallel (see the sketch after this list). TO CONFIRM
MultithreadedMapRunner: runs the original map() method in multiple threads, so that multiple records are processed concurrently.
ACL: AccessControlList (SecurityUtil package), containing a users set, a groups set, etc.; it acts as permission-based access control.
TIP: TaskInProgress; contains the task attempts, the current task index, etc.
JIP: JobInProgress.
job status: PREP, RUNNING, FAILED, KILLED, SUCCEEDED (the last three are called 'complete' statuses)
map and reduce task phases: STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP
TaskRunner: starts a new task in a child JVM.
TaskLauncher: starts a new TaskRunner to run a task; there are two per TT (a map launcher and a reduce launcher).
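
A minimal sketch of wiring up MultithreadedMapper, assuming the new org.apache.hadoop.mapreduce API; WordLikeMapper and the job name are hypothetical stand-ins, and the rest of the job setup is elided:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;

    public class MultithreadedMapperDemo {
        // WordLikeMapper is a hypothetical stand-in for any ordinary Mapper.
        public static class WordLikeMapper
                extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context ctx)
                    throws java.io.IOException, InterruptedException {
                ctx.write(value, new LongWritable(1));
            }
        }

        public static void main(String[] args) throws Exception {
            Job job = new Job(new Configuration(), "multithreaded-mapper-demo");
            job.setMapperClass(MultithreadedMapper.class);                  // wrapper mapper
            MultithreadedMapper.setMapperClass(job, WordLikeMapper.class);  // the real work
            MultithreadedMapper.setNumberOfThreads(job, 4);                 // threads per split
            // ... input/output paths and the rest of the job setup go here
        }
    }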


2. Source code
util.RunJar is the entry point for running any jar from the command line.

// -- Properties and components of this instance --see java.net.URI
e.g. hdfs://localhost:10060/user/leibnitz/output/c8/
     file:///
    // Components of all URIs: [<scheme>:]<scheme-specific-part>[#<fragment>]
    private transient String scheme;        // null ==> relative URI
    private transient String fragment;

    // Hierarchical URI components: [//<authority>]<path>[?<query>]
    private transient String authority;        // Registry or server,e.g. localhost:10060

    // Server-based authority: [<userInfo>@]<host>[:<port>]
    private transient String userInfo;
    private transient String host;        // null ==> registry-based
    private transient int port = -1;        // -1 ==> undefined

    // Remaining components of hierarchical URIs
    private transient String path;        // null ==> opaque
    private transient String query;
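
To make the mapping concrete, a small runnable check of these components against the hdfs example above:

    import java.net.URI;

    public class UriComponentsDemo {
        public static void main(String[] args) {
            URI u = URI.create("hdfs://localhost:10060/user/leibnitz/output/c8/");
            System.out.println("scheme    = " + u.getScheme());    // hdfs
            System.out.println("authority = " + u.getAuthority()); // localhost:10060
            System.out.println("host      = " + u.getHost());      // localhost
            System.out.println("port      = " + u.getPort());      // 10060
            System.out.println("path      = " + u.getPath());      // /user/leibnitz/output/c8/
        }
    }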

DFSClient
Blocks to be transferred are split into packets, and packets into chunks; the chunk size depends on bytesPerChecksum plus the bytes occupied by the checksum (by default one int, i.e. 4 bytes).
block -> packets [64 KB per packet]; packet -> chunks
file data => blocks => packets => chunks => checksum [4 B] + chunk data (bytesPerChecksum, 512 B); "=>" means "is split into".
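
Back-of-the-envelope arithmetic for this split chain, using the defaults quoted above (4-byte checksum, 512-byte chunks, 64 KB packets); the 64 MB block size is an assumption for illustration:

    public class DfsPacketMath {
        public static void main(String[] args) {
            final int CHECKSUM_SIZE   = 4;         // one int per chunk
            final int BYTES_PER_CHUNK = 512;       // bytesPerChecksum
            final int PACKET_SIZE     = 64 * 1024;
            final long BLOCK_SIZE     = 64L * 1024 * 1024; // assumed dfs.block.size

            int chunkOnWire   = CHECKSUM_SIZE + BYTES_PER_CHUNK;  // 516 B on the wire
            int chunksPerPkt  = PACKET_SIZE / chunkOnWire;        // ~127 chunks per packet
            long dataPerPkt   = (long) chunksPerPkt * BYTES_PER_CHUNK;
            long pktsPerBlock = (BLOCK_SIZE + dataPerPkt - 1) / dataPerPkt;

            System.out.println("chunks/packet = " + chunksPerPkt);
            System.out.println("data/packet   = " + dataPerPkt + " B");
            System.out.println("packets/block ~ " + pktsPerBlock);
        }
    }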

When DFSClient uploads a file, it starts a DataStreamer as an agent that transfers the data to the DNs, and for every block it creates a new ResponseProcessor to handle
the acks from all DNs; at any moment there is only one streamer/response pair. A packet is removed from the ackQueue once every DN has acked it.
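
A toy sketch of the dataQueue/ackQueue shape described here; this is not DFSClient's actual code, and Packet is a hypothetical stand-in type:

    import java.util.LinkedList;
    import java.util.Queue;

    public class PipelinedSender {
        static class Packet { long seqno; }

        private final Queue<Packet> dataQueue = new LinkedList<Packet>();
        private final Queue<Packet> ackQueue  = new LinkedList<Packet>();

        // DataStreamer role: take from dataQueue, send, then park on ackQueue.
        synchronized void sendOne() {
            Packet p = dataQueue.poll();
            if (p == null) return;
            // ... write p to the first DN in the pipeline ...
            ackQueue.add(p); // keep the packet until every DN has acked it
        }

        // ResponseProcessor role: an ack from the LAST DN implies all DNs
        // succeeded, so the packet can be dropped -- this mirrors "remove
        // from ackQueue once all DNs return acks".
        synchronized void onAck(long seqno) {
            Packet head = ackQueue.peek();
            if (head != null && head.seqno == seqno) {
                ackQueue.remove();
            }
        }
    }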

The block id of a given block is identical across all DNs that hold it.

mapred======
assert idx < nrBlocks : "Incorrect index" means:
idx is expected to be less than nrBlocks; otherwise an AssertionError is thrown (when assertions are enabled).
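
A tiny demonstration of the quoted assert; note that assertions are disabled by default and only fire under java -ea:

    public class AssertDemo {
        public static void main(String[] args) {
            int idx = 5, nrBlocks = 3;
            assert idx < nrBlocks : "Incorrect index"; // AssertionError under -ea
            System.out.println("reached only if the assertion holds or -ea is absent");
        }
    }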

The computation of the input splits happens in the client, and the split file is written to HDFS before the job is submitted to the JT.

When using remote debugging (-Xdebug/-Xrunjdwp), you can in fact only debug the client-side code up to the point where the job is submitted to the JT; after submission the job is run by the remote JT and is no longer under the client's control.
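
One common workaround (an assumption: Hadoop 1.x, where mapred.child.java.opts controls the child task JVM flags) is to attach the debugger to the task side instead; the client JVM itself can be debugged by starting it with the same jdwp flags:

    import org.apache.hadoop.conf.Configuration;

    public class DebugOptsDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // assumption: Hadoop 1.x config key for child task JVM options
            conf.set("mapred.child.java.opts",
                     "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8000");
            // submit the job with this conf; each child JVM then waits for a debugger
        }
    }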

default task scheduler: JobQueueTaskScheduler, which consists of two main elements:
a. a JobQueueJobInProgressListener, which keeps the queue of jobs submitted from clients
b. an EagerTaskInitializationListener, which initializes the tasks of those jobs.

When does a TaskTracker launch a task?
When the TT daemon starts, it spawns two sub-threads: a map launcher and a reduce launcher. These threads monitor the
tasksToLaunch queue; when the heartbeat response adds a task to it (TaskTracker.offerService()), they are
notified and take a task to launch.
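
A schematic of that launcher-thread pattern, not the real TaskTracker code; the method names here are illustrative:

    import java.util.LinkedList;

    public class LauncherSketch {
        private final LinkedList<Runnable> tasksToLaunch = new LinkedList<Runnable>();

        // called from the heartbeat-handling path (cf. TaskTracker.offerService())
        public synchronized void addToTaskQueue(Runnable task) {
            tasksToLaunch.add(task);
            notifyAll(); // wake the launcher thread
        }

        // body of the map/reduce launcher thread
        public void launcherLoop() throws InterruptedException {
            while (true) {
                Runnable task;
                synchronized (this) {
                    while (tasksToLaunch.isEmpty()) {
                        wait(); // sleep until a heartbeat response adds a task
                    }
                    task = tasksToLaunch.removeFirst();
                }
                task.run(); // launch (in the real code: start a TaskRunner)
            }
        }
    }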

TaskScheduler:
it is primarily in charge of scheduling jobs, and it also chooses the best task(s) to start.
With JobQueueTaskScheduler, jobs are scheduled in FIFO order.
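
The FIFO order can be pictured as a plain comparator, priority first and submission time second; JobStub is a hypothetical stand-in for JobInProgress, and the numeric priority convention here is illustrative:

    import java.util.Comparator;
    import java.util.TreeSet;

    public class FifoOrderSketch {
        static class JobStub {
            int priority;   // smaller = more urgent (illustrative convention)
            long startTime; // submission time
            int id;
            JobStub(int p, long t, int id) { priority = p; startTime = t; this.id = id; }
        }

        static final Comparator<JobStub> FIFO = new Comparator<JobStub>() {
            public int compare(JobStub a, JobStub b) {
                if (a.priority != b.priority) return a.priority - b.priority;
                if (a.startTime != b.startTime) return a.startTime < b.startTime ? -1 : 1;
                return a.id - b.id; // tie-break so no two jobs compare equal
            }
        };

        public static void main(String[] args) {
            TreeSet<JobStub> queue = new TreeSet<JobStub>(FIFO);
            queue.add(new JobStub(1, 200L, 2));
            queue.add(new JobStub(1, 100L, 1));
            queue.add(new JobStub(0, 300L, 3)); // higher priority jumps the queue
            for (JobStub j : queue) System.out.println("job " + j.id);
            // prints: job 3, job 1, job 2
        }
    }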

offerService() -> TaskLauncher (thread that polls for new tasks) => MapTaskRunner/ReduceTaskRunner (thread) -> JvmManager => JvmManagerForType => JvmRunner (thread) -> ShellCommandExecutor -> new JVM instance
1. one TaskRunner per task
2. one JvmManager per TT, which keeps two JvmManagerForType instances (one for the map type, one for the reduce type);
JvmRunner threads are spawned by the type manager.
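
The end of this chain is "spawn a new JVM"; a generic sketch of that step with ProcessBuilder (the real code shells out through ShellCommandExecutor); ChildMain is a hypothetical entry class:

    import java.io.IOException;

    public class SpawnChildJvm {
        public static void main(String[] args) throws IOException, InterruptedException {
            ProcessBuilder pb = new ProcessBuilder(
                    "java", "-Xmx200m", "-cp", System.getProperty("java.class.path"),
                    "ChildMain");            // hypothetical task entry class
            pb.inheritIO();                  // surface the child's stdout/stderr
            Process child = pb.start();
            int exit = child.waitFor();      // the runner watches the child's exit code
            System.out.println("child JVM exited with " + exit);
        }
    }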


TO CONFIRM: =================
The transfer makes no attempt to keep records intact when splitting into blocks (e.g. a block boundary can fall in the middle of a multi-byte Chinese character rather than after its last byte).
So far I have only read the sending side, not the receiving/reconstruction side. My initial guess is that after reception the data is re-assembled per record, and that when a record spans multiple blocks, the sync flag in a SequenceFile is used to handle it.



