```java
for (i = 0; i < depth; i++) {            // generate new segment
  // the depth argument controls how many iterations run; each one produces a segment
  Path[] segs = generator.generate(crawlDb, segments, -1, topN,
      System.currentTimeMillis());
  if (segs == null) {
    LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
    break;
  }
  fetcher.fetch(segs[0], threads);       // fetch it
  if (!Fetcher.isParsing(job)) {
    parseSegment.parse(segs[0]);         // parse it, if needed
  }
  crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
}
```
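Note that the Crawl loop calls a five-argument overload of `generate()`, while the listing in the next section shows the full nine-argument signature. In Nutch the short form simply delegates with default values; a rough sketch of that delegation (the exact defaults are an assumption, not quoted from the source) looks like this:

```java
/** Convenience overload, presumably delegating to the full generate() with
 *  defaults: filtering and normalization enabled, no forced lock, one segment. */
public Path[] generate(Path dbDir, Path segments, int numLists, long topN, long curTime)
    throws IOException {
  return generate(dbDir, segments, numLists, topN, curTime, true, true, false, 1);
}
```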
```java
public Path[] generate(Path dbDir, Path segments, int numLists, long topN,
    long curTime, boolean filter, boolean norm, boolean force,
    int maxNumSegments) throws IOException {

  // create a temporary output directory
  Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
      + "/generate-temp-" + System.currentTimeMillis());

  // create the lock file on the crawldb
  Path lock = new Path(dbDir, CrawlDb.LOCK_NAME);
  FileSystem fs = FileSystem.get(getConf());
  LockUtil.createLockFile(fs, lock, force);

  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  LOG.info("Generator: starting at " + sdf.format(start));
  LOG.info("Generator: Selecting best-scoring urls due for fetch.");
  LOG.info("Generator: filtering: " + filter);
  LOG.info("Generator: normalizing: " + norm);
  if (topN != Long.MAX_VALUE) {
    LOG.info("Generator: topN: " + topN);
  }

  if ("true".equals(getConf().get(GENERATE_MAX_PER_HOST_BY_IP))) {
    LOG.info("Generator: GENERATE_MAX_PER_HOST_BY_IP will be ignored, use partition.url.mode instead");
  }

  // map to inverted subset due for fetch, sort by score
  JobConf job = new NutchJob(getConf());
  job.setJobName("generate: select from " + dbDir);

  // if the caller did not specify numLists, default to the number of map tasks
  if (numLists == -1) {                 // for politeness make
    numLists = job.getNumMapTasks();    // a partition per fetch task
  }
  // when MapReduce runs in local mode, use exactly one partition
  if ("local".equals(job.get("mapred.job.tracker")) && numLists != 1) {
    // override
    LOG.info("Generator: jobtracker is 'local', generating exactly one partition.");
    numLists = 1;
  }
  // set the generation time
  job.setLong(GENERATOR_CUR_TIME, curTime);
  // record real generation time
  long generateTime = System.currentTimeMillis();
  job.setLong(Nutch.GENERATE_TIME_KEY, generateTime);
  job.setLong(GENERATOR_TOP_N, topN);
  job.setBoolean(GENERATOR_FILTER, filter);
  job.setBoolean(GENERATOR_NORMALISE, norm);
  job.setInt(GENERATOR_MAX_NUM_SEGMENTS, maxNumSegments);

  // configure the select job
  FileInputFormat.addInputPath(job, new Path(dbDir, CrawlDb.CURRENT_NAME));
  job.setInputFormat(SequenceFileInputFormat.class);

  job.setMapperClass(Selector.class);
  job.setPartitionerClass(Selector.class);
  job.setReducerClass(Selector.class);

  FileOutputFormat.setOutputPath(job, tempDir);
  job.setOutputFormat(SequenceFileOutputFormat.class);
  job.setOutputKeyClass(FloatWritable.class);
  job.setOutputKeyComparatorClass(DecreasingFloatComparator.class);
  job.setOutputValueClass(SelectorEntry.class);
  job.setOutputFormat(GeneratorOutputFormat.class); // overrides the SequenceFileOutputFormat set above

  try {
    JobClient.runJob(job);
  } catch (IOException e) {
    throw e;
  }
  ...................
  ...................
  ...................
}
```
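The select job sorts its map output keys with `DecreasingFloatComparator`, so the highest-scoring URLs come out of the shuffle first. That comparator is not reproduced in the excerpt above; a minimal sketch of how such a descending comparator for `FloatWritable` keys can be written (in the real source it is a static nested class of `Generator`) is:

```java
import org.apache.hadoop.io.FloatWritable;

/** Sorts FloatWritable keys in descending order by swapping the operands
 *  of the default ascending byte-level comparison. */
public class DecreasingFloatComparator extends FloatWritable.Comparator {
  /** Compares two serialized FloatWritable keys in decreasing order. */
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return super.compare(b2, s2, l2, b1, s1, l1);
  }
}
```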
```java
/** Select & invert subset due for fetch. */
public void map(Text key, CrawlDatum value,
    OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {
  Text url = key;
  // if filtering is enabled, run the URL through the URLFilters first
  if (filter) {
    // If filtering is on don't generate URLs that don't pass
    // URLFilters
    try {
      if (filters.filter(url.toString()) == null) return;
    } catch (URLFilterException e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Couldn't filter url: " + url + " (" + e.getMessage() + ")");
      }
    }
  }
  CrawlDatum crawlDatum = value;

  // check fetch schedule
  // skip the URL if its fetch time has not been reached yet
  if (!schedule.shouldFetch(url, crawlDatum, curTime)) {
    LOG.debug("-shouldFetch rejected '" + url + "', fetchTime="
        + crawlDatum.getFetchTime() + ", curTime=" + curTime);
    return;
  }

  LongWritable oldGenTime = (LongWritable) crawlDatum.getMetaData().get(
      Nutch.WRITABLE_GENERATE_TIME_KEY);
  if (oldGenTime != null) { // awaiting fetch & update
    if (oldGenTime.get() + genDelay > curTime) // still wait for
                                               // update
      return;
  }

  // compute the sort score
  float sort = 1.0f;
  try {
    sort = scfilters.generatorSortValue((Text) key, crawlDatum, sort);
  } catch (ScoringFilterException sfe) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Couldn't filter generatorSortValue for " + key + ": " + sfe);
    }
  }

  if (restrictStatus != null
      && !restrictStatus.equalsIgnoreCase(CrawlDatum.getStatusName(crawlDatum.getStatus())))
    return;

  // consider only entries with a score superior to the threshold
  if (scoreThreshold != Float.NaN && sort < scoreThreshold) return;

  // consider only entries with a retry (or fetch) interval lower than threshold
  if (intervalThreshold != -1
      && crawlDatum.getFetchInterval() > intervalThreshold) return;

  // sort by decreasing score, using DecreasingFloatComparator
  sortValue.set(sort);
  // record generation time
  crawlDatum.getMetaData().put(Nutch.WRITABLE_GENERATE_TIME_KEY, genTime);
  entry.datum = crawlDatum;
  entry.url = (Text) key;
  output.collect(sortValue, entry); // invert for sort by score
}
```
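The map output value type, `SelectorEntry`, simply bundles the URL, its `CrawlDatum`, and the segment number the reducer will later assign. The class itself is not quoted in this article; a rough sketch of its shape, with field names taken from the usages above (in the real source it is a static nested `Writable` class of `Generator`), might look like this:

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.nutch.crawl.CrawlDatum;

/** Rough sketch of the value carried from the Selector map to its reduce. */
public class SelectorEntry implements Writable {
  public Text url = new Text();
  public CrawlDatum datum = new CrawlDatum();
  public IntWritable segnum = new IntWritable(0);

  public void readFields(DataInput in) throws IOException {
    url.readFields(in);
    datum.readFields(in);
    segnum.readFields(in);
  }

  public void write(DataOutput out) throws IOException {
    url.write(out);
    datum.write(out);
    segnum.write(out);
  }
}
```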
```java
/** Partition by host / domain or IP. */
// assign the entry to a reduce task based on its host, domain, or IP
public int getPartition(FloatWritable key, Writable value, int numReduceTasks) {
  return partitioner.getPartition(((SelectorEntry) value).url, key, numReduceTasks);
}
```
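The `partitioner` field here is a URL partitioner configured via `partition.url.mode`; routing all URLs of the same host (or domain, or IP) to the same reduce task is what makes the per-host counting in the reducer possible. As an illustration only, and not the actual partitioner code, host-based partitioning could be sketched like this:

```java
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.io.Text;

/** Illustrative host-based partitioning; the real partitioner also supports
 *  domain and IP modes and normalizes the URL before hashing. */
public class HostPartitionSketch {
  public int getPartition(Text urlText, int numReduceTasks) {
    String host;
    try {
      host = new URL(urlText.toString()).getHost();
    } catch (MalformedURLException e) {
      host = urlText.toString(); // fall back to the raw string
    }
    // mask the sign bit so the partition index is never negative
    return (host.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```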
```java
public void configure(JobConf job) {
  curTime = job.getLong(GENERATOR_CUR_TIME, System.currentTimeMillis());
  limit = job.getLong(GENERATOR_TOP_N, Long.MAX_VALUE) / job.getNumReduceTasks();
  maxCount = job.getInt(GENERATOR_MAX_COUNT, -1);
  // back compatibility with old param
  int oldMaxPerHost = job.getInt(GENERATE_MAX_PER_HOST, -1);
  if (maxCount == -1 && oldMaxPerHost != -1) {
    maxCount = oldMaxPerHost;
    byDomain = false;
  }
  if (GENERATOR_COUNT_VALUE_DOMAIN.equals(job.get(GENERATOR_COUNT_MODE)))
    byDomain = true;
  filters = new URLFilters(job);
  normalise = job.getBoolean(GENERATOR_NORMALISE, true);
  if (normalise)
    normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
  scfilters = new ScoringFilters(job);
  partitioner.configure(job);
  filter = job.getBoolean(GENERATOR_FILTER, true);
  genDelay = job.getLong(GENERATOR_DELAY, 7L) * 3600L * 24L * 1000L;
  long time = job.getLong(Nutch.GENERATE_TIME_KEY, 0L);
  if (time > 0) genTime.set(time);
  schedule = FetchScheduleFactory.getFetchSchedule(job);
  scoreThreshold = job.getFloat(GENERATOR_MIN_SCORE, Float.NaN);
  intervalThreshold = job.getInt(GENERATOR_MIN_INTERVAL, -1);
  restrictStatus = job.get(GENERATOR_RESTRICT_STATUS, null);
  maxNumSegments = job.getInt(GENERATOR_MAX_NUM_SEGMENTS, 1);
  segCounts = new int[maxNumSegments];
}
```
```java
/** Collect until limit is reached. */
public void reduce(FloatWritable key, Iterator<SelectorEntry> values,
    OutputCollector<FloatWritable, SelectorEntry> output, Reporter reporter)
    throws IOException {

  while (values.hasNext()) {

    if (count == limit) {
      // do we have any segments left?
      if (currentsegmentnum < maxNumSegments) {
        count = 0;
        currentsegmentnum++;
      } else
        break;
    }

    SelectorEntry entry = values.next();
    Text url = entry.url;
    String urlString = url.toString();
    URL u = null;

    String hostordomain = null;

    try {
      if (normalise && normalizers != null) {
        urlString = normalizers.normalize(urlString,
            URLNormalizers.SCOPE_GENERATE_HOST_COUNT);
      }
      u = new URL(urlString);
      if (byDomain) {
        hostordomain = URLUtil.getDomainName(u);
      } else {
        hostordomain = new URL(urlString).getHost();
      }
    } catch (Exception e) {
      LOG.warn("Malformed URL: '" + urlString + "', skipping ("
          + StringUtils.stringifyException(e) + ")");
      reporter.getCounter("Generator", "MALFORMED_URL").increment(1);
      continue;
    }

    hostordomain = hostordomain.toLowerCase();

    // only filter if we are counting hosts or domains
    if (maxCount > 0) {
      // hostCount[0] = segment number currently assigned to this host/domain,
      // hostCount[1] = number of its URLs already placed in that segment
      int[] hostCount = hostCounts.get(hostordomain);
      if (hostCount == null) {
        hostCount = new int[] {1, 0};
        hostCounts.put(hostordomain, hostCount);
      }

      // increment hostCount
      hostCount[1]++;

      // check if topN reached, select next segment if it is
      while (segCounts[hostCount[0] - 1] >= limit
          && hostCount[0] < maxNumSegments) {
        hostCount[0]++;
        hostCount[1] = 0;
      }

      // reached the limit of allowed URLs per host / domain
      // see if we can put it in the next segment?
      if (hostCount[1] >= maxCount) {
        if (hostCount[0] < maxNumSegments) {
          hostCount[0]++;
          hostCount[1] = 0;
        } else {
          if (hostCount[1] == maxCount + 1 && LOG.isInfoEnabled()) {
            LOG.info("Host or domain " + hostordomain + " has more than "
                + maxCount + " URLs for all " + maxNumSegments
                + " segments. Additional URLs won't be included in the fetchlist.");
          }
          // skip this entry
          continue;
        }
      }
      entry.segnum = new IntWritable(hostCount[0]);
      segCounts[hostCount[0] - 1]++;
    } else {
      entry.segnum = new IntWritable(currentsegmentnum);
      segCounts[currentsegmentnum - 1]++;
    }

    output.collect(key, entry);

    // Count is incremented only when we keep the URL
    // maxCount may cause us to skip it.
    count++;
  }
}
```
From the code above, the logic of the Generator's first job (the "select" job) can be summarized as follows:
1. Filter out entries that do not meet the conditions (URL filters, fetch schedule, generate delay, score and fetch-interval thresholds, status restriction).
2. Produce the configured number of segments (up to maxNumSegments).
3. Determine which segment each URL is assigned to, respecting topN and the per-host/domain limit.
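Putting it together, a caller such as the Crawl class in the first listing only needs a CrawlDb directory, a segments directory, and a topN value. Below is a minimal driver sketch using the full `generate()` signature shown above; it assumes the `Generator(Configuration)` constructor and `NutchConfiguration.create()` helper from Nutch 1.x, and the paths and parameter values are hypothetical examples:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nutch.crawl.Generator;
import org.apache.nutch.util.NutchConfiguration;

/** Hypothetical driver sketch; not part of the Nutch source. */
public class GenerateDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = NutchConfiguration.create();
    Generator generator = new Generator(conf);
    Path[] segs = generator.generate(
        new Path("crawl/crawldb"),      // dbDir (example path)
        new Path("crawl/segments"),     // segments (example path)
        -1,                             // numLists: -1 = one partition per fetch task
        50000L,                         // topN
        System.currentTimeMillis(),     // curTime
        true,                           // filter
        true,                           // norm
        false,                          // force: don't break an existing lock
        1);                             // maxNumSegments
    System.out.println("Generated " + (segs == null ? 0 : segs.length) + " segment(s)");
  }
}
```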