`
wh0426
  • 浏览: 56170 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
博客专栏
Group-logo
架构师的知识与实践
浏览量:56171
社区版块
存档分类
最新评论

disruptor使用示例

阅读更多


LMAX
开源了一个高性能并发编程框架。可以理解为消费者-生产者的消息发布订阅模式。本文下载了官方示例代码,进行实验。

longEvent事件数据

 

public class LongEvent {
    private long value;

    public void set(long value) {
        this.value = value;
    }
    
    public long get(){
        return this.value;
    }
}

 

 

LongEventFactory事件工厂

 

import com.lmax.disruptor.EventFactory;
/**
 * 事件生产工厂
 * @author wanghao
 *
 */
public class LongEventFactory implements EventFactory<LongEvent>
{

	@Override
	public LongEvent newInstance() {
		return new LongEvent();
	}

}

 

 

LongEventProducer事件生产者

import java.nio.ByteBuffer;

import com.lmax.disruptor.RingBuffer;

/**
 * 生产者,生产longEvent事件
 * @author harry
 *
 */
public class LongEventProducer {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void product(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

RingBuffer是消息存储结构,为环形存储结构,每个单元存储一条消息。类似于队列。当ringbuffer中数据填满后,环就会阻塞,等待消费者消费掉数据。当所有消费者消费掉环中一个数据,新的消息才可以加入环中。每个环插入数据后,都会分配下一个位置的编号,即sequence 。

消息者事件处理器

为消费者消费处理器,这处需要执行速度足够快。否则,会影响ringbuffer后续没空间加入新的数据。因此,不能做业务耗时操作。建议另外开始java 线程池处理消息。

 

import com.lmax.disruptor.EventHandler;
/**
 * 消息者事件处理器,打印输出到控制台
 * @author harry
 *
 */
public class LongEventHandler  implements EventHandler<LongEvent>{
	  public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
	    {
	        System.out.println("consumer:"+Thread.currentThread().getName()+" Event: value=" + event.get()+",sequence="+sequence+",endOfBatch="+endOfBatch);
	    }
}

 

LongEventProducerWithTranslator

 

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

/**
 * post-3.0 the preferred approach for publishing messages is 
 * via the Event Publisher/Event Translator portion of the API. E.g.
 * @author harry
 *
 */
public class LongEventProducerWithTranslator {
	private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void product(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}


translateTo方法将ringbuffer中的消息,转换成java对象格式。示例 为LongEvent对象,后续消费者LongEventHandler 处理器,直接操作LongEvent对象,获取消息各属性信息,本示例 为value属性。

 

product方法,将生产者生产的消息放入ringbuffer中。

LongEventMain

 

消费者-生产者启动类,其依靠构造Disruptor对象,调用start()方法完成启动线程。Disruptor 需要ringbuffer环,消费者数据处理工厂,WaitStrategy等

ByteBuffer 类字节buffer,用于包装消息。

ProducerType.SINGLE为单线程 ,可以提高性能。

 

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class LongEventMain {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) throws Exception
	    {
	        // 执行器,用于构造消费者线程
	        Executor executor = Executors.newCachedThreadPool();

	        // 指定事件工厂
	        LongEventFactory factory = new LongEventFactory();

	        // 指定 ring buffer字节大小, must be power of 2.
	        int bufferSize = 1024;

	        //单线程模式,获取额外的性能
	        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, 
                    bufferSize,executor,
                    ProducerType.SINGLE,
                    new BlockingWaitStrategy());
	        //设置事件业务处理器---消费者
	        disruptor.handleEventsWith(new LongEventHandler());
	        //启动disruptor线程
	        disruptor.start();

	        // 获取 ring buffer环,用于接取生产者生产的事件
	        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
	        //为 ring buffer指定事件生产者
	        //LongEventProducer producer = new LongEventProducer(ringBuffer);
	        LongEventProducerWithTranslator producer=new LongEventProducerWithTranslator(ringBuffer);
	        ByteBuffer bb = ByteBuffer.allocate(8);//预置8字节长整型字节缓存
	        for (long l = 0; true; l++)
	        {
	            bb.putLong(0, l);
	            producer.product(bb);//生产者生产数据
	            Thread.sleep(1000);
	        }
	        
	    }
}

 

 

实验结果:

consumer:pool-1-thread-1 Event: value=0,sequence=0,endOfBatch=true
consumer:pool-1-thread-1 Event: value=1,sequence=1,endOfBatch=true
consumer:pool-1-thread-1 Event: value=2,sequence=2,endOfBatch=true
consumer:pool-1-thread-1 Event: value=3,sequence=3,endOfBatch=true
consumer:pool-1-thread-1 Event: value=4,sequence=4,endOfBatch=true
consumer:pool-1-thread-1 Event: value=5,sequence=5,endOfBatch=true
consumer:pool-1-thread-1 Event: value=6,sequence=6,endOfBatch=true

 

Event: value = 为消费者接收到的数据,sequence为数据在ringbuffer环的位置。

 

分享到:
评论
1 楼 ClyenLiang 2016-08-24  

相关推荐

    Disruptor示例

    业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。 Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作。 ...

    disruptor技术培训

    以下是一个简单的Disruptor使用示例流程: 1. **定义Event类**:根据需要处理的数据类型定义具体的Event类。 2. **定义Event工厂类**:创建Event实例对象的工厂类。 3. **定义Event监听类**:实现具体的业务逻辑...

    Netty 使用Disruptor机制的处理源代码

    文件名 "trapos-master" 可能是指一个交易系统的源代码仓库,其中可能包含了 Netty 和 Disruptor 结合使用的示例。在这样的系统中,Disruptor 可能被用来处理实时外汇报价,因为外汇交易对延迟非常敏感,需要高效且...

    Disruptor并发框架中文参考文档

    #### 四、Disruptor的应用示例 ##### 3.1 LMAX的架构 LMAX是一家零售金融服务提供商,其交易平台基于Disruptor构建。LMAX的业务逻辑处理器能够在单个线程中每秒处理600万笔订单,这得益于Disruptor的高效并发处理...

    Disruptor应用实例

    4. 处理器协作:Disruptor的等待策略(WaitStrategy)可以灵活调整,如使用忙等、多路复用或者阻塞等待,以适应不同的系统需求和资源条件。 5. 监控与优化:Disruptor提供了丰富的监控指标,如事件处理速率、延迟等...

    Disruptor C++版(仅支持单生产者)

    在使用Disruptor C++版时,你可以参考提供的示例代码来理解和实现自己的事件处理流程。需要注意的是,正确配置和优化Disruptor参数,如缓冲区大小、消费者数量等,对性能影响显著。此外,理解并运用Disruptor的事件...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - 一个简单的Disruptor示例通常包括创建Disruptor对象、初始化Ring Buffer、设置Producer和Consumer,以及启动处理循环。通过示例,开发者可以快速上手,理解Disruptor的工作流程。 综上所述,LMAX Disruptor是一...

    DisruptorDemo.zip

    在"DisruptorDemo.zip"的示例代码中,我们可以看到以下关键类和方法: 1. `DisruptorDemo`:主类,创建Disruptor实例并启动事件处理器。 2. `Event`:自定义事件对象,存储要传递的数据。 3. `EventHandlerImpl`:...

    disruptor jar包+Demo+Api

    `DisruptorConceptProofTest.java` 是一个非官方的示例代码,它通常会展示如何使用 Disruptor 创建事件处理链路,以及如何发布和消费事件。这个测试类可以让我们了解 Disruptor 的基本用法和概念验证。 `...

    Disruptor demo

    Disruptor是一款高性能的并发工具库,由LMAX公司开发并开源,主要...这个示例将帮助你理解Disruptor如何简化并发编程,提高程序运行效率,尤其对于需要处理大量并发请求的高性能应用来说,Disruptor是一个强大的工具。

    disruptor-3.2.1源码带jar包20140321

    此外,你还可以通过创建一个简单的示例来实践Disruptor的使用。首先,创建生产者和消费者,然后配置Disruptor实例,设置事件处理器链。运行示例,观察Disruptor如何在多线程环境中高效地处理事件。这不仅加深了你对...

    Disruptor学习(1)

    Disruptor-examples这个压缩包文件很可能是Disruptor的示例代码,包括了各种应用场景的实现,如简单的生产者消费者模型、多级处理链等,通过这些示例,我们可以更直观地理解Disruptor如何在实际中应用。 总的来说,...

    Disruptor3.2官方例子测试

    NULL 博文链接:https://yanbingwei.iteye.com/blog/1985778

    disruptor高性能Java线程间通讯库

    Disruptor使用了无锁算法,通过全局唯一的序列号来确保数据的正确性。每个事件在被生产者放入缓冲区和被消费者消费时,都会被赋予一个唯一的序列号,这样可以避免数据的乱序和丢失。 2. **多生产者与单消费者模式*...

    disruptor 实例

    本文将深入探讨Disruptor的核心原理,并通过一个实际的例子——testMyDisruptor,来展示如何在应用中有效利用Disruptor实现高效的缓冲队列。 首先,我们要理解Disruptor的核心设计理念。传统的并发编程往往依赖于锁...

    spring集成disruptor

    网上关于Disruptor的例子大部份是旧版本的, 其中集成spring的更少, 只好自已写个新版本简单的demo了。 该demo利用spring的定时器往Disruptor添加数据, 希望该demo能帮助到大家。

    share-disruptor.zip

    在这个名为"share-disruptor.zip"的压缩包中,包含了Disruptor的基本示例,旨在帮助初学者快速理解并掌握Disruptor的使用。下面将详细介绍Disruptor的关键概念和技术特点: 1. **环形缓冲区(Ring Buffer)**:...

    spring与disruptor集成的简单示例

    Spring 与 Disruptor 集成的简单示例展示了如何使用 Spring 框架将 Disruptor 库集成到应用程序中,以实现高性能的异步消息处理。该示例 Demonstrate 了 Disruptor 库的高效性和可靠性,能够满足高性能的业务需求。 ...

    disruptror的jar包和例子

    同时,提供的"一个简单实用disruptor的例子"可以帮助我们更好地理解如何在实际项目中应用Disruptor。 这个例子可能包括以下几个部分: 1. 定义事件:首先,你需要创建一个代表业务逻辑的事件类,这个类将被放入环形...

    Disruptor学习.7z

    2. **实战演练**:通过提供的示例代码,了解如何创建和配置Disruptor,以及如何在多线程环境中使用它。 3. **性能测试**:对比使用Disruptor前后的性能,理解其在高并发场景下的优势。 4. **深入研究**:学习...

Global site tag (gtag.js) - Google Analytics