`

ThreadPoolExecutor

 
阅读更多

ThreadPoolExecutor

 

corePoolSize 是线程池的核心线程数,通常线程池会维持这个线程数

maximumPoolSize 是线程池所能维持的最大线程数

keepAliveTime 和 unit 则分别是超额线程的空闲存活时间数和时间单位

workQueue 是提交任务到线程池的入队列

threadFactory 是线程池创建新线程的线程构造器

handler 是当线程池不能接受提交任务的时候的处理策略

 

public ThreadPoolExecutor(int corePoolSize,                       

                          int maximumPoolSize,                    

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue<Runnable> workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler)

 

 

1)当池子大小小于corePoolSize就新建线程,并处理请求

2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

 

默认情况下,ThreadPoolExecutor的线程数是根据需求来延迟初始化的,即有新任务加进来的时候才会挨个创建线程。

除此之外,线程池执行器也提供了提前创建初始化线程的方法:

public boolean prestartCoreThread()

public int prestartAllCoreThreads()

 

2. 线程数目的维护

 

刚刚提到,ThreadPoolExecutor有corePoolSize和maximum两个变量来维护线程池中的线程个数,提交任务的时候会有线程数目的增长,那线程的个数又是怎么来维护的呢。构造方法里还有两个参数,分别是keepAlive和unit,这两个参数确定了一个时间间隔,也就是空闲线程存活的时间间隔。默认情况下,当线程池中的线程个数超出了corePoolSize,那么空闲的线程一旦超出额定的存活时间就会被终止,以节省系统资源。在JDK1.6之后,增加了allowsCoreThreadTimeOut这个boolean属性和读写属性的方法,用来标志核心线程空闲超时是否也可以终止掉。

 

3.  线程入队列和任务丢弃原则简述

 

除了前面描述涉及到的四个属性和ThreadFactory之外,还有两个分别是workQueue和handler,分别是BlockingQueue和RejectedExecutionHandler类型。

 

BlockingQueue只是一个接口,它所表达的是当队列为空或者已满的时候,需要阻塞以等待生产者/消费者协同操作并唤醒线程。其有很多不同的具体实现类,各有特点。有的可以规定队列的长度,也有一些则是无界的。

 

按照Executors类中的几个工厂方法,分别使用的是:

 

LinkedBlockingQueue。FixedThreadPool和SingleThreadExecutor使用的是这个BlockingQueue,队列长度是无界的,适合用于提交任务相互独立无依赖的场景。

SynchronousQueue。  CachedThreadPool使用的是这个BlockingQueue,通常要求线程池不设定最大的线程数,以保证提交的任务有机会执行而不被丢掉。通常这个适合任务间有依赖的场景。

当然,开发者也可以定制ThreadPoolExecutor时使用ArrayBlockingQueue有界队列。

 

对于任务丢弃,ThreadPoolExecutor以内部类的形式实现了4个策略。分别是:

 

CallerRunsPolicy。提交任务的线程自己负责执行这个任务。

AbortPolicy。使Executor抛出异常,通过异常做处理。(默认采用)

DiscardPolicy。丢弃提交的任务。

DiscardOldestPolicy。丢弃掉队列中最早加入的任务。

在调用构造方法时,参数中未指定RejectedExecutionHandler情况下,默认采用AbortPolicy。

 

 

  public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>());

    }

 

  public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService

            (new ThreadPoolExecutor(1, 1,

                                    0L, TimeUnit.MILLISECONDS,

                                    new LinkedBlockingQueue<Runnable>()));

    }

 

   public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }

 

 

 

ThreadPoolExecutor在任务的提交、执行,线程重用和线程数维护等方面做下分析。

public class ThreadPoolExecutor extends AbstractExecutorService

从这个类声明中我们可以看到java.util.ThreadPoolExecutor是继承于AbstractExecutorService的,而之前的文章我也提到过,AbstractExecutorService已经实现了一些任务提交处理的方法,如submit()方法都是在这个抽象类中实现的。但submit()方法,最后也是会调用ThreadPoolExecutor的execute()方法。  public void execute(Runnable command) {}

mainLock 对整个ThreadPoolExecutor对象的锁

workers  存储工作线程对应Worker对象的HashSet

termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关

largestPoolSize 线程池跑过的最大线程数

completedTaskCount 完成任务数

ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装:ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet<Worker> workers = new HashSet<Worker>();

private final Condition termination = mainLock.newCondition();

private int largestPoolSize;

private long completedTaskCount;

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

 

 

1.    执行器状态

 

ThreadPoolExecutor对象有五种状态,如下:

ExecutorService中已经指定了这个接口对应的类要实现的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了状态的含义,并包含其于ctl属性中。

    shutdown()-->SHUTDOWN shutdownNow()-->STOP   RUNNING/SHUTDOWN/STOP/TIDYING/TERMINATED

RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态

SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成

STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程

TIDYING 线程池为空,就会到达这个状态,执行terminated()方法

TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

 

各状态之间可能的转变有以下几种:

RUNNING -> SHUTDOWN

调用了shutdown方法,线程池实现了finalize方法,在里面调用了shutdown方法,因此shutdown可能是在finalize中被隐式调用的

(RUNNING or SHUTDOWN) -> STOP

调用了shutdownNow方法

SHUTDOWN -> TIDYING

当队列和线程池均为空的时候

STOP -> TIDYING

当线程池为空的时候

TIDYING -> TERMINATED

terminated()钩子方法调用完毕

 

2.    Worker内部类-->Runnable

   它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )

     private final class Worker

        extends AbstractQueuedSynchronizer

        implements Runnable

 

封装了3样东西,Runnable类的首个任务对象,执行的线程thread和完成的任务数(volatile)completedTasks。

  /** Thread this worker is running in.  Null if factory fails. */

        final Thread thread;//执行的线程thread

        /** Initial task to run.  Possibly null. */

        Runnable firstTask; //Runnable类的首个任务对象

        /** Per-thread task counter */

        volatile long completedTasks;//完成的任务数(volatile)completedTasks。

 

3. 提交任务

提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。

 

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

 

4. addWorker()的实现

在上面提交任务的时候,会出现开辟新的线程来执行,这会调用addWorker()方法。

 private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {//是双层无限循环,尝试增加线程数到ctl变量,并且做一些比较判断,如果超出线程数限定或者ThreadPoolExecutor的状态不符合要求,则直接返回false,增加worker失败。

            int c = ctl.get();

            int rs = runStateOf(c);

 

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                   firstTask == null &&

                   ! workQueue.isEmpty()))

                return false;

 

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

 

 

 

//把firstTask这个Runnable对象传给Worker构造方法,赋值给Worker对象的task属性。Worker对象把自身(也是一个Runnable)封装成一个Thread对象赋予Worker对象的thread属性。锁住整个线程池并实际增加worker到workers的HashSet对象当中。成功增加后开始执行t.start(),就是worker的thread属性开始运行,实际上就是运行Worker对象的run方法。Worker的run()方法实际上调用了ThreadPoolExecutor的runWorker()方法。

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            final ReentrantLock mainLock = this.mainLock;

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    int c = ctl.get();

                    int rs = runStateOf(c);

 

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

 

5. 任务的执行runWorker()

  final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                w.lock();//需要对worker加锁

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

                if ((runStateAtLeast(ctl.get(), STOP) ||

                     (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();//完成一个任务后执行unlock()

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

 

线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()

在任务执行前后,执行beforeExecute()和afterExecute()方法

记录任务执行中的异常后,继续抛出

每个任务完成后,会记录当前线程完成的任务数

当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的

线程退出,执行processWorkerExit(w, completedAbruptly)处理

 

6. Worker线程的复用和任务的获取getTask()

 

    private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

 

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

 

            // Check if queue empty only if necessary.

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

 

            boolean timed;      // Are workers subject to culling?

 

            for (;;) {

                int wc = workerCountOf(c);

                timed = allowCoreThreadTimeOut || wc > corePoolSize;

 

                if (wc <= maximumPoolSize && ! (timedOut && timed))

                    break;

                if (compareAndDecrementWorkerCount(c))

                    return null;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

 

            try {

                Runnable r = timed ?

                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                    workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。

 

回头看看是否计时是如何确定的。

int wc = workerCountOf(c);

timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;

即判断当前线程池的线程数是否超出corePoolSize,如果超出这个值并且空闲时间多于keepAlive则当前线程退出。

另外一种情况就是allowCoreThreadTimeOut为true,就是允许核心在空闲超时的情况下停掉。

 

7. 线程池线程数的维护和线程的退出处理

 

   private void processWorkerExit(Worker w, boolean completedAbruptly) {

        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

            decrementWorkerCount();

 

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            completedTaskCount += w.completedTasks;

            workers.remove(w);

        } finally {

            mainLock.unlock();

        }

 

        tryTerminate();

 

        int c = ctl.get();

        if (runStateLessThan(c, STOP)) {

            if (!completedAbruptly) {

                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                if (workerCountOf(c) >= min)

                    return; // replacement not needed

            }

            addWorker(null, false);

        }

    }

 

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1。之后,加锁统计完成的任务数,并从workers这个集合中移除当前worker。

 

执行tryTerminate(),这个方法后面会详细说,主要就是尝试将线程池推向TERMINATED状态。

 

最后比较当前线程数是不是已经低于应有的线程数,如果这个情况发生,则添加无任务的空Worker到线程池中待命。

 

以上,增加新的线程和剔除多余的线程的过程大概就是如此,这样线程池能保持额定的线程数,并弹性伸缩,保证系统的资源不至于过度消耗。

 

 

 

 

ExecutorService中,和生命周期相关的,声明了5个方法:

 

awaitTermination() 阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束

isShutdown()  通过ctl属性判断当前的状态是否不是RUNNING状态

isTerminated()  通过ctl属性判断当前的状态是否为TERMINATED状态

shutdown() 关闭Executor,不再接受提交任务

shutdownNow() 关闭Executor,不再接受提交任务,并且不再执行入队列中的任务

 

1. ThreadPoolExecutor的shutdown() //按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。

  public void shutdown() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            advanceRunState(SHUTDOWN);

            interruptIdleWorkers();

            onShutdown(); // hook for ScheduledThreadPoolExecutor

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

    }

尝试将状态切换到SHUTDOWN,这样就不会再接收新的任务提交。对空闲线程进行中断调用。最后检查线程池线程是否为0,并尝试切换到TERMINATED状态。

 

 

2. ThreadPoolExecutor的shutdownNow() //尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。 

//并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。

  public List<Runnable> shutdownNow() {

        List<Runnable> tasks;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            advanceRunState(STOP);

            interruptWorkers();

            tasks = drainQueue();

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

        return tasks;

    }

主要所做的事情就是切换ThreadPoolExecutor到STOP状态,中断所有worker,并将任务队列中的任务取出来,不再执行。最后尝试修改状态到TERMINATED。

 

3. shutdown()和shutdownNow()的区别

 

shutdown()新的任务不会再被提交到线程池,但之前的都会依旧执行,通过中断方式停止空闲的(根据没有获取锁来确定)线程。

 

shutdownNow()则向所有正在执行的线程发出中断信号以尝试终止线程,并将工作队列中的任务以列表方式的结果返回。

 

两者区别:

 

是一个要将线程池推到SHUTDOWN状态,一个将推到STOP状态

并且对运行的线程处理方式不同,shutdown()只中断空闲线程,而shutdownNow()会尝试中断所有活动线程

还有就是对队列中的任务处理,shutdown()队列中已有任务会继续执行,而shutdownNow()会直接取出不被执行

相同的是都在最后尝试将线程池推到TERMINATED状态

 

4. ThreadPoolExecutor的awaitTermination()//请求关闭、发生超时或者当前线程中断,无论哪一个首先发生之后,都将导致阻塞,直到所有任务完成执行。 

                                         //阻塞等待shutdown请求后所有线程终止,会有时间参数,超时和中断也会令方法调用结束。

public boolean awaitTermination(long timeout, TimeUnit unit)

        throws InterruptedException {

        long nanos = unit.toNanos(timeout);

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            for (;;) {

                if (runStateAtLeast(ctl.get(), TERMINATED))

                    return true;

                if (nanos <= 0)

                    return false;

                nanos = termination.awaitNanos(nanos);

            }

        } finally {

            mainLock.unlock();

        }

    }

实际所做的就是Condition的定时await调用。用于状态依赖的线程阻塞。

 

5. tryTerminate()//tryTerminate()的意义就在于尝试进入终止状态,当ctl中worker数字为0时执行terminated()方法,否则等锁中断一个空闲的Worker。

    final void tryTerminate() {

        for (;;) {

            int c = ctl.get();

            if (isRunning(c) ||

                runStateAtLeast(c, TIDYING) ||

                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

                return;

            if (workerCountOf(c) != 0) { // Eligible to terminate

                interruptIdleWorkers(ONLY_ONE);

                return;

            }

 

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                    try {

                        terminated();

                    } finally {

                        ctl.set(ctlOf(TERMINATED, 0));

                        termination.signalAll();

                    }

                    return;

                }

            } finally {

                mainLock.unlock();

            }

            // else retry on failed CAS

        }

    }

 

其中interruptIdleWorkers()方法这里就不列代码了,空闲的worker主要是通过worker的tryLock()来确认的,因为执行任务的worker互斥地锁定对象。

中断worker导致线程退出,最终还会循环尝试终止其它的空闲线程,直到整个ThreadPoolExecutor最后终结。

 

6. ThreadPoolExecutor生命周期的扩展点

 

在生命周期上,ThreadPoolExecutor为扩展的类提供了一些扩展点,这是很好的设计,对扩展开放。

 

其中声明了如下protected的方法:

 

beforeExecute() 在每个任务执行前做的处理

afterExecute() 在每个任务执行后做的处理

terminated() 在ThreadPoolExecutor到达TERMINATED状态前所做的处理

finalize() 有默认实现,直接调用shutdown(),以保证线程池对象回收

onShutdown() 在shutdown()方法执行到最后时调用,在java.util.concurrent.ScheduledThreadPoolExecutor类实现中用到了这个扩展点,做一些任务队列的清理操作。

 

任务饱和丢弃策略

java.util.concurrent包中的RejectedExecutionHandler这个接口和对应的实现类。

 

当ThreadPoolExecutor执行任务的时候,如果线程池的线程已经饱和,并且任务队列也已满。那么就会做丢弃处理,这也是execute()方法实现中的操作,源码如下:

else if (!addWorker(command, false))

        reject(command);

这个reject()方法很简单,直接调用丢弃处理的handler方法的rejectedExecution()

 

 RejectedExecutionHandler接口

 public interface RejectedExecutionHandler {

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

}

其中只有rejectedExecution()一个方法。返回为void,而参数一个是具体的Runnable任务,另一个则是被提交任务的ThreadPoolExecutor。

 

凡是实现了这个方法的类都可以作为丢弃处理器在ThreadPoolExecutor对象构造的时候作为参数传入,这个前面的文章已经提到过了。其中ThreadPoolExecutor给出了4种基本策略的实现。分别是:

AbortPolicy //用于被拒绝任务的处理程序,它将抛出 RejectedExecutionException.

CallerRunsPolicy  //用于被拒绝任务的处理程序,它直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务。

DiscardPolicy   //用于被拒绝任务的处理程序,默认情况下它将丢弃被拒绝的任务。

DiscardOldestPolicy //用于被拒绝任务的处理程序,它放弃最旧的未处理请求,然后重试 execute;如果执行程序已关闭,则会丢弃该任务。

 

1. 直接丢弃-DiscardPolicy

   public static class DiscardPolicy implements RejectedExecutionHandler {

        public DiscardPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

        }

    }

这个也是实现最简单的类,其中的rejectedExecution()方法是空实现,即什么也不做,那么提交的任务将会被丢弃,而不做任何处理。

这个策略使用的时候要小心,要明确需求。不然不知不觉的任务就丢了

 

2. 丢弃最老--DiscardOldestPolicy

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {

        public DiscardOldestPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                e.getQueue().poll();

                e.execute(r);

            }

        }

    }

注意,会先判断ThreadPoolExecutor对象是否已经进入SHUTDOWN以后的状态。之后取出队列头的任务并不做任何处理,即丢弃,再重新调用execute()方法提交新任务

 

3. 废弃终止--AbortPolicy

    这个RejectedExecutionHandler类和直接丢弃不同的是,不是默默地处理,而是抛出java.util.concurrent.RejectedExecutionException异常,这个异常是RuntimeException的子类。

    public static class AbortPolicy implements RejectedExecutionHandler {

        public AbortPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            throw new RejectedExecutionException("Task " + r.toString() +

                                                 " rejected from " +

                                                 e.toString());

        }

    }

注意,处理这个异常的线程是执行execute()的调用者线程。

 

4. 调用者执行策略--CallerRunsPolicy

在这个策略实现中,任务还是会被执行,但线程池中不会开辟新线程,而是提交任务的线程来负责维护任务

public static class CallerRunsPolicy implements RejectedExecutionHandler {

        public CallerRunsPolicy() { }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {

            if (!e.isShutdown()) {

                r.run();

            }

        }

    }

注意,和DiscardOldestPolicy同样,也会先判断ThreadPoolExecutor对象的状态,之后执行任务。这样处理的一个好处,是让caller线程运行任务,以推迟该线程进一步提交新任务,有效的缓解了线程池对象饱和的情况。

 

 

from:http://www.molotang.com/articles/553.html

分享到:
评论

相关推荐

    java 线程池例子ThreadPoolExecutor

    Java 线程池例子 ThreadPoolExecutor Java 中的线程池是指一个容器,里面包含了多个线程,这些线程可以重复使用,以避免频繁创建和销毁线程的开销。ThreadPoolExecutor 是 Java 中一个非常重要的线程池实现类,它...

    ThreadPoolExecutor源码解析.pdf

    《ThreadPoolExecutor源码解析》 ThreadPoolExecutor是Java并发编程中重要的组件,它是ExecutorService接口的实现,用于管理和调度线程的执行。理解其源码有助于我们更好地控制并发环境下的任务执行,提高系统的...

    线程池之ThreadPoolExecutor.docx

    线程池是多线程编程中一种高效管理线程资源的方式,主要由Java的`ThreadPoolExecutor`类实现。线程池的工作机制在于控制线程数量,它会将任务放入队列,然后根据线程池的设定创建并启动线程执行这些任务。如果线程...

    ThreadPoolExecutor的使用和Android常见的4种线程池使用介绍

    ThreadPoolExecutor的使用和Android常见的4种线程池使用介绍

    JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用

    "JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...

    线程池:java_ThreadPoolExecutor.mht

    (转)线程池:java_util_ThreadPoolExecutor 比较详细的介绍了ThreadPoolExecutor用法与属性

    ThreadPoolExecutor源码解析.md

    ThreadPoolExecutor源码解析.md

    PyQt5中多线程模块QThread和线程池ThreadPoolExecutor解决PyQt5界面程序执行比较耗时操作无响应问题

    1.资源简介:PyQt5中使用多线程模块QThread解决了PyQt5界面程序执行比较耗时操作时,程序卡顿出现的无响应以及界面输出无法实时显示的问题,采用线程池ThreadPoolExecutor解决了ping多个IP多任务耗时问题。...

    ThreadPoolExecutor使用和思考

    ThreadPoolExecutor使用和思考

    JDK1[1].5中的线程池(ThreadPoolExecutor)使用简介

    JDK1[1].5中的线程池(ThreadPoolExecutor)使用简介

    线程池原理-ThreadPoolExecutor源码解析

    线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor

    ThreadPoolExecutor运转机制介绍

    ### ThreadPoolExecutor 运转机制详解 #### 一、ThreadPoolExecutor 的基本概念与构造函数解析 在Java并发编程中,`ThreadPoolExecutor` 是一种强大的工具,它可以帮助开发者有效地管理和执行线程。`...

    12、线程池ThreadPoolExecutor实战及其原理分析(下)

    线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor实战及其原理分析(下)线程池ThreadPoolExecutor...

    Java并发编程之ThreadPoolExecutor详解与实战

    内容概要:本篇文章深入探讨了Java中多线程管理的一个关键组件——ThreadPoolExecutor的工作机制及其优化策略。主要涵盖ThreadPoolExecutor的基础概念介绍,创建配置参数的意义与选择方法,以及在实际编程中的几种...

    redis lits queue 和 ThreadPoolExecutor 结合

    在IT行业中,Redis和Java的ThreadPoolExecutor是两个非常重要的工具,它们在处理高并发和任务调度方面发挥着关键作用。Redis是一种高效的键值存储系统,常用于缓存、消息队列等场景。而ThreadPoolExecutor是Java并发...

    java线程池ThreadPoolExecutor类使用详解.docx

    在《阿里巴巴java开发手册》中...另外由于前面几种方法内部也是通过ThreadPoolExecutor方式实现,使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。

    JDK之ThreadPoolExecutor源码分析1

    《JDK之ThreadPoolExecutor源码分析1》 在Java编程中,线程池是一种高效的管理线程的方式,它通过复用已存在的线程来避免频繁创建和销毁线程带来的开销。ThreadPoolExecutor作为Java中的线程池实现,其内部机制相当...

    Java线程池与ThreadPoolExecutor.pdf

    线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ThreadPoolExecutor的核心参数包括: 1. corePoolSize:核心线程数,这是线程池在非繁忙状态下...

    说说你对ThreadPoolExecutor的理解.docx

    ThreadPoolExecutor是Java并发编程中非常重要的一个组件,它位于`java.util.concurrent`包下,用于管理线程资源,实现线程池服务。线程池通过有效地控制并发执行的任务数量,可以提高系统的性能和稳定性。 ...

Global site tag (gtag.js) - Google Analytics