- 浏览: 58084 次
- 性别:
- 来自: 杭州
最新评论
-
bibithink:
LinkedHashMap 的内部实现是 一个哈希表 + 双向 ...
基于LinkedHashMap实现LRU缓存调度算法原理及应用 -
javaboy2010:
不错,还没毕业,写出如此代码,楼主很强啊! 向楼主学习!
自己编写一个基于Velocity的MVC框架 -
lwclover:
lz的代码在多线程环境下 会出问题
基于LinkedHashMap实现LRU缓存调度算法原理及应用 -
woming66:
condeywadl 写道观爷我来顶一个 哈哈
哈哈 你来晚 ...
基于LinkedHashMap实现LRU缓存调度算法原理及应用 -
condeywadl:
观爷我来顶一个 哈哈
基于LinkedHashMap实现LRU缓存调度算法原理及应用
RRiBbit可以作为事件总线Eventbus, 能够让组件之间进行双向通讯,支持远程功能,实现失败恢复 负载平衡, SSL/TLS等支持,这也称为请求-响应总线(Request-Response-Bus).
所有事情都是从HelloWorld开始
看看InstantiatingClassBasedListenerObjectCreator都做了什么
rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(creator, false);
rrb.send("say", "Hello World");
requestDispatcher.dispatchRequest(request);
localRequestProcessor.processRequest(request);
listenerObjects = listenerObjectRetriever.getListenerObjects(request.getHint());
这块有一个bug,cache不能用HashMap,要用ConcurrentHashMap,HashMap在高并发下会出现Entry链路循环引用现象
Response<T> response = listenerObjectExecutor.executeListeners(listenerObjects, request.getParameters());
MultiThreadedListenerObjectExecutor.doExecuteListeners()
AbstractListenerObjectExecutor.executeSingleListenerObject
RRiBbit相比其他框架的优点是:
1. 其他框架一般要求监听者实现特定的Listener接口,甚至执行onEvent() or onRequest() 方法,而RRiBbit只需要在方法上标注 @Listener元注解即可。
2.其他框架因第一个条件需要改变源码,而RRiBbit没有必要。
3.其他框架监听者不能将结果发回发送者,如果你非要这样做,得做些黑客的工作,比如通过修改参数对象什么的,而RRiBbit的监听者方法能够返回一个POJO给发送者。
4.其他框架并不支持监听者在多个线程中,RRiBbit能够配置运行在不同线程,每个监听对应一个事件。
5.其他框架并不支持远程,只是工作在一个虚拟机中,RRiBbit监听者能够运行在其他机器上,而调用者无需任何变化。
所有事情都是从HelloWorld开始
import org.rribbit.Listener; import org.rribbit.RRiBbitUtil; import org.rribbit.RequestResponseBus; import org.rribbit.creation.InstantiatingClassBasedListenerObjectCreator; import org.rribbit.creation.ObjectBasedListenerObjectCreator; public class HelloWorld { private static RequestResponseBus rrb; public static final void main(String[] args) throws Exception { ObjectBasedListenerObjectCreator creator = new InstantiatingClassBasedListenerObjectCreator(HelloWorld.class); rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(creator, false); HelloWorld helloWorld = new HelloWorld(); helloWorld.sayHello(); } public void sayHello() { rrb.send("say", "Hello World"); } @Listener(hint="say") public void say(String message) { System.out.println(message); } @Listener(hint="say") public void say2(String message) { System.out.println(message + " : )"); } }
看看InstantiatingClassBasedListenerObjectCreator都做了什么
public class InstantiatingClassBasedListenerObjectCreator extends AbstractClassBasedListenerObjectCreator { private static final Logger log = LoggerFactory.getLogger(InstantiatingClassBasedListenerObjectCreator.class); /** * @see AbstractClassBasedListenerObjectCreator#AbstractClassBasedListenerObjectCreator(Class...) * * @param classes */ public InstantiatingClassBasedListenerObjectCreator(Class<?>... classes) { super(classes); }
public abstract class AbstractClassBasedListenerObjectCreator extends ObjectBasedListenerObjectCreator { private static final Logger log = LoggerFactory.getLogger(AbstractClassBasedListenerObjectCreator.class); protected Collection<Class<?>> excludedClasses; /** * Calls {@link #addClass(Class)} on each given class. * * @param classes */ public AbstractClassBasedListenerObjectCreator(Class<?>... classes) { excludedClasses = new ArrayList<Class<?>>(); for(Class<?> clazz : classes) { this.addClass(clazz); } } public void addClass(Class<?> clazz) { log.debug("Processing class '" + clazz.getName() + "'"); Object targetObject = this.getTargetObjectForClass(clazz); if(targetObject != null) { log.debug("Found target object for class '" + clazz.getName() + "', getting Listener methods"); Collection<ListenerObject> incompleteListenerObjects = this.getIncompleteListenerObjectsFromClass(clazz); for(ListenerObject listenerObject : incompleteListenerObjects) { listenerObject.setTarget(targetObject); } listenerObjects.addAll(incompleteListenerObjects); this.notifyObserversOnClassAdded(clazz); } } @Override protected Object getTargetObjectForClass(Class<?> clazz) { try { log.debug("Attempting to construct instance of class '" + clazz.getName() + "'"); return clazz.getConstructor().newInstance(); } catch(InvocationTargetException e) { throw new RuntimeException("Exception thrown from constructor of Listener", e.getCause()); } catch(Exception e) { log.debug("No suitable constructor found, returning null"); return null; } }
protected Collection<ListenerObject> getIncompleteListenerObjectsFromClass(Class<?> clazz) { Collection<ListenerObject> incompleteListenerObjects = new ArrayList<ListenerObject>(); for(Method method : clazz.getMethods()) { Listener listener = method.getAnnotation(Listener.class); if(listener != null) { log.debug("Listener annotation found for method \"" + method + "\", instantiating ListenerObject"); ListenerObject listenerObject = new ListenerObject(); listenerObject.setHint(listener.hint()); listenerObject.setMethod(method); listenerObject.setReturnType(method.getReturnType()); incompleteListenerObjects.add(listenerObject); } } return incompleteListenerObjects; }
rrb = RRiBbitUtil.createRequestResponseBusForLocalUse(creator, false);
public static RequestResponseBus createRequestResponseBusForLocalUse(ListenerObjectCreator listenerObjectCreator, boolean setInRRB) { ListenerObjectRetriever listenerObjectRetriever = new CachedListenerObjectRetriever(listenerObjectCreator); ListenerObjectExecutor listenerObjectExecutor = new MultiThreadedListenerObjectExecutor(); LocalRequestProcessor localRequestProcessor = new LocalRequestProcessor(listenerObjectRetriever, listenerObjectExecutor); RequestDispatcher requestDispatcher = new LocalRequestDispatcher(localRequestProcessor); RequestResponseBus requestResponseBus = new DefaultRequestResponseBus(requestDispatcher); if(setInRRB) { RRB.setRequestResponseBus(requestResponseBus); } return requestResponseBus; }
rrb.send("say", "Hello World");
@Override public <T> T sendForSingleWithHint(String hint, Object... parameters) { log.info("Sending request for single object with hint '" + hint + "'"); log.debug("Creating Request object"); Request request = new Request(null, hint, parameters); log.debug("Dispatching Request"); Response<T> response = requestDispatcher.dispatchRequest(request); log.debug("Processing Response"); this.processThrowables(response.getThrowables()); return response.getReturnValues().isEmpty() ? null : response.getReturnValues().iterator().next(); }
requestDispatcher.dispatchRequest(request);
public <T> Response<T> dispatchRequest(Request request) { log.info("Dispatching Request"); Response<T> response = localRequestProcessor.processRequest(request); log.info("Returning Response"); return response; }
localRequestProcessor.processRequest(request);
public <T> Response<T> processRequest(Request request) { log.info("Processing Request"); log.debug("Getting ListenerObjects"); Collection<ListenerObject> listenerObjects; if(request.getDesiredReturnType() == null) { if(request.getHint() == null) { listenerObjects = listenerObjectRetriever.getListenerObjects(); } else { listenerObjects = listenerObjectRetriever.getListenerObjects(request.getHint()); } } else { if(request.getHint() == null) { listenerObjects = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType()); } else { listenerObjects = listenerObjectRetriever.getListenerObjects(request.getDesiredReturnType(), request.getHint()); } } log.debug("Executing ListenerObjects"); Response<T> response = listenerObjectExecutor.executeListeners(listenerObjects, request.getParameters()); log.info("Returning Response"); return response; }
listenerObjects = listenerObjectRetriever.getListenerObjects(request.getHint());
@Override public Collection<ListenerObject> getListenerObjects(String hint) { this.checkHint(hint); log.debug("Inspecting cache for matches"); RetrievalRequest request = new RetrievalRequest(hint, null); Collection<ListenerObject> listenerObjects = cache.get(request); if(listenerObjects == null) { log.debug("No match found, retrieving ListenerObject from DefaultRequestResponseBus and storing in cache"); listenerObjects = super.getListenerObjects(hint); cache.put(request, listenerObjects); } return listenerObjects; } @Override public Collection<ListenerObject> getListenerObjects(String hint) { Collection<ListenerObject> result = new ArrayList<ListenerObject>(); log.debug("Getting all ListenerObjects from the ListenerObjectCreator"); for(ListenerObject listenerObject : listenerObjectCreator.getListenerObjects()) { if(this.matchesHint(listenerObject, hint)) { log.trace(listenerObject + " matched hint '" + hint + "'"); result.add(listenerObject); } else { log.trace(listenerObject + " did not match hint '" + hint + "'"); } } return result; } protected boolean matchesHint(ListenerObject listenerObject, String hint) { if(hint == null) { throw new IllegalArgumentException("hint cannot be null!"); } return listenerObject.getHint().equals(hint); }
这块有一个bug,cache不能用HashMap,要用ConcurrentHashMap,HashMap在高并发下会出现Entry链路循环引用现象
public class CachedListenerObjectRetriever extends DefaultListenerObjectRetriever implements ListenerObjectCreationObserver { private static final Logger log = LoggerFactory.getLogger(CachedListenerObjectRetriever.class); /** * The cache of {@link Collection}s of {@link ListenerObject}s. */ protected Map<RetrievalRequest, Collection<ListenerObject>> cache; /** * Whenever you use this constructor, be sure to set the {@link ListenerObjectCreator} with the setter provided by this class. * If you don't, runtime {@link NullPointerException}s will occur. */ public CachedListenerObjectRetriever() { cache = new HashMap<RetrievalRequest, Collection<ListenerObject>>(); }
Response<T> response = listenerObjectExecutor.executeListeners(listenerObjects, request.getParameters());
public abstract class AbstractListenerObjectExecutor implements ListenerObjectExecutor { private static final Logger log = LoggerFactory.getLogger(AbstractListenerObjectExecutor.class); @SuppressWarnings("unchecked") //We assume that the caller has correctly specified the type of objects that he wants back from the listeners @Override public <T> Response<T> executeListeners(Collection<ListenerObject> listenerObjects, Object... parameters) { Collection<T> results = new ArrayList<T>(); Collection<Throwable> throwables = new ArrayList<Throwable>(); log.debug("Executing ListenerObjects"); for(ExecutionResult executionResult : this.doExecuteListeners(listenerObjects, parameters)) { if(executionResult != null && !(executionResult instanceof VoidResult)) { if(executionResult instanceof ThrowableResult) { throwables.add((((ThrowableResult) executionResult)).getThrowable()); } else { results.add((T) (((ObjectResult) executionResult).getResult())); } } } return new Response<T>(results, throwables); }
MultiThreadedListenerObjectExecutor.doExecuteListeners()
public class MultiThreadedListenerObjectExecutor extends AbstractListenerObjectExecutor { private static final Logger log = LoggerFactory.getLogger(MultiThreadedListenerObjectExecutor.class); @Override protected Collection<ExecutionResult> doExecuteListeners(final Collection<ListenerObject> listenerObjects, final Object... parameters) { final Collection<ExecutionResult> executionResults = new Vector<ExecutionResult>(); //Vector is Thread-safe if(listenerObjects.size() == 1) { //There is only one, don't spawn a new Thread, but do it in this Thread log.debug("There is only one ListenerObject, not creating new Thread, executing it in this Thread"); executionResults.add(this.executeSingleListenerObject(listenerObjects.iterator().next(), parameters)); return executionResults; } Collection<Thread> threads = new ArrayList<Thread>(); log.debug("Creating Threads for each ListenerObject"); for(final ListenerObject listenerObject : listenerObjects) { log.debug("Creating Thread for ListenerObject \"" + listenerObject + "\""); Thread thread = new Thread(new Runnable() { @Override public void run() { executionResults.add(MultiThreadedListenerObjectExecutor.this.executeSingleListenerObject(listenerObject, parameters)); } }); log.debug("Starting Thread for ListenerObject \"" + listenerObject + "\""); thread.start(); threads.add(thread); } log.debug("Waiting for all Threads to finish"); for(Thread thread : threads) { try { thread.join(); } catch(InterruptedException e) { throw new RuntimeException(e); } } log.debug("All Threads have finished, returning"); return executionResults; } }
AbstractListenerObjectExecutor.executeSingleListenerObject
protected ExecutionResult executeSingleListenerObject(ListenerObject listenerObject, Object... parameters) { try { Object returnValue = listenerObject.getMethod().invoke(listenerObject.getTarget(), parameters); if(listenerObject.getMethod().getReturnType().equals(void.class)) { //Nothing to return log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was void"); return new VoidResult(); } else { log.debug("ListenerObject \"" + listenerObject + "\" successfully executed, return value was object"); return new ObjectResult(returnValue); } } catch(InvocationTargetException e) { log.debug("Underlying method of ListenerObject \"" + listenerObject + "\" threw Throwable"); //Caused by the underlying method throwing a Throwable. Rethrowing... return new ThrowableResult(e.getCause()); } catch(Exception e) { log.trace("Method of ListenerObject \"" + listenerObject + "\" did not match parameters, ignoring", e); //Probably caused by parameters not matching Method signature. Ignoring... return null; } }
RRiBbit相比其他框架的优点是:
1. 其他框架一般要求监听者实现特定的Listener接口,甚至执行onEvent() or onRequest() 方法,而RRiBbit只需要在方法上标注 @Listener元注解即可。
2.其他框架因第一个条件需要改变源码,而RRiBbit没有必要。
3.其他框架监听者不能将结果发回发送者,如果你非要这样做,得做些黑客的工作,比如通过修改参数对象什么的,而RRiBbit的监听者方法能够返回一个POJO给发送者。
4.其他框架并不支持监听者在多个线程中,RRiBbit能够配置运行在不同线程,每个监听对应一个事件。
5.其他框架并不支持远程,只是工作在一个虚拟机中,RRiBbit监听者能够运行在其他机器上,而调用者无需任何变化。
发表评论
-
转:用消息队列和消息应用状态表来消除分布式事务
2012-07-18 21:53 1509由于数据量的巨大,大 ... -
安全发布原则
2012-04-18 21:48 1101public class XXX { private ... -
基于LinkedHashMap实现LRU缓存调度算法原理及应用
2011-11-29 21:02 18506最近手里事情不太多,随意看了看源码,在学习缓存技术的时候,都少 ... -
Java NIO Reactor模式
2011-10-11 15:57 4495package com.zzq.nio.reactor; ... -
并发控制—CAS
2011-07-07 23:55 1725public class AtomicIntegerTes ... -
压力测试JSON-RPC服务
2011-06-21 14:30 4011/** * 压力测试JSON-RPC服务 * * ... -
缓存失效算法比较
2011-06-13 20:03 1744提到缓存,有两点是必须要考虑的: 1、缓存数据和目标数据的一致 ... -
记录工作每阶段的代码质量——2011年2月20日
2011-02-20 23:45 1017package com.zzq.pattern.decorat ... -
另类的Singleton模式
2010-09-03 15:15 1080package com.zzq.singleton; /** ... -
自己编写一个基于Velocity的MVC框架
2010-06-02 17:28 3833公司留了作业(还有一个月毕业),让预习Velocity,在家呆 ... -
我的各种主键生成策略类
2010-05-20 13:00 1703package com.generate; impo ... -
我的日志模型
2010-05-20 12:37 1222package com.zzq.logging; / ... -
Java模板引擎——Velocity应用实例(原创)
2010-02-10 21:18 2481对于b/s架构的项目而言,表示层呈现页面技术大多数选用jsp, ...
相关推荐
Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习笔记Java学习...
希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf希沃白板学习笔记.pdf
2022吴恩达机器学习笔记汇总(共10章节).zip2022吴恩达机器学习笔记汇总(共10章节).zip2022吴恩达机器学习笔记汇总(共10章节).zip2022吴恩达机器学习笔记汇总(共10章节).zip2022吴恩达机器学习笔记汇总(共10章节).zip...
Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Springcloud学习笔记.md,Spring...
人工智能学习笔记,人工智能学习笔记,人工智能学习笔记人工智能学习笔记,人工智能学习笔记,人工智能学习笔记人工智能学习笔记,人工智能学习笔记,人工智能学习笔记人工智能学习笔记,人工智能学习笔记,人工智能...
云的学习笔记-云的学习笔记系统-云的学习笔记系统源码-云的学习笔记管理系统-云的学习笔记管理系统java代码-云的学习笔记系统设计与实现-基于ssm的云的学习笔记系统-基于Web的云的学习笔记系统设计与实现-云的学习...
云的学习笔记-云的学习笔记系统-云的学习笔记系统源码-云的学习笔记管理系统-云的学习笔记管理系统java代码-云的学习笔记系统设计与实现-基于ssm的云的学习笔记系统-基于Web的云的学习笔记系统设计与实现-云的学习...
CCNA学习笔记 CCNA学习笔记 CCNA学习笔记
nginx学习笔记(软件+学习笔记) 仅供学习交流! 后续会持续分享相关资源,记得关注哦! nginx学习笔记(软件+学习笔记) 仅供学习交流! 后续会持续分享相关资源,记得关注哦! nginx学习笔记(软件+学习笔记) ...
docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,docker学习笔记,...
ssh学习笔记1 ssh学习笔记1 ssh学习笔记1 ssh学习笔记1 ssh学习笔记1 ssh学习笔记1 ssh学习笔记1
PHP个人学习笔记
java学习笔记java学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记.zipjava学习笔记...
Contiki学习笔记:进程、事件、etimer关系 Contiki 实例: Contiki学习笔记:创建两个交互进程 Contiki 主函数剖析: Contiki学习笔记:main函数剖析 Contiki学习笔记:启动一个进程process_start Contiki学习笔记...