ConsumerRepository是消费者的入口
ConsumerInfo中保存了一个消费者的所有的信息.
ConsumerRepository的consumerInfos字段是个ConsumerInfo数组,保存了全部消费者的信息.
ConsumerInfo中有个SequenceBarrier字段,这个是用来获取生产者的位置信息的.
ConsumerInfo中的EventProcessor是真正的消费者.
有人会说我自己写的是EventHandler作为消费者啊,其实EventProcessor是做了封装
EventProcessor中有个字段就是eventHandler.
如果对Disruptor只传入EventHandler的话:
Disruptor类中:
public EventHandlerGroup<T> handleEventsWith(final EventHandler<T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
Disruptor类中:
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<T>[] eventHandlers)
{
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<T>(ringBuffer, barrier, eventHandler);
可以看到默认的EventProcessor是BatchEventProcessor.
BatchEventProcessor类:
private final DataProvider<T> dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler<T> eventHandler;
dataProvider其实就是ringBuffer,用来获取ringBuffer那个位置的Event
sequenceBarrier用来获取生产者的位置信息
eventHandler是具体处理Event的策略,这个是在Disruptor的handleEventsWith方法中传进来的.
消费者的实现.
BatchEventProcessor类是实现了Runnable接口的.所以每一个消费者都是可以作为一个线程来跑的.
这也是 Disruptor的构造类传入一个线程池的原因,线程池就是用来跑消费者的.
@Override
public void run()
{
if (!running.compareAndSet(false, true))
{
throw new IllegalStateException("Thread is already running");
}
sequenceBarrier.clearAlert();
notifyStart();
T event = null;
long nextSequence = sequence.get() + 1L;
try
{
while (true)
{
try
{
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (nextSequence > availableSequence)
{
Thread.yield();
}
while (nextSequence <= availableSequence)
{
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
}
catch (final TimeoutException e)
{
notifyTimeout(sequence.get());
}
catch (final AlertException ex)
{
if (!running.get())
{
break;
}
}
catch (final Throwable ex)
{
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}
finally
{
notifyShutdown();
running.set(false);
}
}
run方法看起来很长,其实主要就是这个:
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
这个是获取生产者的位置的
if (nextSequence > availableSequence) 表示消费者超过生产者,这个是不允许的.
这里的操作是Thread.yield();不同的EventProcessor这里采取的策略也会不一样.
while (true) 可以看到在不成功的状态下会不断去检查生产者的信息.
waitFor的实现是waitStrategy.waitFor来实现的,waitStrategy有很多中不同的策略,默认是BlockingWaitStrategy.
BlockingWaitStrategy的实现中用到了lock.lock();意思就是同时只允许一个个消费者排队去抢,下一个消费者要等待上一个消费者处理完一个之后才能抢.
在SequenceBarrie的waitFor的最后一步,会通过getHighestPublishedSequence来检查生产者的位置信息的标志是否正常.这个是和生产者的publish方法联系起来的.
分享到:
相关推荐
- **Sequencer**:负责为生产者和消费者分配唯一的序列号,确保数据的有序性和一致性。 - **Producers**:生产者将消息放入Ring Buffer,遵循特定的策略(如使用_claim_操作)来避免冲突。 - **Consumers**:消费...
《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具...在《DisruptorStudy》这个压缩包文件中,包含了对Disruptor的详细学习资料,包括源码分析和实践案例,帮助你更全面地掌握这一强大的并发工具。
深入Disruptor源码分析 #### (1) 核心概念 - **Ring Buffer**:环形队列是Disruptor的基础数据结构,它使用固定大小的数组,避免了动态扩容带来的开销。 - **Sequence**:序列号,用于跟踪Ring Buffer中每个槽位的...
Disruptor还引入了事件处理器链,使得多个消费者可以同时处理一个事件,进一步提升了处理效率。 使用Disruptor的步骤大致如下: 1. **创建Event**: 定义事件类,它是Disruptor中的基本数据单元,用于传递信息。 `...
七、源码分析 深入理解Disruptor的源码,可以帮助开发者更好地掌握其实现原理,从而在实际项目中更有效地应用。例如,了解如何实现无锁算法、Sequence的管理以及Ring Buffer的细节,都有助于提升代码的并发性能。 ...
3. 阅读和分析Disruptor框架的源码,理解其序列、事件处理器链和屏障(Barrier)等核心概念。 4. 实践编写简单的无锁队列,通过实际操作加深理解。 5. 分析和对比无锁队列与其他并发数据结构(如阻塞队列、同步队列...
学习并发框架源码还需要理解一些经典并发编程模式,如生产者消费者模型、双端队列、工作窃取等,这些模式在很多并发框架中都有应用。 7. **并发工具类**: `Future`和`Callable`接口提供了异步计算的能力,`...
【标题】"Utilities:开源应用程序的集合" 涉及的知识点主要集中在开源软件和特定的编程技术上,包括Facebook广告API、基于Disruptor的Messaging API以及Apache Kafka的生产者实现。 首先,"开源应用程序的集合"指的...
在多线程环境下,多个生产者线程可以同时向队列添加元素,而多个消费者线程也可以同时从中取出元素,这种设计模式提高了系统的并行处理能力。 在Java中,JDK的Concurrent包并没有提供原生的MPMC队列实现。然而,有...