题记:
自己一直在EE企业级混着,最近想转型网络应用和产品这块,就来学习下memcached客户端分布式缓存,memcached是用c写的简单缓存,应用socket来交换数据。
阿里的岑文初,就职于阿里软件公司研发中心平台一部,任架构师。当前主要工作涉及阿里软件开发平台服务框架(ASF)设计与实现,服务集成平台(SIP)设计与实现。
开源了一个for java版的memcached客户端(http://code.google.com/p/memcache-client-forjava ),同时加入了LocalCached本地缓存的简单实现用于优化memcached,于是想学习下,希望对自己有所帮助。
引子——本地缓存
wenchu写了个简单的本地缓存,目的是让应用在进行memcached查询之前,再做次缓存以提高效率。实现很简单,用ConcurrentHashMap来缓存,并设置了一个简单的超时机制,利用ScheduledExecutorService开启1个线程定时遍历所有map,并清理过期的数据。如下:
DefaultCacheImpl.java
/**
* 具体内容存放的地方
*/
ConcurrentHashMap<String, Object>[] caches;
/**
* 超期信息存储,存储所有的key,超时时间
*/
ConcurrentHashMap<String, Long> expiryCache;
/**
* 清理超期内容的服务
*/
private ScheduledExecutorService scheduleService;
/**
* 清理超期信息的时间间隔,默认10分钟
*/
private int expiryInterval = 10;
/**
* 内部cache的个数,根据key的hash对module取模来定位到具体的某一个内部的Map,
* 减小阻塞情况发生。
*/
private int moduleSize = 10;
注意:
/**
* 内部cache的个数,根据key的hash对module取模来定位到具体的某一个内部的Map,
* 减小阻塞情况发生。
*/
所以每次get和put的时候,需要取模计算用哪个map,如下:
DefaultCacheImpl.java
private ConcurrentHashMap<String, Object>getCache(String key)
{
long hashCode = (long)key.hashCode();
if (hashCode < 0)
hashCode = -hashCode;
int moudleNum = (int)hashCode % moduleSize;
return caches[moudleNum];
}
如果换我实现,我肯定就一个ConcurrentHashMap了,向前辈学习:)
本地缓存的超时机制——能否优化?
为了建立超时机制,作者除开用了Map数组来存储key-value的值,还需要存储超时时间,而expiryCache这个Map就是用来控制时间的,他存储key和ttl。所以在每次put的时候,expiryCache需要设定这对key-value的超时时间,而在每次get的时候需要判断取的key是否超时,如果这个key超时了,则返回空。
这里每次都做了判断了,我想是不是能减去这个判断?
DefaultCacheImpl.java
// 判断函数,判断某key-value是否超时
private void checkValidate(String key)
{
if (key != null && expiryCache.get(key) != null && expiryCache.get(key) != -1
&& new Date(expiryCache.get(key)).before(new Date()))
{
getCache(key).remove(key);
expiryCache.remove(key);
}
}
// get函数
public Object get(String key)
{
checkValidate(key);// 每次都check了
return getCache(key).get(key);
}
从另一个角度说,get如果不每次检查有效性是有可能获取到“超时信息”的,所以为了保证一定取到有效数据。
如果去掉“定时清理”线程,只留get来做清理工作的话,那么又会存在某个从来没有get过key-value永远存在内存里。我觉得如果性能有要求的话,是可以去掉清理线程的。
上层应用对集群的封装——MemcachedCache.java
此类是面向使用者的封装类,封装了底层memcachedClient的操作,他对外提供了clear(),put(),get(),remove(),add(),replace()等等需要用到的方法。对于上层应用来说,并不需要关心是本地缓存还是远程获取值,并且也不用关心与某个memcached服务器的通讯细节。
1,clear
如果做了集群,则调用所有集群进行flushAll()
可以看看flushAll方法:
public boolean clear()
{
boolean result = false;
if (helper.hasCluster())
{
List<MemCachedClient> caches = helper.getClusterCache();
for(MemCachedClient cache : caches)
{
try
{
result = cache.flushAll(null);
}
catch(Exception ex)
{
Logger.error(new StringBuilder(helper.getCacheName())
.append(" cluster clear error"),ex);
result = false;
}
}
return result;
}
else
return helper.getInnerCacheClient().flushAll( null );
}
2,put
如果集群,则构造命令入命令队列,并异步发送给所有集群执行;
如果非集群,则只发送给某memcached执行命令;
public Object put(String key, Object value, Date expiry)
{
boolean result = getCacheClient(key).set(key,value,expiry);
//移除本地缓存的内容
if (result)
localCache.remove(key);
if (helper.hasCluster())
{
Object[] commands = new Object[]{CacheCommand.PUT,key,value,expiry};
addCommandToQueue(commands);// 构造的命令加入命令队列,让异步线程发送给所有集群memcached服务器
}
else
if (!result)
throw new java.lang.RuntimeException
(new StringBuilder().append("put key :").append(key).append(" error!").toString());
return value;
}
....其他几个方法都是大同小异了。
真正的底层实现——MemCachedClient.java
底层Socket通讯模块——SockIOPool.java
首先这个多io池采用多例模式,自己管理自己的实例并对外提供。
// store instances of pools
private static ConcurrentMap<String, SockIOPool> pools = new ConcurrentHashMap<String, SockIOPool>();
而且pools管理的是多个连接池,每个SockIOPool管理着多个Sock,key是socket对象,value是socket状态,如下:
private ConcurrentMap<String, ConcurrentMap<SockIO, Integer>> socketPool;
可以看到这里的SockIO类是一个静态内部类,主要工作就是管理一个Socket连接(TCP),给Memcached服务器发送命令并接收答复。
比如包括,新建立一个TCP连接(初始化host,port,timeout等),发送命令,接收答复。
public static class SockIO
{
// pool
private SockIOPool pool;
// data
private String host;
private Socket sock;
private DataInputStream in;
private BufferedOutputStream out;
private byte[] recBuf;
private int recBufSize = 1028;
private int recIndex = 0;
//判断是否需要检查链接处于可用状态
private long aliveTimeStamp = 0;
public SockIO(SockIOPool pool, String host, int port, int timeout,
int connectTimeout, boolean noDelay) throws IOException,
UnknownHostException
{
this.pool = pool;
recBuf = new byte[recBufSize];
// get a socket channel
sock = getSocket(host, port, connectTimeout);
if (timeout >= 0)
sock.setSoTimeout(timeout);
// testing only
sock.setTcpNoDelay(noDelay);
// wrap streams
in = new DataInputStream(sock.getInputStream());
out = new BufferedOutputStream(sock.getOutputStream());
this.host = host + ":" + port;
}
....
}
发送命令:
void flush() throws IOException
{
if (sock == null || !sock.isConnected())
{
log.error("++++ attempting to write to closed socket");
throw new IOException(
"++++ attempting to write to closed socket");
}
out.flush();
}
void write(byte[] b) throws IOException
{
if (sock == null || !sock.isConnected())
{
log.error("++++ attempting to write to closed socket");
throw new IOException(
"++++ attempting to write to closed socket");
}
out.write(b);
}
接收答复:
public String readLine() throws IOException
{
if (sock == null || !sock.isConnected())
{
throw new IOException("++++ attempting to read from closed socket");
}
String result = null;
ByteArrayOutputStream bos = new ByteArrayOutputStream();
//StringBuilder content = new StringBuilder();
int readCount = 0;
// some recbuf releave
if (recIndex > 0 && read(bos))
{
return bos.toString();
}
while((readCount = in.read(recBuf,recIndex,recBuf.length - recIndex)) > 0)
{
recIndex = recIndex + readCount;
if (read(bos))
break;
}
result = bos.toString();
if (result == null || (result != null && result.length() <= 0 && recIndex <= 0))
{
throw new IOException("++++ Stream appears to be dead, so closing it down");
}
//update alive state
aliveTimeStamp = System.currentTimeMillis();
return result;
}
关于底层的socket通讯作者已经通过SockIO这个内部静态类已经写出来了,下面的工作就是Pool如何管理这些socket,并下达命令。
看上面MemcachedCache.java中的put方法
public Object put(String key, Object value, Date expiry)
{
boolean result = getCacheClient(key).set(key,value,expiry);
...}
然后根据key的hashcode与集群大小取模操作,选择某个memcached服务器组:
public MemCachedClient getCacheClient(String key)
{
if (cacheClient == null)
{
Logger.error("cacheClient can't be injected into MemcachedCacheHelper");
throw new java.lang.RuntimeException("cacheClient can't be injected into MemcachedCacheHelper");
}
if (hasCluster())
{
List<MemCachedClient> clusters = getClusterCache();
long keyhash = key.hashCode();
int index = (int)keyhash % clusters.size();
if (index < 0 )
index *= -1;
return clusters.get(index);
}
else
return cacheClient;
}
取到Client后,调用MemCachedClient的set方法:
public boolean set( String key, Object value, Date expiry ) {
return set( "set", key, value, expiry, null, primitiveAsString );
}
private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) {
// 参数合法性检查
try {
key = sanitizeKey( key );// 是否进行UTF-8编码
}
catch ( UnsupportedEncodingException e ) {
// 省略
}
if ( value == null ) {
log.error( "trying to store a null value to cache" );
return false;
}
// get SockIO obj
SockIOPool.SockIO sock = pool.getSock( key, hashCode );
if ( sock == null ) {
if ( errorHandler != null )
errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key );
return false;
}
if ( expiry == null )
expiry = new Date(0);
// store flags
int flags = 0;
// byte array to hold data
byte[] val;
if ( NativeHandler.isHandled( value ) ) {
// 略 对value的值进行非序列化,根据value的类型转换成byte
}
else {
// 略 always serialize for non-primitive types 对value对象进行序列化
}
// now try to compress if we want to
// and if the length is over the threshold
if ( compressEnable && val.length > compressThreshold ) {
// 略 对value进行压缩工作
}
// now write the data to the cache server
try {
String cmd = new StringBuilder().append(cmdname).append(" ")
.append(key).append(" ").append(flags).append(" ")
.append(expiry.getTime() / 1000).append(" ")
.append(val.length).append("\r\n").toString();
sock.write( cmd.getBytes() );
sock.write( val );
sock.write(B_RETURN);
sock.flush();
// get result code
String line = sock.readLine();
if (log.isInfoEnabled())
log.info( new StringBuilder().append("++++ memcache cmd (result code): ").append(cmd)
.append(" (").append(line).append(")").toString() );
if ( STORED.equals( line ) ) {
if (log.isInfoEnabled())
log.info(new StringBuilder().append("++++ data successfully stored for key: ").append(key).toString() );
sock.close();
sock = null;
return true;
}
else if ( NOTSTORED.equals( line ) )
{
if (log.isInfoEnabled())
log.info( new StringBuilder().append("++++ data not stored in cache for key: ").append(key).toString() );
}
else {
log.error( new StringBuilder().append("++++ error storing data in cache for key: ")
.append(key).append(" -- length: ").append(val.length).toString() );
log.error( new StringBuilder().append("++++ server response: ").append(line).toString() );
}
}
catch ( IOException e ) {
// if we have an errorHandler, use its hook
if ( errorHandler != null )
errorHandler.handleErrorOnSet( this, e, key );
// exception thrown
log.error( "++++ exception thrown while writing bytes to server on set" );
log.error( e.getMessage(), e );
try {
sock.trueClose();
}
catch ( IOException ioe ) {
log.error( new StringBuilder().append("++++ failed to close socket : ").append(sock.toString()).toString() );
}
sock = null;
}
if ( sock != null ) {
sock.close();
sock = null;
}
return false;
}
可以看到Memcached类主要做了一个本地化缓存和选择集群调用相应MemcachedClient的工作,而MemcachedClient做了数据准备的工作,包括编码的转换,对象的序列化,压缩。
前期数据的准备工作做好之后,就需要存储数据了,这里就会遇到选择哪个服务器的问题。
而SockIOPool类主要做了策略选择某个Server,并提供此Server的SockIO工作。作者是通过key和hashCode的计算选定某个Server,然后通过SockIO sock = getConnection(server);从这个server池中取得一个有效的SockIO(甚至是重新创建一个新的Socket),如果所有socket都无效的话,则重新计算hashcode,选择一个新的Server来做,如此遍历下去直到找到一个有效的SockIO。
SockIOPool初始化的时候建立一个名为“default”的连接池,然后为Server[]数组中每个Server创建SockIO池,保持长连接(因为本地端口资源有限,需要限制最大连接池数),通过createSocket方法,这个方法首先判断需要创建的host是否在失败过,如果失败过则根据条件重新创建,并设置状态为SOCKET_STATUS_ACTIVE,也就是sockets = new ConcurrentHashMap<SockIO, SOCKET_STATUS_ACTIVE>(); pool.putIfAbsent(host, sockets);,如果创建失败,则入失败的Map,这个Map的作用避免每次为失败过的Server进行重新创建,到这时,所有的Server就创建好了。
所以实际上最重要的两个方法是getSock和getConnection,getSock是根据key来计算并选择某个Server,getConnection则根据某个Server来从池中获取某个SockIO。
上层类MemCachedClient不关心底层选择的是哪个Server,只要拿到SockIO后,便可以进行操作了。
异步数据同步
异步数据同步是通过ExecutorService开启线程,读取队列中的命令集并向集群中所有memcached服务器发送命令来实现数据同步。
分享到:
相关推荐
Memcache-client for PHP 是一个专为PHP开发的用于与Memcached服务器进行交互的客户端库。Memcached是一种高性能、分布式内存对象缓存系统,常用于减轻数据库的负载,提高Web应用的响应速度。PHP的Memcache客户端库...
tar -zxvf pecl-memcache-4.0.4.tar.gz && cd /root/pecl-memcache-4.0.4 && /usr/local/php7/bin/phpize && ./configure --with-php-config=/usr/local/php7/bin/php-config && make && make install
谷歌对于memcached提供给Java的客户端有spymemcached、xmemcached、memcache-client-forjava等多种形式,但memcache-client-forjava是使用最多、最稳定的。里边的文件和文档(有中文文档)都是从官网下载的,里边的...
本篇文章将详细介绍两个常用的Java Memcached客户端:xmemcached和memcache-client-forjava。 **1. xmemcached** xmemcached是由Ketoo开发的一个高性能、高可用性的Java Memcached客户端。它提供了丰富的API,支持...
本文将围绕"php_memcache-2.2.7-7.0-nts-vc14-x64 扩展DLL"这一主题,深入探讨PHP与Memcache的结合使用,以及相关文件的详细信息。 首先,"php_memcache-2.2.7-7.0-nts-vc14-x64 扩展DLL"这个标题揭示了几个关键点...
在本篇文章中,我们将深入探讨如何在64位PHP5.5环境下安装和使用"php_memcache-3.0.8-5.5-ts-vc11-x64"这一特定版本。 首先,"php_memcache-3.0.8-5.5-ts-vc11-x64"是针对PHP 5.5版本的64位系统设计的,这意味着它...
赠送jar包:netty-codec-memcache-4.1.73.Final.jar; 赠送原API文档:netty-codec-memcache-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-memcache-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:...
$memcache->set('key', 'value', 0, 3600); // 0 表示永不过期,3600 秒过期 ``` - 获取数据: ```php $value = $memcache->get('key'); echo $value; // 输出 "value" ``` - 删除数据: ```php $memcache->delete...
标题中的“php_memcache-3.0.9 for php7-nts-vc14-x64 扩展DLL,亲测有效”表明这是一个专为PHP 7设计的Memcache扩展库,版本为3.0.9,适用于非线程安全(NTS)且基于Visual C++ 14编译器的64位系统。这个扩展是经过...
PHP7-memcache-dll-master.zip 是一个针对PHP 7.0及以上版本的Memcache扩展插件包。Memcache是一款广泛使用的开源分布式内存对象缓存系统,它能够提高Web应用程序的性能,通过将数据存储在内存中,减少对数据库的...
$memcache->connect('localhost', 11211)) { die("无法连接到Memcache服务器"); } ``` 这里的'localhost'是Memcache服务器的地址,11211是默认端口。 六、使用Memcache API 连接成功后,你就可以使用Memcache提供...
$memcache->connect('localhost', 11211); echo "连接成功! "; $memcache->set('key', 'Hello, Memcached!', 0, 60); $value = $memcache->get('key'); echo "获取的值: " . $value; $memcache->close(); ?> ```...
$memcache->connect('localhost', 11211); // 默认端口是11211 ``` **示例代码解析** `example.php`文件通常包含更复杂的使用示例,可能包括存储和检索数据、检查缓存是否存在、删除缓存等操作。例如: ```php $...
《PHP与Memcache扩展:深入理解php_memcache-3.0.7-5.4-nts-vc9-x86.zip》 在PHP的世界里,Memcache是一个广泛使用的分布式内存对象缓存系统,用于提高Web应用程序的性能。本文将详细探讨PHP与Memcache的结合,特别...
6. `memcache_increment()` 和 `memcache_decrement()`:对整数值的键进行加减操作。 **四、Memcache在实际应用中的优势** 1. **性能提升**:通过将常用数据缓存到内存中,避免了频繁的数据库查询,极大地提高了...
标题 "php_memcache-3.0.8-5.6-nts-vc11-x86" 提供的信息是关于一个特定版本的 PHP Memcache 扩展,这是一款用于 PHP 的缓存模块,专为 PHP 5.6 构建,并且适配于非线程安全(NTS)版本且采用 Visual C++ 11 编译器...
标题 "php_memcache-3.0.9 for php7-nts-vc14-x86 扩展DL" 涉及的是一个针对PHP 7的特定版本的Memcache扩展,该扩展专为非线程安全(NTS)且基于Visual C++ 14编译器(对应于Visual Studio 2015)的32位(x86)环境...
赠送jar包:netty-codec-memcache-4.1.74.Final.jar; 赠送原API文档:netty-codec-memcache-4.1.74.Final-javadoc.jar; 赠送源代码:netty-codec-memcache-4.1.74.Final-sources.jar; 赠送Maven依赖信息文件:...
赠送jar包:netty-codec-memcache-4.1.73.Final.jar; 赠送原API文档:netty-codec-memcache-4.1.73.Final-javadoc.jar; 赠送源代码:netty-codec-memcache-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:...
1. **解压源代码**:使用`tar -zxvf memcache-3.0.9.tar.gz`命令来解压压缩包。 2. **进入源代码目录**:`cd memcache-3.0.9`。 3. **配置编译**:运行`phpize`以准备编译环境,然后执行`./configure --with-...