原文地址:http://mechanitis.blogspot.com/2011/07/dissecting-disruptor-wiring-up.html 作者是 Trisha Gee, LMAX 公司的一位女工程师。
从逻辑上来说,下一件要做的事情就是把所有的知识拼接到在一起。
我提到过多生产者的情况——他们通过 ProducerBarrier 保证写入操作顺序与可控。我也提到过简单场景下的多消费者数据访问。更多消费者的场景会变得更加复杂,我们 实现了一些聪明的机制允许多个消费者在访问 Ring Buffer 的时候互相等待(依赖)。像很多应用里,有一连串的工作需要在实际执行业务逻辑之前完成 (happen before) —— 例如,在做任何操作之前,我们都必须先保证消息写入磁盘。
Disruptor 论文 和性能测试里包含了你可能想到的一些基本结构。我准备讲一下其中最有趣的那个,这多半是因为我需要练习如何使用画图板。
菱形结构
DiamondPath1P3CPerfTest 展示了一个并不罕见的结构——独立的一个生产者和三个消费者。最棘手的一点是:第三个消费者必须等待前两个消费者处理完成后,才能开始工作。
消费者 C3 也许是你的业务逻辑。消费者 C1 可能在备份接收到的数据,而消费者 C2 可能在准备数据或者别的东西。
用队列实现菱形结构
在一个 SEDA-风格的架构 中,每个处理阶段都会用队列分开:
(为什么单词 Queue 里必须有这么多 "e" 呢?这是我在画这些图时遇到最麻烦的词)。
你也许从这里看到了问题的端倪:一条消息从 P1 传输到 C3 要完整的穿过四个队列,每个队列都会在消息进入队列和取出队列时消耗成本。
用 Disruptor 实现菱形结构
在 Disruptor 的世界里,一切都由单独的 Ring Buffer 管理:
这张图看起来更复杂。不过所有的参与者都只依赖 Ring Buffer 作为一个单独的关联点,并且所有的交互都是基于 Barrier 对象和检查依赖的目标序号来实现。
生产者这边比较简单,它是我在 上文 中描述过的单生产者模型。有趣的是,生产者并不需要关心所有的消费者。它只关心消费者 C3,如果消费者 C3 处理完了 Ring Buffer 的某一个节点,那么另外两个消费者肯定也处理完了。因此,只要 C3 的位置向前移动,Ring Buffer 的后续节点就会空闲出来。
管理消费者的依赖关系需要两个 ConsumerBarrier 对象。第一个仅仅与 Ring Buffer 交互,C1 和 C2 消费者向它申请下一个可访问节点。第二个 ConsumerBarrier 只知道消费者 C1 和 C2,它返回两个消费者访问过的消息序号中较小的那个。
Disruptor 怎样实现消费者等待(依赖)
Hmmm。我想需要一个例子。
我们从这个故事发生到一半的时候来看:生产者 P1 已经在 Ring Buffer 里写到序号 22 了,消费者 C1 已经访问和处理完了序号 21 之前的所有数据。消费者 C2 处理到了序号 18。消费者 C3,就是依赖其他消费者的那个,才处理到序号 15。
生产者 P1 不能继续向 RingBuffer 写入数据了,因为序号 15 占据了我们想要写入序号 23 的数据节点 (Slot)。
(抱歉,我真的试过用其他颜色来代替红色和绿色,但是别的都更容易混淆。)
第一个 ConsumerBarrier(CB1)告诉 C1 和 C2 消费者可以去访问序号 22 前面的所有数据,这是 Ring Buffer 中的最大序号。第二个 ConsumerBarrier (CB2) 不但会检查 RingBuffer 的序号,也会检查另外两个消费者的序号并且返回它们之间的最小值。因此,三号消费者被告知可以访问 Ring Buffer 里序号 18 前面的数据。
注意这些消费者还是直接从 Ring Buffer 拿数据节点——并不是由 C1 和 C2 消费者把数据节点从 Ring Buffer 里取出再传递给 C3 消费者的。作为替代的是,由第二个 ConsumerBarrier 告诉 C3 消费者,在 RingBuffer 里的哪些节点可以安全的处理。
这产生了一个问题——如果任何数据都来自于 Ring Buffer,那么 C3 消费者如何读到前面两个消费者处理完成的数据呢?如果 C3 消费者关心的只是先前的消费者是否已经完成它们的工作(例如,把数据复制到别的地方),那么这一切都没有问题—— C3 消费者知道工作已完成就开心了。但是,如果 C3 消费者需要访问先前的消费者的处理结果,它又从哪里去获取呢?
更新数据节点
秘密在于把处理结果写入 Ring Buffer 数据节点 (Entry) 本身。这样,当 C3 消费者从 Ring Buffer 取出节点时,它已经填充好了 C3 消费者工作需要的所有信息。这里 真正 重要的地方是节点 (Entry) 对象的每一个字段应该只允许一个消费者写入。这可以避免产生并发写入争用 (write-contention) 减慢整个处理过程。
你可以在 DiamondPath1P3CPerfTest 里看到这个例子—— FizzBuzzEntry 有两个字段:fizz 和 buzz。如果消费者是 Fizz Consumer, 它只写入字段 fizz。如果是 Buzz Consumer, 它只写入字段 buzz。第三个消费者 FizzBuzz,它只去读这两个字段但是不会写入,因为读没问题,不会引起争用。
一些实际的 Java 代码
这一切看起来要比队列实现更复杂。是的,它涉及到更多的内部协调。但是这些细节对于消费者和生产者是隐藏的,它们只和 Barrier 对象交互。诀窍在消费者结构里。上文例子中提到的菱形结构可以用下面的方法创建:
ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier(); BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1); BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2); ConsumerBarrier consumerBarrier2 = ringBuffer.createConsumerBarrier(consumer1, consumer2); BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3); ProducerBarrier producerBarrier = ringBuffer.createProducerBarrier(consumer3);
总结
现在你知道了——如何关联 Disruptor 与相互依赖(等待)的多个消费者。关键点是:
※ 使用多个 ConsumerBarrier 来管理消费者之间的依赖(等待)关系。
※ 使用 ProducerBarrier 监视结构图中最后一个消费者。
※ 只允许一个消费者更新数据节点 (Entry) 的每一个独立字段。
更新:Adrian 写了一个非常好的 DSL 工具让拼接 Disruptor 更加简单了。
更新 2:注意 Disruptor 2.0 版使用了与本文不一样的命名。如果你对类名感到困惑,请阅读我的 变更总结。另外,Adrian 的 DSL 工具现在是 Disruptor 主干代码的一部分了。
相关推荐
netty结合disruptor队列实现即时通信1、简介使用disruptor改造netty通讯,使提高吞吐率,主要是提供disruptor如何与netty整合的思路2、软件架构spring-boot2.7.3 + netty4.1.36.Final + disruptor + jdk1.83、源码...
下面将详细解析这个问题,并提供解决方案。 首先,`FatalExceptionHandler`是Disruptor框架中预定义的一个全局异常处理器,当生产者或消费者处理事件时发生未捕获的异常,且该异常未被用户自定义的异常处理器处理时...
赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-3.3.0-javadoc-API文档-中文(简体)版.zip; Maven坐标:com.lmax:disruptor:3.3.0; 标签:disruptor、lmax、jar包、java、中文文档...
4. **绑定处理器到环形缓冲区**: 将处理器添加到Disruptor的处理器链中。 ```java List<EventHandler<MyEvent>> handlers = new ArrayList(); handlers.add(new MyEventHandler()); disruptor.handleEventsWith...
赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-3.3.0-javadoc-API文档-中文(简体)-英语-对照版.zip; Maven坐标:com.lmax:disruptor:3.3.0; 标签:disruptor、lmax、jar包、java...
##### 2.4 解析Disruptor关系组装 Disruptor通过定义一系列的事件处理器(Event Processors)来组装处理流程。这些处理器按照一定的顺序被链接起来,形成一个处理链。每个处理器负责处理特定类型的数据,并将其传递...
《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
4. **等待策略(Wait Strategies)**:Disruptor提供了多种等待策略,如 BusySpin、Block、Yield 等,用于控制消费者在没有新事件时的行为。例如,`BusySpin`策略会在无事件时忙循环,适合CPU资源充足的情况;而`...
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...
4. **屏障(Barrier)与门(Gate)**:Disruptor中的屏障确保了事件处理的正确顺序。例如,BatchEventProcessor使用一个屏障来等待所有依赖的处理器完成对事件的处理,确保数据的一致性。 5. ** ClaimStrategy 和 ...
《Disruptor原始码解析-源码解析》 Disruptor是英国LMAX公司开发的一款高性能、低延迟的并发框架,它在处理高并发场景时展现出卓越的性能,被誉为金融交易领域的“神器”。本篇文章将深入探讨Disruptor的设计原理,...
4. **启动 Disruptor**:启动 Disruptor,它将开始接收和处理事件。 5. **发布事件**:在 Netty 的 ChannelHandlerContext 中,使用 Disruptor 的发布方法将事件放入环形缓冲区。 文件名 "trapos-master" 可能是指...
整合Disruptor到SpringBoot项目中,首先需要在项目的pom.xml文件中添加Disruptor的依赖。之后,可以创建一个Disruptor配置类,定义并配置Disruptor的事件处理器、事件工厂以及环形缓冲区大小等参数。利用Spring的...
《深入解析高并发框架Disruptor》 在现代互联网应用中,处理高并发场景是必不可少的技术挑战。Disruptor,由LMAX公司开发并开源,是一个高性能、低延迟的并发框架,尤其适用于需要大量数据交换的系统。本文将详细...
4. **使用Disruptor的步骤** - 初始化Ring Buffer:根据预期的最大并发度和事件大小计算Ring Buffer大小。 - 创建Producer:选择适合的策略,如SingleProducerSequencer或MultiProducerSequencer。 - 设置...
赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)版.zip; Maven坐标:com.lmax:disruptor:3.3.7; 标签:disruptor、lmax、jar包、java、中文文档...
4. **事件处理流水线**:通过预定义的事件处理器链,Disruptor能构建出高效的数据处理流水线,使得数据在处理器之间快速流动。 5. **低延迟**:由于其独特的设计,Disruptor能在极短的时间内处理大量事件,适合于对...