`
canofy
  • 浏览: 831374 次
  • 性别: Icon_minigender_1
  • 来自: 北京、四川
社区版块
存档分类
最新评论

生产者/消费者模式(阻塞队列)

    博客分类:
  • j2EE
阅读更多
生产消费者模式

貌似也是阻塞的问题

花了一些时间终于弄明白这个鸟东东,以前还以为是不复杂的一个东西的,以前一直以为和观察者模式差不多(其实也是差不多的,呵呵),生产消费者模式应该是可以通过观察者模式来实现的,对于在什么环境下使用现在想的还不是特别清楚,主要是在实际中还没使用过这个。

需要使用到同步,以及线程,属于多并发行列,和观察者模式的差异也就在于此吧,所以实现起来也主要在这里的差异。

参考地址:http://blog.csdn.net/program_think/


在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。
 
单单抽象出生产者和消费者,还够不上是生产者/消费者模式。该模式还需要有一个缓冲区处于生产者和消费者之间,作为一个中介。生产者把数据放入缓冲区,而消费者从缓冲区取出数据


◇解耦
  假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。
  
◇支持并发(concurrency)
  生产者直接调用消费者的某个方法,还有另一个弊端。由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。
  使用了生产者/消费者模式之后,生产者和消费者可以是两个独立的并发主体(常见并发类型有进程和线程两种,后面的帖子会讲两种并发类型下的应用)。生产者把制造出来的数据往缓冲区一丢,就可以再去生产下一个数据。基本上不用依赖消费者的处理速度。其实当初这个模式,主要就是用来处理并发问题的。
  
◇支持忙闲不均
  缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。
  

用了两种方式实现了一下这个模式,主要参考了网上的一些例子才弄明白,这里对队列的实现有很多种方法,需要和具体的应用相结合吧,队列缓冲区很简单,现在已有大量的实现,缺点是在性能上面(内存分配的开销和同步/互斥的开销),下面的实现都是这种方式;环形缓冲区(减少了内存分配的开销),双缓冲区(减少了同步/互斥的开销)。
第一个例子是使用的信号量的东东,没有执行具体的东西,只是实现了这个例子,要做复杂的业务逻辑的话需要自己在某些方法内去具体实现
代码如下:

消费者:
public class TestConsumer implements Runnable {
	
	TestQueue obj;
	
	public TestConsumer(TestQueue tq){
		this.obj=tq;
	}

	public void run() {				
		try {
			for(int i=0;i<10;i++){
				obj.consumer();
			}			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

生产者:
public class TestProduct implements Runnable {
	
	TestQueue obj;
	
	public TestProduct(TestQueue tq){
		this.obj=tq;
	}
	
	public void run() {
		for(int i=0;i<10;i++){
			try {
				obj.product("test"+i);
			} catch (Exception e) {				
				e.printStackTrace();
			}
		}
	}

}


队列(使用了信号量,采用synchronized进行同步,采用lock进行同步会出错,或许是还不知道实现的方法):
public static Object signal=new Object();
	boolean bFull=false; 
	private List thingsList=new ArrayList(); 
	private final ReentrantLock lock = new ReentrantLock(true);
	BlockingQueue q = new ArrayBlockingQueue(10);
	/**
	 * 生产
	 * @param thing
	 * @throws Exception
	 */
	public void product(String thing) throws Exception{	
		synchronized(signal){
			if(!bFull){
				bFull=true;
				  //产生一些东西,放到 thingsList 共享资源中
				System.out.println("product");
				System.out.println("仓库已满,正等待消费..."); 
				thingsList.add(thing); 
			    signal.notify(); //然后通知消费者
			}	       
		    signal.wait(); // 然后自己进入signal待召队列
			
		}
		
	}
	
	/**
	 * 消费
	 * @return
	 * @throws Exception
	 */
	public String consumer()throws Exception{			
		synchronized(signal){
			if(!bFull)  {  
					 signal.wait(); // 进入signal待召队列,等待生产者的通知
			}
			bFull=false; 
			// 读取buf 共享资源里面的东西
			System.out.println("consume");
			System.out.println("仓库已空,正等待生产..."); 
			signal.notify(); // 然后通知生产者
		}
		String result="";
		if(thingsList.size()>0){
			result=thingsList.get(thingsList.size()-1).toString();
			thingsList.remove(thingsList.size()-1);
		}
		return result;
	}

测试代码:
public class TestMain {
	public static void main(String[] args) throws Exception{
		TestQueue tq=new TestQueue();
		TestProduct tp=new TestProduct(tq);
		TestConsumer tc=new TestConsumer(tq);
		Thread t1=new Thread(tp);
		Thread t2=new Thread(tc);
		t1.start();
		t2.start();
	}
}


运行结果:
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...
product
仓库已满,正等待消费...
consume
仓库已空,正等待生产...

第二种发放使用java.util.concurrent.BlockingQueue类来重写的队列那个类,使用这个方法比较简单,并且性能上也没有什么问题。
这是jdk里面的例子
* class Producer implements Runnable {
 *   private final BlockingQueue queue;
 *   Producer(BlockingQueue q) { queue = q; }
 *   public void run() {
 *     try {
 *       while(true) { queue.put(produce()); }
 *     } catch (InterruptedException ex) { ... handle ...}
 *   }
 *   Object produce() { ... }
 * }
 *
 * class Consumer implements Runnable {
 *   private final BlockingQueue queue;
 *   Consumer(BlockingQueue q) { queue = q; }
 *   public void run() {
 *     try {
 *       while(true) { consume(queue.take()); }
 *     } catch (InterruptedException ex) { ... handle ...}
 *   }
 *   void consume(Object x) { ... }
 * }
 *
 * class Setup {
 *   void main() {
 *     BlockingQueue q = new SomeQueueImplementation();
 *     Producer p = new Producer(q);
 *     Consumer c1 = new Consumer(q);
 *     Consumer c2 = new Consumer(q);
 *     new Thread(p).start();
 *     new Thread(c1).start();
 *     new Thread(c2).start();
 *   }
 * }


jdk1.5以上的一个实现,使用了Lock以及条件变量等东西
 class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
 }
分享到:
评论
1 楼 only_xxp 2010-09-05  
学习了!!!

相关推荐

    生产者/消费者模式 阻塞队列 LinkedBlockingQueue

    在Java中,阻塞队列(BlockingQueue)是一个很好的实现生产者/消费者模式的工具,而LinkedBlockingQueue则是Java并发包(java.util.concurrent)中提供的一个具体实现。 LinkedBlockingQueue是一个基于链表结构的...

    架构设计 -- 生产者/消费者模式

    【生产者/消费者模式】是一种常见的并发编程和系统设计模式,它主要解决的是在多线程环境下,如何协调生产者和消费者之间的数据处理问题。在软件开发中,生产者通常是生成数据的一方,而消费者则是处理这些数据的...

    由生产者/消费者问题看JAVA多线程

    ### 由生产者/消费者问题深入理解JAVA多线程 #### 生产者/消费者问题概述 生产者/消费者问题是一个经典的计算机科学问题,用于展示进程间的通信与同步问题。在这个模型中,生产者负责创建数据并将其放置到共享内存...

    生产者和消费者模式多线程

    阻塞队列在生产者和消费者模式中起着关键的作用。Java中的`BlockingQueue`接口提供了几个实现类,如`ArrayBlockingQueue`、`LinkedBlockingQueue`和`PriorityBlockingQueue`等,它们都实现了线程安全的队列操作。...

    《生产者/消费者循环》后续资源包.zip

    同时,也会涉及简单的阻塞队列(如ArrayBlockingQueue)的使用,它是实现生产者/消费者模式的一个常见工具。 中级教程则可能进一步深入到更复杂的同步策略,如信号量(Semaphore)、条件变量(Condition)以及各种...

    生产者 消费者 模式 c++

    生产者消费者模式是一种多线程或并发编程中的经典设计模式,它主要用于解决系统资源的高效利用和同步问题。在C++中实现生产者消费者模式,我们可以利用C++11及更高版本提供的线程库()、互斥量()、条件变量()等...

    生产消费者队列(c#),用于线程的队列自动同步

    在多线程编程中,生产者消费者模型是一种常见的设计模式,用于解决线程间的通信和同步问题。在C#中,我们可以利用各种机制实现这样的队列。本篇将详细讲解如何在C#中构建一个生产消费者队列,以及它如何帮助优化线程...

    阻塞队列实现生产者消费者模式Java开发Java经验技巧共

    在"阻塞队列实现生产者消费者模式Java开发Java经验技巧共4页.pdf.zip"这个压缩包中,很可能是详细介绍了如何使用Java的阻塞队列来构建生产者消费者模式,可能包括以下知识点: 1. **阻塞队列接口**:首先,会介绍`...

    java生产者消费者图形界面模拟

    生产者消费者模式是软件设计中的一种设计模式,它解决了一个或多个人或程序(生产者)如何有效地向另一个或多个(消费者)提供数据的问题,同时确保生产速度不会超过消费速度,避免了资源的浪费。 在这个项目中,...

    Java多线程 生产者-消费者模式

    在这个模式中,生产者负责生成数据并放入共享的数据结构(如队列),而消费者则从这个数据结构中取出数据并进行处理。这个模式有效地实现了线程间的协作,避免了资源竞争和死锁。 在Java中,我们可以使用`java.util...

    android 生产者消费者模式

    在Android开发中,生产者-消费者模式是一种常见的多线程设计模式,用于处理并发问题,尤其是在数据处理和异步操作中。这个模式的核心思想是通过一个共享的数据缓冲区,使得生产者线程可以生成数据并放入缓冲区,而...

    多进程同步-生产者消费者模式-C实现

    在这个场景下,我们关注的是一个经典的并发编程模型——生产者消费者模式。该模式是多进程同步的一种典型应用,通过它我们可以高效地管理数据的生产和消费。 生产者消费者模式基于操作系统提供的信号量(Semaphore...

    Qt C++11 生产者消费者模式类

    生产者消费者模式是一种多线程同步的经典设计模式,它在多线程编程中扮演着重要的角色,用于协调生产数据和消费数据的两个并发过程。在本项目“Qt C++11 生产者消费者模式类”中,我们看到利用Qt框架和C++11的新特性...

    模拟操作系统的进程同步与互斥(生产者—消费者问题)

    这个模拟操作系统的进程同步与互斥问题是根据生产者-消费者问题来实现的。该问题是一个经典的进程同步问题,由 Dijkstra 提出,用以演示他提出的信号量机制。生产者线程生产物品,然后将物品放置在一个空缓冲区中供...

    多线程_生产者与消费者模式示例

    生产者与消费者模式是设计模式中的经典范例,它有效地展示了线程间的协作和同步。这个模式主要解决的问题是数据的生产和消费过程中的等待与协作问题。 在多线程环境下,生产者负责生成数据,而消费者则负责处理这些...

    生产者消费者模式在java中的应用

    1. `BlockingQueue`接口:这是生产者消费者模式的核心,它提供了一种线程安全的队列,支持阻塞的插入(put)和移除(take)操作。当队列满时,生产者线程尝试插入元素会被阻塞,直到有消费者消费;当队列空时,消费...

    单一生产者消费者队列,支持阻塞和不阻塞,支持固定大小和扩容

    在给定的“单一生产者消费者队列”实现中,它提供了阻塞和非阻塞两种模式,以及固定大小和动态扩容的能力,这使得它在实际应用中具有较高的灵活性。 首先,我们来深入理解队列这一数据结构。队列是一种先进先出...

    界面话模拟生产者消费者模式java

    - **阻塞队列(BlockingQueue)**:Java中的`BlockingQueue`接口用于实现生产者消费者模式的关键组件。它提供了一种线程安全的数据交换方式,当队列满时,生产者会被阻塞,直到消费者取走数据;反之,当队列空时,...

Global site tag (gtag.js) - Google Analytics