`

构建高性能服务(三)Java高性能缓冲设计 vs Disruptor vs LinkedBlockingQueue

 
阅读更多

一个仅仅部署在4台服务器上的服务,每秒向Database写入数据超过100万行数据,每分钟产生超过1G的数据。而每台服务器(8核12G)上CPU占用不到100%,load不超过5。这是怎么做到呢?下面将给你描述这个架构,它的核心是一个高效缓冲区设计,我们对它的要求是:

1,该缓存区要尽量简单

2,尽量避免生产者线程和消费者线程锁

3,尽量避免大量GC

缓冲 vs 性能瓶颈

提高硬盘写入IO的银弹无疑是批量顺序写,无论是在业界流行的分布式文件系统或数据,HBase,GFS和HDFS,还是以磁盘文件为持久化方式的消息队列Kafka都采用了在内存缓存数据然后再批量写入的策略。这一个策略的性能核心就是内存中缓冲区设计。这是一个经典的数据产生者和消费者场景,缓冲区的要求是当同步写入和读出时:(1)写满则不写(2)读空则不读(3)不丢失数据(4)不读重复数据。最直接也是常用的方式就是JDK自带的LinkedBlockingQueue。LinkedBlockingQueue是一个带锁的消息队列,写入和读出时加锁,完全满缓冲区上面的四个要求。但是当你的程序跑起来之后,看看那个线程CPU消耗最高?往往就是在线程读LinkedBlockingQueue锁的时候,这也成为很多对吞吐要求很高的程序的性能瓶颈。

Disruptor

解决加锁队列产生的性能问题?Disruptor是一个选择。Disruptor是什么?看看开源它的公司LMAX自己是怎么介绍的:

 

我们花费了大量的精力去实现更高性能的队列,但是,事实证明队列作为一种基础的数据结构带有它的局限性——在生产者、消费者、以及它们的数据存储之间的合并设计问题。Disruptor就是我们在构建这样一种能够清晰地分割这些关注问题的数据结构过程中所诞生的成果。

 

OK,Disruptor是用来解决我们这个场景的问题的,而且它不是队列。那么它是什么并且如何实现高效呢?我这里不做过多介绍,网上类似资料很多,简单的总结:

1,Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁。

2,生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快。Java的实现在Unsafe package中。

 

使用Disruptor,首先需要构建一个RingBuffer,并指定一个大小,注意如果RingBuffer里面数据超过了这个大小则会覆盖旧数据。这可能是一个风险,但Disruptor提供了检查RingBuffer是否写满的机制用于规避这个问题。而且根据maoyidao测试结果,写满的可能性不大,因为Disrutpor确实高效,除非你的消费线程太慢。

 

并且使用一个单独的线程去处理RingBuffer中的数据:

 

       RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                new SingleThreadedClaimStrategy(RING_SIZE),
                new SleepingWaitStrategy());

       	SequenceBarrier barrier = ringBuffer.newBarrier();

       	BatchEventProcessor<ValueEvent> eventProcessor = new BatchEventProcessor<ValueEvent>(ringBuffer, barrier, handler);
       	ringBuffer.setGatingSequences(eventProcessor.getSequence());
       	// only support single thread
       	new Thread(eventProcessor).start();

 

ValueEvent通常是个自定义的类,用于封装你自己的数据:

 

public class ValueEvent {
    private byte[] packet;

    public byte[] getValue()
    {
        return packet;
    }

    public void setValue(final byte[] packet)
    {
        this.packet = packet;
    }

    public final static EventFactory<ValueEvent> EVENT_FACTORY = new EventFactory<ValueEvent>()
    {
        public ValueEvent newInstance()
        {
            return new ValueEvent();
        }
    };
}

 

 

生产者通过RingBuffer.publish方法向buffer中添加数据,同时发出一个事件通知消费者有新数据达到,并且,,,注意我们是怎么规避数据覆盖问题的:

 

// Publishers claim events in sequence
long sequence = ringBuffer.next();

// if capacity less than 10%, don't use ringbuffer anymore
if(ringBuffer.remainingCapacity() < RING_SIZE * 0.1) {
	log.warn("disruptor:ringbuffer avaliable capacity is less than 10 %");
	// do something
}
else {
	ValueEvent event = ringBuffer.get(sequence);
	event.setValue(packet); // this could be more complex with multiple fields
	// make the event available to EventProcessors
	ringBuffer.publish(sequence);
}

 

数据消费者代码在EventHandler中实现:

 

final EventHandler<ValueEvent> handler = new EventHandler<ValueEvent>()
{
	public void onEvent(final ValueEvent event, final long sequence, final boolean endOfBatch) throws Exception
	{   
		byte[] packet = event.getValue();
		// do something
	}
};

 

很好,完成!用以上代码跑个压测,结果果然比加锁队列快很多(Disruptor官网上有benchmark数据,我这里就不提供对比数据)。好,用到线上环境。。。。结果是。。。CPU反而飙升了!??

Disruptor的坑

 

书接上文,Disruptor压测良好,但上线之后CPU使用达到650%,LOAD接近300!分析diruptor源码可知,造成cpu过高的原因是 RingBuffer 的waiting策略,Disruptor官网例子使用的策略是 SleepingWaitStrategy ,这个类的策略是当没有新数据写入RingBuffer时,每1ns检查一次RingBuffer cursor。1ns!跟死循环没什么区别,因此CPU暴高。改成每100ms检查一次,CPU立刻降为7.8%。

 

为什么Disruptor官网例子使用这种有如此风险的SleepingWaitStrategy呢?原因是此策略完全不使用锁,当吞吐极高时,RingBuffer中始终有数据存在,通过轮询策略就能最大程度的把它的性能优势发挥出来。但这显然是理想状态,互联网应用有明显的高峰低谷,不可能总处于满负荷状态。因此还是BlockingWaitStrategy 这种锁通知机制更好:

 

RingBuffer ringBuffer = new RingBuffer<ValueEvent>(ValueEvent.EVENT_FACTORY,
                new SingleThreadedClaimStrategy(RING_SIZE),
                new BlockingWaitStrategy());

 这样写入不加锁,读出加锁。相对加锁队列少了一半,性能还是有显著提高。

 

还有没有更好的方法?

Disruptor是实现缓冲区的很好选择。但它本质的目的是提供线程间交换数据的高效实现,这是一个很好的通用选择。那么真对我们数据异步批量落地的场景,还有没有更好的选择呢?答案是:Yes,we have!我最终设计了一个非常简单的buffer,原因是:

1,Disruptor很好,但毕竟多引入了一个依赖,对于新同学也有学习成本。

2,Disruptor不能很好的解决GC过多的问题。

那么更好的缓存是什么呢?这首先要从场景说起。

首先的问题是:我需要一个buffer,但为啥要一个跨线程buffer呢?如果我用同一个线程读,再用这个线程去写,这个buffer完全是线程本地buffer,锁本身就无意义。同时异步Database落地没有严格的顺序要求,因此我是多线程同步读写,也不需要集中时的buffer来维护顺序,因此一个内置于线程中的二维byte[][]数组就可以解决全部问题!

 

public class ThreadLocalBoundedMQ {
	private long lastFlushTime=0L;
	
	private byte[][] msgs=new byte[Constants.BATCH_INS_COUNT][];
	
	private int offset=0;
	
	public byte[][] getMsgs(){
		return msgs;
	}
	
	public void addMsg(byte[] msg)
	{
		msgs[offset++]=msg;
	}

	public int size() {
		return offset;
	}

	public void clear() {
		offset=0;
		lastFlushTime=System.currentTimeMillis();
	}
	
	public boolean needFlush(){
		return (System.currentTimeMillis()-lastFlushTime > Constants.MAX_BUFFER_TIME)
		&& offset>0;
	}
}

实际测试和上线效果良好(效果见本文第一节)!

总结

能够使用最简化的代码完成性能和业务要求,是最完美的方法。根据使用场景,你可以有很多假设,但不要被眼花缭乱的新技术迷惑而拿你自己的服务做小白鼠,最适合的,最简单的,就是最好的。

 

本文系maoyidao原创,转载请引用原链接:

http://maoyidao.iteye.com/blog/1663193

同时推荐本系列前2篇

 

构建高性能服务(一)ConcurrentSkipListMap和链表构建高性能Java Memcached

http://maoyidao.iteye.com/blog/1559420

构建高性能服务(二)java高并发锁的3种实现

http://maoyidao.iteye.com/blog/1563523

分享到:
评论
9 楼 abc08010051 2017-04-19  
RingBuffer不存在生产覆盖未消费的数据,或者消费已经消费过的数据这种风险,生产者消费者的index使用优化了false sharing
8 楼 dacoolbaby 2014-12-12  
RingBuffer怎么会涉及到GC的问题呢??

LZ给的最后的例子也是有冲突的,不是线程安全的。难道只是给了个模板吗?
7 楼 foxama 2014-09-04  
lz你最后一个例子,其实是不能异步的,但是有以下缺点:
1 不能一次只取一个数据,只能每次取全部msg,没有读指针,在你这里offset是写指针
2 两个线程,一个调clear,一个调addmsg,两个同时对offset操作,会有冲突。

6 楼 rprenpeng83 2013-07-19  
给个能运行的例子,看一下,行不?
5 楼 foreseer201 2013-05-07  
请教个问题,实现了无锁的缓存消息结构后性能大幅提升,是不是在缓存的消费者性能不是问题的前提下。比如写入数据库的这个步骤很慢的话(带有IO操作的消费者),整个应用的瓶颈就在这里了,而不是缓存消息锁上。这个理解对吗?谢谢。
4 楼 yelogic 2013-01-29  
学习了,很好的思路。
最后的那个实现中需要consumer自行在zk或文件中记录offset吧?
3 楼 maoyidao 2012-11-30  
没有呀,一个线程写一个线程读,还是异步。
2 楼 xishuixixia 2012-11-29  
欢迎讨论,greenyouyou@163.com
1 楼 xishuixixia 2012-11-29  
如果我用同一个线程读,再用这个线程去写。其实你没有真正理解disruptor,disruptor核心是异步,同一个线程,异步的意义在哪里?

相关推荐

    disruptor高性能Java线程间通讯库

    Disruptor是一款由LMAX交易所开发的开源Java框架,它专为高并发环境下的线程间通信设计,致力于提供极低的延迟和高效的性能。在Java开发中,尤其是在需要处理大量并发请求的系统中,Disruptor是一个重要的工具,它...

    Java工具:高性能并发工具Disruptor简单使用

    Java工具:高性能并发工具Disruptor简单使用 在Java编程中,高效并发处理是优化系统性能的关键之一。Disruptor,由LMAX公司开源的一款并发框架,为处理高并发场景提供了一种新颖且高效的解决方案。它通过消除锁和...

    Disruptor 一种可替代有界队列完成并发线程间数据交换高性能解决方案.docx

    综上所述,Disruptor 是一种革命性的并发编程工具,通过创新的设计思路和优化技术,极大地提升了并发环境下的数据交换性能,成为构建高性能、低延迟系统的理想选择。尽管随着Disruptor版本的迭代,实现细节有所变化...

    disruptor 实例

    Disruptor,由英国LMAX公司开源的一款高性能、低延迟的消息处理框架,它彻底改变了并发编程的传统模式,尤其在金融交易领域,其性能表现卓越,被誉为“游戏规则的改变者”。本文将深入探讨Disruptor的核心原理,并...

    基于Spring Boot和LMAX Disruptor的高性能并发框架.zip

    通过本项目,开发者可以深入理解并发编程的原理,掌握Disruptor框架的使用,并能够构建高性能的并发应用。 项目的主要特性和功能 1. 并发编程与无锁并行计算框架初探 介绍并发编程课程大纲与重点。 介绍无锁...

    Disruptor:一种高性能的、在并发线程间数据交换领域用于替换有界限队列的方案

    Disruptor是一种高性能的并发数据交换框架,由Martin Thompson、Dave Farley、Michael Barker、Patricia Gee和Andrew Stewart共同开发,主要用于替代传统的有界队列。这个框架的诞生源于LMAX公司在构建高性能金融...

    (源码)基于LMAX Disruptor的高性能事件处理系统.zip

    # 基于LMAX Disruptor的高性能事件处理系统 ## 项目简介 本项目是一个基于LMAX Disruptor框架的高性能事件处理系统。Disruptor是一个高性能的并发框架,主要用于在多线程环境中进行快速的事件处理。本项目通过使用...

    15、CPU缓存架构详解&高性能内存队列Disruptor实战

    接下来,我们转向Disruptor,这是一个由LinkedIn开发的高性能内存队列,其设计目标是消除传统锁和并发控制带来的性能瓶颈。Disruptor的核心在于它的环形缓冲区和序列号机制。环形缓冲区是一种固定大小的数组,避免了...

    一款分布式的java游戏服务器框架,具备高性能、可伸缩、分布式、多线程等特点,java 8 +gradle 4.0

    Disruptor 高性能线程间消息传递库,通过它来实现“消息中心”,跨线程消息传递so easy! HikariCP 稳定、高性能的JDBC连接池。github star破11k! logback 快速、灵活的日志库,log4j作者的续作。 fastjson 马爸爸家...

    高性能高稳定性分布式的游戏架构,游戏逻辑运行在Disruptor消费者线程中,其它线程都为辅助线程, 整体为多生产者.zip

    3. 分布式系统基础:了解如何设计高可用、可扩展的游戏服务,以及如何处理网络通信和数据同步问题。 4. 游戏服务器架构:研究游戏逻辑如何与网络通信、数据库交互,以及如何优化资源管理和任务调度。 通过分析和...

    springboot整合Disruptor并发编程框架 #资源达人分享计划#

    Disruptor的工作原理基于环形缓冲区(Ring Buffer)的设计,它将生产者和消费者之间的数据传递过程转换为顺序写入和读取,避免了传统并发模型中的锁竞争和上下文切换,从而实现了极高的消息传递效率。在SpringBoot中...

    一个基于Java的开源游戏服务器框架实现,使用了Netty、ProtoBuf、Disruptor等.zip

    总之,这个基于Java的开源游戏服务器框架通过集成Netty、ProtoBuf和Disruptor,提供了一种高效、可扩展的解决方案,对于那些寻求构建高性能游戏服务器的开发者来说,这是一个非常有价值的资源。通过学习和利用这个...

    54丨算法实战(三):剖析高性能队列Disruptor背后的数据结构和算法1

    总结起来,Disruptor的高性能主要得益于其独特的数据结构——环形缓冲区,以及精心设计的并发控制机制,包括序列号、多生产者-单消费者模型以及非阻塞算法。这些创新使得Disruptor在保证数据一致性的同时,极大地...

    Okra:Netty和Disruptor的高性能游戏服务器框架

    Okra是一个简单的使用JAVA开发的高性能,高扩展,高并发,低延迟的服务器框架。 主要目的是帮助中小团队快速开发实现网络游戏服务端。 本项目包含完整的MMORPG等游戏服务器的DEMO. Dependencies: JDK 1.8 : 框架...

    Disruptor并发框架中文参考文档

    **Disruptor** 是一款高性能、低延迟的并发框架,它通过无锁设计实现了高效的队列操作,从而大大提升了多线程环境下的性能表现。该框架于2011年获得了Duke's Choice Award,这充分证明了其在并发处理领域的创新性和...

    disruptor-3.2.1源码带jar包20140321

    Disruptor是一款高性能的Java并发框架,由LMAX公司开发并开源。它的设计目标是解决多线程环境下的数据同步问题,特别是在高并发场景下,能够显著提高系统的处理能力。Disruptor的核心设计理念是利用环形缓冲区(Ring...

    spring-boot-starter-disruptor.zip

    Disruptor是由LMAX公司开源的一款并发框架,其设计灵感来源于传统的消息队列,但通过独特的环形缓冲区(Ring Buffer)和事件处理机制,显著提升了并发性能,特别适用于高吞吐量、低延迟的场景。Disruptor的核心思想是...

    disruptor jar包+Demo+Api

    Disruptor 的核心设计是一个环形缓冲区(Ring Buffer),通过消除锁竞争和内存屏障,实现了极低的延迟和高效的并发性能。 在提供的压缩包中,包含了 Disruptor 的两个版本为 2.10.3 的 jar 包,分别是源码版本 `...

Global site tag (gtag.js) - Google Analytics