论坛首页 Java企业应用论坛

java并发线程-----阻塞队列的实现和思考

浏览 17444 次
精华帖 (3) :: 良好帖 (0) :: 新手帖 (2) :: 隐藏帖 (6)
作者 正文
   发表时间:2011-07-04   最后修改:2011-07-04
     根据think in java 的描述,如果没有任务会阻塞,那么单处理器机器上使用并发就没有任何意义。这句话我
觉得可以反着理解,那就是假如你对你的程序不考虑用并发来实现,那么你无须考虑任务的阻塞问题。
     阻塞队列多半是解决对同一个(共享)资源进行操作的时候互相协作的问题。比如说有这样一个场景:某个工厂有生产工和搬运工两种角色,前者负责把生产好的产品放入仓库,后者把仓库里面的产品搬出去卖。假如说仓库的产品数量为零,那么搬运工大可以睡上一觉,等生产工生产出产品,然后再搬。我们抽象出一个实现阻塞队列需要的元素,首先,仓库里面的产品是共享的资源,其次,生产工和搬运工需要互相协调这个资源的产生和消耗,他们应该是对应于程序中的两个线程,互不干扰又互相协调。
     基于这个分析,我们就开始吧。
     首先我们新建阻塞操作类BlockOperator,代码如下:
public class BlockOperator {

  private Object notEmpty = new Object();
  private Queue<Object> linkedList = new LinkedList<Object>();


/**
 * 取物品
 * 
 * @return
 * @throws InterruptedException
 */
 public Object take() throws InterruptedException {
	 synchronized (notEmpty) {
	 String cureadname = Thread.currentThread().getName();
	 System.out.println("搬运工" + cureadname + "来到仓库");
	 sleep(1000l);
	 if (linkedList.size() == 0) {
	 // 假如仓库没东西了,那么就先不取物品,此时释放锁,被唤醒之前,需要先得到锁
	System.out.println("搬运工" + cureadname + "发觉没有物品,只能等待生产");
				notEmpty.wait();
	}
	Object obj = linkedList.poll();//返回并删除此元素
	System.out.println("搬运工" + cureadname + "这时看到有了物品,搬出了:" + obj
					+ "仓库还有物品数量:" + linkedList.size());
		return obj;

}
}

// 生产物品
public void offer(Object object) throws InterruptedException {
     synchronized (notEmpty) {
	String cureadname = Thread.currentThread().getName();
	System.out.println("生产工" + cureadname + "来到仓库准备放物品");
	sleep(3000l);
	if (linkedList.size() == 0) {
	// 假如仓库没东西了,唤醒对象锁。分析:这个时候有可能没有等待锁,也可能有。
	System.out.println("生产工" + cureadname+ "发现来到仓库的时候一件物品都没有,发觉搬运工在睡觉等他或者感觉搬运工在等他,于是喊醒了它");
	notEmpty.notifyAll(); 
	/*
	 * 注 假如仓库有东西,那么不用唤醒搬运工,因为有物品的时候,搬运工不会等待。
	 * 分析:有的人肯定会觉得,有没有这种可能:当linkedList.size=0的时候,notEmpty就wait了,然后在本同步块中,
	 * 发现linkedList.size!=0,那么notEmpty就不会去唤醒了。其实这完全没有可能,因为size!=0只有在完成了
	 * linkedList.add之后才有可能,而在add之前,必然会判断size=0的情况
           */
	}
	System.out.println("生产工" + cureadname + "把物品" + object
					+ "放到了仓库");
	linkedList.add(object);

	}
}

