- 浏览: 342141 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (212)
- spring (21)
- design pattern(java) (12)
- linux-shell (28)
- java-thread (20)
- java-collection (6)
- java-reflect (9)
- mysql (11)
- java-io (7)
- java-util&lang&io (3)
- algorithm (3)
- interview (2)
- tools-eclipse (2)
- tools-maven (1)
- web-script (1)
- java组建 (13)
- 博客收藏 (1)
- 架构设计与实践 (10)
- active-mq (6)
- java-jvm&性能&原理 (27)
- tomcat (2)
- flume (1)
- serialization (2)
- git (1)
- cache&redis (8)
- guava (1)
- zookeeper (3)
- socket&tcp&udp&http (6)
- test (1)
最新评论
-
bbls:
有用有用有用
java-jvm-jstack-(监视器和锁的概念) -
王新春:
小侠有点帅哦 写道此流怎么关闭新春这个实现 可以不关闭的,哈哈 ...
源码剖析之java.io.ByteArrayOutputStream -
小侠有点帅哦:
此流怎么关闭新春
源码剖析之java.io.ByteArrayOutputStream -
cumt168:
写的很好为什么初始化参数,年轻代-Xmn10M def new ...
jvm之内存申请过程分析 -
ronin47:
应该是跟共享域名思路差不多,根据cookie的key作判断
跨域:一种通过服务端解决跨域的实现
继续ThreadPoolExecutor 线程池的分析~~~~~
一般一个简单线程池至少包含下列组成部分:
1.线程池管理器(ThreadPoolManager):用于创建线程并管理线程池。
2.工作线程(WorkThread): 线程池中线程。
3.任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
4.任务队列(BlockingQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池最核心的点:就是一个线程可以执行多个任务,并且是任务新增后,线程能够及时感知,并去执行。这就要求线程在没有任务执行的时候,出于等待状态,而不是直接退出。
ThreadPoolExecutor 实现的核心的思想也不例外:WorkThread 一直从BlockingQueue中获取新任务,如果获得那么执行,如果获得不了那么进入等待,直到获得新的任务。这样就保证线程可以执行多个任务。
线程池任务执行的流程:appThread 把Task 交给 ThreadPoolManager ,ThreadPoolManager 把Task交给 BlockingQueue,WorkThread 一直在等待从BlockingQueue 中获得Task执行。
注意:appThread 和 workThread 是通过 BlockingQueue 进行解耦,实现了异步执行。
那么下面我们分析下java中ThreadPoolExecutor 的实现,以更好的理解线程池~
任务队列:本实现中使用的Java中的任务队列是BlockingQueue 接口,可以是ArrayBlockingQueue 也可以是LinkedBlockingQueue等。
workThread实现:
线程池任务管理器:
下面是线程池执行任务时留给用户扩展功能的方法,这些方法可以在任务执行前执行后或者线程池结束前做一些扩展的逻辑:
如果线程池关闭或者不能接受更多的任务,丢弃测试有如下几种:
我们也可以自定义我们自己的丢弃策略,如添加日志、做特定通知等等。
一般一个简单线程池至少包含下列组成部分:
1.线程池管理器(ThreadPoolManager):用于创建线程并管理线程池。
2.工作线程(WorkThread): 线程池中线程。
3.任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
4.任务队列(BlockingQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池最核心的点:就是一个线程可以执行多个任务,并且是任务新增后,线程能够及时感知,并去执行。这就要求线程在没有任务执行的时候,出于等待状态,而不是直接退出。
ThreadPoolExecutor 实现的核心的思想也不例外:WorkThread 一直从BlockingQueue中获取新任务,如果获得那么执行,如果获得不了那么进入等待,直到获得新的任务。这样就保证线程可以执行多个任务。
线程池任务执行的流程:appThread 把Task 交给 ThreadPoolManager ,ThreadPoolManager 把Task交给 BlockingQueue,WorkThread 一直在等待从BlockingQueue 中获得Task执行。
注意:appThread 和 workThread 是通过 BlockingQueue 进行解耦,实现了异步执行。
那么下面我们分析下java中ThreadPoolExecutor 的实现,以更好的理解线程池~
任务队列:本实现中使用的Java中的任务队列是BlockingQueue 接口,可以是ArrayBlockingQueue 也可以是LinkedBlockingQueue等。
/** *任务队列 */ private final BlockingQueue<Runnable> workQueue;
workThread实现:
/** * 工作线程 */ private final class Worker implements Runnable { /** * 当task被执行时候,runLock 必须获取,并在执行完后释放。主要是用来,通过打断工作线程来取消task执行的场景。 */ private final ReentrantLock runLock = new ReentrantLock(); /** * 进入run loop之前 初始化的第一个任务 */ private Runnable firstTask; /** 每个线程完成的任务的计数,最后termination的时候,统计到completedTaskCount */ volatile long completedTasks; /** * worker 运行的线程 */ Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } /** * 如果当前线程没有执行任务,那么中断线程(关键是runLock 锁,一旦能获得锁,就意味着没有任务在执行) */ void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } /** * Interrupts thread even if running a task. */ void interruptNow() { thread.interrupt(); } /** * Runs a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { /* * Ensure that unless pool is stopping, this thread * does not have its interrupt set. This requires a * double-check of state in case the interrupt was * cleared concurrently with a shutdownNow -- if so, * the interrupt is re-enabled. */ if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); /* ran: 是用来保证afterExecute 肯定会被执行!! */ boolean ran = false; beforeExecute(thread, task); //任务执行前的扩展点 try { task.run(); //执行任务 ran = true; afterExecute(task, null); //任务执行后的扩展点 ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * thread(start ->run ) -->work(->run)->runTask */ public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null //注意:如果 task != null 那么runTask 当前任务!!! || (task = getTask()) != null) { //如果task == null,并且getTask不为null 那么runTask 新任务 runTask(task); task = null; } } finally { workerDone(this); } } }
线程池任务管理器:
package java.util.concurrent; import java.util.concurrent.locks.*; import java.util.*; public class ThreadPoolExecutor extends AbstractExecutorService { /** * Permission for checking shutdown */ private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); /** * runState 功能:提供线程池生命周期的控制 * * RUNNING: 可以接受新任务,并处理队列中的任务。 * SHUTDOWN: 不再接受新任务,但是会处理队列中的任务。 * STOP: 不再接受新任务,也不处理队列中的人,并且打断正在处理中的任务 * TERMINATED: 和stop一样,并且关闭所有的线程 * * 状态转换: * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * SHUTDOWN -> TERMINATED * When both queue and pool are empty * STOP -> TERMINATED: * When pool is empty */ volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3; /** * 持有此锁 来更新o poolSize, corePoolSize, * maximumPoolSize, runState, and workers set. */ private final ReentrantLock mainLock = new ReentrantLock(); /** * 支持awaitTermination 的条件 */ private final Condition termination = mainLock.newCondition(); /** * 线程池中的所有工作线程,必须在持有mainlock的情况下更新 */ private final HashSet<Worker> workers = new HashSet<Worker>(); /** * allowCoreThreadTimeOut = ture时的参考量 */ private volatile long keepAliveTime; /** * 是否允许CoreThread 过期,默认是false ,core thread 会一直alive 即便是空闲。如果为ture,那么会等待keepAliveTime ,超时会关闭 */ private volatile boolean allowCoreThreadTimeOut; /** * 核心线程数目。小于corePoolSize 会一直创建新线程来处理任务。 */ private volatile int corePoolSize; /** * 最大线程数目 */ private volatile int maximumPoolSize; /** * 持有mainLock 锁定情况下才能更新的 当前线程数目 */ private volatile int poolSize; /** * 任务丢弃策略 */ private volatile RejectedExecutionHandler handler; /** * 线程工厂 */ private volatile ThreadFactory threadFactory; /** * 线程池中的最大数目 */ private int largestPoolSize; /** * 已经完成的任务数 */ private long completedTaskCount; /** * 默认的丢弃策略 */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); /** *创建线程池的构造函数 * * @param corePoolSize 池中所保存的线程数,包括空闲线程。 * @param maximumPoolSize 池中允许的最大线程数。超过最大线程数,将执行RejectedExecutionHandler 逻辑 * @param keepAliveTime 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 * @param unit 参数的时间单位。 * @param workQueue 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 * @param threadFactory 执行程序创建新线程时使用的工厂。 * @param handler 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } /* * Support for execute(). * * 方法 execute() 处理各种提交一个新task情况,execute 分三个步骤执行 * * 1. 当线程数目小于corePoolSize 时,会根据提供的command命令启动一个新的线程,command 会是第一个执行的任务 * 此时会调用addIfUnderCorePoolSize 来检测 runState and pool size * * 2. 把任务放到队列里。 * * 3. 如果队列不能接受command任务。那么我们试图新增一个线程, 通过addIfUnderMaximumPoolSize 来完成创建新线程来执行任务。 * * 如果失败,那么交由RejectedExecutionHandler来处理拒绝的逻辑。 */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //如果 poolSize >= corePoolSize 失败,那么addIfUnderCorePoolSize 进行创建新线程处理 if (runState == RUNNING && workQueue.offer(command)) { //如果poolsize > =corePoolSize,或者创建新线程失败了,那么向workQueue队列里放置任务 if (runState != RUNNING || poolSize == 0) //如果runState状态发生变化,那么调用;如果状态没有变化,那么无所谓又其他线程执行! ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) //如果向workQueue队列放置任务失败,那么执行addIfUnderMaximumPoolSize 添加新新线程执行command任务 reject(command); // 如果addIfUnderMaximumPoolSize 失败,那么走拒绝任务执行流程 } } /** * 创建一个(执行首个任务为firstTask)的线程,并返回 * @param firstTask 当前线程执行的第一个任务。firstTask 可以为null */ private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //注意:非常关键thread.run() 委托给了 Worker.run() 调用。woker的run调用是循环获得任务(有可能等待的)! if (t != null) { w.thread = t; //添加工作线程 workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) //调节最大线程数 largestPoolSize = nt; } return t; } /** pool 没有shutdown,并且poolSize(当前活动的线程数) <corePoolSize,创建一个新线程 */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) //注意条件!! t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } /** * 创建线程 当前线程数小于 maximumPoolSize ,并且线程池没有关闭 */ private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } /** * 插入一个任务后,如果pool state 的状态发生变化(可能被并发的调用shutdownNow),再次做检查 */ private void ensureQueuedTaskHandled(Runnable command) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean reject = false; Thread t = null; try { int state = runState; if (state != RUNNING && workQueue.remove(command)) //如果 state != RUNNING 意味着 可能已经关闭。 reject = true; else if (state < STOP && //此处只可能是shutdown,那么如果 poolSize <corePoolSize,并且workQueue.isEmpty()=false ,那么新生成线程来处理任务。 poolSize < Math.max(corePoolSize, 1) && !workQueue.isEmpty()) t = addThread(null); } finally { mainLock.unlock(); } if (reject) reject(command); //丢弃这个任务 else if (t != null) t.start(); } /** * 丢弃一个任务 */ void reject(Runnable command) { handler.rejectedExecution(command, this); } /* Utilities for worker thread control */ Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) //注意:如果线程池已经关闭(此时为shutdownNow状态,不分配任务),那么不继续执行其他任务 return null; Runnable r; if (state == SHUTDOWN) //注意: 如果是shutdown ,此时为shutdown状态,需要把队列中的任务执行完毕!!! r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) // 如果当前线程poolSize >corePoolSize(任务比较多哈) 或者允许核心线程过期, 那么从队列中取限时任务 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); //注意:线addIfUnderMaximumPoolSize程池的最关键的实现在这里,如果没有任务,那么进入等待!!!! if (r != null) return r; //如果没有取到数据,则表示是不是需要关闭线程池里面的一些线程了 if (workerCanExit()) { //如果工作线程允许退出(调用shutdownnow,队列为空,允许线程过期并且线程数目多于1个) if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); //终端当前线程 return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } } /** * 检查是否一个worker线程没有获得一个task的时候,允许退出。如果pool 是stop 或者队列为空,或者允许核心线程过期并且线程数多于1个。 */ private boolean workerCanExit() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); boolean canExit; try { canExit = runState >= STOP || workQueue.isEmpty() || (allowCoreThreadTimeOut && poolSize > Math.max(1, corePoolSize)); } finally { mainLock.unlock(); } return canExit; } /** * 唤醒所有的出于等待任务的线程。 */ void interruptIdleWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfIdle(); } finally { mainLock.unlock(); } } /** * 记录已经退出的worker 完成的任务数量 */ void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } } /* Termination support. */ /** * 终止线程池 */ private void tryTerminate() { if (poolSize == 0) { int state = runState; if (state < STOP && !workQueue.isEmpty()) { //如果活动能够队列不为空,那么不能终止线程池 state = RUNNING; // disable termination check below Thread t = addThread(null); if (t != null) t.start(); } if (state == STOP || state == SHUTDOWN) { runState = TERMINATED; termination.signalAll(); terminated(); } } } public void shutdown() { SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < SHUTDOWN) runState = SHUTDOWN; //保证不再接受新的任务 try { for (Worker w : workers) { w.interruptIfIdle();//关闭空闲的worker线程 } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } tryTerminate(); // Terminate now if pool and queue empty } finally { mainLock.unlock(); } } /** * 立即关闭线程池 */ public List<Runnable> shutdownNow() { /* * shutdownNow differs from shutdown only in that * 1. runState is set to STOP, * 2. all worker threads are interrupted, not just the idle ones, and * 3. the queue is drained and returned. */ SecurityManager security = System.getSecurityManager(); if (security != null) security.checkPermission(shutdownPerm); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (security != null) { // Check if caller can modify our threads for (Worker w : workers) security.checkAccess(w.thread); } int state = runState; if (state < STOP) runState = STOP; try { for (Worker w : workers) { //所有工作线程立即关闭! w.interruptNow(); } } catch (SecurityException se) { // Try to back out runState = state; // tryTerminate() here would be a no-op throw se; } List<Runnable> tasks = drainQueue(); tryTerminate(); // Terminate now if pool and queue empty return tasks; } finally { mainLock.unlock(); } } /** * 调用shutdownNow 的时候,把任务列表项组织成新的list */ private List<Runnable> drainQueue() { List<Runnable> taskList = new ArrayList<Runnable>(); workQueue.drainTo(taskList); while (!workQueue.isEmpty()) { Iterator<Runnable> it = workQueue.iterator(); try { if (it.hasNext()) { Runnable r = it.next(); if (workQueue.remove(r)) taskList.add(r); } } catch (ConcurrentModificationException ignore) { } } return taskList; } /** 返回是否在running状态*/ public boolean isShutdown() { return runState != RUNNING; } /** * 返回线程池执行器是否调用shutdown或者shutdownnow */ public boolean isTerminating() { int state = runState; return state == SHUTDOWN || state == STOP; } public boolean isTerminated() { return runState == TERMINATED; } /** 等待结束当前线程池*/ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { //对于awaitNanos 的情况,一般需要循环!! if (runState == TERMINATED) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } } protected void finalize() { shutdown(); } /** * 设置线程生产工厂 */ public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this.threadFactory = threadFactory; } /** * 获取线程生产工厂 */ public ThreadFactory getThreadFactory() { return threadFactory; } /** * 设置任务丢弃策略 */ public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this.handler = handler; } /** * 获取任务丢弃策略 */ public RejectedExecutionHandler getRejectedExecutionHandler() { return handler; } /** * Sets the core number of threads. This overrides any value set * in the constructor. If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle. If larger, new threads will, if needed, * be started to execute any queued tasks. * * @param corePoolSize the new core size * @throws IllegalArgumentException if <tt>corePoolSize</tt> * less than zero * @see #getCorePoolSize */ public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int extra = this.corePoolSize - corePoolSize; this.corePoolSize = corePoolSize; if (extra < 0) { int n = workQueue.size(); // don't add more threads than tasks while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) { Thread t = addThread(null); if (t != null) t.start(); else break; } } else if (extra > 0 && poolSize > corePoolSize) { try { Iterator<Worker> it = workers.iterator(); while (it.hasNext() && extra-- > 0 && poolSize > corePoolSize && workQueue.remainingCapacity() == 0) it.next().interruptIfIdle(); } catch (SecurityException ignore) { // Not an error; it is OK if the threads stay live } } } finally { mainLock.unlock(); } } /** * 获取核心线程的数量 */ public int getCorePoolSize() { return corePoolSize; } /** * 预先启动一个核心线程 */ public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } /** * 预先启动所有的核心线程 */ public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null)) ++n; return n; } public boolean allowsCoreThreadTimeOut() { return allowCoreThreadTimeOut; } public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); allowCoreThreadTimeOut = value; } /** * Sets the maximum allowed number of threads. This overrides any * value set in the constructor. If the new value is smaller than * the current value, excess existing threads will be * terminated when they next become idle. * * @param maximumPoolSize the new maximum * @throws IllegalArgumentException if the new maximum is * less than or equal to zero, or * less than the {@linkplain #getCorePoolSize core pool size} * @see #getMaximumPoolSize */ public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int extra = this.maximumPoolSize - maximumPoolSize; this.maximumPoolSize = maximumPoolSize; if (extra > 0 && poolSize > maximumPoolSize) { try { Iterator<Worker> it = workers.iterator(); while (it.hasNext() && extra > 0 && poolSize > maximumPoolSize) { it.next().interruptIfIdle(); --extra; } } catch (SecurityException ignore) { } } } finally { mainLock.unlock(); } } /** * 返回最大的线程数 */ public int getMaximumPoolSize() { return maximumPoolSize; } public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); this.keepAliveTime = unit.toNanos(time); } /** * 返回线程的存活时间 */ public long getKeepAliveTime(TimeUnit unit) { return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); } /* User-level queue utilities */ /** *返回任务队列。(调试用) * * @return the task queue */ public BlockingQueue<Runnable> getQueue() { return workQueue; } public boolean remove(Runnable task) { return getQueue().remove(task); } /** * Tries to remove from the work queue all {@link Future} * tasks that have been cancelled. This method can be useful as a * storage reclamation operation, that has no other impact on * functionality. Cancelled tasks are never executed, but may * accumulate in work queues until worker threads can actively * remove them. Invoking this method instead tries to remove them now. * However, this method may fail to remove tasks in * the presence of interference by other threads. */ public void purge() { // Fail if we encounter interference during traversal try { Iterator<Runnable> it = getQueue().iterator(); while (it.hasNext()) { Runnable r = it.next(); if (r instanceof Future<?>) { Future<?> c = (Future<?>)r; if (c.isCancelled()) it.remove(); } } } catch (ConcurrentModificationException ex) { return; } } /** * 返回池中当前的线程数量 */ public int getPoolSize() { return poolSize; } /** * 返回活动的线程数量 */ public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) { if (w.isActive()) ++n; } return n; } finally { mainLock.unlock(); } } /** * 返回最大的poolsize值。 */ public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } /** * 返回所有提交过的任务数量(近似值) */ public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; //累加池中每个线程已经完成的任务数 if (w.isActive()) //如果线程是活跃的,表明其正在执行任务 ++n; } return n + workQueue.size();//累加上缓冲队列里的任务 } finally { mainLock.unlock(); } } /** * 返回所有已经执行完的任务的数量(近似值) */ public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } }
下面是线程池执行任务时留给用户扩展功能的方法,这些方法可以在任务执行前执行后或者线程池结束前做一些扩展的逻辑:
/* Extension hooks */ /** * 任务执行请执行的方法。子类可以覆盖此方法 */ protected void beforeExecute(Thread t, Runnable r) { } /** * 每个任务执行完毕执行的方法.子类可以覆盖此方法 */ protected void afterExecute(Runnable r, Throwable t) { } /** * Executor 关闭时,执行的方法。子类可以覆盖此方法 */ protected void terminated() { }
如果线程池关闭或者不能接受更多的任务,丢弃测试有如下几种:
我们也可以自定义我们自己的丢弃策略,如添加日志、做特定通知等等。
/** * 在当前的调用线程中执行此任务的策略 */ public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } /** * 抛出异常的丢弃 */ public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException(); } } /** * 直接丢弃不做任何处理 */ public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } /** * 用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
发表评论
-
Thread.isInterrupted 的理解
2017-05-24 21:01 1864interrupt方法用于中断线程。调用该方法的线程的状态 ... -
Condition&(wait,notify)
2017-05-22 10:58 567Condition.await和Object.wait ... -
AQS-预备-背景
2017-05-20 18:16 613AbstractQueuedSynchronizer: ... -
LockSupport
2017-05-19 22:15 594LockSupport类是Java6(JSR166-JUC ... -
Synchronized&AbstractQueuedSynchronizer[摘抄]
2017-05-19 21:29 517目前在Java中存在两种锁机制:synchronized和 ... -
CountDownLatch/CyclicBarrier
2017-05-19 20:59 980CountDownLatch: 功能:是一个同步工具类 ... -
Thread-wait/notify
2017-05-19 11:59 663java 线程通过对象的Wait和Notify进行同步,但 ... -
AQS-预备-FIFOMutex
2017-05-18 20:25 523通过AtomicBoolean 和队列 ... -
AQS-预备知识 CLH lock queue[摘抄]
2017-05-18 20:07 440经典! -
多个线程到达后才能执行某个任务,并且只能执行一次
2015-04-02 23:51 4017有一种场景:多个线程到达(比如合并多个线程返回的结果)后才能执 ... -
ThreadLocal 在web环境下使用的边界问题
2014-06-12 13:30 3528ThreadLocal 相关分析,请查看http://wang ... -
原码剖析之ThreadPoolExecutor入门
2013-06-15 10:44 1440jdk 1.5 开始提供支持线 ... -
源码剖析之ThreadLocal
2013-06-08 12:57 3249背景:1、如果一个对象 ... -
源码剖析之CyclicBarrier
2013-06-07 00:07 2894CyclicBarrier:jdk current 包提供了一 ... -
Thread的join方法
2013-02-25 23:44 1616Thread类中的join方法的语义: void java. ... -
lock 锁class类对象和实例对象
2013-02-25 23:18 2211java thread 线程中 的synchronized关键 ... -
lock实现运行时死锁检测
2013-02-24 23:02 1567java的多线程机制非常强 ... -
异常与锁的释放(synchronized )
2013-02-16 23:28 5447synchronized 获取的锁,在方法抛出异常的时候会自动 ... -
异常与锁的释放(lock)
2013-02-16 22:57 2604获取lock锁后发生异常后,线程退出,lock锁不释放 p ...
相关推荐
STL原码剖析(PDF完整版).part2.rar
STL原码剖析,源码面前,了无秘密,可以深入了解STL中容器的设计思想
STL原码剖析-简体版.pdf 注意是简体版本
《STL原码剖析》是一本深度探讨标准模板库(Standard Template Library,简称STL)实现原理的专业书籍。STL是C++编程语言中的一个重要组成部分,它提供了高效、灵活的容器、迭代器、算法和函数对象,极大地提升了C++...
总的来说,《STL原码剖析<庖丁解牛>》是一本深度揭秘STL的权威之作,无论是初学者还是经验丰富的开发者,都能从中获益匪浅。通过阅读这本书,读者不仅能掌握STL的基本使用,还能深入理解其背后的思维方式和编程艺术...
学习STL的很好的入门教材,华中科技大学出版,侯捷著。
本文通过对给定的代码片段进行了详细分析,介绍了如何实现一位原码乘法的过程。通过了解这些基础知识,读者可以更好地理解计算机内部是如何执行乘法运算的。此外,熟悉这些底层实现有助于加深对计算机体系结构的理解...
本书是多位作者在3年Liunx内核分析经验和庞大资料基础上写成的,收录了其他同类书未曾讲解的内容并进行逐行分析,一扫当前市场中其他理论书带给读者的郁闷。书中详细的代码分析与大量插图能够使读者对Linux内核及ARM...
原码一位乘法器是计算机组成原理课程设计的重要组成部分,它们之间的相乘结果的符号为相乘两数符号的异或值,而数值则为两数绝对值之积。本文将讲解原码一位乘法器的设计原理和实现方法。 原码一位乘法器的设计原理...
snort原码分析,还不错的哦! Snort作为一个轻量级的网络入侵检测系统,在实际中应用可能会有些力不从心,但如果想了解研究IDS的工作原理,仔细研究一下它的源码到是非常不错.首先对snort做一个概括的评论。 从工作...
学习编程的人都知道,阅读、剖析名家代码乃是提高水平的捷径。源码之前,了无秘密。大师们的缜密思维、经验结晶、技术思路、独到风格,都原原本本体现在源码之中。 这本书所呈现的源码,使读者看到vector的实现、...
学习编程的人都知道,阅读、剖析名家代码乃是提高水平的捷径。源码之前,了无秘密。大师们的缜密思维、经验结晶、技术思路、独到风格,都原原本本体现在源码之中。这本书所呈现的源码,使读者看到vector的实现、list...
这样的表示方法解决了原码中的一些问题,如“-0”的问题不再存在,加法和减法的规则得到了一定的统一,但是仍旧保留了一些不便之处。 补码的引入,则完全解决了上述问题。在补码表示法中,正数的编码方式与原码和...
原码、反码和补码是二进制表示正负数的关键概念,它们主要用于无符号整数和有符号整数的表示。以下是对这些知识点的详细解释: 1. **原码**:原码是最直观的二进制表示,其中最高位(称为符号位)为0表示正数,为1...
将原码转换成补码,再将补码转成原码的matlab程序
在计算机科学中,二进制表示的数字有三种主要形式:原码、反码和补码,主要用于表示有符号整数。本项目是基于C++的MFC(Microsoft Foundation Classes)框架实现的一个实用工具,旨在帮助用户理解并进行原码、反码和...
在计算机科学和数字电子技术中,原码和补码是两种表示二进制数的方法,尤其是在处理负数时。原码直接用最高位作为符号位,0代表正,1代表负,其余位表示数值的绝对值。补码则是用来表示负数的一种方式,它的特点是...