`
tmj_159
  • 浏览: 705865 次
  • 性别: Icon_minigender_1
  • 来自: 永州
社区版块
存档分类
最新评论

Lucene 自定义索引文件的存取

 
阅读更多

前面介绍了Lucene的关于索引文件存取的API和一些接口,通过这些接口我们知道了,其实索引文件的读取也是通过流的方式来处理的。有了这个认识就可以通过接口做我们自己的实现。

 

先简单说下为什么要学习Lucene吧,目前我们项目组在做公司自己的分布式缓存的项目,使用缓存是很好,大大加快了常用数据的访问速度,这对增加系统IO是很有帮助的,缓存大都基于HASH 和TREE的索引结构,这两种结构基本满足了绝大部分的缓存查询需要,但是和数据一样,对模糊查询的效率很低,所以我们想利用Lucene来让我们的缓存支持全文检索。

大致的逻辑是这样,利用缓存API来添加一个切面来通过Lucene来做索引,然后查询的时候把传统的部分缓存查询转换成Lucene的查询,同时为了加快速度我们打算把索引文件存在缓存中。

 

Lucene 读写索引文件的时候都需要一个Directory的实例,Directory 是一个抽象类,所以我们只要根据这个抽象类实现自己的读写就行了。

package cn.tang.test;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.store.BaseDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.SingleInstanceLockFactory;
import org.apache.lucene.util.Accountable;

import com.tibco.as.space.ASException;
import com.tibco.as.space.Space;
import com.tibco.as.space.Tuple;
import com.tibco.as.space.browser.Browser;
import com.tibco.as.space.browser.BrowserDef;
import com.tibco.as.space.browser.BrowserDef.BrowserType;

public class ASDirectory extends BaseDirectory implements Accountable {
    protected final AtomicLong sizeInBytes = new AtomicLong();
    Space space = ASEnv.getInstance().getSpace("dataSpace");
    Space indexSpace = ASEnv.getInstance().getSpace("indexSpace");

    public ASDirectory() {
        try {
            // TODO need change after make clear with lucene lock.
            setLockFactory(new SingleInstanceLockFactory());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public long ramBytesUsed() {
        ensureOpen();
        return sizeInBytes.get();
    }

    @Override
    public void close() throws IOException {
        isOpen = false;
        try {
            indexSpace.clear();
            space.clear();
        } catch (ASException e) {
            e.printStackTrace();
        }
    }

    @Override
    public IndexOutput createOutput(String name, IOContext context) throws IOException {
        ensureOpen();
        ASWriter file = new ASWriter(name, this);
        return new ASOutputStream(file, true);

    }

    @Override
    public void deleteFile(String name) throws IOException {
        ensureOpen();
        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_FILE_NAME, name);

        try {
            Tuple tuple = indexSpace.take(queryTuple);
            if (tuple == null) {
                throw new FileNotFoundException(name);
            }

            // remove data from data space;
            String filter = Constants.FIELD_FILE_NAME + "='" + name + "'";
            Browser browse = space.browse(BrowserType.TAKE, BrowserDef.create(), filter);
            while (browse.next() != null) {
            }

        } catch (ASException e) {
            e.printStackTrace();
        }
    }

    @Override
    public boolean fileExists(String name) throws IOException {
        ensureOpen();
        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_FILE_NAME, name);

        try {
            Tuple tuple = indexSpace.get(queryTuple);
            if (tuple == null) {
                throw new FileNotFoundException(name);
            }
            return true;
        } catch (ASException e) {
            e.printStackTrace();
        }
        return false;
    }

    @Override
    public long fileLength(String arg0) throws IOException {
        return 0;
    }

    @Override
    public String[] listAll() throws IOException {
        ensureOpen();
        List<String> allNames = new ArrayList<String>();
        try {
            Browser browse = indexSpace.browse(BrowserType.GET);
            Tuple tuple = null;
            while (true) {
                tuple = browse.next();
                if (tuple != null) {
                    allNames.add(tuple.getString(Constants.FIELD_FILE_NAME));
                } else {
                    break;
                }
            }
        } catch (ASException e) {
            e.printStackTrace();
        }
        return allNames.toArray(new String[] {});
    }

    @Override
    public IndexInput openInput(String name, IOContext context) throws IOException {
        ensureOpen();
        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_FILE_NAME, name);

        try {
            Tuple tuple = indexSpace.get(queryTuple);
            if (tuple == null) {
                throw new FileNotFoundException(name);
            }
            ASReader file = new ASReader(name);
            return new ASInputStream(name, file);
        } catch (ASException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void sync(Collection<String> arg0) throws IOException {
    }

}

 代码不复杂,只是通过这个类得到两个流,实际工作的是这两个流。对应的两个流分别是IndexInput 和IndexOutput 类型的。索引再实现这个两个流就可以使用了,下面的输入输出流是根据RAMInputStream 和RAMOutputSteam改过来的,这两个类抽象的很好,基本不用多少改变就可以用。

package cn.tang.test;

import java.io.EOFException;
import java.io.IOException;

import org.apache.lucene.store.IndexInput;

public class ASInputStream extends IndexInput implements Cloneable {
    static final int BUFFER_SIZE = Constants.BUFFER_SIZE;

    private final ASReader asReader;
    private final long length;

    private byte[] currentBuffer;
    private int currentBufferIndex;

    private int bufferPosition;
    private long bufferStart;
    private int bufferLength;

    public ASInputStream(String name, ASReader asReader) throws IOException {
        this(name, asReader, asReader.length);
    }

    ASInputStream(String name, ASReader asReader, long length) throws IOException {
        super("ASInputStream(name=" + name + ")");
        this.asReader = asReader;
        this.length = length;
        if (length / BUFFER_SIZE >= Integer.MAX_VALUE) {
            throw new IOException("ASInputStream too large length=" + length + ": " + name);
        }

        currentBufferIndex = -1;
        currentBuffer = null;
    }

    @Override
    public void close() {
        // nothing to do here
    }

    @Override
    public long length() {
        return length;
    }

    @Override
    public byte readByte() throws IOException {
        if (bufferPosition >= bufferLength) {
            currentBufferIndex++;
            switchCurrentBuffer(true);
        }
        return currentBuffer[bufferPosition++];
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        while (len > 0) {
            if (bufferPosition >= bufferLength) {
                currentBufferIndex++;
                switchCurrentBuffer(true);
            }

            int remainInBuffer = bufferLength - bufferPosition;
            int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
            try {
                System.arraycopy(currentBuffer, bufferPosition, b, offset, bytesToCopy);
            } catch (Exception e) {
                e.printStackTrace();
            }
            offset += bytesToCopy;
            len -= bytesToCopy;
            bufferPosition += bytesToCopy;
        }
    }

    private final void switchCurrentBuffer(boolean enforceEOF) throws IOException {
        bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
        if (bufferStart > length || currentBufferIndex >= asReader.numBuffers()) {
            // end of file reached, no more buffers left
            if (enforceEOF) {
                throw new EOFException("read past EOF: " + this);
            } else {
                // Force EOF if a read takes place at this position
                currentBufferIndex--;
                bufferPosition = BUFFER_SIZE;
            }
        } else {
            currentBuffer = asReader.getBuffer(currentBufferIndex);
            bufferPosition = 0;
            long buflen = length - bufferStart;
            bufferLength = buflen > BUFFER_SIZE ? BUFFER_SIZE : (int) buflen;
        }
    }

    @Override
    public long getFilePointer() {
        return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
    }

    @Override
    public void seek(long pos) throws IOException {
        if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) {
            currentBufferIndex = (int) (pos / BUFFER_SIZE);
            switchCurrentBuffer(false);
        }
        bufferPosition = (int) (pos % BUFFER_SIZE);
    }

    @Override
    public IndexInput slice(String sliceDescription, final long offset, final long length) throws IOException {
        if (offset < 0 || length < 0 || offset + length > this.length) {
            throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this);
        }
        final String newResourceDescription = (sliceDescription == null) ? toString() : (toString() + " [slice=" + sliceDescription + "]");
        return new ASInputStream(newResourceDescription, asReader, offset + length) {
            {
                seek(0L);
            }

            @Override
            public void seek(long pos) throws IOException {
                if (pos < 0L) {
                    throw new IllegalArgumentException("Seeking to negative position: " + this);
                }
                super.seek(pos + offset);
            }

            @Override
            public long getFilePointer() {
                return super.getFilePointer() - offset;
            }

            @Override
            public long length() {
                return super.length() - offset;
            }

            @Override
            public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException {
                return super.slice(sliceDescription, offset + ofs, len);
            }
        };
    }
}

 下面是ASOutputStream, 为什么都要以AS开头,呵呵其实我们的缓存简称就是AS。

package cn.tang.test;

import java.io.IOException;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;

public class ASOutputStream extends IndexOutput implements Accountable {
    static final int BUFFER_SIZE = 1024;

    private final ASWriter asWriter;

    private byte[] currentBuffer;
    private int currentBufferIndex;

    private int bufferPosition;
    private long bufferStart;
    private int bufferLength;

    private final Checksum crc;

    public ASOutputStream(ASWriter asWriter, boolean checksum) {
        this.asWriter = asWriter;
        this.currentBufferIndex = -1;
        this.currentBuffer = null;

        if (checksum) {
            crc = new BufferedChecksum(new CRC32());
        } else {
            crc = null;
        }
    }

    @Override
    public void close() throws IOException {
        flush();
    }

    @Override
    public void writeByte(byte b) throws IOException {
        if (bufferPosition == bufferLength) {
            currentBufferIndex++;
            switchCurrentBuffer();
        }
        if (crc != null) {
            crc.update(b);
        }
        currentBuffer[bufferPosition++] = b;
    }

    @Override
    public void writeBytes(byte[] b, int offset, int len) throws IOException {
        assert b != null;
        if (crc != null) {
            crc.update(b, offset, len);
        }
        while (len > 0) {
            if (bufferPosition == bufferLength) {
                currentBufferIndex++;
                switchCurrentBuffer();
            }

            int remainInBuffer = currentBuffer.length - bufferPosition;
            int bytesToCopy = len < remainInBuffer ? len : remainInBuffer;
            System.arraycopy(b, offset, currentBuffer, bufferPosition, bytesToCopy);
            offset += bytesToCopy;
            len -= bytesToCopy;
            bufferPosition += bytesToCopy;
        }
    }

    private final void switchCurrentBuffer() {
        if (currentBuffer != null) {
            asWriter.writeBuffer(currentBufferIndex-1, currentBuffer,bufferPosition);
        }
        if (currentBufferIndex == asWriter.numBuffers()) {
            currentBuffer = asWriter.addBuffer(BUFFER_SIZE);
        } else {
            currentBuffer = asWriter.getBuffer(currentBufferIndex);
        }
        bufferPosition = 0;
        bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex;
        bufferLength = currentBuffer.length;
    }

    @Override
    public void flush() throws IOException {
        asWriter.writeBuffer(currentBufferIndex, currentBuffer,bufferPosition);
        long pointer = bufferStart + bufferPosition;
        if (pointer > asWriter.length) {
            asWriter.setLength(pointer);
        }
        asWriter.updateMetaData();
    }

    @Override
    public long getFilePointer() {
        return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition;
    }

    /** Returns byte usage of all buffers. */
    @Override
    public long ramBytesUsed() {
        return (long) asWriter.numBuffers() * (long) BUFFER_SIZE;
    }

    @Override
    public long getChecksum() throws IOException {
        if (crc == null) {
            throw new IllegalStateException("internal ASOutputStream created with checksum disabled");
        } else {
            return crc.getValue();
        }
    }
}

 同样基本不用改。

 这两个类中最重要的方法是SwitchCurrentBuffer() ,但是要理解这个方法首先要知道这两个类是工作的。我把类使用的实际读写的ASReader和ASWriter先贴出来。

package cn.tang.test;

import org.apache.lucene.util.Accountable;

import com.tibco.as.space.ASException;
import com.tibco.as.space.Space;
import com.tibco.as.space.Tuple;

public class ASReader implements Accountable {
    long length;
    int maxChunkNumber;

    protected long sizeInBytes;

    protected String name;

    private Space space;
    private Space indexSpace;

    public ASReader(String name) {
        this.name = name;
        space = ASEnv.getInstance().getSpace("dataSpace");
        indexSpace = ASEnv.getInstance().getSpace("indexSpace");

        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_FILE_NAME, name);
        try {
            Tuple tuple = indexSpace.get(queryTuple);
            this.length = tuple.getLong(Constants.LENGTH);
            this.maxChunkNumber = tuple.getInt(Constants.MAX_CHUNK_NUMBER);
        } catch (ASException e) {
            e.printStackTrace();
        }
    }

    public synchronized long getLength() {
        return length;
    }

    protected final synchronized byte[] getBuffer(int index) {
        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_CHUNK_NUMBER, index);
        queryTuple.put(Constants.FIELD_FILE_NAME, name);
        Tuple tuple = null;
        try {
            tuple = space.get(queryTuple);
        } catch (ASException e) {
            e.printStackTrace();
        }
        if (tuple != null) {
            byte[] buffer = tuple.getBlob(Constants.FIELD_DATA);
//            if (name.equals("segments.gen")) {
//                System.out.println("getBuffer " + name + "_" + index + "\t length:" + buffer.length);
//            }
            return buffer;
        } else {
            return null;
        }
    }

    protected final synchronized int numBuffers() {
        return maxChunkNumber;
    }

    @Override
    public synchronized long ramBytesUsed() {
        return length;
    }

}

 这里面最重要的当然是读byte数组的方法了,注意名字是getBuffer而且和普通的读取类不一样,没有单个byte的读取方法。等把ASWriter贴出来再进一步解释。

package cn.tang.test;

import org.apache.lucene.util.Accountable;

import com.tibco.as.space.ASException;
import com.tibco.as.space.Space;
import com.tibco.as.space.Tuple;

public class ASWriter implements Accountable {
    long length;
    int maxChunkNumber;
    ASDirectory directory;
    protected long sizeInBytes;

    protected String name;

    private Space space;
    private Space indexSpace;

    public ASWriter(String name, ASDirectory directory) {
        // for output stream, use directory to calc total size;
        this.directory = directory;
        this.name = name;
        space = ASEnv.getInstance().getSpace("dataSpace");
        indexSpace = ASEnv.getInstance().getSpace("indexSpace");
    }

    public synchronized long getLength() {
        return length;
    }

    public synchronized void setLength(long length) {
        this.length = length;
    }

    protected final byte[] addBuffer(int size) {
        byte[] buffer = new byte[size];
        maxChunkNumber++;
        return buffer;
    }

    protected final void updateMetaData() {
        Tuple tuple = Tuple.create();
        tuple.put(Constants.FIELD_FILE_NAME, name);
        tuple.put(Constants.MAX_CHUNK_NUMBER, maxChunkNumber);
        tuple.put(Constants.LENGTH, length);
        try {
            indexSpace.put(tuple);
        } catch (ASException e) {
            e.printStackTrace();
        }
    }

    protected final synchronized void writeBuffer(int index, byte[] buffer, int position) {
//        System.out.println("writeBuffer " + name + "_" + index + "\tsize:" + position);
        Tuple tuple = Tuple.create();
        tuple.put(Constants.FIELD_FILE_NAME, name);
        tuple.put(Constants.FIELD_CHUNK_NUMBER, index);
        byte[] updatedBuffer = new byte[position];
        System.arraycopy(buffer, 0, updatedBuffer, 0, position);
        tuple.put(Constants.FIELD_DATA, updatedBuffer);
        try {
            space.put(tuple);
        } catch (ASException e) {
            e.printStackTrace();
        }
    }

    protected final synchronized byte[] getBuffer(int index) {
        Tuple queryTuple = Tuple.create();
        queryTuple.put(Constants.FIELD_CHUNK_NUMBER, index);
        queryTuple.put(Constants.FIELD_FILE_NAME, name);
        Tuple tuple = null;
        try {
            tuple = space.get(queryTuple);
        } catch (ASException e) {
            e.printStackTrace();
        }
        if (tuple != null) {
            return tuple.getBlob(Constants.FIELD_DATA);
        } else {
            return null;
        }
    }

    protected final synchronized int numBuffers() {
        return maxChunkNumber;
    }

    @Override
    public synchronized long ramBytesUsed() {
        return sizeInBytes;
    }

}

 现在开始解答这个核心的问题,操作缓存不像文件的读取,直接放一个大文件中就行,磁盘来处理分块的问题,但是放入缓存中或者数据库中,这样对性能是有影响的,我们把这些数据分为对等的N块。

就像一辆火车,有N快,而这些带Stream的类就像是列车长(为什么不说是乘务员,因为有多个,哈哈),列车长查座位的时候需要从一个车厢一个车厢的来。

先考虑写。

ASOutputStream要写300个字节的数组,列车长安排座位了,一节车厢200人,刚开始一节车厢都没有,肯定不够了,怎么办(SwithCurrentBuffer()), 先去取一个车厢ASWriter.addBuffer(200),然后安排好了200人,还不够?继续(SwitchCurrentBuffer()),

在来个车厢(addBuffer(200)),剩下的100也安置好了,然后车准备开了,应该没有人来了吧(flush())。快下班了要准备交接的内容,统计下一共有多少人,ASWriter.updateLength()好了,下班。

然后考虑读。

ASInputSteram 这个列车长来了,首先我看下车上一共有多少人,ASReader.getLength(),哦,300呀,接到通知,365号没有买票?看第一节车厢看看(SwitchCurrentBuffer(1)),哦这车厢200人,肯定在后面的车厢,继续 SwitchCurrentBuffer(2),但是这车厢只有一半的人,人不够400,Throw EOFException("怎么搞的,找完了都没有这个人“).

 

大概就这样,后面的例子举的不够好,一共300人,要找365,直接比较是否 > length就好了。

 

到这里为止,解释完毕了,代码里面有很多AS的内容,简单解释下,SPace 的概念就像一个DB的Table :).

 

 

 

分享到:
评论

相关推荐

    Lucene读取索引文件

    《深入理解Lucene:解析索引文件的读取》 Lucene,作为一款强大的全文搜索引擎库,被广泛应用于各类信息检索系统中。它的核心功能之一就是构建和读取索引文件,以高效地进行文本搜索。本文将深入探讨Lucene如何读取...

    lucene自定义排序实现

    因此,了解如何在 Lucene 中实现自定义排序是非常关键的。在这个话题中,我们将深入探讨如何根据特定的业务需求对搜索结果进行定制排序。 首先,我们要明白 Lucene 默认的排序机制。默认情况下,Lucene 搜索结果是...

    lucene索引文件格式介绍

    索引文件格式是Lucene实现快速搜索的关键。以下是对Lucene索引文件格式的详细说明。 首先,我们要理解Lucene索引的基本结构。一个Lucene索引位于一个文件夹中,这个文件夹包含了多个段(Segment)。每个段是独立的...

    lucene索引查看文件

    这是Lucene3.0索引查看文件 直接运行jar包就可以打开ui界面 陪好路径就可以使用了

    Lucene索引文件查看工具lukeall4.7.1

    《深入理解Lucene索引文件查看工具LukeAll 4.7.1》 在信息检索领域,Lucene作为一款强大的全文搜索引擎库,被广泛应用在各种数据检索系统中。然而,对于开发者来说,理解并调试Lucene创建的索引文件并非易事。此时...

    luke源码--查看lucene索引文件

    《深入理解Luke:洞察Lucene索引文件》 在信息技术领域,搜索引擎的高效运作离不开对数据的快速检索,而Lucene作为开源全文检索库,扮演了核心角色。在这个过程中,Luke工具提供了一种直观的方式,让我们能够查看和...

    lucene 自定义评分

    然而,Lucene 的默认评分机制可能无法满足所有场景下的需求,这就需要我们对其进行自定义评分来实现特定的权重分配。本文将深入探讨如何在 Lucene 中实现自定义评分,以及它对提高搜索质量的重要性。 在 Lucene 中...

    Lucene 删除 合并索引

    Lucene 删除 合并索引,可以指定几个索引文件合并成一个索引文件。自己写的,有很多不足之处请多指教

    Lucene索引文件格式

    《Lucene索引文件格式详解》 Lucene,作为一款强大的全文搜索引擎库,其索引文件格式是实现高效搜索的关键。本文将深入解析Lucene 1.3版本的索引文件结构,帮助读者理解其内部运作机制。 首先,我们要理解Lucene...

    lucene.net 索引文件图形界面管理器

    描述中的“lucene.net 索引文件图形界面管理器”可能是指 NLuke,这是一个基于 .NET 的 Lucene 索引浏览器。NLuke 提供了可视化的界面,用户可以查看索引的详细信息,包括字段、分词、文档记录等。此外,NLuke 还...

    Lucene建索引及查询关键字

    在Eclipse环境中运用java,Lucene建索引及查询关键字

    lucene做索引查询流程

    lucene 做索引查询流程,来自《lucene in action》

    基于lucene技术的增量索引

    本文将深入探讨如何利用Lucene实现增量索引,这是一种在数据库或文件系统更新时仅对新数据或变化数据进行索引的技术,以降低资源消耗并保持搜索性能。 **1. Lucene基础知识** Lucene首先需要理解的是它的核心概念,...

    Lucene创建索引步骤

    Lucene创建索引步骤: 1、创建Directory(索引位置) 2、创建IndexWrite(写入索引) 3、创建Document对象 4、为Document添加Field(相当于添加属性:类似于表与字段的关系) 5、通过IndexWriter添加文档到索引中

    lucene索引查看工具及源码

    在使用 Lucene 进行信息检索时,有时我们需要对建立的索引进行查看、调试或分析,这时就需要借助 Lucene 的索引查看工具。 Luke 是一个非常实用的 Lucene 索引浏览器,全称为 Lucidworks Luke。它允许用户以图形化...

    Lucene对本地文件多目录创建索引

    标题中的“Lucene对本地文件多目录创建索引”指的是使用Apache Lucene库来构建一个搜索引擎,该搜索引擎能够索引本地计算机上的多个文件目录。Lucene是一个强大的全文搜索库,它允许开发者在Java应用程序中实现高级...

    lucene 对 xml建立索引

    ### Lucene对XML文档建立索引的技术解析与实践 #### 一、引言 随着互联网技术的迅猛发展,非结构化数据(如XML文档)在企业和组织中的应用日益广泛。如何高效地处理这些非结构化的数据,特别是进行快速检索成为了一...

    深入 Lucene 索引机制深入 Lucene 索引机制

    不同类型的文件索引需要不同的解析器,HTML和XML文件索引可能需要额外的步骤来过滤掉无用的标签内容。虽然这可能导致索引过程稍微慢一些,但不影响检索效率。 3.1 Demo 说明 在提供的示例中,`lucene-demos-1.4-...

    lucene 7索引查看工具 -luke-7.2.0-luke-release.zip

    Luke是一个方便的索引查看和诊断工具,可以访问Lucene构建的索引文件,显示和修改某些索引内容。luke 7.2.0 支持最新的 lucene7 索引查看 github地址:https://github.com/DmitryKey/luke/releases

Global site tag (gtag.js) - Google Analytics