论坛首页 Java企业应用论坛

缓存框架之AOP渐进实现

浏览 12930 次
精华帖 (1) :: 良好帖 (3) :: 新手帖 (0) :: 隐藏帖 (7)
作者 正文
   发表时间:2010-07-18  

案例描述

系统中常常存在这样的一些逻辑:某些业务方法的执行结果在一段时间内比较稳定,不太容易发生变化,这些执行的结果可以借助缓存框架,通过一些细粒度的缓存过期策略,存放于缓存中,再次执行这些业务方法时可以直接到缓存中命中,从而降低系统开销,提高效率

 

解决方案之一:第一小步

首先,我们需要一套缓存体系,缓存的过期策略大致有如下几类:

(1)EternalPolicy:永不过期策略

(2)IdlePolicy:空闲过期策略

(3)LivePolicy:

(4)IdleAndLivePolicy:

于是我们抽取出的Cache接口会像下面这样:

 

public interface Cache {

	Cache putInCacheWithEternalPolicy(String cacheName, Object key, Object value);

	Cache putInCacheWithIdlePolicy(String cacheName, Object key, Object value, Integer idleSeconds);

	Cache putInCacheWithLivePolicy(String cacheName, Object key, Object value, Integer liveSeconds);

	Cache putInCacheWithIdleAndLivePolicy(String cacheName, Object key, Object value, Integer idleSeconds, Integer liveSeconds);

	Object getFromCache(String cacheName, Object key);

}

 有了Cache层就可以开始我们的编码了:

 

public Object invoke(Object parameter) {
	Object value = this.cache.getFromCache(DEFAULT_CACHE_NAME,BUSSINESS_CACHE_KEY)
	if(value != null){
		return value;
	}
	
	value = .....
	this.cache.putInCacheWithLivePolicy(DEFAULT_CACHE_NAME,BUSSINESS_CACHE_KEY,60);
	return value;
}

 

  OK,问题解决,一个丑陋的缓存实现应运而生!先喝口水,回头重构一下代码,发现很多不爽的地方了吧:每一个缓存Key都需要特殊指定,缓存过期策略都硬编码在Java代码中,而且代码中到处充斥着读缓存,写缓存……

 

解决方案之二:拦截篇

不满足命运的我们又开始了探索之路:提供一个缓存拦截器,所有需要缓存的业务方法都用该拦截器拦截,于是乎拦截器的代码大致如下:

 

public class MethodInvokeCacheInterceptor implements MethodInterceptor {

	protected final Log log = LogFactory.getLog(getClass());

	private Cache cache;
	private CachePolicySource policySource;

	@Override
	public final Object invoke(MethodInvocation methodInvocation) throws Throwable {
		Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis());
		Method method = AopUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
		CachePolicy policy = this.policySource.getPolicy(method);
		if (policy == null) {
			log.warn("CachePolicy of {class:[" + targetClass + "],method:[" + method + "] is not found");
			return methodInvocation.proceed();
		}

		Object cacheKey = createCacheKey(method, methodInvocation.getArguments());
		if (log.isDebugEnabled()) {
			log.debug("Cache key is[" + cacheKey + "]");
		}

		Object value = findFromCache(policy.getCacheName(), cacheKey);
		if (value != null) {
			if (log.isInfoEnabled()) {
				log.info("Load cache from [" + policy.getCacheName() + "] with key[" + cacheKey + "]");
			}
		} else {
			value = methodInvocation.proceed();
			putInCache(policy, cacheKey, value);
		}
		return value;
	}

	............

}

  其中CachePolicySource是一个缓存过期策略源,负责管理业务方法的过期策略。有了这个拦截器之后就可以借助

Spring的Aop能力提供业务层的缓存能力了,Spring配置文件如下:

 

<bean id="testServie" class="com.derby.dswitch.common.cache.TestServie" />
	
