`
exploreman
  • 浏览: 2040 次
最近访客 更多访客>>
社区版块
存档分类
最新评论

生产者/消费者模式(转载)

阅读更多
架构设计:


生产者/消费者模式[0]:概述
  今天打算来介绍一下“生产者/消费者模式”,这玩意儿在很多开发领域都能派上用场。由于该模式很重要,打算分几个帖子来介绍。今天这个帖子先来扫盲一 把。如果你对这个模式已经比较了解,请跳过本帖子,直接看下一个帖子(关于如何确定数据单元)。
  看到这里,可能有同学心中犯嘀咕了:在四人帮(GOF)的 23种模式里面似乎没听说过这种嘛!其实GOF那经典的23种模式主要是基于OO的(从书名《Design Patterns: Elements of Reusable Object-Oriented Software》就可以看出来)。而Pattern实际上即可以是OO的Pattern,也可以是非OO的Pattern的。

  ★简 介
  言归正传!在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处 的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者; 而处理数据的模块,就称为消费者。
  单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需 要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据。大概的结构如下图。

  为了不至于太抽象,我们举一个寄信的例子(虽说这年头寄信已经不时兴,但这个例子还是比较贴切的)。假设你要寄一封平信,大 致过程如下:
  1、你把信写好——相当于生产者制造数据
  2、你把信放入邮筒——相当于生产者把数据放入缓冲区
  3、邮递 员把信从邮筒取出——相当于消费者把数据取出缓冲区
  4、邮递员把信拿去邮局做相应的处理——相当于消费者处理数据

  ★优 点
  可能有同学会问了:这个缓冲区有什么用捏?为什么不让生产者直接调用消费者的某个函数,直接把数据传递过去?搞出这么一 个缓冲区作甚?
  其实这里面是大有讲究的,大概有如下一些好处。
  ◇解耦
  假设生产者和消费者分别是两个类。如果让生产者 直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某 个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  接着上述的例子,如果不使用邮筒(也就是缓冲区),你必须得把信直接交给邮递员。有同学 会说,直接给邮递员不是挺简单的嘛?其实不简单,你必须得认识谁是邮递员,才能把信给他(光凭身上穿的制服,万一有人假冒,就惨了)。这就产生和你和邮递 员之间的依赖(相当于生产者和消费者的强耦合)。万一哪天邮递员换人了,你还要重新认识一下(相当于消费者变化导致修 改生产者代码)。而邮筒相对来说比较固定,你依赖它的成本就比较低(相当于和缓冲区之间的弱耦合)。
  ◇支 持并发(concurrency)
  生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法 没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
  使用了生产者/消费者模式之后,生产者和消费 者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生 产下一个数据。基本上不用依赖消费者的处理速度。
  其实当初这个模式,主要就是用来处理并发问题的。
  从寄信的例子来看。如果没有邮 筒,你得拿着信傻站在路口等邮递员过来收(相当于生产者阻塞);又或者邮递员得挨家挨户问,谁要寄信(相当于消费者轮询)。不管是哪种方法,都挺土的。
   ◇支持忙闲不均
  缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未 处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
  为了充分复用,我们再拿寄信的例子来说事。假设邮递员一次 只能带走1000封信。万一某次碰上情人节(也可能是圣诞节)送贺卡,需要寄出去的信超过1000封,这时候邮筒这个缓冲区就派上用场了。邮递员把来不及 带走的信暂存在邮筒中,等下次过来时再拿走。
  费了这么多口水,希望原先不太了解生产者/消费者模式的同学能够明白它是怎么一回事。然后在下一 个帖子中,我们来说说如何确定数据单元。

  另外,为了方便阅读,把本系列帖子的目录整理如下:
  1、如何确定数据单元
  2、队列缓冲区
  3、环形缓冲区
  4、双缓冲区
  5、......



架构设计:生产者/消费者模式[1]:如何确定数据单元?
from 编程随想的博客 by 编 程随想
  既然前一个帖子已经搞过扫盲了,那接下来应该开始聊一些具体的编程技术问题了。不过在进入具体的技术细节之前,咱 们先要搞明白一个问题:如何确定数据单元?只有把数据单元分析清楚,后面的技术设计才好搞。

  ★啥是数据单元
   何谓数据单元捏?简单地说,每次生产者放到缓冲区的,就是一个数据单元; 每次消费者从缓冲区取出的,也是一个数据单元。对于前一个帖子中寄信的例子,我们可以把每一封单独的信件看成是一个数据单元。
  不过光这么介绍,太过 于简单,无助于大伙儿分析出这玩意儿。所以,后面咱们来看一下数据单元需要具备哪些特性。搞明白这些特性之后,就容易从复杂的业务逻辑中分析出适合做数据 单元的东西了。

  ★数据单元的特性
  分析数据单元,需要考虑如下几个方面的特性:
   ◇关联到业务对象
  首先,数据单元必须关联到某种业务对象。在考虑该问题的时候,你必须深刻理解当前这个生产者/消费者模式所对应的业 务逻辑,才能够作出合适的判断。
  由于“寄信”这个业务逻辑比较简单,所以大伙儿很容易就可以判断出数据单元是啥。但现实生 活中,往往没这么乐观。大多数业务逻辑都比较复杂,当中包含的业务对象是层次繁多、类型各异。在这种情况下,就不易作出决策了。
  这一步很重 要,如果选错了业务对象,会导致后续程序设计和编码实现的复杂度大为上升,增加了开发和维护成本。
  ◇完整性
  所谓完整性,就是在传 输过程中,要保证该数据单元的完整。要么整个数据单元被传递到消费者,要么完全没有传递到消费者。不允许出现部 分传递的情形。
  对于寄信来说,你不能把半封信放入邮筒;同样的,邮递员从邮筒中拿信,也不能只拿出信的一部分。
   ◇独立性
  所谓独立性,就是各个数据单元之间没有互相依赖,某个数据单元传输失败不应该影响已经完成传输的单元;也不应该影响尚未传输的单 元。
  为啥会出现传输失败捏?假如生产者的生产速度在一段时间内一直超过消费者的处理速度,那就会导致缓冲区不断增长并达到上限,之后的数据单 元就会被丢弃。如果数据单元相互独立,等到生产者的速度降下来之后,后续的数据单元继续处理,不会受到牵连;反之,如果数据单元之间有某种耦合,导致被丢 弃的数据单元会影响到后续其它单元的处理,那就会使程序逻辑变得非常复杂。
  对于寄信来说,某封信弄丢了,不会影响后续信件的送达;当然更不会 影响已经送达的信件。
  ◇颗粒度
  前面提到,数据单元需要关联到某种业务对象。那么数据单元和业务对象是否要一一对应捏?很多场合确 实是一一对应的。
  不过,有时出于性能等因素的考虑,也可能会把N个业务对象打包成一个数据单元。那么,这个N该如何取值就是颗粒度的考虑了。 颗粒度的大小是有讲究的。太大的颗粒度可能会造成某种浪费;太小的颗粒度可能会造成性能问题。颗粒度的权衡要基于多方面的因素,以及一些经验值的考量。
   还是拿寄信的例子。如果颗粒度过小(比如设定为1),那邮递员每次只取出1封信。如果信件多了,那就得来回跑好多趟,浪费了时间。
  如果颗粒 度太大(比如设定为100),那寄信的人得等到凑满100封信才拿去放入邮筒。假如平时很少写信,就得等上很久,也不太爽。
  可能有同学会问: 生产者和消费者的颗粒度能否设置成不同大小(比如对于寄信人设置成1,对于邮递员设置成100)。当然,理论上可以这么干,但是在某些情况下会增加程序逻 辑和代码实现的复杂度。后面讨论具体技术细节时,或许会聊到这个问题。

  好,数据单元的话题就说到这。希望通过本帖子,大伙儿能够搞明 白数据单元到底是怎么一回事。下一个帖子,咱们来聊一下“基于队列的缓冲区”,技术上如何实现。




架构设计:生产者/消费者模式[2]:队列缓冲区
from 编程随想的博客 by 编 程随想
  经过前面两个帖子的铺垫,今天终于开始聊一些具体的编程技术了。由于不同的缓冲区类型、不同的并发场景对于具体的技术实现有较大的影响。为了深入浅 出、便于大伙儿理解,咱们先来介绍最传统、最常见的方式。也就是单个生产者 对应单个消费者,当中用队列(FIFO)作缓冲。
  关于并发的场景,在之前的帖子“进程还线程?是一个问题!”中,已经专门论述了进程和线程各自的优缺点,两者皆不可偏废。所以,后面对各种缓 冲区类型的介绍都会同时提及进程方式和线程方式。

  ★线程方式
  先来说一下并发线程中使 用队列的例子,以及相关的优缺点。
  ◇内存分配的性能
  在线程方式下,生产者和消费者各自是一个 线程。生产者把数据写入队列头(以下简称push),消费者从队列尾部读出数据(以下简称pop)。当队列为空,消费者就稍息(稍事休息);当队列满(达 到最大长度),生产者就稍息。整个流程并不复杂。
  那么,上述过程会有什么问题捏?一个主要的问题是关于内存分配的性能开销。对于常见的队列实 现:在每次push时,可能涉及到堆内存的分配;在每次pop时,可能涉及堆内存的 释放。假如生产者和消费者都很勤快,频繁地push、pop,那内存分配的开销就很可观了。对于内存分配的开销,用Java的同学可以参见前几天的帖子“Java性能优化[1]”;对于用C/C++的同学,想必对OS底层机制会更清楚,应该知道分配堆 内存(new或malloc)会有加锁的开销和用户态/核心态切换的开销。
  那该怎么办捏?请听下文分解,关于“生产者/消费者模式[3]:环形缓冲区”。
  ◇同步和互斥的性能
   另外,由于两个线程共用一个队列,自然就会涉及到线程间诸如同步啊、互斥啊、死锁啊等等劳心费神的事情。好在"操作系统"这门课程对此有详细介绍,学过 的同学应该还有点印象吧?对于没学过这门课的同学,也不必难过,网上相关的介绍挺多的(比如"这里"),大伙自己去瞅一瞅。关于这方面的细节,咱今天就不多啰嗦了。
  这会儿要细谈的是,同步和互 斥的性能开销。在很多场合中,诸如信号量、互斥量等玩意儿的使用也是有不小的开销的(某些情况下,也可能导致用户态/核心态切换)。如果像刚才所说,生产 者和消费者都很勤快,那这些开销也不容小觑啊。
  这又该咋办捏?请听下文的下文分解,关于“生产者/消费者模式[4]:双缓冲区”。
  ◇适用于队列的场合
   刚才尽批判了队列的缺点,难道队列方式就一无是处?非也。由于队列是很常见的数据结构,大部分编程语言都内置了队列的支持(具体介绍见"这 里"),有些语言甚至提供了线程安全的队列(比如JDK 1.5引入的ArrayBlockingQueue)。因此,开发人员可以捡现成,避免了重新发明轮子。
  所 以,假如你的数据流量不是很大,采用队列缓冲区的好处还是很明显的:逻辑清晰、代码简单、维护方便。比较符合KISS原则。

  ★进 程方式
  说完了线程的方式,再来介绍基于进程的并发。
  跨进程的生产者/消费者模式,非常依赖于具体的进程间通讯 (IPC)方式。而IPC的种类名目繁多,不便于挨个列举(毕竟口水有限)。因此咱们挑选几种跨平台、且编程语言支持较多的IPC方式来说事儿。
   ◇匿名管道
  感觉管道是最像队列的IPC类型。生产者进程在管道的写端放 入数据;消费者进程在管道的读端取出数据。整个的效果和线程中使用队列非常类似,区别在于使用管道就无需操心线程安全、内存分配等琐事(操作系统暗中都帮 你搞定了)。
  管道又分命名管道和匿名管道两种,今天主要聊匿名管道。因为命名管道在不同的操作系统下差异较大(比如Win32和POSIX,在 命名管道的API接口和功能实现上都有较大差异;有些平台不支持命名管道,比如Windows CE)。除了操作系统的问题,对于有些编程语言(比如Java)来说,命名管道是无法使用的。所以我一般不推荐使用这玩意儿。
  其实匿名管道在 不同平台上的API接口,也是有差异的(比如Win32的CreatePipe和POSIX的pipe,用法就很不一样)。但是我们可以仅使用标准输入和 标准输出(以下简称stdio)来进行数据的流入流出。然后利用shell的管道符把生产者进程和消费者进程关联起来(没听说过这种手法的同学,可以看"这里")。实际上,很多操作系统(尤其是POSIX风格的)自带的命令都充分利用了这个特性来实现数据的传输(比如more、grep等)。
  这么干有 几个好处:
  1、基本上所有操作系统都支持在shell方式下使用管道符。因此很容易实现跨平台。
  2、大部分编程语言都能够操作 stdio,因此跨编程语言也就容易实现。
  3、刚才已经提到,管道方式省却了线程安全方面的琐事。有利于降低开发、调试成本。
  当 然,这种方式也有自身的缺点:
  1、生产者进程和消费者进程必须得在同一台主机上,无法跨机器通讯。这个缺点比较明显。
  2、在一对 一的情况下,这种方式挺合用。但如果要扩展到一对多或者多对一,那就有点棘手了。所以这种方式的扩展性要打个折扣。假如今后要考虑类似的扩展,这个缺点就 比较明显。
  3、由于管道是shell创建的,对于两边的进程不可见(程序看到的只是stdio)。在某些情况下,导致程序不便于对管道进行操 纵(比如调整管道缓冲区尺寸)。这个缺点不太明显。
  4、最后,这种方式只能单向传数据。好在大多数情况下,消费者进程不需要传数据给生产者进程。万一你确实需要信息反馈(从消费者到生产者),那就费劲 了。可能得考虑换种IPC方式。
  顺便补充几个注意事项,大伙儿留意一下:
  1、对stdio进行读写操作是以阻塞方式进行。比如管 道中没有数据,消费者进程的读操作就会一直停在哪儿,直到管道中重新有数据。
  2、由于stdio内部带有自己的缓冲区(这缓冲区和管道缓冲区 是两码事),有时会导致一些不太爽的现象(比如生产者进程输出了数据,但消费者进程没有立即读到)。具体的细节,大伙 儿可以看"这里"。

  ◇SOCKET(TCP方式)
  基 于TCP方式的SOCKET通讯是又一个类似于队列的IPC方式。它同样保证了数据的顺序到达;同样有缓冲的机制。而且这玩意儿也是跨平台和跨语言的,和 刚才介绍的shell管道符方式类似。
  SOCKET相比shell管道符的方式,有啥优点捏?主要有如下几个优点:
  1、 SOCKET方式可以跨机器(便于实现分布式)。这是主要优点。
  2、SOCKET方式便于将来扩展成为多对一或者一对多。这也是主要优点。
   3、SOCKET可以设置阻塞和非阻塞方法,用起来比较灵活。这是次要优点。
  4、SOCKET支持双向通讯,有利于消费者反馈信息。
   当然有利就有弊。相对于上述shell管道的方式,使用SOCKET在编程上会更复杂一些。好在前人已经做了大量的工作,搞出很多SOCKET通讯库和 框架给大伙儿用(比如C++的ACE库、 Python的Twisted)。借助于这些第三方的库和框架,SOCKET方式用起来还是比较爽的。由于具体的网络通讯库该 怎么用不是本系列的重点,此处就不细说了。
  虽然TCP在很多方面比UDP可靠,但鉴于跨机器通讯先天的不可预料性(比如网线可能被某傻X给拔 错了,网络的忙闲波动可能很大),在程序设计上我们还是要多留一手。具体该如何做捏?可以在生产者进程和消费者进 程内部各自再引入基于线程的"生产者/消费者模式"。这话听着像绕口令,为了便于理解,画张图给大 伙儿瞅一瞅。

  这么做的关键点在于把代码分为两部分:生产线程和消费线程属于和业务逻辑相关的代码(和通讯逻辑无 关);发送线程和接收线程属于通讯相关的代码(和业务逻辑无关)。
  这样的好处是很明显的,具体如下:
  1、能够应对暂时性的网络故障。并且在网络故障解除后,能够继续工作。
   2、网络故障的应对处理方式(比如断开后的尝试重连),只影响发送和接收线程,不会影响生产线程和消费线程(业务逻辑部分)。
  3、具体的 SOCKET方式(阻塞和非阻塞)只影响发送和接收线程,不影响生产线程和 消费线程(业务逻辑部分)。
  4、不依赖TCP自 身的发送缓冲区和接收缓冲区。(默认的TCP缓冲区的大小可能无法满足实际要求)
  5、业务逻辑的变化(比如业务需求变更)不影 响发送线程和接收线程。
  针对上述的最后一条,再多啰嗦几句。如果整个业务系统中有多个进程是采用上述的模式,那或许可以重构一把:在业务逻辑 代码和通讯逻辑代码之间切一刀,把业务逻辑无关的部分封装成一个通讯中间件(说中间件显得比较牛X :-)。如果大伙儿对这玩意儿有兴趣,以后专门开个帖子聊。
  下一个帖子,咱们来介绍一下环形缓冲区的话题。
简单的队列缓存区实现:
public class Queue <E>{
    private List<E> queue = new ArrayList<E>();
    private int max;
   
    public Queue() {
        super();
        max=100;
    }

    public Queue(int max) {
        super();
        this.max = max;
    }

    public synchronized void produce(E e) {
        while(true){
            if(queue.size()>=max){
                try {
                    wait();
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
            }else{
                break;
            }
        }
        queue.add(e);
        notifyAll();
    }
   
    public synchronized E consume() {
        E result = null;
        while(true){
            if(queue.size()<=0){
                try {
                    wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }else{
                result = queue.remove(0);
                notifyAll();
                break;
            }
        }
        return  result;
    }
}



架构设计:生产者/消费者模式[3]:环形缓冲区
from 编程随想的博客 by 编 程随想
  前一个帖子提及了队列缓冲区可能存在的性能问题及解决方法:环形缓冲区。今天就专门来描述一下这个话题。
   为了防止有人给咱扣上“过度设计”的大帽子,事先声明一下:只有当存储空间的分配/释放非常频繁并且确实产生了明显的影 响,你才应该考虑环形缓冲区的使用。否则的话,还是老老实实用最基本、最简单的队列缓冲区吧。还有一点需要说明一下:本文所提及的“存储空间”,不仅包 括内存,还可能包括诸如硬盘之类的存储介质。

  ★环形缓冲区 vs 队列缓冲区
  ◇外部 接口相似
  在介绍环形缓冲区之前,咱们先来回顾一下普通的队列。普通的队列有一个写入端和一个读出端。队列为空的时候,读出端无法读取数据;当 队列满(达到最大尺寸)时,写入端无法写入数据。
  对于使用者来讲,环形缓冲区和队列缓冲区是一样的。它也有一个写入端(用于push)和一个 读出端(用于pop),也有缓冲区“满”和“空”的状态。所以,从队列缓冲区切换到环形缓冲区,对于使用者来说能比较平滑地过渡。
  ◇内部结构 迥异
  虽然两者的对外接口差不多,但是内部结构和运作机制有很大差别。队列的内部结构此处就不多啰嗦了。重点介绍一下环形缓冲区的内部结构。
   大伙儿可以把环形缓冲区的读出端(以下简称R)和写入端(以下简称W)想象成是两个人在体育场跑道上追逐(R追W)。当R追上W的时候,就是缓冲区为 空;当W追上R的时候(W比R多跑一圈),就是缓冲区满。
  为了形象起见,去找来一张图并略作修改,如下:

  从上图可以看出,环形缓冲区所有的push和pop操作都是在一个固定的存储空间内进 行。而队列缓冲区在push的时候,可能会分配存储空间用于存储新元素;在pop时,可能会释放废弃元素的存储空间。所以环形方式相比队列方式,少掉了对 于缓冲区元素所用存储空间的分配、释放。这是环形缓冲区的一个主要优势。

  ★环形缓冲区的实现
   如果你手头已经有现成的环形缓冲区可供使用,并且你对环形缓冲区的内部实现不感兴趣,可以跳过这段。
  ◇数组方式 vs 链表方式
   环形缓冲区的内部实现,即可基于数组(此处的数组,泛指连续存储空间)实现,也可基于链表实现。
  数组在物理存储上是一维的连续线性结构,可 以在初始化时,把存储空间一次性分配好,这是数组方式的优点。但是要使用数组来模拟环,你必须在逻辑上把数组的头和尾 相连。在顺序遍历数组时,对尾部元素(最后一个元素)要作一下特殊处理。访问尾部元素的下一个元素时,要重新回到头部元素(第0个元素)。如下图所示:

  使用链表的方式,正好和数组相反:链表省去了头尾相连的特殊处理。但是链表在初始化的时候比较繁琐,而且在有些场合(比如后 面提到的跨进程的IPC)不太方便使用。
  ◇读写操作
  环形缓冲区要维护两个索引,分别对应写入端(W)和读取端(R)。写入 (push)的时候,先确保环没满,然后把数据复制到W所对应的元素,最后W指向下一个元素;读取(pop)的时候,先确保环没空,然后返回R对应的元 素,最后R指向下一个元素。
  ◇判断“空”和“满”
  上述的操作并不复杂,不过有一个小小的麻烦:空环和满环的时候,R和W都指向同 一个位置!这样就无法判断到底是“空”还是“满”。大体上有两种方法可以解决该问题。
  办法1:始终保持一个元素不用
  当空环的时 候,R和W重叠。当W比R跑得快,追到距离R还有一个元素间隔的时候,就认为环已经满。当环内元素占用的存储空间较大的时候,这种办法显得很土(浪费空 间)。
  办法2:维护额外变量
  如果不喜欢上述办法,还可以采用额外的变量来解决。比如可以用一个整数记录当前环中已经保存的元素个 数(该整数>=0)。当R和W重叠的时候,通过该变量就可以知道是“空”还是“满”。
  ◇元素的存储
  由于环形缓冲区本身就是 要降低存储空间分配的开销,因此缓冲区中元素的类型要选好。尽量存储值类型的数据,而不要存储指针 (引用)类型的数据。因为指针类型的数据又会引起存储空间(比如堆内存)的分配和释放,使得环形缓冲区的效果打折扣。

   ★应用场合
  刚才介绍了环形缓冲区内部的实现机制。按照前一个帖子的惯例,我们来介绍一下在线程和进程方式下的使用。
  如果你所使用的编程语言和开发库中 带有现成的、成熟的环形缓冲区,强烈建议使用现成的库,不要重新制造轮子;确实找不到现成的,才考虑自己实现。如果你 纯粹是业余时间练练手,那另当别论。
  ◇用于并发线程
  和线程中的队列缓冲区类似,线程中的环形缓冲区也要考虑线程安全的问题。除非 你使用的环形缓冲区的库已经帮你实现了线程安全,否则你还是得自己动手搞定。线程方式下的环形缓冲区用得比较多,相关的网上资料也多,下面就大致介绍几 个。
  对于C++的程序员,强烈推荐使用boost提供的circular_buffer模 板,该模板最开始是在boost 1.35版本中引入的。鉴于boost在C++社区中的地位,大伙儿应该可以放心使用该模板。
  对于C程序 员,可以去看看开源项目circbuf, 不过该项目是GPL协议的,不太爽;而且活跃度不太高;而且只有一个开发人员。大伙儿慎用!建议只拿它当参考。
  对于C#程序员,可以参考CodeProject上的一个示例。
  ◇用于并发进程
  进程间的环形缓冲区,似乎少有现 成的库可用。大伙儿只好自己动手、丰衣足食了。
  适用于进程间环形缓冲的IPC类型,常见的有共享内存和文件。在这两种方式上进行环形缓冲,通常都采用数组的方式实现。程序事先分配好一个固定长度的存储空 间,然后具体的读写操作、判断“空”和“满”、元素存储等细节就可参照前面所说的来进行。
  共享内存方式的性能很好,适用于数据流量很大的场 景。但是有些语言(比如Java)对于共享内存不支持。因此,该方式在多语言协同开发的系统中,会有一定的局限性。
  而文件方式在编程语言方面 支持很好,几乎所有编程语言都支持操作文件。但它可能会受限于磁盘读写(Disk I/O)的性能。所以文件方式不太适合于快速数据传输;但是对于某些“数据单元”很大的场合,文件方式是值得考虑的。
  对于进程间的环形缓冲区,同样要考虑好进程间的同 步、互斥等问题,限于篇幅,此处就不细说了。

  下一个帖子,咱们来聊一下双缓冲区的使用。
环形缓冲区实现:
public class CircularBuf {
    int NMAX = 1000;

    int iput = 0; // 环形缓冲区的当前放人位置

    int iget = 0; // 缓冲区的当前取出位置

    int n = 0; // 环形缓冲区中的元素总数量

    Object buffer[];
   
    public CircularBuf() {
        super();
        buffer = new Object[NMAX];
    }

    public CircularBuf(int nmax) {
        super();
        NMAX = nmax;
        buffer = new Object[NMAX];
    }

    /*
     * 环形缓冲区的地址编号计算函数,,如果到达唤醒缓冲区的尾部,将绕回到头部。
     *
     * 环形缓冲区的有效地址编号为:0到(NMAX-1)
     *
     */
    public int addring(int i) {
        return (i + 1) == NMAX ? 0 : i + 1;
    }

    /* 从环形缓冲区中取一个元素 */
    public synchronized Object get() {
        int pos;
        while (true) {
            if (n > 0) {
                pos = iget;
                iget = addring(iget);
                n--;
                // System.out.println("get-->" + buffer[pos]);
                notifyAll();
                return buffer[pos];

            } else {
                // System.out.println("Buffer is Empty");
                try {
                    wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }

            }
        }

    }

    /* 向环形缓冲区中放人一个元素 */
    public synchronized void put(Object z) {

        while (true) {
            if (n < NMAX) {
                buffer[iput] = z;
                // System.out.println("put<--" + buffer[iput]);
                iput = addring(iput);
                n++;
                notifyAll();
            } else {
                // System.out.println("Buffer is full");
                try {
                    wait();
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }

    }
}



架构设计:生产者/消费者模式[4]:双缓冲区
from 编程随想的博客 by 编 程随想
  “双缓冲区”是一个应用很广的手法。该手法用得最多的地方想必是屏幕绘 制相关的领域(主要是为了减少屏幕闪烁)。另外,在设备驱动和工控方面,双缓冲也经常被使用。不过今天要聊的,并不是针对上述的某个具体领 域,而是侧重于并发方面的同步/互斥开销。

  ★为啥要双缓冲区
  记得前几天在介绍队列缓冲区时,提及了普通队列缓冲区的两个性能问题:“内存分配的开销”和“同步/互斥的开销”(健忘的 同学,先回去看看那个帖子复习一下)。“内存分配的开销”已经在介绍环形缓冲区的时候解决了,而今天要介绍的双缓冲区,就是冲着同步/互斥的开销来的。
  为了防止 有人给咱扣上“过度设计”的大帽子,又得来一个事先声明:只有当同步或互斥 的开销非常明显的时候,你才应该考虑双缓冲区的使用。否则的话,大伙儿还是老老实实用最基本、最简单的队列缓冲区吧。

   ★双缓冲区的原理
  前面说了一通废话,现在开始切入正题,说说具体实现。
  所谓“双缓冲区”,故 名思义就是要有俩缓冲区(简称A和B)。这俩缓冲区,总是一个用于生产者, 另一个用于消费者。当俩缓冲区都操作完,再进行一次切换(先前被生产者写入的转为消费者读出,先前消费者读取的转为生产者写入)。由于生产 者和消费者不会同时操作同一个缓冲区(不发生冲突),所以就不需要在读写每 一个数据单元的时候都进行同步/互斥操作。顺便提一下,这又一次展现了空间换时间的 优化思路。
  但是光有俩缓冲区还不够。为了做到“不冲突”,还得再搞两个互斥锁(简称La和Lb),分别对应俩缓冲区。生产者或消费者如果要操 作某个缓冲区,必须先拥有对应的互斥锁。补充一句:要达到“不冲突”的效果,其实可以有多种搞法,今天只是挑一个简单的来聊。

  ★双 缓冲区的几种状态
  为了加深某些同学的理解,再描述一下双缓冲区的几种状态。
  ◇俩缓冲区都在使用的状态(并发读 写)
  大多数情况下,生产者和消费者都处于并发读写状态。不妨设生产者写入A,消费者读取B。在这种状态下,生产者拥有锁La;同样的,消费者 拥有锁Lb。由于俩缓冲区都是处于独占状态,因此每次读写缓冲区中的元素(数据单元)都不需要再进行加锁、解锁操作。这是节约开销的主要来源。
   ◇单个缓冲区空闲的状态
  由于两个并发实体的速度会有差异,必然会出现一个缓冲区已经操作完,而另一个尚未操作完。不妨假设生产者快于消费 者。
  在这种情况下,当生产者把A写满的时候,生产者要先释放La(表示它已经不再操作A),然后尝试获取Lb。由于B还没有被读空,Lb还被 消费者持有,所以生产者进入发呆(Suspend)状态。
  ◇缓冲区的切换
  接着上面的话题。
  过了若干时间,消费者终于 把B读完。这时候,消费者也要先释放Lb,然后尝试获取La。由于La刚才已经被生产者释放,所以消费者能立即拥有La并开始读取A的数据。而由于Lb被 消费者释放,所以刚才发呆的生产者会缓过神来(Resume)并拥有Lb,然后生产者继续往B写入数据。
  经过上述几个步骤,俩缓冲区完成了对 调,变为:生产者写入B,消费者读取A。

  ★可能的并发问题
  本来单个缓冲区的生产者/ 消费者问题就已经是教科书的经典问题了,现在搞出俩缓冲区,所以就更加耗费脑细胞了。一不小心,就会搞出些并发的Bug,而且并发的Bug还很难调试和测 试(这也就是为啥不要轻易使用该玩意儿的原因)。
  ◇死锁的问题
  假如把前面介绍的操作步骤调换一下顺序:生产者或消费者在操作完当 前的缓冲区之后,先去获取另一个缓冲区的锁,再来释放当前缓冲区的锁。那会咋样捏?
  一旦两个并发实体同时处 理完各自缓冲区,然后同时去获取对方拥有的锁,那就会出现典型的死锁(死锁的详细解释参见“这里”)场景。它俩从此陷入万劫不复的境地。

  ★应用场景
   介绍完并发问题,按照本系列的惯例,最后再来介绍一下双缓冲区在某些场合的应用。
  ◇用于并发线程
  在线程方 式下,首先要考虑的是缓冲区的类型:到底用队列方式还是环形方式。这方面的选择依据在介绍环形缓冲区的时候已经阐述过了,此处不再啰嗦(省去不少口水)。
  另一个需要注意的是,某些编 程语言或者程序库提供了的线程安全的缓冲区(比如JDK 1.5引入的ArrayBlockingQueue)。由于这种缓冲区会自动为每次的读写进行同步/互斥,所以就把双缓冲 的优势抵消掉了。因此,大伙儿在进行缓冲区选型的时候要避开这类缓冲区。
  ◇用于并发进程
  在进程间使用双缓冲,先得考察不同IPC 类型的特点。由于今天讨论双缓冲的目的是降低同步/互斥的开销,对于那些已经封装了同步/互斥的IPC类型,就没太大必要再去搞双缓冲了(单凭这条就已经 排除了好多种IPC类型)。剩下的IPC类型中,比较适合用双缓冲的主要是:共享内存和文件。非常凑巧,这两个玩意儿的特点和适用范围在环形缓冲区的帖子里面也已经介绍过了,俺又可以节省不少口水

双缓冲区实现:
public class CircularDoubleBufferedQueue<E> {
    private static final long serialVersionUID = 1L;
    private Logger logger = Logger.getLogger(CircularDoubleBufferedQueue.class.getName());

    /** The queued items  */
    private final E[] itemsA;
    private final E[] itemsB;
   
    private ReentrantLock readLock, writeLock;
    private Condition notEmpty;
    private Condition notFull;
    private Condition awake;
   
   
    private E[] writeArray, readArray;
    private volatile int writeCount, readCount;
    private int writeArrayHP, writeArrayTP, readArrayHP, readArrayTP;
   
   
    public CircularDoubleBufferedQueue(int capacity)
    {
        if(capacity<=0)
        {
            throw new IllegalArgumentException("Queue initial capacity can't less than 0!");
        }
       
        itemsA = (E[])new Object[capacity];
        itemsB = (E[])new Object[capacity];

        readLock = new ReentrantLock();
        writeLock = new ReentrantLock();
       
        notEmpty = readLock.newCondition();
        notFull = writeLock.newCondition();
        awake = writeLock.newCondition();
       
        readArray = itemsA;
        writeArray = itemsB;
    }
   
    private void insert(E e)
    {
        writeArray[writeArrayTP] = e;
        ++writeArrayTP;
        ++writeCount;
    }
   
    private E extract()
    {
        E e = readArray[readArrayHP];
        readArray[readArrayHP] = null;
        ++readArrayHP;
        --readCount;
        return e;
    }

   
    /**
     *switch condition:
     *read queue is empty && write queue is not empty
     *
     *Notice:This function can only be invoked after readLock is grabbed,
     * or may cause dead lock
     * @param timeout
     * @param isInfinite: whether need to wait forever until some other
     * thread awake it
     * @return
     * @throws InterruptedException
     */
    private long queueSwitch(long timeout, boolean isInfinite) throws InterruptedException
    {
        writeLock.lock();
        try
        {
            if (writeCount <= 0)
            {
                logger.debug("Write Count:" + writeCount + ", Write Queue is empty, do not switch!");
                try
                {
                    logger.debug("Queue is empty, need wait....");
                    if(isInfinite && timeout<=0)
                    {
                        awake.await();
                        return -1;
                    }
                    else
                    {
                        return awake.awaitNanos(timeout);
                    }
                }
                catch (InterruptedException ie)
                {
                    awake.signal();
                    throw ie;
                }
            }
            else
            {
                E[] tmpArray = readArray;
                readArray = writeArray;
                writeArray = tmpArray;

                readCount = writeCount;
                readArrayHP = 0;
                readArrayTP = writeArrayTP;

                writeCount = 0;
                writeArrayHP = readArrayHP;
                writeArrayTP = 0;
               
                notFull.signal();
                logger.debug("Queue switch successfully!");
                return -1;
            }
        }
        finally
        {
            writeLock.unlock();
        }
    }


    public boolean produce(E e, long timeout, TimeUnit unit) throws InterruptedException
    {
        if(e == null)
        {
            throw new NullPointerException();
        }
       
        long nanoTime = unit.toNanos(timeout);
        writeLock.lockInterruptibly();
        try
        {
            for (;;)
            {
                if(writeCount < writeArray.length)
                {
                    insert(e);
                    if (writeCount == 1)
                    {
                        awake.signal();
                    }
                    return true;
                }
               
                //Time out
                if(nanoTime<=0)
                {
                    logger.debug("offer wait time out!");
                    return false;
                }
                //keep waiting
                try
                {
                    logger.debug("Queue is full, need wait....");
                    nanoTime = notFull.awaitNanos(nanoTime);
                }
                catch(InterruptedException ie)
                {
                    notFull.signal();
                    throw ie;
                }
            }
        }
        finally
        {
            writeLock.unlock();
        }
    }

    public E consume(long timeout, TimeUnit unit) throws InterruptedException
    {
        long nanoTime = unit.toNanos(timeout);
        readLock.lockInterruptibly();
       
        try
        {
            for(;;)
            {
                if(readCount>0)
                {
                    return extract();
                }
               
                if(nanoTime<=0)
                {
                    logger.debug("poll time out!");
                    return null;
                }
                nanoTime = queueSwitch(nanoTime, false);
            }
        }
        finally
        {
            readLock.unlock();
        }
    }

    /**
     * blocking interface
     */
    public void put(E e) throws InterruptedException
    {
        if (e == null)
        {
            throw new NullPointerException();
        }
        writeLock.lockInterruptibly();
        try
        {
            try
            {
                while(writeCount >= writeArray.length)
                {
                    notFull.await();
                }
            }
            catch (InterruptedException ie)
            {
                notFull.signal();
                throw ie;
            }
           
            insert(e);
           
            if (writeCount == 1)
            {
                awake.signal();
            }
        }
        finally
        {
            writeLock.unlock();
        }
    }

   
    /**
     * blocking interface
     */
    public E take() throws InterruptedException
    {
        readLock.lockInterruptibly();
        try
        {
            for(;;)
            {
                if(readCount>0)
                {
                    return extract();
                }
               
                queueSwitch(-1, true);
            }
        }
        finally
        {
            readLock.unlock();
        }
    }
}


转自:http://shilei78.blog.163.com/blog/static/76004198201052863720488/
分享到:
评论

相关推荐

    中国知识付费行业发展白皮书2017.pdf

    不同的产品形态对应着不同的付费模式,如订阅合辑付费模式,用户订阅知名知识生产者的系列知识产品,这种模式下用户期望值较高,适合有一定知名度的内容创作者。单次付费模式则针对特定问题,用户只需为一次性的解答...

    2017中国知识付费行业发展白皮书.pdf

    其中,订阅合辑付费模式通常适用于知名的知识生产者,用户预先支付较高的费用获取一系列知识产品;单次付费模式则适用于特定问题的解答,用户按需付费;打赏模式允许用户自主决定是否以及支付多少费用;授权转载付费...

    观光果园的设计与管理(转载)作文.doc

    这包括采用有机栽培和无公害栽培技术,减少化肥和农药的使用,生产出符合消费者健康需求的无公害果品。果树的修剪不仅是为了果实的产量和质量,更应注重形成独特的观赏树形,同时配合现代化灌溉设施,如喷灌、滴灌,...

    中国知识付费行业发展白皮书2017-V3-已美化1

    - 订阅合辑付费模式要求知识生产者具有较高知名度,适合深度内容消费。 - 单次付费模式则满足用户对特定问题的即时需求,而打赏模式体现了用户对内容价值的认可。 综上所述,中国知识付费行业在技术进步、消费...

    Java 最常见 200+ 面试题全解析:面试必备.pdf

    15. Kafka:作为高性能的分布式消息系统,Kafka用于构建实时的数据管道和流应用程序,了解其生产者和消费者模型。 16. Zookeeper:分布式协调服务,主要管理分布式环境中的数据,如配置信息、命名服务、分布式锁等...

    hdmf-2020中国数字营销行动报告-2020.3-64页.pdf

    - 新兴的消费者态度和行为模式,如对国产商品的自信和对线上娱乐、协作工作、社交等线上活动的接受度,这些都对数字营销活动产生重大影响。 - 疫情等社会事件改变了消费者的消费习惯和心态,对数字营销有深远影响...

    中国知识付费行业发展白皮书2017-V3-已美化.pdf

    不同产品形态和付费模式的出现,满足了不同用户的需求,并促进了内容生产者与平台之间的价值分配。 4. 从行业发展的阶段来看,中国知识付费市场经历了探索期、市场启动期和高速发展期,并逐步进入应用成熟期。在这...

    《给 Android 开发者的 RxJava 详解》 PDF

    3. **背压策略**:当数据生产速度超过消费速度时,背压策略就显得尤为重要。RxJava 支持多种背压策略,如_BUFFER、DROP、LATEST 和 ERROR,以防止数据丢失或内存溢出。 4. **组合与拆分流**:concat、merge 和 zip ...

    2021年上半年社交电商行业分析报告

    这类内容与商品直接相关,能够有效地提升消费者的购买意愿。例如,报告中提及的品牌如usmile、添可和韩束等,都是通过内容营销和KOL的推广在社交电商领域取得了显著成绩。 报告还指出,内容型社交电商的发展具有其...

    保险数据分析行业研究及十四五规划分析报告(2020-2026).doc

    上游供应商、生产商、经销商、进出口商以及消费者都在采访的范围之内。采访中 获得的数据将会被仔细地检查甄别,并与之前的二次研究进行比对以求真实有效。 阶段三:已收集数据分析 研究团队检查、综合、整理之前...

    顺舟WIF 模块用户使用手册

    5. **天线选项**:用户可以选择内置天线或外置天线,后者通过U.FL连接器连接。 6. **接口类型**:采用UART接口,支持1200至115200bps的波特率范围,便于与其他串口设备集成。 7. **电源供应**:支持3.3V和5V两种供电...

    电子书刊市场竞争力与投资前景预测报告.docx

    - 用户:最终消费者。 2. **合作模式分析:** - 联合出版:内容提供商与平台运营商共同策划出版项目。 - 内容分销:内容提供商通过平台运营商将内容分发至多个渠道。 - 广告合作:平台运营商引入广告商,通过...

Global site tag (gtag.js) - Google Analytics