`
mengxs
  • 浏览: 28815 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

memcached java 客户端 优化

阅读更多

memcached java客户端 https://github.com/gwhalin/Memcached-Java-Client 在使用中发现对 同一JVM种对相同key的写操作时并发写的,这一点很不好,下面是对 memcached客户端优化使用,参考oscache的机制。

 

对MemCachedClient  二次封装目的:

      优化对同一key的读写操作

对客户端的二次封装:

 

import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.danga.MemCached.MemCachedClient;

/**
 * memcached客户端优化,确保对相同key的写操作并发执行时只要一次写入memcached
 * @author mengxiansheng
 *
 */
public class CacheTuning {
	// 创建全局的唯一实例
	private static ConcurrentHashMap updateStates = new ConcurrentHashMap();
	protected static MemCachedClient memCachedClient = null;
	private static transient final Log log = LogFactory
			.getLog(CacheTuning.class);
	public void setMemCachedClient(MemCachedClient mcc) {
		this.memCachedClient = mcc;
	}

	protected EntryUpdateState getUpdateState(String key) {
		EntryUpdateState updateState;
		// Try to find the matching state object in the updating entry map.
		updateState = (EntryUpdateState) updateStates.get(key);
		if (updateState == null) {
			// It's not there so add it.
			updateState = new EntryUpdateState();
			updateStates.put(key, updateState);
		} else {
			// Otherwise indicate that we start using it to prevent its
			// removal
			// until all threads are done with it.
			updateState.incrementUsageCounter();
		}
		return updateState;
	}

	/**
	 * 释放key对EntryUpdateState实例的使用,当没有线程使用EntryUpdateState实例是,将该实例在map中删除
 	 * 
	 * @param state
	 *            the state to release the usage of
	 * @param key
	 *            the associated key.
	 */
	protected void releaseUpdateState(EntryUpdateState state, String key) {
		int usageCounter = state.decrementUsageCounter();
		checkEntryStateUpdateUsage(key, state, usageCounter);
	}

	/**
	 * Utility method to check if the specified usage count is zero, and if so
	 * remove the corresponding EntryUpdateState from the updateStates. This is
	 * designed to factor common code.
	 * 
	 * Warning: This method should always be called while holding both the
	 * updateStates field and the state parameter
	 * 
	 * @throws Exception
	 */
	private void checkEntryStateUpdateUsage(String key, EntryUpdateState state,
			int usageCounter) {
		// Clean up the updateStates map to avoid a memory leak once no thread
		// is using this EntryUpdateState instance anymore.
		if (usageCounter == 0) {
			log.debug("在更新状态列表中删除:" + key + "\t");
			EntryUpdateState removedState = (EntryUpdateState) updateStates
					.remove(key);
			if (state != removedState) {
				if (log.isErrorEnabled()) {
					try {
						throw new Exception(
								"memcached: internal error: removed state ["
										+ removedState + "] from key [" + key
										+ "] whereas we expected [" + state
										+ "]");
					} catch (Exception e) {
						log.error(e);
					}
				}
			}
		}
	}

	/**
	 * 保护型构造方法,不允许实例化!
	 * 
	 */
	protected CacheTuning() {

	}

 
	/**
	 * 添加一个指定的值到缓存中.
	 * 
	 * @param key
	 * @param value
	 * @return
	 */

	public boolean putInCache(String key, Object value, long secords) {
		// set方法:在cache中存储一个指定对象
		// add 和replace 方法功能差不多
		// add -- 如果不存在 这个key的对象,将会存储一个对象到cache中
		// replace --只有当存在指定key对象的时候 会覆盖已有对象
		EntryUpdateState updateState = getUpdateState(key);
		try {
			boolean ok = memCachedClient.add(key, value, new Date(
					secords * 1000));
			completeUpdate(key);
			log.debug(key + "添加成功,值:" + value.toString());
			return ok;
		} catch (Exception e) {
			e.printStackTrace();
			this.cancelUpdate(key);
		} finally {
			// Make sure we release the usage count for this
			// EntryUpdateState since we don't use it anymore. If the
			// current thread started the update, then the counter was
			// increased by one in startUpdate()
			log.debug(Thread.currentThread().getId() + " 唤醒:" + key);
		}

		return false;
	}

	/**
	 * Removes the update state for the specified key and notifies any other
	 * threads that are waiting on this object. This is called automatically by
	 * the {@link #putInCache} method, so it is possible that no
	 * EntryUpdateState was hold when this method is called.
	 * 
	 * @param key
	 *            The cache key that is no longer being updated.
	 */
	protected void completeUpdate(String key) {
		EntryUpdateState state;
		state = (EntryUpdateState) updateStates.get(key);
		if (state != null) {
			synchronized (state) {
				int usageCounter = state.completeUpdate();
				state.notify();
				checkEntryStateUpdateUsage(key, state, usageCounter);
			}
		} else {
			// If putInCache() was called directly (i.e. not as a result of a
			// NeedRefreshException) then no EntryUpdateState would be found.
		}
	}

	public boolean replace(String key, Object value) {
		return memCachedClient.replace(key, value);
	}
	/**
	 * 在缓存中删除指定的key
	 * 
	 * @param key 指定删除的key
	 *             
	 * @return <code>true</code>, 如果删除成功
	 */
	public boolean delete(String key) {
		return memCachedClient.delete(key);
	}
	
	/**
	 * 根据指定的关键字获取对象.
	 * 
	 * @param key
	 * @return
	 */
	public Object getFromCache(String key) throws NeedsRefreshException {
		EntryUpdateState updateState = null;
		updateState = getUpdateState(key);
		Object obj = memCachedClient.get(key);
		//log.debug("key:"+key+" 的缓存对象 is null ? "+(obj==null));
		if(obj!=null){
			return obj;
		}
		boolean reload = false;
		if (obj == null) {// 不存在缓存数据,数据已过期或者第一次访问
			try {
				synchronized (updateState) {
					if (updateState.isAwaitingUpdate()
							|| updateState.isCancelled()) {// 第一次访问该数据或者前面线程对该数据的更新已取消
						updateState.startUpdate();// 开始更新
					} else if (updateState.isUpdating()) {// 该数据正在更新
						// //其他线程正在更新这个缓存
						do {
							try {
								updateState.wait();
							} catch (InterruptedException e) {
							}
						} while (updateState.isUpdating());
						if (updateState.isCancelled()) {// 更新被取消
							updateState.startUpdate();
						} else if (updateState.isComplete()) {// 其他线程完成了对该数据的更新
							reload = true;
						} else {
							log.error("Invalid update state for cache entry "
									+ key);
						}

					} else {// 其他线程已经对该数据更新完成,重新获取数据
						reload = true;
					}
				}
			} catch (Exception e) {
				log.error(e.getMessage(), e.getCause());
			} finally {
				// Make sure we release the usage count for this
				// EntryUpdateState since we don't use it anymore. If
				// the current thread started the update, then the
				// counter was
				// increased by one in startUpdate()
				releaseUpdateState(updateState, key);// 释放 对 EntryUpdateState
														// 的使用
														// EntryUpdateState的计数器减一
			}
		}

		if (reload) {
			obj = memCachedClient.get(key);
		}
		if (obj == null) {// 没有获取到缓存对象,则更新
			throw new NeedsRefreshException("需要重新获取:" + key);
		}
		return obj;
	}

	private void cancelUpdate(String key) {
		// TODO Auto-generated method stub
		EntryUpdateState state;
		if (key != null) {
			state = (EntryUpdateState) updateStates.get(key);
			if (state != null) {
				synchronized (state) {
					int usageCounter = state.cancelUpdate();
					state.notify();
					checkEntryStateUpdateUsage(key, state, usageCounter);
				}
			} else {
				if (log.isErrorEnabled()) {
					log
							.error("internal error: expected to get a state from key ["
									+ key + "]");
				}
			}

		}
	}

	public boolean putInCache(String key, Object value) {
		// TODO Auto-generated method stub
		return this.putInCache(key, value, 0);
	}

}

 

 

 缓存实体状态类:

 

 

 

 

/**
 * 缓存更新状态。
 * Holds the state of a Cache Entry that is in the process of being (re)generated.
 * This is not synchronized; the synchronization must be handled by the calling
 * classes.
 *
 * @author Author: mengxiansheng
  */
public class EntryUpdateState {
    /**
     * 初始化状态
     */
    public static final int NOT_YET_UPDATING = -1;

    /**
     * 正在处理中状态
     */
    public static final int UPDATE_IN_PROGRESS = 0;

    /**
     * 更新完成状态
     */
    public static final int UPDATE_COMPLETE = 1;

    /**
     * 取消更新状态
     */
    public static final int UPDATE_CANCELLED = 2;

    /**
     *初始化状态
     */
    int state = NOT_YET_UPDATING;
    
    /**
     * 线程计数器,实例可以控制这个计数器,当计数器是0的时候,可以说明缓存实例已经被释放
     * 计数器受 EntryStateUpdate实例监听器的保护
     * A counter of the number of threads that are coordinated through this instance. When this counter gets to zero, then the reference to this
     * instance may be released from the Cache instance.
     * This is counter is protected by the EntryStateUpdate instance monitor.
     */
    private  int nbConcurrentUses = 1;

    /**
     *   是否等待更新
     *   如果是初始化状态,则等待更新
     * This is the initial state when an instance this object is first created.
     * It indicates that a cache entry needs updating, but no thread has claimed
     * responsibility for updating it yet.
     */
    public boolean isAwaitingUpdate() {
        return state == NOT_YET_UPDATING;
    }

    /**
     * 更新已经被取消
     * The thread that was responsible for updating the cache entry (ie, the thread
     * that managed to grab the update lock) has decided to give up responsibility
     * for performing the update. OSCache will notify any other threads that are
     * waiting on the update so one of them can take over the responsibility.
     */
    public boolean isCancelled() {
        return state == UPDATE_CANCELLED;
    }

    /**
     * 更新完成
     * The update of the cache entry has been completed.
     */
    public boolean isComplete() {
        return state == UPDATE_COMPLETE;
    }

    /**
     * 正在更新
     * The cache entry is currently being generated by the thread that got hold of
     * the update lock.
     */
    public boolean isUpdating() {
        return state == UPDATE_IN_PROGRESS;
    }

    /**
     * 取消更新,只能是出于增在更新状态UPDATE_IN_PROGRESS 的可以取消更新
     * Updates the state to <code>UPDATE_CANCELLED</code>. This should <em>only<em>
     * be called by the thread that managed to get the update lock.
     * @return the counter value after the operation completed
     */
    public int cancelUpdate() {
//        if (state != UPDATE_IN_PROGRESS) {
//            throw new IllegalStateException("Cannot cancel cache update - current state (" + state + ") is not UPDATE_IN_PROGRESS");
//        }

        state = UPDATE_CANCELLED;
        return decrementUsageCounter();
    }

    /**
     * 完成更新
     * Updates the state to <code>UPDATE_COMPLETE</code>. This should <em>only</em>
     * be called by the thread that managed to get the update lock.
     * @return the counter value after the operation completed
     */
    public int completeUpdate() {
        if (state != UPDATE_IN_PROGRESS) {
            throw new IllegalStateException("Cannot complete cache update - current state (" + state + ") is not UPDATE_IN_PROGRESS");
        }

        state = UPDATE_COMPLETE;
        return decrementUsageCounter();
    }

    /**
     * 开始更新,当状态不是初始状态并且不是取消状态,才可以调用此方法,否则报异常
     * Attempt to change the state to <code>UPDATE_IN_PROGRESS</code>. Calls
     * to this method must be synchronized on the EntryUpdateState instance.
     * @return the counter value after the operation completed
     */
    public int startUpdate() {
        if ((state != NOT_YET_UPDATING) && (state != UPDATE_CANCELLED)) {
            throw new IllegalStateException("Cannot begin cache update - current state (" + state + ") is not NOT_YET_UPDATING or UPDATE_CANCELLED");
        }

        state = UPDATE_IN_PROGRESS;
        return incrementUsageCounter();
    }

    /**
     * 并发数增加
     * Increments the usage counter by one
     * @return the counter value after the increment
     */
	public synchronized int incrementUsageCounter() {
		nbConcurrentUses++;
		return nbConcurrentUses;
	}
	
    /**
     * 获取并发数
     * Gets the current usage counter value
     * @return a positive number.
     */
	public synchronized int getUsageCounter() {
		return nbConcurrentUses;
	}
	
	
    /**
     * 计数器递减,并发数<=0时抛出异常.
     * Decrements the usage counter by one. This method may only be called when the usage number is greater than zero
     * @return the counter value after the decrement
     */
	public synchronized int decrementUsageCounter() {
		if (nbConcurrentUses <=0) {
            throw new IllegalStateException("Cannot decrement usage counter, it is already equals to [" + nbConcurrentUses + "]");
		}
		nbConcurrentUses--;		
		return nbConcurrentUses;
	}

	
}

 

 

需要刷新异常:

/**
 * 
 *当缓存实体过期或不存在时抛出异常
 * @author        mengxiansheng
  */
public final class NeedsRefreshException extends Exception {

    /**
	 * 
	 */
	private static final long serialVersionUID = 1L;
	/**
     * Current object in the cache
     */
    private Object cacheContent = null;
    
    /**
     * Create a NeedsRefreshException
     */
    public NeedsRefreshException(String message, Object cacheContent) {
        super(message);
        this.cacheContent = cacheContent;
    }

    /**
     * Create a NeedsRefreshException
     */
    public NeedsRefreshException(Object cacheContent) {
        super();
        this.cacheContent = cacheContent;
    }
    
    /**
     * Retrieve current object in the cache
     */
    public Object getCacheContent() {
        return cacheContent;
    }

}

 

 

spring 管理配置

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
	<bean class="ApplicationContextUtil"></bean>
	<!--socket连接池-->
	<bean id="SockIOPool" class="com.danga.MemCached.SockIOPool"
		factory-method="getInstance" init-method="initialize">
		<!--服务器列表,可以根据需要增减-->
		<property name="servers">
			<list>
				<value>127.0.0.1:11211</value>
			</list>
		</property>
		<!--服务器权重,根据内存大小分配-->
		<property name="weights">
			<list>
				<value>2</value>
			</list>
		</property>
		<!--初始化,最小,最大连接数,建议开发环境使用1,1,5,线上环境使用5,5,250-->
		<property name="initConn" value="1"></property>
		<property name="minConn" value="1"></property>
		<property name="maxConn" value="5"></property>

		<!--时间-->
		<property name="maxIdle" value="21600000"></property>
		<property name="maintSleep" value="30000"></property>
		<property name="nagle" value="false"></property>
		<property name="socketTO" value="3000"></property>
		<property name="socketConnectTO" value="3000"></property>
	</bean>

	<bean name="MemCachedClient"
		class="com.danga.MemCached.MemCachedClient" depends-on="SockIOPool">
		<property name="compressEnable" value="true"></property>
		<property name="compressThreshold" value="131072"></property>
	</bean>
	<bean name="Mem" class="CacheTuning">
		<property name="memCachedClient" ref="MemCachedClient" />
	</bean>


</beans>

 

并发测试:

 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


public class Test {
	//ApplicationContextUtil 类自己实现,用来加载spring配置文件,
	static CacheTuning mt=(CacheTuning)ApplicationContextUtil.getContext().getBean("Mem");

	private static transient final Log log = LogFactory
	.getLog(Test.class);
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub
 		long t1=System.currentTimeMillis();
   		for(int i=0;i<1000;i++){
    	   Thread tt=new Thread(new ts().setTime(t1));
    	   tt.start();
        }
	}

}

class ts implements java.lang.Runnable{
	private static transient final Log log = LogFactory
	.getLog(ts.class);
 	long t1=0;
	public ts setTime(long t1){
		this.t1=t1;
		return this;
	}
	public void run() {
		// TODO Auto-generated method stub
  		String aa=null;
		long _t1=System.currentTimeMillis();
 		try{
 			Test.mt.delete("keyss");
			aa=(String)Test.mt.getFromCache("keyss");
   		}catch(NeedsRefreshException e){
  				aa="kdj快来撒飞机快lsd就疯了";
 	 			Test.mt.putInCache("keyss", aa,100);
  	 			log.info(Thread.currentThread().getId()+"新增:keyss");
  		} 
   		
		long t2=System.currentTimeMillis();
		log.info(Thread.currentThread().getId()+"耗时:\t"+(t2-_t1)+"\t总耗时\t"+(t2-t1)+"\t"+aa);
  
	}
	
}

 

分享到:
评论

相关推荐

    memcached java客户端驱动包

    **Memcached Java客户端驱动包详解** Memcached是一种高性能的分布式内存对象缓存系统,用于减少数据库负载,提高网站性能。Java连接Memcached的驱动包使得Java开发者能够方便地与Memcached进行交互,实现数据的...

    Memcached之java客户端开发

    **Memcached之Java客户端开发详解** Memcached是一种高性能、分布式内存对象缓存系统,用于减少数据库负载,提高网站性能。它通过将数据存储在内存中,以快速响应来自应用程序的请求,避免了反复读取数据库的开销。...

    Memcached 客户端 服务端 管理工具

    客户端是应用与Memcached交互的接口,常见的客户端库有libmemcached(C语言)、pylibmc(Python)和memcached-client(Java)等。客户端的主要任务包括: - **连接管理**:建立与服务端的TCP连接或Unix域套接字连接...

    Memcached java的使用实例

    常见的Java Memcached客户端有spymemcached和xmemcached,这里以spymemcached为例。可以通过Maven在`pom.xml`文件中添加依赖: ```xml &lt;groupId&gt;net.spy&lt;/groupId&gt; &lt;artifactId&gt;spymemcached &lt;version&gt;2.12.3 ...

    java_memcached-release_2.5.1.jar Memcache java jar 包

    Java Memcached是一个流行的Java客户端库,用于与Memcached缓存系统进行交互。Memcached是一种分布式内存对象缓存系统,常用于减轻数据库负载,提高Web应用的性能。在本例中,我们关注的是`java_memcached-release_...

    memcached java源码(performance分支)

    在Java环境中,我们常常使用Java客户端库来与Memcached服务器进行交互。这个文档将深入探讨"performance"分支的Java源码,它是基于"master"分支优化后的版本,具有更高的性能表现。 ### 1. 性能优化策略 ...

    memcached linux安装说明+ java客户端调用例子

    在Linux系统上安装和配置memcached,并通过Java客户端进行调用是常见的操作流程,下面将详细介绍这一过程。 ### 一、memcached的Linux安装 1. **更新系统**: 首先确保你的Linux系统已经更新到最新版本,运行`sudo ...

    memcaChed java client jar包

    Java客户端库使得Java开发者可以方便地与Memcached服务进行交互。标题中的"memcached java client jar包"指的是用于Java应用程序与Memcached服务器通信的Java库的JAR文件。 在Java应用中使用Memcached客户端,首先...

    memcached 完整的项目,服务端(win) 客户端 包括jar包

    **Memcached 完整项目详解** Memcached 是一个高性能、分布式的内存对象缓存系统,用于在应用服务器之间共享...通过理解服务端的部署和客户端的使用方法,开发者能够更好地利用 Memcached 实现分布式系统的性能优化。

    memCache源码java客户端

    **memCache源码分析——Java...总之,理解并熟练使用memCache的Java客户端,对于优化应用性能、减轻数据库压力至关重要。在实际项目中,应根据具体场景选择合适的操作方式,并注意性能调优,以充分发挥memCache的优势。

    memCached win64服务端和Java客户端

    2. **连接与操作**:Java客户端库提供了API来连接memcached服务器,执行基本的缓存操作,如设置、获取、删除键值对。这些操作通常是异步的,以保证高并发下的性能。 3. **API接口**:常见的Java客户端库如...

    java_memcached-release_2.6.3.rar 客户端所有包

    这个`java_memcached-release_2.6.3.rar`压缩包包含了用于Java客户端开发所需的全部依赖库,使得开发者能够轻松地在Java应用中集成Memcached功能。 1. **Memcached**:Memcached是一个高性能、分布式的内存对象缓存...

    java memcached客户端

    java memcached客户端进过一些性能上的优化,建立了集群的概念

    MemCached Cache Java Client封装优化历程.docx

    在实际使用中,通常会开发 Java 客户端库来封装 Memcached 的操作,提供更方便的 API。封装过程可能包括: 1. **连接池管理**:为了提高性能,客户端可以使用连接池管理多个到 Memcached 服务器的连接,避免频繁...

    memcached客户端所需jar包

    在这个场景下,"memcached客户端所需jar包"指的是Java开发中用来连接和操作Memcached的库的Java Archive (JAR) 文件。 标题提到的"memcached客户端所需jar包",通常包括以下几个关键组件: 1. **Spymemcached**:...

    内存缓存服务器(memcached)客户端比较报告[收集].pdf

    测试环境是在两台不同的服务器上搭建,其中一台运行Memcached服务器(版本1.4.5,RHEL 5 64位操作系统,8核4GB内存),另一台运行Java客户端,两台服务器间通过100M内部局域网连接。 测试结果以图表形式展示,包括...

    memcached for java client 例子

    7. **性能优化**:在使用Java客户端时,可以考虑利用异步操作、批量操作以及预热缓存等策略来提高性能。 8. **异常处理**:在实际开发中,需要注意处理网络异常、超时异常以及数据一致性问题,确保程序的健壮性。 ...

    memcached java源码(master分支)

    5. **性能优化**:考虑到Memcached的目标是提供高速缓存服务,Java客户端可能会包含各种性能优化措施,比如批量操作、异步处理、连接池优化等。 6. **异常处理**:源码中会有对网络错误、超时、数据一致性问题等...

    memcached客户端和服务端程序和jar包

    Spymemcached 是一个轻量级且高度优化的 Memcached 客户端,它提供了简单易用的 API,使得在Java应用中集成 Memcached 变得轻松。使用 Spymemcached,你需要添加其 JAR 包到项目的类路径中。例如,在Maven项目中,...

    java_memcached所需jar包

    Java Memcached是一个流行的Java客户端库,用于与Memcached缓存服务器进行交互。Memcached是一种分布式内存对象缓存系统,常用于提高网站应用的性能,通过将数据存储在内存中,减少对数据库的访问,从而加快读取速度...

Global site tag (gtag.js) - Google Analytics