disruptor-3.3.2源码解析(5)-框架支持
作者:大飞
- 更方便的使用Disruptor:
前面几篇看了Disruptor中的一些重要组件和组件的运行方式,也通过手动组合这些组件的方式给出了一些基本的用例。框架也提供了一个DSL-style API,来帮助我们更容易的使用框架,屏蔽掉一些细节(比如怎么构建RingBuffer、怎么关联追踪序列等),相当于Builder模式。
在看Disruptor之前,先看一些辅助类,首先看下ConsumerRepository:
class ConsumerRepository<T> implements Iterable<ConsumerInfo>{ private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>(); private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>(); private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
可见ConsumerRepository内部存储着事件处理者(消费者)的信息,相当于事件处理者的仓库。
看一下里面的方法: public void add(final EventProcessor eventprocessor, final EventHandler<? super T> handler, final SequenceBarrier barrier){ final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier); eventProcessorInfoByEventHandler.put(handler, consumerInfo); eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo); consumerInfos.add(consumerInfo); }添加事件处理者(Event模式)、事件处理器和序列栅栏到仓库中。
public void add(final EventProcessor processor){ final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(processor, null, null); eventProcessorInfoBySequence.put(processor.getSequence(), consumerInfo); consumerInfos.add(consumerInfo); }添加事件处理者(Event模式)到仓库中。
public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier){ final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<T>(workerPool, sequenceBarrier); consumerInfos.add(workerPoolInfo); for (Sequence sequence : workerPool.getWorkerSequences()){ eventProcessorInfoBySequence.put(sequence, workerPoolInfo); } }添加事件处理者(Work模式)和序列栅栏到仓库中。
public Sequence[] getLastSequenceInChain(boolean includeStopped){ List<Sequence> lastSequence = new ArrayList<Sequence>(); for (ConsumerInfo consumerInfo : consumerInfos){ if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain()){ final Sequence[] sequences = consumerInfo.getSequences(); Collections.addAll(lastSequence, sequences); } } return lastSequence.toArray(new Sequence[lastSequence.size()]); }获取当前已经消费到RingBuffer上事件队列末尾的事件处理者的序列,可通过参数指定是否要包含已经停止的事件处理者。
public void unMarkEventProcessorsAsEndOfChain(final Sequence... barrierEventProcessors){ for (Sequence barrierEventProcessor : barrierEventProcessors){ getEventProcessorInfo(barrierEventProcessor).markAsUsedInBarrier(); } }
重置已经处理到事件队列末尾的事件处理者的状态。
其他方法就不看了。
上面代码中出现的ConsumerInfo就相当于事件处理者信息和序列栅栏的包装类,ConsumerInfo本身是一个接口,针对Event模式和Work模式提供了两种实现:EventProcessorInfo和WorkerPoolInfo,代码都很容易理解,这里就不贴了。
public class Disruptor<T>{ //事件队列。 private final RingBuffer<T> ringBuffer; //用于执行事件处理的执行器。 private final Executor executor; //事件处理信息仓库。 private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>(); //运行状态。 private final AtomicBoolean started = new AtomicBoolean(false); //异常处理器。 private ExceptionHandler<? super T> exceptionHandler;
可见,Disruptor内部包含了我们之前写用例使用到的所有组件。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor){ this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor); } public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy){ this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); } private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) { this.ringBuffer = ringBuffer; this.executor = executor; }
可见,通过构造方法,可以对内部的RingBuffer和执行器进行初始化。
@SuppressWarnings("varargs") public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){ return createEventProcessors(new Sequence[0], handlers); } EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers){ checkNotStarted(); final Sequence[] processorSequences = new Sequence[eventHandlers.length]; final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){ final EventHandler<? super T> eventHandler = eventHandlers[i]; final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler); if (exceptionHandler != null){ batchEventProcessor.setExceptionHandler(exceptionHandler); } consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences.length > 0){ consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } return new EventHandlerGroup<T>(this, consumerRepository, processorSequences); }
可见,handleEventsWith方法内部会创建BatchEventProcessor。
当然,对于Event模式,还有一些玩法,其实之前几篇就看到过,我们可以设置两个EventHandler,然后事件会依次被这两个handler处理。Disruptor类中提供了更明确的定义(事实是结合了EventHandlerGroup的一些方法),比如我想让事件先被处理器a处理,然后在被处理器b处理,就可以这么写: EventHandler<MyEvent> a = new EventHandler<MyEvent>() { ... }; EventHandler<MyEvent> b = new EventHandler<MyEvent>() { ... }; disruptor.handleEventsWith(a); //语句1 disruptor.after(a).handleEventsWith(b);
注意上面必须先写语句1,然后才能针对a调用after,否则after找不到处理器a,会报错。
上面的例子也可以这么写: EventHandler<MyEvent> a = new EventHandler<MyEvent>() { ... }; EventHandler<MyEvent> b = new EventHandler<MyEvent>() { ... }; disruptor.handleEventsWith(a).then(b);
效果是一样的。
public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories){ final Sequence[] barrierSequences = new Sequence[0]; return createEventProcessors(barrierSequences, eventProcessorFactories); } public interface EventProcessorFactory<T>{ EventProcessor createEventProcessor(RingBuffer<T> ringBuffer, Sequence[] barrierSequences); }
handleEventsWith方法内部创建的Event模式的事件处理者,有没有Work模式的呢?
@SuppressWarnings("varargs") public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers){ return createWorkerPool(new Sequence[0], workHandlers); } EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers){ final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences); final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers); consumerRepository.add(workerPool, sequenceBarrier); return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences()); }
handleEventsWithWorkerPool内部会创建WorkerPool。
public RingBuffer<T> start(){ final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true); ringBuffer.addGatingSequences(gatingSequences); checkOnlyStartedOnce(); for (final ConsumerInfo consumerInfo : consumerRepository){ consumerInfo.start(executor); } return ringBuffer; }
可见,启动过程中会将事件处理者的序列设置为RingBuffer的追踪序列。最后会启动事件处理者,并利用执行器来执行事件处理线程。
public void publishEvent(final EventTranslator<T> eventTranslator){ ringBuffer.publishEvent(eventTranslator); }
很简单,里面就是直接调用了RingBuffer来发布事件,之前几篇都分析过了。
public void halt(){ for (final ConsumerInfo consumerInfo : consumerRepository){ consumerInfo.halt(); } }停止事件处理者。
public void shutdown(){ try{ shutdown(-1, TimeUnit.MILLISECONDS); }catch (final TimeoutException e){ exceptionHandler.handleOnShutdownException(e); } } public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException{ final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout); while (hasBacklog()){ if (timeout >= 0 && System.currentTimeMillis() > timeOutAt){ throw TimeoutException.INSTANCE; } // Busy spin } halt(); } private boolean hasBacklog(){ final long cursor = ringBuffer.getCursor(); for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false)){ if (cursor > consumer.get()){ return true; } } return false; }
等待所有能处理的事件都处理完了,再定制事件处理者,有超时选项。
public static void main(String[] args) { //创建一个执行器(线程池)。 Executor executor = Executors.newFixedThreadPool(4); //创建一个Disruptor。 Disruptor<MyDataEvent> disruptor = new Disruptor<MyDataEvent>(new MyDataEventFactory(), 4, executor); //创建两个事件处理器。 MyDataEventHandler handler1 = new MyDataEventHandler(); KickAssEventHandler handler2 = new KickAssEventHandler(); //同一个事件,先用handler1处理再用handler2处理。 disruptor.handleEventsWith(handler1).then(handler2); //启动Disruptor。 disruptor.start(); //发布10个事件。 for(int i=0;i<10;i++){ disruptor.publishEvent(new MyDataEventTranslator()); System.out.println("发布事件["+i+"]"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } } try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } }看下输出:
发布事件[0] handle event's data:MyData [id=0, value=holy shit!]isEndOfBatch:true kick your ass 0 times!!!! 发布事件[1] handle event's data:MyData [id=1, value=holy shit!]isEndOfBatch:true kick your ass 1 times!!!! 发布事件[2] handle event's data:MyData [id=2, value=holy shit!]isEndOfBatch:true kick your ass 2 times!!!! 发布事件[3] handle event's data:MyData [id=3, value=holy shit!]isEndOfBatch:true kick your ass 3 times!!!! 发布事件[4] handle event's data:MyData [id=4, value=holy shit!]isEndOfBatch:true kick your ass 4 times!!!! 发布事件[5] handle event's data:MyData [id=5, value=holy shit!]isEndOfBatch:true kick your ass 5 times!!!! 发布事件[6] handle event's data:MyData [id=6, value=holy shit!]isEndOfBatch:true kick your ass 6 times!!!! 发布事件[7] handle event's data:MyData [id=7, value=holy shit!]isEndOfBatch:true kick your ass 7 times!!!! 发布事件[8] handle event's data:MyData [id=8, value=holy shit!]isEndOfBatch:true kick your ass 8 times!!!! 发布事件[9] handle event's data:MyData [id=9, value=holy shit!]isEndOfBatch:true kick your ass 9 times!!!!
- 最后总结:
1.使用时可以直接使用Disruptor这个类来更方便的完成代码编写,注意灵活使用。
2.最后别忘了单线程/多线程生产者、Event/Work处理模式等等。
相关推荐
不错的框架,可以好好研究研究,速度下载,速度下载速度下载速度下载
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,这对于理解其内部机制和定制化开发非常有帮助。通过阅读源码,你可以更深入地了解如何利用Disruptor构建高效的并发系统。 此外,你还...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
disruptor-3.4.2.jar
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...
disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包
Disruptor3.3.2是该框架的一个稳定版本,提供了更高的并发性能和更完善的特性。 在Java并发编程中,线程之间的数据共享通常会带来锁竞争,这会成为性能瓶颈。Disruptor通过消除锁和减少内存缓存失效,实现了线程间...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
3. **API与源码解析** - `disruptor-3.0.1.jar`:这是Disruptor的运行时库,包含了框架的类和接口,供开发者在项目中引用。 - `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和...
java运行依赖jar包
《Disruptor-3.2.1与Play2Memcached:开源项目的魅力解析》 在IT行业中,开源项目一直是技术创新的重要推动力。这次我们要探讨的是两个极具影响力的开源项目——Disruptor-3.2.1和Play2Memcached。它们分别在并发...
disruptor-3.3.11-sources.jar jar包源码,值得学习,源码
注:下文中的 *** 代表文件名中的组件名称。 # 包含: 中文-英文对照文档:【***-javadoc-API文档-中文(简体)-英语-对照版.zip】 jar包下载地址:【***.jar下载地址(官方地址+国内镜像地址).txt】 ...
disruptor-unity3d, Unity3d Disruptor的基本实现 disruptor-unity3dUnity3d的基本自包含。自包含实现。 仅支持单个生产者/单个用户。 仅在x86平台上测试。 Mono中的Bug 在Unity前可以在iOS和安卓上工作。用法将 ...