本文介绍的是版本是3.3.6的,参考了以下文章
http://11246272.blog.51cto.com/11236272/1745472
http://www.php.cn/java-article-370582.html
http://zhangfengzhe.blog.51cto.com/8855103/1885830
http://ifeve.com/concurrentlinkedqueue/
http://wh0426.iteye.com/blog/221202
https://my.oschina.net/OutOfMemory/blog/793275
Disruptor是什么?
Disruptor是一个高性能的异步处理框架,一个“生产者-消费者”模型,一个轻量级的JMS,和JDK中的BlockingQueue有相似处,但是它的处理速度非常快,获得2011年程序框架创新大奖,号称“一个线程一秒钟可以处理600W个订单”,并且Disruptor不仅仅只有buffer,它提供的功能非常强大,比如它可以帮助我们轻松构建数据流处理(比如一个数据先交给A和B这2个消费者并行处理后在交给C处理,是不是有点想起storm这种流处理,实际上strom的底层就是应用了disruptor来实现worker内部threads的通信)。
环形缓冲区(轮胎):RingBuffer
RingBuffer,环形缓冲区,在disruptor中扮演着非常重要的角色,包含一个指向下一个槽点的序号,可以在线程间传递数据,理解RingBuffer的结构有利于我们理解disruptor为什么这么快、无锁的实现方式、生产者/消费者模式的实现细节。如下图所示:
数组
这个类似于轮胎的东西实际上就是一个数组,使用数组的好处当然是由于预加载的缘故使得访问比链表要快的多。
序号
RingBuffer中元素拥有序号的概念,并且序号是一直增长的,比如RingBuffer大小为10,那么序号从0开始增长,当到9的时候,相当于转了一圈,如果继续增长的话,那么将覆盖0号元素。也即是说通过 序号%SIZE 来定位元素,实现set/get操作。这里也发现了和队列不同的一个方式,就是不存在元素的删除操作,只有覆盖而已,实际上RingBuffer的这种特别的环形存储方式,使得不需要花费大量的时间用于内存清理/垃圾回收。 由于涉及到取模操作,为了CPU进行位运算更加高效,RingBuffer的大小应该是2的N次方。
无锁的机制
在生产者/消费者模式下,disruptor号称“无锁并行框架”,下面我们来具体分析下:
一个生产者 + 一个消费者
生产者维护一个生产指针P,消费者维护一个消费者指针C,当然P和C本质上就是序号。2者各操作各的,不需要锁,仅仅需要注意的是生产者和消费者的速度问题,当然这个在disruptor内部已经为我们做了处理,就是判断一下P和C之间不能超过一圈的大小。
一个生产者 + 多个消费者
多个消费者当然持有多个消费指针C1,C2,...,消费者依据C进行各自读取数据,只需要保证生产者的速度“协调”最慢的消费者的速度,就是那个不能超出一圈的概念。此时也不需要进行锁定。
多个生产者 + N个消费者
很显然,无论生产者有几个,生产者指针P只能存在一个,否则数据就乱套了。那么多个生产者之间共享一个P指针,在disruptor中实际上是利用了CAS机制来保证多线程的数据安全,也没有使用到锁。
Event
在Disruptor框架中,生产者生产的数据叫做Event。
核心对象
RingBuffer:环形的一个数据结构,对象初始化时,会使用事件Event进行填充。Buffer的大小必须是2的幂次方,方便移位操作。
-
Event:无指定具体接口,用户自己实现,可以携带任何业务数据。
-
EventFactory:产生事件Event的工厂,由用户自己实现。
-
EventTranslator:事件发布的回调接口,由用户实现,负责将业务参数设置到事件中。
-
Sequencer:序列产生器,也是协调生产者和消费者及实现高并发的核心。有MultiProducerSequencer 和 SingleProducerSequencer两个实现类。
-
SequenceBarrier:拥有RingBuffer的发布事件Sequence引用和消费者依赖的Sequence引用。决定消费者消费可消费的Sequence。
-
EventHandler:事件的处理者,由用户自己实现。
-
EventProcessor:事件的处理器,单独在一个线程中运行。
-
WorkHandler:事件的处理者,由用户自己实现。
-
WorkProcessor:事件的处理器,单独在一个线程中运行。
-
WorkerPool:一组WorkProcessor的处理。
-
WaitStrategy:在消费者比生产者快时,消费者处理器的等待策略。
等待策略:
消费者在缓存中没有可以消费的事件时,采取的等待策略:
1.BlockingWaitStrategy:默认等待策略。和BlockingQueue的实现很类似,通过使用锁和条件(Condition)进行线程阻塞的方式,等待生产者唤醒(线程同步和唤醒)。此策略对于线程切换来说,最节约CPU资源,但在高并发场景下性能有限。
2.BusySpinWaitStrategy:死循环策略。消费者线程会尽最大可能监控缓冲区的变化,会占用所有CPU资源,线程一直自旋等待,比较耗CPU。
3.LiteBlockingWaitStrategy:通过线程阻塞的方式,等待生产者唤醒,比BlockingWaitStrategy要轻,某些情况下可以减少阻塞的次数。
4.PhasedBackoffWaitStrategy:根据指定的时间段参数和指定的等待策略决定采用哪种等待策略。5.SleepingWaitStrategy:CPU友好型策略。会在循环中不断等待数据。可通过参数设置,首先进行自旋等待,若不成功,则使用Thread.yield()让出CPU,并使用LockSupport.parkNanos(1)进行线程睡眠,通过线程调度器重新调度;或一直自旋等待,所以,此策略数据处理数据可能会有较高的延迟,适合用于对延迟不敏感的场景,优点是对生产者线程影响小,典型应用场景是异步日志。
6.TimeoutBlockingWaitStrategy:通过参数设置阻塞时间,如果超时则抛出异常。
7.YieldingWaitStrategy:低延时策略。消费者线程会不断循环监控RingBuffer的变化,在循环内部使用Thread.yield()让出CPU给其他线程,通过线程调度器重新调度。
Disruptor框架基本构成
1.MyEvent:自定义实体对象,充当“生产者-消费者”模型中的数据。
2.MyEventFactory:实现EventFactory的接口,用于生产数据。
3.MyEventProducerWithTranslator:将数据存储到自定义对象中并发布,通过在自定义类中新建EventTranslator类实现。
4.MyEventHandler:自定义消费者,通过EventHandler接口实现。
与ConcurrentLinkedQueue的比较
相同点
1.都是使用了无锁算法的CAS(Compare And Swap/Set)的实现方式
不同点
1.在多线程的情况下Disruptor是不存在竞争的,每个对象都使用自己的序号;ConcurrentLinkedQueue因为是队列的方式,可能会出现其他线程插队的情况。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点,所以比较耗时。
2.Disruptor是一个环形的缓冲区,是一个数组,它不会清除已经存在的数据,它只会更新一个可用的序列号,然后向该序列号中写入数据,因为是每一个对象拥有自己的序列号,因此不存在写冲突,而ConcurrentLinkedQueue是一个无界的队列,它会根据指针来判断数据是否已经使用过,使用过则会更新head节点的标记,因此在耗时的时间上Disurptor比ConcurrentLinkedQueue少。
3.Disruptor解决了伪共享的问题。
更多的关于Disruptor的介绍可以参考 http://ifeve.com/disruptor/ 虽然这个上面的介绍版本有点旧了,但是原理基本都是一样的。
Disurptor的java测试实现
1.导入需要的jar包
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.7.RELEASE</version> </parent> <dependencies> <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.3.7</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.20</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
2.自定义实体对象
package com.demo.disruptor.dto; import com.alibaba.fastjson.JSONObject; import java.util.Date; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 自定义事件对象 */ public class LogEvent { private long logId; private String content; private Date date; public long getLogId() { return logId; } public void setLogId(long logId) { this.logId = logId; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Date getDate() { return date; } public void setDate(Date date) { this.date = date; } @Override public String toString() { return JSONObject.toJSONString(this); } }
3.自定义factory类:
package com.demo.disruptor.factory; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.EventFactory; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 事件生成工厂,用来初始化预分配事件对象,即根据RingBuffer大小创建的实体对象 */ public class LogEventFactory implements EventFactory<LogEvent> { public LogEvent newInstance() { System.out.println("新建LogEvent数据....."); return new LogEvent(); } }
4.新建生产者类
4.1 非Translator生产者类
package com.demo.disruptor.producer; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.RingBuffer; import java.util.Date; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 自定义生产者 */ public class LogEventProducer { private RingBuffer<LogEvent> ringBuffer; public LogEventProducer(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long logId, String content, Date date){ //RingBuffer类似一个队列,获取下一个空闲的序号 long seq = ringBuffer.next(); LogEvent logEvent = ringBuffer.get(seq); logEvent.setLogId(logId); logEvent.setContent(content); logEvent.setDate(date); //发布事件 ringBuffer.publish(seq); } }
package com.demo.disruptor.producer; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.RingBuffer; import java.util.Date; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 自定义生产者 */ public class LogEventProducer2 { private RingBuffer<LogEvent> ringBuffer; public LogEventProducer2(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long logId, String content, Date date){ //RingBuffer类似一个队列,获取下一个空闲的序号 long seq = ringBuffer.next(); LogEvent logEvent = ringBuffer.get(seq); logEvent.setLogId(logId); logEvent.setContent(content); logEvent.setDate(date); //发布事件 ringBuffer.publish(seq); } }
LogEventProducer3的类和生产者2也一样
4.2 Translator生产者类
package com.demo.disruptor.producer; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.EventTranslatorVararg; import com.lmax.disruptor.RingBuffer; import java.util.Date; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 使用translator方式到事件生产者发布事件,通常使用该方法 */ public class LogEventProducerWithTranslator { private EventTranslatorVararg eventTranslatorVararg = new EventTranslatorVararg<LogEvent>() { public void translateTo(LogEvent logEvent, long l, Object... objects) { logEvent.setLogId((Long) objects[0]); logEvent.setContent((String)objects[1]); logEvent.setDate((Date)objects[2]); } }; private RingBuffer<LogEvent> ringBuffer; public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(long logId, String content, Date date){ ringBuffer.publishEvent(eventTranslatorVararg,logId,content,date); } }
5.新建消费者类
package com.demo.disruptor.consumer; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.EventHandler; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 自定义消费者 */ public class LogEventConsumer implements EventHandler<LogEvent> { public void onEvent(LogEvent logEvent, long l, boolean b) throws Exception { System.out.println("消费者1-seq:" + l + ",bool:" + b + ",logEvent:" + logEvent.toString()); } }
package com.demo.disruptor.consumer; import com.demo.disruptor.dto.LogEvent; import com.lmax.disruptor.EventHandler; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description 自定义消费者 */ public class LogEventConsumer2 implements EventHandler<LogEvent> { public void onEvent(LogEvent logEvent, long l, boolean b) throws Exception { System.out.println("消费者2-seq:" + l + ",bool:" + b + ",logEvent:" + logEvent.toString()); } }
剩下的LogEventConsumer3、LogEventConsumer4、LogEventConsumer5的类同LogEventConsumer2类一样。
6.新建启动类
package com.demo.disruptor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description */ @SpringBootApplication public class App { public static void main(String [] args){ SpringApplication.run(App.class,args); } }
7.新建测试类测试Disruptor
package com.demo.disruptor.logEvent; import com.demo.disruptor.consumer.*; import com.demo.disruptor.dto.LogEvent; import com.demo.disruptor.factory.LogEventFactory; import com.demo.disruptor.producer.LogEventProducer; import com.demo.disruptor.producer.LogEventProducer2; import com.demo.disruptor.producer.LogEventProducer3; import com.demo.disruptor.producer.LogEventProducerWithTranslator; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.YieldingWaitStrategy; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Date; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** * @author liaoyubo * @version 1.0 2017/10/9 * @description */ @SpringBootTest(classes = LogEventMain.class) @RunWith(SpringRunner.class) public class LogEventMain { /** * 单个生产者和消费者的模式 * @throws InterruptedException */ @Test public void producer() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); //用于生成RingBuffer大小,其大小必须是2的n次方 int ringBufferSize = 8; //定义Disruptor初始化信息 Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //定义处理事件的消费者 disruptor.handleEventsWith(new LogEventConsumer()); //定义事件的开始 disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); for(int i = 0; i < 10; i++){ logEventProducer.onData(i, "c" + i, new Date()); } Thread.sleep(1000); //关闭Disruptor disruptor.shutdown(); } /** * 使用EventTranslatorVararg的单个生产者和消费者模式 * @throws InterruptedException */ @Test public void producerWithTranslator() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); //用于生成RingBuffer大小,其大小必须是2的n次方 int ringBufferSize = 8; //定义Disruptor初始化信息 Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //定义处理事件的消费者 disruptor.handleEventsWith(new LogEventConsumer()); //定义事件的开始 disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer); for(int i = 0; i < 10; i++){ producerWithTranslator.onData(i, "c" + i, new Date()); } Thread.sleep(1000); //关闭Disruptor disruptor.shutdown(); } /** * 一个生产者,3个消费者,其中前面2个消费者完成后第3个消费者才可以消费 * 也即使说当前面2个消费者把所有的RingBuffer占领完成,同时都消费完成后才会有第3个消费者的消费 * 当发布的事件数量大于RingBuffer的大小的时候,在第3个消费者消费完RingBuffer大小的时候前面2个消费者才能继续消费,序号递增的 * @throws InterruptedException */ @Test public void multiConsumer() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); //用于生成RingBuffer大小,其大小必须是2的n次方 int ringBufferSize = 8; //定义Disruptor初始化信息 Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); //设置多个消费者 EventHandlerGroup<LogEvent> eventEventHandlerGroup = disruptor.handleEventsWith(new LogEventConsumer(),new LogEventConsumer2()); eventEventHandlerGroup.then(new LogEventConsumer3()); //启动事件的开始 disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer); for(int i = 0; i < 10; i++){ producerWithTranslator.onData(i, "c" + i, new Date()); } Thread.sleep(1000); //关闭Disruptor disruptor.shutdown(); } /** * 一个生产者,多个消费者,有2条支线,其中消费者1和消费者3在同一条支线上, * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者 * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4 * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5 * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费 * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来 * @throws InterruptedException */ @Test public void multiConsumers() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); //用于生成RingBuffer大小,其大小必须是2的n次方 int ringBufferSize = 8; //定义Disruptor初始化信息 Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy()); LogEventConsumer consumer1 = new LogEventConsumer(); LogEventConsumer2 consumer2 = new LogEventConsumer2(); LogEventConsumer3 consumer3 = new LogEventConsumer3(); LogEventConsumer4 consumer4 = new LogEventConsumer4(); LogEventConsumer5 consumer5 = new LogEventConsumer5(); //同时执行消费者1和消费者2 disruptor.handleEventsWith(consumer1,consumer2); //消费者1后面执行消费者3 disruptor.after(consumer1).handleEventsWith(consumer3); //消费者后面执行消费者4 disruptor.after(consumer2).handleEventsWith(consumer4); //消费者3和消费者3执行完后执行消费者5 disruptor.after(consumer3,consumer4).handleEventsWith(consumer5); //定义事件的开始 disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); for(int i = 0; i < 10; i++){ logEventProducer.onData(i, "c" + i, new Date()); } Thread.sleep(1000); //关闭Disruptor disruptor.shutdown(); } /** * 多个生产者,多个消费者,有2条消费者支线,其中消费者1和消费者3在同一条支线上, * 消费者2和消费者4在同一条支线上,消费者5是消费者3和消费者4的终点消费者 * 这样的消费将会在消费者1和消费者2把所有的RingBuffer大小消费完成后才会执行消费者3和消费者4 * 在消费者3和消费者4把RingBuffer大小消费完成后才会执行消费者5 * 消费者5消费完RingBuffer大小后又按照上面的顺序来消费 * 如果剩余的生产数据比RingBuffer小,那么还是要依照顺序来 * 生产者只是多生产了数据 * @throws InterruptedException */ @Test public void multiProcedureConsumers() throws InterruptedException { LogEventFactory logEventFactory = new LogEventFactory(); //用于生成RingBuffer大小,其大小必须是2的n次方 int ringBufferSize = 8; //定义Disruptor初始化信息 Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(logEventFactory,ringBufferSize,Executors.defaultThreadFactory(), ProducerType.MULTI, new YieldingWaitStrategy()); LogEventConsumer consumer1 = new LogEventConsumer(); LogEventConsumer2 consumer2 = new LogEventConsumer2(); LogEventConsumer3 consumer3 = new LogEventConsumer3(); LogEventConsumer4 consumer4 = new LogEventConsumer4(); LogEventConsumer5 consumer5 = new LogEventConsumer5(); //同时执行消费者1和消费者2 disruptor.handleEventsWith(consumer1,consumer2); //消费者1后面执行消费者3 disruptor.after(consumer1).handleEventsWith(consumer3); //消费者后面执行消费者4 disruptor.after(consumer2).handleEventsWith(consumer4); //消费者3和消费者3执行完后执行消费者5 disruptor.after(consumer3,consumer4).handleEventsWith(consumer5); //定义事件的开始 disruptor.start(); RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); LogEventProducer2 logEventProducer2 = new LogEventProducer2(ringBuffer); LogEventProducer3 logEventProducer3 = new LogEventProducer3(ringBuffer); for(int i = 0; i < 10; i++){ logEventProducer.onData(i, "1-c" + i, new Date()); logEventProducer2.onData(i, "2-c" + i, new Date()); logEventProducer3.onData(i, "3-c" + i, new Date()); } Thread.sleep(1000); //关闭Disruptor disruptor.shutdown(); } }
8.单个线程的ArrayBlockingQueue和Disruptor的性能测试对比
我们这里分别用500W,1000W,5000W的数据量来做单个生产者和消费者的测试,分别进行了10次测试,然后取他们的平均值做对比。
package com.demo.disruptor.test; import com.demo.disruptor.dto.LogEvent; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; /** * @author liaoyubo * @version 1.0 2017/10/11 * @description */ public class BlockingQueueTest { public static int eventNum = 50000000; //5000000:974,932,943,946,993,1073,1044,1018,1027,971 992 //10000000:1845,1851,2433,2041,1789,1911,1953,2105,1862,1896 1969 //50000000:9828,9595,9377,9273,9020,9450,9873,9994,8882,9695 9499 public static void main(String[] args) { final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<LogEvent>(65536); final long startTime = System.currentTimeMillis(); new Thread(new Runnable() { @Override public void run() { int i = 0; while (i < eventNum) { LogEvent logEvent = new LogEvent(); logEvent.setLogId(i); logEvent.setContent("c" + i); logEvent.setDate(new Date()); try { queue.put(logEvent); } catch (InterruptedException e) { e.printStackTrace(); } i++; } } }).start(); new Thread(new Runnable() { @Override public void run() { int k = 0; while (k < eventNum) { try { queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } k++; } long endTime = System.currentTimeMillis(); System.out.println("BlockingQueue 花费时间:" + (endTime - startTime) + "ms"); } }).start(); } }
package com.demo.disruptor.test; import com.demo.disruptor.consumer.LogEventConsumer; import com.demo.disruptor.consumer.LogEventConsumer2; import com.demo.disruptor.dto.LogEvent; import com.demo.disruptor.factory.LogEventFactory; import com.demo.disruptor.producer.LogEventProducer; import com.demo.disruptor.producer.LogEventProducerWithTranslator; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.util.DaemonThreadFactory; import java.util.Date; /** * @author liaoyubo * @version 1.0 2017/10/11 * @description */ public class DisruptorTest { //5000000:542,499,550,547,605,502,743,505,657,608 576 //10000000:1252,1048,1031,1075,1022,1207,1056,1494,1118,1258 1156 //50000000:5489,5125,5265,5609,5201,5482,4982,4891,5351,5758 5315 public static void main(String [] args){ LogEventFactory factory = new LogEventFactory(); int ringBufferSize = 65536; final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory, ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BusySpinWaitStrategy()); LogEventConsumer consumer = new LogEventConsumer(); disruptor.handleEventsWith(consumer); disruptor.start(); /*new Thread(new Runnable() { @Override public void run() { RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducer logEventProducer = new LogEventProducer(ringBuffer); for(int i = 0; i < BlockingQueueTest.eventNum; i++){ logEventProducer.onData(i, "c" + i, new Date()); } } }).start();*/ new Thread(new Runnable() { @Override public void run() { RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //进行事件的发布 LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer); for(int i = 0; i < BlockingQueueTest.eventNum; i++){ producerWithTranslator.onData(i, "c" + i, new Date()); } } }).start(); //disruptor.shutdown(); } }测试结果的对比为(都是按照ms统计):
相关推荐
并发框架Disruptor介绍Martin Fowler在自己网站上写了一篇...下载剖析Disruptor为什么会这么快Disruptor如何工作和使用(五) Disruptor(无锁并发框架)-发布(六) LMAX Disruptor 一个高性能、低延迟且简单的框架(七) Di
本篇文章将详细介绍Disruptor模式,并探讨如何在C++中实现和应用这一模式。 Disruptor模式的核心思想是消除线程间的共享数据,通过一个环形缓冲区(Ring Buffer)来传递消息,从而避免了锁和条件变量带来的性能开销...
async-framework是基于google开源框架Disruptor开发的一个异步流程处理框架,关于Disruptor的介绍请参考 async-framework提供了流程和队列的概念,流程 Flow 代表步骤,队列 Queue 代表处理节点,队列由Disruptor...
下面将详细介绍Disruptor的使用及其在并发编程中的作用。 首先,我们要理解Disruptor的核心组件——环形缓冲区。环形缓冲区是一种特殊的数组结构,它的大小是固定的,并且以循环的方式分配和释放空间。这种设计减少...
操作手册与使用说明:针对每个游戏项目,都准备了详细的操作手册和使用说明,手把手教你如何运行和测试项目。 学习笔记:整理了Java语言在游戏开发中的核心知识点和常用技术,方便你随时查阅和学习。 适用人群: 这...
本文将详细介绍如何使用Slf4j集成Log4j2构建项目日志系统的完美解决方案。 背景: 随着项目的发展,对日志系统的需求也在不断增加。传统的日志系统基于Log4j,但是在高并发情况下,Log4j 1.x会出现死锁,导致CPU...
- **Disruptor优化**:解决了Disruptor队列慢时导致的CPU消耗问题,降低了CPU使用率。 - **GC优化**:减少了因GC而导致worker被误认为死亡的情况。 - **监控增强**:支持用户级别的报警,并提供丰富的采样维度,有助...
本教程将详细介绍Log4j2的使用方法及其核心特性。 首先,Log4j2的设计目标是提供低延迟的日志记录,这得益于其异步日志处理机制。相比Log4j1.x和logback,Log4j2在高并发场景下具有更好的性能表现。 **配置方式** ...
##### 3.3 介绍各类在LINUX下对内存使用情况做观察的命令 Linux提供了多种工具来监控内存使用情况。 - **常用命令**: - **top**:显示系统中所有进程的资源使用情况。 - **free**:报告系统内存和交换内存的...
以上知识点涵盖了实时大数据分析的技术基础、实时处理框架Storm的介绍、Storm的使用和优化、大数据技术平台的构建,以及如何实现数据处理的可靠性和性能优化等内容。这些知识点为读者了解和掌握实时大数据分析提供了...
本文主要介绍了 Spring Boot 中 Log4j2 的配置使用详解,包括 Log4j2 的导入、配置文件的创建、 appenders 的配置等内容。 Log4j2 简介 Log4j2 是 Apache Log4j 的升级版本,它相对于 Log4j 1.x 有了许多显著的...
### JStorm介绍与特性 #### 一、JStorm概述 JStorm是由阿里巴巴开源的一个分布式实时计算框架,类似于Apache Storm但经过了阿里巴巴团队的优化和改进,具有更好的性能和稳定性。JStorm的核心目标在于提供一个高效...
15.9 Disruptor+Redis队列 303 15.9.1 简介 303 15.9.2 XML配置 304 15.9.3 EventWorker 305 15.9.4 EventPublishThread 307 15.9.5 EventHandler 308 15.9.6 EventQueue 308 15.10 下单系统水平可扩展架构 311 ...
首先,离线海量数据交换场景介绍是理解Plumber工具开发背景的重要部分。京东作为电商巨头,每天都会产生大量的交易数据和用户行为数据。这些数据存储在多种不同的数据库和数据源中,如MySQL、SQL Server、Oracle、...