`
luoshi0801
  • 浏览: 147392 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

Disruptor封装

 
阅读更多

在数据交换场景,disruptor受到越来越多的欢迎。下面是将原生disruptor封装成queue模型的代码,供参考

 

抽象类Disruptor,提供pull、take等接口

 

 

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public abstract class DisruptorQueue {
    
    public static void setUseSleep(boolean useSleep) {
        DisruptorQueueImpl.setUseSleep(useSleep);
    }

    public static DisruptorQueue mkInstance(String queueName,
            ProducerType producerType, int bufferSize, WaitStrategy wait) {
        return new DisruptorQueueImpl(queueName, producerType, bufferSize, wait);
    }

    public abstract String getName();

    public abstract void haltWithInterrupt();

    public abstract Object poll();

    public abstract Object take();

    public abstract void consumeBatch(EventHandler<Object> handler);
    
    public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);

    public abstract void publish(Object obj);

    public abstract void publish(Object obj, boolean block)
            throws InsufficientCapacityException;

    public abstract void consumerStarted();

    public abstract void clear();

    public abstract long population();

    public abstract long capacity();

    public abstract long writePos();

    public abstract long readPos();

    public abstract float pctFull();
    
}

 

 

具体实现

 

import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.AlertException;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;


public class DisruptorQueueImpl extends DisruptorQueue {
    
    private static final Logger logger = LoggerFactory.getLogger(DisruptorQueueImpl.class);
    
    static boolean useSleep = true;
    public static void setUseSleep(boolean useSleep) {
        AbstractSequencerExt.setWaitSleep(useSleep);
    }
    
    private static final Object FLUSH_CACHE = new Object();
    private static final Object INTERRUPT = new Object();
    private static final String PREFIX = "disruptor-";

    private final String _queueName;
    private final RingBuffer<MutableObject> _buffer;
    private final Sequence _consumer;
    private final SequenceBarrier _barrier;

    volatile boolean consumerStartedFlag = false;

    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
    private final Lock readLock = cacheLock.readLock();
    private final Lock writeLock = cacheLock.writeLock();

    public DisruptorQueueImpl(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
        this._queueName = PREFIX + queueName;
        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
        _consumer = new Sequence();
        _barrier = _buffer.newBarrier();
        _buffer.addGatingSequences(_consumer);
        if (producerType == ProducerType.SINGLE) {
            consumerStartedFlag = true;
        } else {
            if (bufferSize < 2) {
                throw new RuntimeException("QueueSize must >= 2");
            }
            try {
                publishDirect(FLUSH_CACHE, true);
            } catch (InsufficientCapacityException e) {
                throw new RuntimeException("This code should be unreachable!", e);
            }
        }
    }

    public String getName() {
        return _queueName;
    }

    public void consumeBatch(EventHandler<Object> handler) {
        consumeBatchToCursor(_barrier.getCursor(), handler);
    }

    public void haltWithInterrupt() {
        publish(INTERRUPT);
    }

    public Object poll() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        if (nextSequence <= _barrier.getCursor()) {
            MutableObject mo = _buffer.get(nextSequence);
            _consumer.set(nextSequence);
            Object ret = mo.o;
            mo.setObject(null);
            return ret;
        }
        return null;
    }

    public Object take() {
        if (consumerStartedFlag == false) {
            return _cache.poll();
        }

        final long nextSequence = _consumer.get() + 1;
        try {
            _barrier.waitFor(nextSequence);
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return null;
        } catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return null;
        }
        MutableObject mo = _buffer.get(nextSequence);
        _consumer.set(nextSequence);
        Object ret = mo.o;
        mo.setObject(null);
        return ret;
    }

    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
        try {
            final long nextSequence = _consumer.get() + 1;
            final long availableSequence = _barrier.waitFor(nextSequence);
            if (availableSequence >= nextSequence) {
                consumeBatchToCursor(availableSequence, handler);
            }
        } catch (AlertException e) {
            logger.error(e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            logger.error("InterruptedException " + e.getCause());
            return;
        }catch (TimeoutException e) {
            logger.error(e.getMessage(), e);
            return ;
        }
    }

    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler){
        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
            try {
                MutableObject mo = _buffer.get(curr);
                Object o = mo.o;
                mo.setObject(null);
                if (o == FLUSH_CACHE) {
                    Object c = null;
                    while (true) {
                        c = _cache.poll();
                        if (c == null)
                            break;
                        else
                            handler.onEvent(c, curr, true);
                    }
                } else if (o == INTERRUPT) {
                    throw new InterruptedException(
                            "Disruptor processing interrupted");
                } else {
                    handler.onEvent(o, curr, curr == cursor);
                }
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
                return;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new RuntimeException(e);
            }
        }
        _consumer.set(cursor);
    }

    public void publish(Object obj) {
        try {
            publish(obj, true);
        } catch (InsufficientCapacityException ex) {
            throw new RuntimeException("This code should be unreachable!");
        }
    }

    public void tryPublish(Object obj) throws InsufficientCapacityException {
        publish(obj, false);
    }

    public void publish(Object obj, boolean block)
            throws InsufficientCapacityException {

        boolean publishNow = consumerStartedFlag;

        if (!publishNow) {
            readLock.lock();
            try {
                publishNow = consumerStartedFlag;
                if (!publishNow) {
                    _cache.add(obj);
                }
            } finally {
                readLock.unlock();
            }
        }

        if (publishNow) {
            publishDirect(obj, block);
        }
    }

    protected void publishDirect(Object obj, boolean block)
            throws InsufficientCapacityException {
        final long id;
        if (block) {
            id = _buffer.next();
        } else {
            id = _buffer.tryNext(1);
        }
        final MutableObject m = _buffer.get(id);
        m.setObject(obj);
        _buffer.publish(id);
    }

    public void consumerStarted() {

        writeLock.lock();
        consumerStartedFlag = true;
        
        writeLock.unlock();
    }

    public void clear() {
        while (population() != 0L) {
            poll();
        }
    }

    public long population() {
        return (writePos() - readPos());
    }

    public long capacity() {
        return _buffer.getBufferSize();
    }

    public long writePos() {
        return _buffer.getCursor();
    }

    public long readPos() {
        return _consumer.get();
    }

    public float pctFull() {
        return (1.0F * population() / capacity());
    }

    public Object getState() {
        long rp = readPos();
        long wp = writePos();
        state.put("capacity", capacity());
        state.put("population", wp - rp);
        state.put("write_pos", wp);
        state.put("read_pos", rp);
        return state;
    }

    public static class ObjectEventFactory implements EventFactory<MutableObject> {
        @Override
        public MutableObject newInstance() {
            return new MutableObject();
        }
    }
}

 

  

public class MutableObject {

    Object o = null;

    public MutableObject() {

    }

    public MutableObject(Object o) {
        this.o = o;
    }

    public void setObject(Object o) {
        this.o = o;
    }

    public Object getObject() {
        return o;
    }

}

 

 

代码依赖

<dependency>
    <groupId>com.lmax</groupId>
    <artifactId>disruptor</artifactId>
    <version>3.2.1</version>
</dependency>

 

分享到:
评论

相关推荐

    async-framework:基于Disruptor的异步并行框架

    async-framework提供了流程和队列的概念,流程 Flow 代表步骤,队列 Queue 代表处理节点,队列由Disruptor封装而成,每个事件都有Flow发起,并且在每个Queue存在一个状态,每个队列处理特定的事件,简单的来说 Flow ...

    DisruptorDemo.zip

    《Disruptor技术详解——基于DisruptorDemo.zip实例解析》 Disruptor,由LMAX公司开发并开源,是一款高性能、低延迟的并发工具,主要用于优化多线程间的通信。它采用一种环形缓冲区(Ring Buffer)的设计,极大地...

    Netty 使用Disruptor机制的处理源代码

    1. **报价生产**:接收到外部报价源的更新后,生产者将报价数据封装成事件并发布到 Disruptor。 2. **事件处理**:Disruptor 中的处理器会按序处理报价事件,进行过滤、计算、存储等操作。 3. **通知用户**:经过...

    disruptor 代码分析

    1. **`handleEventsWith`**:该方法将事件处理器注册到Disruptor中,并通过`createEventProcessors`方法将其封装为`BatchEventProcessor`,同时根据处理器间的关系构建Sequence Barrier。对于第一个处理器而言,它不...

    tiny_disruptor:简化的[disruptor](https

    tiny_disruptor项目保留了这一核心思想,但可能对其进行了更直观、更易于理解的封装。 在tiny_disruptor中,开发者可以轻松创建一个Ring Buffer,并设置生产者和消费者。生产者将数据放入缓冲区,而消费者则从缓冲...

    Disruptor-Spring-Boot-Starter:Disruptor的启动器

    基于Disruptor的Spring Boot Starter实现,初步事件推导,处理封装 1,事件推动 a,配置简单,少量配置即可实现初始化事件推送 2,事件处理 a、配置简单,少量配置即可实现异步事件处理 b、组件实现了基于责任链的...

    15、CPU缓存架构详解&高性能内存队列Disruptor实战(1).pdf

    - **L3缓存**:容量最大,速度比前两者慢,通常不在CPU内核上,而是集成在主板上或CPU封装内。 **缓存访问流程**: - 当CPU需要读取数据时,首先检查L1缓存。 - 如果未命中,再检查L2缓存。 - 之后是L3缓存。 - ...

    谈disruptor的单线程数据库操作

    `TaskEvent`类是用来封装`Task`的事件对象,其中包含了`Task`实例的getter和setter方法。`TaskEvent`的`EVENT_FACTORY`是一个工厂类,用于创建新的`TaskEvent`实例。 接下来,我们创建了一个`TaskEventHandler`类,...

    java_learning_practice:java进阶之路:面试高频算法,akka,多线程,NIO,Netty,SpringBoot,Spark && Flink等

    语言方面的练习算法:leetcode,nowcoder,swordoffer,以及算法红皮书等语言技术:kotlin,java8函数式,多线程等框架:akka,zookeeper,Disruptor等核心网springboot相关练习球衣封装Spring安全练习基于curator ...

    一款分布式的java游戏服务器框架,具备高性能、可伸缩、分布式、多线程等特点,java 8 +gradle 4.0

    Netty 宇宙最强的java网络库,可定义各种网络通信方式,本框架中RPC、http、websocket等都基于netty的封装。 Disruptor 高性能线程间消息传递库,通过它来实现“消息中心”,跨线程消息传递so easy! HikariCP 稳定...

    转换为3GP,MP4的软件

    这样的转换过程通常包括视频编码、解码、重新封装等步骤,以确保转换后的文件能在目标设备上顺利播放。 “Ultra Mobile 3GP Video Converter”是具体的一个软件实例,这是一款专业级的视频转换工具,专为将各种视频...

    华为面试题1.rar

    8. **对象和类**:面向对象编程的基本概念,如封装、继承、多态,以及构造函数、析构函数、友元等特性。 9. **模板**:函数模板和类模板的使用,以及模板特化和偏特化。 10. **异常处理**:理解何时和如何使用try...

    xd-toolkit:工具集(工作过程中常用到的工具类、工具模块)

    xd-toolkit 工具集(工作过程中常用到的工具类、工具模块) SPMC并发组件(同时也支持MPMC模式) ...LMAX Disruptor并发框架封装 应用场景: a、原框架使用起来比较繁杂,其实很多时候我们只关心生

    光宇游戏Java开发框架.zip

    4. 异步处理:利用Java的ExecutorService或者Disruptor等工具进行异步处理,提高系统并发能力,提升响应速度。 5. 安全机制:通过Spring Security或者Apache Shiro实现权限控制,确保游戏服务的安全性。 三、模块...

    log4j2 demo 性能测试

    Log4j2的异步日志采用了LMAX Disruptor队列数据结构,这是一种高性能的并发工具,可以有效地减少线程间的上下文切换,进一步提升了日志处理效率。 在"Log4j2测试"的示例中,我们可以看到如何设置和使用异步日志。...

    异步写日志

    另一种方法是使用异步日志库,如Log4j2的AsyncAppender,它内部使用了Disruptor框架,提供了高效的无锁数据结构和事件处理机制,进一步减少了日志写入的延迟。 在Java中,我们通常使用的日志框架有Log4j、Logback和...

    log4j 源码包 日志包 2.11.0

    Log4j 2.11.0引入了异步日志记录,通过使用LMAX Disruptor库实现高效的并发处理,显著提高了日志系统的性能。 9. **性能与扩展性** 2.11.0版本对性能进行了优化,同时提供了良好的扩展性,允许开发者自定义布局...

    abacus-unified

    总的来说,"abacus-unified"项目体现了软件工程中的抽象和封装原则,它将复杂的库集成工作简化,使开发者可以更专注于业务逻辑,而不是底层库的实现细节。这对于大型项目和团队协作尤其有利,因为它促进了代码的标准...

    开涛高可用高并发-亿级流量核心技术

    15.9 Disruptor+Redis队列 303 15.9.1 简介 303 15.9.2 XML配置 304 15.9.3 EventWorker 305 15.9.4 EventPublishThread 307 15.9.5 EventHandler 308 15.9.6 EventQueue 308 15.10 下单系统水平可扩展架构 311 ...

Global site tag (gtag.js) - Google Analytics