精华帖 (0) :: 良好帖 (0) :: 新手帖 (2) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-09-07
最后修改:2010-10-18
题记: 自己一直在EE企业级混着,最近想转型网络应用和产品这块,就来学习下memcached客户端分布式缓存,memcached是用c写的简单缓存,应用socket来交换数据。 阿里的岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。 开源了一个for java版的memcached客户端(http://code.google.com/p/memcache-client-forjava ),同时加入了LocalCached本地缓存的简单实现用于优化memcached,于是想学习下,希望对自己有所帮助。
引子——本地缓存 wenchu写了个简单的本地缓存,目的是让应用在进行memcached查询之前,再做次缓存以提高效率。实现很简单,用ConcurrentHashMap来缓存,并设置了一个简单的超时机制,利用ScheduledExecutorService开启1个线程定时遍历所有map,并清理过期的数据。如下: DefaultCacheImpl.java /** * 具体内容存放的地方 */ ConcurrentHashMap<String, Object>[] caches; /** * 超期信息存储,存储所有的key,超时时间 */ ConcurrentHashMap<String, Long> expiryCache; /** * 清理超期内容的服务 */ private ScheduledExecutorService scheduleService; /** * 清理超期信息的时间间隔,默认10分钟 */ private int expiryInterval = 10; /** * 内部cache的个数,根据key的hash对module取模来定位到具体的某一个内部的Map, * 减小阻塞情况发生。 */ private int moduleSize = 10;
注意: /**
所以每次get和put的时候,需要取模计算用哪个map,如下: DefaultCacheImpl.java private ConcurrentHashMap<String, Object>getCache(String key) { long hashCode = (long)key.hashCode(); if (hashCode < 0) hashCode = -hashCode; int moudleNum = (int)hashCode % moduleSize; return caches[moudleNum]; }
如果换我实现,我肯定就一个ConcurrentHashMap了,向前辈学习:)
本地缓存的超时机制——能否优化? 为了建立超时机制,作者除开用了Map数组来存储key-value的值,还需要存储超时时间,而expiryCache这个Map就是用来控制时间的,他存储key和ttl。所以在每次put的时候,expiryCache需要设定这对key-value的超时时间,而在每次get的时候需要判断取的key是否超时,如果这个key超时了,则返回空。 这里每次都做了判断了,我想是不是能减去这个判断? DefaultCacheImpl.java // 判断函数,判断某key-value是否超时 private void checkValidate(String key) { if (key != null && expiryCache.get(key) != null && expiryCache.get(key) != -1 && new Date(expiryCache.get(key)).before(new Date())) { getCache(key).remove(key); expiryCache.remove(key); } } // get函数 public Object get(String key) { checkValidate(key);// 每次都check了 return getCache(key).get(key); }
从另一个角度说,get如果不每次检查有效性是有可能获取到“超时信息”的,所以为了保证一定取到有效数据。 如果去掉“定时清理”线程,只留get来做清理工作的话,那么又会存在某个从来没有get过key-value永远存在内存里。我觉得如果性能有要求的话,是可以去掉清理线程的。
上层应用对集群的封装——MemcachedCache.java 此类是面向使用者的封装类,封装了底层memcachedClient的操作,他对外提供了clear(),put(),get(),remove(),add(),replace()等等需要用到的方法。对于上层应用来说,并不需要关心是本地缓存还是远程获取值,并且也不用关心与某个memcached服务器的通讯细节。 1,clear 如果做了集群,则调用所有集群进行flushAll() 可以看看flushAll方法:
public boolean clear() { boolean result = false; if (helper.hasCluster()) { List<MemCachedClient> caches = helper.getClusterCache(); for(MemCachedClient cache : caches) { try { result = cache.flushAll(null); } catch(Exception ex) { Logger.error(new StringBuilder(helper.getCacheName()) .append(" cluster clear error"),ex); result = false; } } return result; } else return helper.getInnerCacheClient().flushAll( null ); }
2,put 如果集群,则构造命令入命令队列,并异步发送给所有集群执行; 如果非集群,则只发送给某memcached执行命令; public Object put(String key, Object value, Date expiry) { boolean result = getCacheClient(key).set(key,value,expiry); //移除本地缓存的内容 if (result) localCache.remove(key); if (helper.hasCluster()) { Object[] commands = new Object[]{CacheCommand.PUT,key,value,expiry}; addCommandToQueue(commands);// 构造的命令加入命令队列,让异步线程发送给所有集群memcached服务器 } else if (!result) throw new java.lang.RuntimeException (new StringBuilder().append("put key :").append(key).append(" error!").toString()); return value; }
....其他几个方法都是大同小异了。
真正的底层实现——MemCachedClient.java
底层Socket通讯模块——SockIOPool.java 首先这个多io池采用多例模式,自己管理自己的实例并对外提供。 // store instances of pools private static ConcurrentMap<String, SockIOPool> pools = new ConcurrentHashMap<String, SockIOPool>(); 而且pools管理的是多个连接池,每个SockIOPool管理着多个Sock,key是socket对象,value是socket状态,如下:
private ConcurrentMap<String, ConcurrentMap<SockIO, Integer>> socketPool; 可以看到这里的SockIO类是一个静态内部类,主要工作就是管理一个Socket连接(TCP),给Memcached服务器发送命令并接收答复。 比如包括,新建立一个TCP连接(初始化host,port,timeout等),发送命令,接收答复。 public static class SockIO { // pool private SockIOPool pool; // data private String host; private Socket sock; private DataInputStream in; private BufferedOutputStream out; private byte[] recBuf; private int recBufSize = 1028; private int recIndex = 0; //判断是否需要检查链接处于可用状态 private long aliveTimeStamp = 0; public SockIO(SockIOPool pool, String host, int port, int timeout, int connectTimeout, boolean noDelay) throws IOException, UnknownHostException { this.pool = pool; recBuf = new byte[recBufSize]; // get a socket channel sock = getSocket(host, port, connectTimeout); if (timeout >= 0) sock.setSoTimeout(timeout); // testing only sock.setTcpNoDelay(noDelay); // wrap streams in = new DataInputStream(sock.getInputStream()); out = new BufferedOutputStream(sock.getOutputStream()); this.host = host + ":" + port; } .... }
发送命令: void flush() throws IOException { if (sock == null || !sock.isConnected()) { log.error("++++ attempting to write to closed socket"); throw new IOException( "++++ attempting to write to closed socket"); } out.flush(); } void write(byte[] b) throws IOException { if (sock == null || !sock.isConnected()) { log.error("++++ attempting to write to closed socket"); throw new IOException( "++++ attempting to write to closed socket"); } out.write(b); }
接收答复: public String readLine() throws IOException { if (sock == null || !sock.isConnected()) { throw new IOException("++++ attempting to read from closed socket"); } String result = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); //StringBuilder content = new StringBuilder(); int readCount = 0; // some recbuf releave if (recIndex > 0 && read(bos)) { return bos.toString(); } while((readCount = in.read(recBuf,recIndex,recBuf.length - recIndex)) > 0) { recIndex = recIndex + readCount; if (read(bos)) break; } result = bos.toString(); if (result == null || (result != null && result.length() <= 0 && recIndex <= 0)) { throw new IOException("++++ Stream appears to be dead, so closing it down"); } //update alive state aliveTimeStamp = System.currentTimeMillis(); return result; } 关于底层的socket通讯作者已经通过SockIO这个内部静态类已经写出来了,下面的工作就是Pool如何管理这些socket,并下达命令。
看上面MemcachedCache.java中的put方法 public Object put(String key, Object value, Date expiry)
然后根据key的hashcode与集群大小取模操作,选择某个memcached服务器组:
public MemCachedClient getCacheClient(String key)
取到Client后,调用MemCachedClient的set方法: public boolean set( String key, Object value, Date expiry ) {
private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) { // 参数合法性检查 try { key = sanitizeKey( key );// 是否进行UTF-8编码 } catch ( UnsupportedEncodingException e ) { // 省略 } if ( value == null ) { log.error( "trying to store a null value to cache" ); return false; } // get SockIO obj SockIOPool.SockIO sock = pool.getSock( key, hashCode ); if ( sock == null ) { if ( errorHandler != null ) errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key ); return false; } if ( expiry == null ) expiry = new Date(0); // store flags int flags = 0; // byte array to hold data byte[] val; if ( NativeHandler.isHandled( value ) ) { // 略 对value的值进行非序列化,根据value的类型转换成byte } else { // 略 always serialize for non-primitive types 对value对象进行序列化 } // now try to compress if we want to // and if the length is over the threshold if ( compressEnable && val.length > compressThreshold ) { // 略 对value进行压缩工作 } // now write the data to the cache server try { String cmd = new StringBuilder().append(cmdname).append(" ") .append(key).append(" ").append(flags).append(" ") .append(expiry.getTime() / 1000).append(" ") .append(val.length).append("\r\n").toString(); sock.write( cmd.getBytes() ); sock.write( val ); sock.write(B_RETURN); sock.flush(); // get result code String line = sock.readLine(); if (log.isInfoEnabled()) log.info( new StringBuilder().append("++++ memcache cmd (result code): ").append(cmd) .append(" (").append(line).append(")").toString() ); if ( STORED.equals( line ) ) { if (log.isInfoEnabled()) log.info(new StringBuilder().append("++++ data successfully stored for key: ").append(key).toString() ); sock.close(); sock = null; return true; } else if ( NOTSTORED.equals( line ) ) { if (log.isInfoEnabled()) log.info( new StringBuilder().append("++++ data not stored in cache for key: ").append(key).toString() ); } else { log.error( new StringBuilder().append("++++ error storing data in cache for key: ") .append(key).append(" -- length: ").append(val.length).toString() ); log.error( new StringBuilder().append("++++ server response: ").append(line).toString() ); } } catch ( IOException e ) { // if we have an errorHandler, use its hook if ( errorHandler != null ) errorHandler.handleErrorOnSet( this, e, key ); // exception thrown log.error( "++++ exception thrown while writing bytes to server on set" ); log.error( e.getMessage(), e ); try { sock.trueClose(); } catch ( IOException ioe ) { log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() ); } sock = null; } if ( sock != null ) { sock.close(); sock = null; } return false; }
可以看到Memcached类主要做了一个本地化缓存和选择集群调用相应MemcachedClient的工作,而MemcachedClient做了数据准备的工作,包括编码的转换,对象的序列化,压缩。 前期数据的准备工作做好之后,就需要存储数据了,这里就会遇到选择哪个服务器的问题。 而SockIOPool类主要做了策略选择某个Server,并提供此Server的SockIO工作。作者是通过key和hashCode的计算选定某个Server,然后通过SockIO sock = getConnection(server);从这个server池中取得一个有效的SockIO(甚至是重新创建一个新的Socket),如果所有socket都无效的话,则重新计算hashcode,选择一个新的Server来做,如此遍历下去直到找到一个有效的SockIO。
SockIOPool初始化的时候建立一个名为“default”的连接池,然后为Server[]数组中每个Server创建SockIO池,保持长连接(因为本地端口资源有限,需要限制最大连接池数),通过createSocket方法,这个方法首先判断需要创建的host是否在失败过,如果失败过则根据条件重新创建,并设置状态为SOCKET_STATUS_ACTIVE,也就是sockets = new ConcurrentHashMap<SockIO, SOCKET_STATUS_ACTIVE>(); pool.putIfAbsent(host, sockets);,如果创建失败,则入失败的Map,这个Map的作用避免每次为失败过的Server进行重新创建,到这时,所有的Server就创建好了。 所以实际上最重要的两个方法是getSock和getConnection,getSock是根据key来计算并选择某个Server,getConnection则根据某个Server来从池中获取某个SockIO。 上层类MemCachedClient不关心底层选择的是哪个Server,只要拿到SockIO后,便可以进行操作了。
异步数据同步 异步数据同步是通过ExecutorService开启线程,读取队列中的命令集并向集群中所有memcached服务器发送命令来实现数据同步。
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-10-18
最后修改:2010-10-18
http://code.google.com/p/memcache-client-forjava/issues/detail?id=20#c1
问:正如你所说,memcached并非是用来做存储使用的,所以很不理解为什么要做成集 群式的,我的情况是有多个client对应多个memcached服务器端,由于所有服务器都在 一个局域网中,所以一个client对memcached进行了存储操作的同时,也会使用局域网 广播的形式通过只所有其他的client,这样所有的client就知道这个key值所对应的 value存储到了那一天memcached服务器上,即节省了memcached对内存的使用,也提高 了多client的命中率问题。 个人建议,如果有什么不妥的地方,欢迎讨论。 xiazhiquan@gmail.com 答: 作者好像是以key的hashcode计算出某台mc,所以我觉得在一种理想状态,每台client根据相同的hashcode都能够准确定位到那台mc。所以可以无需再广播通知到其他所有client了。 但是如果原hashcode计算出来的mc正好间歇性不能工作的话,就会出现一些数据的紊乱存储了。其他client就会不确定此key的value存储在哪台mc上了。 |
|
返回顶楼 | |
浏览 5144 次