<bean id="methodInvokeCacheTnterceptor" class="org.springframework.aop.support.NameMatchMethodPointcutAdvisor">
	<property name="mappedNames">
		<list>
			<value>*hello*</value>
		</list>
	</property>
	<property name="advice">
		<bean class="com.derby.dswitch.common.cache.support.MethodInvokeCacheInterceptor">
			<property name="cache" ref="cache" />
			<property name="policySource">
				<bean class="com.derby.dswitch.common.cache.support.NameMatchCachePolicySource">
					<constructor-arg>
						<map>
							<entry key="*com.derby.dswitch.common.cache.TestServie.hello(java.lang.String)*">
								<bean class="com.derby.dswitch.common.cache.support.CachePolicy">
									<property name="cacheName" value="test.cache.name" />
									<property name="liveSeconds" value="2" />
								</bean>
							</entry>
						</map>
					</constructor-arg>
				</bean>
			</property>
		</bean>
	</property>
</bean>

<bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator">
	<property name="beanNames">
		<list>
			<value>testServie</value>
		</list>
	</property>
	<property name="interceptorNames">
		<list>
			<value>methodInvokeCacheTnterceptor</value>
		</list>
	</property>
</bean>

  仔细观察任然发现了不爽的地方,拦截器的Pointcut和缓存过期策略的配置存在重合的地方,解决这个问题的方法其实也很简单,提供一个CacheProxyFactoryBean就可以轻松搞定,实现代码也许是这样的:

 

public class NameMatchMethodInvokeCacheProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements FactoryBean {

	............

	@Override
	protected Object createMainInterceptor() {
		return new DefaultPointcutAdvisor(this.pointcut, this.interceptor);
	}

	@Required
	public void setCachePolicy(Map<String, CachePolicy> properties) {
		Assert.notNull(properties);
		for (Iterator<Entry<String, CachePolicy>> it = properties.entrySet().iterator(); it.hasNext();) {
			Entry<String, CachePolicy> entry = it.next();
			this.properties.put(NameMatchUtils.optimize(entry.getKey()), entry.getValue());
		}
	}

	@Override
	public void afterPropertiesSet() {
		this.interceptor.setPolicySource(this.createCachePolicySource(this.properties));
		this.pointcut = createPointcut(this.properties.keySet());
		super.afterPropertiesSet();
	}
}

配置文件如下:

 

<bean id="methodInvokeCacheTnterceptor" class="com.derby.dswitch.common.cache.support.NameMatchMethodInvokeCacheProxyFactoryBean"
	autowire="no">
	<property name="target">
		<bean class="com.derby.dswitch.common.cache.TestServie" />
	</property>
	<property name="cache" ref="cache" />
	<property name="cachePolicy">
		<map>
			<entry key="*com.derby.dswitch.common.cache.TestServie.hello(java.lang.String)*">
				<bean class="com.derby.dswitch.common.cache.support.CachePolicy">
					<property name="cacheName" value="test.cache.name" />
					<property name="liveSeconds" value="2" />
				</bean>
			</entry>
		</map>
	</property>
</bean>

至此,业务层的缓存能力在AOP的强大力量下,以一种比较优雅的方式得到了完美的解决。咖啡有点冷了,去加点热水……

 

解决方案之三:Annotation篇

  一个优秀的程序员是不能满足于仅仅能用的基础上,还得充分考虑到易用性。拦截篇中提供的配置方式始终有点复杂的感觉,如果能提供Annotation配置的话,易用性就得到了极大的提高。

 

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface CacheEnable {

	String cacheName() default "default.cache.name";

	int liveSeconds() default 0;

	int idleSeconds() default 0;

}

  为了使用该Annotation,需要用到Spring的标签扩展能力。提供一个类CacheAnnotationDrivenBeanDefinitionParser扩展Spring中的org.springframework.beans.factory.xml.BeanDefinitionParser,其代码如下

 

public class CacheAnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {

