在数据交换场景,disruptor受到越来越多的欢迎。下面是将原生disruptor封装成queue模型的代码,供参考
抽象类Disruptor,提供pull、take等接口
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; public abstract class DisruptorQueue { public static void setUseSleep(boolean useSleep) { DisruptorQueueImpl.setUseSleep(useSleep); } public static DisruptorQueue mkInstance(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait); } public abstract String getName(); public abstract void haltWithInterrupt(); public abstract Object poll(); public abstract Object take(); public abstract void consumeBatch(EventHandler<Object> handler); public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler); public abstract void publish(Object obj); public abstract void publish(Object obj, boolean block) throws InsufficientCapacityException; public abstract void consumerStarted(); public abstract void clear(); public abstract long population(); public abstract long capacity(); public abstract long writePos(); public abstract long readPos(); public abstract float pctFull(); }
具体实现
import java.util.HashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.lmax.disruptor.AlertException; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.InsufficientCapacityException; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; public class DisruptorQueueImpl extends DisruptorQueue { private static final Logger logger = LoggerFactory.getLogger(DisruptorQueueImpl.class); static boolean useSleep = true; public static void setUseSleep(boolean useSleep) { AbstractSequencerExt.setWaitSleep(useSleep); } private static final Object FLUSH_CACHE = new Object(); private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private final String _queueName; private final RingBuffer<MutableObject> _buffer; private final Sequence _consumer; private final SequenceBarrier _barrier; volatile boolean consumerStartedFlag = false; private final HashMap<String, Object> state = new HashMap<String, Object>(4); private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); private final Lock readLock = cacheLock.readLock(); private final Lock writeLock = cacheLock.writeLock(); public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) { this._queueName = PREFIX + queueName; _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); if (producerType == ProducerType.SINGLE) { consumerStartedFlag = true; } else { if (bufferSize < 2) { throw new RuntimeException("QueueSize must >= 2"); } try { publishDirect(FLUSH_CACHE, true); } catch (InsufficientCapacityException e) { throw new RuntimeException("This code should be unreachable!", e); } } } public String getName() { return _queueName; } public void consumeBatch(EventHandler<Object> handler) { consumeBatchToCursor(_barrier.getCursor(), handler); } public void haltWithInterrupt() { publish(INTERRUPT); } public Object poll() { if (consumerStartedFlag == false) { return _cache.poll(); } final long nextSequence = _consumer.get() + 1; if (nextSequence <= _barrier.getCursor()) { MutableObject mo = _buffer.get(nextSequence); _consumer.set(nextSequence); Object ret = mo.o; mo.setObject(null); return ret; } return null; } public Object take() { if (consumerStartedFlag == false) { return _cache.poll(); } final long nextSequence = _consumer.get() + 1; try { _barrier.waitFor(nextSequence); } catch (AlertException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } catch (InterruptedException e) { logger.error("InterruptedException " + e.getCause()); return null; } catch (TimeoutException e) { logger.error(e.getMessage(), e); return null; } MutableObject mo = _buffer.get(nextSequence); _consumer.set(nextSequence); Object ret = mo.o; mo.setObject(null); return ret; } public void consumeBatchWhenAvailable(EventHandler<Object> handler) { try { final long nextSequence = _consumer.get() + 1; final long availableSequence = _barrier.waitFor(nextSequence); if (availableSequence >= nextSequence) { consumeBatchToCursor(availableSequence, handler); } } catch (AlertException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } catch (InterruptedException e) { logger.error("InterruptedException " + e.getCause()); return; }catch (TimeoutException e) { logger.error(e.getMessage(), e); return ; } } public void consumeBatchToCursor(long cursor, EventHandler<Object> handler){ for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { MutableObject mo = _buffer.get(curr); Object o = mo.o; mo.setObject(null); if (o == FLUSH_CACHE) { Object c = null; while (true) { c = _cache.poll(); if (c == null) break; else handler.onEvent(c, curr, true); } } else if (o == INTERRUPT) { throw new InterruptedException( "Disruptor processing interrupted"); } else { handler.onEvent(o, curr, curr == cursor); } } catch (InterruptedException e) { logger.error(e.getMessage()); return; } catch (Exception e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } _consumer.set(cursor); } public void publish(Object obj) { try { publish(obj, true); } catch (InsufficientCapacityException ex) { throw new RuntimeException("This code should be unreachable!"); } } public void tryPublish(Object obj) throws InsufficientCapacityException { publish(obj, false); } public void publish(Object obj, boolean block) throws InsufficientCapacityException { boolean publishNow = consumerStartedFlag; if (!publishNow) { readLock.lock(); try { publishNow = consumerStartedFlag; if (!publishNow) { _cache.add(obj); } } finally { readLock.unlock(); } } if (publishNow) { publishDirect(obj, block); } } protected void publishDirect(Object obj, boolean block) throws InsufficientCapacityException { final long id; if (block) { id = _buffer.next(); } else { id = _buffer.tryNext(1); } final MutableObject m = _buffer.get(id); m.setObject(obj); _buffer.publish(id); } public void consumerStarted() { writeLock.lock(); consumerStartedFlag = true; writeLock.unlock(); } public void clear() { while (population() != 0L) { poll(); } } public long population() { return (writePos() - readPos()); } public long capacity() { return _buffer.getBufferSize(); } public long writePos() { return _buffer.getCursor(); } public long readPos() { return _consumer.get(); } public float pctFull() { return (1.0F * population() / capacity()); } public Object getState() { long rp = readPos(); long wp = writePos(); state.put("capacity", capacity()); state.put("population", wp - rp); state.put("write_pos", wp); state.put("read_pos", rp); return state; } public static class ObjectEventFactory implements EventFactory<MutableObject> { @Override public MutableObject newInstance() { return new MutableObject(); } } }
public class MutableObject { Object o = null; public MutableObject() { } public MutableObject(Object o) { this.o = o; } public void setObject(Object o) { this.o = o; } public Object getObject() { return o; } }
代码依赖
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.2.1</version> </dependency>
相关推荐
async-framework提供了流程和队列的概念,流程 Flow 代表步骤,队列 Queue 代表处理节点,队列由Disruptor封装而成,每个事件都有Flow发起,并且在每个Queue存在一个状态,每个队列处理特定的事件,简单的来说 Flow ...
《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...
1. **报价生产**:接收到外部报价源的更新后,生产者将报价数据封装成事件并发布到 Disruptor。 2. **事件处理**:Disruptor 中的处理器会按序处理报价事件,进行过滤、计算、存储等操作。 3. **通知用户**:经过...
1. **`handleEventsWith`**:该方法将事件处理器注册到Disruptor中,并通过`createEventProcessors`方法将其封装为`BatchEventProcessor`,同时根据处理器间的关系构建Sequence Barrier。对于第一个处理器而言,它不...
tiny_disruptor项目保留了这一核心思想,但可能对其进行了更直观、更易于理解的封装。 在tiny_disruptor中,开发者可以轻松创建一个Ring Buffer,并设置生产者和消费者。生产者将数据放入缓冲区,而消费者则从缓冲...
基于Disruptor的Spring Boot Starter实现,初步事件推导,处理封装 1,事件推动 a,配置简单,少量配置即可实现初始化事件推送 2,事件处理 a、配置简单,少量配置即可实现异步事件处理 b、组件实现了基于责任链的...
- **L3缓存**:容量最大,速度比前两者慢,通常不在CPU内核上,而是集成在主板上或CPU封装内。 **缓存访问流程**: - 当CPU需要读取数据时,首先检查L1缓存。 - 如果未命中,再检查L2缓存。 - 之后是L3缓存。 - ...
`TaskEvent`类是用来封装`Task`的事件对象,其中包含了`Task`实例的getter和setter方法。`TaskEvent`的`EVENT_FACTORY`是一个工厂类,用于创建新的`TaskEvent`实例。 接下来,我们创建了一个`TaskEventHandler`类,...
语言方面的练习算法:leetcode,nowcoder,swordoffer,以及算法红皮书等语言技术:kotlin,java8函数式,多线程等框架:akka,zookeeper,Disruptor等核心网springboot相关练习球衣封装Spring安全练习基于curator ...
Netty 宇宙最强的java网络库,可定义各种网络通信方式,本框架中RPC、http、websocket等都基于netty的封装。 Disruptor 高性能线程间消息传递库,通过它来实现“消息中心”,跨线程消息传递so easy! HikariCP 稳定...
这样的转换过程通常包括视频编码、解码、重新封装等步骤,以确保转换后的文件能在目标设备上顺利播放。 “Ultra Mobile 3GP Video Converter”是具体的一个软件实例,这是一款专业级的视频转换工具,专为将各种视频...
8. **对象和类**:面向对象编程的基本概念,如封装、继承、多态,以及构造函数、析构函数、友元等特性。 9. **模板**:函数模板和类模板的使用,以及模板特化和偏特化。 10. **异常处理**:理解何时和如何使用try...
xd-toolkit 工具集(工作过程中常用到的工具类、工具模块) SPMC并发组件(同时也支持MPMC模式) ...LMAX Disruptor并发框架封装 应用场景: a、原框架使用起来比较繁杂,其实很多时候我们只关心生
4. 异步处理:利用Java的ExecutorService或者Disruptor等工具进行异步处理,提高系统并发能力,提升响应速度。 5. 安全机制:通过Spring Security或者Apache Shiro实现权限控制,确保游戏服务的安全性。 三、模块...
Log4j2的异步日志采用了LMAX Disruptor队列数据结构,这是一种高性能的并发工具,可以有效地减少线程间的上下文切换,进一步提升了日志处理效率。 在"Log4j2测试"的示例中,我们可以看到如何设置和使用异步日志。...
另一种方法是使用异步日志库,如Log4j2的AsyncAppender,它内部使用了Disruptor框架,提供了高效的无锁数据结构和事件处理机制,进一步减少了日志写入的延迟。 在Java中,我们通常使用的日志框架有Log4j、Logback和...
Log4j 2.11.0引入了异步日志记录,通过使用LMAX Disruptor库实现高效的并发处理,显著提高了日志系统的性能。 9. **性能与扩展性** 2.11.0版本对性能进行了优化,同时提供了良好的扩展性,允许开发者自定义布局...
总的来说,"abacus-unified"项目体现了软件工程中的抽象和封装原则,它将复杂的库集成工作简化,使开发者可以更专注于业务逻辑,而不是底层库的实现细节。这对于大型项目和团队协作尤其有利,因为它促进了代码的标准...
15.9 Disruptor+Redis队列 303 15.9.1 简介 303 15.9.2 XML配置 304 15.9.3 EventWorker 305 15.9.4 EventPublishThread 307 15.9.5 EventHandler 308 15.9.6 EventQueue 308 15.10 下单系统水平可扩展架构 311 ...