`
guzizai2007
  • 浏览: 360757 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

MemCache客户端实现

 
阅读更多

不记得什么时候从网上下的,看着还不错,分享一下...

1~Cache接口:

package com.yx.cache;

/**
 * @功能: 缓存接口 ,提供对缓存的增删改查 四类操作
 * @作者: smile
 * @时间: 2013-5-2
 */
public interface Cache<T> {
	
	/**
	 * 获取缓存中的数据
	 * 
	 * @param key
	 * @return
	 */
	T get(String key);

	/**
	 * 把数据放入缓存 如果存在与key对应的值,则返回失败
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	boolean add(String key, T value);

	/**
	 * 把数据放入缓存 如果存在与key对应的值,则覆盖原有的值
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	boolean set(String key, T value);

	/**
	 * 缓存更新 如果不存在与key对应的缓存值,则不更新
	 * 
	 * @param key
	 * @param value
	 * @return
	 */
	boolean update(String key, T value);

	/**
	 * 删除缓存
	 * 
	 * @param key
	 * @return
	 */
	boolean delete(String key);
}

 2~Cache工厂,创建不同缓存策略的Cache对象:

package com.yx.cache;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.danga.MemCached.MemCachedClient;
import com.danga.MemCached.SockIOPool;
import com.yx.cache.util.ConfigUtil;

/**
 * @功能: 缓存工厂
 * @作者: smile
 * @时间: 2013-5-2
 */
public class CacheFactory {
	
	//MemCache客户端对象
	private static MemCachedClient memCachedClient = null;

	@SuppressWarnings("rawtypes")
	//每个类对应的Cache对象
	private static final Map<String, Cache> map = new ConcurrentHashMap<String, Cache>();

	static {
		// 获取服务器ip地址
		String serverStr = ConfigUtil.getConfigValue("servers", "");
		List<String> servers = new ArrayList<String>();
		for (String s : serverStr.split(",")) {
			s = s.trim();
			if (!"".equals(s)) {
				servers.add(s);
			}
		}
		if (servers.size() < 1) {
			throw new RuntimeException("cache 初始化失败!");
		}

		SockIOPool pool = SockIOPool.getInstance();
		pool.setServers(servers.toArray(new String[] {}));
		pool.setFailover(Boolean.valueOf(ConfigUtil.getConfigValue("failover", "true")));
		pool.setInitConn(Integer.valueOf(ConfigUtil.getConfigValue("initConn", "100")));
		pool.setMinConn(Integer.valueOf(ConfigUtil.getConfigValue("minConn", "25")));
		pool.setMaxConn(Integer.valueOf(ConfigUtil.getConfigValue("maxConn", "250")));
		pool.setMaintSleep(Integer.valueOf(ConfigUtil.getConfigValue("maintSleep", "30")));
		pool.setNagle(Boolean.valueOf(ConfigUtil.getConfigValue("nagle", "false")));// 关闭nagle算法
		pool.setSocketTO(Integer.valueOf(ConfigUtil.getConfigValue("socketTO", "3000")));
		pool.setAliveCheck(Boolean.valueOf(ConfigUtil.getConfigValue("aliveCheck", "true")));
		pool.setHashingAlg(Integer.valueOf(ConfigUtil.getConfigValue("hashingAlg", "0")));
		pool.setSocketConnectTO(Integer.valueOf(ConfigUtil.getConfigValue("socketConnectTO", "3000")));
		
		//服务器权重
		String wStr = ConfigUtil.getConfigValue("weights", "");
		List<Integer> weights = new ArrayList<Integer>();
		for (String s : wStr.split(",")) {
			s = s.trim();
			if (!"".equals(s)) {
				weights.add(Integer.valueOf(s));

			}
		}
		if (weights.size() == servers.size()) {
			pool.setWeights(weights.toArray(new Integer[] {}));
		}
		
		pool.initialize();
		memCachedClient = new MemCachedClient();
	}

	/**
	 * @功能:集中式缓存
	 */
	public static <T> Cache<T> getCommonCache(Class<T> t) {
		Cache<T> cache = map.get(t.getName());
		if (cache == null) {
			cache = createCommonCache(t);
		}
		return cache;
	}

	/**
	 * @功能:分布式缓存
	 */
	public static <T> Cache<T> getClusterCache(Class<T> t) {
		//key值是以 i- 开头的
		Cache<T> cache = map.get("i-" + t.getName());
		if (cache == null) {
			cache = createClusterCache(t);
		}
		return cache;
	}

	/**
	 * @功能:集中式缓存  保证并发条件下线程安全
	 */
	private static synchronized <T> Cache<T> createCommonCache(Class<T> t) {
		//key值直接使用类名
		Cache<T> cache = map.get(t.getName());
		if (cache == null) {
			cache = new CommonCache<T>(t, memCachedClient);
			map.put(t.getName(), cache);
		}
		return cache;
	}

	/**
	 * @功能:分布式缓存  保证并发条件下线程安全
	 */
	private static synchronized <T> Cache<T> createClusterCache(Class<T> t) {
		Cache<T> cache = map.get(t.getName());
		if (cache == null) {
			cache = new ClusterCache<T>(t, memCachedClient);
			map.put(t.getName(), cache);
		}
		return cache;
	}
}

 3~集中式策略缓存:

package com.yx.cache;

import com.danga.MemCached.MemCachedClient;

/**
 * @功能: 集中式缓存
 * @作者: smile
 * @时间: 2013-5-2
 */
public class CommonCache<T> implements Cache<T> {
	
	private static MemCachedClient memCachedClient = null;
	// key值拼接头字符
	private String base = null;

	CommonCache(Class<T> t, MemCachedClient client) {
		memCachedClient = client;
		base = t.getSimpleName() + "-";
	}

	public T get(String key) {
		return (T) memCachedClient.get(base + key);
	}

	public boolean set(String key, T value) {
		return memCachedClient.set(base + key, value);
	}

	@Override
	public boolean update(String key, T value) {
		return memCachedClient.replace(base + key, value);
	}

	@Override
	public boolean delete(String key) {
		return memCachedClient.delete(base + key);
	}

	@Override
	public boolean add(String key, T value) {
		return memCachedClient.add(base + key, value);
	}
}

 4~分布式策略缓存:

package com.yx.cache;

import com.danga.MemCached.MemCachedClient;
import com.schooner.MemCached.SchoonerSockIOPool;
import com.yx.cache.util.HashCodeUtil;
import com.yx.task.ThreadPoolManager;

/**
 * @功能: 分布式缓存
 * @作者: smile
 * @时间: 2013-5-2
 */
public class ClusterCache<T> implements Cache<T> {

	private static MemCachedClient memCachedClient = null;
	// key值拼接头字符
	private String base = null;
	// 管理缓存的线程池
	private static ThreadPoolManager taskManager = ThreadPoolManager.getInstance("cache");
	private SchoonerSockIOPool pool = SchoonerSockIOPool.getInstance();

	ClusterCache(Class<T> t, MemCachedClient client) {
		memCachedClient = client;
		base = "i-" + t.getSimpleName() + "-";
	}

	@Override
	public T get(String key) {
		T value = null;
		if (key == null) {
			return null;
		}
		key = base + key;
		if (pool.getServers().length < 2) {
			value = (T) memCachedClient.get(key);
		} else {
			int hashCode = HashCodeUtil.getHash(key);
			value = (T) memCachedClient.get(key, hashCode);
			if (value == null) {	//第一台不存在 则继续去另一台上找
				hashCode = this.getRehashCode(key, hashCode);
				value = (T) memCachedClient.get(key, hashCode);
				if (value != null) {// 如果在另外一台服务器上取到了缓存,则把缓存复制到第一台上去
					UpdateTask task = new UpdateTask(key, value);
					taskManager.submit(task);
				}
			}
		}
		return value;
	}

	@Override
	public boolean set(String key, T value) {
		if (key == null) {
			return false;
		}
		key = base + key;
		boolean result = false;
		if (pool.getServers().length < 2) {
			result = memCachedClient.set(key, value);
		} else {
			int hashCode = HashCodeUtil.getHash(key);
			result = memCachedClient.set(key, value, hashCode);
			// if (result) {
			hashCode = getRehashCode(key, hashCode);
			memCachedClient.set(key, value, hashCode);
			// }
		}
		return result;
	}

	private int getRehashCode(String key, int oldHashcode) {
		String host = pool.getHost(key, oldHashcode);
		int rehashTries = 0;
		// if (result) {
		int hashCode = HashCodeUtil.getHash(rehashTries + key);
		while (host.equals(pool.getHost(key, hashCode))) {
			rehashTries++;
			hashCode = HashCodeUtil.getHash(rehashTries + key);
		}
		return hashCode;
	}

	@Override
	public boolean update(String key, T value) {
		if (key == null) {
			return false;
		}
		key = base + key;
		boolean result = false;
		if (pool.getServers().length < 2) {
			result = memCachedClient.replace(key, value);
		} else {
			int hashCode = HashCodeUtil.getHash(key);
			result = memCachedClient.replace(key, value, hashCode);//更新缓存
			// if (result) {
			hashCode = getRehashCode(key, hashCode);
			memCachedClient.replace(key, value, hashCode);//更新备用机上缓存
			// }
		}
		return result;
	}

	@Override
	public boolean delete(String key) {
		if (key == null) {
			return false;
		}
		key = base + key;
		boolean result = false;
		if (pool.getServers().length < 2) {
			result = memCachedClient.delete(key);
		} else {
			int hashCode = HashCodeUtil.getHash(key);
			result = memCachedClient.delete(key, hashCode, null);//删除对应机子上缓存
			// if (result) {
			hashCode = this.getRehashCode(key, hashCode);
			memCachedClient.delete(key, hashCode, null);//删除备用机子上缓存
			// }
		}
		return result;
	}

	@Override
	public boolean add(String key, T value) {
		if (key == null) {
			return false;
		}
		key = base + key;
		boolean result = false;
		if (pool.getServers().length < 2) {
			result = memCachedClient.add(key, value);
		} else {
			int hashCode = HashCodeUtil.getHash(key);
			result = memCachedClient.add(key, value, hashCode);
			// if (result) {
			hashCode = getRehashCode(key, hashCode);//根据原始哈希值再进行哈希运算
			memCachedClient.add(key, value, hashCode);//再次保存  备份之用
			// }
		}
		return result;
	}

	
	/**
	 * @功能: 缓存丢失的情况下 恢复缓存
	 * @作者: smile
	 * @时间: 2013-5-2
	 */
	static class UpdateTask implements Runnable {

		private String key;
		private Object value;

		UpdateTask(String key, Object value) {
			this.key = key;
			this.value = value;
		}

		@Override
		public void run() {
			memCachedClient.set(key, value, HashCodeUtil.getHash(key));
		}
	}
}

 5~线程池工具类:

package com.yx.task;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @功能: 线程池管理  不同key值对应不同线程池
 * @作者: smile
 * @时间: 2013-5-2
 */
public class ThreadPoolManager {

	private static final Map<String, ThreadPoolManager> map = new HashMap<String, ThreadPoolManager>();

	final int CORE_SIZE = 5;
	//线程池 初始线程为5
	private ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(CORE_SIZE);

	//提交任务执行
	public void submit(Runnable task) {
		executor.submit(task);
	}

	public boolean finished() {
		return executor.getCompletedTaskCount() == executor.getTaskCount();
	}

	private ThreadPoolManager() {

	}

	//每个key值返回一个单例对象
	public static synchronized ThreadPoolManager getInstance(String key) {
		ThreadPoolManager t = map.get(key);
		if (t == null) {
			t = new ThreadPoolManager();
			map.put(key, t);
		}
		return t;
	}
}

 6~配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<config>
	<!-- server列表,(eg:192.168.1.111:18123,192.168.1.111:18124) -->
	<servers>192.168.1.119:18123,192.168.1.119:18124</servers>
	<!-- 权重,权重数量和servers数量相同,总和等于10.(eg:5,5)如果为空则代表平均分配 -->
	<weights></weights>
	<failover>true</failover><!-- 故障转移 -->
	<initConn>25</initConn><!-- 初始化连接数 -->
	<minConn>10</minConn><!-- 最小连接数 -->
	<maxConn>50</maxConn><!-- 最大连接数 -->
	<maintSleep>3000</maintSleep><!-- 守护线程的启动时间,如果要关闭守护线程设置未0 -->
	<nagle>false</nagle><!-- 是否使用nagle算法 -->
	<socketTO>3000</socketTO><!-- socket读超时时间 -->
	<aliveCheck>true</aliveCheck><!-- 对server的可用性进行检测 -->
	<hashingAlg>0</hashingAlg><!-- hash算法,0代表使用jdk的hash算法,如果要支持不同jdk请设置为 2(自带的hash算法),如果有多台cache服务器,而且考虑到动态添加cache服务器时,请设置 3 -->
	<socketConnectTO>100</socketConnectTO><!-- socket链接超时时间 -->
</config>

 7~读取配置文件工具类:

package com.yx.cache.util;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

/**
 * @功能: 读取配置参数
 * @作者: smile
 * @时间: 2013-5-2
 */
public class ConfigUtil {

	private static final String CONFILE = "cacheConfig.xml";
	private static final Map<String, String> map = new HashMap<String, String>();

	// 类加载的时候初始化以下静态代码块
	// 主要功能就是读取配置文件参数 存放在一个Map中
	static {
		SAXReader saxReader = new SAXReader();
		InputStream ins = ConfigUtil.class.getClassLoader().getResourceAsStream(CONFILE);
		try {
			if (ins != null) {
				Document doc = saxReader.read(ins);
				Element root = doc.getRootElement();
				Iterator<Element> iter = root.elementIterator();
				while (iter.hasNext()) {
					Element e = iter.next();
					map.put(e.getName(), e.getTextTrim());
				}
			}
		} catch (DocumentException e) {
			e.printStackTrace();
			throw new RuntimeException("找不到配置文件:" + CONFILE);
		} finally {
			try {
				if (ins != null) {
					ins.close();
				} else {
					throw new RuntimeException("找不到配置文件:" + CONFILE);
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				ins = null;
			}
		}
	}

	public static String getConfigValue(String key, String defaultValue) {
		String tmp = map.get(key);
		return isEmpty(tmp) ? defaultValue : tmp;
	}


	private static boolean isEmpty(String str) {
		if (str == null || "".equals(str)) {
			return true;
		}
		return false;
	}
	
	public static void main(String[] args) {
		System.out.println(map);
	}
}

 8~哈希工具类:

package com.yx.cache.util;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.zip.CRC32;

import com.schooner.MemCached.SchoonerSockIOPool;

public class HashCodeUtil {

	public static final int NATIVE_HASH = 0; // native String.hashCode();
	public static final int OLD_COMPAT_HASH = 1; // original compatibility
	public static final int NEW_COMPAT_HASH = 2; // new CRC32 based
	public static final int CONSISTENT_HASH = 3; // MD5 Based -- Stops

	private static int hashingAlg = SchoonerSockIOPool.getInstance().getHashingAlg();

	/**
	 * Returns a bucket to check for a given key.
	 * 
	 * @param key
	 *            String key cache is stored under
	 * @return int bucket
	 */
	public static final int getHash(String key) {

		switch (hashingAlg) {
		case NATIVE_HASH:
			return key.hashCode();
		case OLD_COMPAT_HASH:
			return origCompatHashingAlg(key);
		case NEW_COMPAT_HASH:
			return newCompatHashingAlg(key);
		case CONSISTENT_HASH:
			return md5HashingAlg(key);
		default:
			// use the native hash as a default
			hashingAlg = NATIVE_HASH;
			return key.hashCode();
		}
	}

	private static int origCompatHashingAlg(String key) {
		int hash = 0;
		char[] cArr = key.toCharArray();

		for (int i = 0; i < cArr.length; ++i) {
			hash = (hash * 33) + cArr[i];
		}

		return hash;
	}

	private static int newCompatHashingAlg(String key) {
		CRC32 checksum = new CRC32();
		checksum.update(key.getBytes());
		int crc = (int) checksum.getValue();
		return (crc >> 16) & 0x7fff;
	}

	private static int md5HashingAlg(String key) {
		MessageDigest md5 = MD5.get();
		md5.reset();
		md5.update(key.getBytes());
		byte[] bKey = md5.digest();
		int res = ((bKey[3] & 0xFF) << 24) | ((bKey[2] & 0xFF) << 16) | ((bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
		return res;
	}

	private static ThreadLocal<MessageDigest> MD5 = new ThreadLocal<MessageDigest>() {
		@Override
		protected final MessageDigest initialValue() {
			try {
				return MessageDigest.getInstance("MD5");
			} catch (NoSuchAlgorithmException e) {
				throw new IllegalStateException(" no md5 algorythm found");
			}
		}
	};

}

 

分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

    纯 Python Memcache 客户端实现的 Python3 端口.zip

    纯 Python Memcache 客户端实现的 Python3 端口重要弃用说明这是https://github.com/linsomniac/python-memcached的 py3 兼容端口从那时起,它又添加了 py23。所以你应该使用它。---纯 Python Memcached 客户端库的 ...

    memcache 客户端监控工具编译版(window版)

    这个压缩包提供的是一个专为Windows编译的MemCache客户端监控工具,这对于Windows用户来说是一个非常实用的资源。 首先,让我们了解一下MemCache的基本工作原理。MemCache采用键值对的形式存储数据,当应用程序需要...

    Memcache客户端Enyim.Caching的演示实例以及Enyim.Caching.dll和log4net.dll

    这个库不仅包含核心的Memcache客户端实现,还整合了log4net日志框架,便于调试和监控。 **Enyim.Caching的关键特性** 1. **连接池管理**:Enyim.Caching实现了连接池机制,可以有效地复用到Memcache服务器的连接,...

    memCache源码java客户端

    Java社区提供了多种memCache客户端,如spymemcached、xmemcached、memcached-client等。其中,spymemcached是较常用的一款,它由Danga Interactive开发并开源,具有简单易用、性能稳定的特点。 ### 二、...

    memcache实现java客户端

    以下是一些关于如何在Java中实现Memcache客户端的重要知识点: 1. **Java Memcache客户端库**:在Java中,常用的Memcache客户端库有Xmemcached和spymemcached。这些库提供了丰富的API,使Java开发者能够方便地将...

    Memcache Session Manager Tomcat8.5.6

    在这个场景中,jar包可能是包含Memcache客户端实现的库,需要被添加到Tomcat的类路径中以便使用。 为了实现这个架构,你需要: - 安装和配置Tomcat 8.5.6。 - 设置Nginx并配置反向代理和负载均衡规则。 - 安装和...

    gomemcache:Go Memcache客户端软件包

    描述这是用于Go编程语言的memcache客户端软件包。 实现以下命令: 获取(单键) 设置,添加,替换,追加,前置删除增量,增量安装go get github.com/kklis/gomemcache 根据您的环境配置,您可能需要root(Linux)或...

    Memcache客户端Enyim.Caching 参考的例子

    Enyim.Caching作为.NET平台的Memcached客户端,提供了全面的功能和良好的性能,是.NET开发者在项目中实现高效缓存管理的理想选择。通过EnyimMemcached-18030这样的社区维护版本,开发者可以享受到持续的改进和支持,...

    Memcache win版 服务器和.net驱动

    常见的.NET Memcache客户端库有EnyimMemcached和StackExchange.Redis,它们提供了丰富的API,使得开发者可以方便地进行数据的存取操作。 EnyimMemcached是.NET社区中广泛使用的Memcache客户端,它支持多种缓存操作...

    Java开发中的Memcache原理及实现

    - 协议简单:Memcached使用基于文本的简单协议,方便不同语言的客户端实现。 - 并发处理:多线程模型,支持多客户端并发请求。 2. Java与Memcached的交互: - 客户端库:Java开发者常用的Memcached客户端库有...

    MemCache Client User Guide

    【MemCache客户端用户指南】 MemCache客户端用户指南主要面向开发者,提供关于如何配置和使用MemCache客户端的详细信息。MemCache是一种高效的分布式内存缓存系统,用于存储和检索数据,以减轻数据库的压力并提高...

    MemCacheClientTest

    1. **Memcache客户端库**:MemcacheClientTest可能包含了多个不同实现的Memcache客户端库,如libmemcached、pylibmc等,这些库提供了与Memcache服务器交互的接口,包括连接、存储、检索和删除数据等操作。...

    Memcache原理及实现

    【Memcache原理及实现】 Memcache,全称Memcached,是一种高性能的分布式内存对象缓存系统,主要用于减轻数据库的负载。它将数据存储在内存中,以键值对的形式提供快速访问,尤其适用于高并发场景。Memcache最初由...

    Memcached客户端手册

    集群配置通常涉及定义多个客户端和Socket池,并通过合适的策略实现负载分发和故障转移。 ## 三、总结 本文主要介绍了Memcached的基本概念及其Java客户端库的使用方法,特别是针对`MemcachedClient`的接口定义、...

    memcache所需要的jar包

    "memcache所需要的jar包"这个标题暗示我们将讨论与Java环境下的Memcache客户端库相关的知识点。 1. **Memcache的基本概念** - Memcache是一个高性能、分布式的内存对象缓存系统,它通过在内存中存储数据来减少对...

    memcache实现网站全局计数器

    本文将深入探讨如何使用Memcached来实现网站全局计数器。 一、Memcached概述 Memcached是由Danga Interactive开发的,最初是为LiveJournal服务设计的,现在已经成为一个广泛使用的开源项目。它支持多语言,可以在...

    go语言memcached客户端.zip

    Go语言,也被称为Golang,是由Google开发的一种静态类型的、编译式的...在实际开发中,开发者可以根据项目需求选择适合的库,例如"github.com/bradfitz/gomemcache/memcache"是一个广受欢迎的Go Memcached客户端实现。

    spring memcache 集成使用

    - **原生客户端**:指的是`com.danga.MemCached`包下的客户端实现,虽然功能较为基础,但在某些场景下可能更加轻量级。 综上所述,通过Spring集成Memcache不仅可以提升应用程序的性能,还能充分利用Spring框架的...

    memcache jar

    "memcache jar"指的是用于Java应用的Memcache客户端的JAR文件,它包含了必要的类和库,使得Java程序能够调用Memcache服务。 1. **Memcache介绍** - Memcache是一个开源项目,最初由Danga Interactive公司创建,...

Global site tag (gtag.js) - Google Analytics