`
sillycat
  • 浏览: 2567201 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

High Performance(2)Ringbuffer and Disruptor

 
阅读更多

High Performance(2)Ringbuffer and Disruptor

1. Disruptor Introduction
a set circle array, lock-free ring buffer
Component - Ringbuffer
current number % size = order number   12%10=2 
the end of the array is managed by consumer.

Component - Sequence
thread safe, counter

Component - SequenceBarrier
consumers wait for sequence

Component - WaitStrategy
BusySpinWaitStrategy, BlockingWaitStrategy, SleepingWaitStrategy, YieldingWaitStragegy
PhasedBackoffWaitStrategy

BactchEventProcessor
consumer

WorkProcessor
consumers share one sequence

WorkerPool
All the WorkProcessor which share the same Sequence are managed by WorkerPool

2. Simple Producer and Consumer
All the example codes are in easydisruptor.
dependencies
libraryDependencies ++= Seq(
    "org.scalatest"       %   "scalatest_2.10"            % "1.9.1"   % "test",
    "org.specs2"          %%  "specs2"                    % "1.13"    % "test",
    "com.lmax"            %   "disruptor"                 % "3.3.0"
)

model
package com.sillycat.easydisruptor.model
class LongEvent(){
  var value = Long.MinValue
}
factory
package com.sillycat.easydisruptor.factory
import com.lmax.disruptor.EventFactory
import com.sillycat.easydisruptor.model.LongEvent

class LongEventFactory extends EventFactory[LongEvent] {
  def  newInstance():LongEvent = {
    return new LongEvent()
  }
}

translator
package com.sillycat.easydisruptor.translator

import com.sillycat.easydisruptor.model.LongEvent
import java.nio.ByteBuffer
import com.lmax.disruptor.EventTranslatorOneArg;

class CustomEventTranslatorOneArg extends EventTranslatorOneArg[LongEvent,ByteBuffer]{

  def translateTo(event: LongEvent, sequence: Long, bb: ByteBuffer) = {
    event.value = bb.getLong(0)
  }

}

consumer
package com.sillycat.easydisruptor.consumer

import com.lmax.disruptor.EventHandler
import com.sillycat.easydisruptor.model.LongEvent

class LongEventHandler extends EventHandler[LongEvent]{

  def onEvent(event:LongEvent,sequence:Long,endOfBatch:Boolean): Unit ={
    println("Event: " + event.value)
  }

}

producer
package com.sillycat.easydisruptor.producer

import com.lmax.disruptor.RingBuffer
import java.nio.ByteBuffer
import com.sillycat.easydisruptor.translator.CustomEventTranslatorOneArg
import com.sillycat.easydisruptor.model.LongEvent

class LongEventProducer(ringBuffer: RingBuffer[LongEvent]) {
  val translator = new CustomEventTranslatorOneArg()
  def onData(bb:ByteBuffer) = {
    ringBuffer.publishEvent(translator,bb)
  }
}

App to run
package com.sillycat.easydisruptor

import java.util.concurrent.Executors
import com.sillycat.easydisruptor.factory.LongEventFactory
import com.lmax.disruptor.dsl.Disruptor
import com.sillycat.easydisruptor.consumer.LongEventHandler
import com.sillycat.easydisruptor.producer.LongEventProducer
import java.nio.ByteBuffer

object LongEventApp extends App{
  val executor = Executors.newCachedThreadPool()
  val factory = new LongEventFactory()

  // Specify the size of the ring buffer, must be power of 2.
  val bufferSize = 1024

  val disruptor = new Disruptor(factory, bufferSize, executor)
  //set consumer/handler
  disruptor.handleEventsWith(new LongEventHandler())
  disruptor.start()

  val ringBuffer = disruptor.getRingBuffer()
  val producer = new LongEventProducer(ringBuffer)

  val bb = ByteBuffer.allocate(8);
  for( a <- 1 to 10){
    bb.putLong(0, a)
    producer.onData(bb)
    Thread.sleep(1000)
  }
  sys.exit(0)
}

We can also define the Disruptor as follow:
val disruptor = new Disruptor[LongEvent](factory, bufferSize, executor,ProducerType.SINGLE, new SleepingWaitStrategy())


References:
http://developer.51cto.com/art/201306/399341.htm
http://liuxun.org/blog/disruptor-yi-ge-kai-yuan-de-gao-xiao-nei-cun-wu-suo-dui-lie/
http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html

http://lmax-exchange.github.io/disruptor/

More Example
https://code.google.com/p/disruptor/source/browse/trunk/code/src/perf/?r=421#perf%2Fcom%2Flmax%2Fdisruptor


分享到:
评论

相关推荐

    Rt-thead studio软件下使用ringbuffer

    2. **RT-Thread中的RingBuffer实现** - RT-Thread提供了标准的RingBuffer接口,包括`rt_ringbuf_create`用于创建RingBuffer,`rt_ringbuf_put`和`rt_ringbuf_get`分别用于数据的插入和提取,以及`rt_ringbuf_space`...

    RingBuffer 循环缓存 亲测可用 V2 修改一处

    4.当写入数据的长度大于ringbuffer的可写入长度时,多余的数据将会丢弃。所以写入数据前,先判断ringbuffer的可写入长度。另外程序包含示例。 支持windows平台的vs与linux平台的clion,语言级别实现,与平台无关。 ...

    ringbuffer.zip

    2. `ringbuffer_destroy`会通过`free`释放之前分配的内存。 3. `ringbuffer_space_left`和`ringbuffer_data_left`通过计算读写指针的差值得到缓冲区的状态。 4. `ringbuffer_write`和`ringbuffer_read`使用适当的...

    ringbuffer - 原创-高效率管理

    此外,还会包含一系列对外公开的函数声明,如`ringbuffer_init`用于初始化RingBuffer,`ringbuffer_write`和`ringbuffer_read`分别用于向缓冲区写入和读取数据,以及`ringbuffer_is_full`和`ringbuffer_is_empty`...

    RingBuffer

    RingBuffer是一种数据结构,它在计算机科学和编程中被广泛使用,特别是在并发编程和高效数据传输场景中。这个名字来源于它的环形形状,就像一个圆环,数据在这个环中循环存储和读取。RingBuffer的核心特性是其固定...

    ring buffer实现原理

    在通信程序中,经常使用环形缓冲区作为数据结构来存放通信中发送和接收的数据。环形缓冲区是一个先进先出的循环缓冲区,可以向通信程序提供对缓冲区的互斥访问。 1、环形缓冲区的实现原理 环形缓冲区通常有一个读...

    RingBuffer 循环缓存 亲测可用

    4.当写入数据的长度大于ringbuffer的可写入长度时,多余的数据将会丢弃。所以写入数据前,先判断ringbuffer的可写入长度。 另外程序包含示例。 支持windows平台的vs与linux平台的clion,语言级别实现,与平台无关。 ...

    RingBuffer的操作與简单應用

    工作后的第二个C程序,还是挺好做的。RingBuffer,循环队列

    简单的RingBuffer库

    2. 写入数据:向RingBuffer中写入数据时,需要检查当前缓冲区的状态(是否已满)。如果缓冲区未满,可以将数据写入并更新头部指针;如果已满,写入操作可能会阻塞,等待空间释放,或者根据设计选择丢弃新数据。 3. ...

    Disruptor并发框架中文参考文档

    **Disruptor** 的核心在于它的Ring Buffer设计,这是一种特殊的循环数组结构,能够高效地支持多个生产者向其中添加数据以及多个消费者从中读取数据的过程,而无需使用传统的锁机制。下面将详细介绍Disruptor的工作...

    stm32f103实现ringbuffer

    环形缓冲区(Ring Buffer)在嵌入式系统,尤其是基于STM32F103的微控制器项目中,是一种非常实用的数据管理结构。它在处理串口通信时发挥着关键作用,因为串口通常用于设备间的通信,而数据的接收和发送必须高效且...

    ring_buffer.c

    做个记录,需要的人也可以参考。这个是Linux上的实现参考。

    ringbuffer之C实现32位Debug库.rar

    环形缓冲区(Ring Buffer),又称为循环缓冲区,是一种数据结构,用于在有限的内存空间内高效地处理数据的存储和读取。在嵌入式系统、网络通信、多线程编程等领域,环形缓冲区的应用十分广泛,因为它能够有效地解决...

    ring_buffer

    环形缓冲区(Ring Buffer),又称为循环缓冲区或双端队列,是计算机科学中一种常见的数据结构,尤其在实时系统和并发编程中被广泛使用。它是一种线性缓冲区,具有首尾相连的特性,使得数据可以像在环上一样流动。...

    环形缓冲区(RingBuffer)源码

    环形缓冲区(RingBuffer),又称为循环缓冲区,是一种高效的数据处理机制,常用于多线程通信、数据采集系统以及内存管理等多个领域。它的设计理念是利用固定大小的内存空间,通过指针的循环移动来实现数据的存取,...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - 一个简单的Disruptor示例通常包括创建Disruptor对象、初始化Ring Buffer、设置Producer和Consumer,以及启动处理循环。通过示例,开发者可以快速上手,理解Disruptor的工作流程。 综上所述,LMAX Disruptor是一...

    RingBuffer.c

    RingBuffer.c

    ringbuffer:固定大小,异常安全的STL样式循环缓冲区的C ++ 1114实现,用于单线程和多线程上下文

    环形缓冲区线程安全和非线程安全的固定大小,模板化,STL样式的atomic_ringbuffer.hpp缓冲区(分别为atomic_ringbuffer.hpp和ringbuffer.hpp )的单头C ++实现。 两种版本均完全支持RAII / RRID,并在可能的情况下...

    适用于单片机的ringbuffer

    适用于单片机的ringbuffer

Global site tag (gtag.js) - Google Analytics