凌弃同学已经介绍了EventBus的使用方式
如何使用——三步走:
1、定义一个observer,并加入@Subscribe作为消息回调函数;
2、将observer注册到EventBus;EventBus.register(this);
3、消息投递: eventBus.post(logTo);
本文将深入EventBus的源代码,和大家一起深入研究EventBus的让人惊叹的设计思路。由于作者水平有限,无法面面俱到,希望大家先读读EventBusExplained。
注:Guava的版本
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>15.0</version> </dependency>
准备工作
为了方便大家理解Coder的思路,有一些名词或约定先解释一下:
-
在observer类(比如:例子中的
EventBusChangeRecorder
)里面,@Subscribe
所annotate的method
,有且只有一个参数。因为EventBus#post(Object)
方法只有一个参数咯,比如// Class is typically registered by the container. class EventBusChangeRecorder { // Subscribe annotation,并且只有一个 ChangeEvent 方法参数 @Subscribe public void recordCustomerChange(ChangeEvent e) { recordChange(e.getChange()); } }
-
通过
method
和observer的instance
来定义一个EventSubscriber
,请看源码SubscriberFindingStrategy#findAllSubscribers(Object)
-
在一个observer类里面,可以定义多个
@Subscribe
,根据method.getParameterTypes()[0]
来缓存参数的类型——EventType
和Set<EventSubscriber>
//所谓SetMultimap,就是Map<Class<?>, Set<EventSubscriber>> Set<EventSubscriber>>private final SetMultimap<Class<?>, EventSubscriber> subscribersByType = HashMultimap.create();
-
@Subscribe
所annotate的method
的参数,不能支持泛型。因为在运行的时候,因为Type Erasure
导致拿不到"真正"的parameterType
,举个例子:public class GenericClass<T> { // 1 private List<T> list; // 2 private Map<String, T> map; // 3 public <U> U genericMethod(Map<T, U> m) { // 4 return null; } }
上面代码里,带有注释的行里的泛型信息在运行时都还能获取到,原则是源码里写了什么运行时就能得到什么。针对1的GenericClass,运行时通过Class.getTypeParameters()方法得到的数组可以获取那个“T”;同理,2的T、3的java.lang.String与T、4的T与U都可以获得。源码文本里写的是什么运行时就能得到什么;像是T、U等在运行时的实际类型是获取不到的。
设计思路
Register/Unregister
在99.99%的使用场景中,是不会在runtime的时候去register/unregister某个observer的,在spring的环境,也是在init的时候做register/unregister。不过做framework就必须要考虑这0.01%的使用场景。在runtime的时候去register/unregister,最重要的就是线程安全问题:如果我在unregister某个observer的时候,正好调用EventSubscriber
,会因为异常,导致Event不能送达到其它的observer上。所以在register/unregister的方法实现里面,都加入了ReadWriteLock
,register/unregister的时候用writeLock
,post的时候用readLock
public void register(Object object) {
// Map<Class<?>, Collection<EventSubscriber>>结构
Multimap<Class<?>, EventSubscriber> methodsInListener =
finder.findAllSubscribers(object);
subscribersByTypeLock.writeLock().lock();
try {
// subscribersByType是一个Map<Class<?>, Set<EventSubscriber>>结构
subscribersByType.putAll(methodsInListener);
} finally {
subscribersByTypeLock.writeLock().unlock();
}
}
其次,在SubscriberFindingStrategy#findAllSubscribers
的时候有也用到了Cache,原理与下面要研究的Post的Cache一模一样
Post
EventBus#post
的实现真的非常amazing,我们先从最初的设计思路开始,一步一步来。
最简单的想法就是,通过post
传入一个event
对象,这个event
的getClass
作为key
,通过subscribersByType
来获取EventSubscriber
的Set
,再调用EventSubscriber#handleEvent
完成method#invoke
。
这样的思路没有什么问题,不过EventBus的作者想得更多更远:
-
Post Everything
可以是任意的object,只要
subscribersByType
有这个KeySet<Class<?>> dispatchTypes = flattenHierarchy(event.getClass())
-
Cache
毕竟post的
Event
的class是有限的,所以我们可以在classLoader
下缓存flattenHierarchy
的输入和输出,正如:private static final LoadingCache<Class<?>, Set<Class<?>>> flattenHierarchyCache = CacheBuilder.newBuilder() .weakKeys() .build(new CacheLoader<Class<?>, Set<Class<?>>>() { @SuppressWarnings({"unchecked", "rawtypes"}) // safe cast @Override public Set<Class<?>> load(Class<?> concreteClass) { return (Set) TypeToken.of(concreteClass).getTypes().rawTypes(); } });
注:static不是JVM下的全局共享,只是在
classloader
下面共享 -
WeakReference
也许你也注意到了,
flattenHierarchyCache
的Key(EventType
)是一个WeakReference
,这样做的目的就是GC友好。比方说你在runtime的时候,unregister了一个observer,这时候subscribersByType
就不再Strong Reference这个EventType
,flattenHierarchyCache
也会在minor gc的时候回收内存。 -
ThreadLocal
EventBus里面最Amazing的实现,在EventBus里面使用了
ThreadLocal
的地方有两处/** queues of events for the current thread to dispatch */ private final ThreadLocal<Queue<EventWithSubscriber>> eventsToDispatch = new ThreadLocal<Queue<EventWithSubscriber>>() { @Override protected Queue<EventWithSubscriber> initialValue() { return new LinkedList<EventWithSubscriber>(); } }; /** true if the current thread is currently dispatching an event */ private final ThreadLocal<Boolean> isDispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } };
-
解决嵌套问题。比方说一个observer有两个方法
@Subscribe
,其中一个方法的实现里面bus.post(SECOND);
,为了避免已经处理过的Event
再次被处理,所以需要isDispatching
,下面是一个嵌套的例子。public class ReentrantEventsHater { boolean ready = true; List<Object> eventsReceived = Lists.newArrayList(); @Subscribe public void listenForStrings(String event) { eventsReceived.add(event); ready = false; try { bus.post(SECOND); } finally { ready = true; } } @Subscribe public void listenForDoubles(Double event) { assertTrue("I received an event when I wasn't ready!", ready); eventsReceived.add(event); } }
-
eventsToDispatch
是一个queue
,在enqueueEvent
(请结合源码EventBus#enqueueEvent
)的时候调用,queue
的使用够减少读锁的占用时间 -
eventsToDispatch
和dispatchQueuedEvents
通过ThreadLocal能够独立成为方法,方便了AsyncEventBus
做Override
-
-
ConcurrentLinkedQueue vs LinkedBlockingQueue
得益于
EventBus
的巧妙设计,AsyncEventBus
的实现就容易很多,不过笔者也发现了一个很有意思的地方。JavaDoc里面都标识了BlockingQueue implementations are designed to be used primarily for producer-consumer queues
那么为什么要选用ConcurrentLinkedQueue而不是LinkedBlockingQueue呢?
/** the queue of events is shared across all threads */ private final ConcurrentLinkedQueue<EventWithSubscriber> eventsToDispatch = new ConcurrentLinkedQueue<EventWithSubscriber>(); protected void dispatchQueuedEvents() { while (true) { EventWithSubscriber eventWithSubscriber = eventsToDispatch.poll(); if (eventWithSubscriber == null) { break; } dispatch(eventWithSubscriber.event, eventWithSubscriber.subscriber); } }
简单来说,ConcurrentLinkedQueue是无锁的,没有synchronized,也没有Lock.lock,依靠CAS保证并发,同时,也不提供阻塞方法
put()
和take()
,速度上面肯定无锁的会更快一些,吞吐量更高一些(都是纳秒的差距)。再加上这里只有一个Publisher
,多个Consumer
,Cousumer
的消费速度又几乎是0,所以我个人觉得用啥都没啥区别。。。
总结
Guava真的是神器,希望读者看完本文后,能够对Guava产生兴趣。