个人学习笔记,如有错误欢迎指正。。
一 线程池的介绍
如果需要线程来执行的任务,可以使用 new Thread 来创建线程,Thread.start()方法执行线程,线程执行完之后,不可再利用。还有其它任务需要线程来执行,需要重复上述步骤。
这样就形成,一个任务创建一条线程来执行,对于大量任务来说,频繁创建和销毁线程都需要耗费系统资源,这种情况可以使用线程池来提升性能减少开销。
线程池中的线程创建完成后,可以重复的执行不同的任务(Runnable或Callable),实现了线程的重复利用,减少了创线程的数量。
线程池实现的简要示意:
public class ThreadPool { private final BlockingQueue<Runnable> queue ; public ThreadPool (int threadNum){ this.queue = new ArrayBlockingQueue<Runnable>(10); for(int i=0;i<threadNum;i++){ new WorkerThread().start(); } } public void execute(Runnable runnable) throws Exception{ this.queue.put(runnable); } private final class WorkerThread extends Thread{ @Override public void run() { // TODO Auto-generated method stub try{ queue.take().run();//不断的从任务队列中取出任务来执行 }catch(Exception e){} } } }
线程池的构建器:
|
|
|
|
参数说明:
corePoolSize 核心线程数 就是线程池中创建多少条线程来处理任务。
workQueue 任务队列,用于存放需要处理的任务。线程从这个任务队列中取出需要处理的任务。
maximumPoolSize 最大线程数,当任务积压时(任务队列已满,再也存不下更多的任务时),最多创建多少条线程来来处理任务。
keepAliveTime 线程空闲超时时间,当实际线程数大于 corePoolSize ,且线程空闲时间超过 keepAliveTime ,则销毁该线程,用于保持实际线程数== corePoolSize
threadFactory 线程工厂,用于创建线程 。
handler 当线程池不再理处理更的线程时,对于当于任务调用handler.rejectedExecution(command, this)来处理。
线程的创建过程:
成员变量 private volatile int poolSize; 表示当前实际线程数据。
在execute(Runnable command)方法被调用的时候:
如果 poolSize<corePoolSize ,则创建新的线程来运行 Runnable任务,同时poolSize++;
如果 poolSize>=corePoolSize,则将 Runnable任务存入工作队列workQueue,当前实际运行的线程数==corePoolSize,这些线程运行完当前任务后,会从 workQueue.take()出任务来执行。
如果 workQueue已满,任务无法再加入队列,如果maximumPoolSize> poolSize,则创那建一条线程来运行当前任务并poolSize++; 如果 poolSize== maximumPoolSize,则当前已经创建了最大的线程数,不能再创建线程了。 对于当前的任务则调用handler.rejectedExecution(command, this);线程池提供了四种 handler:
(未指定时默认使用ThreadPoolExecutor.AbortPolicy
)
static class |
ThreadPoolExecutor.AbortPolicy 它将抛出 RejectedExecutionException. |
static class |
ThreadPoolExecutor.CallerRunsPolicy 它直接在 execute 方法的调用线程中运行任务,当前执行的execute方法会直接调用Runnable.run方法,直至任务执行完成,excecute方法才会返回;如果执行程序已关闭,则会丢弃该任务。 |
static class |
ThreadPoolExecutor.DiscardOldestPolicy 它放弃最旧的未处理请求,即将工作队列头上的任务舍弃,然后重试 execute(当前任务);如果执行程序已关闭,则会丢弃该任务。 |
static class |
ThreadPoolExecutor.DiscardPolicy 默认情况下它将放弃当前任务。 |
二 exectue方法源码分析
exectue方法源码分析:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//poolSize < corePoolSize,调用addIfUnderCorePoolSize创那建新的线程来运行任务 if (runState == RUNNING && workQueue.offer(command)) {//poolSize = corePoolSize//任务加入工作队列 if (runState != RUNNING || poolSize == 0)// ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) //poolSize < maximumPoolSize时增加线程来运行任务, reject(command); //当poolSize == maximumPoolSize时,调用handler.rejectedExecution(command, this); } }
现在看一下线程池中的线程对象,是如何运行任务的:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; Thread thread; Worker(Runnable firstTask) {//第一次运行的任务 this.firstTask = firstTask; } /** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { boolean ran = false; beforeExecute(thread, task);//扩展点,运行任务之前调用 try { task.run();//运行任务 ran = true; afterExecute(task, null);;//扩展点,运行完任务后调用 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Main run loop */ public void run() {//线程执行入口 try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) {//无线循环,获取任务 runTask(task);//运行任务 task = null; } } finally { workerDone(this); } } } Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null;//线程STOP或TERMINATED状态,返回NULL来退出当前线程 Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut)//实际线程数>corePoolSize或 允许线程销毁(通过 public void allowCoreThreadTimeOut(boolean value)方法设定) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);//线程空闲超时时间内未得到任务,则返回null,用于当前线程退出,如果得到任务则运行 else r = workQueue.take();//队列有任务则取出,没有任务则阻塞 if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
三 线程池的扩展点:
protected void beforeExecute(Thread t, Runnable r) { }
--任务运行之前调用
protected void afterExecute(Runnable r, Throwable t) { }
--任务运行之后调用。
protected void terminated() { }
--线程池终止结束
四 线程池的四种状态:
RUNNING:接收新的任务并处理队列任务
SHUTDOWN:不接收新的任务,但处理队列任务
STOP:不接收新的任务,不处理队列任务,并中断所有进行中的任务。
TERMINATED:与STOP相同并所有线程结束
五 线程池的关闭:
1.shutdown()方法
调用shutdown()方法之后,更改线程池的状态为 shutdown并设置中断标志等操作后就返回了,这时线程池并没有立即停止运行:exectue对于新入的任务调用 reject(command)来拒绝任务,各个线程需要把工作队列中的任务都处理完成,才能退出。
源码分析:
public void shutdown() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < SHUTDOWN) runState = SHUTDOWN;//更改状态为SHUTDOWN try { for (Worker w : workers) { w.interruptIfIdle();//如果线程没有运行任务,则设置线程中断标志 } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } tryTerminate(); // 如果线程数为0,并任务队列为空则结束 } finally { mainLock.unlock(); } } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) {//线程未执行任务,则设定线程中断标志 try { thread.interrupt(); } finally { runLock.unlock(); } } } //线程在shutdown状态下的运行情况及对中断的响应 Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // 如果shutdown了,则直接取任务队列中的任务,队列为空则返回null,而不是调用TASK(没有任务时等待) r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take();//线程被唤醒后,检测到中断标志,抛出异常InterruptedException,从而进入下一个循环 if (r != null)//如果有工作任务,则继续执行任务 return r; if (workerCanExit()) {//如果没有工作任务,再次判断工人队列为空,则当前线程可以退出 if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null;//退出当前线程 } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
2. List<Runnable> shutdownNow()
shutdownNow方法运行后,更改线程池状态为STOP,并将所有线程设置中断,工作队列清空,返回未处理的任务列表。这时线程池未必已停止,因为可能有正在运行中的线程,需要线程都退出之后,线程池才真正停止。
源码分析:
public List<Runnable> shutdownNow() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < STOP) runState = STOP;//设置STOP状态 try { for (Worker w : workers) { w.interruptNow();//所有线程设置中断标志 } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } List<Runnable> tasks = drainQueue();//清空任务队列 tryTerminate(); // 如果当前没有线程运行,则试着完全停止 return tasks;//返回未处理的任务列表 } finally { mainLock.unlock(); } } //Stop状态下线程的运行情况及对中断的响应
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN)//如果是stop状态则直接返回null从而退出线程 return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take();//take方法会响应中断,抛出异常,并从循环开始再次执行,从而退出 if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
上述两个停止方法 shutDown() 和 shutDownNow()被调用后,线程池未必立即停止运行,那什么时候线程池已经停止运行了呢? awaitTermination方法返回真,则已停止。
源码分析:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runState == TERMINATED) return true;//线程池已停止立即返回真 if (nanos <= 0)//等待时间已用完或设定的等待时间>=0,则立即返回假 return false; nanos = termination.awaitNanos(nanos);//阻塞等待线程池停止,如果已停止则被唤醒,超时也被唤醒 } } finally { mainLock.unlock(); } }
六 可获取线程任务执行结果的方法 sumit(Callable task)
方法 public <T> Future<T> submit(Callable<T> task) ,提交一个任务让线程池执行,并返回Future对象用于获取任务的执行结果。submit()方法执行后会立即返回,不会等待任务执行完成。 可以通过isDone()判断线程是否执行完成。get()方法获取结果(任务未执完则阻塞)
future可用方法:
boolean |
cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。 |
V |
异常:
CancellationException - 如果计算被取消。ExecutionException - 如果计算抛出异常。InterruptedException - 如果当前的线程在等待时被中断。
|
V |
如果任务未完成,则阻塞指定时间,如指定时间内任务完成,则获取其结果。 异常: CancellationException - 如果计算被取消。ExecutionException - 如果计算抛出异常。InterruptedException - 如果当前的线程在等待时被中断。TimeoutException - 如果等待超时。 |
boolean |
isCancelled() 如果在任务正常完成前将其取消,则返回 true。 |
boolean |
isDone() 如果任务已完成,则返回 true。 |
ThreadPoolExecutor 线程池 submit 方法 将Callable对象转为Runnable对象并调用 execute(Runnable)方法来执行任务。
源码分析:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask);//将Callable对象转为Runnable对象,调用execute(Runnable)方法执行任务 return ftask; }
RunnableFuture 类实现了Future,提供了get()等方法。
RunnableFuture 类实现了Runnable,线程池通过execute方法调用它的run()方法来执行任务。
主要使用了AbstractQueuedSynchronizer实现阻塞及唤醒。
源码分析看看run方法做了什么:
void innerRun() {//run方法直接调用这个方法 if (!compareAndSetState(0, RUNNING))//如果state为0,则改为Running,state初始化状态为0,如果不为0,则说明该任务被(可能其它线程)调用过了,不能再调用立即返回。 return; try { runner = Thread.currentThread(); if (getState() == RUNNING) // 重新检查线程状态 innerSet(callable.call());//callable.call()调用任务,结束后调用innerSet设置结果 else releaseShared(0); // cancel } catch (Throwable ex) { innerSetException(ex); } } } void innerSet(V v) { for (;;) { int s = getState(); if (s == RAN)//任务状态为已完成则直接返回 return; if (s == CANCELLED) { releaseShared(0);//任务取消,释放线程共锁 return; } if (compareAndSetState(s, RAN)) {//将state状态变为已完成 result = v;//结果赋值 releaseShared(0);//调用tryReleaseShared方法,tryReleaseShared方法返回真,唤醒被阻塞的线程 done(); return; } } } protected boolean tryReleaseShared(int ignore) { runner = null; return true;//返回真,唤醒调用GET方法阻塞线程返回任务结果 } } //get结果方法直接调用本方法 V innerGet() throws InterruptedException, ExecutionException { acquireSharedInterruptibly(0);//方法内部调用tryAcquireShared,如果任务未完成,则阻塞,直至被唤醒,唤醒时任务已完成了,才能顺序执行,返回结果或抛出异常 if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } //方法返回-1,则表示任务未完成,线程阻塞,等待唤醒 //方法返回1,表示任务已完成,程序顺序执行 protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; }其它submit方法:
相关推荐
线程池是一种多线程处理形式,通过预先创建一定数量的线程并管理它们,以提高系统的效率和响应性。在计算机科学中,特别是在软件开发领域,线程池是操作系统或者编程语言中的一种资源管理技术。它允许程序预先启动一...
corePoolSize:核心池的大小,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; ...
阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...
一、要实现高效的线程池,可以考虑以下几点 二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、...
### 线程池原理及创建(C++实现) #### 一、线程池的重要性 在现代计算环境中,网络服务器面临着处理大量并发请求的挑战,其中包括但不限于Web服务器、电子邮件服务器和数据库服务器。这类服务器通常需要在短时间...
java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...
文章通过实例展示了如何创建一个全局线程池类,该类中封装了线程池对象,并提供了向线程池提交任务、检查任务是否在运行等方法。全局线程池的生命周期与Django主线程的生命周期一致,确保了线程资源的合理释放。 5....
线程池是多线程编程中的一个重要概念,它是一种线程使用模式,通过预先创建一组线程并维护一个线程集合来处理并发任务。在Windows操作系统中,内建的线程池API(Thread Pool API)提供了高效且灵活的线程管理机制,...
线程池是一种在多线程编程中非常重要的概念,它能有效地管理和调度系统中的线程资源,从而提高系统的效率和响应速度。在这个简单的线程池实现中,我们可以通过`pthread_pool.cpp`、`MainFunctionForTest.cpp`、`...
在编程领域,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池在C++中是提高程序效率和资源管理的重要工具,尤其在处理大量并发操作时。本文将深入探讨VC++中...
Java8并行流中自定义线程池操作示例 Java8并行流中自定义线程池操作示例主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧。 1. 概览 Java8引入了...
Linux 线程池创建 C 实现 线程池是一种常用的并发编程技术,它可以提高应用程序的性能和响应速度。在 Linux 系统中,使用 C 语言创建线程池可以实现高效的并发处理。 什么时候需要创建线程池呢?简单的说,如果一...
DELPHI的线程池(ThreadPool)是一种高效管理并发任务的技术,它允许程序在需要时创建线程,而不是每次需要执行任务时都手动创建。线程池通过预先创建一组线程,然后根据需要分配任务,减少了线程创建和销毁的开销,...
本篇文章将重点探讨两种线程池实现:精易模块线程池和鱼刺模块线程池,并通过源码分析来展示它们的特点和用法。 首先,精易模块(SanYe Module)是由中国程序员SanYe开发的一系列开源模块,其中包含了线程池的实现...
在Linux系统中,线程池是一种高效的进程管理方式,它允许多个任务并行执行,同时限制了系统中并发线程的数量,以优化资源分配和调度。本项目实现了利用线程池进行目录拷贝的功能,这涉及到多个重要的编程概念和技术...
线程池是一种优化资源管理的机制,通过预先创建并维护一组可重用的线程,避免频繁地创建和销毁线程带来的性能开销。在Java、C++等编程语言中,线程池广泛应用于并发处理,提高系统效率,降低系统的资源消耗。本项目...
一、线程池 1、为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3>T2,那...
线程池管理和多线程上传是并发编程中的一个重要实践,特别是在大数据传输和网络服务中。在Java等编程语言中,线程池通过有效地管理和复用线程资源,避免了频繁创建和销毁线程带来的开销,提升了系统性能。下面将详细...
线程池是多线程编程中一个重要的概念,它能够优化系统资源的使用,提高系统的响应速度和效率。本篇文章将深入探讨C++中的线程池实现,并通过名为“OEasyPool-1.0”的示例来展示其工作原理。 线程池是预先创建并维护...
在C#编程中,线程池(ThreadPool)是一种高效的线程管理机制,它允许开发者创建并管理多个线程,而无需直接操作线程对象。线程池中的线程可以复用,减少了创建和销毁线程的开销。当我们需要执行大量短生命周期的任务...