先简单说下为什么要学习Lucene吧:目前我们项目组在做公司自己的分布式缓存项目。使用缓存很好,大大加快了常用数据的访问速度,对减轻系统IO压力很有帮助。缓存大都基于HASH和TREE的索引结构,这两种结构基本满足了绝大部分的缓存查询需要,但是和数据库一样,它们对模糊查询的效率很低,所以我们想利用Lucene让我们的缓存支持全文检索。
Lucene读写索引文件的时候都需要一个Directory实例。Directory是一个抽象类,所以我们只要继承它(下面的代码继承的是它的子类BaseDirectory),实现自己的读写逻辑就行了。
package cn.tang.test; import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.lucene.store.BaseDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.SingleInstanceLockFactory; import org.apache.lucene.util.Accountable; import com.tibco.as.space.ASException; import com.tibco.as.space.Space; import com.tibco.as.space.Tuple; import com.tibco.as.space.browser.Browser; import com.tibco.as.space.browser.BrowserDef; import com.tibco.as.space.browser.BrowserDef.BrowserType; public class ASDirectory extends BaseDirectory implements Accountable { protected final AtomicLong sizeInBytes = new AtomicLong(); Space space = ASEnv.getInstance().getSpace("dataSpace"); Space indexSpace = ASEnv.getInstance().getSpace("indexSpace"); public ASDirectory() { try { // TODO need change after make clear with lucene lock. 
setLockFactory(new SingleInstanceLockFactory()); } catch (IOException e) { e.printStackTrace(); } } @Override public long ramBytesUsed() { ensureOpen(); return sizeInBytes.get(); } @Override public void close() throws IOException { isOpen = false; try { indexSpace.clear(); space.clear(); } catch (ASException e) { e.printStackTrace(); } } @Override public IndexOutput createOutput(String name, IOContext context) throws IOException { ensureOpen(); ASWriter file = new ASWriter(name, this); return new ASOutputStream(file, true); } @Override public void deleteFile(String name) throws IOException { ensureOpen(); Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_FILE_NAME, name); try { Tuple tuple = indexSpace.take(queryTuple); if (tuple == null) { throw new FileNotFoundException(name); } // remove data from data space; String filter = Constants.FIELD_FILE_NAME + "='" + name + "'"; Browser browse = space.browse(BrowserType.TAKE, BrowserDef.create(), filter); while (browse.next() != null) { } } catch (ASException e) { e.printStackTrace(); } } @Override public boolean fileExists(String name) throws IOException { ensureOpen(); Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_FILE_NAME, name); try { Tuple tuple = indexSpace.get(queryTuple); if (tuple == null) { throw new FileNotFoundException(name); } return true; } catch (ASException e) { e.printStackTrace(); } return false; } @Override public long fileLength(String arg0) throws IOException { return 0; } @Override public String[] listAll() throws IOException { ensureOpen(); List<String> allNames = new ArrayList<String>(); try { Browser browse = indexSpace.browse(BrowserType.GET); Tuple tuple = null; while (true) { tuple = browse.next(); if (tuple != null) { allNames.add(tuple.getString(Constants.FIELD_FILE_NAME)); } else { break; } } } catch (ASException e) { e.printStackTrace(); } return allNames.toArray(new String[] {}); } @Override public IndexInput openInput(String name, IOContext context) 
throws IOException { ensureOpen(); Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_FILE_NAME, name); try { Tuple tuple = indexSpace.get(queryTuple); if (tuple == null) { throw new FileNotFoundException(name); } ASReader file = new ASReader(name); return new ASInputStream(name, file); } catch (ASException e) { e.printStackTrace(); } return null; } @Override public void sync(Collection<String> arg0) throws IOException { } }
代码不复杂,这个类只是用来得到两个流,实际干活的是这两个流,它们分别是IndexInput和IndexOutput类型的。所以再实现这两个流就可以使用了。下面的输入输出流是根据Lucene自带的RAMInputStream和RAMOutputStream改过来的,这两个类抽象得很好,基本不用多少改动就可以用。
package cn.tang.test; import java.io.EOFException; import java.io.IOException; import org.apache.lucene.store.IndexInput; public class ASInputStream extends IndexInput implements Cloneable { static final int BUFFER_SIZE = Constants.BUFFER_SIZE; private final ASReader asReader; private final long length; private byte[] currentBuffer; private int currentBufferIndex; private int bufferPosition; private long bufferStart; private int bufferLength; public ASInputStream(String name, ASReader asReader) throws IOException { this(name, asReader, asReader.length); } ASInputStream(String name, ASReader asReader, long length) throws IOException { super("ASInputStream(name=" + name + ")"); this.asReader = asReader; this.length = length; if (length / BUFFER_SIZE >= Integer.MAX_VALUE) { throw new IOException("ASInputStream too large length=" + length + ": " + name); } currentBufferIndex = -1; currentBuffer = null; } @Override public void close() { // nothing to do here } @Override public long length() { return length; } @Override public byte readByte() throws IOException { if (bufferPosition >= bufferLength) { currentBufferIndex++; switchCurrentBuffer(true); } return currentBuffer[bufferPosition++]; } @Override public void readBytes(byte[] b, int offset, int len) throws IOException { while (len > 0) { if (bufferPosition >= bufferLength) { currentBufferIndex++; switchCurrentBuffer(true); } int remainInBuffer = bufferLength - bufferPosition; int bytesToCopy = len < remainInBuffer ? 
len : remainInBuffer; try { System.arraycopy(currentBuffer, bufferPosition, b, offset, bytesToCopy); } catch (Exception e) { e.printStackTrace(); } offset += bytesToCopy; len -= bytesToCopy; bufferPosition += bytesToCopy; } } private final void switchCurrentBuffer(boolean enforceEOF) throws IOException { bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex; if (bufferStart > length || currentBufferIndex >= asReader.numBuffers()) { // end of file reached, no more buffers left if (enforceEOF) { throw new EOFException("read past EOF: " + this); } else { // Force EOF if a read takes place at this position currentBufferIndex--; bufferPosition = BUFFER_SIZE; } } else { currentBuffer = asReader.getBuffer(currentBufferIndex); bufferPosition = 0; long buflen = length - bufferStart; bufferLength = buflen > BUFFER_SIZE ? BUFFER_SIZE : (int) buflen; } } @Override public long getFilePointer() { return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition; } @Override public void seek(long pos) throws IOException { if (currentBuffer == null || pos < bufferStart || pos >= bufferStart + BUFFER_SIZE) { currentBufferIndex = (int) (pos / BUFFER_SIZE); switchCurrentBuffer(false); } bufferPosition = (int) (pos % BUFFER_SIZE); } @Override public IndexInput slice(String sliceDescription, final long offset, final long length) throws IOException { if (offset < 0 || length < 0 || offset + length > this.length) { throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: " + this); } final String newResourceDescription = (sliceDescription == null) ? 
toString() : (toString() + " [slice=" + sliceDescription + "]"); return new ASInputStream(newResourceDescription, asReader, offset + length) { { seek(0L); } @Override public void seek(long pos) throws IOException { if (pos < 0L) { throw new IllegalArgumentException("Seeking to negative position: " + this); } super.seek(pos + offset); } @Override public long getFilePointer() { return super.getFilePointer() - offset; } @Override public long length() { return super.length() - offset; } @Override public IndexInput slice(String sliceDescription, long ofs, long len) throws IOException { return super.slice(sliceDescription, offset + ofs, len); } }; } }
下面是ASOutputStream。为什么都要以AS开头?呵呵,其实我们的缓存简称就是AS。
package cn.tang.test; import java.io.IOException; import java.util.zip.CRC32; import java.util.zip.Checksum; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.util.Accountable; public class ASOutputStream extends IndexOutput implements Accountable { static final int BUFFER_SIZE = 1024; private final ASWriter asWriter; private byte[] currentBuffer; private int currentBufferIndex; private int bufferPosition; private long bufferStart; private int bufferLength; private final Checksum crc; public ASOutputStream(ASWriter asWriter, boolean checksum) { this.asWriter = asWriter; this.currentBufferIndex = -1; this.currentBuffer = null; if (checksum) { crc = new BufferedChecksum(new CRC32()); } else { crc = null; } } @Override public void close() throws IOException { flush(); } @Override public void writeByte(byte b) throws IOException { if (bufferPosition == bufferLength) { currentBufferIndex++; switchCurrentBuffer(); } if (crc != null) { crc.update(b); } currentBuffer[bufferPosition++] = b; } @Override public void writeBytes(byte[] b, int offset, int len) throws IOException { assert b != null; if (crc != null) { crc.update(b, offset, len); } while (len > 0) { if (bufferPosition == bufferLength) { currentBufferIndex++; switchCurrentBuffer(); } int remainInBuffer = currentBuffer.length - bufferPosition; int bytesToCopy = len < remainInBuffer ? 
len : remainInBuffer; System.arraycopy(b, offset, currentBuffer, bufferPosition, bytesToCopy); offset += bytesToCopy; len -= bytesToCopy; bufferPosition += bytesToCopy; } } private final void switchCurrentBuffer() { if (currentBuffer != null) { asWriter.writeBuffer(currentBufferIndex-1, currentBuffer,bufferPosition); } if (currentBufferIndex == asWriter.numBuffers()) { currentBuffer = asWriter.addBuffer(BUFFER_SIZE); } else { currentBuffer = asWriter.getBuffer(currentBufferIndex); } bufferPosition = 0; bufferStart = (long) BUFFER_SIZE * (long) currentBufferIndex; bufferLength = currentBuffer.length; } @Override public void flush() throws IOException { asWriter.writeBuffer(currentBufferIndex, currentBuffer,bufferPosition); long pointer = bufferStart + bufferPosition; if (pointer > asWriter.length) { asWriter.setLength(pointer); } asWriter.updateMetaData(); } @Override public long getFilePointer() { return currentBufferIndex < 0 ? 0 : bufferStart + bufferPosition; } /** Returns byte usage of all buffers. */ @Override public long ramBytesUsed() { return (long) asWriter.numBuffers() * (long) BUFFER_SIZE; } @Override public long getChecksum() throws IOException { if (crc == null) { throw new IllegalStateException("internal ASOutputStream created with checksum disabled"); } else { return crc.getValue(); } } }
这两个类中最重要的方法是switchCurrentBuffer(),但是要理解这个方法,首先要知道这两个类是怎么工作的。我先把它们实际用来读写数据的ASReader和ASWriter贴出来。
package cn.tang.test; import org.apache.lucene.util.Accountable; import com.tibco.as.space.ASException; import com.tibco.as.space.Space; import com.tibco.as.space.Tuple; public class ASReader implements Accountable { long length; int maxChunkNumber; protected long sizeInBytes; protected String name; private Space space; private Space indexSpace; public ASReader(String name) { this.name = name; space = ASEnv.getInstance().getSpace("dataSpace"); indexSpace = ASEnv.getInstance().getSpace("indexSpace"); Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_FILE_NAME, name); try { Tuple tuple = indexSpace.get(queryTuple); this.length = tuple.getLong(Constants.LENGTH); this.maxChunkNumber = tuple.getInt(Constants.MAX_CHUNK_NUMBER); } catch (ASException e) { e.printStackTrace(); } } public synchronized long getLength() { return length; } protected final synchronized byte[] getBuffer(int index) { Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_CHUNK_NUMBER, index); queryTuple.put(Constants.FIELD_FILE_NAME, name); Tuple tuple = null; try { tuple = space.get(queryTuple); } catch (ASException e) { e.printStackTrace(); } if (tuple != null) { byte[] buffer = tuple.getBlob(Constants.FIELD_DATA); // if (name.equals("segments.gen")) { // System.out.println("getBuffer " + name + "_" + index + "\t length:" + buffer.length); // } return buffer; } else { return null; } } protected final synchronized int numBuffers() { return maxChunkNumber; } @Override public synchronized long ramBytesUsed() { return length; } }
package cn.tang.test; import org.apache.lucene.util.Accountable; import com.tibco.as.space.ASException; import com.tibco.as.space.Space; import com.tibco.as.space.Tuple; public class ASWriter implements Accountable { long length; int maxChunkNumber; ASDirectory directory; protected long sizeInBytes; protected String name; private Space space; private Space indexSpace; public ASWriter(String name, ASDirectory directory) { // for output stream, use directory to calc total size; this.directory = directory; this.name = name; space = ASEnv.getInstance().getSpace("dataSpace"); indexSpace = ASEnv.getInstance().getSpace("indexSpace"); } public synchronized long getLength() { return length; } public synchronized void setLength(long length) { this.length = length; } protected final byte[] addBuffer(int size) { byte[] buffer = new byte[size]; maxChunkNumber++; return buffer; } protected final void updateMetaData() { Tuple tuple = Tuple.create(); tuple.put(Constants.FIELD_FILE_NAME, name); tuple.put(Constants.MAX_CHUNK_NUMBER, maxChunkNumber); tuple.put(Constants.LENGTH, length); try { indexSpace.put(tuple); } catch (ASException e) { e.printStackTrace(); } } protected final synchronized void writeBuffer(int index, byte[] buffer, int position) { // System.out.println("writeBuffer " + name + "_" + index + "\tsize:" + position); Tuple tuple = Tuple.create(); tuple.put(Constants.FIELD_FILE_NAME, name); tuple.put(Constants.FIELD_CHUNK_NUMBER, index); byte[] updatedBuffer = new byte[position]; System.arraycopy(buffer, 0, updatedBuffer, 0, position); tuple.put(Constants.FIELD_DATA, updatedBuffer); try { space.put(tuple); } catch (ASException e) { e.printStackTrace(); } } protected final synchronized byte[] getBuffer(int index) { Tuple queryTuple = Tuple.create(); queryTuple.put(Constants.FIELD_CHUNK_NUMBER, index); queryTuple.put(Constants.FIELD_FILE_NAME, name); Tuple tuple = null; try { tuple = space.get(queryTuple); } catch (ASException e) { e.printStackTrace(); } if (tuple != 
null) { return tuple.getBlob(Constants.FIELD_DATA); } else { return null; } } protected final synchronized int numBuffers() { return maxChunkNumber; } @Override public synchronized long ramBytesUsed() { return sizeInBytes; } }
打个比方:ASOutputStream要写一个300字节的数组,就像列车长要给300个人安排座位,一节车厢坐200人。刚开始一节车厢都没有,肯定不够,怎么办?调用switchCurrentBuffer(),先去要一节车厢ASWriter.addBuffer(200),安排好200人;还不够?继续调用switchCurrentBuffer(),把坐满的那节车厢交给ASWriter.writeBuffer()保存,再要一节新车厢,安排剩下的100人。
ASInputStream这个列车长来了,首先看下车上一共有多少人(ASReader.getLength()),哦,300人。接到通知:365号没有买票?先看第一节车厢(switchCurrentBuffer(),第1节),这节车厢只有200人,肯定在后面的车厢;继续switchCurrentBuffer()看第2节,但这节车厢只坐了一半,只有100人,加起来才300人,根本没有365号,只能抛出EOFException("怎么搞的,找完了都没有这个人")。
大概就这样。后面的例子举得不够好:一共才300人,要找365号,直接比较是否 > length就好了。
到这里就解释完毕了。代码里面有很多AS相关的内容,简单解释下:Space的概念就像数据库中的一张表(Table) :)
