`

disruptor(一) 单一生产者和WorkPool消费者源码阅读

阅读更多



 最近项目中有用到disruptor,提供一个类似队列或者数据容器的功能,并发能力很强

 

概念:

Sequence:就是一个增长序列,类似oracle的增长序列,生产和消费程序都有Sequence,记录生产和消费程序的序列

Sequencer: 多个概念的一个组合,持有Sequence,等待策略等一些引用,生产者引用

SequenceBarrier:直接翻译就是序列屏障,就是Sequence和RingBuffer交互的一个屏障,单个生产者时,生产者不需要SequenceBarrier

RingBuffer:是一个数组,根据追踪生产和消费的Sequence来实现一个环形的数据结构

 

说明:

1 很多地方都说到RingBuffer是一个环形的数据结构,它功能上表现出来的确实是环形结构,但是实现上是一个数组,而是通过生产者覆盖已经读过的数据,消费者回头读取未读取过的数据来实现的环形数据结构,

很多画图画成环形的,很容易误导理解

2 RingBuffer的size为2的n次方,可是Sequence是一直递增的,不知道其他人怎么理解的,我原来没看代码前的理解就是它的长度不超过RingBuffer的长度,然后重置后重新增长,这个错误的理解主要是看了很多博客上的环形的那个图,不论生产还是消费的Sequence一直

都是递增的, 到RingBuffer取值时,会根据RingBuffer的长度转换成对应的下标值

 

1 maven pom

 

		<dependency>
			<groupId>com.lmax</groupId>
			<artifactId>disruptor</artifactId>
			<version>3.3.0</version>
		</dependency>

 

 

2 我现在只是用到,了解的不是很深,看了部分代码,参考了很多其他博客,下面是我自己的看法,不对的地方欢迎指正

  

  2.1 测试代码是这样用的

   

  TestEvent.java

public class TestEvent {

    private String line;

    public String getLine() {
        return line;
    }

    public void setLine(String line) {
        this.line = line;
    }
}

   生产者TestEventProducer.java:

 

   

public class TestEventProducer {

    private RingBuffer<TestEvent> ringBuffer;

    public TestEventProducer (RingBuffer<TestEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    /**
     * 转换器,向队列里放值的时候调用(队列先设置固定长度的对象,然后通过set方法生产值)
     */
    private static EventTranslatorOneArg<TestEvent, String> eventTranslatorOneArg = new EventTranslatorOneArg<TestEvent, String>() {
        @Override
        public void translateTo(TestEvent event, long sequence, String arg0) {
            event.setLine(arg0);
        }
    };

    /**
     * 生产者向队列里放入数据
     * @param line
     */
    public void onData (String line) {
        this.ringBuffer.publishEvent(eventTranslatorOneArg, line);
    }

    /**
     * 处理数据
     */
    public void handler () {
        for (int i = 0; i < 1000; i++) {
            this.onData("wozaizhe" + i);
        }
    }

   消费者TestEventHandler:消费者从RingBuffer中拿出数据打印了一下

   

public class TestEventHandler implements WorkHandler<TestEvent> {
    @Override
    public void onEvent(TestEvent event) throws Exception {
        System.out.println("处理了一行数据:" + event.getLine());
    }
}

 

 

   测试类:

public class TestEventMain {

    public static final int BUFFER_SIZE = 8;

    public static void main (String[] args) {
        testDisruptor();
    }

    public static void testDisruptor () {
        ExecutorService executor = Executors.newFixedThreadPool(8);
        EventFactory<TestEvent> eventFactory = new TestEventFactory();
        //创建disruptor,设置单生产者模式
        Disruptor disruptor = new Disruptor(eventFactory, BUFFER_SIZE, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy ());
        //设置消费者程序
        disruptor.handleEventsWithWorkerPool(new TestEventHandler(), new TestEventHandler(),
                new TestEventHandler(), new TestEventHandler());
        //设置异常处理
        disruptor.handleExceptionsWith(new TestEventExceptionHandler());
        //启动disruptor并返回RingBuffer
        RingBuffer<TestEvent> ringBuffer = disruptor.start();
        //创建生产者线程,并生产
        TestEventProducer producer = new TestEventProducer(ringBuffer);
        producer.handler();
        disruptor.shutdown();
        executor.shutdown();
    }

}

   测试中使用的是单一的生产者,消费者有多个,使用WorkerPool来管理多个消费者

 

   disruptor 的数据存放在RingBuffer中,RingBuffer的类结构:

 

 

disruptor设计的主要类

 

 

 

 上面是主要类图,RingBuffer只负责存储,生产者和消费者取号的协调工作都是由SingleProducerSequencer来完成的

 

 从代码分析:

 (1) 创建Disruptor

 

public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy)
    {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }

    //创建RingBuffer
    public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy)
    {
        switch (producerType)
        {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }

      //创建Sequencer并实例化到RingBuffer中
    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int             bufferSize,
                                                         WaitStrategy    waitStrategy)
    {
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

        return new RingBuffer<E>(factory, sequencer);
    }
 

   创建Disruptor的过程: 创建了一个RingBuffer,实例化了生产者sequencer,实例化了其他参数

 

   (2) 设置消费者程序

       源码:

     

    //被调用的方法
    public EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
    {
        return createWorkerPool(new Sequence[0], workHandlers);
    }

    //创建workpool
    EventHandlerGroup<T> createWorkerPool(final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
    {
        //创建SequenceBarrier,每次消费者要读取RingBuffer中的下一个值都要通过SequenceBarrier来获取SequenceBarrier用来协调多个消费者并发的问题
        final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
        //实现在下个方法
        final WorkerPool<T> workerPool = new WorkerPool<T>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
        consumerRepository.add(workerPool, sequenceBarrier);
        return new EventHandlerGroup<T>(this, consumerRepository, workerPool.getWorkerSequences());
    }

    //workpool 构造方法
    public WorkerPool(final RingBuffer<T> ringBuffer,
                      final SequenceBarrier sequenceBarrier,
                      final ExceptionHandler exceptionHandler,
                      final WorkHandler<? super T>... workHandlers)
    {
        this.ringBuffer = ringBuffer;
        final int numWorkers = workHandlers.length;
        //创建一个和消费者线程个数相同的WorkProcessor数组
        workProcessors = new WorkProcessor[numWorkers];

        for (int i = 0; i < numWorkers; i++)
        {
            //消费者在源码中的表现形式就是WorkProcessor,通过构造方法可以知道,一个workpool中的消费者程序,使用的是相同的sequenceBarrier, workSequence
            workProcessors[i] = new WorkProcessor<T>(ringBuffer,
                                                     sequenceBarrier,
                                                     workHandlers[i],
                                                     exceptionHandler,
                                                     workSequence);
        }
    }

    这个部分就是创建一个workpool,根据每个WorkHandler创建对应的WorkProcessor,同一个workpool中的消费者线程共享同一个sequenceBarrier,workSequence,类的关系可以看上面的类图

 

   (3) 启动disruptor

      

    public RingBuffer<T> start()
    {
        //获取每一个消费者程序的Sequence和workpool的Sequence
        Sequence[] gatingSequences = consumerRepository.getLastSequenceInChain(true);
        ringBuffer.addGatingSequences(gatingSequences);

        checkOnlyStartedOnce();
        for (ConsumerInfo consumerInfo : consumerRepository)
        {
            consumerInfo.start(executor);
        }

        return ringBuffer;
    }

    //根据一系列的引用,找到消费者程序WorkProcessor,初始化每个WorkProcessor的sequence,然后执行提交到线程池执行
    public RingBuffer<T> start(final Executor executor)
    {
        if (!started.compareAndSet(false, true))
        {
            throw new IllegalStateException("WorkerPool has already been started and cannot be restarted until halted.");
        }

        final long cursor = ringBuffer.getCursor();
        workSequence.set(cursor);

        for (WorkProcessor<?> processor : workProcessors)
        {
            processor.getSequence().set(cursor);
            executor.execute(processor);
        }

        return ringBuffer;
    }

    启动Disruptor就是启动消费者线程

 

   (4)单一生产者生产数据

      

    @Override
    public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0)
    {
        //获取RingBuffer下一个可操作的序列
        final long sequence = sequencer.next();
        //把数据set到队列,设置cursor的序列
        translateAndPublish(translator, sequence, arg0);
    }

    /**
     * @see Sequencer#next()
     */
    @Override
    public long next()
    {
        return next(1);
    }

    /**
     * @see Sequencer#next(int)
     */
    @Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }
        //RingBuffer持有Sequencer,SingleProducerSequencer有nextValue和cachedValue两个成员变量,前者记录生产者生产到的位置,后者记录消费者线程中序列号最小的序列号,即是在最前面的消费者的序号
        long nextValue = this.nextValue;

        long nextSequence = nextValue + n;
        //wrapPoint是一个很关键的变量,这个变量决定生产者是否可以覆盖序列号nextSequence,wrapPoint是为什么是nextSequence - bufferSize;RingBuffer表现出来的是一个环形的数据结构,实际上是一个长度为bufferSize的数组, 
        //nextSequence - bufferSize如果nextSequence小于bufferSize wrapPoint是负数,表示可以一直生产;如果nextSequence大于bufferSize wrapPoint是一个大于0的数,由于生产者和消费者的序列号差距不能超过bufferSize
        //(超过bufferSize会覆盖消费者未消费的数据),wrapPoint要小于等于多个消费者线程中消费的最小的序列号,即cachedValue的值,这就是下面if判断的根据
        long wrapPoint = nextSequence - bufferSize;
        long cachedGatingSequence = this.cachedValue;

        //继续生成会覆盖消费者未消费的数据
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            long minSequence;
            //判断wrapPoint是否大于消费者线程最小的序列号,如果大于,不能写入,继续等待
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }
            //满足生产条件了,缓存这次消费者线程最小消费序号,供下次使用
            this.cachedValue = minSequence;
        }
        //缓存生产者最大生产序列号
        this.nextValue = nextSequence;

        return nextSequence;
    }
 

   (5) 消费者程序WorkProccessor

     

    
    //线程的逻辑
    @Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();
      
        //标志位,用来标志一次消费过程,此标志位在代码方面用的很巧妙,把两次执行揉成一段代码
        boolean processedSequence = true;
        //用来缓存消费者可以使用的RingBuffer最大序列号
        long cachedAvailableSequence = Long.MIN_VALUE;
        //记录被分配的WorkSequence的序列号,也是去RingBuffer取数据的序列号
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                //每次消费开始执行
                if (processedSequence)
                {
                    processedSequence = false;
                    //使用CAS算法从WorkPool的序列WorkSequence取得可用序列nextSequence
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                //如果可使用的最大序列号cachedAvaliableSequence大于等于我们要使用的序列号nextSequence,直接从RingBuffer取数据;不然进入else
                if (cachedAvailableSequence >= nextSequence)
                {
                    //可以满足消费的条件,根据序列号去RingBuffer去读取数据
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    //一次消费结束,设置标志位
                    processedSequence = true;
                }
                else
                {//等待生产者生产,获取到最大的可以使用的序列号
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

    //ProcessorSequencerBarrier.java等待生产者生产出更多的产品用来消费
    @Override
    public long waitFor(final long sequence)
        throws AlertException, InterruptedException, TimeoutException
    {
        checkAlert();
        
        //根据选用的等待策略来等待生产者生产
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        
        //这个不明白是为什么,生产者最大的序列小于要使用的序列,直接返回了,上面的run()方法中的while循环要再执行一遍,不明白此处的用意 
        if (availableSequence < sequence)
        {
            return availableSequence;
        }
        //返回较大的Sequence
        return sequencer.getHighestPublishedSequence(sequence, availableSequence);
    }

    //YieldingWaitStrategy 等待策略,先尝试一百次,再不满足条件,当前线程就yield,让其他线程先执行
    @Override
    public long waitFor(
        final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
        throws AlertException, InterruptedException
    {
        long availableSequence;
        //等待次数
        int counter = SPIN_TRIES;
 
        //循环,如果生产的最大序列号小于消费者需要的序列号,继续等待,等待次数超过counter次,线程yield
        //这里dependentSequence就是cursorSequence,在ProcessorSequencerBarrier构造函数中可以看到
        while ((availableSequence = dependentSequence.get()) < sequence)
        {
            counter = applyWaitMethod(barrier, counter);
        }

        return availableSequence;
    }

    //counter大于0则减一返回,否则当前线程yield
    private int applyWaitMethod(final SequenceBarrier barrier, int counter)
        throws AlertException
    {
        barrier.checkAlert();

        if (0 == counter)
        {
            Thread.yield();
        }
        else
        {
            --counter;
        }

        return counter;
    }

    消费者的整体逻辑:多个消费者共同使用同一个Sequence即workSequence,大家都从这个sequence里取得序列号,通过CAS保证线程安全,然后每个消费者拿到序列号nextSequence后去和RingBuffer的cursor比较,即生产者生产到的最大序列号比较,如果自己要取的序号还没有被生产者生产出来,则等待生产者生成出来后再从RingBuffer中取数据,处理数据

 

  上面的每个线程run()方法是while(true)怎么停下来的呢,通过跑出异常来控制的,抛出AlertException,在捕捉到异常后break循环语句块

   促使抛出异常在关闭disruptor的时候

  (6)disruptor关闭

 

 

    
    public void shutdown(final long timeout, final TimeUnit timeUnit) throws TimeoutException
    {
        final long timeOutAt = System.currentTimeMillis() + timeUnit.toMillis(timeout);
        //断是否有剩余消息未发送,有则继续循环
        while (hasBacklog())
        {
            if (timeout >= 0 && System.currentTimeMillis() > timeOutAt)
            {
                throw TimeoutException.INSTANCE;
            }
            // Busy spin
        }
        //数据全部发送和消费完毕
        halt();
    }



   /**
     * Confirms if all messages have been consumed by all event processors
     */
    private boolean hasBacklog()
    {
        final long cursor = ringBuffer.getCursor();
        for (final Sequence consumer : consumerRepository.getLastSequenceInChain(false))
        {
            //通过判断生产数是否等于消费数,等于表示生产消费结束,返回false
            if (cursor > consumer.get())
            {
                return true;
            }
        }
        return false;
    }

    //线程运行设置为false, Barrier的alert设置为true,run()方法执行while循环的时候会检查一下alert,为true则跳出循环
    @Override
    public void halt()
    {
        running.set(false);
        sequenceBarrier.alert();
    }
 

 

 总体来说:RingBuffer在生产Sequencer中记录一个cursor,追踪生产者生产到的最新位置,通过WorkSequence和sequence记录整个workpool消费的位置和每个WorkProcessor消费到位置,来协调生产和消费程序

  • 大小: 69.8 KB
  • 大小: 161 KB
  • 大小: 44.8 KB
分享到:
评论
2 楼 544483342 2017-10-25  
楼主请问WorkerEventHandler和EventHandler这两个处理器的区别是什么啊?
1 楼 544483342 2017-10-25  
请问楼主使用的是什么UML工具啊?

相关推荐

    disruptor 多个消费者

    与传统的队列不同,Disruptor的环形缓冲区允许多个生产者和消费者并发操作,避免了锁竞争,从而实现了更高的性能。 1. **多消费者机制**:在Disruptor中,可以注册多个消费者监听事件。每个消费者都有一个独立的...

    disruptor-3.2.1源码带jar包20140321

    序列号的增一操作是原子性的,这使得多个生产者和消费者可以安全地并行操作,而无需传统的锁机制。 3. **事件处理器**:Disruptor中的事件处理器是处理缓冲区内事件的类。它们按照预定义的顺序连接在一起,形成一个...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - **Sequencer**:负责为生产者和消费者分配唯一的序列号,确保数据的有序性和一致性。 - **Producers**:生产者将消息放入Ring Buffer,遵循特定的策略(如使用_claim_操作)来避免冲突。 - **Consumers**:消费...

    Disruptor C++版(仅支持单生产者)

    7. 唤醒策略:Disruptor使用一种高效的唤醒机制,当消费者等待新事件时,生产者会唤醒它们,而不是让它们持续轮询,这样可以降低CPU使用率。 在使用Disruptor C++版时,你可以参考提供的示例代码来理解和实现自己的...

    disruptor jar包+Demo+Api

    Disruptor 还引入了 ClaimStrategy 和 WaitStrategy 两种策略,分别用于控制生产者的发布速度和消费者的等待策略,从而达到最佳的性能效果。 总的来说,这个压缩包提供了一个了解和学习 Disruptor 的良好起点,通过...

    disruptor高性能Java线程间通讯库

    然后配置Ring Buffer大小、等待策略等参数,设置生产者和消费者,最后启动EventProcessor链。 通过深入理解和合理运用Disruptor,开发者可以构建出高效、低延迟的并发系统。对于Java开发中的性能优化,Disruptor是...

    Disruptor并发框架中文参考文档

    **Disruptor** 的核心在于它的Ring Buffer设计,这是一种特殊的循环数组结构,能够高效地支持多个生产者向其中添加数据以及多个消费者从中读取数据的过程,而无需使用传统的锁机制。下面将详细介绍Disruptor的工作...

    Disruptor应用实例

    Disruptor的另一个关键特性是序列号(Sequence),每个生产者和消费者都有独立的序列号,用于跟踪它们对环形缓冲区的操作。这种设计保证了数据处理的有序性,避免了线程间的竞态条件。 在Disruptor中,生产者将数据...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    Disruptor的工作原理基于环形缓冲区(Ring Buffer)的设计,它将生产者和消费者之间的数据传递过程转换为顺序写入和读取,避免了传统并发模型中的锁竞争和上下文切换,从而实现了极高的消息传递效率。在SpringBoot中...

    DisruptorDemo.zip

    7. 信号量(Barrier):在生产者和消费者之间建立同步点,确保数据的正确传递。 在"DisruptorDemo.zip"的示例代码中,我们可以看到以下关键类和方法: 1. `DisruptorDemo`:主类,创建Disruptor实例并启动事件...

    Disruptor学习(1)

    每个生产者和消费者都有一个独立的Sequence,表示它们当前处理到的事件位置。通过比较和更新这些Sequence,Disruptor能够确保事件处理的正确顺序,同时避免了传统的锁和条件变量带来的性能瓶颈。 在Disruptor的工作...

    disruptor 实例

    3. **多生产者与多消费者**:Disruptor支持多个生产者和消费者并发操作,通过序列号的同步机制,保证了数据的一致性。 4. **事件处理流水线**:通过预定义的事件处理器链,Disruptor能构建出高效的数据处理流水线,...

    SourceAnalysis_Disruptor:Disruptor原始码解析-源码解析

    序列号是Disruptor中的关键概念,每个生产者和消费者都有自己的独立序列号,用来跟踪它们在环形缓冲区中的位置。屏障(Barrier)则是协调生产者和消费者之间工作关系的关键,它包括生产者屏障和消费者屏障。生产者...

    Disruptor资料合集

    Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...

    Disruptor demo

    2. **序列号(Sequence)**:每个生产者和消费者都有自己的序列号,用于跟踪其在环形缓冲区中的位置。序列号的增减操作是原子性的,确保了多线程环境下的一致性。 3. **事件处理(Event Processing)**:在...

    高性能高稳定性分布式的游戏架构,游戏逻辑运行在Disruptor消费者线程中,其它线程都为辅助线程, 整体为多生产者.zip

    本文将深入探讨一个使用Java开发的游戏项目,该项目采用了一种独特的方式,即游戏逻辑运行在Disruptor消费者线程中,其余线程作为辅助线程,构建了一个多生产者的分布式架构。这个设计模式旨在优化并发性能,提高...

    Disruptor 一种可替代有界队列完成并发线程间数据交换高性能解决方案.docx

    Disruptor 的核心在于其创新的数据交换机制,包括使用栅栏(barrier)和序号(Sequencing)来协调生产者和消费者,避免了锁和CAS操作带来的开销。 1. **并发的复杂性** 在多线程环境中,线程间的同步和通信通常会...

    Netty 使用Disruptor机制的处理源代码

    2. **消除缓存失效**:由于所有生产者和消费者共享同一个内存区域,所以缓存局部性得到优化,减少了缓存失效的开销。 3. **预定义的序列号**:Disruptor 提供全局唯一的序列号,确保消息的正确顺序。 在 Netty 中,...

Global site tag (gtag.js) - Google Analytics