`
BrokenDreams
  • 浏览: 255060 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
68ec41aa-0ce6-3f83-961b-5aa541d59e48
Java并发包源码解析
浏览量:100557
社区版块
存档分类
最新评论

disruptor-3.3.2源码解析(3)-发布事件

阅读更多

disruptor-3.3.2源码解析(3)-发布事件

作者:大飞

 

  • Disruptor中如何发布事件
       前面两篇看了disruptor中的序列和队列,这篇说一下怎么往RingBuffer中发布事件。这里也需要明确一下,和一般的生产者/消费者模式不同(如果以生产者/消费者的模式来看待disruptor的话),disruptor中队列里面的数据一般称为事件,RingBuffer中提供了发布事件的方法,另外也提供了专门的处理事件的类。
       其实在disruptor中,RingBuffer也提供了一部分生产的功能,里面提供了大量的发布事件的方法。
       上篇看到的RingBuffer的构造方法,需要传一个EventFactory做事件的预填充:
    RingBuffer(EventFactory<E> eventFactory,
               Sequencer       sequencer){
        super(eventFactory, sequencer);
    }
 
    RingBufferFields(EventFactory<E> eventFactory,
                     Sequencer       sequencer){
        ...
        //最后要填充事件
        fill(eventFactory);
    }
    private void fill(EventFactory<E> eventFactory){
        for (int i = 0; i < bufferSize; i++){
            entries[BUFFER_PAD + i] = eventFactory.newInstance();
        }
    }
       看下EventFactory: 
public interface EventFactory<T>{
    /*
     * Implementations should instantiate an event object, with all memory already allocated where possible.
     */
    T newInstance();
}
 
       再看下RingBuffer的发布事件方法:
    @Override
    public void publishEvent(EventTranslator<E> translator){
        final long sequence = sequencer.next();
        translateAndPublish(translator, sequence);
    }
    private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence){
        try{
            translator.translateTo(get(sequence), sequence);
        }finally{
            sequencer.publish(sequence);
        }
    }
 
       在发布事件时需要传一个事件转换的接口,内部用这个接口做一下数据到事件的转换。看下这个接口:
public interface EventTranslator<T>{
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event into which the data should be translated.
     * @param sequence that is assigned to event.
     */
    void translateTo(final T event, long sequence);
}
       可见,具体的生产者可以实现这个接口,将需要发布的数据放到这个事件里面,一般是设置到事件的某个域上。
 
       好,来看个例子理解一下。
       首先我们定义好数据: 
public class MyData {
	private int id;
	private String value;
	public MyData(int id, String value) {
		this.id = id;
		this.value = value;
	}

    ...getter setter...

	@Override
	public String toString() {
		return "MyData [id=" + id + ", value=" + value + "]";
	}
	
}
 
       然后针对我们的数据定义事件:
public class MyDataEvent {
	private MyData data;
	public MyData getData() {
		return data;
	}
	public void setData(MyData data) {
		this.data = data;
	}
	
}
 
       接下来需要给出一个EventFactory提供给RingBuffer做事件预填充:
public class MyDataEventFactory implements EventFactory<MyDataEvent>{
	@Override
	public MyDataEvent newInstance() {
		return new MyDataEvent();
	}
}
 
       好了,可以初始化RingBuffer了:
    public static void main(String[] args) {
		RingBuffer<MyDataEvent> ringBuffer = 
				RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);
		MyDataEvent dataEvent = ringBuffer.get(0);
		System.out.println("Event = " + dataEvent);
		System.out.println("Data = " + dataEvent.getData());
	}
 
       输出如下:
    Event = com.mjf.disruptor.product.MyDataEvent@5c647e05
    Data = null
       首先要注意,RingBuffer里面是MydataEvent,而不是MyData;其次我们构造好了RingBuffer,里面就已经填充了事件,我们可以取一个事件出来,发现里面的数据是空的。
 
       下面就是怎么往RingBuffer里面放数据了,也就是事件发布者要干活了。我们上面看到了,要使用RingBuffer发布一个事件,需要一个事件转换器接口,针对我们的数据实现一个: 
public class MyDataEventTranslator implements EventTranslator<MyDataEvent>{
	@Override
	public void translateTo(MyDataEvent event, long sequence) {
		//新建一个数据
		MyData data = new MyData(1, "holy shit!");
		//将数据放入事件中。
		event.setData(data);
	}
}
 
       有了转换器,我们就可以嗨皮的发布事件了:
	public static void main(String[] args) {
		RingBuffer<MyDataEvent> ringBuffer = 
				RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);
		//发布事件!!!
		ringBuffer.publishEvent(new MyDataEventTranslator());
		
		MyDataEvent dataEvent0 = ringBuffer.get(0);
		System.out.println("Event = " + dataEvent0);
		System.out.println("Data = " + dataEvent0.getData());
		MyDataEvent dataEvent1 = ringBuffer.get(1);
		System.out.println("Event = " + dataEvent1);
		System.out.println("Data = " + dataEvent1.getData());
	}
 
       输出如下:
Event = com.mjf.disruptor.product.MyDataEvent@5c647e05
Data = MyData [id=1, value=holy shit!]
Event = com.mjf.disruptor.product.MyDataEvent@33909752
Data = null
       可见,我们已经成功了发布了一个事件到RingBuffer,由于是从序列0开始发布,所以我们从序列0可以读出这个数据。因为只发布了一个,所以序列1上还是没有数据。
 
       当然也有其他姿势的转换器: 
public class MyDataEventTranslatorWithIdAndValue implements EventTranslatorTwoArg<MyDataEvent, Integer, String>{
	@Override
	public void translateTo(MyDataEvent event, long sequence, Integer id,
			String value) {
		MyData data = new MyData(id, value);
		event.setData(data);
	}
}
 
       当然也可以直接利用RingBuffer来发布事件,不需要转换器:
	public static void main(String[] args) {
		RingBuffer<MyDataEvent> ringBuffer = 
				RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);
		long sequence = ringBuffer.next();
		try{
			MyDataEvent event = ringBuffer.get(sequence);
			MyData data = new MyData(2, "R u kidding me?");
			event.setData(data);
		}finally{
			ringBuffer.publish(sequence);
		}
	}
 
 
  • 单线程发布事件和多线程发布事件
       前面我们构造RingBuffer使用的是单线程发布事件的模式:
		RingBuffer<MyDataEvent> ringBuffer = 
				RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);
       RingBuffer也支持多线程发布事件模式,还记得上一篇分析的RingBuffer代码吧: 
    public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize){
        return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
    }
       当然也提供了比较全面的构造方法:
    public static <E> RingBuffer<E> create(ProducerType    producerType,
                                           EventFactory<E> factory,
                                           int             bufferSize,
                                           WaitStrategy    waitStrategy){
        switch (producerType){
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
        }
    }
       这个方法支持传入一个枚举来选择使用哪种模式:  
public enum ProducerType{
    /** Create a RingBuffer with a single event publisher to the RingBuffer */
    SINGLE,
    /** Create a RingBuffer supporting multiple event publishers to the one RingBuffer */
    MULTI
}
 
       上面看过了单线程发布事件的例子,接下来看个多线程发布事件的:
	public static void main(String[] args) {
		final RingBuffer<MyDataEvent> ringBuffer = 
				RingBuffer.createMultiProducer(new MyDataEventFactory(), 1024);
		final CountDownLatch latch = new CountDownLatch(100);
		for(int i=0;i<100;i++){
			final int index = i;
			//开启多个线程发布事件。
			new Thread(new Runnable() {
				@Override
				public void run() {
					long sequence = ringBuffer.next();
					try{
						MyDataEvent event = ringBuffer.get(sequence);
						MyData data = new MyData(index, index+"s");
						event.setData(data);
					}finally{
						ringBuffer.publish(sequence);
						latch.countDown();
					}
				}
			}).start();
		}
		try {
			latch.await();
			//最后观察下发布的时间。
			for(int i=0;i<100;i++){
				MyDataEvent event = ringBuffer.get(i);
				System.out.println(event.getData());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
 
 

       如果多线程环境下使用单线程发布模式会有上面问题呢?

	public static void main(String[] args) {
		final RingBuffer<MyDataEvent> ringBuffer = 
				           //这里是单线程模式!!!
				RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);
		final CountDownLatch latch = new CountDownLatch(100);
		for(int i=0;i<100;i++){
			final int index = i;
			//开启多个线程发布事件。
			new Thread(new Runnable() {
				@Override
				public void run() {
					long sequence = ringBuffer.next();
					try{
						MyDataEvent event = ringBuffer.get(sequence);
						MyData data = new MyData(index, index+"s");
						event.setData(data);
					}finally{
						ringBuffer.publish(sequence);
						latch.countDown();
					}
				}
			}).start();
		}
		try {
			latch.await();
			//最后观察下发布的时间。
			for(int i=0;i<100;i++){
				MyDataEvent event = ringBuffer.get(i);
				System.out.println(event.getData());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

       输出如下: 

    ...
    MyData [id=92, value=92s]
    MyData [id=93, value=93s]
    MyData [id=94, value=94s]
    MyData [id=95, value=95s]
    MyData [id=96, value=96s]
    MyData [id=97, value=97s]
    MyData [id=99, value=99s]
    MyData [id=98, value=98s]
    null
    null
       会发现,如果多线程发布事件的环境下,使用单线程发布事件模式,会有数据被覆盖的情况。所以使用时应该按照具体情况选择合理发布模式。
 
  • 最后总结:
       如何往RingBuffer中发布事件:
              1.定义好要生产的数据和相应的事件类(里面存放数据)。
              2.定于好事件转换器或者直接用RingBuffer进行事件发布。

              3.明确发布场景,合理的选择发布模式(单线程还是多线程)。 

 

 

分享到:
评论

相关推荐

    disruptor-3.3.2.jar 并发

    不错的框架,可以好好研究研究,速度下载,速度下载速度下载速度下载

    disruptor-3.2.0.jar

    disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载

    disruptor-3.4.2.jar 及 disruptor-3.4.2-sources.jar

    disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包

    disruptor-3.4.4.jar disruptor 3.4.4 jar 官方github下载

    disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)

    disruptor-3.2.1源码带jar包20140321

    在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,这对于理解其内部机制和定制化开发非常有帮助。通过阅读源码,你可以更深入地了解如何利用Disruptor构建高效的并发系统。 此外,你还...

    disruptor-3.3.0-API文档-中文版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.3.0-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...

    disruptor-3.4.2.jar

    disruptor-3.4.2.jar

    disruptor-3.3.7-API文档-中英对照版.zip

    赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...

    disruptor-3.3.7-API文档-中文版.zip

    赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...

    disruptror的jar包和例子

    5. 生产者发布事件:通过Disruptor的getPublisher()方法获取生产者对象,然后使用其publish()方法将事件放入缓冲区。 6. 消费者消费事件:事件处理器会在合适的时候自动从缓冲区中获取事件进行处理。 学习和掌握...

    disruptor-3.3.11.jar

    disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包

    disruptor-3.3.8.jar

    Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.&lt;init&gt;(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    3. **API与源码解析** - `disruptor-3.0.1.jar`:这是Disruptor的运行时库,包含了框架的类和接口,供开发者在项目中引用。 - `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和...

    disruptor-3.3.6.jar

    java运行依赖jar包

    disruptor-3.2.1.zip

    《Disruptor-3.2.1与Play2Memcached:开源项目的魅力解析》 在IT行业中,开源项目一直是技术创新的重要推动力。这次我们要探讨的是两个极具影响力的开源项目——Disruptor-3.2.1和Play2Memcached。它们分别在并发...

    disruptor-3.3.11-sources.jar

    disruptor-3.3.11-sources.jar jar包源码,值得学习,源码

    disruptor-3.3.6.jar中文-英文对照文档.zip

    注:下文中的 *** 代表文件名中的组件名称。 # 包含: 中文-英文对照文档:【***-javadoc-API文档-中文(简体)-英语-对照版.zip】 jar包下载地址:【***.jar下载地址(官方地址+国内镜像地址).txt】 ...

    disruptor-unity3d, Unity3d Disruptor的基本实现.zip

    disruptor-unity3d, Unity3d Disruptor的基本实现 disruptor-unity3dUnity3d的基本自包含。自包含实现。 仅支持单个生产者/单个用户。 仅在x86平台上测试。 Mono中的Bug 在Unity前可以在iOS和安卓上工作。用法将 ...

Global site tag (gtag.js) - Google Analytics