`

nutch源码阅读(9)-Fetch

 
阅读更多

Fetcher这个模块在Nutch中有单独一个包在实现,在org.apache.nutch.fetcher,其中有Fetcher.java, FetcherOutput 和FetcherOutputFormat来组成,看上去很简单,但其中使用到了多线程,多线程的生产者与消费者模型,MapReduce的多路径输出等方法。

下面我们来看一下Fetcher的注释,从中我们可以得到很多有用的信息。
首先,这是一种基于队列的fetcher方法,它使用了一种经典的线程模型,生产者(a-QueueFeeder)与消费者(many-FetcherThread)模型,注意,这里有多个消费者。生产者从Generate产生的fetchlists中分类得到一批FetchItemQueue,每一个FetchItmeQueue都是由一类相同host的FetchItem组成,这些FetchItem是用来描述被抓取的对象。当一个FetchItem从FetchItemQueue中取出后,QueueFeeder这个生产者会不断的向队列中加入新的FetchItem,直到这个队列满了为止或者已经没有fetchlist可读取,当队列中的所有FetchItem都被抓取完成后,所有抓取线程都会退出运行。每一个FetchItemQueue都有一套自己的抓取策略,如最大的并行抓取个数,两次抓取的间隔等,如果当FetcherThread向队列申请一个FetchItem时,FetchItemQueue发现当前的FetchItem没有满足抓取策略,那这里它就会返回null,表达当前FetchItem还没有准备好被抓取。如果这些所有FetchItem都没有准备好被抓取,那这时FetchThread就会进入等待状态,直到条件满足被促发或者是等待超时,它会认为任务已经被挂起,这时FetchThread会自动退出。

 

 

..................
..................
..................

 for (i = 0; i < depth; i++) {             // generate new segment
      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
          .currentTimeMillis());
      if (segs == null) {
        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
        break;
      }
      //通过之前生成的segments抓取
      fetcher.fetch(segs[0], threads);  // fetch it
      if (!Fetcher.isParsing(job)) {
        parseSegment.parse(segs[0]);    // parse it, if needed
      }
      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
    }

..................
..................
..................

 

 

 

//threads默认通过fetcher.threads.fetch设置,也可通过参数传递
 public void fetch(Path segment, int threads)
    throws IOException {
    //设置agentName
    //在http.agent.name中设置,会检查在http.robots.agents中是否存在
    checkConfiguration();
    //记录开始时间,以及segment路径
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    if (LOG.isInfoEnabled()) {
      LOG.info("Fetcher: starting at " + sdf.format(start));
      LOG.info("Fetcher: segment: " + segment);
    }

    // set the actual time for the timelimit relative
    // to the beginning of the whole job and not of a specific task
    // otherwise it keeps trying again if a task fails
    long timelimit = getConf().getLong("fetcher.timelimit.mins", -1);
    if (timelimit != -1) {
      timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
      LOG.info("Fetcher Timelimit set for : " + timelimit);
      getConf().setLong("fetcher.timelimit", timelimit);
    }

    // Set the time limit after which the throughput threshold feature is enabled
    timelimit = getConf().getLong("fetcher.throughput.threshold.check.after", 10);
    timelimit = System.currentTimeMillis() + (timelimit * 60 * 1000);
    getConf().setLong("fetcher.throughput.threshold.check.after", timelimit);

    int maxOutlinkDepth = getConf().getInt("fetcher.follow.outlinks.depth", -1);
    if (maxOutlinkDepth > 0) {
      LOG.info("Fetcher: following outlinks up to depth: " + Integer.toString(maxOutlinkDepth));

      int maxOutlinkDepthNumLinks = getConf().getInt("fetcher.follow.outlinks.num.links", 4);
      int outlinksDepthDivisor = getConf().getInt("fetcher.follow.outlinks.depth.divisor", 2);

      int totalOutlinksToFollow = 0;
      for (int i = 0; i < maxOutlinkDepth; i++) {
        totalOutlinksToFollow += (int)Math.floor(outlinksDepthDivisor / (i + 1) * maxOutlinkDepthNumLinks);
      }

      LOG.info("Fetcher: maximum outlinks to follow: " + Integer.toString(totalOutlinksToFollow));
    }

    JobConf job = new NutchJob(getConf());
    job.setJobName("fetch " + segment);
    //配置线程数
    job.setInt("fetcher.threads.fetch", threads);
    job.set(Nutch.SEGMENT_NAME_KEY, segment.getName());

    // for politeness, don't permit parallel execution of a single task
    job.setSpeculativeExecution(false);
    //配置输入
    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
    job.setInputFormat(InputFormat.class);
    //配置MapRunner
    job.setMapRunnerClass(Fetcher.class);
    //配置输出
    FileOutputFormat.setOutputPath(job, segment);
    job.setOutputFormat(FetcherOutputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NutchWritable.class);

    JobClient.runJob(job);

    long end = System.currentTimeMillis();
    LOG.info("Fetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
  }

 

 

public RecordWriter<Text, NutchWritable> getRecordWriter(final FileSystem fs,
                                      final JobConf job,
                                      final String name,
                                      final Progressable progress) throws IOException {
    
    //定义输出目录
    Path out = FileOutputFormat.getOutputPath(job);
    //定义抓取的输出目录
    final Path fetch =
      new Path(new Path(out, CrawlDatum.FETCH_DIR_NAME), name);
    //定义抓取内容的输出目录
    final Path content =
      new Path(new Path(out, Content.DIR_NAME), name);
    //定义压缩类型
    final CompressionType compType = SequenceFileOutputFormat.getOutputCompressionType(job);
    //定义输出对象
    final MapFile.Writer fetchOut =
      new MapFile.Writer(job, fs, fetch.toString(), Text.class, CrawlDatum.class,
          compType, progress);
    
    return new RecordWriter<Text, NutchWritable>() {
        private MapFile.Writer contentOut;
        private RecordWriter<Text, Parse> parseOut;

        {
          //如果配置"fetcher.store.content"为true,则生成content
          if (Fetcher.isStoringContent(job)) {
            contentOut = new MapFile.Writer(job, fs, content.toString(),
                                            Text.class, Content.class,
                                            compType, progress);
          }
          //如果配置"fetcher.parse"为true,而抽取出外连接
          if (Fetcher.isParsing(job)) {
            parseOut = new ParseOutputFormat().getRecordWriter(fs, job, name, progress);
          }
        }

        public void write(Text key, NutchWritable value)
          throws IOException {

          Writable w = value.get();
          // 对对象类型进行判断,调用相应的抽象输出,写到不同的文件中去 
          if (w instanceof CrawlDatum)
            fetchOut.append(key, w);
          else if (w instanceof Content)
            contentOut.append(key, w);
          else if (w instanceof Parse)
            parseOut.write(key, (Parse)w);
        }

        public void close(Reporter reporter) throws IOException {
          fetchOut.close();
          if (contentOut != null) {
            contentOut.close();
          }
          if (parseOut != null) {
            parseOut.close(reporter);
          }
        }

      };

  }  

 

 

 

 

分享到:
评论

相关推荐

    apache-nutch-1.3-src.tar.gz_nutch_nutch-1.3.tar.gz

    通过阅读源码,你可以学习到如何设计和实现大规模的网络爬虫系统,以及如何使用 Hadoop 进行分布式处理。 **使用与开发** 如果你打算使用或开发 Nutch,你需要有扎实的 Java 基础和一定的 Hadoop 知识。首先,解压...

    Nutch 1.2源码阅读

    ### Nutch 1.2 源码阅读深入解析 #### Crawl类核心作用与流程概览 在深入了解Nutch 1.2源码之前,我们先明确Nutch的架构和工作流程。Nutch作为一款开源搜索引擎框架,其功能涵盖网页抓取、索引构建以及查询处理。...

    nutch-param-set

    Nutch 是一个高度可扩展且开放源码的网络爬虫项目,主要用于抓取和索引互联网上的数据。本篇将基于提供的文件内容对 Nutch 的参数设置进行深入解析,帮助读者更好地理解 Nutch 中各个组件的工作原理及配置方式。 ##...

    Apache Nutch 1.7 学习总结

    - 打开Cygwin终端,进入Nutch源码目录,配置环境,执行编译和构建命令。 4. **Nutch1.7 测试** - 初始化Nutch的配置文件,根据需求修改`conf/nutch-site.xml`。 - 运行Nutch的基本命令,如抓取种子URL (`bin/...

    Lucene nutch 搜索引擎 开发 实例 源码

    特别地,Nutch中的Crawl和Fetch阶段展示了如何管理URL队列,以及如何决定何时重新抓取网页。 在学习Lucene和Nutch的源码时,你会遇到以下几个关键概念: 1. **分词**:Lucene使用Analyzer对输入文本进行分词,不同...

    windows下的nutch配置总结

    - 打开命令行,导航到Nutch源码目录,运行`mvn clean install -DskipTests`命令,这将编译Nutch并创建可执行的JAR文件。 4. **配置Nutch**: - 修改`conf/nutch-site.xml`文件,设置抓取策略、存储路径、抓取间隔...

    nutch-0.9 环境搭建所需最小cygwin

    1. **下载Nutch源码**:首先,从Apache官方网站或者镜像站点下载Nutch-0.9的源代码。将下载的源码解压到你想要的工作目录下,例如`C:\nutch\src\nutch-0.9`。 2. **配置环境变量**:打开Cygwin终端,设置必要的环境...

    搭建nutch开发环境步骤

    bin/nutch fetch bin/nutch update bin/nutch parse bin/nutch index ``` 以上命令将生成新的抓取批次、从Web服务器获取页面、更新数据库、解析页面内容并创建索引。 **步骤九:使用Solr或Elasticsearch建立索引** ...

    Nutch-1.0分布式安装手册.rar

    在安装Maven后,需要在命令行中执行相应的Maven命令来编译Nutch源码,生成可执行的JAR文件。 在环境准备完毕后,进入Hadoop的配置阶段。这包括修改Hadoop的配置文件如`core-site.xml`、`hdfs-site.xml`以及`mapred-...

    nutch2.2.1

    3. 使用`bin/nutch`脚本执行各种任务,如`fetch`, `parse`, `index`等。 通过学习和研究Nutch 2.2.1的源码,你可以了解到网络爬虫的基本工作流程,理解如何处理大量网页数据,以及如何使用Hadoop进行分布式计算。这...

    Nutch2.3.1 环境搭建

    **二、获取Nutch源码** 1. 访问Apache Nutch官方网站(http://nutch.apache.org/releases.html),下载Nutch 2.3.1的源码包。 2. 解压下载的源代码到你选择的工作目录,例如`/usr/local/src/nutch-2.3.1`。 **三、...

    nutch源码分析

    ### Nutch源码分析 #### 一、Nutch概述及工作流程 Nutch是一个开源的Web爬虫项目,主要用于构建搜索引擎。Nutch基于Hadoop框架,能够高效地抓取、索引和搜索互联网上的信息。Nutch的工作流程主要包括以下几个步骤...

    Nutch源码研究

    Nutch 的源码研究对于理解搜索引擎的工作原理和网页抓取技术非常有帮助。通过深入分析源码,开发者可以自定义抓取策略、优化性能,甚至开发新的协议插件以支持更多数据源。同时,Nutch 的设计思路也可以为其他分布式...

    搜索引擎nutch配置

    2. **获取Nutch源码** 从Apache官方网站下载Nutch的最新源代码,通常通过Git克隆仓库。解压后,进入Nutch的工作目录。 3. **配置Nutch** 打开`conf/nutch-site.xml`文件,这是Nutch的主要配置文件。以下是一些...

    Nutch安装配置

    运行`mvn clean install`命令编译Nutch源码,生成可执行的jar文件。 4. **Cygwin环境**:在Windows系统中,由于Nutch依赖于一些Linux命令,所以可能需要安装Cygwin模拟Linux环境。文件名`cygwin.doc`可能是一个关于...

    Nutch相关框架视频教程 (1-20)(PDF)

    12. **获取Nutch源码**:使用SVN命令从Apache仓库中检出指定版本的Nutch源代码。 13. **构建项目**:使用Ant构建Nutch项目,并设置URL文件以及配置文件`nutch-site.xml`。 14. **启动爬虫**:通过Nutch提供的命令...

    Hadoop-2.4.0+Hbase-0.94.18+Nutch-2.3集群爬虫配置攻略

    启动Nutch爬虫,可以使用`bin/nutch inject`命令将种子URL注入到爬虫队列,然后通过`bin/nutch fetch`, `bin/nutch updatedb`, `bin/nutch parse`, `bin/nutch index`等命令执行抓取流程。如果要在分布式模式下运行...

    Nutch简要文档

    3. **Fetcher**:`Fetcher` 根据 `Generator` 生成的抓取列表下载网页,`Fetcher` 命令可设置线程数来控制并发抓取,下载后的网页源码存放在 `content` 文件夹,状态信息存放在 `crawl_fetch` 文件夹。 4. **Parse*...

    nutch crawl代码解析

    在深入理解 Nutch 的工作原理之前,了解其源码是至关重要的。本文将解析 Nutch-0.9 版本中的 `Crawl` 类,它是 Nutch 抓取流程的起点。 `Crawl` 类位于 `org.apache.nutch.crawl` 包中,它包含了启动 Nutch 抓取...

Global site tag (gtag.js) - Google Analytics