	private void registerMethodInvokeCacheInterceptor(Element element, ParserContext parserContext) {
		String policySourceBeanName = getBeanName(CacheAnnotationCachePolicySource.class);
		if (parserContext.getRegistry().containsBeanDefinition(policySourceBeanName)) {
			return;
		}

		AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

		Object elementSource = parserContext.extractSource(element);

		RootBeanDefinition policySourceDefinition = new RootBeanDefinition(CacheAnnotationCachePolicySource.class);
		policySourceDefinition.setSource(elementSource);
		policySourceDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		parserContext.registerBeanComponent(new BeanComponentDefinition(policySourceDefinition, policySourceBeanName));

		AbstractBeanDefinition adviceDefinition = null;
		if (!element.hasAttribute(CACHE_INTERCEPTOR_ATTRIBUTE_NAME)) {
			adviceDefinition = new RootBeanDefinition(MethodInvokeCacheInterceptor.class);
			adviceDefinition.setSource(elementSource);
			adviceDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		} else {
			adviceDefinition = (AbstractBeanDefinition) parserContext.getRegistry().getBeanDefinition(
					element.getAttribute(CACHE_INTERCEPTOR_ATTRIBUTE_NAME));
		}
		String cacheBeanName = getCacheBeanName(element);
		adviceDefinition.getPropertyValues().addPropertyValue("cache", new RuntimeBeanReference(cacheBeanName));
		adviceDefinition.getPropertyValues().addPropertyValue("policySource", new RuntimeBeanReference(policySourceBeanName));

		RootBeanDefinition pointcutDefinition = new RootBeanDefinition(CacheAnnotationPointcut.class);
		pointcutDefinition.setSource(elementSource);
		pointcutDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		pointcutDefinition.getPropertyValues().addPropertyValue("policySource", new RuntimeBeanReference(policySourceBeanName));

		RootBeanDefinition advisorDefinition = new RootBeanDefinition(DefaultPointcutAdvisor.class);
		advisorDefinition.setSource(elementSource);
		advisorDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
		advisorDefinition.getPropertyValues().addPropertyValue("advice", adviceDefinition);
		advisorDefinition.getPropertyValues().addPropertyValue("pointcut", pointcutDefinition);

		String advisorBeanName = getBeanName(MethodInvokeCacheInterceptor.class);
		parserContext.registerBeanComponent(new BeanComponentDefinition(advisorDefinition, advisorBeanName));

		if (log.isInfoEnabled()) {
			log.info("register internal advisor [" + advisorBeanName + "], advice ["
					+ MethodInvokeCacheInterceptor.class.getName() + "] annotation [" + CacheEnable.class.getName() + "]");
		}
	}

}

  于是我们的业务层就可以通过如下配置得到缓存的能力:

 

public class TestServie implements ITestServie {

	@Override
	@CacheEnable(cacheName = "test.annotation", liveSeconds = 2)
	public String hello(String name) {
		return "Hello,Passyt";
	}

}

  至此,本文所要表达的目的已经全部实现,现在业务层可以通过三种配置方式得到缓存的能力。至于缓存如何实现,可以参考附件中提供的源码,其中包括几个测试Sample。

 

后话

以上任然遗留几个问题

(1)如何提供一种动态更新缓存过期的机制,如在系统运行中动态修改某个业务的缓存过期策略

(2)Cache的实现方式和Ehcache绑定过紧,基于内存的缓存实现可以参考google-collections中的MapMaker,但是它不提供空闲过期策略,需要做适当的扩展;同样基于Oscache的实现也不空闲过期策略

 

   发表时间:2010-07-19  
怎么没点注释啊 

看起来有点晕
0 请登录后投票
   发表时间:2010-07-19  
com.derby.dswitch 不知道你公司有没有保密协议...
0 请登录后投票
   发表时间:2010-07-19  
很好 打个标以后仔细看看
0 请登录后投票
   发表时间:2010-07-19  
这种实现方式只是适合被缓存的数据是只读的情况,否则有很多地方都不完善。
另外,spring-modules-cache已经提供了一个比较简单的实现。
0 请登录后投票
   发表时间:2010-07-19  
本地缓存是吧

对于缓存的过期策略 可以使用lazy remove 策略。
在get的时候判断是否过期就好了。

当然 在一定的使用场景下lazy remove和非lazy remove 可以配合使用
0 请登录后投票
   发表时间:2010-07-19  
whitesock 写道
这种实现方式只是适合被缓存的数据是只读的情况,否则有很多地方都不完善。
另外,spring-modules-cache已经提供了一个比较简单的实现。

