/**
* 队列接口
*
* @author yourname (mailto:yourname@primeton.com)
*/
public interface IQueue<E> {
//入队
void offer(E o);
//出队
E poll();
// 启动
void start();
// 停止
void stop();
}
/**
* 持久化队列
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class PersistenceQueue<E> implements IQueue<E>{
private IPersistence persistence;
//读块缓存
private LinkedList<E> dataCacheForRead;
//写块缓存
private ArrayList<E> dataCacheForWrite;
//容量
private int capacity;
public PersistenceQueue(int capacity, IPersistence persistence) {
if (capacity <= 0) {
throw new IllegalArgumentException("Illegal Capacity: "+ capacity);
}
dataCacheForRead = new LinkedList<E>();
dataCacheForWrite = new ArrayList<E>(capacity);
this.capacity = capacity;
this.persistence = persistence;
}
//入队
@SuppressWarnings("unchecked")
public void offer(E o) {
if (!isStarted()) {
throw new IllegalStateException("Queue has not yet started!");
}
synchronized(dataCacheForWrite) {
//如果写满
if (dataCacheForWrite.size() >= capacity) {
persistence.persistent(dataCacheForWrite);
dataCacheForWrite.clear();
}
dataCacheForWrite.add(o);
}
}
//出队
@SuppressWarnings("unchecked")
public E poll() {
if (!isStarted()) {
throw new IllegalStateException("Queue has not yet started!");
}
synchronized(dataCacheForRead) {
//如果为空
if (dataCacheForRead.isEmpty()) {
//持久化如果为空
if (persistence.isEmpty()) {
//直接从写缓存块中读出
synchronized(dataCacheForWrite) {
dataCacheForRead.addAll(dataCacheForWrite);
dataCacheForWrite.clear();
}
} else {
dataCacheForRead.addAll(persistence.load(capacity));
}
}
return dataCacheForRead.poll();
}
}
public void start() {
persistence.start();
if (persistence.isEmpty()) {
return;
}
dataCacheForRead.addAll(persistence.load(capacity));
}
public void stop() {
if (dataCacheForWrite.isEmpty()) {
return;
}
persistence.persistent(dataCacheForWrite);
dataCacheForWrite.clear();
persistence.stop();
}
}
/**
* 持久化接口
*
* @author yourname (mailto:yourname@primeton.com)
*/
public interface IPersistence<E> {
//是否为空
boolean isEmpty();
//持久化
void persistent(List<E> list);
//加载
List<E> load(int length);
// 启动
void start();
// 停止
void stop();
}
/**
* 目录方式的持久化
*
* @author yourname (mailto:yourname@primeton.com)
*/
public class DirPersistence<E> implements IPersistence<E> {
private static final ILogger log = DebugLoggerFactory.getLogger(DirPersistence.class);
private static final String FILE_QUEUE_NAME = "queue.meta";
private File dir = null;
private File queueMetaFile = null;
private ConcurrentLinkedQueue<File> fileQueue;
private IDataMarshaller marshaler = null;
public DirPersistence(File dir) {
if (dir == null) {
throw new IllegalArgumentException("Dir is null!");
}
if (!dir.exists()) {
log.warn("Path[{0}] is not existed!", new Object[]{dir.getAbsolutePath()});
dir.mkdirs();
}
if (dir.isFile()) {
throw new IllegalArgumentException("Path'" + dir.getAbsolutePath() + "' is not dir!");
}
this.dir = dir;
this.queueMetaFile = new File(dir, FILE_QUEUE_NAME);
if (!queueMetaFile.exists()) {
try {
queueMetaFile.createNewFile();
} catch (IOException e) {
throw new IllegalArgumentException("Path'" + queueMetaFile.getAbsolutePath() + "' can not create!");
}
}
marshaler = DataMarshallerFactory.getDataMarshaller();
}
public void start() {
if (queueMetaFile.length() <= 0) {
fileQueue = new ConcurrentLinkedQueue<File>();
return;
}
FileInputStream in = null;
try {
in = new FileInputStream(queueMetaFile);
fileQueue = (ConcurrentLinkedQueue<File>) marshaler.unmarshal(in, null);
} catch (Exception e) {
log.error("File[{0}] load error!", new Object[]{queueMetaFile.getAbsolutePath()}, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException ignore) {
log.debug(ignore);
}
}
}
}
public void stop() {
FileOutputStream out = null;
try {
out = new FileOutputStream(queueMetaFile);
marshaler.marshal(fileQueue, out, null);
out.flush();
} catch (Exception e) {
log.error("File[{0}] persistent error!", new Object[]{queueMetaFile.getAbsolutePath()}, e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException ignore) {
log.debug(ignore);
}
}
}
}
public boolean isEmpty() {
return fileQueue == null || fileQueue.isEmpty();
}
@SuppressWarnings("unchecked")
//注意:length参数此时无效,即使Queue长度调整了,系统在运行一段时间后,长度会自动一致。(读块、写块和一个文件。)
public List<E> load(int length) {
List<E> resultList = new ArrayList<E>();
if (!isEmpty()) {
File file = fileQueue.poll();
if (file.exists()) {
FileInputStream in = null;
boolean isNeedDelete = true;
try {
in = new FileInputStream(file);
resultList = (List<E>) marshaler.unmarshal(in, null);
} catch (Exception e) {
isNeedDelete = false;
log.error("File[{0}] load error!", new Object[]{file.getAbsolutePath()}, e);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException ignore) {
log.debug(ignore);
}
}
}
if (isNeedDelete) {
file.delete();
}
} else {
log.warn("PersistentFile[{0}] is not existed!", new Object[]{file.getAbsolutePath()});
}
}
return resultList;
}
public void persistent(List<E> list) {
if (list == null) {
return;
}
String fileName = "queue_" + System.currentTimeMillis() + ".queue";
File file = new File(dir, fileName);
FileOutputStream out = null;
try {
file.createNewFile();
out = new FileOutputStream(file);
marshaler.marshal(list, out, null);
out.flush();
this.fileQueue.offer(file);
//文件写入时异步方式,需要等待20ms,否则load时会报错
Thread.sleep(20);
} catch (Exception e) {
log.error("File[{0}] persistent error!", new Object[]{file.getAbsolutePath()}, e);
} finally {
if (out != null) {
try {
out.close();
} catch (IOException ignore) {
log.debug(ignore);
}
}
}
}
}
分享到:
相关推荐
在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...
2. **消息持久化**:消息持久化是为了在服务器重启或网络故障后仍然能够恢复未处理的消息。通过设置消息和交换机为持久化,即使RabbitMQ重启,消息也不会丢失。 3. **备用交换机(Backup Exchange)**:在主交换机...
### Memlink:高性能、持久化的Key=>List/Queue数据引擎 #### 一、Memlink概述 Memlink是一款由天涯社区研发并开源的数据引擎,旨在解决Web应用中对key-list/queue系统的需求。与传统的key-value系统相比,Memlink...
【标题】:PersistentQueue——基于内存映射文件的持久化队列 在计算机科学中,队列是一种常用的数据结构,用于实现先进先出(FIFO)的逻辑。然而,普通队列在进程关闭后,其数据会丢失。为了解决这个问题,我们...
- **Node故障**: 当Cluster中的某个Node发生故障时,需要有一个有效的机制来处理故障Node上的资源,尤其是持久化Queue。这通常涉及到Cluster的高可用性设计,比如通过镜像队列(mirror queues)来保证数据的可用性。 ...
persistent, Haskell的持久化接口允许多种存储方法 了解更多: http://yesodweb.com/book/persistent Haskell数据存储数据存储通常被称为"orm"数据存储。 虽然'o'传统上是指对象,但概念可以概括为:avoidance of b
本篇主要围绕"ActiveMQ中Topic持久化Demo"进行深入探讨,旨在帮助读者理解如何在ActiveMQ中实现Topic的持久化。 ActiveMQ 是一个功能强大的消息代理,支持多种协议,包括 OpenWire、STOMP、AMQP 和 MQTT。它提供了...
在上述代码中,`durable = "false"`表示创建一个非持久化的Queue,而`exchange`和`key`定义了Topic和Queue的绑定,使得消息能够路由到正确的消费者。 在实际项目中,Queue常用于处理有序和一对一的消息传递,例如...
除了基本的Map接口,MapDB还提供了其他数据结构,如Set、List、Queue和SortedSet等,这些数据结构同样支持持久化存储。通过这些接口,开发者可以方便地构建复杂的数据库解决方案,例如实现队列服务、索引服务等。 ...
Durable Subscription 是一种持久化的订阅方式,JMS Provider 将会费点儿脑筋来记下这个 Durable Subscription 的消息订阅者是谁,即使当消息到达之后,该 Durable Subscription 消息订阅者不在,JMS Provider 也会...
- **Queue持久化**:队列声明时设置`durable`参数为`true`,队列在服务器重启后仍然存在。这样,未被消费的消息不会因为服务器重启而丢失。 - **Message持久化**:在发送消息时,设置`delivery_mode`属性为2,表示...
4. **持久化级别**:ActiveMQ 允许用户选择不同的消息持久化级别,例如,可以选择仅持久化消息头,或者同时持久化消息头和正文。这可以根据性能和数据完整性需求进行调整。 5. **事务管理**:在 ActiveMQ 中,可以...
2. **消息持久化**:在RabbitMQ中,可以设置消息和队列为持久化,确保即使服务器重启,消息也不会丢失。持久化消息在写入时会被保存到磁盘,而持久化队列会在服务器重启后自动恢复。这样,即便在高并发或系统故障...
标题中的“持久化性能测试1”指的是针对消息队列(Message Queue,MQ)系统的持久化功能进行的一次性能评估。这种测试通常是为了了解在不同配置和工作负载下,MQ系统如何处理消息的存储和检索,特别是在系统重启或...
RabbitMQ 消息持久化与 Spring AMQP 实现详解 RabbitMQ 的消息持久化是指在 RabbitMQ Server 中保留消息的机制,以便在 Server 崩溃或重启后可以恢复消息。消息持久化是通过在交换器、队列和消息三个方面实现的。 ...
- **Queue**:默认为有状态的,消息会持久化在消息服务器上,如ActiveMQ通常将消息存储在本地文件系统中。 - **完整性保障**: - **Topic**:不能保证消息的完整性和一致性,即不能确保所有订阅者都能接收到消息...
在iOS开发中,持久化存储是将数据保存到设备上,以便在应用程序关闭或设备重启后仍然能够访问这些数据的关键技术。iOS提供了多种持久化存储方法,包括UserDefaults、SQLite数据库、Core Data以及文件系统等。这里...
【标题】基于Websphere MQ非持久化消息实现异步转同步 在分布式系统中,异步处理常常被用于提高系统的响应速度和处理能力。Websphere MQ(WebSphere Message Broker,简称WMQ)是IBM提供的一种高效、可靠的中间件,...
队列有多种属性可以调整,如最大消息大小、队列的访问模式(读写、只读等)以及队列是否允许消息的持久化存储。例如,设置队列为只读: ```csharp queue.Access = MessageQueueAccessMode.Read; ``` **事务处理:** ...
标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...