High Performance(3)Disruptor Examples and Multi Threads
We need to use multiple threads in the consumers part.
To support the work pool I change the handlers as follow:
package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent
class APNSWorkHandler extends WorkHandler[LongEvent]{
def onEvent(event: LongEvent): Unit = {
Thread.sleep(40)
println(Thread.currentThread().getName + " APNS Event(" + event.number +"): " + (System.currentTimeMillis() - event.time)/1000 + " s")
}
}
package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent
class GCMWorkHandler extends WorkHandler[LongEvent]{
def onEvent(event:LongEvent): Unit ={
Thread.sleep(40)
println(Thread.currentThread().getName + " GCM Event(" + event.number +"): " + (System.currentTimeMillis() - event.time) / 1000 + " s")
}
}
package com.sillycat.easydisruptor.consumer
import com.lmax.disruptor.WorkHandler
import com.sillycat.easydisruptor.model.LongEvent
class LongEventWorkHandler extends WorkHandler[LongEvent]{
def onEvent(event:LongEvent): Unit ={
Thread.sleep(30)
println(Thread.currentThread().getName + " logging Event(" + event.number +"): " + (System.currentTimeMillis() - event.time)/1000 + " s")
}
}
Run it as work pool
package com.sillycat.easydisruptor
import java.util.concurrent.Executors
import com.lmax.disruptor.{WorkHandler, SleepingWaitStrategy}
import com.lmax.disruptor.dsl.{ProducerType, Disruptor}
import com.sillycat.easydisruptor.consumer._
import com.sillycat.easydisruptor.factory.LongEventFactory
import com.sillycat.easydisruptor.model.{Message, LongEvent}
import com.sillycat.easydisruptor.producer.LongEventProducer
/**
* Created by carl on 8/7/14.
*/
object MultiHandlerEventApp extends App {
val numberOfWorkers = 10
val executor = Executors.newCachedThreadPool()
val factory = new LongEventFactory()
// Specify the size of the ring buffer, must be power of 2.
//1024*4 = 110 seconds
//1024 = 112 seconds
val bufferSize = 1024
val disruptor = new Disruptor[LongEvent](factory, bufferSize, executor, ProducerType.SINGLE, new SleepingWaitStrategy())
//set consumer/handler
val logWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
for(a <- 0 to numberOfWorkers - 1){
logWorkHandler(a) = new LongEventWorkHandler
}
val gcmWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
for(a <- 0 to numberOfWorkers - 1){
gcmWorkHandler(a) = new GCMWorkHandler
}
val apnsWorkHandler = new Array[WorkHandler[LongEvent]](numberOfWorkers)
for(a <- 0 to numberOfWorkers - 1){
apnsWorkHandler(a) = new APNSWorkHandler
}
disruptor.handleEventsWithWorkerPool(logWorkHandler:_*)
.thenHandleEventsWithWorkerPool(apnsWorkHandler:_*)
.thenHandleEventsWithWorkerPool(gcmWorkHandler: _*)
val ringBuffer = disruptor.start()
//fetch the ringBuffer, then producer can use it
val producer = new LongEventProducer(ringBuffer)
val beginTime = System.currentTimeMillis()
for (a <- 1 to 2000) {
val message = Message(beginTime, a)
producer.onData(message)
}
}
After change to multiple threads.
The performance improve a lot.
References:
https://code.google.com/p/disruptor/source/browse/trunk/code/src/perf/?r=421#perf%2Fcom%2Flmax%2Fdisruptor
https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor/workhandler
http://ziyue1987.github.io/pages/2013/09/22/disruptor-use-manual.html
https://github.com/LMAX-Exchange/disruptor/wiki/Disruptor-Wizard#dependencies
- 浏览: 2542948 次
- 性别:
- 来自: 成都
文章分类
最新评论
-
nation:
你好,在部署Mesos+Spark的运行环境时,出现一个现象, ...
Spark(4)Deal with Mesos -
sillycat:
AMAZON Relatedhttps://www.godad ...
AMAZON API Gateway(2)Client Side SSL with NGINX -
sillycat:
sudo usermod -aG docker ec2-use ...
Docker and VirtualBox(1)Set up Shared Disk for Virtual Box -
sillycat:
Every Half an Hour30 * * * * /u ...
Build Home NAS(3)Data Redundancy -
sillycat:
3 List the Cron Job I Have>c ...
Build Home NAS(3)Data Redundancy
发表评论
-
Update Site will come soon
2021-06-02 04:10 1672I am still keep notes my tech n ... -
Hadoop Docker 2019 Version 3.2.1
2019-12-10 07:39 289Hadoop Docker 2019 Version 3.2. ... -
Nginx and Proxy 2019(1)Nginx Enable Lua and Parse JSON
2019-12-03 04:17 441Nginx and Proxy 2019(1)Nginx En ... -
Data Solution 2019(13)Docker Zeppelin Notebook and Memory Configuration
2019-11-09 07:15 284Data Solution 2019(13)Docker Ze ... -
Data Solution 2019(10)Spark Cluster Solution with Zeppelin
2019-10-29 08:37 245Data Solution 2019(10)Spark Clu ... -
AMAZON Kinesis Firehose 2019(1)Firehose Buffer to S3
2019-10-01 10:15 315AMAZON Kinesis Firehose 2019(1) ... -
Rancher and k8s 2019(3)Clean Installation on CentOS7
2019-09-19 23:25 308Rancher and k8s 2019(3)Clean In ... -
Pacemaker 2019(1)Introduction and Installation on CentOS7
2019-09-11 05:48 336Pacemaker 2019(1)Introduction a ... -
Crontab-UI installation and Introduction
2019-08-30 05:54 447Crontab-UI installation and Int ... -
Spiderkeeper 2019(1)Installation and Introduction
2019-08-29 06:49 495Spiderkeeper 2019(1)Installatio ... -
Supervisor 2019(2)Ubuntu and Multiple Services
2019-08-19 10:53 366Supervisor 2019(2)Ubuntu and Mu ... -
Supervisor 2019(1)CentOS 7
2019-08-19 09:33 325Supervisor 2019(1)CentOS 7 Ins ... -
Redis Cluster 2019(3)Redis Cluster on CentOS
2019-08-17 04:07 367Redis Cluster 2019(3)Redis Clus ... -
Amazon Lambda and Version Limit
2019-08-02 01:42 433Amazon Lambda and Version Limit ... -
MySQL HA Solution 2019(1)Master Slave on MySQL 5.7
2019-07-27 22:26 514MySQL HA Solution 2019(1)Master ... -
RabbitMQ Cluster 2019(2)Cluster HA and Proxy
2019-07-11 12:41 456RabbitMQ Cluster 2019(2)Cluster ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:35 318Running Zeppelin with Nginx Aut ... -
Running Zeppelin with Nginx Authentication
2019-05-25 21:34 316Running Zeppelin with Nginx Aut ... -
ElasticSearch(3)Version Upgrade and Cluster
2019-05-20 05:00 322ElasticSearch(3)Version Upgrade ... -
Jetty Server and Cookie Domain Name
2019-04-28 23:59 396Jetty Server and Cookie Domain ...
相关推荐
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
《Disruptor:高性能的并发数据交换机制》 在构建高效率金融交易平台LMAX的过程中,开发者面临了如何优化数据在并发线程间交换的问题。传统的队列方式在处理并发数据交换时,其延迟成本与磁盘I/O操作相当,这对追求...
### Disruptor并发框架知识点详解 #### 一、Disruptor简介及原理 **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年...
SpringBoot整合Disruptor并发编程框架是针对高并发场景下性能优化的一种技术实践。Disruptor是由LMAX公司开发的一款高性能、低延迟的并发工具,它通过消除线程间的锁竞争,大大提升了多线程环境下的处理速度。...
3. **事件处理(Event Processing)**:在Disruptor中,数据处理过程被转化为事件的发布和消费。生产者发布事件到环形缓冲区,消费者通过回调函数消费事件。这种模式使得数据处理与发布解耦,提高了系统的可扩展性。...
《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...
《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
Disruptor-examples这个压缩包文件很可能是Disruptor的示例代码,包括了各种应用场景的实现,如简单的生产者消费者模型、多级处理链等,通过这些示例,我们可以更直观地理解Disruptor如何在实际中应用。 总的来说,...
无锁队列是Disruptor中的关键数据结构,通过使用CAS(Compare and Swap)操作,能够在不使用锁的情况下保证数据的一致性和完整性。这种方法减少了上下文切换和竞态条件,从而提升了并发性能。 在Windows环境下,...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
3. **环形缓冲区与事件路由**:Disruptor的环形缓冲区通过一个称为“SequenceBarrier”的组件来管理多个消费者的同步。每个消费者有一个自己的Sequence,SequenceBarrier会等待所有前序事件被处理后再允许消费者继续...
《Disruptor 框架详解与应用实例》 Disruptor 是一款高性能的并发工具库,由英国的 LMAX 公司开发并开源。它主要用于优化多线程环境下的数据共享,尤其在金融交易系统中表现卓越。Disruptor 的核心设计是一个环形...
6. **无锁和CAS操作**:Disruptor利用Java的无锁编程和Compare-and-Swap (CAS)原语,避免了传统锁带来的性能瓶颈,提高了并发效率。 在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,...
3. **预定义的序列号**:Disruptor 提供全局唯一的序列号,确保消息的正确顺序。 在 Netty 中,Disruptor 可以作为事件处理器链的一部分,优化事件的发布和消费。通常,Netty 的 ChannelHandler 链会处理进来的网络...
简单讲解disruptor并附上demo
《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具,由LMAX公司开发并开源,主要用于优化多线程环境下的数据处理。它通过一种创新的数据同步方式,极大地提升了系统的吞吐量和响应速度。在本文中...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...