```java
private static class QueueFeeder extends Thread {
    private RecordReader<Text, CrawlDatum> reader;
    // queues shared between the producer (QueueFeeder) and the consumers (fetcher threads);
    // partitioned so that each queue corresponds to one host
    private FetchItemQueues queues;
    private int size;
    private long timelimit = -1;

    public QueueFeeder(RecordReader<Text, CrawlDatum> reader,
        FetchItemQueues queues, int size) {
      this.reader = reader;
      this.queues = queues;
      this.size = size;
      this.setDaemon(true);
      this.setName("QueueFeeder");
    }

    public void setTimeLimit(long tl) {
      timelimit = tl;
    }

    public void run() {
      boolean hasMore = true;
      int cnt = 0;
      int timelimitcount = 0;
      while (hasMore) {
        // if a time limit is configured and the current time has passed it,
        // skip all remaining FetchItems instead of feeding them
        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
          // enough .. let's simply read all the entries from the input without processing them
          try {
            Text url = new Text();
            CrawlDatum datum = new CrawlDatum();
            hasMore = reader.next(url, datum);
            timelimitcount++;
          } catch (IOException e) {
            LOG.error("QueueFeeder error reading input, record " + cnt, e);
            return;
          }
          continue;
        }
        int feed = size - queues.getTotalSize();
        if (feed <= 0) {
          // queues are full - spin-wait until they have some free space
          try {
            Thread.sleep(1000);
          } catch (Exception e) {}
          continue;
        } else {
          LOG.debug("-feeding " + feed + " input urls ...");
          while (feed > 0 && hasMore) {
            try {
              Text url = new Text();
              CrawlDatum datum = new CrawlDatum();
              hasMore = reader.next(url, datum);
              if (hasMore) {
                queues.addFetchItem(url, datum);
                cnt++;    // total number of records fed
                feed--;   // one less free slot in the queues
              }
            } catch (IOException e) {
              LOG.error("QueueFeeder error reading input, record " + cnt, e);
              return;
            }
          }
        }
      }
      LOG.info("QueueFeeder finished: total " + cnt + " records + hit by time limit :"
          + timelimitcount);
    }
  }
```
QueueFeeder is the producer side of the fetcher: it reads <Text, CrawlDatum> records from the input and feeds them into the shared, host-partitioned FetchItemQueues, spin-waiting while the queues are full and merely draining the remaining input once the optional time limit has been hit.
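To see the producer-consumer handshake in isolation, here is a minimal, self-contained sketch of the same pattern. It is not Nutch code: `HostQueues`, `capacity`, and the crude host extraction are illustrative stand-ins for FetchItemQueues, the feeder's `size`, and the queue partitioning by host.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the feeder/fetcher handshake: one producer fills per-host
 *  queues up to a global capacity, consumers drain them and spin-wait when empty. */
public class FeederSketch {

  /** Simplified stand-in for FetchItemQueues: one FIFO queue per host. */
  static class HostQueues {
    private final Map<String, ConcurrentLinkedQueue<String>> byHost = new ConcurrentHashMap<>();
    private final AtomicInteger total = new AtomicInteger();

    void add(String host, String url) {
      byHost.computeIfAbsent(host, h -> new ConcurrentLinkedQueue<>()).add(url);
      total.incrementAndGet();
    }

    /** Take from the first non-empty host queue; null when everything is empty. */
    String poll() {
      for (ConcurrentLinkedQueue<String> q : byHost.values()) {
        String url = q.poll();
        if (url != null) { total.decrementAndGet(); return url; }
      }
      return null;
    }

    int totalSize() { return total.get(); }
  }

  public static void main(String[] args) throws InterruptedException {
    HostQueues queues = new HostQueues();
    int capacity = 10;                        // analogue of the feeder's 'size' argument
    List<String> input = new ArrayList<>();
    for (int i = 0; i < 50; i++) input.add("http://host" + (i % 5) + ".example/" + i);

    Thread feeder = new Thread(() -> {        // producer: QueueFeeder analogue
      int next = 0;
      while (next < input.size()) {
        int free = capacity - queues.totalSize();
        if (free <= 0) {                      // queues full - spin-wait
          try { Thread.sleep(50); } catch (InterruptedException e) { return; }
          continue;
        }
        while (free-- > 0 && next < input.size()) {
          String url = input.get(next++);
          queues.add(url.split("/")[2], url); // partition by host
        }
      }
    }, "QueueFeeder");
    feeder.setDaemon(true);
    feeder.start();

    Runnable fetcher = () -> {                // consumer: FetcherThread analogue
      while (true) {
        String url = queues.poll();
        if (url == null) {
          if (feeder.isAlive() || queues.totalSize() > 0) {
            try { Thread.sleep(20); } catch (InterruptedException e) { return; } // spin-wait
            continue;
          }
          return;                             // feeder done and queues empty - finish
        }
        System.out.println(Thread.currentThread().getName() + " fetched " + url);
      }
    };
    Thread t1 = new Thread(fetcher, "fetcher-1");
    Thread t2 = new Thread(fetcher, "fetcher-2");
    t1.start(); t2.start();
    t1.join(); t2.join();
  }
}
```

The consumer side of this handshake in Nutch is the run() method of the fetcher thread, shown next.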
```java
public void run() {
      activeThreads.incrementAndGet(); // count threads

      FetchItem fit = null;
      try {

        while (true) {
          fit = fetchQueues.getFetchItem();
          if (fit == null) {
            // if the producer is still alive, or the queues still hold items, wait
            if (feeder.isAlive() || fetchQueues.getTotalSize() > 0) {
              LOG.debug(getName() + " spin-waiting ...");
              // spin-wait.
              spinWaiting.incrementAndGet();
              try {
                Thread.sleep(500);
              } catch (Exception e) {}
              spinWaiting.decrementAndGet();
              continue;
            } else {
              // all done, finish this thread
              return;
            }
          }
          lastRequestStart.set(System.currentTimeMillis());
          // determine the representative URL for this item
          Text reprUrlWritable =
            (Text) fit.datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
          if (reprUrlWritable == null) {
            reprUrl = fit.url.toString();
          } else {
            reprUrl = reprUrlWritable.toString();
          }
          try {
            // fetch the page
            redirecting = false;
            redirectCount = 0;
            do {
              if (LOG.isInfoEnabled()) {
                LOG.info("fetching " + fit.url);
              }
              if (LOG.isDebugEnabled()) {
                LOG.debug("redirectCount=" + redirectCount);
              }
              redirecting = false;
              // determine the protocol to use from the URL
              Protocol protocol = this.protocolFactory.getProtocol(fit.url.toString());
              // fetch the robots rules via that protocol
              RobotRules rules = protocol.getRobotRules(fit.url, fit.datum);
              // not allowed by robots.txt - skip it
              if (!rules.isAllowed(fit.u)) {
                // unblock
                fetchQueues.finishFetchItem(fit, true);
                if (LOG.isDebugEnabled()) {
                  LOG.debug("Denied by robots.txt: " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
                    CrawlDatum.STATUS_FETCH_GONE);
                reporter.incrCounter("FetcherStatus", "robots_denied", 1);
                continue;
              }
              // if the crawl delay exceeds maxCrawlDelay, skip the URL
              if (rules.getCrawlDelay() > 0) {
                if (rules.getCrawlDelay() > maxCrawlDelay) {
                  // unblock
                  fetchQueues.finishFetchItem(fit, true);
                  LOG.debug("Crawl-Delay for " + fit.url + " too long ("
                      + rules.getCrawlDelay() + "), skipping");
                  output(fit.url, fit.datum, null, ProtocolStatus.STATUS_ROBOTS_DENIED,
                      CrawlDatum.STATUS_FETCH_GONE);
                  reporter.incrCounter("FetcherStatus", "robots_denied_maxcrawldelay", 1);
                  continue;
                } else {
                  FetchItemQueue fiq = fetchQueues.getFetchItemQueue(fit.queueID);
                  fiq.crawlDelay = rules.getCrawlDelay();
                }
              }
              // perform the actual fetch
              ProtocolOutput output = protocol.getProtocolOutput(fit.url, fit.datum);
              // fetch status
              ProtocolStatus status = output.getStatus();
              // fetched content
              Content content = output.getContent();
              ParseStatus pstatus = null;
              // unblock queue
              fetchQueues.finishFetchItem(fit);

              String urlString = fit.url.toString();

              // update counters
              reporter.incrCounter("FetcherStatus", status.getName(), 1);

              // Dispatch on the protocol status:
              // - WOULDBLOCK: put the URL back into FetchItemQueues and retry it.
              // - MOVED / TEMP_MOVED: the page redirects; resolve the redirect target, create a
              //   new FetchItem for it, add it to the inProgress set of the queue identified by
              //   its queueID, and fetch the redirected page.
              // - EXCEPTION: check the FetchItemQueue of the current URL; if the number of failing
              //   pages exceeds the configured exception threshold, clear the whole queue on the
              //   assumption that all of its pages are problematic.
              // - RETRY / BLOCKED: emit a CrawlDatum with status STATUS_FETCH_RETRY so the URL is
              //   re-fetched in the next round.
              // - GONE / NOTFOUND / ACCESS_DENIED / ROBOTS_DENIED: emit a CrawlDatum with status
              //   STATUS_FETCH_GONE; the URL may not be fetched again in the next round.
              // - NOTMODIFIED: the page has not changed; emit a CrawlDatum with status
              //   STATUS_FETCH_NOTMODIFIED.
              // - default: emit a CrawlDatum with status STATUS_FETCH_RETRY and retry next round.
              switch (status.getCode()) {

              case ProtocolStatus.WOULDBLOCK:
                // retry: put the item back into the queues
                fetchQueues.addFetchItem(fit);
                break;

              case ProtocolStatus.SUCCESS:        // got a page
                pstatus = output(fit.url, fit.datum, content, status,
                    CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth);
                updateStatus(content.getContent().length);
                // handle a meta-refresh redirect (ParseStatus.SUCCESS_REDIRECT)
                if (pstatus != null && pstatus.isSuccess() &&
                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                  // the redirect target URL
                  String newUrl = pstatus.getMessage();
                  int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
                  // resolve the redirect
                  Text redirUrl = handleRedirect(fit.url, fit.datum,
                                                 urlString, newUrl,
                                                 refreshTime < Fetcher.PERM_REFRESH_TIME,
                                                 Fetcher.CONTENT_REDIR);
                  if (redirUrl != null) {
                    // build a CrawlDatum for the redirect target
                    CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                        fit.datum.getFetchInterval(), fit.datum.getScore());
                    // transfer existing metadata to the redir
                    newDatum.getMetaData().putAll(fit.datum.getMetaData());
                    scfilters.initialScore(redirUrl, newDatum);
                    if (reprUrl != null) {
                      newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                          new Text(reprUrl));
                    }
                    // build the corresponding FetchItem
                    fit = FetchItem.create(redirUrl, newDatum, queueMode);
                    if (fit != null) {
                      // queue it for fetching
                      FetchItemQueue fiq =
                        fetchQueues.getFetchItemQueue(fit.queueID);
                      fiq.addInProgressFetchItem(fit);
                    } else {
                      // the FetchItem could not be created: stop redirecting and count it
                      redirecting = false;
                      reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
                    }
                  }
                }
                break;

              case ProtocolStatus.MOVED:          // redirect
              case ProtocolStatus.TEMP_MOVED:
                int code;
                boolean temp;
                if (status.getCode() == ProtocolStatus.MOVED) {
                  code = CrawlDatum.STATUS_FETCH_REDIR_PERM;
                  temp = false;
                } else {
                  code = CrawlDatum.STATUS_FETCH_REDIR_TEMP;
                  temp = true;
                }
                output(fit.url, fit.datum, content, status, code);
                String newUrl = status.getMessage();
                Text redirUrl = handleRedirect(fit.url, fit.datum,
                                               urlString, newUrl, temp,
                                               Fetcher.PROTOCOL_REDIR);
                if (redirUrl != null) {
                  CrawlDatum newDatum = new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED,
                      fit.datum.getFetchInterval(), fit.datum.getScore());
                  // transfer existing metadata
                  newDatum.getMetaData().putAll(fit.datum.getMetaData());
                  scfilters.initialScore(redirUrl, newDatum);
                  if (reprUrl != null) {
                    newDatum.getMetaData().put(Nutch.WRITABLE_REPR_URL_KEY,
                        new Text(reprUrl));
                  }
                  fit = FetchItem.create(redirUrl, newDatum, queueMode);
                  if (fit != null) {
                    FetchItemQueue fiq =
                      fetchQueues.getFetchItemQueue(fit.queueID);
                    fiq.addInProgressFetchItem(fit);
                  } else {
                    // stop redirecting
                    redirecting = false;
                    reporter.incrCounter("FetcherStatus", "FetchItem.notCreated.redirect", 1);
                  }
                } else {
                  // stop redirecting
                  redirecting = false;
                }
                break;

              case ProtocolStatus.EXCEPTION:
                logError(fit.url, status.getMessage());
                int killedURLs = fetchQueues.checkExceptionThreshold(fit.getQueueID());
                if (killedURLs != 0)
                  reporter.incrCounter("FetcherStatus", "AboveExceptionThresholdInQueue", killedURLs);
                /* FALLTHROUGH */
              case ProtocolStatus.RETRY:          // retry
              case ProtocolStatus.BLOCKED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
                break;

              case ProtocolStatus.GONE:           // gone
              case ProtocolStatus.NOTFOUND:
              case ProtocolStatus.ACCESS_DENIED:
              case ProtocolStatus.ROBOTS_DENIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_GONE);
                break;

              case ProtocolStatus.NOTMODIFIED:
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
                break;

              default:
                if (LOG.isWarnEnabled()) {
                  LOG.warn("Unknown ProtocolStatus: " + status.getCode());
                }
                output(fit.url, fit.datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
              }

              if (redirecting && redirectCount > maxRedirect) {
                fetchQueues.finishFetchItem(fit);
                if (LOG.isInfoEnabled()) {
                  LOG.info(" - redirect count exceeded " + fit.url);
                }
                output(fit.url, fit.datum, null, ProtocolStatus.STATUS_REDIR_EXCEEDED,
                    CrawlDatum.STATUS_FETCH_GONE);
              }

            } while (redirecting && (redirectCount <= maxRedirect));

          } catch (Throwable t) {                 // unexpected exception
            // unblock
            fetchQueues.finishFetchItem(fit);
            logError(fit.url, StringUtils.stringifyException(t));
            output(fit.url, fit.datum, null, ProtocolStatus.STATUS_FAILED,
                CrawlDatum.STATUS_FETCH_RETRY);
          }
        }

      } catch (Throwable e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("fetcher caught:" + e.toString());
        }
      } finally {
        if (fit != null) fetchQueues.finishFetchItem(fit);
        activeThreads.decrementAndGet(); // count threads
        LOG.info("-finishing thread " + getName() + ", activeThreads=" + activeThreads);
      }
    }
```
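For quick reference, the status dispatch above boils down to the following mapping. This is a condensed sketch written for this walkthrough, not code from Fetcher itself; it leaves out the re-queueing of WOULDBLOCK, the creation of new FetchItems for redirects, and the exception-threshold handling that the real switch performs.

```java
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.protocol.ProtocolStatus;

// Summary of how run() maps a ProtocolStatus code to the CrawlDatum status it writes out.
// WOULDBLOCK is omitted: it is re-queued via fetchQueues.addFetchItem() instead of being output.
public class StatusMapping {
  static byte fetchStatusFor(int protocolStatusCode) {
    switch (protocolStatusCode) {
      case ProtocolStatus.SUCCESS:        return CrawlDatum.STATUS_FETCH_SUCCESS;
      case ProtocolStatus.MOVED:          return CrawlDatum.STATUS_FETCH_REDIR_PERM;
      case ProtocolStatus.TEMP_MOVED:     return CrawlDatum.STATUS_FETCH_REDIR_TEMP;
      case ProtocolStatus.EXCEPTION:      // falls through: treated as retryable
      case ProtocolStatus.RETRY:
      case ProtocolStatus.BLOCKED:        return CrawlDatum.STATUS_FETCH_RETRY;
      case ProtocolStatus.GONE:
      case ProtocolStatus.NOTFOUND:
      case ProtocolStatus.ACCESS_DENIED:
      case ProtocolStatus.ROBOTS_DENIED:  return CrawlDatum.STATUS_FETCH_GONE;
      case ProtocolStatus.NOTMODIFIED:    return CrawlDatum.STATUS_FETCH_NOTMODIFIED;
      default:                            return CrawlDatum.STATUS_FETCH_RETRY;
    }
  }
}
```

Both the success and failure branches above ultimately call output(), shown next, which writes the CrawlDatum, the fetched content and the parse results, and collects outlinks for the next round.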
```java
private ParseStatus output(Text key, CrawlDatum datum,
                        Content content, ProtocolStatus pstatus, int status,
                        int outlinkDepth) {

      // fill in the CrawlDatum
      datum.setStatus(status);
      datum.setFetchTime(System.currentTimeMillis());
      if (pstatus != null)
        datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);

      ParseResult parseResult = null;
      if (content != null) {
        Metadata metadata = content.getMetadata();

        // store the guessed content type in the crawldatum
        if (content.getContentType() != null)
          datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
              new Text(content.getContentType()));

        // add segment to metadata
        metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
        // add score to content metadata so that ParseSegment can pick it up.
        try {
          scfilters.passScoreBeforeParsing(key, datum, content);
        } catch (Exception e) {
          if (LOG.isWarnEnabled()) {
            LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
          }
        }
        /* Note: Fetcher will only follow meta-redirects coming from the
         * original URL. */
        if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
          if (!skipTruncated || (skipTruncated && !ParseSegment.isTruncated(content))) {
            try {
              // parse the fetched page source
              parseResult = this.parseUtil.parse(content);
            } catch (Exception e) {
              LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
            }
          }

          if (parseResult == null) {
            byte[] signature =
              SignatureFactory.getSignature(getConf()).calculate(content,
                  new ParseStatus().getEmptyParse(conf));
            datum.setSignature(signature);
          }
        }

        /* Store status code in content metadata so we can read this value during
         * parsing (as a separate job) and decide to parse or not. */
        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
      }

      try {
        output.collect(key, new NutchWritable(datum));
        if (content != null && storingContent)
          output.collect(key, new NutchWritable(content));
        if (parseResult != null) {
          for (Entry<Text, Parse> entry : parseResult) {
            Text url = entry.getKey();
            Parse parse = entry.getValue();
            ParseStatus parseStatus = parse.getData().getStatus();
            ParseData parseData = parse.getData();

            if (!parseStatus.isSuccess()) {
              LOG.warn("Error parsing: " + key + ": " + parseStatus);
              parse = parseStatus.getEmptyParse(getConf());
            }

            // Calculate page signature. For non-parsing fetchers this will
            // be done in ParseSegment
            byte[] signature =
              SignatureFactory.getSignature(getConf()).calculate(content, parse);

            // Ensure segment name and score are in parseData metadata
            parseData.getContentMeta().set(Nutch.SEGMENT_NAME_KEY, segmentName);
            parseData.getContentMeta().set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
            // Pass fetch time to content meta
            parseData.getContentMeta().set(Nutch.FETCH_TIME_KEY, Long.toString(datum.getFetchTime()));
            if (url.equals(key))
              datum.setSignature(signature);
            try {
              scfilters.passScoreAfterParsing(url, content, parse);
            } catch (Exception e) {
              if (LOG.isWarnEnabled()) {
                LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
              }
            }

            String fromHost;

            // collect outlinks for subsequent db update
            Outlink[] links = parseData.getOutlinks();
            int outlinksToStore = Math.min(maxOutlinks, links.length);
            if (ignoreExternalLinks) {
              try {
                fromHost = new URL(url.toString()).getHost().toLowerCase();
              } catch (MalformedURLException e) {
                fromHost = null;
              }
            } else {
              fromHost = null;
            }

            int validCount = 0;

            // Process all outlinks, normalize, filter and deduplicate
            List<Outlink> outlinkList = new ArrayList<Outlink>(outlinksToStore);
            HashSet<String> outlinks = new HashSet<String>(outlinksToStore);
            for (int i = 0; i < links.length && validCount < outlinksToStore; i++) {
              String toUrl = links[i].getToUrl();

              toUrl = ParseOutputFormat.filterNormalize(url.toString(), toUrl, fromHost,
                  ignoreExternalLinks, urlFilters, normalizers);
              if (toUrl == null) {
                continue;
              }

              validCount++;
              links[i].setUrl(toUrl);
              outlinkList.add(links[i]);
              outlinks.add(toUrl);
            }

            // Only process depth N outlinks
            if (maxOutlinkDepth > 0 && outlinkDepth < maxOutlinkDepth) {
              reporter.incrCounter("FetcherOutlinks", "outlinks_detected", outlinks.size());

              // Counter to limit num outlinks to follow per page
              int outlinkCounter = 0;

              // Calculate variable number of outlinks by depth using the divisor
              // (outlinks = Math.floor(divisor / depth * num.links))
              int maxOutlinksByDepth = (int) Math.floor(
                  outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);

              String followUrl;

              // Walk over the outlinks and add as new FetchItem to the queues
              Iterator<String> iter = outlinks.iterator();
              while (iter.hasNext() && outlinkCounter < maxOutlinkDepthNumLinks) {
                followUrl = iter.next();

                // Check whether we'll follow external outlinks
                if (outlinksIgnoreExternal) {
                  if (!URLUtil.getHost(url.toString()).equals(URLUtil.getHost(followUrl))) {
                    continue;
                  }
                }

                reporter.incrCounter("FetcherOutlinks", "outlinks_following", 1);

                // Create new FetchItem with depth incremented
                FetchItem fit = FetchItem.create(new Text(followUrl),
                    new CrawlDatum(CrawlDatum.STATUS_LINKED, interval),
                    queueMode, outlinkDepth + 1);
                fetchQueues.addFetchItem(fit);

                outlinkCounter++;
              }
            }

            // Overwrite the outlinks in ParseData with the normalized and filtered set
            parseData.setOutlinks((Outlink[]) outlinkList.toArray(new Outlink[outlinkList.size()]));

            output.collect(url, new NutchWritable(
                    new ParseImpl(new ParseText(parse.getText()),
                                  parseData, parse.isCanonical())));
          }
        }
      } catch (IOException e) {
        if (LOG.isErrorEnabled()) {
          LOG.error("fetcher caught:" + e.toString());
        }
      }

      // return parse status if it exists
      if (parseResult != null && !parseResult.isEmpty()) {
        Parse p = parseResult.get(content.getUrl());
        if (p != null) {
          reporter.incrCounter("ParserStatus",
              ParseStatus.majorCodes[p.getData().getStatus().getMajorCode()], 1);
          return p.getData().getStatus();
        }
      }
      return null;
    }
  }
```
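The "variable number of outlinks by depth" line is easy to misread, so here is a tiny, standalone worked example of that expression. The values are invented for illustration; in Fetcher, `outlinksDepthDivisor` and `maxOutlinkDepthNumLinks` come from the outlink-following configuration.

```java
// Worked example of the depth-based outlink budget computed in output() above.
public class OutlinkBudgetExample {
  public static void main(String[] args) {
    int outlinksDepthDivisor = 2;      // illustrative value of the configured divisor
    int maxOutlinkDepthNumLinks = 4;   // illustrative value of the configured num.links
    for (int outlinkDepth = 0; outlinkDepth < 4; outlinkDepth++) {
      // same expression as in output(); with int operands (as here) the division
      // truncates before the multiplication, so the budget shrinks quickly with depth
      int maxOutlinksByDepth = (int) Math.floor(
          outlinksDepthDivisor / (outlinkDepth + 1) * maxOutlinkDepthNumLinks);
      System.out.println("depth " + outlinkDepth + " -> at most "
          + maxOutlinksByDepth + " outlinks");
    }
    // prints 8, 4, 0, 0 for depths 0..3
  }
}
```

Note that in the loop as listed above, the bound actually checked is `outlinkCounter < maxOutlinkDepthNumLinks`, so `maxOutlinksByDepth` is computed but not used as the cap.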