论坛首页 Java企业应用论坛

memcached-ha 可基于任何客户端的memcached高可用工具

浏览 4183 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2012-12-11   最后修改:2012-12-11

 

 

MemcachedHA

 

MemcachedHA 包装一种memcached client 使系统达到对Memcached的高可用控制

1.数据多点备份(主从数据同步)

主系统通过MemcachedHAMemcached中存放数据时,MemcachedHA会自动同步到其他的Memcached服务器中。

(该同步过程是异步完成,不会影响主系统的响应速度)

 

2.防穿透(提高命中率)

当主系统通过MemcachedHAMemcached中获得数据时,若获得数据失败,MemcachedHA会自动向其他Memcached服务器发送请求获得数据,直到获得数据或者最终失败

(该方法可以通过参数屏蔽掉向其他服务器获得数据的逻辑,只完成一次请求)

 

3.故障恢复(数据恢复,数据高可用)

当某一台数据发生故障宕机或者重启,造成数据丢失。为解决数据丢失后该服务器脱离主系统的问题,MemcachedHA提供了对数据进行恢复的功能。

当主系统通过MemcachedHAMemcached中获得数据时,MemcachedHA会记录未命中的服务器,并在最终获得数据的时将最终获得的数据恢复到未命中的服务器中。

(该功能可以设置为CheckAll级别 即无论是否有未命中的服务器都同步到其他的服务器中,该级别建议在有服务数据丢失的情况下开启)

 


 

 

git 代码查看地址 https://github.com/xuchenCN/memcached-ha

git 已打好的jar包地址 https://github.com/xuchenCN/memcachedha-beta-0.0.1

  • 大小: 22.9 KB
   发表时间:2012-12-11  
目前版本为 beta 0.0.1

测试版

下个版本会加入事务的支持
0 请登录后投票
   发表时间:2012-12-18  

memcached-ha 与spring结合使用

配置context文件

<bean id="clientFactory" class="com.baidu.memcachedha.client.ClientFactory">
    <!-- number conectors of memcahed server in every client-->
    <property name="workPoolSize" value="4" />
    <!-- command time out -->
    <property name="opTimeout" value="5000" />
</bean>


<bean id="clientDispatch" class="com.baidu.memcachedha.MemcachedClientDispatch">
    <!-- then primary cached miss to do retry time -->
    <property name="primaryRetry" value="2" />
    <!-- wait for this time to init breath -->
    <property name="initalTime" value="1000" />
    <!-- every breath time -->
    <property name="breathTime" value="1000" />
    <!-- memcached-ha to do sync thread pool size -->
    <property name="syncThread" value="8" />
    <!-- memcached-ha to do broken reload thread pool size -->
    <property name="reloadThread" value="8" />
    <!-- method name to sync -->
    <property name="syncMethods">
        <set>
            <value>add</value>
            <value>set</value>
        </set>
    </property>

    <!-- client factory -->
    <property name="clientFactory" ref="clientFactory"/>

    <!-- listeners -->
    <property name="syncCommandListener" ref="commandListener"/>
    <property name="reloadcCommandListener" ref="commandListener"/>
    <property name="serverStatListener" ref="serverStatListener"/>

</bean>

<bean id="commandListener" class="com.baidu.memcachedha.test.RequestListener">

</bean>

<bean id="serverStatListener" class="com.baidu.memcachedha.test.ServerListener">

</bean>

 

 

编写 RequestListener

public class RequestListener implements CommandListener{

public void onCommandError(MemcachedClientKeeper<?> keeper, CommandParam param) {
    // TODO Auto-generated method stub

}

public void onCommandResponsed(MemcachedClientKeeper<?> keeper, CommandParam param, Object response)         {
    // TODO Auto-generated method stub

}

public void onAllCommandResponsed(Map<MemcachedClientKeeper<?>, Object> responses, CommandParam param) {
    // TODO Auto-generated method stub

}

 }
 

