- 浏览: 641604 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
liuche20083736:
非常好
从问题看本质: 研究TCP close_wait的内幕 -
xiaopohai85707:
优化算法与原来需求不符
过滤字符的性能调优?挤一挤还是有的 -
kmy_白衣:
生成的area图有时候 标签的数值和图标上看上去的数值不一致。 ...
OpenFlashChart2之恶心文档 -
tom&jerry:
大神,请教一个问题,按名称排序为何无效,用的2.4.3 XPA ...
深入浅出jackrabbit之十三 查询之AST和QT -
jd2bs:
改成精确匹配可以了< filter-mapping &g ...
细谈Ehcache页面缓存的使用
我们从文本提取的逻辑中走出来,回到主体流程。
在前面的文章中,我们可以看到一次索引创建的操作,可能会产生多个 persistentindex 对象,而这些对象其实代表着一个索引目录。随着创建索引的次数越来越多,那么索引目录也在增多,但是索引目录中的数据却不是很多,所以我们需要把多个目录合并,其实也就是索引的合并。
执行这个操作的类是 IndexMerger ,看其定义为:
class IndexMerger extends Thread implements IndexListener /*由此可见它是一个线程,并且同时充当着listener的角色,看看它的构造方法: */ IndexMerger(MultiIndex multiIndex) { this.multiIndex = multiIndex; setName("IndexMerger"); setDaemon(true); try { mergerIdle.acquire(); } catch (InterruptedException e) { // will never happen, lock is free upon construction throw new InternalError("Unable to acquire mutex after construction"); } }
还是一个 deamon 线程。而且一构造就来了一个 mergerIdle.acquire(); 真是迫不及待啊。啥意思啊?得到一把锁,一把非阻塞的锁。
在创建完 IndexMerger ,那么就有可能把 PersistentIndex 加进来了,因为 Merger 类必须知道哪些 PersistentIndex 是需要 Merger 的,那么我们看看负责这段逻辑的代码:这段代码主要负责 3 个功能,一个是初始化 indexBuckets ,这个一个 ArrayList ,其中放的是需要 Merger 的 PersistentIndex 的列表,也就是我们可以认为 indexBucket 里放的还是 list ,这里有一个非常奇怪的设计,就是在初始化的时候将 PersistentIndex 按照 docnums 的范围分组了,一组就是一个 indexBucket 。
第二个是把需要加入的 PersistentIndex 加入到对应的分组中。
第三个是判断是否需要合并,如果需要就加到一个队列中,等待被合并。
先看第一段代码:
synchronized (lock) { // initially create buckets if (indexBuckets.size() == 0) { long lower = 0; // default minMergeDocs is 100 long upper = minMergeDocs; //default maxMergeDocs is 2147483647 // IndexBucket实际上就是一个ArrayList while (upper < maxMergeDocs) { indexBuckets.add(new IndexBucket(lower, upper, true)); lower = upper + 1; //default mergeFactor is 10 upper *= mergeFactor; } // one with upper = maxMergeDocs indexBuckets.add(new IndexBucket(lower, maxMergeDocs, false)); // and another one as overflow, just in case... indexBuckets.add(new IndexBucket(maxMergeDocs + 1, Long.MAX_VALUE, false)); } ············
仔细阅读代码,我们发现,在初始化 indexBuckets 的代码中,其实按照范围来初始化的,比如当添加第一 IndexBucket 的时候 lower=0 , upper=100
即 new IndexBucket(0 100 , rue )
第二个则为: new IndexBucket(101, 100*10, t rue )
第三个则为: new IndexBucket(1001, 100*10*10, t rue )
第四个则为: new IndexBucket(10001, 100*10*10*10, t rue )
第五个则为: new IndexBucket(100001, 100*10*10*10*10, t rue )
````````````
一直持续下去直到 upper 小于 2147483647 ,且是 10 的最大幂。那么就是说 10 亿,当一个目录中有 10 亿个 document 的 index 数据时,这个目录将不再参与 merge 过程, indexBuckets 中总共有 8 个 IndexBucket, 不过在循环外面还有两个创建 IndexBucket 的语句,不过这两个都是不允许参加合并的,所以第 3 个参数是 false ,也就是说一共有 10 个,第九个是:
new IndexBucket(1000000001, 2147483647, false)
那么第十个是:
new IndexBucket(2147483648, 0x7fffffffffffffffL, false)
搞清楚 indexBuckets 的 初始化之后,我们再来看看第二个步骤,把根据 docNums 把对应的 persistentindex 加入到 IndexBucket 中 :
// put index in bucket IndexBucket bucket = (IndexBucket) indexBuckets.get(indexBuckets.size() - 1); for (int i = 0; i < indexBuckets.size(); i++) { bucket = (IndexBucket) indexBuckets.get(i); if (bucket.fits(numDocs)) { break; } } /*如果indexBuckets 没有值,那么就把Index 添加到第10个IndexBucket中,否则就从indexBuckets 的第一IndexBucket开始匹配,根据numDocs的值放到对应的IndexBucket中。*/ bucket.add(new Index(name, numDocs)); if (log.isDebugEnabled()) { log.debug("index added: name=" + name + ", numDocs=" + numDocs); } // if bucket does not allow merge, we don't have to continue //如果是最后两个IndexBucket,那么即刻退出 if (!bucket.allowsMerge()) { return; } /*这段代码没有什么难的,接着看第3个步骤: */ // check if we need a merge //超过indexbucket中超过10个元素<其实就是10个目录>则开始合并 if (bucket.size() >= mergeFactor) { long targetMergeDocs = bucket.upper; targetMergeDocs = Math.min(targetMergeDocs * mergeFactor, maxMergeDocs); // sum up docs in bucket List indexesToMerge = new ArrayList(); int mergeDocs = 0; for (Iterator it = bucket.iterator(); it.hasNext() && mergeDocs <= targetMergeDocs;) { indexesToMerge.add(it.next()); } /* 结合上下文,indexesToMerge.size()这值会小于2吗?????*/ if (indexesToMerge.size() > 2) { // found merge Index[] idxs = (Index[]) indexesToMerge.toArray(new Index[indexesToMerge.size()]); bucket.removeAll(indexesToMerge); if (log.isDebugEnabled()) { log.debug("requesting merge for " + indexesToMerge); } mergeTasks.add(new Merge(idxs)); log.debug("merge queue now contains " + mergeTasks.size() + " tasks."); } }
这段代码的主要功能是把 indexbucket 里的 persistentindex 信息拿出来,而且量超过 2 的话就把他们加入到一个队列中,并将它们从该 indexbucket 里删除。通过这个步骤,那么 mergeTasks 队列中就存在一些需要合并的 index 了。
中场总结:
通过上面的方法和前面的索引提交的文章我们得到一些重要信息:当用户把 ramdirectory 中超过 100 的 docs 的 index data 刷到 fsdirectory 中时,新建一个目录,作为这个新 fsdirectory 的目录,接着把这个 fsdirectory 对应的 PersistentIndex 加到 IndexMerger 类的某个 IndexBucket 中,接着当某个 IndexBucket 中的 PersistentIndex 数量(即这些目录的数量)超过 10 ( mergefactor )的时候,就会执行合并的操作。
那么下面的问题是,合并之后,这 10 个目录将会何去何从,它们是把另外 9 个合并到其中一个中去呢还是怎么滴?接着看吧。
显然,这里又用到生产消费模型,任何调用 indexAdded 方法的都属性生产者,生产者根据一些条件,有选择的把需要合并的 persistentindex 放到 mergeTasks 的队列中,有了生产者肯定存在消费者,文章开头提过, IndexMerger 类是一个 deamon 线程,看看它的 run 方法,那么就发现,其实它就是消费者。它主要完成以下几个功能:
1 判断消费者是否空闲
2 判断队列中是否有退出命令
3 如果空闲则进入 wait 状态
4 根据 persistentindex 的名字取到所有的 persistentindex
的 IndexReader 对象
5 再创建一个新的 PersistentIndex, , 原来的 index 文件合并到这个新的目录中
6 将前面的 IndexReader 对象添加到 PersistentIndex 的 indexwriter 方法中,并执行 optimize 。
7 关闭这些 readers
8 根据名字删除已经被合并的 PersistentIndex 的索引文件和目录等。
我们再来看看代码,代码中已经加入了 ahuaxuan 的注释:
public void run() { for (;;) { boolean isIdle = false; //队列长度为0,表示消费者处于空闲状态,那么会进入wait状态 if (mergeTasks.size() == 0) { mergerIdle.release(); isIdle = true; } /*2判断队列中是否有退出命令 */ Merge task = (Merge) mergeTasks.remove(); if (task == QUIT) { mergerIdle.release(); break; } if (isIdle) { try { mergerIdle.acquire(); } catch (InterruptedException e) { Thread.interrupted(); log.warn("Unable to acquire mergerIdle sync"); } } log.debug("accepted merge request"); // reset deleted documents deletedDocuments.clear(); // get readers /*4 根据persistentindex的名字取到所有的persistentindex 的IndexReader对象 */ String[] names = new String[task.indexes.length]; for (int i = 0; i < task.indexes.length; i++) { names[i] = task.indexes[i].name; } try { log.debug("create new index"); /*再创建一个新的PersistentIndex,原来的index文件合并到这个新的目录中 */ PersistentIndex index = multiIndex.getOrCreateIndex(null); boolean success = false; try { log.debug("get index readers from MultiIndex"); IndexReader[] readers = multiIndex.getIndexReaders(names, this); try { // do the merge long time = System.currentTimeMillis(); /*6 将前面的IndexReader对象添加到PersistentIndex的indexwriter方法中,并执行optimize。 */ index.addIndexes(readers); time = System.currentTimeMillis() - time; int docCount = 0; for (int i = 0; i < readers.length; i++) { docCount += readers[i].numDocs(); } log.info("merged " + docCount + " documents in " + time + " ms into " + index.getName() + "."); } finally { for (int i = 0; i < readers.length; i++) { /*7 关闭这些readers */ try { readers[i].close(); } catch (IOException e) { log.warn("Unable to close IndexReader: " + e); } } } // inform multi index // if we cannot get the sync immediately we have to quit if (!indexReplacement.attempt(0)) { log.debug("index merging canceled"); break; } try { log.debug("replace indexes"); multiIndex.replaceIndexes(names, index, deletedDocuments); } finally { indexReplacement.release(); } success = true; } finally { if (!success) { // delete index log.debug("deleting index " + index.getName()); /*8 根据名字删除已经被合并的PersistentIndex的索引文件和目录等。 */ multiIndex.deleteIndex(index); } } } catch (Throwable e) { log.error("Error while merging indexes: " + e); } } log.info("IndexMerger terminated"); }
看到这里爱思考的同学们一定会意识到这里还漏了什么,是什么呢?前面讲到,一个 bucket 中超过 10 个目录,会被合并一个新的目录,那么也就是说这个新目录中至少有 1000 个 document 的索引数据,这样下来,如果我有 100000 个节点,而且恰好每个目录中之后 1000 个 document 的数据,那么就得用 100 个目录来存储数据了。这样带来的问题是,每做一次查询,都需要把 100 个 indexReader 传给 search ,即使使用多线程并行搜索,那目录数也还是太多了,而且如果是 100w 个节点,那就更不得了了,所以 jackrabbit 中一定还有机制会把这些目录合并成更大目录的逻辑。为什么这么说,因为之前在创建 indexbucket 中的时候,分了 8 个允许合并的段,而上面的逻辑只会用到前面一个 bucket ,后面的几个肯定是有用处的,那么是谁来触发它们的,它们在哪里呢?
我们看到在上面的 run 方法中,我们有一个方法没有讲到: multiIndex .replaceIndexes(names, index, deletedDocuments );
我们将会在这个方法中寻找到真相,同样, ahuaxuan 在代码中加入了自己的注释
/* obsoleteIndexes 是需要被删除的 dir ,因为他们的数据已经被合并到新的目录里, index 参数则表示那个对应那个新目录的 PersistentIndex , deleted 表示需要被删除的类 */
void replaceIndexes(String[] obsoleteIndexes, PersistentIndex index, Collection deleted) throws IOException { /*在multiIndex中,到处都是synchronized ,而且都是锁定multiindex对象,为啥呢? 详见后文*/ synchronized (this) { /*这段代码在multiIndex#update方法中也出现过,你知道它的用途吗,其实可以猜出来*/ synchronized (updateMonitor) { updateInProgress = true; } try { // if we are reindexing there is already an active transaction if (!reindexing) { executeAndLog(new Start(Action.INTERNAL_TRANS_REPL_INDEXES)); } // delete obsolete indexes /*10个目录已经合并成一个了,那这个10个目录该删的就删,不需要犹豫*/ Set names = new HashSet(Arrays.asList(obsoleteIndexes)); for (Iterator it = names.iterator(); it.hasNext();) { // do not try to delete indexes that are already gone String indexName = (String) it.next(); if (indexNames.contains(indexName)) { executeAndLog(new DeleteIndex(getTransactionId(), indexName)); } } // Index merger does not log an action when it creates the target // index of the merge. We have to do this here. /*还记得CreateIndex的作用吗?复习一下:根据名字获取PersistentIndex对象,如果名字不存在或者为null,则新建一个PersistentIndex对象,罗嗦一句,一个PersistentIndex代表一个目录*/ executeAndLog(new CreateIndex(getTransactionId(), index.getName())); /*又来了AddIndex对象,还记得它的作用吗,将这个persistentIndex加入到*/ executeAndLog(new AddIndex(getTransactionId(), index.getName())); // delete documents in index for (Iterator it = deleted.iterator(); it.hasNext();) { Term id = (Term) it.next(); index.removeDocument(id); } index.commit(); if (!reindexing) { // only commit if we are not reindexing // when reindexing the final commit is done at the very end executeAndLog(new Commit(getTransactionId())); } } finally { synchronized (updateMonitor) { updateInProgress = false; updateMonitor.notifyAll(); releaseMultiReader(); } } } if (reindexing) { // do some cleanup right away when reindexing attemptDelete(); } }
看完这段方法,我们发现,小的目录合并成大目录之后,这个大目录又被加到 indexbucket 等待下一次被合并,如此递归,一直当一个目录的 document 的 index 数据超过 10 亿,那么就不会再合并了, ahuaxuan 画了一张图:
=========================================================================
图中的0-100表示最基层的目录级别,这些目录只包含0-100个document的index数据,而默认参数情况下这些目录根本用不着,因为在前面的流程中,我们看到,ramdirectory中的数据只有满100才会加入到fsdirectory中,这意味着一开始用到的目录就是101-1000级别的目录(101-1000的目录表示这些目录中的document的index数据也只有101-1000个这个范围。)。这种目录超过10个就会合并成一个新目录。依次类推高层目录。见图中ahuaxuan的注释
说到这里,大部分人都知道了,很多参数可以控制合并的调优,这些参数在前文已经讲过了,不再赘述。
到这里,IndexMerger的主体流程基本上完成了,其实就是一个生产-消费模型+小目录生产大目录,大目录生成更大目录的算法,这样做的好处是什么?当然是尽量少改动索引文件,应该说是便于分布式的查询架构。但是在后文中,我们会详细分析jackrabbit还没有为分布式查询准备好的原因,它的这块设计还有待改进,人无完人,框架亦是如此,不用过于苛求,也不必抱怨,用的不爽,那么就---改它,再不行---重新实现(某个模块或者全部)。
TO BE CONTINUE
发表评论
-
深入浅出jcr之16 该死的RMI,我们需要HTTP+简单RPC协议
2009-12-12 13:22 6720从这篇文 ... -
深入浅出jackrabbit之十五 文档提取优化2.docx
2009-10-22 18:38 3886/** *author:ahuaxuan *2009- ... -
深入浅出jackrabbit之十四 分布式文档提取
2009-09-24 12:20 4744/** *author:ahuaxuan *200 ... -
深入浅出jackrabbit之十三 查询之AST和QT
2009-09-10 10:12 3447简介:在前面的文章中 ... -
深入浅出jcr之十二 key-value存储系统
2009-08-26 09:31 3772作者:ahuaxuan 在写文章方面,惰性心理 ... -
深入浅出jcr之十一 jackrabbit改进要点
2009-08-18 18:22 3631作者,ahuaxuan 在看过前 ... -
深入浅出jcr之十 redolog 和 recovery.docx
2009-08-18 18:14 2173作者:ahuaxuan 在前面的 ... -
深入浅出 jackrabbit 九 索引合并(下)
2009-07-22 14:16 2118在上文中,ahuaxuan讲到了索引创建的主体流程,但是索引合 ... -
深入浅出 jackrabbit 七 文本提取(下)
2009-07-21 17:29 2531接上文,说到文本提取,在上一篇文章中,我们是管中窥豹,并没有把 ... -
深入浅出 jackrabbit 六 文本提取(上)
2009-07-21 17:27 3327用lucene作过索引的同 ... -
深入浅出 jackrabbit 之五 索引提交(下)
2009-07-14 17:53 2275接上文,在上面一篇文章中,我们谈到了update中的Delet ... -
深入浅出 jackrabbit 之四 索引提交(上)
2009-07-14 09:10 2970在上上篇文章中,我们了解了创建索引的一般流程,在上篇文章中,我 ... -
深入浅出 jackrabbit 3 创建 document
2009-07-01 13:03 4329/** *作者:ahuaxuan 张荣华 *日期:2009-0 ... -
深入浅出 jackrabbit 2 索引概览
2009-06-30 08:51 5383任何一个数据库都离不 ... -
深入浅出 jackrabbit 十 查询概览
2009-06-20 10:29 6301/** *author: ahuaxuan *date: ... -
深入浅出 jackrabbit 1
2009-05-19 18:31 12484/** * author:ahuaxuan( ...
相关推荐
《深入浅出 Jackrabbit 1》 Jackrabbit 是一个开源的、实现了 Java Content Repository (JCR) API 的内容管理系统,它允许程序通过统一的方式访问、存储和管理各种数据,包括文本、图像、视频等多媒体信息。这篇...
这个“jackrabbit最全入门教程”将会带你深入理解这个强大的内容管理解决方案。 首先,我们需要了解什么是JCR。JCR提供了一种统一的方式来访问和管理数字内容,无论这些内容是文档、图像、视频还是其他形式的数据。...
JackRabbit学习参考资料总汇涉及了深入浅出的JackRabbit内容仓库API的学习,内容涉及多个专题,整个学习资料是PDF文档格式。从标签来看,这份资料主要涉及JackRabbit以及JCR(Java Content Repository)的内容仓库...
jackrabbit, 在amqplib上,简单的amqp/rabbitmq作业队列基于 node Jackrabbitnode.js 在不讨厌生命的情况下。producer.js:var jackrabbit = require('jackrabbit');var rabbit = jackrabbit(process
### Jackrabbit 在项目实施中的常见问题与解决方案 #### 一、Jackrabbit简介 Jackrabbit 是一个完全用 Java 编写的 JCR(Java Content Repository)实现,它可以作为一个独立的服务运行,也可以嵌入到更大的应用...
jackrabbit 1.5.6 jar
在本文中,我们将深入探讨Apache Jackrabbit的基础知识,以及如何开始使用它。 一、JCR和Apache Jackrabbit的概念 1. JCR:JCR为存储和检索非结构化信息提供了一个模型和API。它允许开发者创建可以跨各种存储后端...
Apache Jackrabbit API 是一个强大的内容管理系统(CMS)的核心组件,它是Apache Software Foundation 开发的Java Content Repository (JCR) 的实现。JCR 是一个标准,它定义了一个用于存储、管理和检索结构化内容的...
jackrabbit-standalone-1.6.5.jar是webDav的支持jar包。
Apache Jackrabbit 是一个...对于开发人员来说,理解这些功能以及如何将 Jackrabbit 集成到现有应用中是深入学习的关键部分。通过实践示例代码和探索 Jackrabbit API 文档,你可以逐步掌握这个强大的内容管理系统框架。
Apache Jackrabbit是一个开源的、实现了Java Content ...总之,这个"Jackrabbit入门实例"是学习和探索JCR和Jackrabbit的好起点,它涵盖了基本的操作和概念,帮助你快速上手并深入了解这个强大的内容管理系统。
标题中的“查看jackrabbit仓库的小工具”指的是一个用于观察和管理Apache Jackrabbit仓库的实用程序。Jackrabbit是Java Content Repository (JCR) API的一个开源实现,它提供了一个内容管理系统(CMS)的基础框架,...
通过深入理解和使用"jackrabbit-webdav-2.3.2.src.zip"中的源代码,开发者不仅可以学习WebDAV协议的工作原理,还能了解如何在Android环境中实现高效稳定的WebDAV客户端功能。此外,对于想要对Jackrabbit进行定制化...
标题中的"jackrabbit-webdav-2.7.1.zip"指的是Apache Jackrabbit的一个特定版本——2.7.1的WebDAV模块的压缩包。Apache Jackrabbit是Java内容存储库(Content Repository)的一个实现,它遵循JCR(Java Content ...
综上所述,Apache Jackrabbit 2.6 是一个强大而灵活的内容管理解决方案,它为开发者提供了丰富的功能和API,以构建高效、可扩展的CMS应用。其对JCR规范的支持以及与多种系统的兼容性,使其成为构建企业级内容管理...
jackrabbit-webdav-2.1.0.jar 具体用法可以网上查找
这两个项目将帮助我们深入理解和快速入门Jackrabbit的使用。 1. Jackrabbit核心概念: - JCR:JSR 170定义了内容存储的标准接口,使得应用程序可以透明地访问和操作不同类型的存储系统。 - Node:在JCR中,内容被...
jackrabbit开发用jar包,jackrabbit是基于Lucene的一种站内搜索技术,它用xml文件为他的元数据,自动穿件索引,使用xpath或者xquery的查询方法。
jackrabbit-api-1.5.0.jar