不明白非只读缓存适用的场景,能不能解释一下呢
0 请登录后投票
   发表时间:2010-07-19  
关于非只读缓存,如果强调数据一致性,那么一个普遍存在的问题在于将性能问题转化为可伸缩性问题:
● 本地缓存:如果读取远多于更新,那么仍然可以使用缓存;只要通过合适的并发控制,可以完全保证数据的一致性。
● 分布式缓存:如果业务上能够容忍一定程度上的不一致性,那么仍然可以使用非只读缓存;如果完全不能容忍不一致性,那么也可以实现(Terracotta就是一个可选的解决方案),只是代价高昂。

一些情况下并不一定(甚至也不可能)完全保证数据一致性。例如:
● 在一个并发环境中取得了一个List的size,在此之后任何时间点该List的size都可能发生变化,那么取得该size的线程又有多大把握能正确使用这个size?
● 如果使用了MySQL的replication,那么应用程序必须时刻对同步延迟做好准备。
● 目前NoSQL中比较流行(同时也有一定争议)的CAP原则。

对于这种通过AOP方式(楼主实现的方式有些繁琐)实现的缓存(不管只读还是非只读),尽管其优点可能在于对应用透明,但是仍然需要考虑很多因素,例如:
● 是否对从cache中获得的数据进行保护性拷贝。
● 如果被拦截的方法在一个事务上下文(特别是支持事务传播的上下文)中执行,那么情况就会变得非常复杂。

以下对两段伪码,简单地分析一下其不足之处:
1 read
Object result = getFromCache();
if(result.hit()) {
  return result.value();
}
Object r = proceed();
putIntoCache(r);
return r;

● 如果希望缓存有最大程度的并发性,那么的proceed()方法执行的过程中,应该释放对该缓存的锁。
● 如果proceed()方法执行的过程中释放对cache的锁,那么多个线程可能同时发现缓存不命中,最终都会调用proceed()方法(甚至以相同的参数进行调用)。假设proceed()方法占用了有限的资源,那么同一时刻,同一个参数,应该只允许一个线程调用proceed()方法,其它以相同参数调用该proceed()方法的线程应等待执行该proceed()方法的线程的结果。
● 如果允许多个线程以相同的参数调用proceed()方法,那么这些线程得到的返回值可能不同(例如数据库中的值发生了变化),那么如何决定到底哪个线程的结果应该放入缓存。
● 如果proceed()方法在一个事务上下文中执行,其结果未必是目前数据库中的最新数据,那么在这种情况下通常不应该将结果放入缓存。例如使用MySQL数据库,并使用其REPEATABLE READ事务隔离级别(基于MVCC快照)。
● 还有其它很多情况不应该将所有proceed()的结果放入缓存,例如如果使用MySQL的replication,通常不希望将从slave读取的结果放入缓存。

2 flush:
try {
    return proceed();
} finally {
    flushCache();
}

● 如果proceed()方法在一个事务上下文中执行,那么不能在proceed()方法执行完毕后清空缓存,而是应该等事务真正提交或回滚之后。在该事务执行的过程中,别的线程应该仍然可以访问缓存中的数据(类似数据库的事务隔离,除非使用了的READ UNCOMMITTED)。当前线程不应该继续访问该缓存:因为缓存没有被立即清空,因此可能包含(对当前线程来说)旧的数据。
0 请登录后投票
   发表时间:2010-07-20  
whitesock 写道
关于非只读缓存,如果强调数据一致性,那么一个普遍存在的问题在于将性能问题转化为可伸缩性问题:
● 本地缓存:如果读取远多于更新,那么仍然可以使用缓存;只要通过合适的并发控制,可以完全保证数据的一致性。
● 分布式缓存:如果业务上能够容忍一定程度上的不一致性,那么仍然可以使用非只读缓存;如果完全不能容忍不一致性,那么也可以实现(Terracotta就是一个可选的解决方案),只是代价高昂。

一些情况下并不一定(甚至也不可能)完全保证数据一致性。例如:
● 在一个并发环境中取得了一个List的size,在此之后任何时间点该List的size都可能发生变化,那么取得该size的线程又有多大把握能正确使用这个size?
● 如果使用了MySQL的replication,那么应用程序必须时刻对同步延迟做好准备。
● 目前NoSQL中比较流行(同时也有一定争议)的CAP原则。