编写 ServerStatListener

public class ServerListener implements ServerStatListener{

public void serverDown(MemcachedClientKeeper<?> keeper, int activeServerNumber) {
    // TODO Auto-generated method stub

}

public void serverUp(MemcachedClientKeeper<?> keeper, int activeServerNumber) {
    // TODO Auto-generated method stub

}


}
 

添加client到 memcached-ha dispatcher

MemcachedClientDispatch dispatch = (MemcachedClientDispatch)ctx.getBean("clientDispatch");

    String ip1 = "your memcached ip";
    int port1 = your memcached port;
    String ip2 = "your memcached ip";
    int port2 = your memcached port;
    XMemcachedClient client1 = new XMemcachedClient(ip1, port1);
    XMemcachedClient client2 = new XMemcachedClient(ip1, port2);
    MemcachedClientKeeper<XMemcachedClient> k1 = new MemcachedClientKeeper<XMemcachedClient>(client1, ip1, port1);
    MemcachedClientKeeper<XMemcachedClient> k2 = new MemcachedClientKeeper<XMemcachedClient>(client2, ip1, port2);

    dispatch.addKeeper(k1);
    dispatch.addKeeper(k2);

    String key = "testKey";

    CommandParam getParam = new CommandParam("get", new Class[] { String.class }, new Object[] { key });
    CommandParam addParam = new CommandParam("add", new Class[] { String.class, int.class, Object.class },
            new Object[] { key, 0, "testValue" });

    dispatch.command(key, addParam, true, ReloadLevel.UNRELOAD, 0);

    System.out.println(dispatch.command(key, getParam, false, ReloadLevel.ONLYMISS, 0));
 

 

0 请登录后投票
   发表时间:2012-12-18  

编写or扩展门面类

memcached-ha 自带的门面类

package com.baidu.memcachedha;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;

/**
 * memcached-ha template class you can extends this class
 * reference this class program your memcached-ha facade
 * @author xuchenCN
 *
 * @param <T>
 */
public class MemcachedHA<T> {

	protected MemcachedClientDispatch dispatch = new MemcachedClientDispatch();

	public MemcachedClientDispatch getDispatch() {
		return dispatch;
	}

	public void setDispatch(MemcachedClientDispatch dispatch) {
		this.dispatch = dispatch;
	}
	
	/**
	 * this method can be instance a <tt>MemcachedClientKeeper</tt>
	 * @param server
	 * @param ip
	 * @param port
	 * @throws IOException
	 * 
	 * @see MemcachedClientKeeper
	 */
	public void addServer(T server, String ip, int port) throws IOException {
		MemcachedClientKeeper<T> k = new MemcachedClientKeeper<T>(server, ip, port);
		dispatch.addKeeper(k);
	}
	
