`
wbj0110
  • 浏览: 1617338 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

如何使用Disruptor(二)如何从Ringbuffer读取(转)

阅读更多

ConsumerBarrier与消费者

这里我要稍微反过来介绍,因为总的来说读取数据这一过程比写数据要容易理解。假设通过一些“魔法”已经把数据写入到Ring Buffer了,怎样从Ring Buffer读出这些数据呢?

(好,我开始后悔使用Paint/Gimp 了。尽管这是个购买绘图板的好借口,如果我继续写下去的话… UML界的权威们大概也在诅咒我的名字了。)

消费者(Consumer)是一个想从Ring Buffer里读取数据的线程,它可以访问ConsumerBarrier对象——这个对象由RingBuffer创建并且代表消费者与RingBuffer进行交互。就像Ring Buffer显然需要一个序号才能找到下一个可用节点一样,消费者也需要知道它将要处理的序号——每个消费者都需要找到下一个它要访问的序号。在上面的例子中,消费者处理完了Ring Buffer里序号8之前(包括8)的所有数据,那么它期待访问的下一个序号是9

消费者可以调用ConsumerBarrier对象的waitFor()方法,传递它所需要的下一个序号.

1 final long availableSeq = consumerBarrier.waitFor(nextSequence);

ConsumerBarrier返回RingBuffer的最大可访问序号——在上面的例子中是12ConsumerBarrier有一个WaitStrategy方法来决定它如何等待这个序号,我现在不会去描述它的细节,代码的注释里已经概括了每一种WaitStrategy的优点和缺点 。

接下来怎么做?

接下来,消费者会一直原地停留,等待更多数据被写入Ring Buffer。并且,一旦数据写入后消费者会收到通知——节点9101112 已写入。现在序号12到了,消费者可以让ConsumerBarrier去拿这些序号节点里的数据了。

拿到了数据后,消费者(Consumer)会更新自己的标识(cursor)。

你应该已经感觉得到,这样做是怎样有助于平缓延迟的峰值了——以前需要逐个节点地询问“我可以拿下一个数据吗?现在可以了么?现在呢?”,消费者(Consumer)现在只需要简单的说“当你拿到的数字比我这个要大的时候请告诉我”,函数返回值会告诉它有多少个新的节点可以读取数据了。因为这些新的节点的确已经写入了数据(Ring Buffer本身的序号已经更新),而且消费者对这些节点的唯一操作是读而不是写,因此访问不用加锁。这太好了,不仅代码实现起来可以更加安全和简单,而且不用加锁使得速度更快。

另一个好处是——你可以用多个消费者(Consumer)去读同一个RingBuffer ,不需要加锁,也不需要用另外的队列来协调不同的线程(消费者)。这样你可以在Disruptor的协调下实现真正的并发数据处理。

BatchConsumer代码是一个消费者的例子。如果你实现了BatchHandler, 你可以用BatchConsumer来完成上面我提到的复杂工作。它很容易对付那些需要成批处理的节点(例如上文中要处理的9-12节点)而不用单独地去读取每一个节点。

分享到:
评论

相关推荐

    Disruptor并发框架中文参考文档

    ##### 2.2 如何从Ring Buffer读取 消费者通过轮询Ring Buffer的头指针来检查是否有新数据可用。一旦检测到新的数据,消费者就可以从Ring Buffer中读取数据。为了保证数据的一致性,Disruptor使用了序列号和内存屏障...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    Disruptor的工作原理基于环形缓冲区(Ring Buffer)的设计,它将生产者和消费者之间的数据传递过程转换为顺序写入和读取,避免了传统并发模型中的锁竞争和上下文切换,从而实现了极高的消息传递效率。在SpringBoot中...

    DisruptorDemo.zip

    3. 环形缓冲区(RingBuffer):存储数据的容器,采用循环数组实现,确保空间利用率。 4. 事件处理器(EventHandler):消费者处理数据的逻辑封装,每个消费者可以注册多个事件处理器。 5. 事件处理器组...

    disruptor 代码分析

    接下来,需要告诉Disruptor哪些EventHandler将用于处理从Ring Buffer中取出的事件。这通常通过`handleEventsWith`和`then`方法来完成,以定义事件处理的链式调用关系。 #### 事件处理流程解析 在Disruptor中,事件...

    disruptor技术培训

    5. **启动Disruptor**:获取RingBuffer实例,并通过它来生产数据。 6. **处理数据**:数据被自动发布到RingBuffer,并由相应的消费者进行处理。 通过以上步骤,我们就可以构建一个基于Disruptor的高性能并发处理...

    Disruptor并行框架面试题收录

    - **定义**:负责从RingBuffer中读取事件并进行处理。 - **实现**:通过继承`BatchEventProcessor`抽象类来定制事件处理逻辑。 - **工作流程**:循环读取并处理事件。 5. **WaitStrategy(等待策略)** - **...

    disruptor-3.2.1.zip

    Disruptor-3.2.1是由LMAX公司开发并开源的一款高性能的并发框架,其核心是一个环形缓冲区(Ring Buffer)。这个框架的设计理念是消除多线程之间的锁竞争,从而大幅度提高系统的处理速度。在3.2.1版本中,Disruptor...

    串行io disruptor

    Disruptor的工作原理大致如下:它使用环形缓冲区(Ring Buffer)作为基础数据结构,将生产者和消费者的操作解耦,并通过序列号(Sequence)来确保数据的一致性。生产者可以连续写入而不必等待消费者的读取,而消费者...

    share-disruptor.zip

    1. **环形缓冲区(Ring Buffer)**:Disruptor的核心是环形缓冲区,它是一个固定大小的数组,用于存储待处理的事件。这个设计避免了在内存中频繁分配和释放空间,从而减少了垃圾回收的压力。环形缓冲区的索引使用模...

    disruptor concurency pattern in c++.zip

    Disruptor模式的核心思想是消除线程间的共享数据,通过一个环形缓冲区(Ring Buffer)来传递消息,从而避免了锁和条件变量带来的性能开销。在传统的并发编程中,线程间通信通常涉及到内存同步,这可能导致阻塞和上...

    disruptor:LMAX干扰器的C++实现

    首先,我们要理解Disruptor的核心——环形缓冲区(Ring Buffer)。这个数据结构是一种固定大小的数组,用于存储待处理的事件。在C++实现中,环形缓冲区通常使用原子操作和内存屏障来保证在多线程环境下的正确性,...

    SourceAnalysis_Disruptor:Disruptor原始码解析-源码解析

    它采用环形缓冲区(Ring Buffer)作为主要的数据结构,结合生产者-消费者模型,实现了高效的事件处理机制。这种设计模式可以极大地减少数据同步的成本,从而达到微秒级别的消息传递速度。 二、环形缓冲区 环形缓冲...

    15、CPU缓存架构详解&高性能内存队列Disruptor实战(1).pdf

    - **Ring Buffer**:Disruptor的核心是一个环形缓冲区,用于存储事件。它是一个固定大小的数组,生产者和消费者通过索引访问其中的元素。 - **Sequence**:用于跟踪生产和消费进度的序列号。生产者和消费者各自有一...

    无锁队列

    无锁队列的实现通常基于循环数组(Ring Buffer),这种数据结构允许在固定大小的数组中进行无缝的数据传递。在Disruptor框架中,循环数组的每个位置称为槽(Slot),用于存储消息或者事件。生产者通过CAS操作将新...

    积分系统开发总结及设计原则

    10. **无锁并发**:使用无锁并发机制,如Disruptor框架中的Ringbuffer,提高系统的并发性能。 #### 五、业务逻辑设计 1. **加解密**:保护用户数据安全,对敏感信息进行加密处理。 2. **数据采集**:从各个渠道...

    13、线程池ForkJoinPool实战及其工作原理分析(1).pdf

    **RingBuffer**: 环形缓冲区,作为消息的存储区域。 3. **Consumer**: 从环形缓冲区读取消息并处理。 4. **Barrier**: 控制消费者读取数据的边界。 ### 5. **常用并发设计模式** - **生产者-消费者模式**: 解决...

Global site tag (gtag.js) - Google Analytics