 private void sleep(Long time) {
	try {
		Thread.sleep(time);// 模拟时间消耗
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

     

   
注:本类中,首先创建了一个notEmpty做锁的对象,取物品的时候当产品为0时,notEmpty等待,此时会让位与另一个线程(生产)得到此对象锁,并且唤醒notEmpty上所有的等待锁。在生产完成之后,由于刚才等待的线程已经被唤醒,那么这个搬运工会移除此产品。下面会有我本机运行的效果,对照看一下会一目了然

然后创建生产工线程类CreateProduce,代码如下:
/**
 * 模拟工厂生产产品
 * 
 * @author Administrator
 * 
 */
public class CreateProduce implements Runnable {

   private BlockOperator bq;
   public CreateProduce(BlockOperator bq){
		this.bq=bq;
   }
	
@Override
public void run() {
	//生产n个产品
	for(int i=0;i<5;i++){
	  Object obj="A"+i;
	  try {
			bq.offer(obj);
	  } catch (InterruptedException e) {
			e.printStackTrace();
	}
}		
}
}

此类比较简单,创建5个商品。

继续创建搬运工线程类,代码如下
/**
 * 模拟工厂搬运产品
 * @author Administrator
 *
 */
public class GetProduce implements Runnable {

	private BlockOperator bq;
	public GetProduce(BlockOperator bq){
		this.bq=bq;
	}
	@Override
	public void run() {
		 try {
			 //搬运工是有多少搬多少
			 while(true){
				 Object obj=bq.take();
			 }
			
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}



最后新建一个测试类 如下:
public class TestMain {

	public static void main(String[] args) {
		BlockOperator bq = new BlockOperator();
		Thread createThread = new Thread(new CreateProduce(bq),"creater");
		Thread getThread = new Thread(new GetProduce(bq),"getter");
		createThread.start();
		getThread.start();
		
		
	}

}



在我本地运行的结果如下:


    当我准备想在BlockOperator里面加一个判断产品最大数量的锁对象,比如TheMaxObject,后来在take和offer里面分别加上限制,但是免不了会嵌套synchronized,那么这样会产生死锁。死锁大多是嵌套synchronized产生的。快下班了,写的快,刚才那个问题没太细想,高手们可以帮助出解决方案。
  • 大小: 97.9 KB
   发表时间:2011-07-04  
能说下,你目前的程序哪种情况下会死锁
0 请登录后投票
   发表时间:2011-07-04   最后修改:2011-07-04
悲剧了 写道
能说下,你目前的程序哪种情况下会死锁

目前的这个不会死锁啊 我的意思是 再加一个锁对象来判断产品数量上限,会有死锁,死锁代码明天再贴吧 现在家耍呢
0 请登录后投票
   发表时间:2011-07-04  
你目前这个程序只能运行一个搬运工和一个成产者。
if (linkedList.size() == 0) {  
        notEmpty.wait();  
    }  
如果有两个搬运工的话,这样写就会有其中一个返回null.通常(JDK的源代码里几乎所有用到wait的地方)都使用while循环来wait.
while(linkedList.size()==0){notEmpty.wait();}

使用同步块实现这个同能,只需要一个对象锁。假设我们把notEmpty替换成this
如果你想增加数量上限,那么在offer的代码里可以增加
while(linkedList.size()>=MAX_SIZE) this.wait();
在take的代码里增加
if(linkedList.size()<MAX_SIZE) this.notify();
即可。

也有文章在介绍ReentrantLock/Condition时使用相同的案例做演示,这时才会去定义两个Condition:notempty和notfull,用同步块不需要(或者说同步块里只有一个隐含的Condition,无法定义两个Condition)。如果用两个锁,正如你分析的那样,两个锁会存在死锁的可能。

拿这个案例去演示多个Condition的作用其实并不好,你很难想象Empty和Full两个条件会同时满足(没说不可能同时满足,这个不确定,但概率很小,我不认为会影响性能),用两个Condition并不比只用一个Condition效率高。
0 请登录后投票
   发表时间:2011-07-05  
AngelAndAngel 写道
悲剧了 写道
能说下,你目前的程序哪种情况下会死锁

目前的这个不会死锁啊 我的意思是 再加一个锁对象来判断产品数量上限,会有死锁,死锁代码明天再贴吧 现在家耍呢



好吧,等待你的死锁代码,哥们,下次能否一次贴全?
0 请登录后投票
   发表时间:2011-07-05   最后修改:2011-07-05
悲剧了 写道
AngelAndAngel 写道
悲剧了 写道
能说下,你目前的程序哪种情况下会死锁

目前的这个不会死锁啊 我的意思是 再加一个锁对象来判断产品数量上限,会有死锁,死锁代码明天再贴吧 现在家耍呢



好吧,等待你的死锁代码,哥们,下次能否一次贴全?


兄弟 比如我把BlockOperator类改成如下:
 
public class BlockOperator {
     private Object notEmpty = new Object();
     private Object theMaxObject=new Object();
     private int max=5;
     private Queue<Object> linkedList = new LinkedList<Object>();


/**
 * 取物品
 * 
 * @return
 * @throws InterruptedException
 */
public Object take() throws InterruptedException {
    synchronized (notEmpty) {
	String cureadname = Thread.currentThread().getName();
	System.out.println("搬运工" + cureadname + "来到仓库");
	sleep(1000l);
	if (linkedList.size() == 0) {
	// 假如仓库没东西了,那么就先不取物品,此时释放锁,被唤醒之前,需要先得到锁
	System.out.println("搬运工" + cureadname + "发觉没有物品,只能等待生产");
			notEmpty.wait();
	}
	synchronized(theMaxObject){
		if(linkedList.size()==max){
			theMaxObject.notifyAll();	
		}
				
	}
	Object obj = linkedList.poll();//返回并删除此元素
	System.out.println("搬运工" + cureadname + "这时看到有了物品,搬出了:" + obj
					+ "仓库还有物品数量:" + linkedList.size());
	return obj;

}
}

// 生产物品
public void offer(Object object) throws InterruptedException {
  synchronized (notEmpty) {
	String cureadname = Thread.currentThread().getName();
	System.out.println("生产工" + cureadname + "来到仓库准备放物品");
	sleep(3000l);
	if (linkedList.size() == 0) {
	// 假如仓库没东西了,唤醒对象锁。分析:这个时候有可能没有等待锁,也可能有。
	System.out.println("生产工" + cureadname+ "发现来到仓库的时候一件物品都没有,发觉搬运工在睡觉等他或者感觉搬运工在等他,于是喊醒了它");
	notEmpty.notifyAll(); 
}

    synchronized(theMaxObject){
	if(linkedList.size()==max){
		System.out.println("仓库里面的数量已经到了上线:"+max+",所以先暂停一下");
					theMaxObject.wait();	
	}
				
   }
     System.out.println("生产工" + cureadname + "把物品" + object+ "放到了仓库");
    linkedList.add(object);

}
}

	private void sleep(Long time) {
		try {
			Thread.sleep(time);// 模拟时间消耗
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

  

注:把生产数量设置成10 效果好点

假如搬运工第一次进去后发现什么都没有,然后wait,这个时候生产工进入notEmpty同步块,而且连续生产了5此,这个时候发觉已到上线,然后wait。这个时候就 “悲剧了”。所以我的意思是 这样肯定是不行的,但是多个对象来判断有没有最佳实践。

0 请登录后投票
   发表时间:2011-07-05  
有新手贴的趋势 X0
0 请登录后投票
   发表时间:2011-07-05  
taolei0628 写道
你目前这个程序只能运行一个搬运工和一个成产者。
if (linkedList.size() == 0) {  
        notEmpty.wait();  
    }  
如果有两个搬运工的话,这样写就会有其中一个返回null.通常(JDK的源代码里几乎所有用到wait的地方)都使用while循环来wait.
while(linkedList.size()==0){notEmpty.wait();}

使用同步块实现这个同能,只需要一个对象锁。假设我们把notEmpty替换成this
如果你想增加数量上限,那么在offer的代码里可以增加
while(linkedList.size()>=MAX_SIZE) this.wait();
在take的代码里增加
if(linkedList.size()<MAX_SIZE) this.notify();
即可。

也有文章在介绍ReentrantLock/Condition时使用相同的案例做演示,这时才会去定义两个Condition:notempty和notfull,用同步块不需要(或者说同步块里只有一个隐含的Condition,无法定义两个Condition)。如果用两个锁,正如你分析的那样,两个锁会存在死锁的可能。

拿这个案例去演示多个Condition的作用其实并不好,你很难想象Empty和Full两个条件会同时满足(没说不可能同时满足,这个不确定,但概率很小,我不认为会影响性能),用两个Condition并不比只用一个Condition效率高。


兄弟你说的肯定是对的,不过假如用this,等于是得到本对象的锁,那么其他线程对本对象的同步方法会被暂时阻塞,假如操作的时间比较长那么阻塞的时间应该也会很长。还有,我觉得多个对象锁来判断应该是有最佳实践或者最适合做什么样的东西的,但是我目前还不太了解,兄弟你发点看法咯。
0 请登录后投票
   发表时间:2011-07-05  
没怎么仔细看。。大概是想实现阻塞队列吧,看LinkedBlockingQueue源码就可以了。
0 请登录后投票
   发表时间:2011-07-05  
[quote="AngelAndAngel
兄弟你说的肯定是对的,不过假如用this,等于是得到本对象的锁,那么其他线程对本对象的同步方法会被暂时阻塞,假如操作的时间比较长那么阻塞的时间应该也会很长。还有,我觉得多个对象锁来判断应该是有最佳实践或者最适合做什么样的东西的,但是我目前还不太了解,兄弟你发点看法咯。

你不就在做一个BlockingQueue吗?this 和 内部对象没什么区别啊,你也可以不用this.就用那个notEmpty同一个对象。只不过你已经把那个变量起了个notEmpty的名字,我不好用它来解释notFull啊。
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics