Disruptor是用于在多个线程之间通信的高性能低延时的内存消息队列。其实现了“队列”的功能,而且是一个有界队列。它适合“生产者-消费者”模型的应用场合。
Disruptor是一个高性能的异步处理框架,也可以认为是一个观察者模式实现,或者事件-监听模式的实现。
它类似于ArrayBlockingQueue队列类。
高性能原理:
引入环形的数组结构:缓存基于数组,用位运算替代求模。缓存的长度总是2的n次方,这样可以用位运算 i & (length - 1) 替代 i % length。预分配缓存对象,通过更新缓存里对象的属性而不是删除对象来减少垃圾回收。
无锁的设计:采用内存屏障机制和原子性的CAS操作实现无锁。用内存屏障加序列号的方式实现了无锁的并发机制。
属性填充:通过添加额外的无用信息,避免伪共享问题。
元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增。
伪共享:
CPU的缓存一般是以缓存行为最小单位的,对应主存的一块相应大小的单元;当前的缓存行大小一般是64字节,每个缓存行一次只能被一个CPU核访问,如果一个缓存行被多个CPU核访问,就会造成竞争,导致某个核必须等其他核处理完了才能继续处理,响应性能。
适用场景:
适用于大规模低延迟的并发场景。
可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)。
基于内存的事件流处理机制的场景。
生产者-消费者模型使用场景:
一个生产者 — 一个消费者的场景
一个生产者 — 多个消费者的场景
多个生产者 — 一个消费者的场景
多个生产者 — 多个消费者的场景
生产者 - 消费流,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图)。
核心概念:
Ring Buffer:环形的缓冲区。负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。
Sequence:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的处理进度。
Sequencer:是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
Sequence Barrier:用于保持对RingBuffer的main published Sequence和Consumer依赖其它Consumer的Sequence的引用。
WaitStrategy:决定了一个消费者将如何等待生产者将Event置入Disruptor。
Event:生产者和消费者之间进行交换的数据被称为事件(Event)。
EventProcessor:持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
EventHandler:事件处理接口,由用户实现。
Producer:生产者,由用户实现
常用的等待策略:用于抽象Consumer如何等待新事件
BlockingWaitStrategy
内部使用一个锁和条件变量来控制线程的执行和等待。
是最慢的等待策略,也是CPU使用率最低和最稳定的选项。
SleepingWaitStrategy
循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠。
优点:生产线程只需要计数,而不执行任何指令,且没有条件变量的消耗。
缺点:事件对象从生产者到消费者传递的延迟变大了。
应用场景:用在不需要低延迟,且事件发布对于生产者的影响比较小的情况下。比如异步日志功能。
YieldingWaitStrategy
循环等待sequence增加到合适的值,循环中调用Thread.yield()允许其他准备好的线程执行。
该策略在减低系统延迟的同时也会增加CPU运算量。
应用场景:需要高性能而且事件消费者线程比逻辑内核少的场景。
BusySpinWaitStrategy
是性能最高的等待策略,也是对部署环境要求最高的策略。
应用场景:用在事件处理线程比物理内核数目还要小的场景。
消息 事件类类:
public class MessageEvent<T>{ private T message; public T getMessage() { return message; } public void setMessage(T message) { this.message = message; } }
消息 事件工厂类:
public class MessageEventFactory<T> implements EventFactory<MessageEvent<T>>{ @Override public MessageEvent<T> newInstance() { return new MessageEvent<T>(); } }
异常处理类:
public class MessageExceptionHandler<T> implements ExceptionHandler<MessageEvent<T>> { @Override public void handleEventException(Throwable ex, long sequence, MessageEvent<T> event) { System.out.println("handleEventException: " + ex.toString()); } @Override public void handleOnStartException(Throwable ex) { System.out.println("handleOnStartException: " + ex.toString()); } @Override public void handleOnShutdownException(Throwable ex) { System.out.println("handleOnShutdownException: " + ex.toString()); } }
广播方式的消费者抽象类:
public abstract class AbstractBroadcastConsumer<T> implements EventHandler<MessageEvent<T>>{ @Override public void onEvent(MessageEvent<T> event, long sequence, boolean endOfBatch) throws Exception { handleEvent(event, sequence, endOfBatch); } public abstract void handleEvent(MessageEvent<T> event, long sequence, boolean endOfBatch) throws Exception; }
分组方式的消费者抽象类:
public abstract class AbstractGroupingConsumer<T> implements WorkHandler<MessageEvent<T>>{ @Override public void onEvent(MessageEvent<T> event) throws Exception { handleEvent(event); } public abstract void handleEvent(MessageEvent<T> event) throws Exception; }
消息生产和消费:
public class DisruptorTest { public static void main(String[] args) { int ringBufferSize = Double.valueOf(Math.pow(2, 15)).intValue(); //必须是2的N次方 System.out.println("ringBufferSize=" + ringBufferSize); //ProducerType.SINGLE 单生产者 Disruptor<MessageEvent<String>> disruptor = new Disruptor<>(new MessageEventFactory<String>(), ringBufferSize, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //## 多消费者 ## //广播:同一个Event被所有消费者消费 EventHandler<MessageEvent<String>> handler = new DefaultBroadcastConsumer(); disruptor.handleEventsWith(handler, handler, handler); //无依赖关系 // disruptor.handleEventsWith(handler, handler).then(handler); //有依赖关系 // disruptor.after(handler).handleEventsWith(handler); //处理类有先后顺序 //分组:一个Event只能被组内的一个消费者消费 WorkHandler<MessageEvent<String>> handler2 = new DefaultGroupingConsumer(); disruptor.handleEventsWithWorkerPool(handler2, handler2, handler2); disruptor.setDefaultExceptionHandler(new MessageExceptionHandler<String>()); RingBuffer<MessageEvent<String>> ringBuffer = disruptor.start(); while(true){ publishEvent(ringBuffer, String.valueOf(++count)); } disruptor.shutdown(); } /** * 发布消息 */ private static void publishEvent(RingBuffer<MessageEvent<String>> ringBuffer, String message){ ringBuffer.publishEvent((event, sequence, messageContent) -> { event.setMessage(messageContent); }, message); } /** * 广播消费者 */ public static class DefaultBroadcastConsumer extends AbstractBroadcastConsumer<String>{ @Override public void handleEvent(MessageEvent<String> event, long sequence, boolean endOfBatch) throws Exception { System.out.println(Thread.currentThread().getName() + ": " + event.getMessage()); } } /** * 分组消费者 */ public static class DefaultGroupingConsumer extends AbstractGroupingConsumer<String>{ @Override public void handleEvent(MessageEvent<String> event) throws Exception { System.out.println(Thread.currentThread().getName() + ": " + event.getMessage()); } } }
相关推荐
Disruptor是一种高性能的并发数据交换框架,由Martin Thompson、Dave Farley、Michael Barker、Patricia Gee和Andrew Stewart共同开发,主要用于替代传统的有界队列。这个框架的诞生源于LMAX公司在构建高性能金融...
它的设计目标是替代传统的有界队列,提供更高的吞吐量和更低的延迟。Disruptor 的核心在于其创新的数据交换机制,包括使用栅栏(barrier)和序号(Sequencing)来协调生产者和消费者,避免了锁和CAS操作带来的开销。...
Java工具:高性能并发工具Disruptor简单使用 在Java编程中,高效并发处理是优化系统性能的关键之一。Disruptor,由LMAX公司开源的一款并发框架,为处理高并发场景提供了一种新颖且高效的解决方案。它通过消除锁和...
本文将深入探讨CPU缓存架构及其对系统性能的影响,并详细解析高性能内存队列——Disruptor的实际应用。 首先,让我们来了解一下CPU缓存。CPU缓存是计算机硬件中的一个关键组成部分,它位于CPU和主内存之间,用于...
Disruptor,由英国LMAX公司开源的一款高性能、低延迟的消息处理框架,它彻底改变了并发编程的传统模式,尤其在金融交易领域,其性能表现卓越,被誉为“游戏规则的改变者”。本文将深入探讨Disruptor的核心原理,并...
**Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年获得了Duke's Choice Award,这充分证明了其在并发处理领域的创新性和...
显然,为了实现这一目标,我们需要做一些特殊的事情,以通过我们的Java平台实现极低的延迟和高吞吐量。 性能测试表明,使用队列在系统各阶段之间传递数据会引入延迟,因此我们专注于优化该区域。 Disruptor是我们...
Disruptor,由LMAX公司开发并开源,是一个高性能、低延迟的并发框架,尤其适用于需要大量数据交换的系统。本文将详细介绍Disruptor的核心原理、设计模式及其在实际中的应用。 Disruptor的诞生源于LMAX对金融交易...
1. 高性能交易系统:Disruptor的低延迟特性使其成为金融交易系统中的理想选择。 2. 实时数据分析:在大数据实时处理中,Disruptor可以高效地传递和处理数据流。 3. 消息队列中间件:Disruptor可作为消息中间件的一...
LMAX Disruptor是由LMAX公司开发的一款高性能、低延迟的消息传递框架,最初是用Java编写的。它的核心设计理念是通过消除传统锁机制,采用无锁数据结构和单向事件传递路径,以达到极致的并发性能。在本文中,我们将...
Disruptor是由LMAX公司开源的一款并发框架,其设计灵感来源于传统的消息队列,但通过独特的环形缓冲区(Ring Buffer)和事件处理机制,显著提升了并发性能,特别适用于高吞吐量、低延迟的场景。Disruptor的核心思想是...
Disruptor是一种高效的并发编程框架,主要用于解决高性能系统中的低延迟问题。它通过无锁设计实现了线程间的高效通信与同步,避免了传统锁机制所带来的性能瓶颈。 #### 二、Disruptor核心组件解析 Disruptor框架...
Disruptor不仅适用于金融交易领域,其高性能和低延迟的特性也广泛应用于大数据处理、消息队列、分布式系统等领域。通过深入理解Disruptor的源码,开发者可以更好地理解和应用这一并发框架,提升系统的性能和稳定性。...
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它以其高效、低延迟的事件处理机制而闻名。在C++版本的Disruptor中,我们同样可以享受到这种高效的并发能力,尤其适用于需要大量...
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它通过优化数据共享方式,显著提高了多线程环境下的处理速度。在"Disruptor 多个消费者"的场景中,我们可以深入理解Disruptor如何...
Disruptor 实现了一个有界的队列,适用于“生产者-消费者”模型,但相比传统的JDK中的BlockingQueue,它具有更多优势。 首先,Disruptor 支持多个消费者同时处理同一个事件,这些消费者之间可以并行工作,也可以...
Disruptor框架是LMAX交易所开源的一个高性能、低延迟的消息队列实现,它采用无锁化编程技术,利用环形缓冲区(Ring Buffer)来实现高效的多生产者多消费者模型。本文将深入分析Disruptor的代码,特别聚焦于`...
Disruptor 的高性能特性使其成为实现低延迟、高并发交易系统的关键技术之一。 #### 四、技术栈综合应用 在实际项目中,Spring Cloud、Vert.x 以及 Disruptor 这三个技术可以协同工作,共同构建出一个既稳定又高效...