论坛首页 Java企业应用论坛

Nutch源代码解读--3

浏览 1488 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2012-04-30  
以上是Hadoop的基本流程,如果需要详细连接可以登入http://hadoop.apache.org阅读更加详细地资料。这里我对其先介绍下简要地。等下结合nutch里如何使用这些实现并行计算地。 Hadoop主要分为2个部分,一个是分布式文件系统,一个是MapReduce的编程模型或者说一个框架。MapReduce相信大家都会了解一点(http://labs.google.com/papers/mapreduce.html).以上就是Hadoop提供地MapReduce的编程模型。首先把输入的每个大文件,利用InputFormatter提供地getSplites()把一个大文件拆分成小块,再利用InputFormatter提供的RecordReader的格式进行读取把一个InputSplits转为init Key-value pairs.启动一个Mapper Task把输入的Init Key-value Pairs进行相应的处理,转化为Intermediate Key-value paris,这个时候Hadoop的框架会协助这些产生值进行Group,以及按照提供的Partiion Class对结果进行分区,那么分区的个数就是以后Reduce Task的个数。(当然还可以提供Combine Class对本机局部的输出信息进行预处理,这样减少了Reduce过程读入的信息量).Mapper Task的结果一般保存在SequenceFile Class里。如果没有定义Reduce Class 那么默认就吧Mapper Class的中间Key-values按照提供的OutputFormatter进行输出。如果有定义了Reduce Task,那么就会先执行Reduce Task,过程是:首先从分布在各个Servers上的相关Key的所有Partition数据都Copy到本机Local的文件系统中,再新建一个文件用与存储,同时对于这些Key-values进行Merge,Sort操作。接下来就是简单地通过遍历这个文件,One Key- One Iterator,进行遍历,利用提供的OutputFormatter将结果输出到文件系统。(^-^我的理解就是上面这样咯.如果有什么不对的地方,希望大家指出来). 下面就是具体结合这个fetcher过程里的并行计算代码,来具体领悟下吧。 首先来下下Fetcher.fetch()这个crawl main()里调用的方法吧。 Mapper方法: publicclass Fetcher extends ToolBase implements MapRunnable { publicvoid fetch(Path segment, int threads) throws IOException { JobConf job = new NutchJob(getConf());//通过Configuration新建一个JobConf job.setJobName("fetch " + segment);//命名JobName job.setInputPath(new Path(segment, CrawlDatum.GENERATE_DIR_NAME));//输入Map-Reduce的Input路径 job.setInputFormat(InputFormat.class);//设置InputFormat提供RecordReader以及InputSplits,对照之前的图 job.setMapRunnerClass(Fetcher.class);//设置Mapper Class job.setOutputPath(segment);//设置Mapper-Reduce Output输出路径 job.setOutputFormat(FetcherOutputFormat.class);//设置OuptFormater提供ReportWriter进行输出 job.setOutputKeyClass(Text.class);//输出的Key Class job.setOutputValueClass(FetcherOutput.class);//输出的 Value Class JobClient.runJob(job);//执行Hadoop Map-Reduce Task,calling run(RecordReader,OutputCollectort,Reporter) } . . . publicvoid run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException { //执行已经经过InputFormat的处理,提供了RecordReader,reporter,output,进行Mapper Task .设置这些使得在FectcherThead内部线程里,可以使用input.next()读取Mapper Input this.input = input; this.output = output; this.reporter = reporter;//底下是启动一个FetcherThread的,具体内容之前的1,2已经提及 } 在以上的Mapper Task执行完之后,由于没有指定Reduce方法所以默认把Mapper Intermediate Key-value作为输出.具体地可以看FetcherOutputFormat类,里面提供了ReportWriter对结果进行输出。 这个就是Hadoop执行这个Fetcher的具体流程了。当然有一些map-reduce Task并没有使用实现这个MapRunnbale的方法,而是使用Mapper这个接口,通过写里面的map(WritableComparable, Writable, OutputCollector, Reporter)方法,来对输入的key-value进行相应地处理。比如默认地CrawlDb.createJob(Configuration, Path)里设置地默认的Mapper类CrawlDBFilter这个mapper类。具体大概地执行流程也是之前那样地。 介绍到这里,希望大家了解了Map-Reduce在Hadoop里怎么使用地,然后可以更好地看懂Nutch代码哈哈。 ^-^有问题,或者我有什么理解不对的地方,欢迎批评指点。Nutch学习笔记待续~~~~
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics