锁定老帖子 主题:缓存框架之AOP渐进实现
精华帖 (1) :: 良好帖 (3) :: 新手帖 (0) :: 隐藏帖 (7)
|
|
---|---|
作者 | 正文 |
发表时间:2010-07-18
案例描述 系统中常常存在这样的一些逻辑:某些业务方法的执行结果在一段时间内比较稳定,不太容易发生变化,这些执行的结果可以借助缓存框架,通过一些细粒度的缓存过期策略,存放于缓存中,再次执行这些业务方法时可以直接到缓存中命中,从而降低系统开销,提高效率。
解决方案之一:第一小步 首先,我们需要一套缓存体系,缓存的过期策略大致有如下几类: (1)EternalPolicy:永不过期策略 (2)IdlePolicy:空闲过期策略 (3)LivePolicy: (4)IdleAndLivePolicy: 于是我们抽取出的Cache接口会像下面这样:
public interface Cache { Cache putInCacheWithEternalPolicy(String cacheName, Object key, Object value); Cache putInCacheWithIdlePolicy(String cacheName, Object key, Object value, Integer idleSeconds); Cache putInCacheWithLivePolicy(String cacheName, Object key, Object value, Integer liveSeconds); Cache putInCacheWithIdleAndLivePolicy(String cacheName, Object key, Object value, Integer idleSeconds, Integer liveSeconds); Object getFromCache(String cacheName, Object key); } 有了Cache层就可以开始我们的编码了:
public Object invoke(Object parameter) { Object value = this.cache.getFromCache(DEFAULT_CACHE_NAME,BUSSINESS_CACHE_KEY) if(value != null){ return value; } value = ..... this.cache.putInCacheWithLivePolicy(DEFAULT_CACHE_NAME,BUSSINESS_CACHE_KEY,60); return value; }
OK,问题解决,一个丑陋的缓存实现应运而生!先喝口水,回头重构一下代码,发现很多不爽的地方了吧:每一个缓存Key都需要特殊指定,缓存过期策略都硬编码在Java代码中,而且代码中到处充斥着读缓存,写缓存……
解决方案之二:拦截篇 不满足命运的我们又开始了探索之路:提供一个缓存拦截器,所有需要缓存的业务方法都用该拦截器拦截,于是乎拦截器的代码大致如下:
public class MethodInvokeCacheInterceptor implements MethodInterceptor { protected final Log log = LogFactory.getLog(getClass()); private Cache cache; private CachePolicySource policySource; @Override public final Object invoke(MethodInvocation methodInvocation) throws Throwable { Class<?> targetClass = AopUtils.getTargetClass(methodInvocation.getThis()); Method method = AopUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); CachePolicy policy = this.policySource.getPolicy(method); if (policy == null) { log.warn("CachePolicy of {class:[" + targetClass + "],method:[" + method + "] is not found"); return methodInvocation.proceed(); } Object cacheKey = createCacheKey(method, methodInvocation.getArguments()); if (log.isDebugEnabled()) { log.debug("Cache key is[" + cacheKey + "]"); } Object value = findFromCache(policy.getCacheName(), cacheKey); if (value != null) { if (log.isInfoEnabled()) { log.info("Load cache from [" + policy.getCacheName() + "] with key[" + cacheKey + "]"); } } else { value = methodInvocation.proceed(); putInCache(policy, cacheKey, value); } return value; } ............ } 其中CachePolicySource是一个缓存过期策略源,负责管理业务方法的过期策略。有了这个拦截器之后就可以借助 Spring的Aop能力提供业务层的缓存能力了,Spring配置文件如下:
<bean id="testServie" class="com.derby.dswitch.common.cache.TestServie" /> <bean id="methodInvokeCacheTnterceptor" class="org.springframework.aop.support.NameMatchMethodPointcutAdvisor"> <property name="mappedNames"> <list> <value>*hello*</value> </list> </property> <property name="advice"> <bean class="com.derby.dswitch.common.cache.support.MethodInvokeCacheInterceptor"> <property name="cache" ref="cache" /> <property name="policySource"> <bean class="com.derby.dswitch.common.cache.support.NameMatchCachePolicySource"> <constructor-arg> <map> <entry key="*com.derby.dswitch.common.cache.TestServie.hello(java.lang.String)*"> <bean class="com.derby.dswitch.common.cache.support.CachePolicy"> <property name="cacheName" value="test.cache.name" /> <property name="liveSeconds" value="2" /> </bean> </entry> </map> </constructor-arg> </bean> </property> </bean> </property> </bean> <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator"> <property name="beanNames"> <list> <value>testServie</value> </list> </property> <property name="interceptorNames"> <list> <value>methodInvokeCacheTnterceptor</value> </list> </property> </bean> 仔细观察任然发现了不爽的地方,拦截器的Pointcut和缓存过期策略的配置存在重合的地方,解决这个问题的方法其实也很简单,提供一个CacheProxyFactoryBean就可以轻松搞定,实现代码也许是这样的:
public class NameMatchMethodInvokeCacheProxyFactoryBean extends AbstractSingletonProxyFactoryBean implements FactoryBean { ............ @Override protected Object createMainInterceptor() { return new DefaultPointcutAdvisor(this.pointcut, this.interceptor); } @Required public void setCachePolicy(Map<String, CachePolicy> properties) { Assert.notNull(properties); for (Iterator<Entry<String, CachePolicy>> it = properties.entrySet().iterator(); it.hasNext();) { Entry<String, CachePolicy> entry = it.next(); this.properties.put(NameMatchUtils.optimize(entry.getKey()), entry.getValue()); } } @Override public void afterPropertiesSet() { this.interceptor.setPolicySource(this.createCachePolicySource(this.properties)); this.pointcut = createPointcut(this.properties.keySet()); super.afterPropertiesSet(); } } 配置文件如下:
<bean id="methodInvokeCacheTnterceptor" class="com.derby.dswitch.common.cache.support.NameMatchMethodInvokeCacheProxyFactoryBean" autowire="no"> <property name="target"> <bean class="com.derby.dswitch.common.cache.TestServie" /> </property> <property name="cache" ref="cache" /> <property name="cachePolicy"> <map> <entry key="*com.derby.dswitch.common.cache.TestServie.hello(java.lang.String)*"> <bean class="com.derby.dswitch.common.cache.support.CachePolicy"> <property name="cacheName" value="test.cache.name" /> <property name="liveSeconds" value="2" /> </bean> </entry> </map> </property> </bean> 至此,业务层的缓存能力在AOP的强大力量下,以一种比较优雅的方式得到了完美的解决。咖啡有点冷了,去加点热水……
解决方案之三:Annotation篇 一个优秀的程序员是不能满足于仅仅能用的基础上,还得充分考虑到易用性。拦截篇中提供的配置方式始终有点复杂的感觉,如果能提供Annotation配置的话,易用性就得到了极大的提高。
@Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface CacheEnable { String cacheName() default "default.cache.name"; int liveSeconds() default 0; int idleSeconds() default 0; } 为了使用该Annotation,需要用到Spring的标签扩展能力。提供一个类CacheAnnotationDrivenBeanDefinitionParser扩展Spring中的org.springframework.beans.factory.xml.BeanDefinitionParser,其代码如下
public class CacheAnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser { private void registerMethodInvokeCacheInterceptor(Element element, ParserContext parserContext) { String policySourceBeanName = getBeanName(CacheAnnotationCachePolicySource.class); if (parserContext.getRegistry().containsBeanDefinition(policySourceBeanName)) { return; } AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element); Object elementSource = parserContext.extractSource(element); RootBeanDefinition policySourceDefinition = new RootBeanDefinition(CacheAnnotationCachePolicySource.class); policySourceDefinition.setSource(elementSource); policySourceDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); parserContext.registerBeanComponent(new BeanComponentDefinition(policySourceDefinition, policySourceBeanName)); AbstractBeanDefinition adviceDefinition = null; if (!element.hasAttribute(CACHE_INTERCEPTOR_ATTRIBUTE_NAME)) { adviceDefinition = new RootBeanDefinition(MethodInvokeCacheInterceptor.class); adviceDefinition.setSource(elementSource); adviceDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); } else { adviceDefinition = (AbstractBeanDefinition) parserContext.getRegistry().getBeanDefinition( element.getAttribute(CACHE_INTERCEPTOR_ATTRIBUTE_NAME)); } String cacheBeanName = getCacheBeanName(element); adviceDefinition.getPropertyValues().addPropertyValue("cache", new RuntimeBeanReference(cacheBeanName)); adviceDefinition.getPropertyValues().addPropertyValue("policySource", new RuntimeBeanReference(policySourceBeanName)); RootBeanDefinition pointcutDefinition = new RootBeanDefinition(CacheAnnotationPointcut.class); pointcutDefinition.setSource(elementSource); pointcutDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); pointcutDefinition.getPropertyValues().addPropertyValue("policySource", new RuntimeBeanReference(policySourceBeanName)); RootBeanDefinition advisorDefinition = new RootBeanDefinition(DefaultPointcutAdvisor.class); advisorDefinition.setSource(elementSource); advisorDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); advisorDefinition.getPropertyValues().addPropertyValue("advice", adviceDefinition); advisorDefinition.getPropertyValues().addPropertyValue("pointcut", pointcutDefinition); String advisorBeanName = getBeanName(MethodInvokeCacheInterceptor.class); parserContext.registerBeanComponent(new BeanComponentDefinition(advisorDefinition, advisorBeanName)); if (log.isInfoEnabled()) { log.info("register internal advisor [" + advisorBeanName + "], advice [" + MethodInvokeCacheInterceptor.class.getName() + "] annotation [" + CacheEnable.class.getName() + "]"); } } } 于是我们的业务层就可以通过如下配置得到缓存的能力:
public class TestServie implements ITestServie { @Override @CacheEnable(cacheName = "test.annotation", liveSeconds = 2) public String hello(String name) { return "Hello,Passyt"; } } 至此,本文所要表达的目的已经全部实现,现在业务层可以通过三种配置方式得到缓存的能力。至于缓存如何实现,可以参考附件中提供的源码,其中包括几个测试Sample。
后话 以上任然遗留几个问题 (1)如何提供一种动态更新缓存过期的机制,如在系统运行中动态修改某个业务的缓存过期策略 (2)Cache的实现方式和Ehcache绑定过紧,基于内存的缓存实现可以参考google-collections中的MapMaker,但是它不提供空闲过期策略,需要做适当的扩展;同样基于Oscache的实现也不空闲过期策略
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2010-07-19
怎么没点注释啊
看起来有点晕 |
|
返回顶楼 | |
发表时间:2010-07-19
com.derby.dswitch 不知道你公司有没有保密协议...
|
|
返回顶楼 | |
发表时间:2010-07-19
很好 打个标以后仔细看看
|
|
返回顶楼 | |
发表时间:2010-07-19
这种实现方式只是适合被缓存的数据是只读的情况,否则有很多地方都不完善。
另外,spring-modules-cache已经提供了一个比较简单的实现。 |
|
返回顶楼 | |
发表时间:2010-07-19
本地缓存是吧
对于缓存的过期策略 可以使用lazy remove 策略。 在get的时候判断是否过期就好了。 当然 在一定的使用场景下lazy remove和非lazy remove 可以配合使用 |
|
返回顶楼 | |
发表时间:2010-07-19
whitesock 写道 这种实现方式只是适合被缓存的数据是只读的情况,否则有很多地方都不完善。
另外,spring-modules-cache已经提供了一个比较简单的实现。 不明白非只读缓存适用的场景,能不能解释一下呢 |
|
返回顶楼 | |
发表时间:2010-07-19
关于非只读缓存,如果强调数据一致性,那么一个普遍存在的问题在于将性能问题转化为可伸缩性问题:
● 本地缓存:如果读取远多于更新,那么仍然可以使用缓存;只要通过合适的并发控制,可以完全保证数据的一致性。 ● 分布式缓存:如果业务上能够容忍一定程度上的不一致性,那么仍然可以使用非只读缓存;如果完全不能容忍不一致性,那么也可以实现(Terracotta就是一个可选的解决方案),只是代价高昂。 一些情况下并不一定(甚至也不可能)完全保证数据一致性。例如: ● 在一个并发环境中取得了一个List的size,在此之后任何时间点该List的size都可能发生变化,那么取得该size的线程又有多大把握能正确使用这个size? ● 如果使用了MySQL的replication,那么应用程序必须时刻对同步延迟做好准备。 ● 目前NoSQL中比较流行(同时也有一定争议)的CAP原则。 对于这种通过AOP方式(楼主实现的方式有些繁琐)实现的缓存(不管只读还是非只读),尽管其优点可能在于对应用透明,但是仍然需要考虑很多因素,例如: ● 是否对从cache中获得的数据进行保护性拷贝。 ● 如果被拦截的方法在一个事务上下文(特别是支持事务传播的上下文)中执行,那么情况就会变得非常复杂。 以下对两段伪码,简单地分析一下其不足之处: 1 read Object result = getFromCache(); if(result.hit()) { return result.value(); } Object r = proceed(); putIntoCache(r); return r; ● 如果希望缓存有最大程度的并发性,那么的proceed()方法执行的过程中,应该释放对该缓存的锁。 ● 如果proceed()方法执行的过程中释放对cache的锁,那么多个线程可能同时发现缓存不命中,最终都会调用proceed()方法(甚至以相同的参数进行调用)。假设proceed()方法占用了有限的资源,那么同一时刻,同一个参数,应该只允许一个线程调用proceed()方法,其它以相同参数调用该proceed()方法的线程应等待执行该proceed()方法的线程的结果。 ● 如果允许多个线程以相同的参数调用proceed()方法,那么这些线程得到的返回值可能不同(例如数据库中的值发生了变化),那么如何决定到底哪个线程的结果应该放入缓存。 ● 如果proceed()方法在一个事务上下文中执行,其结果未必是目前数据库中的最新数据,那么在这种情况下通常不应该将结果放入缓存。例如使用MySQL数据库,并使用其REPEATABLE READ事务隔离级别(基于MVCC快照)。 ● 还有其它很多情况不应该将所有proceed()的结果放入缓存,例如如果使用MySQL的replication,通常不希望将从slave读取的结果放入缓存。 2 flush: try { return proceed(); } finally { flushCache(); } ● 如果proceed()方法在一个事务上下文中执行,那么不能在proceed()方法执行完毕后清空缓存,而是应该等事务真正提交或回滚之后。在该事务执行的过程中,别的线程应该仍然可以访问缓存中的数据(类似数据库的事务隔离,除非使用了的READ UNCOMMITTED)。当前线程不应该继续访问该缓存:因为缓存没有被立即清空,因此可能包含(对当前线程来说)旧的数据。 |
|
返回顶楼 | |
发表时间:2010-07-20
whitesock 写道 关于非只读缓存,如果强调数据一致性,那么一个普遍存在的问题在于将性能问题转化为可伸缩性问题:
● 本地缓存:如果读取远多于更新,那么仍然可以使用缓存;只要通过合适的并发控制,可以完全保证数据的一致性。 ● 分布式缓存:如果业务上能够容忍一定程度上的不一致性,那么仍然可以使用非只读缓存;如果完全不能容忍不一致性,那么也可以实现(Terracotta就是一个可选的解决方案),只是代价高昂。 一些情况下并不一定(甚至也不可能)完全保证数据一致性。例如: ● 在一个并发环境中取得了一个List的size,在此之后任何时间点该List的size都可能发生变化,那么取得该size的线程又有多大把握能正确使用这个size? ● 如果使用了MySQL的replication,那么应用程序必须时刻对同步延迟做好准备。 ● 目前NoSQL中比较流行(同时也有一定争议)的CAP原则。 对于这种通过AOP方式(楼主实现的方式有些繁琐)实现的缓存(不管只读还是非只读),尽管其优点可能在于对应用透明,但是仍然需要考虑很多因素,例如: ● 是否对从cache中获得的数据进行保护性拷贝。 ● 如果被拦截的方法在一个事务上下文(特别是支持事务传播的上下文)中执行,那么情况就会变得非常复杂。 以下对两段伪码,简单地分析一下其不足之处: 1 read Object result = getFromCache(); if(result.hit()) { return result.value(); } Object r = proceed(); putIntoCache(r); return r; ● 如果希望缓存有最大程度的并发性,那么的proceed()方法执行的过程中,应该释放对该缓存的锁。 ● 如果proceed()方法执行的过程中释放对cache的锁,那么多个线程可能同时发现缓存不命中,最终都会调用proceed()方法(甚至以相同的参数进行调用)。假设proceed()方法占用了有限的资源,那么同一时刻,同一个参数,应该只允许一个线程调用proceed()方法,其它以相同参数调用该proceed()方法的线程应等待执行该proceed()方法的线程的结果。 ● 如果允许多个线程以相同的参数调用proceed()方法,那么这些线程得到的返回值可能不同(例如数据库中的值发生了变化),那么如何决定到底哪个线程的结果应该放入缓存。 ● 如果proceed()方法在一个事务上下文中执行,其结果未必是目前数据库中的最新数据,那么在这种情况下通常不应该将结果放入缓存。例如使用MySQL数据库,并使用其REPEATABLE READ事务隔离级别(基于MVCC快照)。 ● 还有其它很多情况不应该将所有proceed()的结果放入缓存,例如如果使用MySQL的replication,通常不希望将从slave读取的结果放入缓存。 2 flush: try { return proceed(); } finally { flushCache(); } ● 如果proceed()方法在一个事务上下文中执行,那么不能在proceed()方法执行完毕后清空缓存,而是应该等事务真正提交或回滚之后。在该事务执行的过程中,别的线程应该仍然可以访问缓存中的数据(类似数据库的事务隔离,除非使用了的READ UNCOMMITTED)。当前线程不应该继续访问该缓存:因为缓存没有被立即清空,因此可能包含(对当前线程来说)旧的数据。 总结一下,你提出的其实就是三个问题: (1)缓存的伸缩能力,如分布式缓存:注意本文中的Cache只是一个接口,其实现方式才决定着如何扩展,如源码中使用Ehcache实现Cache层,应该可以利用其集群能力,而且这点并不是本文阐述的目的 (2)缓存的线程安全:本文的确忽略了这点,只需在Interceptor中加入对缓存的双层检查就OK了 (3)上下文环境的缓存安全,如数据库事务上下文中的缓存:这点比较麻烦了,需要抽出一个事务层处理隔离和传播,所有缓存数据必须要在事务commit之后才能flush,如果是数据库级别的事务,可以扩展一个TransactionManager,所有flush的缓存数据先临时put进TransactionCacheContext中,然后在事务提交时flush到真正的缓存中 PS:本文一开始的基调就是"某些业务方法的执行结果在一段时间内比较稳定,不太容易发生变化",对于变化过于频繁的调用不适合使用缓存. |
|
返回顶楼 | |
发表时间:2010-07-20
最后修改:2010-07-20
总结的不全面:首先是正确性,其次才是性能,可伸缩性,可扩展性等其它方面因素。
通过EHCache解决伸缩性的想法有些天真,Terracotta收购EHCache之后,也许EHCache会表现的好些。 从cache中取得的数据,没有保护性拷贝非常危险。 关于线程安全,并不想你想象的那么简单:只要是proceed()方法执行时不持有对cache的锁,那么就存在多个线程以相同参数调用proceed()而返回值不同的情况,此时如何决定将哪个返回值应该放入缓存中? 不要期望该返回值有个什么版本号可以用于校验。 |
|
返回顶楼 | |