`
greemranqq
  • 浏览: 974735 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

ThreadPoolExecutor 源码分析(一)-- 线程工作流程

阅读更多

一、序言

       关于“池”的概念,我的理解是它是为了让我们更快的获得资源,节省时间,在我所知的所有池(线程池、连接池、常量池、缓存池、对象池等等),都是这个作用,这里我们仅仅分享线程池的相关理解。

       1.我们什么时候要用线程池?

       在JAVA 里面我们一切都是对象,线程(Thread)同样也是对象,只要是对象那么就要涉及创建、使用、回收等三个主要步骤。通常情况下,创建线程的时间 和 回收(销毁)线程的时间的开销,由JVM控制,而使用过程由我们控制。假设我们在使用时间很短,并且发生频率很高的情况下,那么线程的频繁创建和销毁就会占用大量的的时间,为了减少这种开销,我们利用线程池技术,创建一个线程池,用的时候从里面拿,不用了放回去,再空闲的时间再进行销毁,能节省时间。

        并且在一定程度上,无状态的线程是可以复用的,可以减少对象的创建。

 

 

二、功能介绍和设计:我们将分析JAVA线程池,然后写一个出来方便理解

        1.创建线程池:

 // 这是JDK 提供的此线程池的创建
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

 

        a.corePoolSize :线程池基本线程数,因为一般池的创建分为两种,一种是初始化的时候就进行创建默认的最小连数目,还有一种是当使用的时候才创建,直到创建的数目超过基本数值的时候,后面的获取,才根据池里面的数量空闲状态进行返回,在此之前,都是一直创建,直到超过基本值。

 

        b.maximumPoolSize:线程池最大容量

 

        c.keepAliveTime:线程活动保持时间 ,线程在池里面的空闲的时间,相当于外包人员,在长时间没有接到任务的时候,就让他离职!减少负担。

 

        d.unit:时间单位,这个可以参考TimeUtil 提供了很多精确的时间类型

 

     e.workQueue:工作队列,这里负责维护多个任务调度,比如当我们进网吧的时候是需要刷卡的,当人比较多的时候需要排队,而且同时还要应对结账的人员,这里用队列的方式进行管理。

     至于BlockingQueue 的实现,我们后面再介绍。

         

     f.threadFactory:线程工厂,这里面提供了多种线程的创建方式

 

     g:RejectedExecutionHandler:饱和策略,和缓存池类似,我们不可能无限制的分配下去,当线程数量到最大值的时候,我们需要用一种策略进行处理。

       具体的处理策略我们也留到后面。

      

    我们的设计如下:

    

 

 

     2.功能体现:

     既然是线程池,里面肯定放的是线程,同时我们肯定要有执行线程的方法execute,先来看看JDK这部分主体代码:

     

/**
 * 自己实现线程池,为了方便理解,按照JDK 的进行编写
 * 
 * @author Ran
 */
public class ThreadPool {

	private volatile int corePoolSize;
	private volatile int maximumPoolSize;
	private volatile ThreadFactory threadFactory;
	private final BlockingQueue<Runnable> workQueue;
	private volatile long keepAliveTime;
	private volatile RejectedExecutionHandler handler;
	// 当前实际线程数
	private volatile int   poolSize;
	// 状态锁,主要用于对poolSize,corePoolSize,maximumPoolSize,runState,workers 更新时的锁定
	private final ReentrantLock mainLock = new ReentrantLock();
	
	// 线程的一些状态
	volatile int runState;   
	static final int RUNNING = 0;   
        // 不接受新任务了,但是已经加入队列的还会执行
	static final int SHUTDOWN = 1; 
        // 停止了,队列里面的任务也不执行了
	static final int STOP = 2; 
        // 全部停止,会关闭所有正在执行的线程 
	static final int TERMINATED = 3; 
	
	// 存放工作线程的集合
	private final HashSet<Worker> workers = new HashSet<Worker>();
	
	// 记录线程池到达的最高峰值 的线程数
	private int largestPoolSize;

	// 构造
	public ThreadPool(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		if (corePoolSize < 0 || maximumPoolSize <= 0
				|| maximumPoolSize < corePoolSize || keepAliveTime < 0)
			throw new IllegalArgumentException();
		if (workQueue == null || threadFactory == null || handler == null)
			throw new NullPointerException();
		this.corePoolSize = corePoolSize;
		this.maximumPoolSize = maximumPoolSize;
		this.workQueue = workQueue;
		this.keepAliveTime = unit.toNanos(keepAliveTime);
		this.threadFactory = threadFactory;
		this.handler = handler;
	}
	
	// 执行方法
	public void execute(Runnable command) {
	        if (command == null)
	            throw new NullPointerException();
	        // 如果线程池数,超过我们的基本连接数,直接执行下面
	        // 如果当前线程数小于基本线程数,就创建并执行 ,返回
	        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
	        	// 如果创建失败再次检测,如果有正在取线程的就放进去
	            if (runState == RUNNING && workQueue.offer(command)) {
	                if (runState != RUNNING || poolSize == 0)
	                	// 如果线程池不处于运行状态,或者是第一个线程进入,执行
	                	// 确保始终有一个线程执行该任务
	                    ensureQueuedTaskHandled(command);
	                    
	            }else if (!addIfUnderMaximumPoolSize(command))
	            	// 否则就执行其他策略
	                reject(command); // is shutdown or saturated
	        }
	    }
	
	  // 添加并返回
	  private boolean addIfUnderCorePoolSize(Runnable firstTask) {
	        Thread t = null;
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        try {
	        	// 可以看出,当前线程数是小于基本线程数,并且线程池处于活动状态
	        	// 那么就进行创建 添加操作
	            if (poolSize < corePoolSize && runState == RUNNING)
	                t = addThread(firstTask);
	        } finally {
	            mainLock.unlock();
	        }
	        if (t == null)
	            return false;
	        //然后会启动该线程 
	        t.start();
	        return true;
	    }
	  
	    // 添加操作
	    private Thread addThread(Runnable firstTask) {
	        Worker w = new Worker(firstTask);
	        // 创建,这路的创建,如果传的null, 会new 一个,默认在DefaultThreadFactory                  // 里面 可以看到
	        Thread t = threadFactory.newThread(w);
	        if (t != null) {
	        	// 可以看出,创建的线程实际是worker 对象,里面封装了很多内容
	            w.thread = t;
	            // 然后保存进去。返回
	            workers.add(w);
	            int nt = ++poolSize;
	            if (nt > largestPoolSize)
	                largestPoolSize = nt;
	        }
	        return t;
	    }
	    
	    
	    // 排队后从新检查状态,如果处于非RUNNING 状态,会把刚才队列里面的清掉
	    // 确保有一个线程来处理这个任务(前提是addThread 要成功)
	    private void ensureQueuedTaskHandled(Runnable command) {
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        boolean reject = false;
	        Thread t = null;
	        try {
	        	// 重新检查状态
	            int state = runState;
	            // 如果挂了,就把刚才那个移除返回true,执行处理的策略了
	            if (state != RUNNING && workQueue.remove(command))
	                reject = true;
	            else if (state < STOP &&
	                     poolSize < Math.max(corePoolSize, 1) &&
	                     !workQueue.isEmpty())
	            	// 如果线程池禁止添加新任务 了,并且队列不为空,并且基本数未满
	                t = addThread(null);
	        } finally {
	            mainLock.unlock();
	        }
	        if (reject)
	            reject(command);
	        else if (t != null)
	            t.start();
	    }
	    // 调用拒绝执行的策略
	    void reject(Runnable command) {
	        //handler.rejectedExecution(command, this);
	    }
	    
	    // 队列添加失败(一般是满了)的时候,在满足条件的情况下,会再次创建新
	    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
	        Thread t = null;
	        final ReentrantLock mainLock = this.mainLock;
	        mainLock.lock();
	        try {
	            if (poolSize < maximumPoolSize && runState == RUNNING)
	                t = addThread(firstTask);
	        } finally {
	            mainLock.unlock();
	        }
	        if (t == null)
	            return false;
	        t.start();
	        return true;
	    }
	    
	    
	    // 这是内部类,里面对封装了,我们对线程
	    private final class Worker implements Runnable {
	    	 // 针对每个线程任务进行控制,
	    	 private final ReentrantLock runLock = new ReentrantLock();
	         private Runnable firstTask;
	         Thread thread;

			public Worker(Runnable firstTask) {
				this.firstTask = firstTask;
			}

			@Override
			public void run() {
				
			}
	    }
}

 

 

执行方法小结:

        上面代码,估计大家看着有点乱,这里我进行描述,然后借助图例,然后再回过头进行理解,就清楚了。整个执行方法无非就坐了几件事情:

         1.当前线程数poolSize 小于 corePoolSize基本线程数,也就是说连最小、最基本的线程数都没满足,那么就会执行addIfUnderCorePoolSize方法。

         1.1 addIfUnderCorePoolSize 主要是把一个线程包装成Worker对象,会执行addThread方法,用工厂创建线程,然后保存在我们的集合里,然后执行该线程,OK 返回true,失败返回false.

 

         2.当上面条件不满足的情况,我们会看workQueue 是否满了,如果workQueue.offer(command) 成功,表示未满,就保存进队列。

         2.1成功判断如果是第一个进入的线程,poolSize == 0,那么会执行ensureQueuedTaskHandled方法,该方法会再次验证线程池处于什么状态,如果是非RUNNING了,就把刚才加入队列的线程,移除,然后执行拒绝的策略,如果remove 失败了,会创建一个线程来,保证该

 

         3. 如果workQueue.offer(command) 失败,说明队列满了,会执行addIfUnderMaximumPoolSize 方法,该方法是去判断线程池是否满了,未满的情况下 ,创建(包装)线程,并且执行,返回true, 否则返回false.

执行拒绝策略。

 

         好吧,如果你还无法理解,我还是用实际例子:

         假设我有4台电脑,准备让4个人帮我打文件:corePoolSize = 4.这时候来了3个人 3<4. 我就给他们穿上工作服(Worker包装),然后 让他们做事(addIfUnderCorePoolSize),过了一会又来2个人,这时候发现人够了,那么就把新来的放进等候室(队列)(workQueue.offer(command) ),然后人来多了,再让他们去等候间的时候,发现人满了,这是就要用你指定的策略了(RejectedExecutionHandler)。当然还有一种可能是也许第一个人进来的时候,可以add 失败,也许电脑就坏了,或者卡死了(!RUNNING),这时候就会为了确这个人有活干(ensureQueuedTaskHandled)你得重新检查一下,重新弄弄系统什么的吧!

        下面我copy 的逻辑图,给大家再理解理解,图片来源:http://ifeve.com/java-threadpool/

       

 

 

 

 3. 细节处理

     上面我们初步解释了连接池的工作原理,但是里面线程怎么工作,怎么管理,以及淘汰策略怎么完成的,这些现在进行解释:

      3.1 线程如何工作?

      我们回到worker 里面的run 方法:

      

		@Override
		public void run() {
			try {
				Runnable task = firstTask;
				firstTask = null;
				// 一直从队列里面取,知道执行完成
				while (task != null || (task = getTask()) != null) {
					runTask(task);
					task = null;
				}
			} finally {
				workerDone(this);// 退出
			

    上面主要有getTask 和 runTask ,我们分别来看看 这里面做了什么吧;

    

        Runnable getTask() {
			for (;;) {
				try {
					int state = runState;
					// 放弃队列里面任务执行,直接返回
					if (state > SHUTDOWN)
						return null;
					Runnable r;
					// 该状态,会返回已经加入队列连的线程
					if (state == SHUTDOWN) // Help drain queue
						r = workQueue.poll();
					// 如果超过的基本线程,并且allowCoreThreadTimeOut                                          // 参数允许回收
			else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
				// 获取超时,那么就会回收
				r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
					else
					// 否则就一直等待可以线程进入
					r = workQueue.take();
					if (r != null)
					return r;
					// 判断是否可以退出
					if (workerCanExit()) {
						// 如果已经停了,就中断所有工作的线程(除当                                                // 前线程)
						if (runState >= SHUTDOWN) // Wake up others
							interruptIdleWorkers();
						return null;
					}
					// Else retry
				} catch (InterruptedException ie) {
					// On interruption, re-check runState
				}
			}
		}

    

    我们可以看出,getTask主要是返回队列中的任务,过程中根据池的不同状态做不同处理,获得task之后,我我们再看看runTask 的执行。

    

	private void runTask(Runnable task) {
			final ReentrantLock runLock = this.runLock;
			runLock.lock();
			try {
			// 这段代码的锁和线程池的锁不一样,用多次判断!我不明所以。。
			// 在JDK 1.7 里面的变化挺大的,大家可以去参考里面的。
			if (runState < STOP && Thread.interrupted() && runState >= STOP)
				thread.interrupt();
				boolean ran = false;
				// 这是在任务之前执行方法,方便你重写的的。
				beforeExecute(thread, task);
				try {
					// 看出来了,还是用的Runable 的run 方法
					task.run();
					ran = true;
					// 这里也方便重写
					afterExecute(task, null);
					// 这里会记录完成任务的数量
					++completedTasks;
				} catch (RuntimeException ex) {
					if (!ran)
						afterExecute(task, ex);
					throw ex;
				}
			} finally {
				runLock.unlock();
			}
		}

 

 虽然解释得不很清楚,我相信至少能理解线程在线程池里面主要的额工作流程了。

 

 

小结:

       1.上面分析了JDK 线程池的基本实现原理,仅供参考

       2.JDK1.7 的变化挺大的,我表示很无奈,以后分析源码,还是往JDK1.8 上面靠吧,不然被淘汰了都不知道!

      3.整个流程有些在没加锁的地方,老是喜欢用多次判断的方式,因为volatile的可见性,确实可以这么做,我不得不吐槽,写得并不好,这里JDK 1.7 里面进行的大量重构!

      4.有写得不好,或者不明白的地方,欢迎大家一起提出,分享,由于篇幅和知识吸收的关系,里面的其他策略,下次分享,这里仅仅分析主要工作流程。

 

 

 

 

  • 大小: 64.4 KB
分享到:
评论
1 楼 肥啦A梦 2014-04-09  
获益不少

相关推荐

    JDK之ThreadPoolExecutor源码分析1

    《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...

    11-线程池ThreadPoolExecutor底层原理源码分析(上)-周瑜.pdf

    2. **增加工作线程**:如果当前工作的线程数少于核心线程数,则创建一个新的工作线程来执行任务;如果当前工作的线程数已经达到或超过核心线程数,则进入下一步。 3. **使用阻塞队列**:将任务放入阻塞队列中等待...

    源码深度分析线程池中Worker线程的执行流程

    本文将深入分析Worker线程的执行流程,特别是其源码实现。 首先,Worker类是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer(AQS),这是一个抽象的同步队列,用于实现锁和其他同步组件的...

    Android应用源码之SundPoolSample-IT计算机-毕业设计.zip

    《Android应用源码解析——基于SundPoolSample的深度学习》 在当今移动互联网时代,Android作为主流的智能...通过实际操作和源码分析,我们可以将理论知识转化为实战经验,进一步提升我们的编程素养和问题解决能力。

    多线程环境下快速登录YY案例源码

    本文将深入探讨“多线程环境下快速登录YY案例源码”这一主题,帮助你理解如何利用多线程优化登录流程,提升用户体验。 标题中的“多线程环境下快速登录YY案例源码”意味着这个示例代码着重展示了如何在多个线程中...

    使用线程池ThreadPoolExecutor 抓取论坛帖子列表

    在源码分析方面,`ThreadPoolExecutor`的工作流程主要包括: 1. 如果当前线程数小于核心线程数,会立即创建新线程执行任务。 2. 如果当前线程数等于核心线程数,任务会被放入任务队列。 3. 如果任务队列已满且线程数...

    Thread(线程)

    - 对于Java开发者,`jstack`是分析线程状态的常用工具,可以帮助诊断死锁等问题。 综上所述,"Thread(线程)"这一主题涵盖了大量的理论知识和实践经验,包括线程的创建、管理、同步、通信、性能优化等方面。通过...

    java线程池的源码分析.zip

    4. **`RunandReset`源码分析**:`RunandReset`是一个内部操作,用于决定线程是否应该重新执行任务。如果线程在执行过程中被中断,`RunandReset`会决定是否恢复中断标志并重新提交任务。 5. **`set`方法设置返回值**...

    java线程池源码-java-source:Java源码学习多线程、线程池、集合

    通过分析Java线程池源码,我们可以学习到如何合理配置线程池参数,如何选择合适的工作队列,以及如何处理拒绝策略,从而在实际开发中更好地利用多线程来提高程序效率。此外,源码阅读也有助于理解Java并发库的设计...

    多线程数据采集器源码

    通过这种方式,可以灵活地控制线程的工作流程,比如优先处理高优先级的任务。 3. **并发控制**:为了避免过多的并发请求导致服务器压力过大,源码可能会包含一些限制并发数的策略,如设置线程池的最大线程数,或者...

    线程池源码

    线程池是Java多线程编程中不可或缺的一部分,它通过管理一组可重用线程来提高应用程序的性能和效率。在Java中,`java.util.concurrent` 包中的 `ExecutorService` 和其子类如 `ThreadPoolExecutor` 提供了线程池的...

    Android AsyncTask 源码解析

    - **WorkerThread**:每个任务都在一个单独的 WorkerThread 中执行,这些线程由 ThreadPoolExecutor 创建并管理。 - **InternalHandler**:AsyncTask 使用 InternalHandler 在后台线程和 UI 线程之间通信,将结果和...

    java 手术任务(线程池)

    在Java编程中,线程池是一种管理线程的机制,它可以帮助我们更有效地调度和执行并发任务。...结合`SurgeryThreadPool.java`源码分析和`Java.jpg`中的示例,我们可以进一步理解线程池在实际项目中的具体实现和应用。

    Android-用于app模块初始化可区分进程线程并设置优先级

    - **IntentService**:默认在后台服务进程中运行,自动管理线程,适合执行一次性任务。 6. **文件名称列表解析** - "module-initializer-master"可能包含项目的主目录,可能包括源码、配置文件、示例等,通过阅读...

    自我多线程的学习资料

    源码分析更是能够深入理解其工作原理,帮助提升编程技能。 首先,我们要明白什么是多线程。在单核CPU系统中,多线程是指在同一个进程中,同时执行多个不同的线程,以此来提高CPU的利用率和程序的响应速度。Java提供...

    线程学习实例和笔记

    9. **源码分析**:阅读和理解JDK中Thread类和其他相关类的源码,有助于深入理解线程的工作原理,例如线程调度、中断机制等。 10. **性能优化**:合理设计线程数量、避免过度竞争、合理使用同步机制等,都是提升多...

    Java线程池,正式上线运行的源码,分享欢迎使用并提意见

    源码分析** 阅读源码可以帮助我们更好地理解线程池的内部机制。例如,`ThreadPoolExecutor.execute()`方法是如何处理任务的提交,`ThreadPoolExecutor.shutdown()`和`shutdownNow()`又是如何优雅地关闭线程池。 **...

    线程处理UI的理解Demo

    以下是一个简单的Handler-Looper-MessageQueue工作流程: 1. 在工作线程中初始化Looper。 2. 创建一个Handler实例,关联到该Looper。 3. 在主线程中,创建Message对象并设置其目标Handler,然后将消息放入Message...

    基于Java的源码-短信网关平台(值得一看).zip

    【标题】基于Java的短信网关平台源码分析 短信网关平台是通信系统中的重要组成部分,它负责处理和传递短信服务请求。本项目以Java为开发语言,提供了丰富的功能和可靠的性能,对于理解Java在大型系统中的应用,以及...

    Android Universal Image Loader源码详解

    本文将深入分析其源码,了解其工作原理和设计思路。 1. **初始化与配置** - UIL的初始化涉及到`ImageLoaderConfiguration`的构建,这包括内存缓存、磁盘缓存、线程池和图片处理策略等设置。`DiskCacheUtils`和`...

Global site tag (gtag.js) - Google Analytics