最近在公司做大批量的数据交换用到了阻塞队列(mysql->mongodb,约600w左右的数据,期间包含了其他业务逻辑,不纯是数据交换),效率蛮不错。现在写个queue使用例子,供其他人参考。如有不对之处,欢迎指导...小弟第一次发技术贴
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* @Title: MyQueue.java
* @Copyright: Copyright (c) 2005
* @Description: <br> <br>
* @Company:*
* @Created on Jun 15, 2012 10:19:01 AM
* @author <a href="mailTo:*">tzb</a>
*/
public class MyQueue extends ArrayBlockingQueue<Object>{
private static final long serialVersionUID = 1L;
/**
* 构造一个队列
* @param size
*/
public MyQueue(int size){
super(size);
}
/**
* 定义一个Puter
*/
class Puter implements Callable<Object>{
private MyQueue queue;
public Puter(MyQueue queue){
this.queue=queue;
}
public Object call() {
try {
for (int i = 0; i < 5; i++) {
queue.put(i);
System.out.println(Thread.currentThread().getName()+" put data :"+i);
Thread.sleep(1 * 2000);
}
//结束时将标志放入队列,防止队列中没有数据后消费者一直阻塞等待,从而告知消费者可以退出
queue.put(endDataFlag);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "put data over...";
}
}
/**
* 定义一个结束标示
*/
private static Object endDataFlag=new Object();
/**
*
* 定义一个Taker
*/
class Taker implements Callable<Object>{
private MyQueue queue;
public Taker(MyQueue queue){
this.queue=queue;
}
public Object call() {
boolean flag=true;
while (flag) {
try {
Object data=queue.take();
//每次获取数据后判断获取的数据是否是最后一个结束标示,如果是并且队列已经为空,则退出消费
if (data == endDataFlag && queue.isEmpty()) {
//此处重新将结束标示放入队列,防止其他消费者上次任务消费完毕后,下次去队列中取不到数据而导致阻塞
queue.put(data);
flag=false;
}else{
System.out.println(Thread.currentThread().getName()+" take data :"+data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "take data over...";
}
}
public static void main(String[] args) throws Exception {
MyQueue que=new MyQueue(5);
ExecutorService exs=Executors.newCachedThreadPool();
List<Callable<Object>> tasks=new ArrayList<Callable<Object>>();
tasks.add(que.new Puter(que));
tasks.add(que.new Taker(que));
List<Future<Object>> listResult=exs.invokeAll(tasks);//提交执行所有task
for(Future<Object> f:listResult){
System.out.println(Thread.currentThread().getName()+":"+f.get());//获取结果
}
exs.shutdown();
System.out.println("task over...");
}
}
分享到:
相关推荐
在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们...
总结来说,Java中的阻塞队列BlockingQueue是并发编程中的一种高效、线程安全的数据结构,它提供了线程间的同步和通信机制,简化了多线程环境下的编程复杂性。通过选择适当的实现类和使用适当的方法,开发者可以灵活...
BlockQueue是Java并发编程中非常重要的一个接口,它位于`java.util.concurrent`包下,是线程安全的队列,特别适用于多生产者多消费者(multi-producer multi-consumer, MPMC)的场景。在本练习中,我们将通过`...
1. **Java阻塞队列BlockQueue**:需要实现一个队列,当队列满时,插入操作会阻塞,直到有空间可用。当队列空时,取出操作也会阻塞,直到有元素可取。这是Java并发编程中的典型应用场景,可以基于`java.util....
BlockQueue是Java并发编程中的重要工具,它是一种特殊的队列,当队列满时,生产者尝试添加元素会阻塞,直到队列有空位;当队列为空时,消费者尝试取出元素也会阻塞,直到队列中有元素可用。BlockQueue提供了一种线程...