`

nutch源码阅读(6)-Generator

 
阅读更多

 

for (i = 0; i < depth; i++) {             
     // generate new segment
     //根据传入参数depth来决定循环次数,生成segment
      Path[] segs = generator.generate(crawlDb, segments, -1, topN, System
          .currentTimeMillis());
      if (segs == null) {
        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
        break;
      }
      fetcher.fetch(segs[0], threads);  // fetch it
      if (!Fetcher.isParsing(job)) {
        parseSegment.parse(segs[0]);    // parse it, if needed
      }
      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
    }

 

 

 

 

  public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
      long curTime, boolean filter, boolean norm, boolean force, int maxNumSegments)
      throws IOException {
    //生成临时存储路径
    Path tempDir = new Path(getConf().get("mapred.temp.dir", ".") + "/generate-temp-"
        + System.currentTimeMillis());
    //生成文件锁
    Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
    FileSystem fs = FileSystem.get(getConf());
    LockUtil.createLockFile(fs, lock, force);

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long start = System.currentTimeMillis();
    LOG.info("Generator: starting at " + sdf.format(start));
    LOG.info("Generator: Selecting best-scoring urls due for fetch.");
    LOG.info("Generator: filtering: " + filter);
    LOG.info("Generator: normalizing: " + norm);
    if (topN != Long.MAX_VALUE) {
      LOG.info("Generator: topN: " + topN);
    }
    
    if ("true".equals(getConf().get(GENERATE_MAX_PER_HOST_BY_IP))){
      LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use partition.url.mode instead");
    }

    // map to inverted subset due for fetch, sort by score
    JobConf job = new NutchJob(getConf());
    job.setJobName("generate: select from " + dbDir);
    
    //用户如果没有指定的话,就默认为map的数量
    if (numLists == -1) { // for politeness make
      numLists = job.getNumMapTasks(); // a partition per fetch task
    }
    如果mapreduce设置为local,就只用一个mapper
    if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
      // override
      LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
      numLists = 1;
    }
    设置生成时间
    job.setLong(GENERATOR_CUR_TIME, curTime);
    // record real generation time
    long generateTime = System.currentTimeMillis();
    job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
    job.setLong(GENERATOR_TOP_N, topN);
    job.setBoolean(GENERATOR_FILTER, filter);
    job.setBoolean(GENERATOR_NORMALISE, norm);
    job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);
    配置作业信息
    FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
    job.setInputFormat(SequenceFileInputFormat.class);

    job.setMapperClass(Selector.class);
    job.setPartitionerClass(Selector.class);
    job.setReducerClass(Selector.class);

    FileOutputFormat.setOutputPath(job, tempDir);
    job.setOutputFormat(SequenceFileOutputFormat.class);
    job.setOutputKeyClass(FloatWritable.class);
    job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
    job.setOutputValueClass(SelectorEntry.class);
    job.setOutputFormat(GeneratorOutputFormat.class);

    try {
      JobClient.runJob(job);
    } catch (IOException e) {
      throw e;
    }

     ...................
     ...................
     ...................
  }

 

  /** Select & invert subset due for fetch. */
    public void map(Text key, CrawlDatum value,
        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
        throws IOException {
      Text url = key;
      //如果有filter设置,先对url进行过滤
      if (filter) {
        // If filtering is on don't generate URLs that don't pass
        // URLFilters
        try {
          if (filters.filter(url.toString()) == null) return;
        } catch (URLFilterException e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
          }
        }
      }
      CrawlDatum crawlDatum = value;

      // check fetch schedule
      //检查抓取时间,没有达到抓取时间就过滤掉
      if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
        LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
            + crawlDatum.getFetchTime() + ", curTime=" + curTime);
        return;
      }

      LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
          Nutch.WRITABLE_GENERATE_TIME_KEY);
      if (oldGenTime != null) { // awaiting fetch & update
        if (oldGenTime.get() + genDelay > curTime) // still wait for
        // update
        return;
      }
      
      //计算得分
      float sort = 1.0f;
      try {
        sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
      } catch (ScoringFilterException sfe) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
        }
      }
      
      if (restrictStatus != null
        && !restrictStatus.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus()))) return;

      // consider only entries with a score superior to the threshold
      //如果分值小于阀值,过滤掉
      if (scoreThreshold != Float.NaN && sort < scoreThreshold) return;

      // consider only entries with a retry (or fetch) interval lower than threshold
      if (intervalThreshold != -1 && crawlDatum.getFetchInterval() > intervalThreshold) return;

      // sort by decreasing score, using DecreasingFloatComparator
      sortValue.set(sort);
      // record generation time
      //记录生成时间
      crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
      entry.datum = crawlDatum;
      entry.url = (Text) key;
      output.collect(sortValue, entry); // invert for sort by score
    }

 

  /** Partition by host / domain or IP. */
    //根据domain或ip 来分配给reduce
    public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
      return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
    }

 

    

 

public void configure(JobConf job) {
      curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
      limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();
      maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
      // back compatibility with old param
      int oldMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
      if (maxCount==-1 && oldMaxPerHost!=-1){
        maxCount = oldMaxPerHost;
        byDomain = false;
      }
      if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE))) byDomain = true;
      filters = new URLFilters(job);
      normalise = job.getBoolean(GENERATOR_NORMALISE, true);
      if (normalise) normalizers = new URLNormalizers(job,
          URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      scfilters = new ScoringFilters(job);
      partitioner.configure(job);
      filter = job.getBoolean(GENERATOR_FILTER, true);
      genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;
      long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
      if (time > 0) genTime.set(time);
      schedule = FetchScheduleFactory.getFetchSchedule(job);
      scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
      intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1);
      restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null);
      maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
      segCounts = new int[maxNumSegments];
    }

 

 

    /** Collect until limit is reached. */
    public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
        OutputCollector<FloatWritable,SelectorEntry> output, Reporter reporter)
        throws IOException {

      while (values.hasNext()) {

        if (count == limit) {
          // do we have any segments left?
          if (currentsegmentnum < maxNumSegments) {
            count = 0;
            currentsegmentnum++;
          } else break;
        }

        SelectorEntry entry = values.next();
        Text url = entry.url;
        String urlString = url.toString();
        URL u = null;

        String hostordomain = null;

        try {
          if (normalise && normalizers != null) {
            urlString = normalizers.normalize(urlString,
                URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
          }
          u = new URL(urlString);
          if (byDomain) {
            hostordomain = URLUtil.getDomainName(u);
          } else {
            hostordomain = new URL(urlString).getHost();
          }
        } catch (Exception e) {
          LOG.warn("Malformed URL: '" + urlString + "', skipping ("
              + StringUtils.stringifyException(e) + ")");
          reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
          continue;
        }

        hostordomain = hostordomain.toLowerCase();

        // only filter if we are counting hosts or domains
        if (maxCount > 0) {
          int[] hostCount = hostCounts.get(hostordomain);
          if (hostCount == null) {
            hostCount = new int[] {1, 0};
            hostCounts.put(hostordomain, hostCount);
          }

          // increment hostCount
          hostCount[1]++;

          // check if topN reached, select next segment if it is
          while (segCounts[hostCount[0]-1] >= limit && hostCount[0] < maxNumSegments) {
            hostCount[0]++;
            hostCount[1] = 0;
          }

          // reached the limit of allowed URLs per host / domain
          // see if we can put it in the next segment?
          if (hostCount[1] >= maxCount) {
            if (hostCount[0] < maxNumSegments) {
              hostCount[0]++;
              hostCount[1] = 0;
            } else {
              if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
                LOG.info("Host or domain " + hostordomain + " has more than " + maxCount
                    + " URLs for all " + maxNumSegments + " segments. Additional URLs won't be included in the fetchlist.");
              }
              // skip this entry
              continue;
            }
          }
          entry.segnum = new IntWritable(hostCount[0]);
          segCounts[hostCount[0]-1]++;
        } else {
          entry.segnum = new IntWritable(currentsegmentnum);
          segCounts[currentsegmentnum-1]++;
        }

        output.collect(key, entry);

        // Count is incremented only when we keep the URL
        // maxCount may cause us to skip it.
        count++;
      }
    }
  }

 

   可以通过上述代码看出,Generator的第一个Job,实现的逻辑如下:

                   1.根据条件过滤不满足的

                   2.根据配置生成相应数量的segments

                   3.计算出每个url所属的segments

 

 

 

 

 

分享到:
评论

相关推荐

    apache-nutch-2.3.1-src.tar.gz

    5. **配置文件**:如 `conf/nutch-default.xml` 和 `conf/nutch-site.xml`,分别包含 Nutch 的默认配置和用户自定义配置。 6. **抓取策略**:Nutch 支持基于链接的抓取策略,如 PR(PageRank)和 TF-IDF(Term ...

    Nutch 1.2源码阅读

    ### Nutch 1.2 源码阅读深入解析 #### Crawl类核心作用与流程概览 在深入了解Nutch 1.2源码之前,我们先明确Nutch的架构和工作流程。Nutch作为一款开源搜索引擎框架,其功能涵盖网页抓取、索引构建以及查询处理。...

    apache-nutch-1.3-src.tar.gz_nutch_nutch-1.3.tar.gz

    这个源码包 "apache-nutch-1.3-src.tar.gz" 和 "nutch-1.3.tar.gz" 包含了 Nutch 1.3 的源代码和编译后的二进制文件,对于开发者和研究者来说是非常有价值的资源。 **Nutch 概述** Nutch 是基于 Java 开发的,遵循 ...

    nutch-1.9 源码

    Nutch-1.9 是一个开源的网络爬虫软件,被广泛用于数据挖掘、搜索引擎构建以及网络信息提取。它的最新版本提供了许多改进和优化,使得它成为开发者和研究者手中的利器。Nutch的设计目标是易用性和可扩展性,允许用户...

    apache-nutch的源码

    在`apache-nutch-2.2.1`这个压缩包中,你将找到以下关键组成部分: 1. **源代码结构**:Nutch 的源代码通常分为几个主要模块,包括`conf`(配置文件)、`bin`(脚本和可执行文件)、`src`(源代码)以及`lib`(库...

    apache-nutch-1.4-bin.tar.gz

    在这个"apache-nutch-1.4-bin.tar.gz"压缩包中,包含了运行 Nutch 的所有必要组件和配置文件,适合初学者和开发者快速部署和实验。 **Nutch 的核心组成部分:** 1. **爬虫(Spider)**:Nutch 的爬虫负责在网络中...

    nutch_src 源码 tar—zip格式

    "apache-nutch-1.4-src.zip"是Nutch源码的zip压缩版本,用户可以直接解压并访问其中的源代码。 要获取和解压这些源码,你可以使用各种工具,如在Linux或Mac系统中使用命令行的tar和unzip命令,或者在Windows中使用...

    nutch配置nutch-default.xml

    nutch配置nutch-default.xml

    apache-nutch-1.6-src.tar.gz

    - **Java编程**:源码阅读和开发需要基本的Java编程技能,特别是对多线程和网络编程的理解。 - **Ant构建工具**:掌握如何使用Ant来构建、测试和部署Nutch项目。 通过深入研究Nutch 1.6源码,你可以学习到如何设计...

    nutch-1.5.1源码

    Nutch-1.5.1源码是Apache Nutch项目的一个重要版本,它是一个高度可扩展的、开源的网络爬虫和全文搜索引擎框架。Nutch最初由Doug Cutting创建,后来成为了Hadoop项目的一部分,因为其在大数据处理和分布式计算方面的...

    apache-nutch-2.3.1-src

    apache-nutch-2.3.1-src.tar ,网络爬虫的源码, 用ivy2管理, ant runtime 编译 apache-nutch-2.3.1-src.tar ,网络爬虫的源码, 用ivy2管理, ant runtime 编译

    apache-nutch-1.7-src.tar.gz

    在“apache-nutch-1.7-src.tar.gz”这个压缩包中,你将获得Nutch 1.7的源代码,这使得开发者可以深入了解其工作原理,并对其进行定制和扩展。解压后的文件夹“apache-nutch-1.7”包含了所有必要的组件和配置文件。 ...

    nutch2.2.1-src

    3. **配置Nutch**:修改`conf/nutch-site.xml`等配置文件,设置爬虫的启动参数,如抓取范围、URL过滤规则等。 4. **创建数据库**:Nutch通常使用Hadoop HDFS作为数据存储,因此需要设置Hadoop环境,并创建相应的...

    apache-nutch-1.5.1-bin.tar.gz

    Nutch是一款刚刚诞生的完整的开源搜索引擎系统,可以结合数据库进行索引,能快速构建所需系统。Nutch 是基于Lucene的,Lucene为 Nutch 提供了文本索引和搜索的API,所以它使用Lucene作为索引和检索的模块。Nutch的...

    apach-nutch-1.9-bin.tar.gz

    4. **配置与部署**:解压 "apache-nutch-1.9" 文件后,需要根据你的环境配置`conf/nutch-site.xml`文件,设置包括抓取间隔、并发度、存储路径等参数。同时,可能还需要配置`conf/regex-urlfilter.txt`和`conf/...

    nutch1.6源码

    Nutch 1.6 是一个开源的网络爬虫项目,由Apache软件基金会开发,主要用于抓取、索引和搜索Web内容。...通过阅读和研究源码,可以提升对这些技术的理解,并能为开发自己的爬虫或搜索引擎项目打下坚实基础。

    apache-nutch-1.6-bin.tar.gz最新版

    nutch不用安装,是个应用程序,下载后为nutch-1.6.tar.gz,双击桌面上的cygwin快捷方式;执行以下命令: $ cd D:/Downloads/Soft $ tar zxvf nutch-1.0.tar.gz 在e盘下面出现nutch-0.9文件夹说明解压成功了.然后环境...

    lucene+nutch搜索引擎光盘源码(1-8章)

    《lucene+nutch搜索引擎光盘源码(1-8章)》是一套全面解析Lucene和Nutch搜索引擎技术的源代码教程,涵盖了从基础到进阶的多个层面。这套资源包含8个章节的源码,由于文件大小限制,被分成了多个部分进行上传。 ...

    apache-nutch-2.3-src.zip

    1. **源码目录结构**:解压后的apache-nutch-2.3目录包含了源代码、配置文件、构建脚本等。主要目录有`src/main/`,其中`src/main/java`存放Java源码,`src/main/resources`存储配置文件,`src/main/webapp`包含Web...

Global site tag (gtag.js) - Google Analytics