`
xiaobian
  • 浏览: 587948 次
  • 来自: 北京
社区版块
存档分类
最新评论

Java对象缓存系统的实现,实现了LRU算法,并可以进行集群同步

    博客分类:
  • Java
阅读更多

LRU算法实现:

package com.javaeye.xiaobian.jgroups;

import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LRUCache<K, V> extends LinkedHashMap<K, V> {
	private final int maxCapacity;

	private static final float DEFAULT_LOAD_FACTOR = 0.75f;

	private final Lock lock = new ReentrantLock();

	public LRUCache(int maxCapacity) {
		super(maxCapacity, DEFAULT_LOAD_FACTOR, true);
		this.maxCapacity = maxCapacity;
	}

	@Override
	protected boolean removeEldestEntry(java.util.Map.Entry<K, V> eldest) {
		return size() > maxCapacity;
	}

	/**
	 * Weather contains someone Object according to a key
	 * 
	 * @param key
	 */
	@Override
	public boolean containsKey(Object key) {
		lock.lock();
		try {
			return super.containsKey(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Get one Object according to a key
	 * 
	 * ��ȡ@param key
	 */
	@Override
	public V get(Object key) {
		lock.lock();
		try {
			return super.get(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Put an Object to cache
	 * 
	 * @param key,valueӻ���
	 */
	@Override
	public V put(K key, V value) {
		lock.lock();
		try {
			return super.put(key, value);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * The cache's size
	 */
	public int size() {
		lock.lock();
		try {
			return super.size();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * clear the cache
	 * 
	 */
	public void clear() {
		lock.lock();
		try {
			super.clear();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Delete one Object from cache
	 * 
	 * @param key
	 */
	public void delete(Object key) {
		lock.lock();
		try {
			super.remove(key);
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Delete some Objects from cache
	 * 
	 * @param objArrs
	 */
	public void batchDelete(Object[] objArrs) {
		lock.lock();
		try {
			for (int i = 0; i < objArrs.length; i++) {
				delete(objArrs[i]);
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Get all Objects from cache
	 * 
	 * @return Set
	 */
	public Set getAll() {
		lock.lock();
		try {
			return this.entrySet();
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Update one Object in the Cache
	 * 
	 * @param key
	 * @param value
	 */
	public void update(K key, V value) {
		lock.lock();
		try {
			super.remove(key);
			this.put(key, value);
		} finally {
			lock.unlock();
		}
	}

}

 

 

利用Jgroups实现集群同步:

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

import org.jgroups.Channel;
import org.jgroups.ExtendedReceiverAdapter;
import org.jgroups.JChannel;
import org.jgroups.Message;

public class CacheSynchronizer {

	private static String protocolStackString = "UDP(mcast_addr=236.11.11.11;mcast_port=32767;ip_ttl=64;"
			+ "mcast_send_buf_size=32000;mcast_recv_buf_size=64000):"
			+ "PING(timeout=2000;num_initial_members=3):"
			+ "MERGE2(min_interval=5000;max_interval=10000):"
			+ "FD_SOCK:"
			+ "VERIFY_SUSPECT(timeout=1500):"
			+ "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):"
			+ "pbcast.STABLE(desired_avg_gossip=20000):"
			+ "UNICAST(timeout=2500,5000):"
			+ "FRAG:"
			+ "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=false)";
	private static String groupName = "CACHEGROUP";

	private Channel channel = null;

	private ReceiveCallback recvCallback = null;

	// inner class ,receive the broadcast and handle
	private class ReceiveCallback extends ExtendedReceiverAdapter {
		private Cache cache = null;

		public void setCache(Cache baseCache) {
			cache = baseCache;
		}

		public void receive(Message msg) {
			if (cache == null)
				return;
			CacheMessage message = (CacheMessage) msg.getObject();
			Object key = message.getKey();
			Object value = message.getValue();

			if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_ADD) {
				cache.put(key, value);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_DELETE) {
				cache.delete(key);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_DELETE_SOME) {
				cache.batchDelete((Object[]) key);
			} else if (message.getOperation() == CacheConstants.CLUSTER_ENTRY_UPDATE) {
				cache.update(key, value);
			}

		}
	}

	public CacheSynchronizer(Cache cache) throws Exception {
		channel = new JChannel(protocolStackString);
		recvCallback = new ReceiveCallback();
		recvCallback.setCache(cache);
		channel.setReceiver(recvCallback);
		channel.connect(groupName);
	}

	/**
	 * Send broadcast message
	 * 
	 * @param msg
	 * @throws Exception
	 */

	public void sendCacheFlushMessage(Object msg) throws Exception {
		channel.send(null, null, (Serializable) msg); // 发送广播
	}

}

 

 

辅助类:

 

package com.javaeye.xiaobian.jgroups;

public class CacheConstants {

	/**
	 * Cluster add handle
	 */
	public final static int CLUSTER_ENTRY_ADD = 10;

	/**
	 * Cluster update handle
	 */
	public final static int CLUSTER_ENTRY_UPDATE = 20;

	/**
	 * Cluster delete handle
	 */
	public final static int CLUSTER_ENTRY_DELETE = 30;

	/**
	 * Cluster delete sone handle
	 */
	public final static int CLUSTER_ENTRY_DELETE_SOME = 40;
}

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

public abstract class CacheMessage implements Serializable {
	public abstract int getOperation();

	public abstract Object getKey();

	public abstract Object getValue();
}

 

package com.javaeye.xiaobian.jgroups;

import java.io.Serializable;

public class SynCacheMessage extends CacheMessage implements Serializable {

	private Object key;
	private Object value;
	private int op;

	public SynCacheMessage() {

	}

	public SynCacheMessage(Object key, Object value, int op) {
		this.key = key;
		this.op = op;
		this.value = value;
	}

	public Object getKey() {
		return key;
	}

	public int getOperation() {
		return op;
	}

	public Object getValue() {
		return value;
	}

}

 

 

实际使用的Cache类:

 

package com.javaeye.xiaobian.jgroups;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

public class Cache {
	private static Cache singleton = null;
	private LRUCache lruCache = null;
	private CacheSynchronizer cacheSyncer = null;

	private Cache(int size) {
		lruCache = new LRUCache(size);
		createSynchronizer();
	}

	public static Cache getInstance() {
		if (singleton == null) {
			singleton = new Cache(100000);
		}
		return singleton;
	}

	public CacheSynchronizer getSynchronizer() {
		return cacheSyncer;
	}

	public int size() {
		return lruCache.size();
	}

	public void createSynchronizer() {
		try {
			cacheSyncer = new CacheSynchronizer(this);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void batchDelete(Object[] objArrs) {
		lruCache.batchDelete(objArrs);

	}

	public void delete(Object key) {
		lruCache.remove(key);
	}

	public void deleteAll() {
		lruCache.clear();
	}

	public Object get(Object key) {
		return lruCache.get(key);
	}

	public void put(Object key, Object value) {
		lruCache.put(key, value);
	}

	public void update(Object key, Object value) {
		lruCache.update(key, value);
	}

	public Set getAll() {
		return lruCache.getAll();
	}

	@Override
	public String toString() {

		Set set = Collections.synchronizedSet(getAll());
		StringBuffer sb = new StringBuffer();
		Iterator it = set.iterator();
		while (it.hasNext()) {
			Map.Entry entry = (Map.Entry) it.next();
			sb.append("key == " + entry.getKey());
			sb.append("  value == " + entry.getValue());
			sb.append("\n");
		}
		return sb.toString();
	}
}

 

测试:

 

package com.javaeye.xiaobian.jgroups;

public class Test {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub

		Cache cache = Cache.getInstance();

		for (int i = 0; i < 10; i++) {
			cache.put(String.valueOf(i), "cache" + String.valueOf(i));
			CacheMessage msg = new SynCacheMessage(String.valueOf(i), "cache"
					+ String.valueOf(i), CacheConstants.CLUSTER_ENTRY_ADD);
			cache.getSynchronizer().sendCacheFlushMessage(msg);
			System.out.println(cache.size());
			System.out.println(cache.toString());
			Thread.currentThread().sleep(1000 * 5);
		}
		/*
		 * Object[] keys = new Object[10]; for (int i = 0; i < 10; i++) {
		 * keys[i] = String.valueOf(i); } CacheMessage msg1 = new
		 * SynCacheMessage(keys, "cache",
		 * CacheConstants.CLUSTER_ENTRY_DELETE_SOME);
		 * 
		 * cache.getSynchronizer().sendCacheFlushMessage(msg1);
		 */

		for (int i = 0; i < 10; i++) {
			CacheMessage msg = new SynCacheMessage(String.valueOf(i),
					"cacheUpdate" + String.valueOf(i),
					CacheConstants.CLUSTER_ENTRY_UPDATE);
			cache.update(String.valueOf(i), "cacheUpdate" + String.valueOf(i));
			cache.getSynchronizer().sendCacheFlushMessage(msg);
			System.out.println(cache.size());
			System.out.println(cache.toString());
			Thread.currentThread().sleep(1000 * 5);
		}

	}
}

 

 

 

 参考:

http://618119.com/tag/jgroups

http://bypassfacebook.net/browse.php?b=5&u=Oi8vcmVuZXh1LmJsb2dzcG90LmNvbS8yMDA2LzA4L211bHRpY2FzdC1hbmQtamdyb3Vwc18yOC5odG1s

http://www.blogjava.net/swingboat/archive/2007/07/16/130565.html

http://bbs.81tech.com/read.php?tid=84461

 

http://whitesock.iteye.com/blog/199956

  • lib.rar (2.8 MB)
  • 下载次数: 91
1
0
分享到:
评论

相关推荐

    Java流行ehcache缓存

    - **内存管理**:Ehcache使用LRU(Least Recently Used)算法自动管理内存,当缓存满时,最不常使用的数据会被淘汰。 - **持久化**:支持将缓存数据存储到硬盘,防止数据丢失。 - **缓存过期策略**:可以设置时间...

    memcached缓存使用演示

    系统会根据内存大小自动管理缓存对象,当内存满时,会使用LRU(Least Recently Used)最近最少使用算法来淘汰旧的、不常使用的数据。 ### 2. 安装与配置Memcached 在Linux环境下,可以通过包管理器(如apt-get或...

    Ehcache 整合Spring 使用页面、对象缓存

    - **灵活的缓存淘汰策略**:提供了LRU(Least Recently Used)、LFU(Least Frequently Used)以及FIFO(First In First Out)等多种缓存淘汰算法,可以根据实际需求选择最合适的算法。 - **支持集群/分布式缓存**:...

    分布式缓存服务器memcacaed的源代码

    由于Memcached是分布式设计,多台服务器可以组成集群,通过一致性哈希算法实现数据的分发。源代码中的`server.c/h`文件处理了服务器之间的通信。 10. **编译与部署** 源代码中包含`configure`脚本和`Makefile`,...

    memcache缓存分布式集群

    Memcache是一种广泛使用的高性能分布式内存对象缓存系统,主要用于减轻数据库的负载,通过将数据存储在内存中,实现快速访问。在大型应用中,单台Memcache服务器可能无法满足高并发和大容量的需求,这时就需要搭建...

    有关单一窗口缓存配置和集群的事宜_V1.1.docx

    例如,使用 Redis 作为缓存服务器,使用LRU 算法来实现缓存的淘汰策略,使用 TCP 协议来实现缓存的通信等。 结论 本文档介绍了单一窗口缓存配置和集群的实现方式,涉及到缓存配置、缓存读取、缓存数据分页等多个...

    PyPI 官网下载 | lru-dict-1.1.6.tar.gz

    Zookeeper作为一个分布式协调服务,通常用于配置管理、命名服务、集群同步等,结合lru-dict,可以更有效地处理短暂的、高频率的请求。 云原生(Cloud Native)理念强调构建和运行应用程序的方式,以充分利用云计算...

    Memcached_缓存系统

    1. 分布式架构:每个服务器节点独立工作,通过一致性哈希算法实现数据分布,使得添加或删除服务器时,对整个缓存系统的影响降到最低。 2. 内存存储:数据仅保存在内存中,这意味着一旦服务器重启,所有缓存数据将...

    Java LocalCache 本地缓存的实现实例

    Java LocalCache 本地缓存的实现实例主要介绍了两种LocalCache的实现方法,一种是基于ConcurrentHashMap实现基本本地缓存,另外一种是基于LinkedHashMap实现LRU策略的本地缓存。 基于ConcurrentHashMap的实现 ...

    java源码:分布式缓存框架 SwarmCache.zip

    源码中,你可以看到如何使用JGroups进行节点间的通信,以及如何实现分布式缓存的数据一致性算法,如Gossip协议或Paxos协议。 `commons-logging.jar`是Apache Commons Logging库,它提供了一个抽象层,允许在不修改...

    iBATIS缓存介绍

    - **3.2.2 LRU**:最近最少使用缓存,采用LRU算法管理缓存数据。 - **3.2.3 FIFO**:先进先出缓存,采用FIFO算法管理缓存数据。 - **3.2.4 OSCACHE**:操作系统级别的缓存,利用操作系统提供的缓存功能。 - **3.2.5 ...

    行业分类-设备装置-基于集群的流媒体缓存代理服务器系统.zip

    在流媒体缓存代理服务器系统中,每个节点都可以独立处理用户的请求,当某个节点负载过高时,其他节点可以接手处理,实现负载均衡。此外,集群还提供了故障容错能力,如果一个节点出现问题,其他节点仍能继续提供服务...

    memcached是一个高性能的分布式的内存对象缓存系统[参照].pdf

    【memcached】是一个高度优化的分布式内存对象缓存系统,其设计目的是为了加速网络应用程序,减少对数据库的依赖。最初由Danga Interactive公司的Brad Fitzpatrick为LiveJournal网站开发,后来广泛应用到众多大型...

    高并发缓存器(基于ehcache)

    Ehcache支持本地缓存、分布式缓存和 terracotta 集群模式,广泛应用于Java Web应用和企业级系统中。其主要特性包括: 1. **内存和磁盘存储**:Ehcache允许将数据存储在内存中,当内存不足时,可以自动将部分数据...

    Java开发中的Memcache原理及实现

    Memcached是一种高性能、分布式内存对象缓存系统,它能够存储数据并提供快速访问,从而提高应用的响应速度。在Java开发中,使用Memcached可以有效缓解数据库的压力,提高应用的性能。本文将深入探讨Memcached的工作...

    JetCache is a Java cache framework..zip

    这使得多节点应用能够共享同一份缓存数据,提高了数据一致性,并且可以通过缓存集群来扩展缓存容量。 3. **二级缓存**:在某些情况下,一级缓存(本地缓存)可能无法满足高可用性或数据一致性要求。JetCache提供了...

    详解大型网站web服务器缓存.zip

    例如,可以使用LRU(最近最少使用)算法,当缓存空间满时,优先淘汰最近最少使用的条目。 缓存的分类主要有内存缓存、磁盘缓存和混合缓存。内存缓存速度快但容量有限,适用于处理小量高频数据;磁盘缓存容量大但...

Global site tag (gtag.js) - Google Analytics