作为比较接近正式使用的方式,我用一个生产者多个消费者来进行了Disurptor的不重复消费的性能测试,在这里我主要是介绍下我在测试过程中使用的代码以及出现的情况做下说明,这些情况有可能是我自己的代码原因引起的,在此也给自己留一个记录,如果看到的同学提出异议的麻烦给我说一下,关于Disruptor的其他介绍可以参考http://357029540.iteye.com/blog/2395677,这是我参考别人的文章写的。
使用maven的方式引入需要的jar
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.7</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.20</version>
</dependency>
</dependencies>
在这里我使用的是WorkHandler接口来实现的消费者类,消费者的代码如下:
package com.demo.disruptor.consumer;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.test.BlockingQueueTest;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 自定义消费者
*/
public class LogEventWorkHandlerConsumer implements WorkHandler<LogEvent> {
private long startTime;
public LogEventWorkHandlerConsumer() {
this.startTime = System.currentTimeMillis();
}
@Override
public void onEvent(LogEvent logEvent) throws Exception {
//全部转化为大写,用于耗时测试
logEvent.setContent(logEvent.getContent().toUpperCase());
//判断是否已经有了开始时间
if(logEvent.getStartTime() == null || logEvent.getStartTime() == 0){
logEvent.setStartTime(startTime);
}else {
startTime = logEvent.getStartTime();
}
//判断是否已经到最后
if (logEvent.getLogId() +1 == BlockingQueueTest.eventNum) {
long endTime = System.currentTimeMillis();
System.out.println(" costTime1 = " + (endTime - startTime) + "ms");
}
//System.out.println("消费者1-seq logEvent:" + logEvent.toString());
}
}
LogEventWorkHandlerConsumer2和LogEventWorkHandlerConsumer3同LogEventWorkHandlerConsumer也一样。
生产者类代码如下:
package com.demo.disruptor.producer;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.EventTranslatorVararg;
import com.lmax.disruptor.RingBuffer;
import java.util.Date;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 使用translator方式到事件生产者发布事件,通常使用该方法
*/
public class LogEventProducerWithTranslator {
private EventTranslatorVararg eventTranslatorVararg = new EventTranslatorVararg<LogEvent>() {
public void translateTo(LogEvent logEvent, long l, Object... objects) {
logEvent.setLogId((Long) objects[0]);
logEvent.setContent((String)objects[1]);
logEvent.setDate((Date)objects[2]);
}
};
private RingBuffer<LogEvent> ringBuffer;
public LogEventProducerWithTranslator(RingBuffer<LogEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(long logId, String content, Date date){
//System.out.println("生产者:" + logId);
ringBuffer.publishEvent(eventTranslatorVararg,logId,content,date);
}
}
创建的实体对象:
package com.demo.disruptor.dto;
import com.alibaba.fastjson.JSONObject;
import java.util.Date;
/**
* @author liaoyubo
* @version 1.0 2017/10/9
* @description 自定义事件对象
*/
public class LogEvent {
private long logId;
private String content;
private Date date;
private Long startTime;
public long getLogId() {
return logId;
}
public void setLogId(long logId) {
this.logId = logId;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public Date getDate() {
return date;
}
public void setDate(Date date) {
this.date = date;
}
public Long getStartTime() {
return startTime;
}
public void setStartTime(Long startTime) {
this.startTime = startTime;
}
@Override
public String toString() {
return JSONObject.toJSONString(this);
}
}
一个用于定义测试次数的类:
package com.demo.disruptor.test;
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class BlockingQueueTest {
public static int eventNum = 5000000;
}
用于定义异常的代码:
package com.demo.disruptor.exception;
import com.demo.disruptor.dto.LogEvent;
import com.lmax.disruptor.ExceptionHandler;
/**
* @author liaoyubo
* @version 1.0 2017/10/12
* @description
*/
public class LogEventExceptionHandler implements ExceptionHandler<LogEvent> {
@Override
public void handleEventException(Throwable throwable, long l, LogEvent logEvent) {
System.out.println("handleEventException....");
}
@Override
public void handleOnStartException(Throwable throwable) {
System.out.println("handleOnStartException....");
}
@Override
public void handleOnShutdownException(Throwable throwable) {
System.out.println("handleOnShutdownException....");
}
}
下面是用于测试耗时的主程序:
package com.demo.disruptor.test;
import com.demo.disruptor.consumer.*;
import com.demo.disruptor.dto.LogEvent;
import com.demo.disruptor.exception.LogEventExceptionHandler;
import com.demo.disruptor.factory.LogEventFactory;
import com.demo.disruptor.producer.LogEventProducer;
import com.demo.disruptor.producer.LogEventProducerWithTranslator;
import com.lmax.disruptor.*;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.util.Date;
import java.util.concurrent.*;
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class DisruptorTest {
//5000000:554,627,545,602,550,578,675,626,587,692 604
//10000000:1657,1471,1234,1231,1302,1083,1186,1064,1044,1073 1235
//50000000:5017,5255,5048,5009,5410,4609,5979,5184,5060,4771 5134
public static void main(String [] args) throws TimeoutException, InterruptedException, InsufficientCapacityException {
LogEventFactory factory = new LogEventFactory();
//ExecutorService executor = Executors.newCachedThreadPool(); // 线程池
int ringBufferSize = 65536;
final Disruptor<LogEvent> disruptor = new Disruptor<LogEvent>(factory,ringBufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new YieldingWaitStrategy());
EventHandlerGroup<LogEvent> eventHandlerGroup = disruptor.handleEventsWithWorkerPool(new LogEventWorkHandlerConsumer(),new LogEventWorkHandlerConsumer2(),new LogEventWorkHandlerConsumer3());
final RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
disruptor.setDefaultExceptionHandler(new LogEventExceptionHandler());
disruptor.start();
/*new Thread(new Runnable() {
@Override
public void run() {
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//进行事件的发布
LogEventProducer logEventProducer = new LogEventProducer(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
logEventProducer.onData(i, "c" + i, new Date());
}
}
}).start();*/
//启用单独线程进行发布
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
producerWithTranslator.onData(i, "c" + i, new Date());
}
}
});
thread.start();
while (true){
long sequence = ringBuffer.getMinimumGatingSequence();
//System.out.println(sequence);
if(sequence + 1 == BlockingQueueTest.eventNum){
break;
}
}
//检查现在运行的线程
//Thread.currentThread().getThreadGroup().list();
//从代码上理解应该只是把消费者线程关闭了
disruptor.halt();
//Thread.currentThread().getThreadGroup().list();
//在关闭disruptor后如果发起新线程,那么新线程不会关闭,因为需要消费者消防数据导致处于阻塞状态,线程会一直挂起
/*Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
//进行事件的发布
LogEventProducerWithTranslator producerWithTranslator = new LogEventProducerWithTranslator(ringBuffer);
for(int i = 0; i < BlockingQueueTest.eventNum; i++){
producerWithTranslator.onData(i, "d" + i, new Date());
}
}
});
thread1.setName("thread1");
thread1.start();
while (true){
long sequence = ringBuffer.getMinimumGatingSequence();
System.out.println(sequence);
if(sequence + 1 == BlockingQueueTest.eventNum){
break;
}
}
Thread.currentThread().getThreadGroup().list();*/
System.out.println(eventHandlerGroup.asSequenceBarrier().getCursor());
}
}
这个地方因为添加了关闭程序,所以性能上面比没有添加关闭操作及判断要慢一些,上面的测试数据是没有关闭操作的测试数据。
下面是BlockingQueue的代码:
/**
* @author liaoyubo
* @version 1.0 2017/10/11
* @description
*/
public class BlockingQueueMultiTest {
//5000000:1195,1189,1147,1181,1135,1174,1216,1133,1145,1093 1161
//10000000:2633,1972,2417,2330,2255,2429,2354,2178,2235,2402 2321
//50000000:11841,11902,13048,12262,9769,10531,12721,12229,12283,13595 12018
public static void main(String[] args) throws InterruptedException {
final BlockingQueue<LogEvent> queue = new ArrayBlockingQueue<LogEvent>(65536);
//System.out.println("开始时间:" + startTime);
new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (i < BlockingQueueTest.eventNum) {
LogEvent logEvent = new LogEvent();
logEvent.setLogId(i);
logEvent.setContent("c" + i);
logEvent.setDate(new Date());
try {
queue.put(logEvent);
} catch (InterruptedException e) {
e.printStackTrace();
}
i++;
}
}
}).start();
// 创建缓冲池
final ExecutorService executorService = Executors.newCachedThreadPool();
final long startTime = System.currentTimeMillis();
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue1 花费时间:" + (endTime-startTime) + "ms");
}
}));
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue2 花费时间:" + (endTime-startTime) + "ms");
}
}));
executorService.execute(new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
LogEvent logEvent = queue.take();
logEvent.setContent(logEvent.getContent().toUpperCase());
//System.out.println("BlockingQueue1获取数据:" + (countDownLatch.getCount()));
if(logEvent.getLogId() + 1 == BlockingQueueTest.eventNum){
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("BlockingQueue3 花费时间:" + (endTime-startTime) + "ms");
}
}));
}
}
上面的Disruptor和BlockingQueue分别使用了3个线程来消费生产的数据,分别用500W,1000W,5000W的生产数据来取运行10次的平均值来查看结果:
数量 |
Disruptor |
BlockingQueue |
性能对比 |
500W |
604 |
1161 |
1.9 |
1000W |
1235 |
2321 |
1.9 |
5000W |
5134 |
12018 |
2.3 |
通过以上的测试对比说明在单生产者-多消费者的模式下Disruptor的性能还是比ArrayBlockingQueue好,这也可能和我使用的代码有关(电脑的配置是win10,64位,i5-6500的cpu,8g的内存,jdk是1.8).
相关推荐
生产者和消费者模式是多线程编程中一个经典的设计模式,它主要解决的是在多线程环境下资源的有效利用和同步问题。在这个模式中,生产者负责生成数据,而消费者负责消费这些数据。为了保证生产与消费的平衡以及避免...
生产者消费者问题是一个经典的多线程同步问题,来源于操作系统理论,用于模拟两个或多个相互依赖的进程或线程之间的协作。在这个场景下,“生产者”是生成数据的实体,而“消费者”则负责处理这些数据。这个问题的...
在我们的例子中,有一个生产者和两个消费者,这意味着数据生成的速度和消费的速度是不同的,需要通过多线程来协调它们之间的交互。 为了实现这个模式,我们需要确保以下条件得到满足: 1. **互斥访问**:当生产者...
与生产者类似,消费者之间也需要同步,以免一个消费者重复处理同一个产品。 3. **产品属性**:产品具有id、name和price等属性,id自增长表示每个产品的唯一标识,name和price由随机函数生成,增加了模拟真实场景的...
本示例中的“生产者-消费者”模型是一种经典的多线程问题,它模拟了实际生产环境中的资源分配与消耗过程。下面我们将详细探讨如何在Java中实现这个模型。 首先,我们要理解生产者-消费者模型的基本概念。在这个模型...
在操作系统设计中,有一个经典的问题被称为“生产者-消费者问题”(Producer-Consumer Problem),这是一个多线程同步问题的经典示例。在这个实验中,我们将深入探讨这一概念,并通过Java编程语言来实现。 生产者-...
在并发编程中,"生产者-消费者"模式是一种经典的解决问题的范式,用于协调两个或更多线程间的协作,其中一部分线程(生产者)生成数据,另一部分线程(消费者)消费这些数据。 生产者-消费者模型的核心在于共享资源...
在这种模式下,生产者负责生成数据并放入一个共享的数据结构,而消费者则从这个数据结构中取出并消费数据。在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则...
在这个例子中,生产者和消费者可能会使用共享的数据结构(如队列)来存储和获取产品,synchronized关键字将确保在任何时候只有一个线程能执行添加或移除产品的操作。 2. **wait()和notify()方法**:这些方法位于`...
在计算机科学中,生产者-消费者问题是多线程并发控制中的一个经典问题。该问题描述了两个并发执行的进程——生产者和消费者,它们共享一个有限大小的缓冲区。生产者进程负责生成数据(消息)并放入缓冲区,而消费者...
在描述中提到的链接(),博主分享了一个关于多线程生产者与消费者模式的具体实现案例。虽然具体代码没有给出,但我们可以根据常见的实现方式来解析这个模式。 1. **共享数据结构**:在这个模式中,通常会有一个...
在Java编程中,生产者消费者模式是一种典型的多线程协作模型,用于解决系统资源供需不平衡的问题。这个模式的核心思想是将生产数据和消费数据的过程解耦,使得生产者可以独立地生产数据,而消费者可以独立地消费数据...
在计算机科学中,"生产者消费者"模式是一种经典的多线程...在实际应用中,生产者消费者模式可以应用于各种场景,如数据处理流水线、网络I/O等,有效地解耦了数据的生成与处理过程,提高了程序的并发性能和可扩展性。
Java生产者消费者问题是多线程编程中的一个经典案例,它主要展示了如何通过共享资源来实现线程间的协作。这个问题源于现实生活中的生产流水线,其中生产者负责制造产品,而消费者负责消耗这些产品。在计算机系统中,...
这个问题的场景是多个生产者线程生产数据,放入一个共享的数据缓冲区,然后由多个消费者线程从这个缓冲区取出并消费数据。在Java中,解决这类问题通常涉及到线程安全、同步机制以及数据一致性。 首先,我们要理解...
在Java中,可以通过多种方式实现生产者-消费者模型,包括但不限于使用`wait()`和`notify()`方法、`BlockingQueue`接口、`Semaphore`类以及`ReentrantLock`和`Condition`组合等。 #### 使用`wait()`和`notify()` 这...
下面是一个使用ArrayBlockingQueue实现生产者消费者模型的示例代码: ```java public class MyThread42 { public static void main(String[] args) { final BlockingQueue<String> bq = new ArrayBlockingQueue...
1. **生产者消费者模型**:这是一种设计模式,用于协调两个并发执行的线程,一个负责生成数据(生产者),另一个负责处理数据(消费者)。这种模式可以避免因生产速度过快或消费速度过慢导致的数据溢出或等待问题。 ...
总的来说,"操作系统生产者与消费者问题Java简单模拟实现"这个项目提供了一个直观的多线程编程实例,帮助我们理解和实践Java中的线程同步技术,这对于理解和解决实际并发问题具有重要意义。通过分析这个项目,我们...