`
BrokenDreams
  • 浏览: 254641 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100407
社区版块
存档分类
最新评论

disruptor-3.3.2源码解析(2)-队列

阅读更多

disruptor-3.3.2源码解析(2)-队列

作者:大飞

 

  • Disruptor中的队列-RingBuffer
       RingBuffer是disruptor最重要的核心组件,如果以生产者/消费者模式来看待disruptor框架的话,那RingBuffer就是生产者和消费者的工作队列了。RingBuffer可以理解为是一个环形队列,那内部是怎么实现的呢?看下源码。
 
       首先,RingBuffer实现了一系列接口,Cursored、EventSequencer和EventSink,Cursored上篇提过了,这里看下后面两个:
public interface EventSequencer<T> extends DataProvider<T>, Sequenced{
}
 
public interface DataProvider<T>{
    T get(long sequence);
}
       EventSequencer扩展了Sequenced,提供了一些序列功能;同时扩展了DataProvider,提供了按序列值来获取数据的功能。 
 
public interface EventSink<E>{
    void publishEvent(EventTranslator<E> translator);

    boolean tryPublishEvent(EventTranslator<E> translator);

    <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0);

    <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1);

    <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2);

    void publishEvent(EventTranslatorVararg<E> translator, Object... args);

    boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args);

    void publishEvents(EventTranslator<E>[] translators);

    void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    boolean tryPublishEvents(EventTranslator<E>[] translators);

    boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize);

    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0);

    <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0);

    <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1);

    <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1);

    <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1);

    <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2);

    <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt, int batchSize, A[] arg0, B[] arg1, C[] arg2);

    void publishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);

    boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args);

    boolean tryPublishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args);
}
       可见,EventSink主要是提供发布事件(就是往队列上放数据)的功能,接口上定义了以各种姿势发布事件的方法。
 
       了解了RingBuffer的接口功能,下面看下RingBuffer的结构:
abstract class RingBufferPad{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
abstract class RingBufferFields<E> extends RingBufferPad{
    private static final int BUFFER_PAD;
    private static final long REF_ARRAY_BASE;
    private static final int REF_ELEMENT_SHIFT;
    private static final Unsafe UNSAFE = Util.getUnsafe();
    static{
        final int scale = UNSAFE.arrayIndexScale(Object[].class);
        if (4 == scale){
            REF_ELEMENT_SHIFT = 2;
        } else if (8 == scale){
            REF_ELEMENT_SHIFT = 3;
        }else{
            throw new IllegalStateException("Unknown pointer size");
        }
        BUFFER_PAD = 128 / scale;
        // Including the buffer pad in the array base offset
        REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
    }
    private final long indexMask;
    private final Object[] entries;
    protected final int bufferSize;
    protected final Sequencer sequencer;
    RingBufferFields(EventFactory<E> eventFactory,
                     Sequencer       sequencer){
        this.sequencer  = sequencer;
        this.bufferSize = sequencer.getBufferSize();
        if (bufferSize < 1){
            throw new IllegalArgumentException("bufferSize must not be less than 1");
        }
        if (Integer.bitCount(bufferSize) != 1){
            throw new IllegalArgumentException("bufferSize must be a power of 2");
        }
        this.indexMask = bufferSize - 1;
        this.entries   = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
        fill(eventFactory);
    }
    private void fill(EventFactory<E> eventFactory){
        for (int i = 0; i < bufferSize; i++){
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
    @SuppressWarnings("unchecked")
    protected final E elementAt(long sequence){
        return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
    }
}

public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E>{
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
    protected long p1, p2, p3, p4, p5, p6, p7;

    RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer){
        super(eventFactory, sequencer);
    }
       RingBuffer的内部结构明确了:内部用数组来实现,同时有保存数组长度的域bufferSize和下标掩码indexMask,还有一个sequencer。
       这里要注意几点:
              1.整个RingBuffer内部做了大量的缓存行填充,前后各填充了56个字节,entries本身也根据引用大小进行了填充,假设引用大小为4字节,那么entries数组两侧就要个填充32个空数组位。也就是说,实际的数组长度比bufferSize要大。所以可以看到根据序列从entries中取元素的方法elementAt内部做了一些调整,不是单纯的取模。
              2.bufferSize必须是2的幂,indexMask就是bufferSize-1,这样取模更高效(sequence&indexMask)。
              3.初始化时需要传入一个EventFactory,用来做队列内事件的预填充。
 
       总结下RingBuffer的特点:内部做了细致的缓存行填充,避免伪共享;内部队列基于数组实现,能很好的利用程序的局部性;队列上的事件槽会不断重复利用,不受JavaGC的影响。
 
       从上面的代码看到,RingBuffer对本包外屏蔽了构造方法,那么怎创建RingBuffer实例呢? 
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory,
                                                        int             bufferSize,
                                                        WaitStrategy    waitStrategy){
        MultiProducerSequencer sequencer = new MultiProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize){
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory,
                                                         int             bufferSize,
                                                         WaitStrategy    waitStrategy){
        SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
        return new RingBuffer<E>(factory, sequencer);
    }

    public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize){
        return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
    }

    public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy){
        switch (producerType){
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }
       可见,RingBuffer提供了静态工厂方法分别针对单事件发布者和多事件发布者的情况进行RingBuffer实例创建。
 
       RingBuffer方法实现也比较简单,看几个:
    @Override
    public E get(long sequence){
        return elementAt(sequence);
    }
       事件发布者和事件处理者申请到序列后,都会通过这个方法从序列上获取事件槽来生产或者发布事件。
 
    @Override
    public long next(int n){
        return sequencer.next(n);
    }
       next系列的方法是通过内部的sequencer来实现的。   
 
    public boolean isPublished(long sequence){
        return sequencer.isAvailable(sequence);
    }
       判断某个序列是否已经被事件发布者发布事件。  
 
    public void addGatingSequences(Sequence... gatingSequences){
        sequencer.addGatingSequences(gatingSequences);
    }

    public long getMinimumGatingSequence(){
        return sequencer.getMinimumSequence();
    }

    public boolean removeGatingSequence(Sequence sequence){
        return sequencer.removeGatingSequence(sequence);
    }
       追踪序列相关的方法。 
 
    @Override
    public void publishEvent(EventTranslator<E> translator){
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    private void translateAndPublish(EventTranslator<E> translator, long sequence){
        try{
            translator.translateTo(get(sequence), sequence);
        }finally{
            sequencer.publish(sequence);
        }
    }
       可见,发布事件分三步:
              1.申请序列。
              2.填充事件。
              3.提交序列。
 
       其他方法实现都很简单,这里不啰嗦了。
 
  • 最后总结:
       有关RingBuffer要记住以下几点:
              1.RingBuffer是协调事件发布者和事件处理者的中间队列,事件发布者发布事件放到RingBuffer,事件处理者从RingBuffer上拿事件进行消费。
              2.RingBuffer可以认为是一个环形队列,底层由数组实现。内部做了大量的缓存行填充,保存事件使用的数组的长度必须是2的幂,这样可以高效的取模(取模本身就包含绕回逻辑,按照序列不断的增长,形成一个环形轨迹)。由于RingBuffer这样的特性,也避免了GC带来的性能影响,因为RingBuffer本身永远不会被GC。
              3.RingBuffer和普通的FIFO队列相比还有一个重要的区别就是,RingBuffer避免了头尾节点的竞争,多个事件发布者/事件处理者之间不必竞争同一个节点,只需要安全申请序列值各自存取事件就好了。 
 
分享到:
评论
1 楼 746238836 2019-01-25  
整个RingBuffer内部做了大量的缓存行填充,前后各填充了56个字节,entries本身也根据引用大小进行了填充,假设引用大小为4字节,那么entries数组两侧就要个填充32个空数组位。  这一段的因果关系没理解,为什么前后填充了56个字节,引用大小为4字节,,,这时候entries数组两侧就要各填充32个空数组位,这个因果关系没理解,望告知!!!

相关推荐

    disruptor-3.3.2.jar 并发

    不错的框架,可以好好研究研究,速度下载,速度下载速度下载速度下载

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.4.2.jar

    disruptor-3.4.2.jar

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.2.1源码带jar包20140321

    在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,这对于理解其内部机制和定制化开发非常有帮助。通过阅读源码,你可以更深入地了解如何利用Disruptor构建高效的并发系统。 此外,你还...

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor-3.3.7-API文档-中文版.zip

    赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.11.jar

    disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包

    disruptror的jar包和例子

    2. 创建Disruptor实例:配置Disruptor,包括设置环形缓冲区的大小、事件处理器链以及事件工厂等。 3. 注册事件处理器:定义处理事件的处理器,并将其注册到Disruptor上,可以有多个处理器并行处理事件。 4. 启动...

    disruptor-3.3.6.jar

    java运行依赖jar包

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    3. **API与源码解析** - `disruptor-3.0.1.jar`:这是Disruptor的运行时库,包含了框架的类和接口,供开发者在项目中引用。 - `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和...

    disruptor-3.2.1.zip

    《Disruptor-3.2.1与Play2Memcached:开源项目的魅力解析》 在IT行业中,开源项目一直是技术创新的重要推动力。这次我们要探讨的是两个极具影响力的开源项目——Disruptor-3.2.1和Play2Memcached。它们分别在并发...

    netty结合disruptor队列实现即时通信

    netty结合disruptor队列实现即时通信1、简介使用disruptor改造netty通讯,使提高吞吐率,主要是提供disruptor如何与netty整合的思路2、软件架构spring-boot2.7.3 + netty4.1.36.Final + disruptor + jdk1.83、源码...

    disruptor-3.3.11-sources.jar

    disruptor-3.3.11-sources.jar jar包源码,值得学习,源码

    disruptor-3.3.6.jar中文-英文对照文档.zip

    注:下文中的 *** 代表文件名中的组件名称。 # 包含: 中文-英文对照文档:【***-javadoc-API文档-中文(简体)... (2)有时,一套Java组件会有多个jar,所以在下载前,请仔细阅读本篇描述,以确保这就是你需要的文件;

    disruptor-unity3d, Unity3d Disruptor的基本实现.zip

    disruptor-unity3d, Unity3d Disruptor的基本实现 disruptor-unity3dUnity3d的基本自包含。自包含实现。 仅支持单个生产者/单个用户。 仅在x86平台上测试。 Mono中的Bug 在Unity前可以在iOS和安卓上工作。用法将 ...

Global site tag (gtag.js) - Google Analytics