浏览 1763 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2013-09-20
RingBuffer是生产者的入口
<pre name="code" class="java"> private final Object[] entries; private final int bufferSize; private final Sequencer sequencer;</pre> sequencer是真正的生产者. entries就是要被生产者和消费者争夺的数组,bufferSize是数组的大小. bufferSize是通过Disruptor的构造方法传递过来的. 1 生产者的构造 Sequencer默认的实现是MultiProducerSequencer(多个生产者), 还有一种是SingleProducerSequencer(只有一个生产者). <pre name="code" class="java"> public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final Executor executor, final ProducerType producerType, final WaitStrategy waitStrategy) { this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), executor); }</pre> 可以在Disruptor的构造方法中传入对应的producerType的选择. 不传这个参数的话就是MultiProducerSequencer. 2 生产者的运作 主要是通过next方法抢到位置: @Override public long next(int n) { if (n < 1) { throw new IllegalArgumentException("n must be > 0"); } long current; long next; do { current = cursor.get(); next = current + n; long wrapPoint = next - bufferSize; long cachedGatingSequence = gatingSequenceCache.get(); if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) { long gatingSequence = Util.getMinimumSequence(gatingSequences, current); if (wrapPoint > gatingSequence) { LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy? continue; } gatingSequenceCache.set(gatingSequence); } else if (cursor.compareAndSet(current, next)) { break; } } while (true); return next; } long wrapPoint = next - bufferSize; 如果wrapPoint大于0,表示生产者开始从后面追消费者了. long gatingSequence = Util.getMinimumSequence(gatingSequences, current); gatingSequence获取的是最慢的消费者的位置. wrapPoint > gatingSequence 表示生产者从后面超过消费者,这个是不允许的. 所以这里的策略是LockSupport.parkNanos(1);等待一毫秒,再从新检查. else if (cursor.compareAndSet(current, next)) 生成者可以正常去抢位置,compareAndSet不能保证一定成功,所以你可以看到 while (true),其实是会不断去尝试,直到成功. 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |