本博客属原创文章,欢迎转载!转载请务必注明出处:http://guoyunsky.iteye.com/blog/1169912
队列很常见,但大部分的队列是将数据放入到内存.如果数据过多,就有内存溢出危险,而且长久占据着内存,也会影响性能.比如爬虫,将要抓取的URL放到内存,而URL过多,内存肯定要爆.在读Heritrix源码中,发现Heritrix是基于Bdb实现了一个持久化队列,于是我就将这块代码独立出来,平时使用也蛮爽的,现在拿出来共享.同时数据已经持久化,相比放在内存的一次性,可以循环累加使用.
大家也知道BDB的高性能和嵌入式.但这个持久化队列我觉得比较适合单机.如果涉及到分布式,就不大适合了.毕竟分布式要通信,负载均衡,冗余等.可以用其他的数据库等替代.
这里大概先说下实现原理,BDB是Key-Value型数据库,而队列是FIFO.所以这个持久化队列以位置作为BDB的Key,数据作为BDB的Value.然后用两个变量,分别记录队列两头的位置,也就是头部和尾部.当有数据插入的时候,就以尾部的位置为这个数据的Key.而当要取出数据时,以头部位置作为Key,获取这个Key的数据.原理大概如此,这个类也继承AbstractQueue,这里贴上代码.以下代码需引用bdb-je,common-io,junit.请在附件中下载
- 自定义的BDB环境类,可以缓存StoredClassCatalog并共享
package com.guoyun.util; import java.io.File; import com.sleepycat.bind.serial.StoredClassCatalog; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.Environment; import com.sleepycat.je.EnvironmentConfig; /** * BDB数据库环境,可以缓存StoredClassCatalog并共享 * * @contributor guoyun */ public class BdbEnvironment extends Environment { StoredClassCatalog classCatalog; Database classCatalogDB; /** * Constructor * * @param envHome 数据库环境目录 * @param envConfig config options 数据库换纪念馆配置 * @throws DatabaseException */ public BdbEnvironment(File envHome, EnvironmentConfig envConfig) throws DatabaseException { super(envHome, envConfig); } /** * 返回StoredClassCatalog * @return the cached class catalog */ public StoredClassCatalog getClassCatalog() { if(classCatalog == null) { DatabaseConfig dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); try { classCatalogDB = openDatabase(null, "classCatalog", dbConfig); classCatalog = new StoredClassCatalog(classCatalogDB); } catch (DatabaseException e) { // TODO Auto-generated catch block throw new RuntimeException(e); } } return classCatalog; } @Override public synchronized void close() throws DatabaseException { if(classCatalogDB!=null) { classCatalogDB.close(); } super.close(); } }
2. 基于BDB实现的持久化队列
package com.guoyun.util; import java.io.File; import java.io.IOException; import java.io.Serializable; import java.util.AbstractQueue; import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.io.FileUtils; import com.sleepycat.bind.EntryBinding; import com.sleepycat.bind.serial.SerialBinding; import com.sleepycat.bind.serial.StoredClassCatalog; import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.collections.StoredMap; import com.sleepycat.collections.StoredSortedMap; import com.sleepycat.je.Database; import com.sleepycat.je.DatabaseConfig; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.DatabaseExistsException; import com.sleepycat.je.DatabaseNotFoundException; import com.sleepycat.je.EnvironmentConfig; /** * 持久化队列,基于BDB实现,也继承Queue,以及可以序列化.但不等同于Queue的时,不再使用后需要关闭 * 相比一般的内存Queue,插入和获取值需要多消耗一定的时间 * 这里为什么是继承AbstractQueue而不是实现Queue接口,是因为只要实现offer,peek,poll几个方法即可, * 其他如remove,addAll,AbstractQueue会基于这几个方法去实现 * * @contributor guoyun * @param <E> */ public class BdbPersistentQueue<E extends Serializable> extends AbstractQueue<E> implements Serializable { private static final long serialVersionUID = 3427799316155220967L; private transient BdbEnvironment dbEnv; // 数据库环境,无需序列化 private transient Database queueDb; // 数据库,用于保存值,使得支持队列持久化,无需序列化 private transient StoredMap<Long,E> queueMap; // 持久化Map,Key为指针位置,Value为值,无需序列化 private transient String dbDir; // 数据库所在目录 private transient String dbName; // 数据库名字 private AtomicLong headIndex; // 头部指针 private AtomicLong tailIndex; // 尾部指针 private transient E peekItem=null; // 当前获取的值 /** * 构造函数,传入BDB数据库 * * @param db * @param valueClass * @param classCatalog */ public BdbPersistentQueue(Database db,Class<E> valueClass,StoredClassCatalog classCatalog){ this.queueDb=db; this.dbName=db.getDatabaseName(); headIndex=new AtomicLong(0); tailIndex=new AtomicLong(0); bindDatabase(queueDb,valueClass,classCatalog); } /** * 构造函数,传入BDB数据库位置和名字,自己创建数据库 * * @param dbDir * @param dbName * @param valueClass */ public BdbPersistentQueue(String dbDir,String dbName,Class<E> valueClass){ headIndex=new AtomicLong(0); tailIndex=new AtomicLong(0); this.dbDir=dbDir; this.dbName=dbName; createAndBindDatabase(dbDir,dbName,valueClass); } /** * 绑定数据库 * * @param db * @param valueClass * @param classCatalog */ public void bindDatabase(Database db, Class<E> valueClass, StoredClassCatalog classCatalog){ EntryBinding<E> valueBinding = TupleBinding.getPrimitiveBinding(valueClass); if(valueBinding == null) { valueBinding = new SerialBinding<E>(classCatalog, valueClass); // 序列化绑定 } queueDb = db; queueMap = new StoredSortedMap<Long,E>( db, // db TupleBinding.getPrimitiveBinding(Long.class), //Key valueBinding, // Value true); // allow write } /** * 创建以及绑定数据库 * * @param dbDir * @param dbName * @param valueClass * @throws DatabaseNotFoundException * @throws DatabaseExistsException * @throws DatabaseException * @throws IllegalArgumentException */ private void createAndBindDatabase(String dbDir, String dbName,Class<E> valueClass) throws DatabaseNotFoundException, DatabaseExistsException,DatabaseException,IllegalArgumentException{ File envFile = null; EnvironmentConfig envConfig = null; DatabaseConfig dbConfig = null; Database db=null; try { // 数据库位置 envFile = new File(dbDir); // 数据库环境配置 envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(false); // 数据库配置 dbConfig = new DatabaseConfig(); dbConfig.setAllowCreate(true); dbConfig.setTransactional(false); dbConfig.setDeferredWrite(true); // 创建环境 dbEnv = new BdbEnvironment(envFile, envConfig); // 打开数据库 db = dbEnv.openDatabase(null, dbName, dbConfig); // 绑定数据库 bindDatabase(db,valueClass,dbEnv.getClassCatalog()); } catch (DatabaseNotFoundException e) { throw e; } catch (DatabaseExistsException e) { throw e; } catch (DatabaseException e) { throw e; } catch (IllegalArgumentException e) { throw e; } } /** * 值遍历器 */ @Override public Iterator<E> iterator() { return queueMap.values().iterator(); } /** * 大小 */ @Override public int size() { synchronized(tailIndex){ synchronized(headIndex){ return (int)(tailIndex.get()-headIndex.get()); } } } /** * 插入值 */ @Override public boolean offer(E e) { synchronized(tailIndex){ queueMap.put(tailIndex.getAndIncrement(), e); // 从尾部插入 } return true; } /** * 获取值,从头部获取 */ @Override public E peek() { synchronized(headIndex){ if(peekItem!=null){ return peekItem; } E headItem=null; while(headItem==null&&headIndex.get()<tailIndex.get()){ // 没有超出范围 headItem=queueMap.get(headIndex.get()); if(headItem!=null){ peekItem=headItem; continue; } headIndex.incrementAndGet(); // 头部指针后移 } return headItem; } } /** * 移出元素,移出头部元素 */ @Override public E poll() { synchronized(headIndex){ E headItem=peek(); if(headItem!=null){ queueMap.remove(headIndex.getAndIncrement()); peekItem=null; return headItem; } } return null; } /** * 关闭,也就是关闭所是用的BDB数据库但不关闭数据库环境 */ public void close(){ try { if(queueDb!=null){ queueDb.sync(); queueDb.close(); } } catch (DatabaseException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (UnsupportedOperationException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 清理,会清空数据库,并且删掉数据库所在目录,慎用.如果想保留数据,请调用close() */ @Override public void clear() { try { close(); if(dbEnv!=null&&queueDb!=null){ dbEnv.removeDatabase(null, dbName==null?queueDb.getDatabaseName():dbName); dbEnv.close(); } } catch (DatabaseNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (DatabaseException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally{ try { if(this.dbDir!=null){ FileUtils.deleteDirectory(new File(this.dbDir)); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
3. 测试类,测试数据准确性和性能
package com.guoyun.util; import java.io.File; import java.util.Queue; import java.util.concurrent.LinkedBlockingQueue; import junit.framework.TestCase; public class BdbPersistentQueueTest extends TestCase{ Queue<String> memoryQueue; Queue<String> persistentQueue; @Override protected void setUp() throws Exception { super.setUp(); memoryQueue=new LinkedBlockingQueue<String>(); String dbDir="E:/java/test/bdbDir"; File file=new File(dbDir); if(!file.exists()||!file.isDirectory()){ file.mkdirs(); } persistentQueue=new BdbPersistentQueue(dbDir,"pq",String.class); } @Override protected void tearDown() throws Exception { super.tearDown(); memoryQueue.clear(); memoryQueue=null; persistentQueue.clear(); persistentQueue=null; } /** * 排放值 * @param queue * @return 排放的数据个数 */ public int drain(Queue<String> queue){ int count=0; while(true){ try { queue.remove(); count++; } catch (Exception e) { return count; } } } /** * * @param queue * @param size */ public void fill(Queue<String> queue,int size){ for(int i=0;i<size;i++){ queue.add(i+""); } } public void checkTime(int size){ System.out.println("1.内存Queue插入和排空数据所耗时间"); long time=0; long start=System.nanoTime(); fill(memoryQueue,size); time=System.nanoTime()-start; System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒"); start=System.nanoTime(); drain(memoryQueue); time=System.nanoTime()-start; System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size)+" 纳秒"); System.out.println("2.持久化Queue插入和排空数据所耗时间"); start=System.nanoTime(); fill(persistentQueue,size); time=System.nanoTime()-start; System.out.println("\t填充 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000000)+" 豪秒"); start=System.nanoTime(); drain(persistentQueue); time=System.nanoTime()-start; System.out.println("\t排空 "+size+" 条数据耗时: "+(double)time/1000000+" 毫秒,单条耗时: "+((double)time/size/1000)+" 豪秒"); } /** * 十万数量级测试 */ public void testTime_tenThousand(){ System.out.println("========测试1000000(十万)条数据================="); checkTime(100000); } /** * 百万数量级测试 */ public void testTime_mil(){ System.out.println("========测试1000000(百万)条数据================="); checkTime(1000000); } /** * 千万数量级测试,注意要防止内存溢出 */ public void testTime_tenMil(){ System.out.println("========测试10000000(千万)条数据================="); checkTime(10000000); } /** * 测试队列数据准确性 * @param queue * @param queueName * @param size */ public void checkDataExact(Queue<String> queue,String queueName,int size){ if(queue.size()!=size){ System.err.println("Error size of "+queueName); } String value=null; for(int i=0;i<size;i++){ value=queue.remove(); if(!((i+"").equals(value))){ System.err.println("Error "+queueName+":"+i+"->"+value); } } } /** * 测试队列中数据的准确性,包括长度 */ public void testExact(){ int size=100; fill(memoryQueue,size); fill(persistentQueue,size); checkDataExact(memoryQueue,"MemoryQueue",100); checkDataExact(persistentQueue,"PersistentQueue",100); } }
4.测试性能
========测试1000000(十万)条数据=================
1.内存Queue插入和排空数据所耗时间
填充 100000 条数据耗时: 53.550787 毫秒,单条耗时: 535.50787 纳秒
排空 100000 条数据耗时: 27.09901 毫秒,单条耗时: 270.9901 纳秒
2.持久化Queue插入和排空数据所耗时间
填充 100000 条数据耗时: 1399.644305 毫秒,单条耗时: 0.01399644305 豪秒
排空 100000 条数据耗时: 2104.765179 毫秒,单条耗时: 21.04765179 豪秒
持久化写入是内存写入的26倍,读取是77倍
========测试1000000(百万)条数据=================
1.内存Queue插入和排空数据所耗时间
填充 1000000 条数据耗时: 699.105888 毫秒,单条耗时: 699.105888 纳秒
排空 1000000 条数据耗时: 158.792281 毫秒,单条耗时: 158.792281 纳秒
2.持久化Queue插入和排空数据所耗时间
填充 1000000 条数据耗时: 11978.132218 毫秒,单条耗时: 0.011978132218 豪秒
排空 1000000 条数据耗时: 22355.617205 毫秒,单条耗时: 22.355617204999998 豪秒
持久化写入是内存写入的17倍,读取是141倍
========测试10000000(千万)条数据=================
1.内存Queue插入和排空数据所耗时间
填充 10000000 条数据耗时: 9678.377046 毫秒,单条耗时: 967.8377046 纳秒
排空 10000000 条数据耗时: 1473.416825 毫秒,单条耗时: 147.3416825 纳秒
2.持久化Queue插入和排空数据所耗时间
填充 10000000 条数据耗时: 151177.036391 毫秒,单条耗时: 0.0151177036391 豪秒
排空 10000000 条数据耗时: 361642.655135 毫秒,单条耗时: 36.164265513500006 豪秒
持久化写入是内存写入的15倍,读取是245倍
可以看出写入和遍历一条都是在毫秒级别,还有千万级的数据,BDB的性能着实牛逼.而且随着数据的增多,写的时间在缩短,读的时间在增长.
更多技术文章、感悟、分享、勾搭,请用微信扫描:
相关推荐
3. **数据持久性**:Berkeley DB 支持事务,确保数据的持久性。 #### 十、使用游标 游标提供了灵活的数据访问方式,支持随机访问和顺序访问。 1. **打开和关闭游标**:使用 `db->cursor` 和 `dbc->close` 函数。 ...
- **多样化的存储方式**: 提供哈希文件、B树、定长记录(队列)和变长记录等存储方式,允许开发者根据实际需求选择最合适的存储结构。 - **灵活的应用**: 即使在没有事务管理的情况下,也能单独使用该子系统,为...
- **日志管理**:由于DB支持事务和持久化,因此常用于日志记录,例如网络设备的配置日志。 - **缓存系统**:其高速存取特性使其成为缓存系统的理想选择,如Memcached的早期版本就曾使用过Berkeley DB。 - **...
Berkeley DB 是一款开源的、基于文件系统的键值存储数据库系统,由Oracle公司开发,被广泛用于需要高性能、高可靠性和轻量级数据管理的场合。它支持多种访问方法,包括B树、哈希、Recno(记录号)和Queue(队列)等...
**Berkeley DB (BDB)** 是一款开源的嵌入式数据库系统,由Oracle公司开发,广泛应用于需要快速、高效数据存储的应用程序中。它提供了一种键值对存储模型,适用于构建事务处理、缓存、日志记录等多种场景。BDB在设计...
- **20.8 一个新的 Berkeley DB Java 持久 API** - **API 介绍**:介绍一种新的 Java 接口,用于操作 Berkeley DB 数据库。 - **20.9 如何操作 Berkeley DB 的 Records** - **记录操作**:讲解如何使用各种 API 对...
BDBN,全称可能是“Berkeley DB (BDB) Network Store”,是指ActiveMQ中使用Berkeley DB作为持久化存储的一种实现方式。 Berkeley DB是甲骨文公司提供的一款嵌入式数据库系统,它轻量级、高效且易于集成,适合于...
1. 缓存系统:如Redis早期版本曾用Berkeley DB作为持久化存储。 2. 网络服务:如NFS服务器的元数据管理。 3. 系统日志:存储和检索系统日志信息。 4. 邮件系统:如Postfix的邮件队列管理。 5. 配置管理:存储和查询...
- **数据持久化**:Berkeley DB支持将数据持久化存储到磁盘上,确保数据安全。 - **数据库使用示例**:提供了一个完整的示例程序,演示了如何使用Berkeley DB执行基本的CRUD操作。 #### 四、游标使用 - **游标操作...
4. **内存管理**:BDB JE能够将数据存储在内存中,实现高速访问,同时也可以选择将数据持久化到磁盘,保证数据安全。 5. **多模式API**:提供了多种访问模式,如简单的键值访问、树形结构的BTree访问以及集合和队列...
Berkeley DB 支持两种主要的数据存储方式:直接持久化层 (DPL) 和基础 API。 - **直接持久化层 (DPL)**:这是一种高级抽象层,允许开发者以面向对象的方式存储和检索数据。它自动管理数据的序列化和反序列化。 - **...
- **数据持久性**:Berkeley DB支持数据持久化,即数据可以在程序关闭后仍然保留,这对于需要长期保存数据的应用来说非常重要。 - **使用C结构体**:Berkeley DB支持使用C语言中的结构体来表示记录,使得数据的组织...
ActiveMQ Store BDBN 2.0,作为Apache ActiveMQ项目的一部分,是一个基于Berkeley DB (BDB)的持久化存储解决方案,用于管理和存储消息队列中的数据。在Java应用程序中,JAR(Java Archive)文件是用于打包类库、资源...
在ActiveMQ中,BDB存储用于持久化消息,确保在系统崩溃或重启后仍能恢复消息队列的状态。 ActiveMQ提供了多种存储策略,包括本地文件系统存储、LevelDB、JDBC和BDB。BDB存储因其强大的事务支持和高可用性,常被选择...
activemq-store-bdb-2.0.jar是Apache ActiveMQ利用Berkeley DB进行消息持久化的关键组件。了解并掌握它的使用方法和配置细节,对于优化ActiveMQ的消息存储性能,确保数据的可靠性和一致性至关重要。在实际应用中,...
libevent是一个事件驱动库,而BerkleyDB则用于持久化存储队列数据,防止因服务崩溃或服务器故障导致数据丢失。 以下是安装步骤: 1. 首先,确认libevent和libevent-devel是否已安装。可以通过`rpm -qa | grep ...
这个版本的存储模块特别引入了Berkeley DB Java Edition(BDBNJ),它是一个高性能、可嵌入的数据存储解决方案,适用于需要持久化消息存储的应用场景。 Apache ActiveMQ是Apache软件基金会开发的一个开源消息代理,...
- **BerkeleyDB**:BerkeleyDB是一种开源的嵌入式数据库系统,以其强大的事务处理能力闻名。它曾经作为MySQL的一个存储引擎,适合需要事务支持和高并发的应用场景。但是,其GPL许可可能会限制商业应用。 - **...