对于这种通过AOP方式(楼主实现的方式有些繁琐)实现的缓存(不管只读还是非只读),尽管其优点可能在于对应用透明,但是仍然需要考虑很多因素,例如:
● 是否对从cache中获得的数据进行保护性拷贝。
● 如果被拦截的方法在一个事务上下文(特别是支持事务传播的上下文)中执行,那么情况就会变得非常复杂。

以下对两段伪码,简单地分析一下其不足之处:
1 read
Object result = getFromCache();
if(result.hit()) {
  return result.value();
}
Object r = proceed();
putIntoCache(r);
return r;

● 如果希望缓存有最大程度的并发性,那么的proceed()方法执行的过程中,应该释放对该缓存的锁。
● 如果proceed()方法执行的过程中释放对cache的锁,那么多个线程可能同时发现缓存不命中,最终都会调用proceed()方法(甚至以相同的参数进行调用)。假设proceed()方法占用了有限的资源,那么同一时刻,同一个参数,应该只允许一个线程调用proceed()方法,其它以相同参数调用该proceed()方法的线程应等待执行该proceed()方法的线程的结果。
● 如果允许多个线程以相同的参数调用proceed()方法,那么这些线程得到的返回值可能不同(例如数据库中的值发生了变化),那么如何决定到底哪个线程的结果应该放入缓存。
● 如果proceed()方法在一个事务上下文中执行,其结果未必是目前数据库中的最新数据,那么在这种情况下通常不应该将结果放入缓存。例如使用MySQL数据库,并使用其REPEATABLE READ事务隔离级别(基于MVCC快照)。
● 还有其它很多情况不应该将所有proceed()的结果放入缓存,例如如果使用MySQL的replication,通常不希望将从slave读取的结果放入缓存。

2 flush:
try {
    return proceed();
} finally {
    flushCache();
}

● 如果proceed()方法在一个事务上下文中执行,那么不能在proceed()方法执行完毕后清空缓存,而是应该等事务真正提交或回滚之后。在该事务执行的过程中,别的线程应该仍然可以访问缓存中的数据(类似数据库的事务隔离,除非使用了的READ UNCOMMITTED)。当前线程不应该继续访问该缓存:因为缓存没有被立即清空,因此可能包含(对当前线程来说)旧的数据。

总结一下,你提出的其实就是三个问题:
(1)缓存的伸缩能力,如分布式缓存:注意本文中的Cache只是一个接口,其实现方式才决定着如何扩展,如源码中使用Ehcache实现Cache层,应该可以利用其集群能力,而且这点并不是本文阐述的目的
(2)缓存的线程安全:本文的确忽略了这点,只需在Interceptor中加入对缓存的双层检查就OK了
(3)上下文环境的缓存安全,如数据库事务上下文中的缓存:这点比较麻烦了,需要抽出一个事务层处理隔离和传播,所有缓存数据必须要在事务commit之后才能flush,如果是数据库级别的事务,可以扩展一个TransactionManager,所有flush的缓存数据先临时put进TransactionCacheContext中,然后在事务提交时flush到真正的缓存中

PS:本文一开始的基调就是"某些业务方法的执行结果在一段时间内比较稳定,不太容易发生变化",对于变化过于频繁的调用不适合使用缓存.
0 请登录后投票
   发表时间:2010-07-20   最后修改:2010-07-20
    总结的不全面:首先是正确性,其次才是性能,可伸缩性,可扩展性等其它方面因素。
    通过EHCache解决伸缩性的想法有些天真,Terracotta收购EHCache之后,也许EHCache会表现的好些。
    从cache中取得的数据,没有保护性拷贝非常危险。
    关于线程安全,并不想你想象的那么简单:只要是proceed()方法执行时不持有对cache的锁,那么就存在多个线程以相同参数调用proceed()而返回值不同的情况,此时如何决定将哪个返回值应该放入缓存中? 不要期望该返回值有个什么版本号可以用于校验。
   
   
   




0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics