`
raymond.chen
  • 浏览: 1436835 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Disruptor:高性能低延迟的内存有界队列框架

    博客分类:
  • Java
 
阅读更多

Disruptor是用于在多个线程之间通信的高性能低延时的内存消息队列。其实现了“队列”的功能,而且是一个有界队列。它适合“生产者-消费者”模型的应用场合。

 

Disruptor是一个高性能的异步处理框架,也可以认为是一个观察者模式实现,或者事件-监听模式的实现。

 

它类似于ArrayBlockingQueue队列类。

 

高性能原理

       引入环形的数组结构:缓存基于数组,用位运算替代求模。缓存的长度总是2的n次方,这样可以用位运算 i & (length - 1) 替代 i % length。预分配缓存对象,通过更新缓存里对象的属性而不是删除对象来减少垃圾回收。

        无锁的设计:采用内存屏障机制和原子性的CAS操作实现无锁。用内存屏障加序列号的方式实现了无锁的并发机制。

        属性填充:通过添加额外的无用信息,避免伪共享问题。

        元素位置的定位:采用跟一致性哈希一样的方式,一个索引,进行自增。

 

伪共享

       CPU的缓存一般是以缓存行为最小单位的,对应主存的一块相应大小的单元;当前的缓存行大小一般是64字节,每个缓存行一次只能被一个CPU核访问,如果一个缓存行被多个CPU核访问,就会造成竞争,导致某个核必须等其他核处理完了才能继续处理,响应性能。

 

适用场景

        适用于大规模低延迟的并发场景。

        可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)。

        基于内存的事件流处理机制的场景。

 

生产者-消费者模型使用场景

        一个生产者 — 一个消费者的场景

        一个生产者 — 多个消费者的场景

        多个生产者 — 一个消费者的场景

        多个生产者 — 多个消费者的场景

        生产者 - 消费流,消费者之间既可以并行处理,也可以相互依赖形成处理的先后次序(形成一个依赖图)。

 

核心概念

         Ring Buffer:环形的缓冲区。负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。

 

         Sequence:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的处理进度。

 

         Sequencer:是Disruptor的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。

 

         Sequence Barrier:用于保持对RingBuffer的main published Sequence和Consumer依赖其它Consumer的Sequence的引用。

 

          WaitStrategy:决定了一个消费者将如何等待生产者将Event置入Disruptor。

 

          Event:生产者和消费者之间进行交换的数据被称为事件(Event)。

 

          EventProcessor:持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。

 

          EventHandler:事件处理接口,由用户实现。

 

          Producer:生产者,由用户实现

 

常用的等待策略:用于抽象Consumer如何等待新事件

        BlockingWaitStrategy

               内部使用一个锁和条件变量来控制线程的执行和等待。

               是最慢的等待策略,也是CPU使用率最低和最稳定的选项。

 

        SleepingWaitStrategy

               循环等待并且在循环中间调用LockSupport.parkNanos(1)来睡眠。

               优点:生产线程只需要计数,而不执行任何指令,且没有条件变量的消耗。

               缺点:事件对象从生产者到消费者传递的延迟变大了。

               应用场景:用在不需要低延迟,且事件发布对于生产者的影响比较小的情况下。比如异步日志功能。

 

         YieldingWaitStrategy

                循环等待sequence增加到合适的值,循环中调用Thread.yield()允许其他准备好的线程执行。

                该策略在减低系统延迟的同时也会增加CPU运算量。

                应用场景:需要高性能而且事件消费者线程比逻辑内核少的场景。

 

         BusySpinWaitStrategy

                是性能最高的等待策略,也是对部署环境要求最高的策略。

                应用场景:用在事件处理线程比物理内核数目还要小的场景。

 

消息 事件类类:

public class MessageEvent<T>{
	private T message;

	public T getMessage() {
		return message;
	}

	public void setMessage(T message) {
		this.message = message;
	}
}

 

消息 事件工厂类:

public class MessageEventFactory<T> implements EventFactory<MessageEvent<T>>{
	@Override
	public MessageEvent<T> newInstance() {
		return new MessageEvent<T>();
	}
}

 

异常处理类:

public class MessageExceptionHandler<T> implements ExceptionHandler<MessageEvent<T>> {
	@Override
	public void handleEventException(Throwable ex, long sequence, MessageEvent<T> event) {
		System.out.println("handleEventException: " + ex.toString());
	}
	
	@Override
	public void handleOnStartException(Throwable ex) {
		System.out.println("handleOnStartException: " + ex.toString());
	}
	
	@Override
	public void handleOnShutdownException(Throwable ex) {
		System.out.println("handleOnShutdownException: " + ex.toString());
	}
}

 

广播方式的消费者抽象类:

public abstract class AbstractBroadcastConsumer<T> implements EventHandler<MessageEvent<T>>{
	@Override
	public void onEvent(MessageEvent<T> event, long sequence, boolean endOfBatch) throws Exception {
		handleEvent(event, sequence, endOfBatch);
	}
	
	public abstract void handleEvent(MessageEvent<T> event, long sequence, boolean endOfBatch) throws Exception;
}

 

分组方式的消费者抽象类:

public abstract class AbstractGroupingConsumer<T> implements WorkHandler<MessageEvent<T>>{
	@Override
	public void onEvent(MessageEvent<T> event) throws Exception {
		handleEvent(event);
	}
	
	public abstract void handleEvent(MessageEvent<T> event) throws Exception;
}

 

消息生产和消费:

public class DisruptorTest {
	public static void main(String[] args) {
        int ringBufferSize = Double.valueOf(Math.pow(2, 15)).intValue(); //必须是2的N次方
        System.out.println("ringBufferSize=" + ringBufferSize);
        
		//ProducerType.SINGLE 单生产者
		Disruptor<MessageEvent<String>> disruptor
			= new Disruptor<>(new MessageEventFactory<String>(), 
					ringBufferSize, Executors.defaultThreadFactory(), 
					ProducerType.SINGLE, new YieldingWaitStrategy());
		
		//## 多消费者 ##
		//广播:同一个Event被所有消费者消费
		EventHandler<MessageEvent<String>> handler = new DefaultBroadcastConsumer();
		disruptor.handleEventsWith(handler, handler, handler); //无依赖关系
//		disruptor.handleEventsWith(handler, handler).then(handler); //有依赖关系
//		disruptor.after(handler).handleEventsWith(handler); //处理类有先后顺序
		
		//分组:一个Event只能被组内的一个消费者消费
		WorkHandler<MessageEvent<String>> handler2 = new DefaultGroupingConsumer();
		disruptor.handleEventsWithWorkerPool(handler2, handler2, handler2);
		
		disruptor.setDefaultExceptionHandler(new MessageExceptionHandler<String>());
		RingBuffer<MessageEvent<String>> ringBuffer = disruptor.start();
		
		while(true){
			publishEvent(ringBuffer, String.valueOf(++count));
		}
		disruptor.shutdown();
	}
	
	/**
	 * 发布消息
	 */
	private static void publishEvent(RingBuffer<MessageEvent<String>> ringBuffer, String message){
		ringBuffer.publishEvent((event, sequence, messageContent) -> {
			event.setMessage(messageContent);
		}, message);
	}
	
	/**
	 * 广播消费者
	 */
	public static class DefaultBroadcastConsumer extends AbstractBroadcastConsumer<String>{
		@Override
		public void handleEvent(MessageEvent<String> event, long sequence, boolean endOfBatch) throws Exception {
			System.out.println(Thread.currentThread().getName() + ": " + event.getMessage());
		}
	}

	/**
	 * 分组消费者
	 */
	public static class DefaultGroupingConsumer extends AbstractGroupingConsumer<String>{
		@Override
		public void handleEvent(MessageEvent<String> event) throws Exception {
			System.out.println(Thread.currentThread().getName() + ": " + event.getMessage());
		}
	}
}

  

分享到:
评论

相关推荐

    Disruptor:一种高性能的、在并发线程间数据交换领域用于替换有界限队列的方案

    Disruptor是一种高性能的并发数据交换框架,由Martin Thompson、Dave Farley、Michael Barker、Patricia Gee和Andrew Stewart共同开发,主要用于替代传统的有界队列。这个框架的诞生源于LMAX公司在构建高性能金融...

    Disruptor 一种可替代有界队列完成并发线程间数据交换高性能解决方案.docx

    它的设计目标是替代传统的有界队列,提供更高的吞吐量和更低的延迟。Disruptor 的核心在于其创新的数据交换机制,包括使用栅栏(barrier)和序号(Sequencing)来协调生产者和消费者,避免了锁和CAS操作带来的开销。...

    Java工具:高性能并发工具Disruptor简单使用

    Java工具:高性能并发工具Disruptor简单使用 在Java编程中,高效并发处理是优化系统性能的关键之一。Disruptor,由LMAX公司开源的一款并发框架,为处理高并发场景提供了一种新颖且高效的解决方案。它通过消除锁和...

    15、CPU缓存架构详解&高性能内存队列Disruptor实战

    本文将深入探讨CPU缓存架构及其对系统性能的影响,并详细解析高性能内存队列——Disruptor的实际应用。 首先,让我们来了解一下CPU缓存。CPU缓存是计算机硬件中的一个关键组成部分,它位于CPU和主内存之间,用于...

    disruptor 实例

    Disruptor,由英国LMAX公司开源的一款高性能、低延迟的消息处理框架,它彻底改变了并发编程的传统模式,尤其在金融交易领域,其性能表现卓越,被誉为“游戏规则的改变者”。本文将深入探讨Disruptor的核心原理,并...

    Disruptor并发框架中文参考文档

    **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年获得了Duke's Choice Award,这充分证明了其在并发处理领域的创新性和...

    LMAX Disruptor:高性能线程间消息传递库-开源

    显然,为了实现这一目标,我们需要做一些特殊的事情,以通过我们的Java平台实现极低的延迟和高吞吐量。 性能测试表明,使用队列在系统各阶段之间传递数据会引入延迟,因此我们专注于优化该区域。 Disruptor是我们...

    高并发框架Disruptor代码

    Disruptor,由LMAX公司开发并开源,是一个高性能、低延迟的并发框架,尤其适用于需要大量数据交换的系统。本文将详细介绍Disruptor的核心原理、设计模式及其在实际中的应用。 Disruptor的诞生源于LMAX对金融交易...

    disruptor:LMAX干扰器的C++实现

    LMAX Disruptor是由LMAX公司开发的一款高性能、低延迟的消息传递框架,最初是用Java编写的。它的核心设计理念是通过消除传统锁机制,采用无锁数据结构和单向事件传递路径,以达到极致的并发性能。在本文中,我们将...

    spring-boot-starter-disruptor.zip

    Disruptor是由LMAX公司开源的一款并发框架,其设计灵感来源于传统的消息队列,但通过独特的环形缓冲区(Ring Buffer)和事件处理机制,显著提升了并发性能,特别适用于高吞吐量、低延迟的场景。Disruptor的核心思想是...

    Disruptor并行框架面试题收录

    Disruptor是一种高效的并发编程框架,主要用于解决高性能系统中的低延迟问题。它通过无锁设计实现了线程间的高效通信与同步,避免了传统锁机制所带来的性能瓶颈。 #### 二、Disruptor核心组件解析 Disruptor框架...

    SourceAnalysis_Disruptor:Disruptor原始码解析-源码解析

    Disruptor不仅适用于金融交易领域,其高性能和低延迟的特性也广泛应用于大数据处理、消息队列、分布式系统等领域。通过深入理解Disruptor的源码,开发者可以更好地理解和应用这一并发框架,提升系统的性能和稳定性。...

    Disruptor并发框架

    1. 高性能交易系统:Disruptor的低延迟特性使其成为金融交易系统中的理想选择。 2. 实时数据分析:在大数据实时处理中,Disruptor可以高效地传递和处理数据流。 3. 消息队列中间件:Disruptor可作为消息中间件的一...

    Disruptor C++版(仅支持单生产者)

    Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它以其高效、低延迟的事件处理机制而闻名。在C++版本的Disruptor中,我们同样可以享受到这种高效的并发能力,尤其适用于需要大量...

    disruptor 多个消费者

    Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它通过优化数据共享方式,显著提高了多线程环境下的处理速度。在"Disruptor 多个消费者"的场景中,我们可以深入理解Disruptor如何...

    Disruptor 极速体验.docx

    Disruptor 实现了一个有界的队列,适用于“生产者-消费者”模型,但相比传统的JDK中的BlockingQueue,它具有更多优势。 首先,Disruptor 支持多个消费者同时处理同一个事件,这些消费者之间可以并行工作,也可以...

    disruptor 代码分析

    Disruptor框架是LMAX交易所开源的一个高性能、低延迟的消息队列实现,它采用无锁化编程技术,利用环形缓冲区(Ring Buffer)来实现高效的多生产者多消费者模型。本文将深入分析Disruptor的代码,特别聚焦于`...

    Spring Cloud+Vertx+Disruptor 金融业撮合交易系统实战

    Disruptor 的高性能特性使其成为实现低延迟、高并发交易系统的关键技术之一。 #### 四、技术栈综合应用 在实际项目中,Spring Cloud、Vert.x 以及 Disruptor 这三个技术可以协同工作,共同构建出一个既稳定又高效...

Global site tag (gtag.js) - Google Analytics