`
shuofenglxy
  • 浏览: 194476 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

多线程之线程池探索

阅读更多
多线程应用执行时JDK提供了线程的载体,线程池,通过线程池管理线程,优化线程的执行,有效合理利用资源。而JDK提供的线程池有四大类:FixedThreadPool,SingleThreadExecutor,CachedThreadPool,ScheduledThreadPool。这四种池各有特点。现在一一来看。

第一,FixedThreadPool,来自jdk的解释是这样的:创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之前,池中的线程将一直存在。而通过源码
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

参数说明:第一个nThreads,线程池的线程数。第二个nThreads是线程池中允许的最大线程数。
0L表示多余最大线程数-线程池的线程数的这些线程在闲置情况下允许的存活时间。TimeUnit时间单位。 LinkedBlockingQueue<Runnable> 一个基于已链接节点的、范围任意的 blocking queue。备注:还有可能有一个参数threadFactory用来创建线程。

可以看到线程池有一个LinkedBlockingQueue<Runnable>队列来存放过量的任务(也就是JDK中所说的附加任务)。所以,这个固定大小的线程池的好处在于,无论是流量高峰还是没有访问装态,都会最多有nThreads在线程池(如果某一瞬间,某些线程down掉后,没来的急新建,那就少于这个值了),这样的话能够尽量保证消耗的内存空间较少,能够避免一些线程数猛增带来的OOM问题。

第二:SingleThreadExecutor。来自jdk的解释:创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,一个新线程将代替它执行后续的任务)。可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。

源码:
 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }


说明:只是池大小为1了。
分析:此种池的特点在于永远只有一个有效线程在运行,也就成了多个任务串行执行了,对于前后任务有顺序的多任务可能有所帮助。

第三:CachedThreadPool。来自jdk的解释:创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。

参见创建源码:
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }


说明:这个池建立参数上比较特殊,它允许最大 Integer.MAX_VALUE个线程共存,并且设置了60秒线程的闲置限制。并采用了奇怪的SynchronousQueue。这个队列是这样的,看jdk api文档:
一种阻塞队列,其中每个插入操作必须等待另一个线程的对应移除操作 ,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。不能在同步队列上进行 peek,因为仅在试图要移除元素时,该元素才存在;除非另一个线程试图移除某个元素,否则也不能(使用任何方法)插入元素;也不能迭代队列,因为其中没有元素可用于迭代。队列的头 是尝试添加到队列中的首个已排队插入线程的元素;如果没有这样的已排队线程,则没有可用于移除的元素并且 poll() 将会返回 null。对于其他 Collection 方法(例如 contains),SynchronousQueue 作为一个空 collection。此队列不允许 null 元素。

分析:这种池的特点在于能根据任务的数量设置池中线程多少,并能在一段时间后清除闲置线程,但是风险在于允许太多的线程存在,这就会导致线程创建的资源消耗过多。


第四:ScheduledThreadPool。来自jdk的解释:创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
创建源码:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }


该池最大的特点在于允许你定时delay后按照一定的速率隔时执行一些任务。

简要的理解了这几种池之后通过几个demo来进一步认识下哈。 源码是王道。恩直接看吧:

这个例子主要是理解下四个池的特点,针对runnable和callable任务的一些测试。
package ThreadSPools;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadPoolDemo {

public static void main(String[] args){
		 Thread main = Thread.currentThread();
		final HashMap<Integer, ArrayList<Integer>>  map = new HashMap<Integer,ArrayList<Integer>>(); 
		HashMap<Integer, ArrayList<Integer>> resultMap  = new HashMap<Integer,ArrayList<Integer>>();
		
		for(int i=0;i<300;i++){
			ArrayList<Integer> arrayList= new ArrayList<Integer>(); 
			for(int i1=0;i1<10000;i1++){
				int p = (int) (Math.random()*10000);
				arrayList.add(p);
			}
			map.put(i, arrayList);
		}
		//ExecutorService  executor = Executors.newFixedThreadPool(2);
		//ExecutorService  executor = Executors.newSingleThreadExecutor();
		ExecutorService  executor = Executors.newCachedThreadPool();
		//ExecutorService  executor = Executors.newScheduledThreadPool(4);
		long start = System.currentTimeMillis();
		for(final int key : map.keySet()){
			Callable<ArrayList<Integer>> task = new Callable<ArrayList<Integer>>(){

				@Override
				public ArrayList<Integer>call()
						throws Exception {
					Collections.sort(map.get(key));
					return map.get(key);
				}
				
			};
			Future<ArrayList<Integer>> future=executor.submit(task);
			try {
				resultMap.put(key, future.get());
				System.out.println(key+" "+resultMap.get(key).subList(0, 10));
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (ExecutionException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	
		long end = System.currentTimeMillis();	
		try {
			main.sleep(6000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("Excuting this totally costs "+(end - start));
		executor.shutdown();
		
	}
}





另外看一个自己修正的简要池,主要是用来测RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>。

源码:
package ThreadSPools;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
//一個類似于cachedPool的ThreadPool 主要用來測試任務的RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>
public class ThreadPoolOwnedByShuofengDemo extends ThreadPoolExecutor{
	
	public ThreadPoolOwnedByShuofengDemo(int nThreads,int maxnThreads,
			long keepaliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
			ThreadFactory threadfactory,RejectedExecutionHandler handler){
		 
		super(nThreads,maxnThreads,keepaliveTime,
				TimeUnit.SECONDS, workQueue,
				threadfactory,handler);
	}
	
	
	public  static void main(String[]args){
		
		ThreadPoolOwnedByShuofengDemo pool = 
			new ThreadPoolOwnedByShuofengDemo(5,10,30,TimeUnit.SECONDS,
					//使用這個隊列時,大於這個隊列的容量后,基本上直接拋出RejectedExecutionException
					//new ArrayBlockingQueue<Runnable>(10)
					
					//new LinkedBlockingQueue<Runnable>()//使用這個隊列時,基本上RejectedExecutionHandler沒用了
					
					//沒有空間冗餘,也就是說當任務數大於當前可執行的數量時,一般都直接拋出RejectedExecutionException
					//偶爾比較幸運當期執行的任務完成,那就會執行后提交的一個,不過這個概率貌似很小一樣,除非提交任務的時間點靠後
					new SynchronousQueue<Runnable>()
					,
					Executors.defaultThreadFactory(),
					//当多于 maxnThread +ArrayBlockingQueue大小时抛出异常RejectedExecutionException
					new AbortPolicy() 
					//当任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,将任务退回给调用线程执行
					//new CallerRunsPolicy()  
					//任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,后来任务被丢掉
					//new DiscardPolicy()
					//任务数瞬间多于 maxnThread +ArrayBlockingQueue大小时,最早提交还未被执行的任务被丢掉
					//new DiscardOldestPolicy()
			);
		
		final CopyOnWriteArrayList<String> arraylist = new CopyOnWriteArrayList<String>();
		//测试runnable task
//		for(int i=0;i<50;i++){
//			Runnable task = new Runnable(){
//
//				@Override
//				public void run() {
//					
//					System.out.println(Thread.currentThread().getName()+"  hello,this is shuofengTask.");
//					try {
//						Thread.currentThread().sleep(1000);
//					} catch (InterruptedException e) {
//						// TODO Auto-generated catch block
//						e.printStackTrace();
//					}
//				}
//				
//			};
//			pool.submit(task);
//		}
		//测试Callable task
		for(int i=0;i<10;i++){
			final int id= i;
			Callable task = new Callable(){

				final String taskname= id+"号task"; 
				@Override
				public Object call() throws Exception {
					String s =taskname+"  hello,this is shuofengTask.";
					
					arraylist.add(s);
					
					try {
						Thread.currentThread().sleep(1);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					return arraylist;
				}
				
			};
			
			pool.submit(task);
		}
		
		try {
			Thread.currentThread().sleep(1000);//sleep时间要合理 保证在池关闭前任务已经都执行完了
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(arraylist.size());
		System.out.println(arraylist.subList(0, arraylist.size()));
		pool.shutdown();//优雅的关闭池资源
	}
	
	
	
}


再看一下最奇怪的同步队列
package SychQueue;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
//特点在于每一个消费者都要等生产者放入字符串之后才能读取。而且同步队列没有容量,如果消费者不取的话,则会被阻塞。
public class SynchronousQueueDemo {

	public static void main(String[]args){
		final List<String> msg = Arrays.asList("start","one","two","three"); 
		final BlockingQueue<String> queue = new SynchronousQueue<String>();
		ExecutorService  executor = Executors.newCachedThreadPool();
		Runnable producerTask = new Runnable(){
			final List<String> waitingRecievingMsg = msg;
			@Override
			public void run() {
				
					try {
						for(String s:waitingRecievingMsg){
							queue.put(s);
							Thread.currentThread().sleep(1000);
						}
						queue.put("end");
						
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			
		};
		executor.submit(producerTask);
		
		Runnable consumerTask = new Runnable(){
			String recievedMsg;
			@Override
			public void run() {
				try {
					;
					while((recievedMsg = queue.take())!=null&& !recievedMsg.equals("end")){
						System.out.println(recievedMsg);
						Thread.currentThread().sleep(3000);
					}
					
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		};
		executor.submit(consumerTask);
		executor.shutdown();
	}
	
}



说明:

线程池其实也是比较简单的 ,只要记住将任务按照需要组装成runnale 或者callable形式(两者的区别在于有无返回值),然后选择合适的池类型,将任务提交就可以了。只是要注意多个任务的时候  控制任务的到达等,然后要注意的就是合适的时机关闭线程池了。
0
2
分享到:
评论
2 楼 酒杯中的大海 2011-01-14  
我也踩踩~  不错!
1 楼 shuofenglxy 2010-12-21  
试了试 原来我自己也可以踩  呵呵

相关推荐

    NetsuiteExamples1.04Beta 含WebSocketServe 应用测试,单线程,线程池

    线程池则是一种更高级的多线程管理方式,它可以复用已创建的线程,避免了频繁创建和销毁线程的开销,同时能更好地管理和调度大量并发任务,提高系统资源利用率。 标签中提到的“vfp”指的是Visual FoxPro,一种早期...

    C#的多线程机制探索

    理解并熟练掌握多线程的创建、同步与通讯,以及线程池和定时器的使用,对于编写高性能的C#应用程序至关重要。实际开发中,还需要考虑线程安全、死锁等问题,以确保程序的稳定性和可靠性。通过不断实践和学习,开发者...

    springMVC+多线程+kafka的 demo基于maven

    在本项目中,我们探索了如何将Spring MVC框架与多线程、线程池和Apache Kafka集成,构建一个高效的数据处理系统。以下是关于这些技术及其整合的详细知识点: 1. **Spring MVC**: - Spring MVC是Spring框架的一个...

    C#多线程学习机制探索

    【C#多线程学习机制探索】 在C#编程中,多线程是一种重要的机制,它允许多个任务在同一时间并行执行,从而提高程序的效率和CPU的利用率。Windows操作系统是一个多任务环境,能够同时处理多个进程和线程。在C#中,...

    深入浅出 Java 多线程.pdf

    在本文中,我们将深入浅出Java多线程编程的世界,探索多线程编程的基本概念、多线程编程的优点、多线程编程的缺点、多线程编程的应用场景、多线程编程的实现方法等内容。 一、多线程编程的基本概念 多线程编程是指...

    多线程机制全探索.rar

    《多线程机制全探索》是一份深入探讨多线程技术的资料,主要聚焦于.Net框架下的实现。多线程是计算机程序设计中的重要概念,它允许多个任务同时执行,提高了系统的并发性和效率。在现代软件开发中,尤其是在服务器端...

    POSIX多线程程序设计

    在深入探索 POSIX 多线程程序设计之前,我们需要了解POSIX(可移植操作系统接口)是一个定义一系列UNIX系统服务的接口标准,这些服务包括文件处理、进程控制、信号处理、定时器、多线程等,POSIX标准让编写的应用...

    java多线程笔记全手打

    Java多线程是Java编程中的重要概念,它允许程序同时执行多个任务,提高了程序的运行效率和资源利用率。本笔记全面涵盖了多线程的学习,包括基础理论和实践代码,旨在帮助开发者深入理解并掌握Java多线程技术。 一、...

    c#多线程的奥妙,多线程的概念STAThread教程+源代码,非常好的.

    在编程世界中,多线程是一项关键的技术,尤其是在性能需求较高的场景下,如并发处理、实时数据处理等。C#,作为.NET Framework...通过阅读提供的源代码,你可以进一步探索多线程在实际项目中的应用,提升你的编程能力。

    C#多线程机制探索与揭密

    尤其在处理并发和多任务执行时,C#的多线程机制显得尤为重要。本篇文章将深入探讨C#中的多线程机制,揭示其背后的原理与实践技巧。 一、线程基础 线程是操作系统分配CPU时间的基本单元,它允许一个程序同时执行多个...

    java多线程详解

    ### Java多线程详解:深度探索Java线程机制 #### 知识点一:线程与进程的区别 在深入探讨Java多线程之前,我们首先需要理解线程与进程的基本概念及其区别。进程是资源分配的基本单位,拥有独立的内存空间,而线程...

    多线程编程基础.pdf

    标题:多线程编程基础 描述与标签:多线程编程基础.pdf 在现代软件开发中,多线程编程已经成为了一项不可或缺的技能。...希望本文能为初学者提供一个良好的起点,引导他们深入探索多线程编程的世界。

    c#多线程处理文档

    在IT行业中,C#是一种广泛使用的编程语言,尤其在开发Windows...在这个未完成的项目中,开发者可能正在探索如何结合C#的多线程特性与文档处理库(如Microsoft Office Interop或第三方库)来实现高效、稳定的文档操作。

    java 多线程示例

    Java多线程是Java编程中的重要概念,尤其在开发高性能、高并发的应用中不可或缺。本示例旨在为初学者提供一个全面理解Java多线程的起点。通过学习这个实例,你可以掌握如何创建和管理线程,理解线程同步与通信的重要...

    多线程赛跑小游戏,练手demo

    在IT行业中,多线程编程是一项重要的技能,尤其在开发高性能、高并发的应用时更为关键。这个"多线程赛跑小游戏"是一个很好的练手项目,可以帮助...这个"多线程赛跑小游戏"是一个寓教于乐的实践平台,值得尝试和探索。

    多线程机制探索在NET下线程开发下载

    在.NET框架中,多线程机制是开发高性能和响应式应用程序的关键组成部分。它允许程序员同时执行多个任务,提高系统资源的利用率,优化程序性能。本文将深入探讨.NET下的多线程开发,包括线程的创建、同步与通信、线程...

    day24-多线程-设计模式.7z

    在探索多线程和设计模式的同时,还要关注线程间的通信(如使用`wait()`、`notify()`、`notifyAll()`等)、同步机制(如`synchronized`关键字、`Lock`接口等)、线程池的使用(`ExecutorService`、`...

Global site tag (gtag.js) - Google Analytics