`
sillycat
  • 浏览: 2542948 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

High Performance(3)Disruptor Examples and Multi Threads

 
阅读更多

High Performance(3)Disruptor Examples and Multi Threads

We need to use multiple threads in the consumers part.

To support the work pool I change the handlers as follow:
package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent

class APNSWorkHandler extends WorkHandler[LongEvent]{
  def onEvent(event: LongEvent): Unit = {
    Thread.sleep(40)
    println(Thread.currentThread().getName + " APNS Event(" + event.number +"): " + (System.currentTimeMillis() - event.time)/1000 + " s")
  }
}

package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent

class GCMWorkHandler extends WorkHandler[LongEvent]{
  def onEvent(event:LongEvent): Unit ={
    Thread.sleep(40)
    println(Thread.currentThread().getName + " GCM  Event(" + event.number +"): " + (System.currentTimeMillis() - event.time) / 1000 + " s")
  }
}

package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent

class LongEventWorkHandler extends WorkHandler[LongEvent]{
  def onEvent(event:LongEvent): Unit ={
    Thread.sleep(30)
    println(Thread.currentThread().getName + " logging Event(" + event.number +"): " + (System.currentTimeMillis() - event.time)/1000 + " s")
  }
}

Run it as work pool
package com.sillycat.easydisruptor

import java.util.concurrent.Executors

import com.lmax.disruptor.{WorkHandler, SleepingWaitStrategy}
import com.lmax.disruptor.dsl.{ProducerType, Disruptor}
import com.sillycat.easydisruptor.consumer._
import com.sillycat.easydisruptor.factory.LongEventFactory
import com.sillycat.easydisruptor.model.{Message, LongEvent}
import com.sillycat.easydisruptor.producer.LongEventProducer

/**
 * Created by carl on 8/7/14.
 */
object MultiHandlerEventApp extends App {

  val numberOfWorkers = 10

  val executor = Executors.newCachedThreadPool()
  val factory = new LongEventFactory()

  // Specify the size of the ring buffer, must be power of 2.
  //1024*4 = 110 seconds
  //1024   = 112 seconds
  val bufferSize = 1024


  val disruptor = new Disruptor[LongEvent](factory, bufferSize, executor, ProducerType.SINGLE, new SleepingWaitStrategy())
  //set consumer/handler
  val logWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
  for(a <- 0 to numberOfWorkers - 1){
    logWorkHandler(a) = new LongEventWorkHandler
  }

  val gcmWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
  for(a <- 0 to numberOfWorkers - 1){
    gcmWorkHandler(a) = new GCMWorkHandler
  }

  val apnsWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
  for(a <- 0 to numberOfWorkers - 1){
    apnsWorkHandler(a) = new APNSWorkHandler
  }

  disruptor.handleEventsWithWorkerPool(logWorkHandler:_*)
    .thenHandleEventsWithWorkerPool(apnsWorkHandler:_*)
    .thenHandleEventsWithWorkerPool(gcmWorkHandler: _*)

  val ringBuffer = disruptor.start()
  //fetch the ringBuffer, then producer can use it

  val producer = new LongEventProducer(ringBuffer)

  val beginTime = System.currentTimeMillis()
  for (a <- 1 to 2000) {
    val message = Message(beginTime, a)
    producer.onData(message)
  }
}

After change to multiple threads.

The performance improve a lot.


References:
https://code.google.com/p/disruptor/source/browse/trunk/code/src/perf/?r=421#perf%2Fcom%2Flmax%2Fdisruptor
https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor/workhandler

http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html

https://github.com/LMAX-Exchange/disruptor/wiki/Disruptor-Wizard#dependencies

分享到:
评论

相关推荐

    Disruptor3.x Disruptor使用方式

    Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...

    串行io disruptor

    《Disruptor:高性能的并发数据交换机制》 在构建高效率金融交易平台LMAX的过程中,开发者面临了如何优化数据在并发线程间交换的问题。传统的队列方式在处理并发数据交换时,其延迟成本与磁盘I/O操作相当,这对追求...

    Disruptor并发框架中文参考文档

    ### Disruptor并发框架知识点详解 #### 一、Disruptor简介及原理 **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    SpringBoot整合Disruptor并发编程框架是针对高并发场景下性能优化的一种技术实践。Disruptor是由LMAX公司开发的一款高性能、低延迟的并发工具,它通过消除线程间的锁竞争,大大提升了多线程环境下的处理速度。...

    Disruptor demo

    3. **事件处理(Event Processing)**:在Disruptor中,数据处理过程被转化为事件的发布和消费。生产者发布事件到环形缓冲区,消费者通过回调函数消费事件。这种模式使得数据处理与发布解耦,提高了系统的可扩展性。...

    DisruptorDemo.zip

    《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...

    spring-boot-starter-disruptor.zip

    《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    Disruptor学习(1)

    Disruptor-examples这个压缩包文件很可能是Disruptor的示例代码,包括了各种应用场景的实现,如简单的生产者消费者模型、多级处理链等,通过这些示例,我们可以更直观地理解Disruptor如何在实际中应用。 总的来说,...

    Disruptor C++版(仅支持单生产者)

    无锁队列是Disruptor中的关键数据结构,通过使用CAS(Compare and Swap)操作,能够在不使用锁的情况下保证数据的一致性和完整性。这种方法减少了上下文切换和竞态条件,从而提升了并发性能。 在Windows环境下,...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    disruptor 多个消费者

    3. **环形缓冲区与事件路由**:Disruptor的环形缓冲区通过一个称为“SequenceBarrier”的组件来管理多个消费者的同步。每个消费者有一个自己的Sequence,SequenceBarrier会等待所有前序事件被处理后再允许消费者继续...

    disruptor jar包+Demo+Api

    《Disruptor 框架详解与应用实例》 Disruptor 是一款高性能的并发工具库,由英国的 LMAX 公司开发并开源。它主要用于优化多线程环境下的数据共享,尤其在金融交易系统中表现卓越。Disruptor 的核心设计是一个环形...

    disruptor-3.2.1源码带jar包20140321

    6. **无锁和CAS操作**:Disruptor利用Java的无锁编程和Compare-and-Swap (CAS)原语,避免了传统锁带来的性能瓶颈,提高了并发效率。 在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,...

    Netty 使用Disruptor机制的处理源代码

    3. **预定义的序列号**:Disruptor 提供全局唯一的序列号,确保消息的正确顺序。 在 Netty 中,Disruptor 可以作为事件处理器链的一部分,优化事件的发布和消费。通常,Netty 的 ChannelHandler 链会处理进来的网络...

    disruptor案例加简单说明

    简单讲解disruptor并附上demo

    Disruptor应用实例

    《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具,由LMAX公司开发并开源,主要用于优化多线程环境下的数据处理。它通过一种创新的数据同步方式,极大地提升了系统的吞吐量和响应速度。在本文中...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

Global site tag (gtag.js) - Google Analytics