`

Disruptor PK BlockingQueue

阅读更多

 

package com.disruptor.test3;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

import org.junit.Test;

public class ArrayBlockingQueueTest {

    @Test
    public void test() throws InterruptedException {

        long cost = System.currentTimeMillis();

        final CountDownLatch l = new CountDownLatch(1);
        final BlockingQueue<Long> bq = new ArrayBlockingQueue<Long>(4096);

        Runnable p = new Runnable() {
            public void run() {
                for (int i = 0; i < ConstantsUtil.MAX_LOOP; i++) {
                    try {
                        bq.put((long) i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        Runnable c = new Runnable() {
            public void run() {
                while (true) {
                    try {
                        long i = bq.take();
                        //System.out.println(i);
                        if (i == ConstantsUtil.MAX_LOOP - 1) {
                            l.countDown();
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        new Thread(c).start();

        new Thread(p).start();

        l.await();
        System.out.println("cost:" + (System.currentTimeMillis() - cost));
    }

}

 

 

package com.disruptor.test3;

public abstract class ConstantsUtil {
    static int MAX_LOOP = 1000000000;
}

 

 

package com.disruptor.test3;

import java.util.concurrent.CountDownLatch;

import org.junit.Test;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class DisruptorTest {

    private static final int BUFFER_SIZE = 4096;

    @Test
    public void test() throws InterruptedException {
        long cost = System.currentTimeMillis();
        CountDownLatch l = new CountDownLatch(1);
        //创建RingBuffer
        RingBuffer<ValueEvent> ringBuffer = RingBuffer.createSingleProducer(
            ValueEvent.EVENT_FACTORY, BUFFER_SIZE, new YieldingWaitStrategy());

        //创建序列栅栏
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();

        //创建消费者
        MyEventHandler handler = new MyEventHandler(l);

        //事件执行者
        BatchEventProcessor<ValueEvent> batchEventProcessor = new BatchEventProcessor<ValueEvent>(
            ringBuffer, sequenceBarrier, handler);

        //序列由栅栏统一计算
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());

        new Thread(batchEventProcessor).start();

        for (long i = 0; i < ConstantsUtil.MAX_LOOP; i++) {
            long next = ringBuffer.next();
            //通过序列从环中,获取消息,没有则由ValueEvent.EVENT_FACTORY工厂创建空事件
            ValueEvent event = ringBuffer.get(next);
            //填充数据
            event.setValue(i);
            //将环中的数据发布出去,发布之后,实际也是直接通过事件消费
            ringBuffer.publish(next);
        }
        l.await();
        System.out.println("cost:" + (System.currentTimeMillis() - cost));
    }
}

 

 

package com.disruptor.test3;

import java.util.concurrent.CountDownLatch;

import com.lmax.disruptor.EventHandler;

public class MyEventHandler implements EventHandler<ValueEvent> {
    private CountDownLatch l;

    public long            count = 0;

    public MyEventHandler() {
    };

    public MyEventHandler(CountDownLatch l) {
        this.l = l;
    };

    public void onEvent(ValueEvent event, long arg1, boolean arg2) throws Exception {
        long i = event.getValue();
        //System.out.println(i);
        if (i == ConstantsUtil.MAX_LOOP - 1) {
            l.countDown();
        }
    }
}

 

 

package com.disruptor.test3;

import com.lmax.disruptor.EventFactory;

public final class ValueEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(final long value) {
        this.value = value;
    }

    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>() {
                                                                   public ValueEvent newInstance() {
                                                                       return new ValueEvent();
                                                                   }
                                                               };
}

 

 

1亿:
Disrupter :3910 ms       
BlockQueue: 246211 ms

4s VS 25s   6倍

10亿
Disrupter :36767 ms       
BlockQueue: 231872 ms

37s vs 232s 6倍


 

 

 

分享到:
评论

相关推荐

    disruptor:Disruptor BlockingQueue

    Conversant ConcurrentQueue、Disruptor BlockingQueue 和 ConcurrentStack Disruptor是Java中性能最高的线程内传输机制。 Conversant Disruptor 是这种环形缓冲区中性能最高的实现,因为它几乎没有开销,并且采用了...

    Disruptor并发框架中文参考文档

    ### Disruptor并发框架知识点详解 #### 一、Disruptor简介及原理 **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    SpringBoot整合Disruptor并发编程框架是针对高并发场景下性能优化的一种技术实践。Disruptor是由LMAX公司开发的一款高性能、低延迟的并发工具,它通过消除线程间的锁竞争,大大提升了多线程环境下的处理速度。...

    spring-boot-starter-disruptor.zip

    《Spring Boot Starter Disruptor深度解析》 在现代软件开发中,高性能和低延迟往往是系统设计的关键要素。Spring Boot作为Java领域最受欢迎的微服务框架,提供了丰富的启动器(starters)来简化开发工作。"spring-...

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    Disruptor3.x Disruptor使用方式

    Disruptor3.x Disruptor使用方式 EventHandler[] eventHandlers=new DisruptorEventHandler[]{new DisruptorEventHandler()}; DisruptorPublisher dp=new DisruptorPublisher(1024, eventHandlers); dp.start(); ...

    DisruptorDemo.zip

    《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...

    LMAX-Disruptor框架jar包

    Disruptor框架是由LMAX公司开发的一款高效的无锁内存队列。使用无锁的方式实现了一个环形队列。据官方描述,其性能要比BlockingQueue至少高一个数量级。根据GitHub上的最新版本源码打出的包,希望对大家有帮助。

    disruptor 多个消费者

    Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它通过优化数据共享方式,显著提高了多线程环境下的处理速度。在"Disruptor 多个消费者"的场景中,我们可以深入理解Disruptor如何...

    disruptor jar包+Demo+Api

    《Disruptor 框架详解与应用实例》 Disruptor 是一款高性能的并发工具库,由英国的 LMAX 公司开发并开源。它主要用于优化多线程环境下的数据共享,尤其在金融交易系统中表现卓越。Disruptor 的核心设计是一个环形...

    Disruptor demo

    Disruptor是一款高性能的并发工具库,由LMAX公司开发并开源,主要应用于高频率交易系统。它通过优化线程间通信的方式,极大地提升了多线程环境下的数据处理速度。Disruptor的设计理念是避免传统的锁机制,转而采用一...

    Disruptor C++版(仅支持单生产者)

    Disruptor是由LMAX公司开发的一种高性能的并发编程框架,主要应用于金融交易系统。它以其高效、低延迟的事件处理机制而闻名。在C++版本的Disruptor中,我们同样可以享受到这种高效的并发能力,尤其适用于需要大量...

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    Disruptor报错FatalExceptionHandler的解决办法,看网上这种解决办法挺少,整理了一下

    Disruptor是一款高性能的并发框架,它通过使用Ring Buffer和基于事件的处理方式来消除锁竞争,提升系统性能。在使用Disruptor过程中,开发者可能会遇到`FatalExceptionHandler`的错误,这通常是由于处理流程中的异常...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    LMAX Disruptor是一款高性能的消息处理框架,由LMAX公司开发并开源,它在金融交易领域有着广泛的应用。Disruptor的设计目标是解决多线程环境下的数据共享问题,通过优化并发性能,实现极低的延迟和高吞吐量。在Java...

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    Java工具:高性能并发工具Disruptor简单使用

    Java工具:高性能并发工具Disruptor简单使用 在Java编程中,高效并发处理是优化系统性能的关键之一。Disruptor,由LMAX公司开源的一款并发框架,为处理高并发场景提供了一种新颖且高效的解决方案。它通过消除锁和...

Global site tag (gtag.js) - Google Analytics