`

Disruptor应用实例

阅读更多

  Disruptor是什么可以阅读《高性能线程间队列DISRUPTOR简介》一文,下面重点讲讲在实际应用中如何去使用Disruptor。

        项目结构如下:


CreateReqEvent.java

package com.bijian.study;

import com.lmax.disruptor.EventFactory;

public class CreateReqEvent {
	
    private String reqStr;
    
    public String getReqStr() {
		return reqStr;
	}

	public void setReqStr(String reqStr) {
		this.reqStr = reqStr;
	}

	private static class Factory implements EventFactory<CreateReqEvent> {

        @Override
        public CreateReqEvent newInstance() {
            return new CreateReqEvent();
        }
    }

    public static final CreateReqEvent.Factory FACTORY = new CreateReqEvent.Factory();
}

CreateReqEventHandler.java

package com.bijian.study;

import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.EventHandler;

public class CreateReqEventHandler implements EventHandler<CreateReqEvent> {
	
    private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Override
    public void onEvent(CreateReqEvent event, long sequence, boolean endOfBatch) throws Exception {
    	
        log.info("on event:{}", event.getReqStr());
    }
}

CreateReqEventTranslator.java

package com.bijian.study;

import com.lmax.disruptor.EventTranslator;

public class CreateReqEventTranslator implements EventTranslator<CreateReqEvent> {

    private String reqString;
    
    public String getReqString() {
		return reqString;
	}

	public void setReqString(String reqString) {
		this.reqString = reqString;
	}

	@Override
    public void translateTo(CreateReqEvent event, long sequence) {
        event.setReqStr(reqString);
    }
}

ReqEventUtil.java

package com.bijian.study;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class ReqEventUtil {
	
    private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private static volatile Disruptor<CreateReqEvent> disruptor;
    private static ExecutorService executor;

    //启动处理线程
    static {
        int ringBufferSize = 256 * 256; // RingBuffer 大小=65536,必须是 2的 N次方;
        executor = Executors.newFixedThreadPool(4*4);
        disruptor = new Disruptor(CreateReqEvent.FACTORY, ringBufferSize, executor, ProducerType.MULTI,new BlockingWaitStrategy());

        EventHandler<CreateReqEvent> eventHandler = new CreateReqEventHandler();
        disruptor.handleEventsWith(eventHandler);
        disruptor.start();
    }
    
    public static void push(String reqString){
        try {
            log.info("push create reqString event:{}", reqString);
            CreateReqEventTranslator translator = new CreateReqEventTranslator();
            translator.setReqString(reqString);
            disruptor.publishEvent(translator);
        }catch (Exception e){
            log.error("push CreateOrderEvent error:",e);
        }
    }
    
    /**
     * 停止处理
     */
    public static void shutdown(){
    	
        log.info("shutdown now...");
        
        if(disruptor != null) {
            disruptor.shutdown();
        }

        if(executor != null) {
            executor.shutdown();
        }
    }
}

测试类Main.java

package com.bijian.test;

import java.lang.invoke.MethodHandles;
import java.util.Scanner;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.bijian.study.ReqEventUtil;

public class Main {

	private static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
	
	public static void main(String[] args) {
		
		Scanner sc = new Scanner(System.in);
		while(sc.hasNext()) {
			String reqStr = sc.nextLine();
			log.info("输入的信息:" + reqStr);
			if(reqStr.equals("exit")) {
				ReqEventUtil.shutdown();
				break;
			}else {
				ReqEventUtil.push(reqStr);
			}
		}
	}
}

运行结果:

test
2018-12-24 23:12:26.337 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:test
2018-12-24 23:12:26.370 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:test
2018-12-24 23:12:26.371 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:test
abc
2018-12-24 23:12:28.323 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:abc
2018-12-24 23:12:28.323 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:abc
2018-12-24 23:12:28.324 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:abc
xyz
2018-12-24 23:12:30.628 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:xyz
2018-12-24 23:12:30.628 [main] INFO  ReqEventUtil.push(ReqEventUtil.java:35)[][][] - push create reqString event:xyz
2018-12-24 23:12:30.628 [pool-2-thread-1] INFO  CreateReqEventHandler.onEvent(CreateReqEventHandler.java:16)[][][] - on event:xyz
exit
2018-12-24 23:12:34.032 [main] INFO  Main.main(Main.java:20)[][][] - 输入的信息:exit
2018-12-24 23:12:34.033 [main] INFO  ReqEventUtil.shutdown(ReqEventUtil.java:49)[][][] - shutdown now...

 

PS:完整工程代码详见附件《DisruptorStudy.zip》,《DisruptorStudy02.zip》是《Disruptor样例实战》一文的完整工程代码。

  • 大小: 16.5 KB
分享到:
评论

相关推荐

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    利用Spring的Bean管理机制,将Disruptor实例化并注入到需要使用它的服务中。 在业务代码中,生产者通过Disruptor的Producer接口发布事件,而消费者则通过EventProcessor接口来消费这些事件。Disruptor会自动调度...

    disruptor 实例

    在testMyDisruptor这个实例中,我们可能会看到如何创建并配置Disruptor,包括设置缓冲区大小、定义事件处理器链以及启动Disruptor。在初始化Disruptor时,我们需要提供一个事件工厂(EventFactory),它负责为环形...

    Netty 使用Disruptor机制的处理源代码

    总的来说,Netty 和 Disruptor 的结合使用,能够显著提升网络应用的性能,尤其适合处理大规模并发、实时性要求高的场景,如金融交易、实时数据推送等。在深入研究 "trapos-master" 项目源码时,可以进一步理解这两个...

    Disruptor专题简单案例资料

    这个“Disruptor专题简单案例资料”很可能包含了一些基础的使用示例和概念解释,帮助我们理解Disruptor的工作原理和优势。 在Java并发编程中,线程之间的通信通常依赖于共享内存,这可能导致竞态条件、死锁等问题,...

    Java工具:高性能并发工具Disruptor简单使用

    在实际应用中,Disruptor通常与其他并发工具如ExecutorService结合使用,以实现更复杂的并发控制策略。 学习Disruptor的源码"study-disruptor"可以帮助我们深入理解其内部机制,例如如何实现无锁操作,如何优化内存...

    DisruptorDemo.zip

    本篇文章将结合提供的"DisruptorDemo.zip"压缩包中的代码实例,深入探讨Disruptor的工作原理及应用。 首先,我们要理解Disruptor的核心设计——环形缓冲区。环形缓冲区是一种固定大小的数组,生产者在数组的一端...

    Disruptor C++版(仅支持单生产者)

    由于这里提到的是“仅支持单生产者”,这意味着在一个Disruptor实例中,只能有一个线程发布事件。这简化了并发控制,但限制了并行度。 2. 消费者:消费者从Disruptor中获取并处理事件。Disruptor支持多个消费者,...

    disruptor jar包+Demo+Api

    《Disruptor 框架详解与应用实例》 Disruptor 是一款高性能的并发工具库,由英国的 LMAX 公司开发并开源。它主要用于优化多线程环境下的数据共享,尤其在金融交易系统中表现卓越。Disruptor 的核心设计是一个环形...

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    - 创建Disruptor实例:配置Ring Buffer、Sequencer、事件处理器等。 - 启动Disruptor:调用Disruptor的start()方法启动事件处理循环。 5. **Demo示例** - 一个简单的Disruptor示例通常包括创建Disruptor对象、...

    disruptor-3.2.1源码带jar包20140321

    首先,创建生产者和消费者,然后配置Disruptor实例,设置事件处理器链。运行示例,观察Disruptor如何在多线程环境中高效地处理事件。这不仅加深了你对Disruptor的理解,还能够帮助你在实际项目中更好地应用这个框架...

    Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip

    Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip Netty整合并发编程框架Disruptor实战百万长链接服务构建源码.zip

    Disruptor并发框架

    同时,Disruptor还采用了懒初始化策略,只有在真正需要时才创建事件实例,降低了资源消耗。 六、Disruptor的应用场景 1. 高性能交易系统:Disruptor的低延迟特性使其成为金融交易系统中的理想选择。 2. 实时数据...

    disruptor 代码分析

    本文将深入分析Disruptor的代码,特别聚焦于`setGatingSequences`方法的作用以及如何确定每个消费者可处理的最大事件序号,同时探讨`YieldingWaitStrategy`与`SingleThreadedClaimStrategy`的组合使用。 #### ...

    高并发框架Disruptor代码

    **Disruptor在实际应用中的案例** Disruptor不仅在金融领域表现出色,也可应用于大数据处理、消息队列、游戏服务器等领域。例如,Apache Kafka的Producer就是使用Disruptor来提高发布消息的效率。 总之,Disruptor...

    disruptor技术培训

    ### Disruptor技术详解及其应用 #### 一、Disruptor框架概述 Disruptor是一个高性能的并发框架,主要用于解决高并发场景下的消息传递问题。它通过无锁设计实现了非常高的吞吐量和低延迟,这使得Disruptor在金融...

    无锁队列Disruptor超详细教程

    无锁队列Disruptor是LMAX公司为解决内存队列延迟问题而...了解并熟练掌握Disruptor,对于提升Java应用的并发性能至关重要。在实际开发中,根据具体场景选择合适的配置和策略,可以进一步优化系统的吞吐量和响应时间。

    Disruptor学习.7z

    虽然Disruptor本身并不直接与Spring框架集成,但开发者可以利用Spring的Bean管理机制,将Disruptor作为服务组件引入到Spring应用中。通过配置Bean,可以方便地管理和调用Disruptor实例,实现业务逻辑。 四、...

    disruptror的jar包和例子

    2. 创建Disruptor实例:配置Disruptor,包括设置环形缓冲区的大小、事件处理器链以及事件工厂等。 3. 注册事件处理器:定义处理事件的处理器,并将其注册到Disruptor上,可以有多个处理器并行处理事件。 4. 启动...

Global site tag (gtag.js) - Google Analytics