最近在公司做大批量的数据交换用到了阻塞队列(mysql->mongodb,约600w左右的数据,期间包含了其他业务逻辑,不纯是数据交换),效率蛮不错。现在写个queue使用例子,供其他人参考。如有不对之处,欢迎指导...小弟第一次发技术贴
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
*
* @Title: MyQueue.java
* @Copyright: Copyright (c) 2005
* @Description: <br> <br>
* @Company:*
* @Created on Jun 15, 2012 10:19:01 AM
* @author <a href="mailTo:*">tzb</a>
*/
public class MyQueue extends ArrayBlockingQueue<Object>{
private static final long serialVersionUID = 1L;
/**
* 构造一个队列
* @param size
*/
public MyQueue(int size){
super(size);
}
/**
* 定义一个Puter
*/
class Puter implements Callable<Object>{
private MyQueue queue;
public Puter(MyQueue queue){
this.queue=queue;
}
public Object call() {
try {
for (int i = 0; i < 5; i++) {
queue.put(i);
System.out.println(Thread.currentThread().getName()+" put data :"+i);
Thread.sleep(1 * 2000);
}
//结束时将标志放入队列,防止队列中没有数据后消费者一直阻塞等待,从而告知消费者可以退出
queue.put(endDataFlag);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "put data over...";
}
}
/**
* 定义一个结束标示
*/
private static Object endDataFlag=new Object();
/**
*
* 定义一个Taker
*/
class Taker implements Callable<Object>{
private MyQueue queue;
public Taker(MyQueue queue){
this.queue=queue;
}
public Object call() {
boolean flag=true;
while (flag) {
try {
Object data=queue.take();
//每次获取数据后判断获取的数据是否是最后一个结束标示,如果是并且队列已经为空,则退出消费
if (data == endDataFlag && queue.isEmpty()) {
//此处重新将结束标示放入队列,防止其他消费者上次任务消费完毕后,下次去队列中取不到数据而导致阻塞
queue.put(data);
flag=false;
}else{
System.out.println(Thread.currentThread().getName()+" take data :"+data);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return "take data over...";
}
}
public static void main(String[] args) throws Exception {
MyQueue que=new MyQueue(5);
ExecutorService exs=Executors.newCachedThreadPool();
List<Callable<Object>> tasks=new ArrayList<Callable<Object>>();
tasks.add(que.new Puter(que));
tasks.add(que.new Taker(que));
List<Future<Object>> listResult=exs.invokeAll(tasks);//提交执行所有task
for(Future<Object> f:listResult){
System.out.println(Thread.currentThread().getName()+":"+f.get());//获取结果
}
exs.shutdown();
System.out.println("task over...");
}
}
分享到:
相关推荐
在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们...
这些阻塞队列都实现了java.util.concurrent.BlockingQueue接口,提供了如put、take、offer、poll等方法,用于进行元素的插入、移除以及检查操作。它们广泛应用于生产者-消费者模型、线程池的工作队列等并发场景,...
本资源“java concurrent 精简源码”着重关注Java并发库(java.util.concurrent)的核心概念,包括阻塞队列和线程管理。下面将详细阐述这些知识点。 1. **Java并发库(java.util.concurrent)** Java并发库是Java ...
在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...
Java中的阻塞队列实现主要依赖于`java.util.concurrent`包下的几个类,如`BlockingQueue`接口、`ArrayBlockingQueue`、`LinkedBlockingQueue`等。`BlockingQueue`接口定义了一组操作,如`put`、`take`、`offer`等,...
Java并发编程是Java开发中的重要组成部分,特别是在多核处理器和高并发应用环境下,高效地管理线程资源变得至关重要。`java.util.concurrent` 包(简称JUC)是Java提供的一个强大的并发工具包,它提供了丰富的并发...
1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
Java Concurrent in practice (animated)
在Java中,`java.util.concurrent`包提供了多种阻塞队列实现,如`ArrayBlockingQueue`, `LinkedBlockingQueue`等。它们都实现了`BlockingQueue`接口,提供了一套线程安全的方法来添加和移除元素,如`put()`, `take()...
而阻塞队列(Blocking Queue)是Java并发包(java.util.concurrent)中的一种高效数据结构,常用于线程间的协作,它能够简化同步问题并提高系统性能。 阻塞队列是一种特殊的队列,当队列为空时,取出元素的操作将会...
`BlockingQueue` 是 `java.util.concurrent` 包中的一个接口,它扩展了传统的 `Queue` 接口,并引入了阻塞特性。这意味着当队列为空时,从队列中移除元素的操作将会阻塞;同样地,当队列满时,向队列添加元素的操作...
Java 1.5引入的`java.util.concurrent`包提供了一些内置的阻塞队列实现,主要包括: 1. **ArrayBlockingQueue**:基于固定大小的数组实现,插入和删除操作都具有O(1)的时间复杂度。队列的公平性可以在构造时选择,...
Java队列模拟实现是一个典型的计算机科学中的数据结构应用,它主要涉及了Java编程语言和队列数据结构。在这个工程中,开发者已经创建了一个基于图形用户界面(GUI)的应用程序,用于演示和操作队列的各种功能。以下...
Java并发包 (`java.concurrent`) 是Java平台中处理多线程并行执行的关键工具,它提供了高效、可控且安全的线程管理机制。在后台程序设计中,尤其对于处理大量并发请求的服务,如Web服务器、短信服务器或DNS服务器,...
Java并发包(java.concurrent)是Java平台中处理多线程编程的核心工具包,它提供了丰富的类和接口,使得开发者能够高效、安全地编写多线程程序。这个包的设计目标是提高并发性能,减少同步代码的复杂性,并提供高级...
在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...
java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...
在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...
Java中提供了阻塞队列的官方实现,位于java.util.concurrent包中。这些实现类包括LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue等。这些实现类都提供了阻塞队列的功能,能够在队列为空或满时阻塞...