`
BrokenDreams
  • 浏览: 255340 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100650
社区版块
存档分类
最新评论

disruptor-3.3.2源码解析(5)-框架支持

阅读更多

disruptor-3.3.2源码解析(5)-框架支持

作者:大飞

 

  • 更方便的使用Disruptor
       前面几篇看了Disruptor中的一些重要组件和组件的运行方式,也通过手动组合这些组件的方式给出了一些基本的用例。框架也提供了一个DSL-style API,来帮助我们更容易的使用框架,屏蔽掉一些细节(比如怎么构建RingBuffer、怎么关联追踪序列等),相当于Builder模式。
       在看Disruptor之前,先看一些辅助类,首先看下ConsumerRepository:
class ConsumerRepository<T> implements Iterable<ConsumerInfo>{

    private final Map<EventHandler<?>, EventProcessorInfo<T>> eventProcessorInfoByEventHandler = new IdentityHashMap<EventHandler<?>, EventProcessorInfo<T>>();
    private final Map<Sequence, ConsumerInfo> eventProcessorInfoBySequence = new IdentityHashMap<Sequence, ConsumerInfo>();
    private final Collection<ConsumerInfo> consumerInfos = new ArrayList<ConsumerInfo>();
       可见ConsumerRepository内部存储着事件处理者(消费者)的信息,相当于事件处理者的仓库。
       看一下里面的方法: 
    public void add(final EventProcessor eventprocessor,
                    final EventHandler<? super T> handler,
                    final SequenceBarrier barrier){
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(eventprocessor, handler, barrier);
        eventProcessorInfoByEventHandler.put(handler, consumerInfo);
        eventProcessorInfoBySequence.put(eventprocessor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
       添加事件处理者(Event模式)、事件处理器和序列栅栏到仓库中。 
 
    public void add(final EventProcessor processor){
        final EventProcessorInfo<T> consumerInfo = new EventProcessorInfo<T>(processor, null, null);
        eventProcessorInfoBySequence.put(processor.getSequence(), consumerInfo);
        consumerInfos.add(consumerInfo);
    }
       添加事件处理者(Event模式)到仓库中。 
 
    public void add(final WorkerPool<T> workerPool, final SequenceBarrier sequenceBarrier){
        final WorkerPoolInfo<T> workerPoolInfo = new WorkerPoolInfo<T>(workerPool, sequenceBarrier);
        consumerInfos.add(workerPoolInfo);
        for (Sequence sequence : workerPool.getWorkerSequences()){
            eventProcessorInfoBySequence.put(sequence, workerPoolInfo);
        }
    }
       添加事件处理者(Work模式)和序列栅栏到仓库中。 
 
    public Sequence[] getLastSequenceInChain(boolean includeStopped){
        List<Sequence> lastSequence = new ArrayList<Sequence>();
        for (ConsumerInfo consumerInfo : consumerInfos){
            if ((includeStopped || consumerInfo.isRunning()) && consumerInfo.isEndOfChain()){
                final Sequence[] sequences = consumerInfo.getSequences();
                Collections.addAll(lastSequence, sequences);
            }
        }
        return lastSequence.toArray(new Sequence[lastSequence.size()]);
    }
       获取当前已经消费到RingBuffer上事件队列末尾的事件处理者的序列,可通过参数指定是否要包含已经停止的事件处理者。 
 
    public void unMarkEventProcessorsAsEndOfChain(final Sequence... barrierEventProcessors){
        for (Sequence barrierEventProcessor : barrierEventProcessors){
            getEventProcessorInfo(barrierEventProcessor).markAsUsedInBarrier();
        }
    }
       重置已经处理到事件队列末尾的事件处理者的状态。
 
       其他方法就不看了。
 
       上面代码中出现的ConsumerInfo就相当于事件处理者信息和序列栅栏的包装类,ConsumerInfo本身是一个接口,针对Event模式和Work模式提供了两种实现:EventProcessorInfo和WorkerPoolInfo,代码都很容易理解,这里就不贴了。
 
       现在来看下Disruptor类,先看结构: 
public class Disruptor<T>{

    //事件队列。
    private final RingBuffer<T> ringBuffer;
    //用于执行事件处理的执行器。
    private final Executor executor;
    //事件处理信息仓库。
    private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<T>();
    //运行状态。
    private final AtomicBoolean started = new AtomicBoolean(false);
    //异常处理器。
    private ExceptionHandler<? super T> exceptionHandler;
       可见,Disruptor内部包含了我们之前写用例使用到的所有组件。
 
       再看下构造方法: 
    public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor){
        this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), executor);
    }

    public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy){
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }

    private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor)
    {
        this.ringBuffer = ringBuffer;
        this.executor = executor;
    }
       可见,通过构造方法,可以对内部的RingBuffer和执行器进行初始化。
 
       有了事件队列(RingBuffer),接下来看看怎么构建事件处理者: 
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers){
        return createEventProcessors(new Sequence[0], handlers);
    }
    EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<? super T>[] eventHandlers){
        checkNotStarted();
        final Sequence[] processorSequences = new Sequence[eventHandlers.length];
        final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
        for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++){
            final EventHandler<? super T> eventHandler = eventHandlers[i];
            final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
            if (exceptionHandler != null){
                batchEventProcessor.setExceptionHandler(exceptionHandler);
            }
            consumerRepository.add(batchEventProcessor, eventHandler, barrier);
            processorSequences[i] = batchEventProcessor.getSequence();
        }
        if (processorSequences.length > 0){
            consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
        }
        return new EventHandlerGroup<T>(this, consumerRepository, processorSequences);
    }
       可见,handleEventsWith方法内部会创建BatchEventProcessor。
       当然,对于Event模式,还有一些玩法,其实之前几篇就看到过,我们可以设置两个EventHandler,然后事件会依次被这两个handler处理。Disruptor类中提供了更明确的定义(事实是结合了EventHandlerGroup的一些方法),比如我想让事件先被处理器a处理,然后在被处理器b处理,就可以这么写: 
    EventHandler<MyEvent> a = new EventHandler<MyEvent>() { ... };
    EventHandler<MyEvent> b = new EventHandler<MyEvent>() { ... };
    disruptor.handleEventsWith(a); //语句1
    disruptor.after(a).handleEventsWith(b);
       注意上面必须先写语句1,然后才能针对a调用after,否则after找不到处理器a,会报错。
       上面的例子也可以这么写: 
    EventHandler<MyEvent> a = new EventHandler<MyEvent>() { ... };
    EventHandler<MyEvent> b = new EventHandler<MyEvent>() { ... };
    disruptor.handleEventsWith(a).then(b);
       效果是一样的。
 
       Disruptor还允许我们定制事件处理者: 
    public EventHandlerGroup<T> handleEventsWith(final EventProcessorFactory<T>... eventProcessorFactories){
        final Sequence[] barrierSequences = new Sequence[0];
        return createEventProcessors(barrierSequences, eventProcessorFactories);
    }
public interface EventProcessorFactory<T>{
    EventProcessor createEventProcessor(RingBuffer<T> ringBuffer, Sequence[] barrierSequences);
}
 
       handleEventsWith方法内部创建的Event模式的事件处理者,有没有Work模式的呢?
    @SuppressWarnings("varargs")
    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers){
        return createWorkerPool(new Sequence[0], workHandlers);
    }
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers){
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        consumerRepository.add(workerPool, sequenceBarrier);
        return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
    }
        handleEventsWithWorkerPool内部会创建WorkerPool。
 
        事件处理者也构建好了,接下来看看怎么启动它们: 
    public RingBuffer<T> start(){
        final Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
        ringBuffer.addGatingSequences(gatingSequences);
        checkOnlyStartedOnce();
        for (final ConsumerInfo consumerInfo : consumerRepository){
            consumerInfo.start(executor);
        }
        return ringBuffer;
    }
        可见,启动过程中会将事件处理者的序列设置为RingBuffer的追踪序列。最后会启动事件处理者,并利用执行器来执行事件处理线程。
 
        事件处理者构建好了,也启动了,看看怎么发布事件: 
    public void publishEvent(final EventTranslator<T> eventTranslator){
        ringBuffer.publishEvent(eventTranslator);
    }
        很简单,里面就是直接调用了RingBuffer来发布事件,之前几篇都分析过了。
 
        再看看其他的方法:
    public void halt(){
        for (final ConsumerInfo consumerInfo : consumerRepository){
            consumerInfo.halt();
        }
    }
        停止事件处理者。  
 
    public void shutdown(){
        try{
            shutdown(-1, TimeUnit.MILLISECONDS);
        }catch (final TimeoutException e){
            exceptionHandler.handleOnShutdownException(e);
        }
    }
    public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException{
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        while (hasBacklog()){
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt){
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
        }
        halt();
    }
    private boolean hasBacklog(){
        final long cursor = ringBuffer.getCursor();
        for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false)){
            if (cursor > consumer.get()){
                return true;
            }
        }
        return false;
    }
        等待所有能处理的事件都处理完了,再定制事件处理者,有超时选项。
 
        好了,其他方法都比较简单,不看了。最后来使用Disruptor写一个生产者消费者模式吧: 
	public static void main(String[] args) {
		//创建一个执行器(线程池)。
		Executor executor = Executors.newFixedThreadPool(4);
		//创建一个Disruptor。
		Disruptor<MyDataEvent> disruptor = 
				new Disruptor<MyDataEvent>(new MyDataEventFactory(), 4, executor);
		//创建两个事件处理器。
		MyDataEventHandler handler1 = new MyDataEventHandler();
		KickAssEventHandler handler2 = new KickAssEventHandler();
		//同一个事件,先用handler1处理再用handler2处理。
		disruptor.handleEventsWith(handler1).then(handler2);
		//启动Disruptor。
		disruptor.start();
		//发布10个事件。
		for(int i=0;i<10;i++){
			disruptor.publishEvent(new MyDataEventTranslator());
			System.out.println("发布事件["+i+"]");
			try {
				TimeUnit.SECONDS.sleep(3);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
        看下输出: 
    发布事件[0]
    handle event's data:MyData [id=0, value=holy shit!]isEndOfBatch:true
    kick your ass 0 times!!!!
    发布事件[1]
    handle event's data:MyData [id=1, value=holy shit!]isEndOfBatch:true
    kick your ass 1 times!!!!
    发布事件[2]
    handle event's data:MyData [id=2, value=holy shit!]isEndOfBatch:true
    kick your ass 2 times!!!!
    发布事件[3]
    handle event's data:MyData [id=3, value=holy shit!]isEndOfBatch:true
    kick your ass 3 times!!!!
    发布事件[4]
    handle event's data:MyData [id=4, value=holy shit!]isEndOfBatch:true
    kick your ass 4 times!!!!
    发布事件[5]
    handle event's data:MyData [id=5, value=holy shit!]isEndOfBatch:true
    kick your ass 5 times!!!!
    发布事件[6]
    handle event's data:MyData [id=6, value=holy shit!]isEndOfBatch:true
    kick your ass 6 times!!!!
    发布事件[7]
    handle event's data:MyData [id=7, value=holy shit!]isEndOfBatch:true
    kick your ass 7 times!!!!
    发布事件[8]
    handle event's data:MyData [id=8, value=holy shit!]isEndOfBatch:true
    kick your ass 8 times!!!!
    发布事件[9]
    handle event's data:MyData [id=9, value=holy shit!]isEndOfBatch:true
    kick your ass 9 times!!!!
 
   
  • 最后总结:
       1.使用时可以直接使用Disruptor这个类来更方便的完成代码编写,注意灵活使用。
       2.最后别忘了单线程/多线程生产者、Event/Work处理模式等等。
 
 
分享到:
评论
1 楼 354265066_qq_com 2016-08-11  
感谢 楼主分享  相当有用

相关推荐

    disruptor-3.3.2.jar 并发

    不错的框架,可以好好研究研究,速度下载,速度下载速度下载速度下载

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    disruptor-3.2.1源码带jar包20140321

    在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,这对于理解其内部机制和定制化开发非常有帮助。通过阅读源码,你可以更深入地了解如何利用Disruptor构建高效的并发系统。 此外,你还...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.4.2.jar

    disruptor-3.4.2.jar

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor-3.3.7-API文档-中文版.zip

    赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.11.jar

    disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包

    disruptror的jar包和例子

    Disruptor3.3.2是该框架的一个稳定版本,提供了更高的并发性能和更完善的特性。 在Java并发编程中,线程之间的数据共享通常会带来锁竞争,这会成为性能瓶颈。Disruptor通过消除锁和减少内存缓存失效,实现了线程间...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    3. **API与源码解析** - `disruptor-3.0.1.jar`:这是Disruptor的运行时库,包含了框架的类和接口,供开发者在项目中引用。 - `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和...

    disruptor-3.3.6.jar

    java运行依赖jar包

    disruptor-3.2.1.zip

    《Disruptor-3.2.1与Play2Memcached:开源项目的魅力解析》 在IT行业中,开源项目一直是技术创新的重要推动力。这次我们要探讨的是两个极具影响力的开源项目——Disruptor-3.2.1和Play2Memcached。它们分别在并发...

    disruptor-3.3.11-sources.jar

    disruptor-3.3.11-sources.jar jar包源码,值得学习,源码

    disruptor-3.3.6.jar中文-英文对照文档.zip

    注:下文中的 *** 代表文件名中的组件名称。 # 包含: 中文-英文对照文档:【***-javadoc-API文档-中文(简体)-英语-对照版.zip】 jar包下载地址:【***.jar下载地址(官方地址+国内镜像地址).txt】 ...

    disruptor-unity3d, Unity3d Disruptor的基本实现.zip

    disruptor-unity3d, Unity3d Disruptor的基本实现 disruptor-unity3dUnity3d的基本自包含。自包含实现。 仅支持单个生产者/单个用户。 仅在x86平台上测试。 Mono中的Bug 在Unity前可以在iOS和安卓上工作。用法将 ...

Global site tag (gtag.js) - Google Analytics