`
tangzhibin
  • 浏览: 20859 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java concurrent下BlockQueue阻塞队列应用

阅读更多

最近在公司做大批量的数据交换用到了阻塞队列(mysql->mongodb,约600w左右的数据,期间包含了其他业务逻辑,不纯是数据交换),效率蛮不错。现在写个queue使用例子,供其他人参考。如有不对之处,欢迎指导...小弟第一次发技术贴

 

 

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @Title: MyQueue.java
 * @Copyright: Copyright (c) 2005
 * @Description: <br> <br>
 * @Company:*
 * @Created on Jun 15, 2012 10:19:01 AM
 * @author <a href="mailTo:*">tzb</a>
 */
public class MyQueue extends ArrayBlockingQueue<Object>{

	private static final long serialVersionUID = 1L;

	/**
	 * 构造一个队列
	 * @param size
	 */
	public MyQueue(int size){
		super(size);
	}
	
	/**
	 * 定义一个Puter
	 */
	class Puter implements Callable<Object>{

		private MyQueue queue;

		public Puter(MyQueue queue){
			this.queue=queue;
		}

		public Object call() {
			try {
				for (int i = 0; i < 5; i++) {
					queue.put(i);
					System.out.println(Thread.currentThread().getName()+" put data :"+i);
					Thread.sleep(1 * 2000);
				}
				//结束时将标志放入队列,防止队列中没有数据后消费者一直阻塞等待,从而告知消费者可以退出
				queue.put(endDataFlag);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return "put data over...";
		}
	}
	
	/**
	 * 定义一个结束标示
	 */
	private static Object endDataFlag=new Object();

	/**
	 * 
	 * 定义一个Taker
	 */
	class Taker  implements Callable<Object>{
		private MyQueue queue;

		public Taker(MyQueue queue){
			this.queue=queue;
		}

		public Object call() {
			boolean flag=true;
			while (flag) {
				try {
					Object data=queue.take();
					//每次获取数据后判断获取的数据是否是最后一个结束标示,如果是并且队列已经为空,则退出消费
					if (data == endDataFlag && queue.isEmpty()) {
					   //此处重新将结束标示放入队列,防止其他消费者上次任务消费完毕后,下次去队列中取不到数据而导致阻塞
						queue.put(data);
						flag=false;
					}else{
						System.out.println(Thread.currentThread().getName()+" take data :"+data);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return "take data over...";
		}
	}

	public static void main(String[] args) throws Exception {
	    
		MyQueue que=new MyQueue(5);
		
	    ExecutorService exs=Executors.newCachedThreadPool();
	    
	    List<Callable<Object>> tasks=new ArrayList<Callable<Object>>();
	 
	    tasks.add(que.new Puter(que));
	    
	    tasks.add(que.new Taker(que));
	    
	    List<Future<Object>> listResult=exs.invokeAll(tasks);//提交执行所有task
	    
	    for(Future<Object> f:listResult){
	        System.out.println(Thread.currentThread().getName()+":"+f.get());//获取结果
	    }
	    exs.shutdown();
	    System.out.println("task over...");
	}

}
分享到:
评论
7 楼 tangzhibin 2012-06-18  
Shen.Yiyang 写道
tangzhibin 写道
是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛

如果是一直有数据交换,但是不同时段有不同的吞吐量,这种情况应该怎么处理比较好呢?比较典型的就是类似一个web服务器的消费系统,有时候请求很少就线程少,有时候请求很多就创建很多线程,但是需要有个池来限制个数,同时减少创建的开销。 这种时候一般用什么策略来让无用的消费者退出? 用超时取不到请求就退出之类的? 不太确定比较理想的处理方式是什么。

推荐你你使用MQ,如ActiveMQ,我们公司现在消费系统都用的这个
6 楼 Shen.Yiyang 2012-06-18  
tangzhibin 写道
是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛

如果是一直有数据交换,但是不同时段有不同的吞吐量,这种情况应该怎么处理比较好呢?比较典型的就是类似一个web服务器的消费系统,有时候请求很少就线程少,有时候请求很多就创建很多线程,但是需要有个池来限制个数,同时减少创建的开销。 这种时候一般用什么策略来让无用的消费者退出? 用超时取不到请求就退出之类的? 不太确定比较理想的处理方式是什么。
5 楼 tangzhibin 2012-06-18  
Shen.Yiyang 写道
tangzhibin 写道
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作

所以你假定的case是, 在一个时间段有大量的数据交换,一段时间后全部结束,整个生产消费系统也结束了?

是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛
4 楼 Shen.Yiyang 2012-06-18  
tangzhibin 写道
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作

所以你假定的case是, 在一个时间段有大量的数据交换,一段时间后全部结束,整个生产消费系统也结束了?
3 楼 tangzhibin 2012-06-15  
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作
2 楼 Shen.Yiyang 2012-06-15  
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?
1 楼 tangzhibin 2012-06-15  
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

相关推荐

    实战Concurrent-BlockQueue

    在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们...

    java 中 阻塞队列BlockingQueue详解及实例

    总结来说,Java中的阻塞队列BlockingQueue是并发编程中的一种高效、线程安全的数据结构,它提供了线程间的同步和通信机制,简化了多线程环境下的编程复杂性。通过选择适当的实现类和使用适当的方法,开发者可以灵活...

    BlockQueue练习

    BlockQueue是Java并发编程中非常重要的一个接口,它位于`java.util.concurrent`包下,是线程安全的队列,特别适用于多生产者多消费者(multi-producer multi-consumer, MPMC)的场景。在本练习中,我们将通过`...

    迅雷2010Java笔试题哈尔滨站

    1. **Java阻塞队列BlockQueue**:需要实现一个队列,当队列满时,插入操作会阻塞,直到有空间可用。当队列空时,取出操作也会阻塞,直到有元素可取。这是Java并发编程中的典型应用场景,可以基于`java.util....

    大数据基础复习

    BlockQueue是Java并发编程中的重要工具,它是一种特殊的队列,当队列满时,生产者尝试添加元素会阻塞,直到队列有空位;当队列为空时,消费者尝试取出元素也会阻塞,直到队列中有元素可用。BlockQueue提供了一种线程...

Global site tag (gtag.js) - Google Analytics