`

Disruptor 源码分析(3) Disruptor的生产者

 
阅读更多
RingBuffer是生产者的入口

    private final Object[] entries;
    private final int bufferSize;
    private final Sequencer sequencer;


sequencer是真正的生产者.

entries就是要被生产者和消费者争夺的数组,bufferSize是数组的大小.
bufferSize是通过Disruptor的构造方法传递过来的.

1 生产者的构造

Sequencer默认的实现是MultiProducerSequencer(多个生产者),
还有一种是SingleProducerSequencer(只有一个生产者).

 public Disruptor(final EventFactory<T> eventFactory,
                     final int ringBufferSize,
                     final Executor executor,
                     final ProducerType producerType,
                     final WaitStrategy waitStrategy)
    {
        this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
             executor);
    }


可以在Disruptor的构造方法中传入对应的producerType的选择.
不传这个参数的话就是MultiProducerSequencer.

2 生产者的运作
主要是通过next方法抢到位置:


@Override
    public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;
        long next;

        do
        {
            current = cursor.get();
            next = current + n;

            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();

            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);

                if (wrapPoint > gatingSequence)
                {
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }


long wrapPoint = next - bufferSize;
如果wrapPoint大于0,表示生产者开始从后面追消费者了.

long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
gatingSequence获取的是最慢的消费者的位置.

wrapPoint > gatingSequence
表示生产者从后面超过消费者,这个是不允许的.
所以这里的策略是LockSupport.parkNanos(1);等待一毫秒,再从新检查.

else if (cursor.compareAndSet(current, next))
生成者可以正常去抢位置,compareAndSet不能保证一定成功,所以你可以看到 while (true),其实是会不断去尝试,直到成功.






分享到:
评论

相关推荐

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - **Sequencer**:负责为生产者和消费者分配唯一的序列号,确保数据的有序性和一致性。 - **Producers**:生产者将消息放入Ring Buffer,遵循特定的策略(如使用_claim_操作)来避免冲突。 - **Consumers**:消费...

    Disruptor应用实例

    此外,它采用Sequence机制,确保了生产者与消费者之间的顺序一致性,避免了数据竞争。 2. **环形缓冲区的工作流程** 生产者通过一个叫做Claim操作获取槽位,然后写入数据。消费者通过Wait策略等待可消费的消息,...

    Java工具:高性能并发工具Disruptor简单使用

    6. **生产者发布事件**: 生产者通过Disruptor的`publishEvent`方法将事件放入环形缓冲区。 ```java disruptor.getRingBuffer().publishEvent((event, sequence, arg0) -&gt; { event.setData(arg0); }); ``` 7. **...

    无锁队列Disruptor超详细教程

    深入Disruptor源码分析 #### (1) 核心概念 - **Ring Buffer**:环形队列是Disruptor的基础数据结构,它使用固定大小的数组,避免了动态扩容带来的开销。 - **Sequence**:序列号,用于跟踪Ring Buffer中每个槽位的...

    Disruptor并发框架

    七、源码分析 深入理解Disruptor的源码,可以帮助开发者更好地掌握其实现原理,从而在实际项目中更有效地应用。例如,了解如何实现无锁算法、Sequence的管理以及Ring Buffer的细节,都有助于提升代码的并发性能。 ...

    无锁队列

    3. 阅读和分析Disruptor框架的源码,理解其序列、事件处理器链和屏障(Barrier)等核心概念。 4. 实践编写简单的无锁队列,通过实际操作加深理解。 5. 分析和对比无锁队列与其他并发数据结构(如阻塞队列、同步队列...

    java并发框架源码-notes:记录各种学习笔记(Java、算法、框架、数据库、并发、源码...)

    学习并发框架源码还需要理解一些经典并发编程模式,如生产者消费者模型、双端队列、工作窃取等,这些模式在很多并发框架中都有应用。 7. **并发工具类**: `Future`和`Callable`接口提供了异步计算的能力,`...

    Utilities:开源应用程序的集合

    【标题】"Utilities:开源应用程序的集合" 涉及的知识点主要集中在开源软件和特定的编程技术上,包括Facebook广告API、基于Disruptor的Messaging API以及Apache Kafka的生产者实现。 首先,"开源应用程序的集合"指的...

    mpmc

    在多线程环境下,多个生产者线程可以同时向队列添加元素,而多个消费者线程也可以同时从中取出元素,这种设计模式提高了系统的并行处理能力。 在Java中,JDK的Concurrent包并没有提供原生的MPMC队列实现。然而,有...

Global site tag (gtag.js) - Google Analytics