	/**
	 * execute the memcached client method of 'set' and needSync param is true , unreload 
	 * @param key
	 * @param expiry
	 * @param value
	 * @return 
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object set(String key, int expiry, Object value) throws SecurityException, IllegalArgumentException,
			NoSuchMethodException, IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("set", new Class[] { String.class, Integer.class, String.class }, new Object[] {
				key, expiry, value });
		return dispatch.command(key, cp);
	}
	
	/**
	 * execute the memcached client method of 'set'
	 * @param key
	 * @param expiry
	 * @param value
	 * @param needSync when response is null and you need invoke method at other memcached server ,set is 'true'
	 * @return 
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object set(String key, int expiry, Object value, boolean needSync) throws SecurityException,
			IllegalArgumentException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("set", new Class[] { String.class, Integer.class, String.class }, new Object[] {
				key, expiry, value });
		return dispatch.command(key, cp, needSync);
	}
	
	/**
	 * execute the memcached client method of 'add' and needSync param is true , unreload 
	 * @param key
	 * @param expiry
	 * @param value
	 * @return
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object add(String key, int expiry, Object value) throws SecurityException, IllegalArgumentException,
			NoSuchMethodException, IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("add", new Class[] { String.class, Integer.class, String.class }, new Object[] {
				key, expiry, value });
		return dispatch.command(key, cp);
	}

	/**
	 * execute the memcached client method of 'set'
	 * @param key
	 * @param expiry
	 * @param value
	 * @param needSync when response is null and you need invoke method at other memcached server ,set is 'true'
	 * @return
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object add(String key, int expiry, Object value, boolean needSync) throws SecurityException,
			IllegalArgumentException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("add", new Class[] { String.class, Integer.class, String.class }, new Object[] {
				key, expiry, value });
		return dispatch.command(key, cp, needSync);
	}
	
	/**
	 * execute the memcached client method of 'get' and unreload
	 * @param key
	 * @return
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object get(String key) throws SecurityException, IllegalArgumentException, NoSuchMethodException,
			IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("get", new Class[] { String.class }, new Object[] { key, });
		return dispatch.command(key, cp);
	}
	
	/**
	 * execute the memcached client method of 'get'
	 * @param key
	 * @param reload if you need this return value synchronous to other memcached server set it > 0  use <tt>ReloadLevel</tt>
	 * @param reloadExpiry when reload what expiry you want
	 * @return
	 * @throws SecurityException
	 * @throws IllegalArgumentException
	 * @throws NoSuchMethodException
	 * @throws IllegalAccessException
	 * @throws InvocationTargetException
	 */
	public Object get(String key, final int reload, final int reloadExpiry) throws SecurityException,
			IllegalArgumentException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
		CommandParam cp = new CommandParam("get", new Class[] { String.class }, new Object[] { key, });
		return dispatch.command(key, cp,false,reload,reloadExpiry);
	}
}

 

 

可以扩展此类定制自己的方法

0 请登录后投票
   发表时间:2012-12-19  
a. 异步复制有一个问题,如何保证数据的可靠性
b. memcached它的设计理念主要是cache,不是store. 所以进行故障failover,感觉和理念有冲突,慎重

还有一个问题,需要考虑,memcached的数据有expired的概念,也就是说未命中不一定是数据丢失,可能是原本数据就过期了,这时你再failover到其他机器上去操作??
0 请登录后投票
   发表时间:2012-12-20   最后修改:2012-12-20
agapple 写道
a. 异步复制有一个问题,如何保证数据的可靠性
b. memcached它的设计理念主要是cache,不是store. 所以进行故障failover,感觉和理念有冲突,慎重

还有一个问题,需要考虑,memcached的数据有expired的概念,也就是说未命中不一定是数据丢失,可能是原本数据就过期了,这时你再failover到其他机器上去操作??


感谢你的回复

a.异步复制的数据即为真实数据,我想你是想问异步复制是否会导致一致性问题;由于每次操作的key是确定的,即以此key计算primary的cache节点,所以前端服务在集群情况下针对一个key的操作都会定位到primary节点上(类似于基于sessionID的负载均衡)。

b.随着业务的需求和压力的增加,已经有很多场景不允许缓存丢失的现象发生(内存是新硬盘,有些系统根本无持久层,或者数据无法从持久层恢复),如缓存丢失造成大量数据穿透导致的故障。
而memcached-ha是这一种降低缓存穿透,高可用策略的实现。并不是说想改变memcached本身的意义。
memcached-ha也有参数可以控制到底哪些操作时需要进行恢复操作的。

恢复操作的设计目的是当节点中有一个cache节点宕机情况发生数据全部丢失,而这个节点从此将永久脱离cache集群,造成资源浪费(当然前提是cache不允许穿透的情况下,若允许穿透的话当然可以从持久层中reload)。而这时才对memcached-ha开启reload参数,用于检查并从新将数据恢复到已宕机的节点中,提高资源的可用性。

至于expired的概念当然是可以的,不开启reload的情况下不会对miss的情况进行检测的

谢谢你的问题 欢迎讨论 共同进步
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics