事件对象:
/**
* POJO
* @author lenovoe
*
*/
public class ValueEvent {
private long value;
public long getValue()
{
return value;
}
public void setValue(final long value)
{
this.value = value;
}
public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
{
public ValueEvent newInstance()
{
return new ValueEvent();
}
};
}
事件生产者:
/**
* 生产者
* @author lenovoe
*
*/
public class Producer implements Runnable{
private RingBuffer<ValueEvent> ringBuffer = null;
public Producer(RingBuffer<ValueEvent> rb) {
ringBuffer = rb;
}
public void run() {
// Publishers claim events in sequence
long sequence = ringBuffer.next();
ValueEvent event = ringBuffer.get(sequence);
event.setValue(1234); // this could be more complex with multiple fields
// make the event available to EventProcessors
ringBuffer.publish(sequence);
}
}
事件消费者:
/**
* 处理RingBuffer中的事件对象
* @author lenovoe
*
*/
public class ConsumeEventHandler implements EventHandler<ValueEvent>,LifecycleAware{
public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
// TODO Auto-generated method stub
System.out.println("处理事件对象:"+event.getValue());
//Thread.sleep(2000);
}
public void onStart() {
// TODO Auto-generated method stub
System.out.println("开始处理事件");
}
public void onShutdown() {
// TODO Auto-generated method stub
System.out.println("结束处理事件");
}
}
测试类:
/**
* 测试类
* @author lenovoe
*
*/
public class TestShow {
//number of elements to create within the ring buffer
private static final int BUFFER_SIZE = 16;
//JDK 创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
private final ExecutorService EXECUTOR = Executors .newSingleThreadExecutor();
//单生产者,策略使用YieldingWaitStrategy
private final RingBuffer<ValueEvent> ringBuffer = RingBuffer.create(ProducerType.SINGLE, ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());
//游标
private final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
//消费者
private final ConsumeEventHandler handler = new ConsumeEventHandler();
private final BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(
ringBuffer, sequenceBarrier, handler);
public TestShow() {
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
//2X版本:ringBuffer.setGatingSequences(batchEventProcessor.getSequence());
}
public void consume() {
EXECUTOR.submit(batchEventProcessor);
}
public void produce() {
new Thread(new Producer(ringBuffer)).start();
}
public void shutdown() {
EXECUTOR.shutdown();
}
public static void main(String[] args) throws InterruptedException {
TestShow test = new TestShow();
test.produce();
test.produce();
test.produce();
test.consume();
test.shutdown();
Thread.sleep(5000);
System.exit(0);
}
}
分享到:
相关推荐
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
##### 3.2 通过Axon和Disruptor处理1Mtps Axon是一个基于CQRS和事件溯源原则的框架,它可以与Disruptor结合使用来处理大规模的数据流。通过整合Axon和Disruptor,可以构建出能够处理每秒100万条事务(1Mtps)的高...
《Disruptor应用实例》 Disruptor是高性能并发编程领域的一个重要工具,由LMAX公司开发并开源,主要用于优化多线程环境下的数据处理。它通过一种创新的数据同步方式,极大地提升了系统的吞吐量和响应速度。在本文中...
SpringBoot整合Disruptor并发编程框架是针对高并发场景下性能优化的一种技术实践。Disruptor是由LMAX公司开发的一款高性能、低延迟的并发工具,它通过消除线程间的锁竞争,大大提升了多线程环境下的处理速度。...
《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
`DisruptorConceptProofTest.java` 是一个非官方的示例代码,它通常会展示如何使用 Disruptor 创建事件处理链路,以及如何发布和消费事件。这个测试类可以让我们了解 Disruptor 的基本用法和概念验证。 `...
Disruptor是一款高性能的并发框架,它通过使用Ring Buffer和基于事件的处理方式来消除锁竞争,提升系统性能。在使用Disruptor过程中,开发者可能会遇到`FatalExceptionHandler`的错误,这通常是由于处理流程中的异常...
《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...
Disruptor是一款高性能的并发工具库,由LMAX公司开发并开源,主要应用于高频率交易系统。它通过优化线程间通信的方式,极大地提升了多线程环境下的数据处理速度。Disruptor的设计理念是避免传统的锁机制,转而采用一...
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它通过优化数据共享方式,显著提高了多线程环境下的处理速度。在"Disruptor 多个消费者"的场景中,我们可以深入理解Disruptor如何...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它以其高效、低延迟的事件处理机制而闻名。在C++版本的Disruptor中,我们同样可以享受到这种高效的并发能力,尤其适用于需要大量...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
本文将深入探讨Disruptor的核心原理,并通过一个实际的例子——testMyDisruptor,来展示如何在应用中有效利用Disruptor实现高效的缓冲队列。 首先,我们要理解Disruptor的核心设计理念。传统的并发编程往往依赖于锁...
LMAX Disruptor是一款高性能的消息处理框架,由LMAX公司开发并开源,它在金融交易领域有着广泛的应用。Disruptor的设计目标是解决多线程环境下的数据共享问题,通过优化并发性能,实现极低的延迟和高吞吐量。在Java...
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...