`
mshijie
  • 浏览: 96245 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

使用Berkeley DB构建持久化队列

    博客分类:
  • Java
阅读更多

Berkeley DB简介

Berkeley DB(以下简称Bdb)是一个嵌入式的键值数据库。Bdb目前有两个版本,一个是使用c++构建的版本,还有一个java版本。c++版本支持在众多的语言中使用,Berkeley DB Java Edition(以下简称JE)完全用java语言编写。JE执行在应用程序中,完全不需要Client/Server的通信。JE更容易部署和嵌入到java程序中,所以我选择了使用Berkeley DB Java Edition(sleep cat)。

 

Berkeley DB编程接口  

在Java程序中使用JE,首先需要初始化一个数据库环境。

File home = new File(envHome);
EnvironmentConfig environmentConfig = new EnvironmentConfig();
environmentConfig.setTransactional(true);
environmentConfig.setAllowCreate(true);
environmentConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
Environment environment = new Environment(home, environmentConfig);


home是一个存在的目录,JE将把自己所有的数据库文件存储到这个目录下。setTransactional设置数据库支持事务。setAllowCreate设置数据库不存在时,是否创建数据库。setDurability设置Bdb的写数据的方式,这个很重要,对性能和数据安全的影响和很大。Bdb数据为了性能考虑,写入的数据不会立即写回到磁盘中,必须手动同步和关闭数据库的时候才回写回数据。可以通过设置Durability修改默认行为。Durability预设值了以下几种策略

COMMIT_SYNC  
事务提交时,同步数据到磁盘,并等待磁盘完成,这是最安全的策略,也是最耗时的策略。    
COMMIT_WRITE_NO_SYNC  
事务提交时,写数据到磁盘,不等待磁盘完成,安全和速度折中的策略。
COMMIT_NO_SYNC 
COMMIT_WRITE_NO_SYNC
READ_ONLY_TXN

 
COMMIT_SYNC   能保证应用或者系统挂掉而不丢失数据,保证了事务的完整性。而COMMIT_WRITE_NO_SYNC   只能保证应用挂掉不丢失数据,不能保证系统挂掉时的事务完整性。

JE提供了两个层次的编程接口,高层的Direct Persistence Layer(DPL),DPL适合直接保存和读取一个Java对象的场景。DPL读取使用annotation配置的元信息保存和读取数据。

 

 

Direct Persistence Layer  

待存取的用户类

@Entity
public class User {

    @PrimaryKey(sequence = "SEQ_USER_ID")
    private Integer userId;

    @SecondaryKey(relate = MANY_TO_ONE)
    private String nick;

    //constructor, getter and setter
}


PrimaeyKey配置userId为主键,sequence配置了一个自增序列,@SecondaryKey配置了可查询的索引。MANY_TO_ONE表示,nick的值在数据库中可重复,多个user可使用同一个nick,按nick查询会返回一个列表。

    private static EntityStore createStore(Environment envionment) throws DatabaseException {
        StoreConfig storeConfig = new StoreConfig();
        storeConfig.setAllowCreate(true);
        storeConfig.setTransactional(true);
        return new EntityStore(envionment, "store1", storeConfig);
    }


创建一个EntityStore

PrimaryIndex<Integer, User> primaryKey = entityStore.getPrimaryIndex(Integer.class, User.class);

primaryKey.put(new User("user" + i));
primaryKey.get(1);


通过EntityStore获得对应的PrimaryIndex,即可保存,读取,查询Java对象了。

 

 

Base API  

同样,首先创建一个Database。

 DatabaseConfig myDbConfig = new DatabaseConfig();
 myDbConfig.setAllowCreate(true);
 myDbConfig.setTransactional(true);
 myDbConfig.setSortedDuplicates(true);

 Database myDb = myEnv.openDatabase(null,     // txn handle
           dbName,   // Database file name
           myDbConfig);


使用Database保存数据时,提供的Key和Value都需要序列化为DatabaseEntry。

 

 

Bdb JE Java Collections  

JE提供了一组高层的Collection API,通过java的Map,List等API封装了底层存取操作。比较适合用来构建持久化的数据结构。

 

 

构建持久化队列


public class BdbMessageQueue<T> {

    private static Logger log = Logger.getLogger(BdbMessageQueue.class);

    private static final String MESSAGE_STORE = "message_store";

    private Database messageDb;
    private StoredSortedMap<Long, T> messageMap;

    private TransactionRunner transactionRunner;

    //EnqueueWorker和DequeueWorker的同步对象
    private Object syncObject = new Object();

    public BdbMessageQueue(BdbEnvironment bdbEnvironment, String queueName)
            throws DatabaseException {
        try {
            // Set the Berkeley DB config for opening all stores.
            DatabaseConfig dbConfig = new DatabaseConfig();
            dbConfig.setTransactional(true);
            dbConfig.setAllowCreate(true);


            // Open the Berkeley DB database for the part, supplier and shipment
            // stores.  The stores are opened with no duplicate keys allowed.
            messageDb = bdbEnvironment.getEnvironment().openDatabase(null, MESSAGE_STORE + queueName, dbConfig);

            EntryBinding messageKeyBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Long.class);
            EntryBinding messageValueBinding =
                    new SerialBinding(bdbEnvironment.getJavaCatalog(), Object.class);

            messageMap = new StoredSortedMap(messageDb, messageKeyBinding, messageValueBinding, true);

            // Create transactionRunner for the transactional operation
            transactionRunner = new TransactionRunner(bdbEnvironment.getEnvironment());

        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }


    /**
     * 安全关闭berkeley 数据库
     */
    public void close() {
        try {
            if (messageDb != null) messageDb.close();
        } catch (DatabaseException dbe) {
            throw new VipServiceException(VipErrorCode.DBD_ERROR, dbe);
        }
    }

    /**
     * 出队
     *
     * @return
     */
    public void dequeue(TaskCallback task) {
        try {
            DequeueWorker worker = new DequeueWorker(task);
            transactionRunner.run(worker);
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * 入队
     *
     * @param message
     */
    public void enqueue(T message) {
        try {
            EnqueueWorker worker = new EnqueueWorker(message);
            transactionRunner.run(worker);
        } catch (Exception e) {
            throw new VipServiceException(VipErrorCode.DBD_QUEUE_ERROR, e);
        }
    }

    /**
     * 出队列事务Worker,内部类
     */
    private class DequeueWorker implements TransactionWorker {

        private TaskCallback task;

        private DequeueWorker(TaskCallback task) {
            this.task = task;
        }

        public void doWork() throws Exception {

            Long firstKey;
            T message;

            synchronized (syncObject) {
                //没有获得消息就不起床
                while ((firstKey = messageMap.firstKey()) == null ||
                        (message = messageMap.get(firstKey)) == null) {
                    syncObject.wait();
                }
            }
            //如果执行任务的时候,抛出了RuntimeException,消息不会从队列中删除。BDB事务也会回滚。
            task.handelMessage(message);

            messageMap.remove(firstKey);

            if (log.isDebugEnabled()) {
                log.debug(String.format("DequeueWorker dequeue %1$s. ", message));
            }
        }

    }

    /**
     * 入队列事务Worker,内部类
     */
    private class EnqueueWorker implements TransactionWorker {
        private T message;

        private EnqueueWorker(T message) {
            this.message = message;
        }

        public void doWork() throws Exception {
            synchronized (syncObject) {
                Long lastKey = messageMap.lastKey();
                lastKey = (lastKey == null) ? 1L : lastKey + 1;
                messageMap.put(lastKey, message);

                syncObject.notify();
            }

            if (log.isDebugEnabled()) {
                log.debug(String.format("EnqueueWorker enqueue %1$s. ", message));
            }
        }
    }
}


代码很长,代码中使用的JE提供的StoredMap高层次的API。

文本只是一个code show,如果你对Berkeley db感兴趣,请阅读Oracle的官方文档。

Getting Started Guide

Writing Transactional Applications

Java Collections Tutorial

Getting Started with BDB JE High Availability

 

5
2
分享到:
评论

相关推荐

    BerkeleyDB PDF

    3. **数据持久性**:Berkeley DB 支持事务,确保数据的持久性。 #### 十、使用游标 游标提供了灵活的数据访问方式,支持随机访问和顺序访问。 1. **打开和关闭游标**:使用 `db-&gt;cursor` 和 `dbc-&gt;close` 函数。 ...

    Berkeley_DB_概述

    - **多样化的存储方式**: 提供哈希文件、B树、定长记录(队列)和变长记录等存储方式,允许开发者根据实际需求选择最合适的存储结构。 - **灵活的应用**: 即使在没有事务管理的情况下,也能单独使用该子系统,为...

    Berkeley_DB4.6.20.rar_Berkeley DB

    - **日志管理**:由于DB支持事务和持久化,因此常用于日志记录,例如网络设备的配置日志。 - **缓存系统**:其高速存取特性使其成为缓存系统的理想选择,如Memcached的早期版本就曾使用过Berkeley DB。 - **...

    Berkeley DB(BDB)

    **Berkeley DB (BDB)** 是一款开源的嵌入式数据库系统,由Oracle公司开发,广泛应用于需要快速、高效数据存储的应用程序中。它提供了一种键值对存储模型,适用于构建事务处理、缓存、日志记录等多种场景。BDB在设计...

    Berkeley DB入门篇.doc

    环境管理在Berkeley DB中扮演着关键角色,它提供了一个环境类(DB_ENV)来管理多个数据库的共享资源,如缓存、日志文件、事务等。异常处理和错误返回机制使得开发者能够更好地处理运行时的错误和异常,确保系统的...

    Berkeley_DB_参考手册

    - **20.8 一个新的 Berkeley DB Java 持久 API** - **API 介绍**:介绍一种新的 Java 接口,用于操作 Berkeley DB 数据库。 - **20.9 如何操作 Berkeley DB 的 Records** - **记录操作**:讲解如何使用各种 API 对...

    activemq-store-bdbn-1.2.jar.zip

    BDBN,全称可能是“Berkeley DB (BDB) Network Store”,是指ActiveMQ中使用Berkeley DB作为持久化存储的一种实现方式。 Berkeley DB是甲骨文公司提供的一款嵌入式数据库系统,它轻量级、高效且易于集成,适合于...

    db-5.1.25.tar.gz

    1. 缓存系统:如Redis早期版本曾用Berkeley DB作为持久化存储。 2. 网络服务:如NFS服务器的元数据管理。 3. 系统日志:存储和检索系统日志信息。 4. 邮件系统:如Postfix的邮件队列管理。 5. 配置管理:存储和查询...

    Berkley DB 参考手册(PDF版本)

    - **数据持久化**:Berkeley DB支持将数据持久化存储到磁盘上,确保数据安全。 - **数据库使用示例**:提供了一个完整的示例程序,演示了如何使用Berkeley DB执行基本的CRUD操作。 #### 四、游标使用 - **游标操作...

    Berkely DB java

    Berkeley DB 支持两种主要的数据存储方式:直接持久化层 (DPL) 和基础 API。 - **直接持久化层 (DPL)**:这是一种高级抽象层,允许开发者以面向对象的方式存储和检索数据。它自动管理数据的序列化和反序列化。 - **...

    BerkeleyDBJavaEdition3.2.74.zip

    Berkeley DB Java Edition(简称BDB JE)是Oracle公司提供的一款高性能、轻量级的嵌入式数据库系统,专门针对Java应用程序设计。它允许开发者在应用程序中直接存储和管理数据,而无需设置独立的数据库服务器。这个...

    BDB内存数据库

    - **数据持久性**:Berkeley DB支持数据持久化,即数据可以在程序关闭后仍然保留,这对于需要长期保存数据的应用来说非常重要。 - **使用C结构体**:Berkeley DB支持使用C语言中的结构体来表示记录,使得数据的组织...

    activemq-store-bdbn-2.0.jar.zip

    ActiveMQ Store BDBN 2.0,作为Apache ActiveMQ项目的一部分,是一个基于Berkeley DB (BDB)的持久化存储解决方案,用于管理和存储消息队列中的数据。在Java应用程序中,JAR(Java Archive)文件是用于打包类库、资源...

    解析linux下安装memcacheq(mcq)全过程笔记.docx

    libevent是一个事件驱动库,而BerkleyDB则用于持久化存储队列数据,防止因服务崩溃或服务器故障导致数据丢失。 以下是安装步骤: 1. 首先,确认libevent和libevent-devel是否已安装。可以通过`rpm -qa | grep ...

    activemq-store-bdb-1.5.jar.zip

    在ActiveMQ中,BDB存储用于持久化消息,确保在系统崩溃或重启后仍能恢复消息队列的状态。 ActiveMQ提供了多种存储策略,包括本地文件系统存储、LevelDB、JDBC和BDB。BDB存储因其强大的事务支持和高可用性,常被选择...

    sqsfj:从 code.google.compsqsfj 自动导出

    sqsfj 从 code.google.com/p/sqsfj 自动导出 Sqs4J是一个纯Java实现,基于HTTP GET / POST协议的轻量级开源消息队列服务。 使用Netty实现HTTP服务,使用Berkeley DB做持久化存储。 压力测试可达8000/s。

    activemq-store-bdb-2.0.jar.zip

    activemq-store-bdb-2.0.jar是Apache ActiveMQ利用Berkeley DB进行消息持久化的关键组件。了解并掌握它的使用方法和配置细节,对于优化ActiveMQ的消息存储性能,确保数据的可靠性和一致性至关重要。在实际应用中,...

    activemq-store-bdbn-1.3.jar.zip

    BDB JE是Oracle提供的一个键值存储系统,可以用于本地持久化数据。在ActiveMQ中,BDB JE作为消息存储后端,负责保存和检索消息。它提供了事务支持,确保了数据的一致性和完整性,同时具有高性能和可扩展性,适合处理...

Global site tag (gtag.js) - Google Analytics