LMAX 开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。
longEvent事件数据
public class LongEvent { private long value; public void set(long value) { this.value = value; } public long get(){ return this.value; } }
LongEventFactory事件工厂
import com.lmax.disruptor.EventFactory; /** * 事件生产工厂 * @author wanghao * */ public class LongEventFactory implements EventFactory<LongEvent> { @Override public LongEvent newInstance() { return new LongEvent(); } }
LongEventProducer事件生产者
import java.nio.ByteBuffer; import com.lmax.disruptor.RingBuffer; /** * 生产者,生产longEvent事件 * @author harry * */ public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void product(ByteBuffer bb) { long sequence = ringBuffer.next(); // Grab the next sequence try { LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.set(bb.getLong(0)); // Fill with data } finally { ringBuffer.publish(sequence); } } }
RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。
消息者事件处理器
为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。
import com.lmax.disruptor.EventHandler; /** * 消息者事件处理器,打印输出到控制台 * @author harry * */ public class LongEventHandler implements EventHandler<LongEvent>{ public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch); } }
LongEventProducerWithTranslator
import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * post-3.0 the preferred approach for publishing messages is * via the Event Publisher/Event Translator portion of the API. E.g. * @author harry * */ public class LongEventProducerWithTranslator { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.set(bb.getLong(0)); } }; public void product(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。
product方法,将生产者生产的消息放入ringbuffer中。
LongEventMain
消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等
ByteBuffer 类字节buffer,用于包装消息。
ProducerType.SINGLE为单线程 ,可以提高性能。
import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class LongEventMain { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { // 执行器,用于构造消费者线程 Executor executor = Executors.newCachedThreadPool(); // 指定事件工厂 LongEventFactory factory = new LongEventFactory(); // 指定 ring buffer字节大小, must be power of 2. int bufferSize = 1024; //单线程模式,获取额外的性能 Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize,executor, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 disruptor.handleEventsWith(new LongEventHandler()); //启动disruptor线程 disruptor.start(); // 获取 ring buffer环,用于接取生产者生产的事件 RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); //为 ring buffer指定事件生产者 //LongEventProducer producer = new LongEventProducer(ringBuffer); LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存 for (long l = 0; true; l++) { bb.putLong(0, l); producer.product(bb);//生产者生产数据 Thread.sleep(1000); } } }
实验结果:
consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true
Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。
相关推荐
业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 ...
以下是一个简单的Disruptor使用示例流程: 1. **定义Event类**:根据需要处理的数据类型定义具体的Event类。 2. **定义Event工厂类**:创建Event实例对象的工厂类。 3. **定义Event监听类**:实现具体的业务逻辑...
文件名 "trapos-master" 可能是指一个交易系统的源代码仓库,其中可能包含了 Netty 和 Disruptor 结合使用的示例。在这样的系统中,Disruptor 可能被用来处理实时外汇报价,因为外汇交易对延迟非常敏感,需要高效且...
#### 四、Disruptor的应用示例 ##### 3.1 LMAX的架构 LMAX是一家零售金融服务提供商,其交易平台基于Disruptor构建。LMAX的业务逻辑处理器能够在单个线程中每秒处理600万笔订单,这得益于Disruptor的高效并发处理...
4. 处理器协作:Disruptor的等待策略(WaitStrategy)可以灵活调整,如使用忙等、多路复用或者阻塞等待,以适应不同的系统需求和资源条件。 5. 监控与优化:Disruptor提供了丰富的监控指标,如事件处理速率、延迟等...
在使用Disruptor C++版时,你可以参考提供的示例代码来理解和实现自己的事件处理流程。需要注意的是,正确配置和优化Disruptor参数,如缓冲区大小、消费者数量等,对性能影响显著。此外,理解并运用Disruptor的事件...
- 一个简单的Disruptor示例通常包括创建Disruptor对象、初始化Ring Buffer、设置Producer和Consumer,以及启动处理循环。通过示例,开发者可以快速上手,理解Disruptor的工作流程。 综上所述,LMAX Disruptor是一...
在"DisruptorDemo.zip"的示例代码中,我们可以看到以下关键类和方法: 1. `DisruptorDemo`:主类,创建Disruptor实例并启动事件处理器。 2. `Event`:自定义事件对象,存储要传递的数据。 3. `EventHandlerImpl`:...
`DisruptorConceptProofTest.java` 是一个非官方的示例代码,它通常会展示如何使用 Disruptor 创建事件处理链路,以及如何发布和消费事件。这个测试类可以让我们了解 Disruptor 的基本用法和概念验证。 `...
Disruptor是一款高性能的并发工具库,由LMAX公司开发并开源,主要...这个示例将帮助你理解Disruptor如何简化并发编程,提高程序运行效率,尤其对于需要处理大量并发请求的高性能应用来说,Disruptor是一个强大的工具。
此外,你还可以通过创建一个简单的示例来实践Disruptor的使用。首先,创建生产者和消费者,然后配置Disruptor实例,设置事件处理器链。运行示例,观察Disruptor如何在多线程环境中高效地处理事件。这不仅加深了你对...
Disruptor-examples这个压缩包文件很可能是Disruptor的示例代码,包括了各种应用场景的实现,如简单的生产者消费者模型、多级处理链等,通过这些示例,我们可以更直观地理解Disruptor如何在实际中应用。 总的来说,...
NULL 博文链接:https://yanbingwei.iteye.com/blog/1985778
Disruptor使用了无锁算法,通过全局唯一的序列号来确保数据的正确性。每个事件在被生产者放入缓冲区和被消费者消费时,都会被赋予一个唯一的序列号,这样可以避免数据的乱序和丢失。 2. **多生产者与单消费者模式*...
本文将深入探讨Disruptor的核心原理,并通过一个实际的例子——testMyDisruptor,来展示如何在应用中有效利用Disruptor实现高效的缓冲队列。 首先,我们要理解Disruptor的核心设计理念。传统的并发编程往往依赖于锁...
网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。
在这个名为"share-disruptor.zip"的压缩包中,包含了Disruptor的基本示例,旨在帮助初学者快速理解并掌握Disruptor的使用。下面将详细介绍Disruptor的关键概念和技术特点: 1. **环形缓冲区(Ring Buffer)**:...
Spring 与 Disruptor 集成的简单示例展示了如何使用 Spring 框架将 Disruptor 库集成到应用程序中,以实现高性能的异步消息处理。该示例 Demonstrate 了 Disruptor 库的高效性和可靠性,能够满足高性能的业务需求。 ...
同时,提供的"一个简单实用disruptor的例子"可以帮助我们更好地理解如何在实际项目中应用Disruptor。 这个例子可能包括以下几个部分: 1. 定义事件:首先,你需要创建一个代表业务逻辑的事件类,这个类将被放入环形...
2. **实战演练**:通过提供的示例代码,了解如何创建和配置Disruptor,以及如何在多线程环境中使用它。 3. **性能测试**:对比使用Disruptor前后的性能,理解其在高并发场景下的优势。 4. **深入研究**:学习...