论坛首页 Java企业应用论坛

Disruptor 源码分析(3) Disruptor的生产者

浏览 1763 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2013-09-20  
RingBuffer是生产者的入口

<pre name="code" class="java">
    private final Object[] entries;
    private final int bufferSize;
    private final Sequencer sequencer;</pre>

sequencer是真正的生产者.

entries就是要被生产者和消费者争夺的数组,bufferSize是数组的大小.
bufferSize是通过Disruptor的构造方法传递过来的.

1 生产者的构造

Sequencer默认的实现是MultiProducerSequencer(多个生产者),
还有一种是SingleProducerSequencer(只有一个生产者).

<pre name="code" class="java"> public Disruptor(final EventFactory&lt;T&gt; eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy)
    {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }</pre>

可以在Disruptor的构造方法中传入对应的producerType的选择.
不传这个参数的话就是MultiProducerSequencer.

2 生产者的运作
主要是通过next方法抢到位置:


@Override
    public long next(int n)
    {
        if (n &lt; 1)
        {
            throw new IllegalArgumentException("n must be &gt; 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint &gt; cachedGatingSequence || cachedGatingSequence &gt; current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint &gt; gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }


long wrapPoint = next - bufferSize;
如果wrapPoint大于0,表示生产者开始从后面追消费者了.

long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
gatingSequence获取的是最慢的消费者的位置.

wrapPoint &gt; gatingSequence
表示生产者从后面超过消费者,这个是不允许的.
所以这里的策略是LockSupport.parkNanos(1);等待一毫秒,再从新检查.

else if (cursor.compareAndSet(current, next))
生成者可以正常去抢位置,compareAndSet不能保证一定成功,所以你可以看到 while (true),其实是会不断去尝试,直到成功.






论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics