`
zhhphappy
  • 浏览: 122376 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Disruptor 学习笔记

    博客分类:
  • java
 
阅读更多

Disruptor 是什么

Disruptor 是一个高性能异步处理框架,也可以认为是一个消息框架,它实现了观察者模式。

Disruptor 比传统的基于锁的消息框架的优势在于:它是无锁的、CPU友好;它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制启动的频率。

 

Disruptor 为什么快

1. 不使用锁。通过内存屏障和原子性的CAS操作替换锁。

2. 缓存基于数组而不是链表,用位运算替代求模。缓存的长度总是2的n次方,这样可以用位运算 i & (length - 1) 替代 i % length。

3. 去除伪共享。CPU的缓存一般是以缓存行为最小单位的,对应主存的一块相应大小的单元;当前的缓存行大小一般是64字节,每个缓存行一次只能被一个CPU核访问,如果一个缓存行被多个CPU核访问,就会造成竞争,导致某个核必须等其他核处理完了才能继续处理,响应性能。去除伪共享就是确保CPU核访问某个缓存行时不会出现争用。

4.预分配缓存对象,通过更新缓存里对象的属性而不是删除对象来减少垃圾回收。

 

核心类和接口

EventHandler:用户提供具体的实现,在里面实现事件的处理逻辑。

Sequence:代表事件序号或一个指向缓存某个位置的序号。

WaitStrategy:功能包括:当没有可消费的事件时,根据特定的实现进行等待,有可消费事件时返回可事件序号;有新事件发布时通知等待的 SequenceBarrier。

Sequencer:生产者用于访问缓存的控制器,它持有消费者序号的引用;新事件发布后通过WaitStrategy 通知正在等待的SequenceBarrier。

SequenceBarrier:消费者关卡。消费者用于访问缓存的控制器,每个访问控制器还持有前置访问控制器的引用,用于维持正确的事件处理顺序;通过WaitStrategy获取可消费事件序号。

EventProcessor:事件处理器,是可执行单元,运行在指定的Executor里;它会不断地通过SequenceBarrier获取可消费事件,当有可消费事件时调用用户提供的 EventHandler实现处理事件。

EventTranslator:事件转换器,由于Disruptor只会覆盖缓存,需要通过此接口的实现来更新缓存里的事件来覆盖旧事件。

RingBuffer:基于数组的缓存实现,它内部持有对Executor、WaitStrategy、生产者和消费者访问控制器的引用。

Disruptor:提供了对 RingBuffer 的封装,并提供了一些DSL风格的方法,方便使用。

 

实现

Sequence 类

Sequence类表示一个序号,是对long型字段的线程安全的封装,用于跟踪ringBuffer的进度和事件处理器的进度。支持一些并发操作,包括CAS和有序写。尝试在volatile字段周围填充内容来避免伪共享,变得更高效。

 

public class Sequence {
 
    static final long INITIAL_VALUE = -1L;
 
    private static final Unsafe UNSAFE;
 
    private static final long VALUE_OFFSET;



    static {
 
        UNSAFE = Util.getUnsafe();
 
        final int base = UNSAFE.arrayBaseOffset( long[].class );
 
        final int scale = UNSAFE.arrayIndexScale( long[].class );
 
        VALUE_OFFSET = base + (scale * 7);
 
    }



    // 15个元素,从0开始,有效值处于第7个,这样前后各有7个long字段填充,
 
    // 8个long型占共用64字节,而当前CPU的缓存行大小也是64字节,这样可以避免对Sequence的读写出现伪共享。
 
    private final long [] paddedValue = new long [15];



    // 原子地读
 
    public long get() {
 
        return UNSAFE .getLongVolatile(paddedValue, VALUE_OFFSET);
 
    }



    // 原子地写
 
    public void set(final long value) {
 
        UNSAFE.putOrderedLong(paddedValue , VALUE_OFFSET, value);
 
    }



    // CAS
 
    public boolean compareAndSet(final long expectedValue, final long newValue) {
 
        return UNSAFE .compareAndSwapLong(paddedValue, VALUE_OFFSET, expectedValue, newValue);
 
    }



    public long addAndGet(final long increment) {
 
        long currentValue;
 
        long newValue;



        do {
 
            currentValue = get();
 
            newValue = currentValue + increment;
 
        } while (!compareAndSet(currentValue, newValue));



        return newValue;
 
    }

 

 

Disruptor类

Disruptor类是这个类库的门面,用DSL的形式直观地提供了组装事件回调处理的关系链的功能,并提供获取事件、发布事件的方法,缓存容器生命周期管理。

属性

 

private final RingBuffer ringBuffer ; // 核心,绝大多数功能都委托给ringBuffer处理
private final Executor executor ;        // 用于执行事件处理器的线程池
private final ConsumerRepository consumerRepository = new ConsumerRepository();   // 事件处理器仓库,就是事件处理器的集合
private final AtomicBoolean started = new AtomicBoolean( false);     // 启动时检查,只能启动一次
private ExceptionHandler exceptionHandler;  // 异常处理器

 设置EventHandler事件处理器

 

 

/*
 
 * barrierSequences是eventHandlers的前置事件处理关卡,是用来保证事件处理的时序性的关键;
 
 * */
EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences,
 
                                           final EventHandler[] eventHandlers) {
 
    checkNotStarted();  // 确保在容器启动前设置



    final Sequence[] processorSequences = new Sequence[eventHandlers.length ];   // 存放游标的数组
 
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);    // 获取前置的序号关卡



    for ( int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
 
        final EventHandler eventHandler = eventHandlers[i];



        // 封装为批量事件处理器BatchEventProcessor,其实现了Runnable接口,所以可以放到executor去执行处理逻辑;处理器还会自动建立一个序号Sequence。
 
        final BatchEventProcessor batchEventProcessor = new BatchEventProcessor(ringBuffer , barrier, eventHandler);



        if (exceptionHandler != null) { // 如果有则设置异常处理器
 
            batchEventProcessor.setExceptionHandler( exceptionHandler);
 
        }



        // 添加到消费者仓库,会先封装为EventProcessorInfo对象(表示事件处理的一个阶段),
 
        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
 
        processorSequences[i] = batchEventProcessor.getSequence();
 
    }



    if (processorSequences. length > 0) {// 如果有前置关卡,则取消之前的前置关卡对应的EventProcessor 的 链的终点标记。
 
        consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
 
    }



    // EventHandlerGroup是一组EventProcessor,作为disruptor的一部分,提供DSL形式的方法,作为方法链的起点,用于设置事件处理器。
 
    return new EventHandlerGroup(this, consumerRepository, processorSequences);
}
}

 RingBuffer 类

 

 

//  属性的初始化声明
 
    public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE ;
 
    private final int indexMask ;
 
    private final Object[] entries ;
 
    private final int bufferSize ;
 
    private final Sequencer sequencer ;

//  属性的初始化代码
 
    RingBuffer(EventFactory eventFactory,
 
               Sequencer       sequencer) {
 
        this.sequencer     = sequencer;
 
        this.bufferSize    = sequencer.getBufferSize();



        if (bufferSize < 1) {
 
            throw new IllegalArgumentException("bufferSize must not be less than 1");
 
        }
 
        if (Integer.bitCount( bufferSize) != 1) {
 
            throw new IllegalArgumentException("bufferSize must be a power of 2");
 
        }



        this.indexMask = bufferSize - 1;
 
        this.entries    = new Object[sequencer.getBufferSize()];
 
        fill(eventFactory);
 
    } 

 

从以上这些代码可以看出:

RingBuffer是基于数组构建的,因为数组是缓存友好的,相邻的元素一般处于同一个缓存块。

 

缓存的大小必须是2的X次方,这是为了用位运算提高性能;由于数组缓存的容量总是有限,当缓存填满后,又要从 下标0 开始填充,如果缓存大小不是2的X次方,那只能用求模运算来获得新下标,所以还有个indexMask 来保存下标掩码;通过与indexMask 进行按位与可以得到一个安全的下标,不再需要进行下标检查,如:(E)entries[( int)sequence & indexMask ]。

 

从缓存获取事件

 

// 从缓存获取指定序号的事件
public E get(long sequence) {
 
    // 这个按位与操作说明了为什么ringBuffer的大小必须是2的n次方:用高效的 按位与  代替  低效的求模操作。
 
    return (E) entries[(int ) sequence & indexMask];
}
}

 

 

 发布事件

发布事件有3步:获取新事件的序号,覆盖旧事件,通知等待着。最简单的发布事件形式:

 

public void publishEvent(EventTranslator translator) {
 
    final long sequence = sequencer .next(); // 通过生产者序号控制器获取可用序号
 
    translateAndPublish(translator, sequence);  // 转换事件到队列缓存并发布事件
}
}

private void translateAndPublish(EventTranslator translator, long sequence) {
 
    try {
 
        // 发布事件前要先获取对应位置上的旧事件,再用translator把新事件的属性转换到旧事件的属性,从而达到发布的目的。
 
        // 这就是说,Disruptor对于已消费的事件是不删除的,有新事件时只是用新事件的属性去替换旧事件的属性。
 
        // 这带来的一个问题就是内存占用
 
        translator.translateTo(get(sequence), sequence);
 
    } finally {
 
        sequencer.publish(sequence);     // 原子性地更新生产者的序号,并通知在等待的消费者关卡。
 
    }
}
}

生产者序号控制器与消费者关卡是共用同一个等待策略的,一个Disruptor容器只有一个等待策略实例。

EventProcessor

事件处理器的执行单元。有两个实现:NoOpEventProcessor 和 BatchEventProcessor,其中NoOpEventProcessor 是不处理事件的,就不关注了。

BatchEventProcessor

Disruptor提供的唯一有用的 EventProcessor 实现类。

Disruptor容器启动时,会调用 ConsumerInfo 的 start方法,如果 ConsumerInfo 封装的是用户提交的EventHandler 实例,那么会在线程池里运行 EventProcessor,也就是 BatchEventProcessor 实例的 run方法。

public void run() {
 
    // 确保一次只有一个线程执行此方法,这样访问自身的序号就不要加锁
 
    if (! running.compareAndSet(false, true)) {
 
        throw new IllegalStateException("Thread is already running");
 
    }
 
    sequenceBarrier.clearAlert();   // 清除前置序号关卡的通知状态



    notifyStart();  // 声明周期通知,开始前回调



    T event = null;
 
    long nextSequence = sequence.get() + 1L;    // sequence指向上一个已处理的事件,默认是-1
    try {
 
        while (true ) {
 
            try {
 
                // 从它的前置序号关卡获取下一个可处理的事件序号。
 
                // 如果这个事件处理器不依赖于其他的事件处理器,则前置关卡就是生产者序号;
 
                // 如果这个事件处理器依赖于1个或多个事件处理器,那么这个前置关卡就是这些前置事件处理器中最慢的一个。
 
                // 通过这样,可以确保事件处理器不会超前处理地事件。
 
                final long availableSequence = sequenceBarrier.waitFor(nextSequence);



                // 处理一批事件
 
                while (nextSequence <= availableSequence) {
 
                    event = dataProvider.get(nextSequence);
 
                    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
 
                    nextSequence++;
 
                }



                // 设置它自己最后处理的事件序号,这样依赖于它的处理器可以它处理刚处理过的事件。
 
                sequence.set(availableSequence);
 
            } catch (final TimeoutException e) {
 
                // 获取事件序号超时处理
 
                notifyTimeout( sequence.get());



            } catch (final AlertException ex) {
 
                // 处理通知事件;检测是否要停止,如果非则继续处理事件
 
                if (!running .get()) {
 
                    break;
 
                }
 
            } catch (final Throwable ex) {
 
                // 其他异常,用事件处理器处理;然后继续处理下一个事件
 
                exceptionHandler.handleEventException(ex, nextSequence, event);
 
                sequence.set(nextSequence);
 
                nextSequence++;
 
            }
 
        }
 
    } finally {
 
        // 声明周期通知,停止事件回调;复位运行状态标志,确保可以再次运行此方法。
 
        notifyShutdown();
 
        running.set(false );
 
    }
}
}

 从 while 循环可以看出,事件处理可以分为三步:

从 SequenceBarrier 获取获取可以处理的最大事件序号;

循环处理可处理事件;

更新自身的已处理的事件序号,让依赖自身的事件处理器可以继续处理。

sequence .set 和 sequence .get 方法都是原子性地读取、更新序号的,这样就避免了加锁,从而提供性能。

sequenceBarrier .waitFor 最终也会调用 sequence .get 方法。

SequenceBarrier

协作式关卡,用于跟踪生产者游标和依赖的事件处理器的序号的数据结构。有两个实现DummySequenceBarrier 和 ProcessingSequenceBarrier,类如其名,前者是虚拟的,只有空方法;后者是实用的。

SequenceBarrier接口定义

public interface SequenceBarrier {
 
          // 等待指定的序号变得可消费
 
     long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException;



          // 返回当前可读的游标(一个序号)
 
     long getCursor();



         // 当前是否有通知状态给此关卡
 
     boolean isAlerted();



     // 通知事件处理器状态发生改变,并保持这个状态直到被清除
 
     void alert();



     // 清除当前通知状态
 
     void clearAlert();



     // 检查通知状态,如果有异常则抛出
 
     void checkAlert() throws AlertException;
}

 ProcessingSequenceBarrier

生成ProcessingSequenceBarrier的实例是由框架控制的,首先在Disruptor类的createEventProcessors方法内:

final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);    
...

// RingBufferd 的newBarrier方法:
public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
 
    return sequencer.newBarrier(sequencesToTrack);     // 是通过生产者序号控制器生成的。
}

...

// AbstractSequencer的newBarrier方法。
public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
 
    return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}

...
/**
 
 * @param sequencer 生产者序号控制器
 
 * @param waitStrategy 等待策略
 
 * @param cursorSequence 生产者序号
 
 * @param dependentSequences 依赖的Sequence
 
 */
p
public ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
 
                                 final Sequence cursorSequence, final Sequence[] dependentSequences) {
 
    this. sequencer = sequencer;
 
    this. waitStrategy = waitStrategy;
 
    this. cursorSequence = cursorSequence;



    // 如果事件处理器不依赖于任何前置处理器,那么dependentSequence也指向生产者的序号。
 
    if (0 == dependentSequences. length) {
 
        dependentSequence = cursorSequence;
 
    } else {     // 如果有多个前置处理器,则对其进行封装,实现了组合模式。
 
        dependentSequence = new FixedSequenceGroup(dependentSequences);
 
    }
}

获取序号的方法

/**
 
 * 该方法不保证总是返回未处理的序号;如果有更多的可处理序号时,返回的序号也可能是超过指定序号的。
 
 */
public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException {
 
    // 首先检查有无通知
 
    checkAlert();



    // 通过等待策略来获取可处理事件序号,
 
    long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence , this);



    // 这个方法不保证总是返回可处理的序号
 
        return availableSequence;
 
    if (availableSequence < sequence) {
 
    }



    // 再通过生产者序号控制器返回最大的可处理序号
 
    return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
}

 

 WaitStrategy:

BlockingWaitStrategy:没有可消费事件时阻塞等待生产者唤醒。

BusySpinWaitStrategy:忙等策略。

TimeoutBlockingWaitStrategy:阻塞等待策略

YieldingWaitStrategy:通过调用Thread.yield方法来让出CPU,达到等待的目的,等待时长没保证,取决于线程的调度系统。

 

 

分享到:
评论

相关推荐

    Disruptor阅读笔记.md.pdf

    尽管Disruptor提供了许多优秀的设计理念和性能优化手段,但其复杂的实现也带来了学习成本。开发者在使用Disruptor时需要深入理解其机制,才能充分利用其优势。此外,Disruptor的实现细节,包括如何在多线程环境中...

    一个基于Java的开源游戏服务器框架实现,使用了Netty、ProtoBuf、Disruptor等.zip

    学习笔记:整理了Java语言在游戏开发中的核心知识点和常用技术,方便你随时查阅和学习。 适用人群: 这份资源包适用于所有对Java游戏开发感兴趣的朋友,无论你是计算机专业的学生,还是希望业余时间尝试游戏开发的...

    java并发框架源码-notes:记录各种学习笔记(Java、算法、框架、数据库、并发、源码...)

    分析如`Akka`、`Quasar`或`Disruptor`等并发框架的源码,可以深入理解如何在Java中构建高效的并发系统,学习其设计思想和最佳实践。 通过阅读和研究这些源码,开发者不仅可以提升对Java并发编程的理解,还能学习到...

    基于Java语言的并发编程核心设计与实践源码分析

    此外,对于并发编程的学习和实践,通常需要借助一些设计工具来帮助理解,例如xmind笔记。Xmind是一种思维导图工具,它可以帮助开发者以图形化的方式梳理并发程序的结构和逻辑。通过这样的工具,程序员可以更好地组织...

    ### 2024年第一季度青岛房地产市场季度简报总结、市场综述

    2024年第一季度,青岛房地产市场经历了显著变化,总体呈现供需双降的趋势。一季度全市商品房新增10,721套,面积约152.04万平方米,同比下降29%;销量为14,936套,面积约200.85万平方米,同比下降38%,成交均价为14,204元/平方米,同比下降2%。土地市场方面,供应总量为39万平方米,同比减少7%,但成交面积为27万平方米,同比增长31%,楼面地价为6,625元/平方米,同比增长253%,土地出让金为17.61亿元,同比增长354%。二手房市场新增挂牌2.9万套,成交13,405套,132.21万平方米,累计挂牌51.70万套,挂牌均价17,800元/平方米。此外,青岛市出台多项政策支持房地产市场平稳健康发展,包括降低房贷利率、优化开发用地土地规划政策、支持房企融资等。这些政策旨在促进市场供需平衡,防止市场大起大落。

    Linux常用命令大全.markdown

    linux常用命令大全

    MATLAB代码,用于模拟具有无限半空间体积导体的电机单元电势(MUP),星号.rar

    1.版本:matlab2014/2019a/2024a 2.附赠案例数据可直接运行matlab程序。 3.代码特点:参数化编程、参数可方便更改、代码编程思路清晰、注释明细。 4.适用对象:计算机,电子信息工程、数学等专业的大学生课程设计、期末大作业和毕业设计。

    空调销售网站策划案例.doc

    空调销售网站策划案例.doc

    全球6G技术大会2024年以用户为中心的6G接入网技术研究白皮书31页.pdf

    全球6G技术大会2024年以用户为中心的6G接入网技术研究白皮书31页.pdf

    简约专业风格毕业答辩模板47个

    简约专业风格毕业答辩模板是一系列专为追求简洁与高效表达的大学生设计的答辩文档模板,共47个。这些模板融合了经典的设计元素与现代审美,强调信息的清晰传递与视觉的整洁,旨在帮助学生在答辩中以最专业的面貌展示自己的研究成果。 每个模板都具备结构合理的布局,适用于各个学科和研究领域,从人文社科到自然科学,均能满足不同需求。简约风格的设计使得学生能够专注于内容本身,避免冗余信息的干扰,提升答辩的专业性和可信度。此外,模板中合理运用的色彩、字体和图表设计,不仅增强了视觉吸引力,也使信息更易于理解。 通过使用这些简约专业风格的毕业答辩模板,毕业生能够自信地呈现自己的学术成果,提升答辩的整体效果,为成功的学术交流打下坚实基础。这些模板是展示个人研究与风格的理想选择。

    【数据集和模型】ChatGPT文本二分类

    由 Epsilon Luoo 在 HC3-Chinese 的基础上进行了一些细微的修改和清洗

    数字人动作捕捉:MATLAB-Kinect骨骼数据实时插值算法.pdf

    文档支持目录章节跳转同时还支持阅读器左侧大纲显示和章节快速定位,文档内容完整、条理清晰。文档内所有文字、图表、函数、目录等元素均显示正常,无任何异常情况,敬请您放心查阅与使用。文档仅供学习参考,请勿用作商业用途。 你是否渴望高效解决复杂的数学计算、数据分析难题?MATLAB 就是你的得力助手!作为一款强大的技术计算软件,MATLAB 集数值分析、矩阵运算、信号处理等多功能于一身,广泛应用于工程、科学研究等众多领域。 其简洁直观的编程环境,让代码编写如同行云流水。丰富的函数库和工具箱,为你节省大量时间和精力。无论是新手入门,还是资深专家,都能借助 MATLAB 挖掘数据背后的价值,创新科技成果。别再犹豫,拥抱 MATLAB,开启你的科技探索之旅!

    HI3519DV500 配置无线网依赖库以及编译脚本

    HI3519DV500 配置无线网依赖库以及编译脚本

    定制小米8-lineage22.1安卓15-fast功能项目线刷双版root 解锁bl后fast线刷

    资源说明; 1-----刷写前提是手机必须解锁bl先。而且会在fast模式刷写固件 2-----刷写方法与官方刷写步骤一样 3-----此固件为定制初始固件。可以在fast模式刷写 4-----属于适配固件。也许有个别bug。不接受请勿下载 5-----需要一定的刷机常识与动手能力的友友刷写。 6-----资源有可复制性。下载后不支持退。请知悉 7-----定制其他需求可以在csdn私信博主 博文参阅:https://csdn9.blog.csdn.net/article/details/143058308

    【机械臂路径规划】基于matlab快速探索随机树RRT和概率路网PRM串联机械臂路径规划【含Matlab源码 13167期】.zip

    Matlab领域上传的视频是由对应的完整代码运行得来的,完整代码皆可运行,亲测可用,适合小白; 1、从视频里可见完整代码的内容 主函数:main.m; 调用函数:其他m文件;无需运行 运行结果效果图; 2、代码运行版本 Matlab 2019b;若运行有误,根据提示修改;若不会,私信博主; 3、运行操作步骤 步骤一:将所有文件放到Matlab的当前文件夹中; 步骤二:双击打开main.m文件; 步骤三:点击运行,等程序运行完得到结果; 4、仿真咨询 如需其他服务,可私信博主; 4.1 博客或资源的完整代码提供 4.2 期刊或参考文献复现 4.3 Matlab程序定制 4.4 科研合作

    世邦魏理仕:2021年西安房地产市场回顾与2022年展望.pdf

    世邦魏理仕:2021年西安房地产市场回顾与2022年展望

    Android Studio 2022.1.1和java编程语言yinyuebofangqi.zip

    Android Studio 2022.1.1和java编程语言yinyuebofangqi

    C知道对话分享图片下载

    C知道对话分享图片

    png-jpg-gif-webp-tiff等图片压缩工具基于nodejs的实现

    png-jpg-gif-webp-tiff等图片压缩工具基于nodejs的实现,绿色本地免安装,解压后运行exe文件,将图片文件或者包含图片的文件夹拖拽到软件界面即可压缩

    派对屋A1效果器电脑调音软件

    我们要了解什么是DSP(Digital Signal Processing)。DSP即数字信号处理,是一种利用数字计算方法对信号进行分析、变换和操作的技术。在汽车音响领域,DSP被广泛应用于改善音质,通过调整频率响应、延时、相位和增益等参数,使声音更加均衡、立体。 惠威是一款数字信号处理器,适用于那些希望升级原车音响系统但预算有限的用户。它通常拥有多个输入和输出接口,可以连接到汽车的音频源和扬声器,通过软件进行调音,使得声音能够适应不同的驾驶环境和听音偏好。 ,集成了先进的噪声抑制技术和强大的功率放大器,旨在为发烧友级别的车载音响系统提供卓越的性能。用户可以通过软件对整个系统的每一个细节进行优化,包括主动分频、时间校正等,以达到Hi-Fi级别的音乐享受。

Global site tag (gtag.js) - Google Analytics