`

Disruptor 源码分析(4) Disruptor的消费者

 
阅读更多

ConsumerRepository是消费者的入口

ConsumerInfo中保存了一个消费者的所有的信息.
ConsumerRepository的consumerInfos字段是个ConsumerInfo数组,保存了全部消费者的信息.

ConsumerInfo中有个SequenceBarrier字段,这个是用来获取生产者的位置信息的.

ConsumerInfo中的EventProcessor是真正的消费者.
有人会说我自己写的是EventHandler作为消费者啊,其实EventProcessor是做了封装
EventProcessor中有个字段就是eventHandler.

如果对Disruptor只传入EventHandler的话:

Disruptor类中:


public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
    {
        return createEventProcessors(new Sequence[0], handlers);
    }

 Disruptor类中: 

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
                                               final EventHandler<T>[] eventHandlers)
    {      
            final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);


可以看到默认的EventProcessor是BatchEventProcessor.

BatchEventProcessor类:


  private final DataProvider<T> dataProvider;
    private final SequenceBarrier sequenceBarrier;
    private final EventHandler<T> eventHandler;


dataProvider其实就是ringBuffer,用来获取ringBuffer那个位置的Event
sequenceBarrier用来获取生产者的位置信息
eventHandler是具体处理Event的策略,这个是在Disruptor的handleEventsWith方法中传进来的.

消费者的实现.
BatchEventProcessor类是实现了Runnable接口的.所以每一个消费者都是可以作为一个线程来跑的.
这也是 Disruptor的构造类传入一个线程池的原因,线程池就是用来跑消费者的.



@Override
    public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        T event = null;
        long nextSequence = sequence.get() + 1L;
        try
        {
            while (true)
            {
                try
                {
                    final long availableSequence = sequenceBarrier.waitFor(nextSequence);

                    if (nextSequence > availableSequence)
                    {
                        Thread.yield();
                    }

                    while (nextSequence <= availableSequence)
                    {
                        event = dataProvider.get(nextSequence);
                        eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                        nextSequence++;
                    }

                    sequence.set(availableSequence);
                }
                catch (final TimeoutException e)
                {
                    notifyTimeout(sequence.get());
                }
                catch (final AlertException ex)
                {
                    if (!running.get())
                    {
                        break;
                    }
                }
                catch (final Throwable ex)
                {
                    exceptionHandler.handleEventException(ex, nextSequence, event);
                    sequence.set(nextSequence);
                    nextSequence++;
                }
            }
        }
        finally
        {
            notifyShutdown();
            running.set(false);
        }
    }



run方法看起来很长,其实主要就是这个:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
这个是获取生产者的位置的

if (nextSequence > availableSequence) 表示消费者超过生产者,这个是不允许的.
这里的操作是Thread.yield();不同的EventProcessor这里采取的策略也会不一样.
while (true) 可以看到在不成功的状态下会不断去检查生产者的信息.

waitFor的实现是waitStrategy.waitFor来实现的,waitStrategy有很多中不同的策略,默认是BlockingWaitStrategy.
BlockingWaitStrategy的实现中用到了lock.lock();意思就是同时只允许一个个消费者排队去抢,下一个消费者要等待上一个消费者处理完一个之后才能抢.

在SequenceBarrie的waitFor的最后一步,会通过getHighestPublishedSequence来检查生产者的位置信息的标志是否正常.这个是和生产者的publish方法联系起来的.


                 


分享到:
评论

相关推荐

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - **Sequencer**:负责为生产者和消费者分配唯一的序列号,确保数据的有序性和一致性。 - **Producers**:生产者将消息放入Ring Buffer,遵循特定的策略(如使用_claim_操作)来避免冲突。 - **Consumers**:消费...

    Disruptor应用实例

    《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具...在《DisruptorStudy》这个压缩包文件中,包含了对Disruptor的详细学习资料,包括源码分析和实践案例,帮助你更全面地掌握这一强大的并发工具。

    无锁队列Disruptor超详细教程

    深入Disruptor源码分析 #### (1) 核心概念 - **Ring Buffer**:环形队列是Disruptor的基础数据结构,它使用固定大小的数组,避免了动态扩容带来的开销。 - **Sequence**:序列号,用于跟踪Ring Buffer中每个槽位的...

    Java工具:高性能并发工具Disruptor简单使用

    Disruptor还引入了事件处理器链,使得多个消费者可以同时处理一个事件,进一步提升了处理效率。 使用Disruptor的步骤大致如下: 1. **创建Event**: 定义事件类,它是Disruptor中的基本数据单元,用于传递信息。 `...

    Disruptor并发框架

    七、源码分析 深入理解Disruptor的源码,可以帮助开发者更好地掌握其实现原理,从而在实际项目中更有效地应用。例如,了解如何实现无锁算法、Sequence的管理以及Ring Buffer的细节,都有助于提升代码的并发性能。 ...

    无锁队列

    3. 阅读和分析Disruptor框架的源码,理解其序列、事件处理器链和屏障(Barrier)等核心概念。 4. 实践编写简单的无锁队列,通过实际操作加深理解。 5. 分析和对比无锁队列与其他并发数据结构(如阻塞队列、同步队列...

    java并发框架源码-notes:记录各种学习笔记(Java、算法、框架、数据库、并发、源码...)

    学习并发框架源码还需要理解一些经典并发编程模式,如生产者消费者模型、双端队列、工作窃取等,这些模式在很多并发框架中都有应用。 7. **并发工具类**: `Future`和`Callable`接口提供了异步计算的能力,`...

    Utilities:开源应用程序的集合

    【标题】"Utilities:开源应用程序的集合" 涉及的知识点主要集中在开源软件和特定的编程技术上,包括Facebook广告API、基于Disruptor的Messaging API以及Apache Kafka的生产者实现。 首先,"开源应用程序的集合"指的...

    mpmc

    在多线程环境下,多个生产者线程可以同时向队列添加元素,而多个消费者线程也可以同时从中取出元素,这种设计模式提高了系统的并行处理能力。 在Java中,JDK的Concurrent包并没有提供原生的MPMC队列实现。然而,有...

Global site tag (gtag.js) - Google Analytics