RingBuffer是生产者的入口
private final Object[] entries;
private final int bufferSize;
private final Sequencer sequencer;
sequencer是真正的生产者.
entries就是要被生产者和消费者争夺的数组,bufferSize是数组的大小.
bufferSize是通过Disruptor的构造方法传递过来的.
1 生产者的构造
Sequencer默认的实现是MultiProducerSequencer(多个生产者),
还有一种是SingleProducerSequencer(只有一个生产者).
public Disruptor(final EventFactory<T> eventFactory,
final int ringBufferSize,
final Executor executor,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
executor);
}
可以在Disruptor的构造方法中传入对应的producerType的选择.
不传这个参数的话就是MultiProducerSequencer.
2 生产者的运作
主要是通过next方法抢到位置:
@Override
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
current = cursor.get();
next = current + n;
long wrapPoint = next - bufferSize;
long cachedGatingSequence = gatingSequenceCache.get();
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
gatingSequenceCache.set(gatingSequence);
}
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
long wrapPoint = next - bufferSize;
如果wrapPoint大于0,表示生产者开始从后面追消费者了.
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
gatingSequence获取的是最慢的消费者的位置.
wrapPoint > gatingSequence
表示生产者从后面超过消费者,这个是不允许的.
所以这里的策略是LockSupport.parkNanos(1);等待一毫秒,再从新检查.
else if (cursor.compareAndSet(current, next))
生成者可以正常去抢位置,compareAndSet不能保证一定成功,所以你可以看到 while (true),其实是会不断去尝试,直到成功.
分享到:
相关推荐
- **Sequencer**:负责为生产者和消费者分配唯一的序列号,确保数据的有序性和一致性。 - **Producers**:生产者将消息放入Ring Buffer,遵循特定的策略(如使用_claim_操作)来避免冲突。 - **Consumers**:消费...
此外,它采用Sequence机制,确保了生产者与消费者之间的顺序一致性,避免了数据竞争。 2. **环形缓冲区的工作流程** 生产者通过一个叫做Claim操作获取槽位,然后写入数据。消费者通过Wait策略等待可消费的消息,...
6. **生产者发布事件**: 生产者通过Disruptor的`publishEvent`方法将事件放入环形缓冲区。 ```java disruptor.getRingBuffer().publishEvent((event, sequence, arg0) -> { event.setData(arg0); }); ``` 7. **...
深入Disruptor源码分析 #### (1) 核心概念 - **Ring Buffer**:环形队列是Disruptor的基础数据结构,它使用固定大小的数组,避免了动态扩容带来的开销。 - **Sequence**:序列号,用于跟踪Ring Buffer中每个槽位的...
七、源码分析 深入理解Disruptor的源码,可以帮助开发者更好地掌握其实现原理,从而在实际项目中更有效地应用。例如,了解如何实现无锁算法、Sequence的管理以及Ring Buffer的细节,都有助于提升代码的并发性能。 ...
3. 阅读和分析Disruptor框架的源码,理解其序列、事件处理器链和屏障(Barrier)等核心概念。 4. 实践编写简单的无锁队列,通过实际操作加深理解。 5. 分析和对比无锁队列与其他并发数据结构(如阻塞队列、同步队列...
学习并发框架源码还需要理解一些经典并发编程模式,如生产者消费者模型、双端队列、工作窃取等,这些模式在很多并发框架中都有应用。 7. **并发工具类**: `Future`和`Callable`接口提供了异步计算的能力,`...
【标题】"Utilities:开源应用程序的集合" 涉及的知识点主要集中在开源软件和特定的编程技术上,包括Facebook广告API、基于Disruptor的Messaging API以及Apache Kafka的生产者实现。 首先,"开源应用程序的集合"指的...
在多线程环境下,多个生产者线程可以同时向队列添加元素,而多个消费者线程也可以同时从中取出元素,这种设计模式提高了系统的并行处理能力。 在Java中,JDK的Concurrent包并没有提供原生的MPMC队列实现。然而,有...