Disruptor 是什么
Disruptor 是一个高性能异步处理框架,也可以认为是一个消息框架,它实现了观察者模式。
Disruptor 比传统的基于锁的消息框架的优势在于:它是无锁的、CPU友好;它不会清除缓存中的数据,只会覆盖,降低了垃圾回收机制启动的频率。
Disruptor 为什么快
1. 不使用锁。通过内存屏障和原子性的CAS操作替换锁。
2. 缓存基于数组而不是链表,用位运算替代求模。缓存的长度总是2的n次方,这样可以用位运算 i & (length - 1) 替代 i % length。
3. 去除伪共享。CPU的缓存一般是以缓存行为最小单位的,对应主存的一块相应大小的单元;当前的缓存行大小一般是64字节,每个缓存行一次只能被一个CPU核访问,如果一个缓存行被多个CPU核访问,就会造成竞争,导致某个核必须等其他核处理完了才能继续处理,响应性能。去除伪共享就是确保CPU核访问某个缓存行时不会出现争用。
4.预分配缓存对象,通过更新缓存里对象的属性而不是删除对象来减少垃圾回收。
核心类和接口
EventHandler:用户提供具体的实现,在里面实现事件的处理逻辑。
Sequence:代表事件序号或一个指向缓存某个位置的序号。
WaitStrategy:功能包括:当没有可消费的事件时,根据特定的实现进行等待,有可消费事件时返回可事件序号;有新事件发布时通知等待的 SequenceBarrier。
Sequencer:生产者用于访问缓存的控制器,它持有消费者序号的引用;新事件发布后通过WaitStrategy 通知正在等待的SequenceBarrier。
SequenceBarrier:消费者关卡。消费者用于访问缓存的控制器,每个访问控制器还持有前置访问控制器的引用,用于维持正确的事件处理顺序;通过WaitStrategy获取可消费事件序号。
EventProcessor:事件处理器,是可执行单元,运行在指定的Executor里;它会不断地通过SequenceBarrier获取可消费事件,当有可消费事件时调用用户提供的 EventHandler实现处理事件。
EventTranslator:事件转换器,由于Disruptor只会覆盖缓存,需要通过此接口的实现来更新缓存里的事件来覆盖旧事件。
RingBuffer:基于数组的缓存实现,它内部持有对Executor、WaitStrategy、生产者和消费者访问控制器的引用。
Disruptor:提供了对 RingBuffer 的封装,并提供了一些DSL风格的方法,方便使用。
实现
Sequence 类
Sequence类表示一个序号,是对long型字段的线程安全的封装,用于跟踪ringBuffer的进度和事件处理器的进度。支持一些并发操作,包括CAS和有序写。尝试在volatile字段周围填充内容来避免伪共享,变得更高效。
public class Sequence { static final long INITIAL_VALUE = -1L; private static final Unsafe UNSAFE; private static final long VALUE_OFFSET; static { UNSAFE = Util.getUnsafe(); final int base = UNSAFE.arrayBaseOffset( long[].class ); final int scale = UNSAFE.arrayIndexScale( long[].class ); VALUE_OFFSET = base + (scale * 7); } // 15个元素,从0开始,有效值处于第7个,这样前后各有7个long字段填充, // 8个long型占共用64字节,而当前CPU的缓存行大小也是64字节,这样可以避免对Sequence的读写出现伪共享。 private final long [] paddedValue = new long [15]; // 原子地读 public long get() { return UNSAFE .getLongVolatile(paddedValue, VALUE_OFFSET); } // 原子地写 public void set(final long value) { UNSAFE.putOrderedLong(paddedValue , VALUE_OFFSET, value); } // CAS public boolean compareAndSet(final long expectedValue, final long newValue) { return UNSAFE .compareAndSwapLong(paddedValue, VALUE_OFFSET, expectedValue, newValue); } public long addAndGet(final long increment) { long currentValue; long newValue; do { currentValue = get(); newValue = currentValue + increment; } while (!compareAndSet(currentValue, newValue)); return newValue; }
Disruptor类
Disruptor类是这个类库的门面,用DSL的形式直观地提供了组装事件回调处理的关系链的功能,并提供获取事件、发布事件的方法,缓存容器生命周期管理。
属性
private final RingBuffer ringBuffer ; // 核心,绝大多数功能都委托给ringBuffer处理 private final Executor executor ; // 用于执行事件处理器的线程池 private final ConsumerRepository consumerRepository = new ConsumerRepository(); // 事件处理器仓库,就是事件处理器的集合 private final AtomicBoolean started = new AtomicBoolean( false); // 启动时检查,只能启动一次 private ExceptionHandler exceptionHandler; // 异常处理器
设置EventHandler事件处理器
/* * barrierSequences是eventHandlers的前置事件处理关卡,是用来保证事件处理的时序性的关键; * */ EventHandlerGroup createEventProcessors( final Sequence[] barrierSequences, final EventHandler[] eventHandlers) { checkNotStarted(); // 确保在容器启动前设置 final Sequence[] processorSequences = new Sequence[eventHandlers.length ]; // 存放游标的数组 final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); // 获取前置的序号关卡 for ( int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) { final EventHandler eventHandler = eventHandlers[i]; // 封装为批量事件处理器BatchEventProcessor,其实现了Runnable接口,所以可以放到executor去执行处理逻辑;处理器还会自动建立一个序号Sequence。 final BatchEventProcessor batchEventProcessor = new BatchEventProcessor(ringBuffer , barrier, eventHandler); if (exceptionHandler != null) { // 如果有则设置异常处理器 batchEventProcessor.setExceptionHandler( exceptionHandler); } // 添加到消费者仓库,会先封装为EventProcessorInfo对象(表示事件处理的一个阶段), consumerRepository.add(batchEventProcessor, eventHandler, barrier); processorSequences[i] = batchEventProcessor.getSequence(); } if (processorSequences. length > 0) {// 如果有前置关卡,则取消之前的前置关卡对应的EventProcessor 的 链的终点标记。 consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences); } // EventHandlerGroup是一组EventProcessor,作为disruptor的一部分,提供DSL形式的方法,作为方法链的起点,用于设置事件处理器。 return new EventHandlerGroup(this, consumerRepository, processorSequences); } }
RingBuffer 类
// 属性的初始化声明 public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE ; private final int indexMask ; private final Object[] entries ; private final int bufferSize ; private final Sequencer sequencer ; // 属性的初始化代码 RingBuffer(EventFactory eventFactory, Sequencer sequencer) { this.sequencer = sequencer; this.bufferSize = sequencer.getBufferSize(); if (bufferSize < 1) { throw new IllegalArgumentException("bufferSize must not be less than 1"); } if (Integer.bitCount( bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } this.indexMask = bufferSize - 1; this.entries = new Object[sequencer.getBufferSize()]; fill(eventFactory); }
从以上这些代码可以看出:
RingBuffer是基于数组构建的,因为数组是缓存友好的,相邻的元素一般处于同一个缓存块。
缓存的大小必须是2的X次方,这是为了用位运算提高性能;由于数组缓存的容量总是有限,当缓存填满后,又要从 下标0 开始填充,如果缓存大小不是2的X次方,那只能用求模运算来获得新下标,所以还有个indexMask 来保存下标掩码;通过与indexMask 进行按位与可以得到一个安全的下标,不再需要进行下标检查,如:(E)entries[( int)sequence & indexMask ]。
从缓存获取事件
// 从缓存获取指定序号的事件 public E get(long sequence) { // 这个按位与操作说明了为什么ringBuffer的大小必须是2的n次方:用高效的 按位与 代替 低效的求模操作。 return (E) entries[(int ) sequence & indexMask]; } }
发布事件
发布事件有3步:获取新事件的序号,覆盖旧事件,通知等待着。最简单的发布事件形式:
public void publishEvent(EventTranslator translator) { final long sequence = sequencer .next(); // 通过生产者序号控制器获取可用序号 translateAndPublish(translator, sequence); // 转换事件到队列缓存并发布事件 } } private void translateAndPublish(EventTranslator translator, long sequence) { try { // 发布事件前要先获取对应位置上的旧事件,再用translator把新事件的属性转换到旧事件的属性,从而达到发布的目的。 // 这就是说,Disruptor对于已消费的事件是不删除的,有新事件时只是用新事件的属性去替换旧事件的属性。 // 这带来的一个问题就是内存占用 translator.translateTo(get(sequence), sequence); } finally { sequencer.publish(sequence); // 原子性地更新生产者的序号,并通知在等待的消费者关卡。 } } }
生产者序号控制器与消费者关卡是共用同一个等待策略的,一个Disruptor容器只有一个等待策略实例。
EventProcessor
事件处理器的执行单元。有两个实现:NoOpEventProcessor 和 BatchEventProcessor,其中NoOpEventProcessor 是不处理事件的,就不关注了。
BatchEventProcessor
Disruptor提供的唯一有用的 EventProcessor 实现类。
Disruptor容器启动时,会调用 ConsumerInfo 的 start方法,如果 ConsumerInfo 封装的是用户提交的EventHandler 实例,那么会在线程池里运行 EventProcessor,也就是 BatchEventProcessor 实例的 run方法。
public void run() { // 确保一次只有一个线程执行此方法,这样访问自身的序号就不要加锁 if (! running.compareAndSet(false, true)) { throw new IllegalStateException("Thread is already running"); } sequenceBarrier.clearAlert(); // 清除前置序号关卡的通知状态 notifyStart(); // 声明周期通知,开始前回调 T event = null; long nextSequence = sequence.get() + 1L; // sequence指向上一个已处理的事件,默认是-1 try { while (true ) { try { // 从它的前置序号关卡获取下一个可处理的事件序号。 // 如果这个事件处理器不依赖于其他的事件处理器,则前置关卡就是生产者序号; // 如果这个事件处理器依赖于1个或多个事件处理器,那么这个前置关卡就是这些前置事件处理器中最慢的一个。 // 通过这样,可以确保事件处理器不会超前处理地事件。 final long availableSequence = sequenceBarrier.waitFor(nextSequence); // 处理一批事件 while (nextSequence <= availableSequence) { event = dataProvider.get(nextSequence); eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); nextSequence++; } // 设置它自己最后处理的事件序号,这样依赖于它的处理器可以它处理刚处理过的事件。 sequence.set(availableSequence); } catch (final TimeoutException e) { // 获取事件序号超时处理 notifyTimeout( sequence.get()); } catch (final AlertException ex) { // 处理通知事件;检测是否要停止,如果非则继续处理事件 if (!running .get()) { break; } } catch (final Throwable ex) { // 其他异常,用事件处理器处理;然后继续处理下一个事件 exceptionHandler.handleEventException(ex, nextSequence, event); sequence.set(nextSequence); nextSequence++; } } } finally { // 声明周期通知,停止事件回调;复位运行状态标志,确保可以再次运行此方法。 notifyShutdown(); running.set(false ); } } }
从 while 循环可以看出,事件处理可以分为三步:
从 SequenceBarrier 获取获取可以处理的最大事件序号;
循环处理可处理事件;
更新自身的已处理的事件序号,让依赖自身的事件处理器可以继续处理。
sequence .set 和 sequence .get 方法都是原子性地读取、更新序号的,这样就避免了加锁,从而提供性能。
sequenceBarrier .waitFor 最终也会调用 sequence .get 方法。
SequenceBarrier
协作式关卡,用于跟踪生产者游标和依赖的事件处理器的序号的数据结构。有两个实现DummySequenceBarrier 和 ProcessingSequenceBarrier,类如其名,前者是虚拟的,只有空方法;后者是实用的。
SequenceBarrier接口定义
public interface SequenceBarrier { // 等待指定的序号变得可消费 long waitFor(long sequence) throws AlertException, InterruptedException, TimeoutException; // 返回当前可读的游标(一个序号) long getCursor(); // 当前是否有通知状态给此关卡 boolean isAlerted(); // 通知事件处理器状态发生改变,并保持这个状态直到被清除 void alert(); // 清除当前通知状态 void clearAlert(); // 检查通知状态,如果有异常则抛出 void checkAlert() throws AlertException; }
ProcessingSequenceBarrier
生成ProcessingSequenceBarrier的实例是由框架控制的,首先在Disruptor类的createEventProcessors方法内:
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences); ... // RingBufferd 的newBarrier方法: public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { return sequencer.newBarrier(sequencesToTrack); // 是通过生产者序号控制器生成的。 } ... // AbstractSequencer的newBarrier方法。 public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } ... /** * @param sequencer 生产者序号控制器 * @param waitStrategy 等待策略 * @param cursorSequence 生产者序号 * @param dependentSequences 依赖的Sequence */ p public ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { this. sequencer = sequencer; this. waitStrategy = waitStrategy; this. cursorSequence = cursorSequence; // 如果事件处理器不依赖于任何前置处理器,那么dependentSequence也指向生产者的序号。 if (0 == dependentSequences. length) { dependentSequence = cursorSequence; } else { // 如果有多个前置处理器,则对其进行封装,实现了组合模式。 dependentSequence = new FixedSequenceGroup(dependentSequences); } }
获取序号的方法
/** * 该方法不保证总是返回未处理的序号;如果有更多的可处理序号时,返回的序号也可能是超过指定序号的。 */ public long waitFor(final long sequence) throws AlertException, InterruptedException, TimeoutException { // 首先检查有无通知 checkAlert(); // 通过等待策略来获取可处理事件序号, long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence , this); // 这个方法不保证总是返回可处理的序号 return availableSequence; if (availableSequence < sequence) { } // 再通过生产者序号控制器返回最大的可处理序号 return sequencer.getHighestPublishedSequence(sequence, availableSequence); } }
WaitStrategy:
BlockingWaitStrategy:没有可消费事件时阻塞等待生产者唤醒。
BusySpinWaitStrategy:忙等策略。
TimeoutBlockingWaitStrategy:阻塞等待策略
YieldingWaitStrategy:通过调用Thread.yield方法来让出CPU,达到等待的目的,等待时长没保证,取决于线程的调度系统。
相关推荐
Disruptor阅读笔记.md
《Disruptor学习(1)——深入理解高性能并发框架》 Disruptor是英国LMAX公司开发的一个开源并发框架,其设计目标是为了提供极低延迟的并发处理能力。在金融交易系统等领域,对于高频率、低延迟的需求尤为重要,...
四、Disruptor学习路径 1. **理解基础概念**:首先,需要理解Disruptor的基本原理,包括环形缓冲区、事件处理、序列号和屏障等核心概念。 2. **实战演练**:通过提供的示例代码,了解如何创建和配置Disruptor,...
### Disruptor并发框架知识点详解 #### 一、Disruptor简介及原理 **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年...
5. 学习如何在SpringBoot中管理和注入Disruptor实例,以及如何在业务代码中使用。 6. 调优Disruptor的配置,如调整环形缓冲区大小、选择合适的序列化策略等,以适应不同并发场景的需求。 通过上述步骤,开发者可以...
《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
"DisruptorDemo.zip"的实例代码为我们提供了学习和理解Disruptor的一个良好起点,通过实际操作,我们可以更直观地感受Disruptor的强大性能。在实际项目中,尤其是对于需要处理大量并发请求的系统,Disruptor是一个...
通过这个"Disruptor demo",你可以学习到如何在实际项目中应用Disruptor来优化并发性能,以及如何配置和调整各种参数以适应不同的系统需求。这个示例将帮助你理解Disruptor如何简化并发编程,提高程序运行效率,尤其...
《Disruptor 框架详解与应用实例》 ...总的来说,这个压缩包提供了一个了解和学习 Disruptor 的良好起点,通过阅读源码、示例代码和 API 文档,我们可以深入了解这个框架如何帮助我们构建高并发、低延迟的应用系统。
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它通过优化数据共享方式,显著提高了多线程环境下的处理速度。在"Disruptor 多个消费者"的场景中,我们可以深入理解Disruptor如何...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它以其高效、低延迟的事件处理机制而闻名。在C++版本的Disruptor中,我们同样可以享受到这种高效的并发能力,尤其适用于需要大量...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
- `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和学习。 - `disruptor-3.0.1-javadoc.jar`:包含API文档,指导开发者如何正确使用Disruptor的类和方法。 4. **使用Disruptor的...
《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具...在《DisruptorStudy》这个压缩包文件中,包含了对Disruptor的详细学习资料,包括源码分析和实践案例,帮助你更全面地掌握这一强大的并发工具。
学习Disruptor的源码"study-disruptor"可以帮助我们深入理解其内部机制,例如如何实现无锁操作,如何优化内存访问,以及如何通过事件处理器链来并行处理事件。通过对这些概念和技术的理解,我们可以更好地利用...