`
tangzhibin
  • 浏览: 20683 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

java concurrent下BlockQueue阻塞队列应用

阅读更多

最近在公司做大批量的数据交换用到了阻塞队列(mysql->mongodb,约600w左右的数据,期间包含了其他业务逻辑,不纯是数据交换),效率蛮不错。现在写个queue使用例子,供其他人参考。如有不对之处,欢迎指导...小弟第一次发技术贴

 

 

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 
 * @Title: MyQueue.java
 * @Copyright: Copyright (c) 2005
 * @Description: <br> <br>
 * @Company:*
 * @Created on Jun 15, 2012 10:19:01 AM
 * @author <a href="mailTo:*">tzb</a>
 */
public class MyQueue extends ArrayBlockingQueue<Object>{

	private static final long serialVersionUID = 1L;

	/**
	 * 构造一个队列
	 * @param size
	 */
	public MyQueue(int size){
		super(size);
	}
	
	/**
	 * 定义一个Puter
	 */
	class Puter implements Callable<Object>{

		private MyQueue queue;

		public Puter(MyQueue queue){
			this.queue=queue;
		}

		public Object call() {
			try {
				for (int i = 0; i < 5; i++) {
					queue.put(i);
					System.out.println(Thread.currentThread().getName()+" put data :"+i);
					Thread.sleep(1 * 2000);
				}
				//结束时将标志放入队列,防止队列中没有数据后消费者一直阻塞等待,从而告知消费者可以退出
				queue.put(endDataFlag);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			return "put data over...";
		}
	}
	
	/**
	 * 定义一个结束标示
	 */
	private static Object endDataFlag=new Object();

	/**
	 * 
	 * 定义一个Taker
	 */
	class Taker  implements Callable<Object>{
		private MyQueue queue;

		public Taker(MyQueue queue){
			this.queue=queue;
		}

		public Object call() {
			boolean flag=true;
			while (flag) {
				try {
					Object data=queue.take();
					//每次获取数据后判断获取的数据是否是最后一个结束标示,如果是并且队列已经为空,则退出消费
					if (data == endDataFlag && queue.isEmpty()) {
					   //此处重新将结束标示放入队列,防止其他消费者上次任务消费完毕后,下次去队列中取不到数据而导致阻塞
						queue.put(data);
						flag=false;
					}else{
						System.out.println(Thread.currentThread().getName()+" take data :"+data);
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			return "take data over...";
		}
	}

	public static void main(String[] args) throws Exception {
	    
		MyQueue que=new MyQueue(5);
		
	    ExecutorService exs=Executors.newCachedThreadPool();
	    
	    List<Callable<Object>> tasks=new ArrayList<Callable<Object>>();
	 
	    tasks.add(que.new Puter(que));
	    
	    tasks.add(que.new Taker(que));
	    
	    List<Future<Object>> listResult=exs.invokeAll(tasks);//提交执行所有task
	    
	    for(Future<Object> f:listResult){
	        System.out.println(Thread.currentThread().getName()+":"+f.get());//获取结果
	    }
	    exs.shutdown();
	    System.out.println("task over...");
	}

}
分享到:
评论
7 楼 tangzhibin 2012-06-18  
Shen.Yiyang 写道
tangzhibin 写道
是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛

如果是一直有数据交换,但是不同时段有不同的吞吐量,这种情况应该怎么处理比较好呢?比较典型的就是类似一个web服务器的消费系统,有时候请求很少就线程少,有时候请求很多就创建很多线程,但是需要有个池来限制个数,同时减少创建的开销。 这种时候一般用什么策略来让无用的消费者退出? 用超时取不到请求就退出之类的? 不太确定比较理想的处理方式是什么。

推荐你你使用MQ,如ActiveMQ,我们公司现在消费系统都用的这个
6 楼 Shen.Yiyang 2012-06-18  
tangzhibin 写道
是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛

如果是一直有数据交换,但是不同时段有不同的吞吐量,这种情况应该怎么处理比较好呢?比较典型的就是类似一个web服务器的消费系统,有时候请求很少就线程少,有时候请求很多就创建很多线程,但是需要有个池来限制个数,同时减少创建的开销。 这种时候一般用什么策略来让无用的消费者退出? 用超时取不到请求就退出之类的? 不太确定比较理想的处理方式是什么。
5 楼 tangzhibin 2012-06-18  
Shen.Yiyang 写道
tangzhibin 写道
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作

所以你假定的case是, 在一个时间段有大量的数据交换,一段时间后全部结束,整个生产消费系统也结束了?

是的,当整个数据交换任务完毕后,整个生产-消费系统也随之退出,没必要一直继续下去嘛
4 楼 Shen.Yiyang 2012-06-18  
tangzhibin 写道
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作

所以你假定的case是, 在一个时间段有大量的数据交换,一段时间后全部结束,整个生产消费系统也结束了?
3 楼 tangzhibin 2012-06-15  
Shen.Yiyang 写道
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?

你自己看代码嘛...当某个消费者退出时,又重新把endFlag放入了队列中....那么其他的消费者当然又可以获得了,从而保证任务结束后多个消费者都能够退出工作
2 楼 Shen.Yiyang 2012-06-15  
tangzhibin 写道
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

你用的是take从queue中拿数据,即获取并移除此队列的头部;
也就是说你放一个endflag,只有一个消费者可以这个endflag,怎么保证多个消费者退出?
1 楼 tangzhibin 2012-06-15  
tangzhibin 写道
tangzhibin 写道
一样啊,直接List<Callable<Object>> tasks里面add多个Puter 或者多个Taker 啊

如果有多个生产者 多个消费者,你这个endfalg要怎么发呢?

endfalg在所有生产者的任务做完毕之后,发这个消息,消费者接受到后就会退出

相关推荐

    实战Concurrent-BlockQueue

    在Java并发编程领域,`Concurrent-BlockQueue`是一个重要的数据结构,它结合了线程安全与高效性能。本文将深入探讨`ConcurrentLinkedQueue`、`ArrayBlockingQueue`以及`LinkedBlockingQueue`这三种实现,并分析它们...

    阻塞队列阻塞队列阻塞队列

    这些阻塞队列都实现了java.util.concurrent.BlockingQueue接口,提供了如put、take、offer、poll等方法,用于进行元素的插入、移除以及检查操作。它们广泛应用于生产者-消费者模型、线程池的工作队列等并发场景,...

    java concurrent 精简源码

    本资源“java concurrent 精简源码”着重关注Java并发库(java.util.concurrent)的核心概念,包括阻塞队列和线程管理。下面将详细阐述这些知识点。 1. **Java并发库(java.util.concurrent)** Java并发库是Java ...

    Java实现简单的阻塞队列2种方式

    在Java编程中,阻塞队列是一种特殊类型的并发数据结构,它在多线程环境中的应用广泛,主要用于线程间的协作通信。阻塞队列在队列满时会阻止生产者线程添加元素,在队列空时会阻止消费者线程取出元素,直到条件满足...

    java模拟阻塞队列

    Java中的阻塞队列实现主要依赖于`java.util.concurrent`包下的几个类,如`BlockingQueue`接口、`ArrayBlockingQueue`、`LinkedBlockingQueue`等。`BlockingQueue`接口定义了一组操作,如`put`、`take`、`offer`等,...

    java.util.concurrent 实现线程池队列

    Java并发编程是Java开发中的重要组成部分,特别是在多核处理器和高并发应用环境下,高效地管理线程资源变得至关重要。`java.util.concurrent` 包(简称JUC)是Java提供的一个强大的并发工具包,它提供了丰富的并发...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    1. java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    Java Concurrent in practice (animated)

    Java Concurrent in practice (animated)

    支持多线程和泛型的阻塞队列

    在Java中,`java.util.concurrent`包提供了多种阻塞队列实现,如`ArrayBlockingQueue`, `LinkedBlockingQueue`等。它们都实现了`BlockingQueue`接口,提供了一套线程安全的方法来添加和移除元素,如`put()`, `take()...

    java线程聊天室(阻塞队列实现)

    而阻塞队列(Blocking Queue)是Java并发包(java.util.concurrent)中的一种高效数据结构,常用于线程间的协作,它能够简化同步问题并提高系统性能。 阻塞队列是一种特殊的队列,当队列为空时,取出元素的操作将会...

    java并发工具包 java.util.concurrent中文版pdf

    `BlockingQueue` 是 `java.util.concurrent` 包中的一个接口,它扩展了传统的 `Queue` 接口,并引入了阻塞特性。这意味着当队列为空时,从队列中移除元素的操作将会阻塞;同样地,当队列满时,向队列添加元素的操作...

    剖析Java中阻塞队列的实现原理及应用场景

    Java 1.5引入的`java.util.concurrent`包提供了一些内置的阻塞队列实现,主要包括: 1. **ArrayBlockingQueue**:基于固定大小的数组实现,插入和删除操作都具有O(1)的时间复杂度。队列的公平性可以在构造时选择,...

    java队列模拟实现

    Java队列模拟实现是一个典型的计算机科学中的数据结构应用,它主要涉及了Java编程语言和队列数据结构。在这个工程中,开发者已经创建了一个基于图形用户界面(GUI)的应用程序,用于演示和操作队列的各种功能。以下...

    java.concurrent包的应用

    Java并发包 (`java.concurrent`) 是Java平台中处理多线程并行执行的关键工具,它提供了高效、可控且安全的线程管理机制。在后台程序设计中,尤其对于处理大量并发请求的服务,如Web服务器、短信服务器或DNS服务器,...

    java concurrent 包 详细解析

    Java并发包(java.concurrent)是Java平台中处理多线程编程的核心工具包,它提供了丰富的类和接口,使得开发者能够高效、安全地编写多线程程序。这个包的设计目标是提高并发性能,减少同步代码的复杂性,并提供高级...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和阻塞队列"是两个核心概念,它们在多线程环境下处理任务调度和数据同步方面发挥着重要作用。线程池是一种管理线程资源的有效方式,而阻塞队列则常用于线程间通信和数据共享。 线程池...

    Java并发工具包java.util.concurrent用户指南中英文对照阅读版.pdf

    java.util.concurrent - Java 并发工具包 2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 ...

    java阻塞队列实现原理及实例解析.docx

    在Java中,自Java 5.0起,`java.util.concurrent`包提供了多种阻塞队列的实现,例如`ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`等。这些类都实现了`java.util.concurrent.BlockingQueue`...

    java阻塞队列实现原理及实例解析

    Java中提供了阻塞队列的官方实现,位于java.util.concurrent包中。这些实现类包括LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue等。这些实现类都提供了阻塞队列的功能,能够在队列为空或满时阻塞...

Global site tag (gtag.js) - Google Analytics