`

Java多线程(十二)之线程池深入分析(下)

阅读更多

一、数据结构与线程构造方法

 

由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构。图1描述了这种数据结构。

图1 ThreadPoolExecutor 数据结构

其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来。

  • 线程池需要支持多个线程并发执行,因此有一个线程集合Collection<Thread>来执行线程任务;
  • 涉及任务的异步执行,因此需要有一个集合来缓存任务队列Collection<Runnable>;
  • 很显然在多个线程之间协调多个任务,那么就需要一个线程安全的任务集合,同时还需要支持阻塞、超时操作,那么BlockingQueue是必不可少的;
  • 既然是线程池,出发点就是提高系统性能同时降低资源消耗,那么线程池的大小就有限制,因此需要有一个核心线程池大小(线程个数)和一个最大线程池大小(线程个数),有一个计数用来描述当前线程池大小;
  • 如果是有限的线程池大小,那么长时间不使用的线程资源就应该销毁掉,这样就需要一个线程空闲时间的计数来描述线程何时被销毁;
  • 前面描述过线程池也是有生命周期的,因此需要有一个状态来描述线程池当前的运行状态;
  • 线程池的任务队列如果有边界,那么就需要有一个任务拒绝策略来处理过多的任务,同时在线程池的销毁阶段也需要有一个任务拒绝策略来处理新加入的任务;
  • 上面种的线程池大小、线程空闲实际那、线程池运行状态等等状态改变都不是线程安全的,因此需要有一个全局的锁(mainLock)来协调这些竞争资源;
  • 除了以上数据结构以外,ThreadPoolExecutor还有一些状态用来描述线程池的运行计数,例如线程池运行的任务数、曾经达到的最大线程数,主要用于调试和性能分析。

 

对于ThreadPoolExecutor而言,一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。

既然是线程池,那么就首先研究下线程的构造方法。

 

[java] view plaincopy
 
  1. public interface ThreadFactory {  
  2.     Thread newThread(Runnable r);  
  3. }  

 

 

ThreadPoolExecutor使用一个线程工厂来构造线程。线程池都是提交一个任务Runnable,然后在某一个线程Thread中执行,ThreadFactory 负责如何创建一个新线程。

在J.U.C中有一个通用的线程工厂java.util.concurrent.Executors.DefaultThreadFactory,它的构造方式如下:

 

[java] view plaincopy
 
  1. static class DefaultThreadFactory implements ThreadFactory {  
  2.     static final AtomicInteger poolNumber = new AtomicInteger(1);  
  3.     final ThreadGroup group;  
  4.     final AtomicInteger threadNumber = new AtomicInteger(1);  
  5.     final String namePrefix;  
  6.     DefaultThreadFactory() {  
  7.         SecurityManager s = System.getSecurityManager();  
  8.         group = (s != null)? s.getThreadGroup() :  
  9.                              Thread.currentThread().getThreadGroup();  
  10.         namePrefix = "pool-" +  
  11.                       poolNumber.getAndIncrement() +  
  12.                      "-thread-";  
  13.     }  
  14.     public Thread newThread(Runnable r) {  
  15.         Thread t = new Thread(group, r,  
  16.                               namePrefix + threadNumber.getAndIncrement(),  
  17.                               0);  
  18.         if (t.isDaemon())  
  19.             t.setDaemon(false);  
  20.         if (t.getPriority() != Thread.NORM_PRIORITY)  
  21.             t.setPriority(Thread.NORM_PRIORITY);  
  22.         return t;  
  23.     }  
  24. }  

 

 

在这个线程工厂中,同一个线程池的所有线程属于同一个线程组,也就是创建线程池的那个线程组,同时线程池的名称都是“pool-<poolNum>-thread-<threadNum>”,其中poolNum是线程池的数量序号,threadNum是此线程池中的线程数量序号。这样如果使用jstack的话很容易就看到了系统中线程池的数量和线程池中线程的数量。另外对于线程池中的所有线程默认都转换为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。还有一点就是默认将线程池中的所有线程都调为同一个级别,这样在操作系统角度来看所有系统都是公平的,不会导致竞争堆积。

 

二、线程池中线程生命周期

 

一个线程Worker被构造出来以后就开始处于运行状态。以下是一个线程执行的简版逻辑。

 

[java] view plaincopy
 
  1. private final class Worker implements Runnable {  
  2.     private final ReentrantLock runLock = new ReentrantLock();  
  3.     private Runnable firstTask;  
  4.     Thread thread;  
  5.     Worker(Runnable firstTask) {  
  6.         this.firstTask = firstTask;  
  7.     }  
  8.     private void runTask(Runnable task) {  
  9.         final ReentrantLock runLock = this.runLock;  
  10.         runLock.lock();  
  11.         try {  
  12.            task.run();  
  13.         } finally {  
  14.             runLock.unlock();  
  15.         }  
  16.     }  
  17.     public void run() {  
  18.         try {  
  19.             Runnable task = firstTask;  
  20.             firstTask = null;  
  21.             while (task != null || (task = getTask()) != null) {  
  22.                 runTask(task);  
  23.                 task = null;  
  24.             }  
  25.         } finally {  
  26.             workerDone(this);  
  27.         }  
  28.     }  
  29. }  

 

 

当提交一个任务时,如果需要创建一个线程(何时需要在下一节中探讨)时,就调用线程工厂创建一个线程,同时将线程绑定到Worker工作队列中。需要说明的是,Worker队列构造的时候带着一个任务Runnable,因此Worker创建时总是绑定着一个待执行任务。换句话说,创建线程的前提是有必要创建线程(任务数已经超出了线程或者强制创建新的线程,至于为何强制创建新的线程后面章节会具体分析),不会无缘无故创建一堆空闲线程等着任务。这是节省资源的一种方式。

一旦线程池启动线程后(调用线程run())方法,那么线程工作队列Worker就从第1个任务开始执行(这时候发现构造Worker时传递一个任务的好处了),一旦第1个任务执行完毕,就从线程池的任务队列中取出下一个任务进行执行。循环如此,直到线程池被关闭或者任务抛出了一个RuntimeException。

由此可见,线程池的基本原理其实也很简单,无非预先启动一些线程,线程进入死循环状态,每次从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已。如此反复。

其实,线程池原理看起来简单,但是复杂的是各种策略,例如何时该启动一个线程,何时该终止、挂起、唤醒一个线程,任务队列的阻塞与超时,线程池的生命周期以及任务拒绝策略等等。

 

三、线程池任务执行流程

 

我们从一个API开始接触Executor是如何处理任务队列的。

java.util.concurrent.Executor.execute(Runnable)

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

线程池中所有任务执行都依赖于此接口。这段话有以下几个意思:

  1. 任务可能在将来某个时刻被执行,有可能不是立即执行。为什么这里有两个“可能”?继续往下面看。
  2. 任务可能在一个新的线程中执行或者线程池中存在的一个线程中执行。
  3. 任务无法被提交执行有以下两个原因:线程池已经关闭或者线程池已经达到了容量限制。
  4. 所有失败的任务都将被“当前”的任务拒绝策略RejectedExecutionHandler 处理。

回答上面两个“可能“。任务可能被执行,那不可能的情况就是上面说的情况3;可能不是立即执行,是因为任务可能还在队列中排队,因此还在等待分配线程执行。了解完了字面上的问题,我们再来看具体的实现。

 

[java] view plaincopy
 
  1. public void execute(Runnable command) {  
  2.     if (command == null)  
  3.         throw new NullPointerException();  
  4.     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  5.         if (runState == RUNNING && workQueue.offer(command)) {  
  6.             if (runState != RUNNING || poolSize == 0)  
  7.                 ensureQueuedTaskHandled(command);  
  8.         }  
  9.         else if (!addIfUnderMaximumPoolSize(command))  
  10.             reject(command); // is shutdown or saturated  
  11.     }  
  12. }  



 

这一段代码看起来挺简单的,其实这就是线程池最重要的一部分,如果能够完全理解这一块,线程池还是挺容易的。整个执行流程是这样的:

  1. 如果任务command为空,则抛出空指针异常,返回。否则进行2。
  2. 如果当前线程池大小 大于或等于 核心线程池大小,进行4。否则进行3。
  3. 创建一个新工作队列(线程,参考上一节),成功直接返回,失败进行4。
  4. 如果线程池正在运行并且任务加入线程池队列成功,进行5,否则进行7。
  5. 如果线程池已经关闭或者线程池大小为0,进行6,否则直接返回。
  6. 如果线程池已经关闭则执行拒绝策略返回,否则启动一个新线程来进行执行任务,返回。
  7. 如果线程池大小 不大于 最大线程池数量,则启动新线程来进行执行,否则进行拒绝策略,结束。

文字描述步骤不够简单?下面图形详细表述了此过程。

老实说这个图比上面步骤更难以理解,那么从何入手呢。

流程的入口很简单,我们就是要执行一个任务(Runnable command),那么它的结束点在哪或者有哪几个?

根据左边这个图我们知道可能有以下几种出口:

(1)图中的P1、P7,我们根据这条路径可以看到,仅仅是将任务加入任务队列(offer(command))了;

(2)图中的P3,这条路径不将任务加入任务队列,但是启动了一个新工作线程(Worker)进行扫尾操作,用户处理为空的任务队列;

(3)图中的P4,这条路径没有将任务加入任务队列,但是启动了一个新工作线程(Worker),并且工作现场的第一个任务就是当前任务;

(4)图中的P5、P6,这条路径没有将任务加入任务队列,也没有启动工作线程,仅仅是抛给了任务拒绝策略。P2是任务加入了任务队列却因为线程池已经关闭于是又从任务队列中删除,并且抛给了拒绝策略。

如果上面的解释还不清楚,可以去研究下面两段代码:

 

[java] view plaincopy
 
  1. java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)  
  2. java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)  
  3. java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)  



 

那么什么时候一个任务被立即执行呢?

在线程池运行状态下,如果线程池大小 小于 核心线程池大小或者线程池已满(任务队列已满)并且线程池大小 小于 最大线程池大小(此时线程池大小 大于 核心线程池大小的),用程序描述为:

 

[java] view plaincopy
 
  1. runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())  

 

 

上面的条件就是一个任务能够被立即执行的条件。

有了execute的基础,我们看看ExecutorService中的几个submit方法的实现。

 

[java] view plaincopy
 
  1. public Future<?> submit(Runnable task) {  
  2.         if (task == nullthrow new NullPointerException();  
  3.         RunnableFuture<Object> ftask = newTaskFor(task, null);  
  4.         execute(ftask);  
  5.         return ftask;  
  6.     }  
  7.   
  8.     public <T> Future<T> submit(Runnable task, T result) {  
  9.         if (task == nullthrow new NullPointerException();  
  10.         RunnableFuture<T> ftask = newTaskFor(task, result);  
  11.         execute(ftask);  
  12.         return ftask;  
  13.     }  
  14.   
  15.     public <T> Future<T> submit(Callable<T> task) {  
  16.         if (task == nullthrow new NullPointerException();  
  17.         RunnableFuture<T> ftask = newTaskFor(task);  
  18.         execute(ftask);  
  19.         return ftask;  
  20.     }  



 

很简单,不是么?对于一个线程池来说复杂的地方也就在execute方法的执行流程。在下一节中我们来讨论下如何获取任务的执行结果,也就是Future类的使用和原理。

 

四、线程池任务执行结果

 

这一节来探讨下线程池中任务执行的结果以及如何阻塞线程、取消任务等等。

 

[java] view plaincopy
 
  1. package info.imxylz.study.concurrency.future;  
  2.    
  3.  public class SleepForResultDemo implements Runnable {  
  4.    
  5.      static boolean result = false;  
  6.    
  7.      static void sleepWhile(long ms) {  
  8.          try {  
  9.              Thread.sleep(ms);  
  10.          } catch (Exception e) {}  
  11.      }  
  12.    
  13.      @Override  
  14.      public void run() {  
  15.          //do work  
  16.          System.out.println("Hello, sleep a while.");  
  17.          sleepWhile(2000L);  
  18.          result = true;  
  19.      }  
  20.    
  21.      public static void main(String[] args) {  
  22.          SleepForResultDemo demo = new SleepForResultDemo();  
  23.          Thread t = new Thread(demo);  
  24.          t.start();  
  25.          sleepWhile(3000L);  
  26.          System.out.println(result);  
  27.      }  
  28.    
  29.  }  



 

在没有线程池的时代里面,使用Thread.sleep(long)去获取线程执行完毕的场景很多。显然这种方式很笨拙,他需要你事先知道任务可能的执行时间,并且还会阻塞主线程,不管任务有没有执行完毕。

 

[java] view plaincopy
 
  1. package info.imxylz.study.concurrency.future;  
  2.    
  3.  public class SleepLoopForResultDemo implements Runnable {  
  4.    
  5.      boolean result = false;  
  6.    
  7.      volatile boolean finished = false;  
  8.    
  9.      static void sleepWhile(long ms) {  
  10.          try {  
  11.              Thread.sleep(ms);  
  12.          } catch (Exception e) {}  
  13.      }  
  14.    
  15.      @Override  
  16.      public void run() {  
  17.          //do work  
  18.          try {  
  19.              System.out.println("Hello, sleep a while.");  
  20.              sleepWhile(2000L);  
  21.              result = true;  
  22.          } finally {  
  23.              finished = true;  
  24.          }  
  25.      }  
  26.    
  27.      public static void main(String[] args) {  
  28.          SleepLoopForResultDemo demo = new SleepLoopForResultDemo();  
  29.          Thread t = new Thread(demo);  
  30.          t.start();  
  31.          while (!demo.finished) {  
  32.              sleepWhile(10L);  
  33.          }  
  34.          System.out.println(demo.result);  
  35.      }  
  36.    
  37. }  



 

使用volatile与while死循环的好处就是等待的时间可以稍微小一点,但是依然有CPU负载高并且阻塞主线程的问题。最简单的降低CPU负载的方式就是使用Thread.join().

 

[java] view plaincopy
 
  1. SleepLoopForResultDemo demo = new SleepLoopForResultDemo();  
  2.         Thread t = new Thread(demo);  
  3.         t.start();  
  4.         t.join();  
  5.         System.out.println(demo.result);  



 

显然这也是一种不错的方式,另外还有自己写锁使用wait/notify的方式。其实join()从本质上讲就是利用while和wait来实现的。

上面的方式中都存在一个问题,那就是会阻塞主线程并且任务不能被取消。为了解决这个问题,线程池中提供了一个Future接口。

在Future接口中提供了5个方法。

  • V get() throws InterruptedException, ExecutionException: 等待计算完成,然后获取其结果。
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
  • boolean cancel(boolean mayInterruptIfRunning):试图取消对此任务的执行。
  • boolean isCancelled():如果在任务正常完成前将其取消,则返回 true
  • boolean isDone():如果任务已完成,则返回 true。 可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回true

API看起来容易,来研究下异常吧。get()请求获取一个结果会阻塞当前进程,并且可能抛出以下三种异常:

  • InterruptedException:执行任务的线程被中断则会抛出此异常,此时不能知道任务是否执行完毕,因此其结果是无用的,必须处理此异常。
  • ExecutionException:任务执行过程中(Runnable#run())方法可能抛出RuntimeException,如果提交的是一个java.util.concurrent.Callable<V>接口任务,那么java.util.concurrent.Callable.call()方法有可能抛出任意异常。
  • CancellationException:实际上get()方法还可能抛出一个CancellationException的RuntimeException,也就是任务被取消了但是依然去获取结果。

对于get(long timeout, TimeUnit unit)而言,除了get()方法的异常外,由于有超时机制,因此还可能得到一个TimeoutException。

boolean cancel(boolean mayInterruptIfRunning)方法比较复杂,各种情况比较多:

  1. 如果任务已经执行完毕,那么返回false。
  2. 如果任务已经取消,那么返回false。
  3. 循环直到设置任务为取消状态,对于未启动的任务将永远不再执行,对于正在运行的任务,将根据mayInterruptIfRunning是否中断其运行,如果不中断那么任务将继续运行直到结束。
  4. 此方法返回后任务要么处于运行结束状态,要么处于取消状态。isDone()将永远返回true,如果cancel()方法返回true,isCancelled()始终返回true。

来看看Future接口的实现类java.util.concurrent.FutureTask<V>具体是如何操作的。

在FutureTask中使用了一个AQS数据结构来完成各种状态以及加锁、阻塞的实现。

在此AQS类java.util.concurrent.FutureTask.Sync中一个任务用4中状态:

初始情况下任务状态state=0,任务执行(innerRun)后状态变为运行状态RUNNING(state=1),执行完毕后变成运行结束状态RAN(state=2)。任务在初始状态或者执行状态被取消后就变为状态CANCELLED(state=4)。AQS最擅长无锁情况下处理几种简单的状态变更的。

 

[java] view plaincopy
 
  1. void innerRun() {  
  2.             if (!compareAndSetState(0, RUNNING))  
  3.                 return;  
  4.             try {  
  5.                 runner = Thread.currentThread();  
  6.                 if (getState() == RUNNING) // recheck after setting thread  
  7.                     innerSet(callable.call());  
  8.                 else  
  9.                     releaseShared(0); // cancel  
  10.             } catch (Throwable ex) {  
  11.                 innerSetException(ex);  
  12.             }  
  13.         }  



 

执行一个任务有四步:设置运行状态、设置当前线程(AQS需要)、执行任务(Runnable#run或者Callable#call)、设置执行结果。这里也可以看到,一个任务只能执行一次,因为执行完毕后它的状态不在为初始值0,要么为CANCELLED,要么为RAN。

取消一个任务(cancel)又是怎样进行的呢?对比下前面取消任务的描述是不是很简单,这里无非利用AQS的状态来改变任务的执行状态,最终达到放弃未启动或者正在执行的任务的目的。

 

[java] view plaincopy
 
  1. boolean innerCancel(boolean mayInterruptIfRunning) {  
  2.     for (;;) {  
  3.         int s = getState();  
  4.         if (ranOrCancelled(s))  
  5.             return false;  
  6.         if (compareAndSetState(s, CANCELLED))  
  7.             break;  
  8.     }  
  9.     if (mayInterruptIfRunning) {  
  10.         Thread r = runner;  
  11.         if (r != null)  
  12.             r.interrupt();  
  13.     }  
  14.     releaseShared(0);  
  15.     done();  
  16.     return true;  
  17. }  



 

到目前为止我们依然没有说明到底是如何阻塞获取一个结果的。下面四段代码描述了这个过程。

 

[java] view plaincopy
 
  1. V innerGet() throws InterruptedException, ExecutionException {  
  2.          acquireSharedInterruptibly(0);  
  3.          if (getState() == CANCELLED)  
  4.              throw new CancellationException();  
  5.          if (exception != null)  
  6.              throw new ExecutionException(exception);  
  7.          return result;  
  8.      }  
  9.      //AQS#acquireSharedInterruptibly  
  10.      public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
  11.          if (Thread.interrupted())  
  12.              throw new InterruptedException();  
  13.          if (tryAcquireShared(arg) < 0)  
  14.              doAcquireSharedInterruptibly(arg); //park current Thread for result  
  15.      }  
  16.      protected int tryAcquireShared(int ignore) {  
  17.          return innerIsDone()? 1 : -1;  
  18.      }  
  19.    
  20.      boolean innerIsDone() {  
  21.          return ranOrCancelled(getState()) && runner == null;  
  22.      }  



 

当调用Future#get()的时候尝试去获取一个共享变量。这就涉及到AQS的使用方式了。这里获取一个共享变量的状态是任务是否结束(innerIsDone()),也就是任务是否执行完毕或者被取消。如果不满足条件,那么在AQS中就会doAcquireSharedInterruptibly(arg)挂起当前线程,直到满足条件。AQS前面讲过,挂起线程使用的是LockSupport的park方式,因此性能消耗是很低的。

至于将Runnable接口转换成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一个简单实现。

 

[java] view plaincopy
 
  1. static final class RunnableAdapter<T> implements Callable<T> {  
  2.        final Runnable task;  
  3.        final T result;  
  4.        RunnableAdapter(Runnable  task, T result) {  
  5.            this.task = task;  
  6.            this.result = result;  
  7.        }  
  8.        public T call() {  
  9.            task.run();  
  10.            return result;  
  11.        }  
  12.    }  



 

五、延迟、周期性任务调度的实现

 

java.util.concurrent.ScheduledThreadPoolExecutor是默认的延迟、周期性任务调度的实现。

有了整个线程池的实现,再回头来看延迟、周期性任务调度的实现应该就很简单了,因为所谓的延迟、周期性任务调度,无非添加一系列有序的任务队列,然后按照执行顺序的先后来处理整个任务队列。如果是周期性任务,那么在执行完毕的时候加入下一个时间点的任务即可。

由此可见,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一区别在于任务是有序(按照执行时间顺序)的,并且需要到达时间点(临界点)才能执行,并不是任务队列中有任务就需要执行的。也就是说唯一不同的就是任务队列BlockingQueue<Runnable> workQueue不一样。ScheduledThreadPoolExecutor的任务队列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>队列的实现。

DelayQueue是基于有序队列PriorityQueue实现的。PriorityQueue 也叫优先级队列,按照自然顺序对元素进行排序,类似于TreeMap/Collections.sort一样。

同样是有序队列,DelayQueue和PriorityQueue区别在什么地方?

由于DelayQueue在获取元素时需要检测元素是否“可用”,也就是任务是否达到“临界点”(指定时间点),因此加入元素和移除元素会有一些额外的操作。

典型的,移除元素需要检测元素是否达到“临界点”,增加元素的时候如果有一个元素比“头元素”更早达到临界点,那么就需要通知任务队列。因此这需要一个条件变量final Condition available 。

移除元素(出队列)的过程是这样的:

  • 总是检测队列的头元素(顺序最小元素,也是最先达到临界点的元素)
  • 检测头元素与当前时间的差,如果大于0,表示还未到底临界点,因此等待响应时间(使用条件变量available)
  • 如果小于或者等于0,说明已经到底临界点或者已经过了临界点,那么就移除头元素,并且唤醒其它等待任务队列的线程。
  • [java] view plaincopy
     
    1. public E take() throws InterruptedException {  
    2.        final ReentrantLock lock = this.lock;  
    3.        lock.lockInterruptibly();  
    4.        try {  
    5.            for (;;) {  
    6.                E first = q.peek();  
    7.                if (first == null) {  
    8.                    available.await();  
    9.                } else {  
    10.                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    11.                    if (delay > 0) {  
    12.                        long tl = available.awaitNanos(delay);  
    13.                    } else {  
    14.                        E x = q.poll();  
    15.                        assert x != null;  
    16.                        if (q.size() != 0)  
    17.                            available.signalAll(); // wake up other takers  
    18.                        return x;  
    19.   
    20.                    }  
    21.                }  
    22.            }  
    23.        } finally {  
    24.            lock.unlock();  
    25.        }  
    26.    }  


同样加入元素也会有相应的条件变量操作。当前仅当队列为空或者要加入的元素比队列中的头元素还小的时候才需要唤醒“等待线程”去检测元素。因为头元素都没有唤醒那么比头元素更延迟的元素就更加不会唤醒。

 

[java] view plaincopy
 
  1. public E take() throws InterruptedException {  
  2.        final ReentrantLock lock = this.lock;  
  3.        lock.lockInterruptibly();  
  4.        try {  
  5.            for (;;) {  
  6.                E first = q.peek();  
  7.                if (first == null) {  
  8.                    available.await();  
  9.                } else {  
  10.                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
  11.                    if (delay > 0) {  
  12.                        long tl = available.awaitNanos(delay);  
  13.                    } else {  
  14.                        E x = q.poll();  
  15.                        assert x != null;  
  16.                        if (q.size() != 0)  
  17.                            available.signalAll(); // wake up other takers  
  18.                        return x;  
  19.   
  20.                    }  
  21.                }  
  22.            }  
  23.        } finally {  
  24.            lock.unlock();  
  25.        }  
  26.    }  



 

有了任务队列后再来看Future在ScheduledThreadPoolExecutor中是如何操作的。

java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask<V>是继承java.util.concurrent.FutureTask<V>的,区别在于执行任务是否是周期性的。

 

[java] view plaincopy
 
  1. private void runPeriodic() {  
  2.             boolean ok = ScheduledFutureTask.super.runAndReset();  
  3.             boolean down = isShutdown();  
  4.             // Reschedule if not cancelled and not shutdown or policy allows  
  5.             if (ok && (!down ||  
  6.                        (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&  
  7.                         !isStopped()))) {  
  8.                 long p = period;  
  9.                 if (p > 0)  
  10.                     time += p;  
  11.                 else  
  12.                     time = now() - p;  
  13.                 ScheduledThreadPoolExecutor.super.getQueue().add(this);  
  14.             }  
  15.             // This might have been the final executed delayed  
  16.             // task.  Wake up threads to check.  
  17.             else if (down)  
  18.                 interruptIdleWorkers();  
  19.         }  
  20.   
  21.         /** 
  22.          * Overrides FutureTask version so as to reset/requeue if periodic. 
  23.          */  
  24.         public void run() {  
  25.             if (isPeriodic())  
  26.                 runPeriodic();  
  27.             else  
  28.                 ScheduledFutureTask.super.run();  
  29.         }  
  30.     }  



 

如果不是周期性任务调度,那么就和java.util.concurrent.FutureTask.Sync的调度方式是一样的。如果是周期性任务(isPeriodic())那么就稍微有所不同的。

先从功能/结构上分析下。第一种情况假设提交的任务每次执行花费10s,间隔(delay/period)为20s,对于scheduleAtFixedRate而言,每次执行开始时间20s,对于scheduleWithFixedDelay来说每次执行开始时间30s。第二种情况假设提交的任务每次执行时间花费20s,间隔(delay/period)为10s,对于scheduleAtFixedRate而言,每次执行开始时间10s,对于scheduleWithFixedDelay来说每次执行开始时间30s。(具体分析可以参考这里

也就是说scheduleWithFixedDelay的执行开始时间为(delay+cost),而对于scheduleAtFixedRate来说执行开始时间为max(period,cost)。

回头再来看上面源码runPeriodic()就很容易了。但特别要提醒的,如果任务的任何一个执行遇到异常,则后续执行都会被取消,这从runPeriodic()就能看出。要强调的第二点就是同一个周期性任务不会被同时执行。就比如说尽管上面第二种情况的scheduleAtFixedRate任务每隔10s执行到达一个时间点,但是由于每次执行时间花费为20s,因此每次执行间隔为20s,只不过执行的任务次数会多一点。但从本质上讲就是每隔20s执行一次,如果任务队列不取消的话。

为什么不会同时执行?

这是因为ScheduledFutureTask执行的时候会将任务从队列中移除来,执行完毕以后才会添加下一个同序列的任务,因此任务队列中其实最多只有同序列的任务的一份副本,所以永远不会同时执行(尽管要执行的时间在过去)。

 

ScheduledThreadPoolExecutor使用一个无界(容量无限,整数的最大值)的容器(DelayedWorkQueue队列),根据ThreadPoolExecutor的原理,只要当容器满的时候才会启动一个大于corePoolSize的线程数。因此实际上ScheduledThreadPoolExecutor是一个固定线程大小的线程池,固定大小为corePoolSize,构造函数里面的Integer.MAX_VALUE其实是不生效的(尽管PriorityQueue使用数组实现有PriorityQueue大小限制,如果你的任务数超过了2147483647就会导致OutOfMemoryError,这个参考PriorityQueue的grow方法)。

 

再回头看scheduleAtFixedRate等方法就容易多了。无非就是往任务队列中添加一个未来某一时刻的ScheduledFutureTask任务,如果是scheduleAtFixedRate那么period/delay就是正数,如果是scheduleWithFixedDelay那么period/delay就是一个负数,如果是0那么就是一次性任务。直接调用父类ThreadPoolExecutor的execute/submit等方法就相当于period/delay是0,并且initialDelay也是0。

 

[java] view plaincopy
 
  1. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,  
  2.                                                   long initialDelay,  
  3.                                                   long period,  
  4.                                                   TimeUnit unit) {  
  5.         if (command == null || unit == null)  
  6.             throw new NullPointerException();  
  7.         if (period <= 0)  
  8.             throw new IllegalArgumentException();  
  9.         if (initialDelay < 0) initialDelay = 0;  
  10.         long triggerTime = now() + unit.toNanos(initialDelay);  
  11.         RunnableScheduledFuture<?> t = decorateTask(command,  
  12.             new ScheduledFutureTask<Object>(command,  
  13.                                             null,  
  14.                                             triggerTime,  
  15.                                             unit.toNanos(period)));  
  16.         delayedExecute(t);  
  17.         return t;  
  18.     }  



 

另外需要补充说明的一点,前面说过java.util.concurrent.FutureTask.Sync任务只能执行一次,那么在runPeriodic()里面怎么又将执行过的任务加入队列中呢?这是因为java.util.concurrent.FutureTask.Sync提供了一个innerRunAndReset()方法,此方法不仅执行任务还将任务的状态还原成0(初始状态)了,所以此任务就可以重复执行。这就是为什么runPeriodic()里面调用runAndRest()的缘故。

 

[java] view plaincopy
 
  1. boolean innerRunAndReset() {  
  2.             if (!compareAndSetState(0, RUNNING))  
  3.                 return false;  
  4.             try {  
  5.                 runner = Thread.currentThread();  
  6.                 if (getState() == RUNNING)  
  7.                     callable.call(); // don't set result  
  8.                 runner = null;  
  9.                 return compareAndSetState(RUNNING, 0);  
  10.             } catch (Throwable ex) {  
  11.                 innerSetException(ex);  
  12.                 return false;  
  13.             }  
  14.         }  

 

分享到:
评论

相关推荐

    java+socket 及多线程线程池应用(IBM教程)

    在这个“java+socket 及多线程线程池应用”的教程中,我们可以期待学习到以下核心知识点: 1. **Socket基础**:首先会讲解Socket的基本概念,包括服务器端Socket和客户端Socket的工作原理,以及TCP/IP协议在Socket...

    JAVAJAVA多线程教学演示系统论文

    《JAVA多线程教学演示系统》是一篇深入探讨JAVA多线程编程的论文,它针对教育领域中的教学需求,提供了一种生动、直观的演示方式,帮助学生更好地理解和掌握多线程技术。这篇论文的核心内容可能包括以下几个方面: ...

    Java多线程编程实战指南-核心篇

    《Java多线程编程实战指南-...通过《Java多线程编程实战指南-核心篇》,你可以深入了解Java并发编程的各个方面,提升在高并发场景下的编程能力。书中的案例和练习将帮助你更好地掌握理论知识,并将其应用于实际项目中。

    java 多线程编程实战指南(核心 + 设计模式 完整版)

    《Java多线程编程实战指南》这本书深入浅出地讲解了Java多线程的核心概念和实战技巧,分为核心篇和设计模式篇,旨在帮助开发者掌握并应用多线程技术。 1. **线程基础** - **线程的创建**:Java提供了两种创建线程...

    Java、Android多线程、线程池Demo

    通过分析和理解这个示例,开发者可以更好地掌握Java和Android中多线程和线程池的使用,提升应用程序的性能和响应速度。在实际项目中,合理地配置和使用线程池能够有效优化程序的资源消耗,保证服务的稳定性和可扩展...

    Java线程池及观察者模式解决多线程意外死亡重启问题

    Java线程池是Java并发编程中的重要组成部分,它...总之,通过Java线程池和观察者模式的结合,我们可以构建一个健壮的多线程系统,即使在部分线程意外终止的情况下,也能及时发现并采取措施恢复,确保系统的稳定运行。

    java多线程进阶

    Java多线程是Java编程中的核心概念,尤其对于高级开发者来说,掌握多线程的深入理解和应用至关重要。这本书“java多线程进阶”显然旨在帮助读者深化这方面的理解,打通编程中的“任督二脉”,使开发者能够更加熟练地...

    java多线程并发实战和源码

    Java多线程并发实战与源码分析是Java开发中至关重要的一部分,它涉及到程序性能优化、系统资源高效利用以及复杂逻辑的正确同步。本书主要聚焦于Java多线程的基础理论和实际应用,虽然书中实例和源码相对较少,但仍然...

    java多线程源码,仅供参考

    这个名为"java多线程源码,仅供参考"的压缩包文件,显然是为了帮助学习者深入理解Java多线程的运行机制。其中的示例程序"BounceThread"可能是一个演示线程交互和同步的经典案例,常用于教授线程的创建、运行以及线程...

    JAVA多线程端口扫描器

    **JAVA多线程端口扫描器** 在计算机网络中,端口扫描是一种常见的技术,用于检测目标主机上开放的服务和应用程序。此项目是基于Java语言实现的多线程端口扫描器,它允许用户对本地系统或指定的远程IP地址进行快速...

    多线程(线程池)的相关研究资料

    在计算机科学领域,多线程和线程池是并发编程中的关键概念,它们极大地提高了程序的执行效率和系统资源的利用率。线程是操作系统分配CPU时间的基本单位,而线程池则是管理和调度线程的一种机制。 多线程是指在一个...

    Java实现的线程池、消息队列功能

    线程池是一种多线程处理形式,预先创建一定数量的线程,然后将任务分配给这些线程执行。这样可以避免频繁创建和销毁线程带来的开销,提高系统效率。在Java中,`java.util.concurrent`包下的`ExecutorService`、`...

    Java多线程设计模式(带源码)

    本资源提供了详细的Java多线程设计模式的解析,包括源码分析,帮助开发者深入理解并熟练应用这些模式。 在多线程环境中,设计模式是解决常见问题的最佳实践,它们可以帮助开发者创建高效、可维护的并发代码。以下是...

    java多线程作业.docx

    ### Java多线程知识点解析 #### 一、Java多线程概述 Java作为一种现代编程语言,内置了对多线程的支持。多线程允许应用程序同时处理多个任务,从而提高程序的响应性和整体性能。在多线程环境中,一个程序可以包含...

    CVI学习文件-多线程 线程池(修改增加学习版)

    5. **性能优化**:分析多线程和线程池对程序性能的影响,包括CPU利用率、内存消耗、响应时间等,根据实际需求调整线程池参数。 通过这个学习文件,你将能掌握如何在CVI项目中合理使用多线程和线程池,提升系统的...

    Java多线程聊天

    Java多线程聊天程序是一种利用Java编程语言实现的并发通信应用,它允许多个用户在同一时间进行交互式的对话。在这个程序中,多线程技术被用来处理并发用户输入和消息传递,确保系统的高效运行和响应性。下面将详细...

    java实现多线程文件传输

    在Java编程语言中,实现多线程文件传输是一种优化程序性能、提高系统资源...在提供的`java多线程文件传输`压缩包中,可能包含了实现这些概念的示例代码,通过分析和学习,可以更好地理解多线程文件传输的原理和实践。

    JAVA多线程教材

    10. **实战案例分析**:通过分析和解决实际问题,如Web服务器的并发处理、数据库连接池管理等,可以更好地理解和应用Java多线程技术。 以上只是《JAVA多线程教材》可能涵盖的部分内容,实际教材可能会更深入地讨论...

    java多线程实例 代码可执行 绝对开源

    Java多线程是Java编程中的核心概念,尤其在开发高性能、高并发的应用...对于想要深入理解Java多线程和网络编程的开发者来说,这是一个很好的实践案例。通过分析和运行提供的源代码,你可以更好地理解和掌握这些技术。

    java线程池的源码分析.zip

    Java线程池是Java并发编程中的重要组成部分,它在多线程和高并发场景下扮演着关键角色。本文将深入探讨Java线程池的源码分析,并对比不同类型的线程池,以帮助开发者更好地理解和利用这一强大的工具。 首先,我们要...

Global site tag (gtag.js) - Google Analytics