这个是最新的 disruptor3的例子....来自官方代码稍微简化后的
package io.grass.core.collect; import static com.lmax.disruptor.RingBuffer.createSingleProducer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.util.PaddedLong; /** * 简单测试 * * @author zuoge85 * */ public class DisruptorBaseTest { protected static final Logger log = LoggerFactory.getLogger(DisruptorBaseTest.class); private static final int THREAD_NUMS = 1; private static final int BUFFER_SIZE = 1024 * 8; private static final long NUMS = 1000_000_00L; public static void main(String[] args) throws InterruptedException { RingBuffer<MessageEvent> ringBuffer = createSingleProducer( MessageEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy()); ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMS); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); MessageMutationEventHandler[] handlers = new MessageMutationEventHandler[THREAD_NUMS]; BatchEventProcessor<?>[] batchEventProcessors = new BatchEventProcessor[THREAD_NUMS]; for (int i = 0; i < THREAD_NUMS; i++) { handlers[i] = new MessageMutationEventHandler(); batchEventProcessors[i] = new BatchEventProcessor<MessageEvent>( ringBuffer, sequenceBarrier, handlers[i]); ringBuffer .addGatingSequences(batchEventProcessors[i].getSequence()); } CountDownLatch latch = new CountDownLatch(THREAD_NUMS); for (int i = 0; i < THREAD_NUMS; i++) { long n = batchEventProcessors[i].getSequence().get() + NUMS; System.out.println(n +" " +NUMS+" "+batchEventProcessors[i].getSequence().get() ); handlers[i].reset(latch, n); executors.submit(batchEventProcessors[i]); } long start = System.currentTimeMillis(); for (long i = 0; i < NUMS; i++) { long sequence = ringBuffer.next(); ringBuffer.get(sequence).setValue(i); ringBuffer.publish(sequence); } latch.await(); long opsPerSecond = (NUMS * 1000L) / (System.currentTimeMillis() - start); for (int i = 0; i < THREAD_NUMS; i++) { batchEventProcessors[i].halt(); if ((NUMS - 1) == handlers[i].getValue()) { } else { log.error("error"); } } executors.shutdown(); log.info(String.format("Run %d, Disruptor=%,d ops/sec%n", 1, opsPerSecond)); } public static final class MessageMutationEventHandler implements EventHandler<MessageEvent> { private final PaddedLong value = new PaddedLong(); private long count; private CountDownLatch latch; public MessageMutationEventHandler() { } public long getValue() { return value.get(); } public void reset(final CountDownLatch latch, final long expectedCount) { value.set(0L); this.latch = latch; count = expectedCount; } @Override public void onEvent(final MessageEvent event, final long sequence, final boolean endOfBatch) throws Exception { //log.info("onEvent:{}",event.getValue()); value.set(event.getValue()); if (count == sequence) { latch.countDown(); } } } public static final class MessageEvent { private long value; public long getValue() { return value; } public void setValue(final long value) { this.value = value; } public final static EventFactory<MessageEvent> EVENT_FACTORY = new EventFactory<MessageEvent>() { public MessageEvent newInstance() { return new MessageEvent(); } }; } }
相关推荐
Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...
《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
Disruptor是一个高性能的线程间通信框架,由LMAX公司开发,并且广泛应用于需要高效并发处理的场景。它的核心是基于Lock-free算法的Ringbuffer,相比于传统的BlockingQueue,Disruptor能提供显著的性能提升。 1. **...
3. **预处理与后处理**:Disruptor支持预处理器和后处理器,可以在事件被放入或取出缓冲区时进行额外的操作,而不影响主线程的执行。 三、Spring框架中的Disruptor集成 虽然Disruptor本身并不直接与Spring框架集成...
Disruptor是一个高性能的线程间通信框架,由LMAX公司开发,并且广泛应用于需要高效并发处理的场景。它的核心是基于Lock-free算法的Ringbuffer,这个数据结构比传统的BlockingQueue在性能上有显著优势。 首先,...
disruptor-3.4.2.jar
disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包
java运行依赖jar包
3. **配置错误**:确认是否为Disruptor配置了自定义的`ExceptionHandler`。如果没有,可以考虑添加一个,以便在出现异常时进行适当的错误处理,而不是依赖默认的`FatalExceptionHandler`。 4. **资源泄露**:检查...
文件列表中的"java0323"可能是项目源代码的一部分,包含了从3月23日更新或编译的Java代码。这些代码可能涵盖了服务器框架的主要模块,如网络通信模块(使用Netty实现)、数据序列化模块(使用ProtoBuf实现)和并发...
在Spring Boot应用中,我们不再需要关心Disruptor的复杂初始化和配置,只需要简单地声明依赖,即可开始享受Disruptor带来的性能提升。 启动器中包含了以下关键组件: 1. **自动配置**:Spring Boot Starter ...
disruptor.jar 2018最新版本(包含disruptor-3.4.1.jar、disruptor-3.4.1-sources.jar、disruptor-3.4.1-javadoc.jar)
LMAX是一种新型零售金融交 易平台,它能够以很低的延迟产生大量交易。这个系统是建立在JVM平台上,其核心是一个业务逻辑处理 器,它能够在一个线程里每秒处理6百万订单。...业务逻辑处理器的核心是Disruptor。
Disruptor阅读笔记.md