`
weigang.gao
  • 浏览: 486215 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Disruptor3.2实现一个生产者与两个消费者

 
阅读更多

项目的目录结构如下:


 

1.定义事件

package org.fenxisoft.disruptor;

//定义事件
public class LongEvent {
	
	private long value;
	
	public void set(long value){
		this.value = value;
	}
    
	@Override
	public String toString() {
		return String.valueOf(value);
	}
}

 

 

2.定义事件工厂

package org.fenxisoft.disruptor;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>{

	@Override
	public LongEvent newInstance() {
		return new LongEvent();
	}

}

 

3.定义事件处理者

package org.fenxisoft.disruptor;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>{

	@Override
	public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
			throws Exception {
		System.out.println(Thread.currentThread().getName()+" : "+event);
		
	}

}

4.编写测试类

package org.fenxisoft.disruptor;

import java.util.concurrent.Executors;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.dsl.ProducerType;

public class DisruptorDemo {
	
	public static void main(String[] args) {
		
		LongEventFactory eventFactory = new LongEventFactory();		
		int bufferSize = 4;//1024 * 1024;//ring Buffer Size
		WaitStrategy waitStrategy = new BlockingWaitStrategy();
		
		RingBuffer<LongEvent> ringBuffer = RingBuffer.create(ProducerType.MULTI, eventFactory,
				bufferSize, waitStrategy);
		
		WorkHandler<LongEvent> workHandler1 = new WorkHandler<LongEvent>() {

			@Override
			
			public void onEvent(LongEvent event) throws Exception {
				
				System.out.println(Thread.currentThread().getName()+"消费数据: "+event);
			}
		};
		
		
		WorkerPool<LongEvent> workerPool = new WorkerPool<LongEvent>(ringBuffer, 
				ringBuffer.newBarrier(), new IgnoreExceptionHandler(), workHandler1,workHandler1);
		
		//每个消费者,也就是workProcessor都有一个sequence
		Sequence[] sequences = workerPool.getWorkerSequences();
		ringBuffer.addGatingSequences(sequences);
		workerPool.start(Executors.newFixedThreadPool(10));
		
		System.out.println("运行的线程数量:"+Thread.activeCount());//由此可以说明生产者就是主线程,消费者是两个子线程
		//生产者生产数据
		System.out.println("开始生产");
		for(int i = 0; i < 100; i++){
			long sequence = ringBuffer.next();
			
			try {
				LongEvent event = ringBuffer.get(sequence);
				event.set(i);
			} finally{
				ringBuffer.publish(sequence);
			}
			System.out.println("-----------------");
			
		}
		
	}
	
	

}

 

核心方法分析:

1.next()方法分析,这个方法是单线程调用

  public long next(int n)
    {
        if (n < 1)
        {
            throw new IllegalArgumentException("n must be > 0");
        }

        long current;//定义当前坐标
        long next;//定义下次要访问的数组坐标

        do
        {
        	current = cursor.get();//获取当前坐标
            next = current + n;//当前坐标加1就是下次要访问的数组坐标
            
            long wrapPoint = next - bufferSize;
            long cachedGatingSequence = gatingSequenceCache.get();
            
            //如果成立,说明生产过快
            if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
            {
            	//获取已经消费【或者准备消费的】的最小序列
                long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
                //如果成立,说明需要等待消费者消费事件
                if (wrapPoint > gatingSequence)
                {
                    waitStrategy.signalAllWhenBlocking();//尝试唤醒消费者WorkProcessor来消费事件Event
                    LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                    continue;
                }

                gatingSequenceCache.set(gatingSequence);
            }
            else if (cursor.compareAndSet(current, next))
            {
                break;
            }
        }
        while (true);

        return next;
    }

 

2.run()方法分析,这个方法是多线程调用

 public void run()
    {
        if (!running.compareAndSet(false, true))
        {
            throw new IllegalStateException("Thread is already running");
        }
        sequenceBarrier.clearAlert();

        notifyStart();

        boolean processedSequence = true;
        long cachedAvailableSequence = Long.MIN_VALUE;
        long nextSequence = sequence.get();
        T event = null;
        while (true)
        {
            try
            {
                // if previous sequence was processed - fetch the next sequence and set
                // that we have successfully processed the previous sequence
                // typically, this will be true
                // this prevents the sequence getting too far forward if an exception
                // is thrown from the WorkHandler
                if (processedSequence)
                {
                    processedSequence = false;
                    do
                    {
                        nextSequence = workSequence.get() + 1L;
                        sequence.set(nextSequence - 1L);
                    }
                    //两个WorkProcessor共用同一个workSequence,所以不会出现同一个事件Event被2个消费者消费
                    while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
                }
                if (cachedAvailableSequence >= nextSequence)
                {
                    event = ringBuffer.get(nextSequence);
                    workHandler.onEvent(event);
                    processedSequence = true;
                }
                else
                {    
                	//获取可以消费的最大序列
                    cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
                }
            }
            catch (final TimeoutException e)
            {
                notifyTimeout(sequence.get());
            }
            catch (final AlertException ex)
            {
                if (!running.get())
                {
                    break;
                }
            }
            catch (final Throwable ex)
            {
                // handle, mark as processed, unless the exception handler threw an exception
                exceptionHandler.handleEventException(ex, nextSequence, event);
                processedSequence = true;
            }
        }

        notifyShutdown();

        running.set(false);
    }

 

 

如果没明白,看下图:


 

 

 参考:http://www.cnblogs.com/jony-zhang/p/3817208.html

           http://maoyidao.iteye.com/blog/1663193

  • 大小: 9.4 KB
  • 大小: 24.7 KB
  • 大小: 62.6 KB
分享到:
评论

相关推荐

    disruptor 多个消费者

    与传统的队列不同,Disruptor的环形缓冲区允许多个生产者和消费者并发操作,避免了锁竞争,从而实现了更高的性能。 1. **多消费者机制**:在Disruptor中,可以注册多个消费者监听事件。每个消费者都有一个独立的...

    Disruptor3.2官方例子测试

    NULL 博文链接:https://yanbingwei.iteye.com/blog/1985778

    Disruptor C++版(仅支持单生产者)

    由于这里提到的是“仅支持单生产者”,这意味着在一个Disruptor实例中,只能有一个线程发布事件。这简化了并发控制,但限制了并行度。 2. 消费者:消费者从Disruptor中获取并处理事件。Disruptor支持多个消费者,...

    Disruptor并发框架中文参考文档

    **Disruptor** 的核心在于它的Ring Buffer设计,这是一种特殊的循环数组结构,能够高效地支持多个生产者向其中添加数据以及多个消费者从中读取数据的过程,而无需使用传统的锁机制。下面将详细介绍Disruptor的工作...

    disruptor jar包+Demo+Api

    在提供的压缩包中,包含了 Disruptor 的两个版本为 2.10.3 的 jar 包,分别是源码版本 `disruptor-2.10.3-sources.jar` 和运行时版本 `disruptor-2.10.3.jar`。源码包可以让我们深入理解其内部实现机制,而运行时包...

    DisruptorDemo.zip

    环形缓冲区是一种固定大小的数组,生产者在数组的一端写入数据,消费者在另一端读取数据,当写入和读取的位置相遇时,表示缓冲区已满,需要等待消费者消费后才能继续写入。这种设计避免了锁的使用,减少了线程同步的...

    Disruptor demo

    2. **序列号(Sequence)**:每个生产者和消费者都有自己的序列号,用于跟踪其在环形缓冲区中的位置。序列号的增减操作是原子性的,确保了多线程环境下的一致性。 3. **事件处理(Event Processing)**:在...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    Disruptor的工作原理基于环形缓冲区(Ring Buffer)的设计,它将生产者和消费者之间的数据传递过程转换为顺序写入和读取,避免了传统并发模型中的锁竞争和上下文切换,从而实现了极高的消息传递效率。在SpringBoot中...

    netty结合disruptor队列实现即时通信

    netty结合disruptor队列实现即时通信1、简介使用disruptor改造netty通讯,使提高吞吐率,主要是提供disruptor如何与netty整合的思路2、软件架构spring-boot2.7.3 + netty4.1.36.Final + disruptor + jdk1.83、源码...

    Disruptor学习(1)

    Disruptor-examples这个压缩包文件很可能是Disruptor的示例代码,包括了各种应用场景的实现,如简单的生产者消费者模型、多级处理链等,通过这些示例,我们可以更直观地理解Disruptor如何在实际中应用。 总的来说,...

    Disruptor应用实例

    Disruptor的另一个关键特性是序列号(Sequence),每个生产者和消费者都有独立的序列号,用于跟踪它们对环形缓冲区的操作。这种设计保证了数据处理的有序性,避免了线程间的竞态条件。 在Disruptor中,生产者将数据...

    高性能高稳定性分布式的游戏架构,游戏逻辑运行在Disruptor消费者线程中,其它线程都为辅助线程, 整体为多生产者.zip

    本文将深入探讨一个使用Java开发的游戏项目,该项目采用了一种独特的方式,即游戏逻辑运行在Disruptor消费者线程中,其余线程作为辅助线程,构建了一个多生产者的分布式架构。这个设计模式旨在优化并发性能,提高...

    disruptor高性能Java线程间通讯库

    尽管Disruptor最初设计为支持多个生产者和单个消费者,但通过EventProcessor链,它可以实现多消费者模式。每个EventProcessor负责处理一部分事件,形成流水线化的工作方式,进一步提高了处理效率。 3. **事件...

    Disruptor资料合集

    Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟。Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用...

    disruptor-3.2.1源码带jar包20140321

    序列号的增一操作是原子性的,这使得多个生产者和消费者可以安全地并行操作,而无需传统的锁机制。 3. **事件处理器**:Disruptor中的事件处理器是处理缓冲区内事件的类。它们按照预定义的顺序连接在一起,形成一个...

    disruptor 实例

    3. **多生产者与多消费者**:Disruptor支持多个生产者和消费者并发操作,通过序列号的同步机制,保证了数据的一致性。 4. **事件处理流水线**:通过预定义的事件处理器链,Disruptor能构建出高效的数据处理流水线,...

    disruptor 代码分析

    Disruptor框架是LMAX交易所开源的一个高性能、低延迟的消息队列实现,它采用无锁化编程技术,利用环形缓冲区(Ring Buffer)来实现高效的多生产者多消费者模型。本文将深入分析Disruptor的代码,特别聚焦于`...

    disruptor技术培训

    - **抢占机制**:当多个生产者试图写入同一个位置时,Disruptor使用CAS操作来进行位置的抢占,确保只有一个生产者可以成功写入。 - **位置分配**:生产者通过调用`next()`方法来获取下一个可用的位置,然后将事件...

Global site tag (gtag.js) - Google Analytics