disruptor-3.3.2源码解析(3)-发布事件
作者:大飞
- Disruptor中如何发布事件:
前面两篇看了disruptor中的序列和队列,这篇说一下怎么往RingBuffer中发布事件。这里也需要明确一下,和一般的生产者/消费者模式不同(如果以生产者/消费者的模式来看待disruptor的话),disruptor中队列里面的数据一般称为事件,RingBuffer中提供了发布事件的方法,另外也提供了专门的处理事件的类。
其实在disruptor中,RingBuffer也提供了一部分生产的功能,里面提供了大量的发布事件的方法。
上篇看到的RingBuffer的构造方法,需要传一个EventFactory做事件的预填充:
RingBuffer(EventFactory<E> eventFactory, Sequencer sequencer){ super(eventFactory, sequencer); }
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer){ ... //最后要填充事件 fill(eventFactory); } private void fill(EventFactory<E> eventFactory){ for (int i = 0; i < bufferSize; i++){ entries[BUFFER_PAD + i] = eventFactory.newInstance(); } }
看下EventFactory:
public interface EventFactory<T>{ /* * Implementations should instantiate an event object, with all memory already allocated where possible. */ T newInstance(); }
再看下RingBuffer的发布事件方法:
@Override public void publishEvent(EventTranslator<E> translator){ final long sequence = sequencer.next(); translateAndPublish(translator, sequence); } private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence){ try{ translator.translateTo(get(sequence), sequence); }finally{ sequencer.publish(sequence); } }
在发布事件时需要传一个事件转换的接口,内部用这个接口做一下数据到事件的转换。看下这个接口:
public interface EventTranslator<T>{ /** * Translate a data representation into fields set in given event * * @param event into which the data should be translated. * @param sequence that is assigned to event. */ void translateTo(final T event, long sequence); }
可见,具体的生产者可以实现这个接口,将需要发布的数据放到这个事件里面,一般是设置到事件的某个域上。
好,来看个例子理解一下。
首先我们定义好数据: public class MyData { private int id; private String value; public MyData(int id, String value) { this.id = id; this.value = value; } ...getter setter... @Override public String toString() { return "MyData [id=" + id + ", value=" + value + "]"; } }
然后针对我们的数据定义事件:
public class MyDataEvent { private MyData data; public MyData getData() { return data; } public void setData(MyData data) { this.data = data; } }
接下来需要给出一个EventFactory提供给RingBuffer做事件预填充:
public class MyDataEventFactory implements EventFactory<MyDataEvent>{ @Override public MyDataEvent newInstance() { return new MyDataEvent(); } }
好了,可以初始化RingBuffer了:
public static void main(String[] args) { RingBuffer<MyDataEvent> ringBuffer = RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024); MyDataEvent dataEvent = ringBuffer.get(0); System.out.println("Event = " + dataEvent); System.out.println("Data = " + dataEvent.getData()); }
输出如下:
Event = com.mjf.disruptor.product.MyDataEvent@5c647e05 Data = null
首先要注意,RingBuffer里面是MydataEvent,而不是MyData;其次我们构造好了RingBuffer,里面就已经填充了事件,我们可以取一个事件出来,发现里面的数据是空的。
public class MyDataEventTranslator implements EventTranslator<MyDataEvent>{ @Override public void translateTo(MyDataEvent event, long sequence) { //新建一个数据 MyData data = new MyData(1, "holy shit!"); //将数据放入事件中。 event.setData(data); } }
有了转换器,我们就可以嗨皮的发布事件了:
public static void main(String[] args) { RingBuffer<MyDataEvent> ringBuffer = RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024); //发布事件!!! ringBuffer.publishEvent(new MyDataEventTranslator()); MyDataEvent dataEvent0 = ringBuffer.get(0); System.out.println("Event = " + dataEvent0); System.out.println("Data = " + dataEvent0.getData()); MyDataEvent dataEvent1 = ringBuffer.get(1); System.out.println("Event = " + dataEvent1); System.out.println("Data = " + dataEvent1.getData()); }
输出如下:
Event = com.mjf.disruptor.product.MyDataEvent@5c647e05 Data = MyData [id=1, value=holy shit!] Event = com.mjf.disruptor.product.MyDataEvent@33909752 Data = null
可见,我们已经成功了发布了一个事件到RingBuffer,由于是从序列0开始发布,所以我们从序列0可以读出这个数据。因为只发布了一个,所以序列1上还是没有数据。
public class MyDataEventTranslatorWithIdAndValue implements EventTranslatorTwoArg<MyDataEvent, Integer, String>{ @Override public void translateTo(MyDataEvent event, long sequence, Integer id, String value) { MyData data = new MyData(id, value); event.setData(data); } }
当然也可以直接利用RingBuffer来发布事件,不需要转换器:
public static void main(String[] args) { RingBuffer<MyDataEvent> ringBuffer = RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024); long sequence = ringBuffer.next(); try{ MyDataEvent event = ringBuffer.get(sequence); MyData data = new MyData(2, "R u kidding me?"); event.setData(data); }finally{ ringBuffer.publish(sequence); } }
- 单线程发布事件和多线程发布事件:
前面我们构造RingBuffer使用的是单线程发布事件的模式:
RingBuffer<MyDataEvent> ringBuffer = RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024);RingBuffer也支持多线程发布事件模式,还记得上一篇分析的RingBuffer代码吧:
public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize){ return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy()); }当然也提供了比较全面的构造方法:
public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy){ switch (producerType){ case SINGLE: return createSingleProducer(factory, bufferSize, waitStrategy); case MULTI: return createMultiProducer(factory, bufferSize, waitStrategy); default: throw new IllegalStateException(producerType.toString()); } }这个方法支持传入一个枚举来选择使用哪种模式:
public enum ProducerType{ /** Create a RingBuffer with a single event publisher to the RingBuffer */ SINGLE, /** Create a RingBuffer supporting multiple event publishers to the one RingBuffer */ MULTI }
上面看过了单线程发布事件的例子,接下来看个多线程发布事件的:
public static void main(String[] args) { final RingBuffer<MyDataEvent> ringBuffer = RingBuffer.createMultiProducer(new MyDataEventFactory(), 1024); final CountDownLatch latch = new CountDownLatch(100); for(int i=0;i<100;i++){ final int index = i; //开启多个线程发布事件。 new Thread(new Runnable() { @Override public void run() { long sequence = ringBuffer.next(); try{ MyDataEvent event = ringBuffer.get(sequence); MyData data = new MyData(index, index+"s"); event.setData(data); }finally{ ringBuffer.publish(sequence); latch.countDown(); } } }).start(); } try { latch.await(); //最后观察下发布的时间。 for(int i=0;i<100;i++){ MyDataEvent event = ringBuffer.get(i); System.out.println(event.getData()); } } catch (InterruptedException e) { e.printStackTrace(); } }
如果多线程环境下使用单线程发布模式会有上面问题呢?
public static void main(String[] args) { final RingBuffer<MyDataEvent> ringBuffer = //这里是单线程模式!!! RingBuffer.createSingleProducer(new MyDataEventFactory(), 1024); final CountDownLatch latch = new CountDownLatch(100); for(int i=0;i<100;i++){ final int index = i; //开启多个线程发布事件。 new Thread(new Runnable() { @Override public void run() { long sequence = ringBuffer.next(); try{ MyDataEvent event = ringBuffer.get(sequence); MyData data = new MyData(index, index+"s"); event.setData(data); }finally{ ringBuffer.publish(sequence); latch.countDown(); } } }).start(); } try { latch.await(); //最后观察下发布的时间。 for(int i=0;i<100;i++){ MyDataEvent event = ringBuffer.get(i); System.out.println(event.getData()); } } catch (InterruptedException e) { e.printStackTrace(); } }
输出如下:
... MyData [id=92, value=92s] MyData [id=93, value=93s] MyData [id=94, value=94s] MyData [id=95, value=95s] MyData [id=96, value=96s] MyData [id=97, value=97s] MyData [id=99, value=99s] MyData [id=98, value=98s] null null
会发现,如果多线程发布事件的环境下,使用单线程发布事件模式,会有数据被覆盖的情况。所以使用时应该按照具体情况选择合理发布模式。
- 最后总结:
如何往RingBuffer中发布事件:
1.定义好要生产的数据和相应的事件类(里面存放数据)。
2.定于好事件转换器或者直接用RingBuffer进行事件发布。
3.明确发布场景,合理的选择发布模式(单线程还是多线程)。
相关推荐
不错的框架,可以好好研究研究,速度下载,速度下载速度下载速度下载
disruptor-3.4.2.jar 工具jar包 及 disruptor-3.4.2-sources.jar, Disruptor它是一个开源的并发框架,并获得2011 Duke’s 程序框架创新奖,能够在无锁的情况下实现网络的Queue并发操作,是 log4j2 引用的 jar 包
disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载disruptor-3.2.0.jar包下载
disruptor-3.4.4.jar 官方github下载 亲测可用,大家赶紧下载吧 后续再补充其他常用jar(但不好下载的)
在"disruptor-3.2.1源码带jar包20140321"这个资源中,包含了Disruptor的源代码,这对于理解其内部机制和定制化开发非常有帮助。通过阅读源码,你可以更深入地了解如何利用Disruptor构建高效的并发系统。 此外,你还...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
赠送jar包:disruptor-3.3.0.jar; 赠送原API文档:disruptor-3.3.0-javadoc.jar; 赠送源代码:disruptor-3.3.0-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.0.pom; 包含翻译后的API文档:disruptor-...
disruptor-3.4.2.jar
赠送jar包:disruptor-3.3.7.jar 赠送原API文档:disruptor-3.3.7-javadoc.jar 赠送源代码:disruptor-3.3.7-sources.jar 包含翻译后的API文档:disruptor-3.3.7-javadoc-API文档-中文(简体)-英语-对照版.zip ...
赠送jar包:disruptor-3.3.7.jar; 赠送原API文档:disruptor-3.3.7-javadoc.jar; 赠送源代码:disruptor-3.3.7-sources.jar; 赠送Maven依赖信息文件:disruptor-3.3.7.pom; 包含翻译后的API文档:disruptor-...
5. 生产者发布事件:通过Disruptor的getPublisher()方法获取生产者对象,然后使用其publish()方法将事件放入缓冲区。 6. 消费者消费事件:事件处理器会在合适的时候自动从缓冲区中获取事件进行处理。 学习和掌握...
disruptor-3.3.11.jar 无锁并行框架 值得学习 jar包
Error: java.lang.NoSuchMethodError: com.lmax.disruptor.dsl.Disruptor.<init>(Lcom/lmax/disruptor/EventFactory;ILjava/util/concurrent/ThreadFactory;Lcom/lmax/disruptor/dsl/ProducerType;Lcom/lmax/...
3. **API与源码解析** - `disruptor-3.0.1.jar`:这是Disruptor的运行时库,包含了框架的类和接口,供开发者在项目中引用。 - `disruptor-3.0.1-sources.jar`:提供源代码,帮助开发者理解内部实现,方便调试和...
java运行依赖jar包
《Disruptor-3.2.1与Play2Memcached:开源项目的魅力解析》 在IT行业中,开源项目一直是技术创新的重要推动力。这次我们要探讨的是两个极具影响力的开源项目——Disruptor-3.2.1和Play2Memcached。它们分别在并发...
disruptor-3.3.11-sources.jar jar包源码,值得学习,源码
注:下文中的 *** 代表文件名中的组件名称。 # 包含: 中文-英文对照文档:【***-javadoc-API文档-中文(简体)-英语-对照版.zip】 jar包下载地址:【***.jar下载地址(官方地址+国内镜像地址).txt】 ...
disruptor-unity3d, Unity3d Disruptor的基本实现 disruptor-unity3dUnity3d的基本自包含。自包含实现。 仅支持单个生产者/单个用户。 仅在x86平台上测试。 Mono中的Bug 在Unity前可以在iOS和安卓上工作。用法将 ...