`

LMAX Disruptor—多生产者多消费者中,消息复制分发的高性能实现

阅读更多

解决的问题

当我们有多个消息的生产者线程,一个消费者线程时,他们之间如何进行高并发、线程安全的协调?

很简单,用一个队列。

 

当我们有多个消息的生产者线程,多个消费者线程,并且每一条消息需要被所有的消费者都消费一次(这就不是一般队列,只消费一次的语义了),该怎么做?

这时仍然需要一个队列。但是:

1. 每个消费者需要自己维护一个指针,知道自己消费了队列中多少数据。这样同一条消息,可以被多个人独立消费。

2. 队列需要一个全局指针,指向最后一条被所有生产者加入的消息。消费者在消费数据时,不能消费到这个全局指针之后的位置——因为这个全局指针,已经是代表队列中最后一条可以被消费的消息了。

3. 需要协调所有消费者,在消费完所有队列中的消息后,阻塞等待。

4. 如果消费者之间有依赖关系,即对同一条消息的消费顺序,在业务上有固定的要求,那么还需要处理谁先消费,谁后消费同一条消息的问题。

 

总而言之,如果有多个生产者,多个消费者,并且同一条消息要给到所有的消费者都去处理一下,需要做到以上4点。这是不容易的。

LMAX Disruptor,正是这种场景下,满足以上4点要求的单机跨线程消息传递、分发的开源、高性能实现。

 

关键概念

1. RingBuffer

应用需要传递的消息在Disrutpor中称为Event(事件)。

RingBuffer是Event的数组,实现了阻塞队列的语义:

如果RingBuffer满了,则生产者会阻塞等待。

如果RingBuffer空了,则消费者会阻塞等待。

 

2. Sequence

在上文中,我提到“每个消费者需要自己维护一个指针”。这里的指针就是一个单调递增长整数(及其基于CAS的加法、获取操作),称为Sequence。

除了每个消费者需要维护一个指针外,RingBuffer自身也要维护一个全局指针(如上一节第2点所提到的),记录最后一条可以被消费的消息。这个全局指针就在下图红框中。

生产场景实现

生产者往RingBuffer中发送一条消息(RingBuffer.publish())时:

1. 生产者的私有sequence会+1

2. 检查生产者的私有sequence与RingBuffer中Event个数的关系。如果发现Event数组满了(下图红框中的判断),则阻塞(下图绿框中的等待)。

3. RingBuffer会在Event数组中(sequencer+1) % BUFFER_SIZE的地方,放入Event。这里的取模操作,就体现了Event数组用到最后,则回到头部继续放,所谓“Ring“ Buffer的轮循复用语义。

 

消费场景实现

 消费者从RingBuffer循环队列中获取一条消息时:

1. 从消费者私有Sequence,可以知道它自己消费到了RingBuffer队列中的哪一条消息。

2. 从RingBuffer的全局指针Sequence,可以知道RingBuffer中最后一条没有被消费的消息在什么位置。

3. N = (RuingBuffer的全局指针Sequence - 消费者私有Sequence),就是当前消费者,还可以消费多少Event。

4. 如果以上差值N为0,说明当前消费者已经消费过RingBuffer中的所有消息了。那么当前消费者会阻塞。等待生产者加入更多的消息:

 以上代码中,红框中的availableSequence就是RingBuffer的全局指针Sequence。绿框中的sequence是当前消费者的私有sequence。

如果这个判断为true,说明RingBuffer中最新一条可以被消费的Event,已经被当前消费者消费过了。那么就会调用apployWaitMethod()阻塞,等待生产者产生更多的Event。

 5. 如果RingBuffer中,还有可以被当前消费者消费的Event,即N > 0,

     那么消费者,会一口气获取所有可以被消费的N个Event。即下图中的while循环,直到N个Event都被消费才退出。这种一口气消费尽量多的Event,是高性能的体现。

     从RingBuffer中每获取一个Event,都会回调绿框中的eventHandler——这是应用注册的Event处理方法,执行应用的Event消费业务逻辑。

  

   最后,上图中的sequence.set(availableSequence),会把当前消费者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中所有的Event都已经消费掉了。

 

 高性能的实现细节

无锁

无锁就没有锁竞争。当生产者、消费者线程数很高时,意义重大。所以,

往大里说,每个消费者维护自己的Sequence,基本没有跨线程共享的状态。

往小里说,Sequence的加法是CAS实现的。

  • 当生产者需要判断RingBuffer是否已满时,用CAS比较原先RingBuffer的Event个数,和假定放入新Event后Event的个数。
  • 如果CAS返回false,说明在判断期间,别的生产者加入了新Event;或者别的消费者拿走了Event。那么当前判断无效,需要重新判断。这就是常见的 do { ... } while (false == CAS(oldVal, newVal))。——都是套路:)

 

对象的复用

JVM运行时,一怕创建大对象,二怕创建很多小对象。这都会导致JVM堆碎片化、对象元数据存储的额外开销大。这是高性能Java应用的噩梦。

为了解决第二点“很多小对象”,主流开源框架都会自己维护、复用对象池。LMAX Disruptor也不例外。

生产者不是创建新的Event对象,放入到RingBuffer中。而是从RingBuffer中取出一个已有的Event对象,更新它所指向的业务数据,来代表一个逻辑上的新Event。

所以LMAX Disruptor的生产者API,用起来有些麻烦——分为三步,一是下图绿框中取出一个已有的、已经被所有人消费过的Event对象,二是下图红框中更新这个Event对象所指向的业务数据,三是下图蓝框中标记这个Event对象为逻辑上的新Event。

 

 

 

rel:https://www.cnblogs.com/seaspring/p/6504902.html

分享到:
评论

相关推荐

    disruptor 多个消费者

    与传统的队列不同,Disruptor的环形缓冲区允许多个生产者和消费者并发操作,避免了锁竞争,从而实现了更高的性能。 1. **多消费者机制**:在Disruptor中,可以注册多个消费者监听事件。每个消费者都有一个独立的...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    LMAX Disruptor是一款高性能的消息处理框架,由LMAX公司开发并开源,它在金融交易领域有着广泛的应用。Disruptor的设计目标是解决多线程环境下的数据共享问题,通过优化并发性能,实现极低的延迟和高吞吐量。在Java...

    LMAX-Disruptor框架jar包

    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。

    基于Spring Boot和LMAX Disruptor的高性能并发框架.zip

    本项目是一个基于Spring Boot和LMAX Disruptor框架的高性能并发框架,旨在提供高效的事件处理和消息传递机制。项目涵盖了并发编程的核心概念、无锁并行计算框架的使用、高级特性、底层源码分析以及与Netty的整合实战...

    LMAX.Disruptor,一个无锁高并发框架,中文文档

    LMAX是一种新型零售金融交 易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理 器,它能够在一个线程里每秒处理6百万订单。业务逻辑处理器完全是运行在内存中,使用事件...

    高性能高稳定性分布式的游戏架构,游戏逻辑运行在Disruptor消费者线程中,其它线程都为辅助线程, 整体为多生产者.zip

    本文将深入探讨一个使用Java开发的游戏项目,该项目采用了一种独特的方式,即游戏逻辑运行在Disruptor消费者线程中,其余线程作为辅助线程,构建了一个多生产者的分布式架构。这个设计模式旨在优化并发性能,提高...

    Disruptor C++版(仅支持单生产者)

    1. 生产者:生产者是向Disruptor中发布事件的角色。由于这里提到的是“仅支持单生产者”,这意味着在一个Disruptor实例中,只能有一个线程发布事件。这简化了并发控制,但限制了并行度。 2. 消费者:消费者从...

    disruptor高性能Java线程间通讯库

    尽管Disruptor最初设计为支持多个生产者和单个消费者,但通过EventProcessor链,它可以实现多消费者模式。每个EventProcessor负责处理一部分事件,形成流水线化的工作方式,进一步提高了处理效率。 3. **事件...

    LMAX Disruptor的Go语言端口。-Golang开发

    这是LMAX Disruptor移植到Go编程语言中的移植。 它保留了Disruptor的本质和精神,并利用了许多相同的抽象和概念,但没有维护相同的API。 在使用Go 1.13.1的MacBook Pro(英特尔酷睿i9-8950HK CPU @ 2.90GHz)上,它...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    Disruptor并发框架中文参考文档

    **Disruptor** 的核心在于它的Ring Buffer设计,这是一种特殊的循环数组结构,能够高效地支持多个生产者向其中添加数据以及多个消费者从中读取数据的过程,而无需使用传统的锁机制。下面将详细介绍Disruptor的工作...

    串行io disruptor

    生产者可以连续写入而不必等待消费者的读取,而消费者则可以批量读取,避免了锁的使用和上下文切换,极大地提高了性能。 此外,Disruptor引入了事件处理器链(EventHandler Chain)的概念,使得数据在多个处理器...

    Disruptor 一种可替代有界队列完成并发线程间数据交换高性能解决方案.docx

    Disruptor 是一个高性能的并发编程框架,由 LMAX 公司开发,旨在解决线程间数据交换的效率问题。它的设计目标是替代传统的有界队列,提供更高的吞吐量和更低的延迟。Disruptor 的核心在于其创新的数据交换机制,包括...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    Disruptor的工作原理基于环形缓冲区(Ring Buffer)的设计,它将生产者和消费者之间的数据传递过程转换为顺序写入和读取,避免了传统并发模型中的锁竞争和上下文切换,从而实现了极高的消息传递效率。在SpringBoot中...

    rxmax:LMAX Disruptor使Rx Java最大化

    Disruptor通过避免线程间的上下文切换和同步开销,实现了微秒级别的消息传递延迟,这对于高频率交易或者其他对性能有极高要求的应用来说至关重要。 LMAX Disruptor的核心设计理念是“序列化”和“事件处理器链”。...

    超高效的交易所撮合引擎,采用伦敦外汇交易所LMAX开源的Disruptor框架,分布式内存存取,以及原子性操作

    match-trade超高效的交易所撮合引擎,采用伦敦外汇交易所LMAX开源的Disruptor框架,分布式内存存取,以及原子性操作。使用数据流的方式进行计算撮合序列,才用价格水平独立撮合逻辑,实现高效大数据撮合

    disruptor 实例

    3. **多生产者与多消费者**:Disruptor支持多个生产者和消费者并发操作,通过序列号的同步机制,保证了数据的一致性。 4. **事件处理流水线**:通过预定义的事件处理器链,Disruptor能构建出高效的数据处理流水线,...

    Java工具:高性能并发工具Disruptor简单使用

    1. **创建Event**: 定义事件类,它是Disruptor中的基本数据单元,用于传递信息。 ```java public class MyEvent { private String data; // getter, setter方法 } ``` 2. **配置Disruptor**: 创建Disruptor实例...

    disrustor:LMAX Disruptor的端口连接到Rust

    特征 单一生产者 批量消费者 阻塞等待策略 旋转等待策略 多制片人 工人池 DSL 文献资料基准测试从生产者向消费者发送i32大小的消息的初步基准测试结果。 名称批量大小通量mpsc频道1 34.894 Melem /秒驱油剂纺丝1个38...

    DisruptorDemo.zip

    Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地提高了并发处理性能,尤其在高频率交易系统中表现卓越。本篇...

Global site tag (gtag.js) - Google Analytics