1.ThreadPoolExecutor代码示例
package com.landon.mavs.example.concurrent;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.landon.mavs.example.util.MavsCachedThreadPoolExecutor;
import com.landon.mavs.example.util.MavsFixedThreadPoolExecutor;
import com.landon.mavs.example.util.MavsRejectedExecutionPolicy;
import com.landon.mavs.example.util.MavsThreadFactory;
import com.landon.mavs.example.util.MavsThreadPoolStateMonitor;
/**
*
* {@link java.util.concurrent.ThreadPoolExecutor}示例
*
*
*
* @author landon
*
*/
public class ThreadPoolExecutorExample {
private static final Logger LOGGER = LoggerFactory
.getLogger(ThreadPoolExecutorExample.class);
public static void main(String[] args) throws Exception {
// 固定2个线程的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor1 = new MavsFixedThreadPoolExecutor(
2, new MavsThreadFactory("Example", "FixedThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 从线程池的状态监视器来看:此时poolSize=1/workQueueSize=0,即启动了一个线程,工作队列没有任务
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 从线程池的状态监视器来看:此时poolSize=2/workQueueSize=0,即又启动了一个线程
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 由提交了3个任务,从输出来看:poolSize一直为2.而workQueueSize最多为3->随着任务的执行,workQueueSize变为0
// 所以MavsFixedThreadPoolExecutor这个线程池会保持固定线程数量
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 执行shutdown
// 另外从输出看:发现线程池终止的时候调用了terminate方法
fixedThreadPoolExecutor1.shutdown();
Thread.sleep(1 * 1000);
// 测试shutdown后,还可以执行任务吗?
// 答案当然是NO.因为新建worker线程的条件包括插入队列都必须是在RUNNING状态下.
// 而执行了shutdown后则更改了运行状态为SHUTDOWN
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
LOGGER.debug("");
// cached线程池
MavsCachedThreadPoolExecutor cachedThreadPoolExecutor1 = new MavsCachedThreadPoolExecutor(
new MavsThreadFactory("Example", "CachedThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 从输出可以看到,线程池最多启动了5个线程,workQueueSize一直为0
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 暂停2分钟,使得默认空闲1分钟的worker线程退出
Thread.sleep(2 * 60 * 1000);
LOGGER.debug("");
// 从输出可以看到:poolSize=0,即空闲的worker线程被回收了.
// 另外所有的worker线程被回收了,线程池就结束了.
// 因为:ThreadPoolExecutor#void workerDone(Worker w)->
// if (--poolSize ==0)tryTerminate()
// 但是这种线程自然结束的话,并没有调用覆写的terminate方法.因为tryTerminate的实现中是判断当前线程池状态是STOP/SHUTDOWN的时候才执行terminated方法的
LOGGER.debug("cachedThreadPoolExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(cachedThreadPoolExecutor1));
// 单线程线程池,注意这个和{@link
// Executors#newSingleThreadExecutor的区别},后者仅是返回的暴露的ExecutorService接口
MavsFixedThreadPoolExecutor singleExecutor = new MavsFixedThreadPoolExecutor(
1, new MavsThreadFactory("Example", "SingleThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 提交一个可抛出异常的任务
// 从输出看出
// 1:执行了afterExecute方法且其中的Throwable t为不null.此执行任务的时候抛出了异常.
// 2.线程因为异常终止,因指定了线程默认的UncaughtExceptionHandler,所以执行了uncaughtException方法.
singleExecutor.execute(new ThreadPoolExceptionTask());
Thread.sleep(1 * 1000);
// 从输出可以看到:poolSize=0变为了0.即线程终止了.
// 因为Worker线程的run方法只是try/finally,即并没有捕获异常.而runTask向上抛出异常至run,直接到finally.->workerDone->poolSize--
// ->tryTerminate
LOGGER.debug("singleExecutor.state:{}",
MavsThreadPoolStateMonitor.monitor(singleExecutor));
Thread.sleep(1 * 1000);
// 测试线程池异常终止后,还可以执行任务吗?
// 答案是YES.因为此时的线程池状态依然是RUNNING.
singleExecutor.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出发现:poolSize=1,即新增了一个worker线程.另外从线程的名字Mavs-Example-SingleThreadPool-1-2也可看得出.
LOGGER.debug("singleExecutor.state:{}",
MavsThreadPoolStateMonitor.monitor(singleExecutor));
// 这里是提交了一个任务,内部会被封装成->RunnableFuture->FutureTask
// 而其内部run->Sync#innerRun->其内部会被try/catch的->所以理论上结果应该线程应该不会异常终止.
// 从输出看:1.afterExecute方法中的异常参数为null.
// 2.没用调用默认的UncaughtExceptionHandler.也就是说线程正常运行.
singleExecutor.submit(new ThreadPoolExceptionTask());
singleExecutor.shutdown();
// 测试setCoreSize以及setMaximumSize
// 3个固定线程数目的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor2 = new MavsFixedThreadPoolExecutor(
3, new MavsThreadFactory("Example", "FixedThreadPool-2"),
new MavsRejectedExecutionPolicy());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
// 设置核心线程大小为6.
fixedThreadPoolExecutor2.setCorePoolSize(6);
Thread.sleep(1 * 1000);
// 从输出看:poolSize=6
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
// 设置核心线程大小为2
fixedThreadPoolExecutor2.setCorePoolSize(2);
Thread.sleep(1 * 1000);
// 从输出看.poolSize=6
// 因为 workQueue.remainingCapacity()此时不为0,即不会中断多余的空闲线程.
// 另外此时所有的worker线程正在处在等待状态.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出可以看到:此时poolSize=5.因为某个等待线程获得执行机会后再次getTask后->会执行pool(keepAliveTime),则直接回收退出.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
// 继续执行3个任务
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出可以发现:此时poolSize=2,因为多余的线程在执行完任务下次getTask判断的时候直接就被回收了.
// 另外:此时maximumSize是3.coreSize为2.也就是说此时的线程池已经不再是固定数量线程的线程池了.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
fixedThreadPoolExecutor2.shutdown();
// 测试setMaximumPoolSize
// 2个固定线程数目的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor3 = new MavsFixedThreadPoolExecutor(
2, new MavsThreadFactory("Example", "FixedThreadPool-3"),
new MavsRejectedExecutionPolicy());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 设置最大线程池大小为4
fixedThreadPoolExecutor3.setMaximumPoolSize(4);
// 提交一系列任务
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出看:maximumPoolSize=4/poolSize=2
// 即只是修改了maximumPoolSize的值/poolSize仍然为2.因为用的是无限阻塞队列,所以多余的任务都被放到了队列.
LOGGER.debug("fixedThreadPoolExecutor3.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor3));
try {
// 这里抛出了一个异常,因为1比coreSize 2还要小
fixedThreadPoolExecutor3.setMaximumPoolSize(1);
} catch (Exception e) {
LOGGER.warn("fixedThreadPoolExecutor3.setMaximumPoolSize.err.", e);
}
fixedThreadPoolExecutor3.shutdown();
// 自定义线程池1
// 工作队列为容量3的阻塞队列
// 等待空闲时间为60s
ThreadPoolExecutor userDefinedExecutor1 = new ThreadPoolExecutor(2, 4,
10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
new MavsThreadFactory("Example", "User-Define-Executor-1"),
new MavsRejectedExecutionPolicy());
// 直接提交很多任务
// 这个测试的目的在于测试拒绝策略.从输出可以看到:
// poolSize=4/workQueueSize=3这个时候,即已经达到了最大线程数目和队列上限,则执行了拒绝策略.
for (int i = 0; i < 20; i++) {
userDefinedExecutor1.execute(new ThreadPoolTask());
}
Thread.sleep(5 * 1000);
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
// 将线程池最大池数目调整为3.此时的poolSize为4.
userDefinedExecutor1.setMaximumPoolSize(3);
Thread.sleep(1 * 1000);
// 从输出看:poolSize还是为4.因为此时所有的worker线程都在poll(timeout)->然后setMaximumPoolSize->会中断一个空闲线程->但是getTask这里
// 被try/catch了.
// 不过多余的线程在空闲的时候都会被回收.
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
Thread.sleep(5 * 1000);
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
userDefinedExecutor1.shutdown();
// 测试prestartCoreThread()/prestartAllCoreThreads
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor4 = new MavsFixedThreadPoolExecutor(
3, new MavsThreadFactory("Example", "FixedThreadPool-4"),
new MavsRejectedExecutionPolicy());
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
// 启动一个核心线程
fixedThreadPoolExecutor4.prestartCoreThread();
// 从输出可以看出:poolSize为1,即启动了一个worker.
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
// 启动所有核心线程
// 从输出可以看出:poolSize为3,即现在启动了所有的核心线程
fixedThreadPoolExecutor4.prestartAllCoreThreads();
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
}
/**
*
* 用于测试的线程池任务
*
* @author landon
*
*/
private static class ThreadPoolTask implements Runnable {
private static final AtomicInteger COUNTER = new AtomicInteger(1);
private int id;
public ThreadPoolTask() {
id = COUNTER.getAndIncrement();
}
@Override
public void run() {
LOGGER.debug(this + " begin");
try {
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
LOGGER.warn(this + " was interrupted", e);
}
LOGGER.debug(this + " end");
}
@Override
public String toString() {
return "ThreadPoolTask [id=" + id + "]" + "["
+ Thread.currentThread().getName() + "]";
}
}
/**
*
* 用于测试的线程池异常任务
*
* @author landon
*
*/
private static class ThreadPoolExceptionTask implements Runnable {
private static final AtomicInteger COUNTER = new AtomicInteger(1);
private int id;
public ThreadPoolExceptionTask() {
id = COUNTER.getAndIncrement();
}
@Override
public void run() {
LOGGER.debug(this + " begin");
throw new RuntimeException("ThreadPoolExceptionTask.Exception.");
}
@Override
public String toString() {
return "ThreadPoolExceptionTask [id=" + id + "]" + "["
+ Thread.currentThread().getName() + "]";
}
}
}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.landon.mavs.example.util.MavsCachedThreadPoolExecutor;
import com.landon.mavs.example.util.MavsFixedThreadPoolExecutor;
import com.landon.mavs.example.util.MavsRejectedExecutionPolicy;
import com.landon.mavs.example.util.MavsThreadFactory;
import com.landon.mavs.example.util.MavsThreadPoolStateMonitor;
/**
*
* {@link java.util.concurrent.ThreadPoolExecutor}示例
*
*
* 1.public class ThreadPoolExecutor extends AbstractExecutorService * * 2.AbstractExecutorService内部的提交任务方法系列最终均调用了execute方法执行任务.{@link java.util.concurrent.RunnableFuture} * * 3.Executors: * // 1.corePoolSize和maximumPoolSize一致. * // 2.keepAliveTime传入0L,即在队列无元素时则直接不等待直接返回null(因线程池大小不会超过corePoolSize_无界阻塞队列且【1描述】). * 因该参数是在getTask方法当(poolSize > corePoolSize || allowCoreThreadTimeOut)时调用,而第一个条件已经是false了,即当队列空的时候 * 则一直会阻塞(take).所以只有在线程池设置了allowCoreThreadTimeOut参数时才会进行调用.而在allowCoreThreadTimeOut(boolean value)方法 * 的实现中,如果(value && keepAliveTime <= 0)则抛出异常.即allowCoreThreadTimeOut(true)和keepAliveTime<=0这两个参数不能同时存在. * 所以在FixedThreadPool实现中keepAliveTime参数无效(即永远不会回收Worker线程). * // 3.workQueue为LinkedBlockingQueue(未指定capacity),即无界阻塞队列.则线程池大小>=corePoolSize时则将任务插入队列. * // 4.总结:FixedThreadPool正如其名字一样,线程池中的线程数目是Fixed,固定的,Worker线程不会被回收且队列无任务时则一直阻塞. * public static ExecutorService newFixedThreadPool(int nThreads) { * return new ThreadPoolExecutor(nThreads, nThreads, * 0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue()); * } * * // 单线程+无界阻塞消息队列的经典模型.无线程安全问题. * // 注意其返回的是封装的FinalizableDelegatedExecutorService并实现了finalize方法,而finalize方法则调用了线程池的shutdown方法. * // 同时要主要强转的问题.其实际类型不是ThreadPoolExecutor * // 而FinalizableDelegatedExecutorService继承了DelegatedExecutorService(委托 /代理),其只是包装了,仅暴露了ExecutorService的实现方法. * // 个人认为因为其就是是单线程的,所以完全没有必要暴露ThreadPoolExecutor的所有访问方法,暴露了反而可能因为不必要的麻烦. * // 但是之所以再封装一层finalize就不知为何了.(GC回收之前的调用?有啥必要呢?如果我们没有对它调用shutdown(),那么可以确保它在被回收时调用shutdown()来终止线程) * // 所以只能用安全网来解释这个设计了 * // landon:终于明白了.1.其主要原因只是要暴露ExecutorService的方法,不要暴露ThreadPoolExecutor的所有访问方法 * 2.加上finalize的原因在于ThreadPoolExecutor本身有finalize方法,且实现为shutdown.而DelegatedExecutorService本身是没有的. * 所以额外加了在FinalizableDelegatedExecutorService加上了finalize.与ThreadPoolExecutor的finalize保持一致. * public static ExecutorService newSingleThreadExecutor() { * return new FinalizableDelegatedExecutorService * (new ThreadPoolExecutor(1, 1, * 0L, TimeUnit.MILLISECONDS, * new LinkedBlockingQueue())); * } * * // 1.corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE.则执行任务的时候会直接向workQueue.offer任务. * // 2.workQueue为SynchronousQueue,即同步阻塞队列(非公平),即offer的是恰好有线程poll才可以成功.第一次执行任务的时候,offer肯定fail.所以 * ->addIfUnderMaximumPoolSize->即添加一个worker线程.(常规情况下会一直UnderMaximum.因为Integer.MAX_VALUE) * // 3.keepAliveTime为60秒.而poolSize一定大于corePoolSize(为0)->workQueue.poll(keepAliveTime)->即从工作队列poll.所以说 * 如果在60秒内有任务offer则worker线程getTask成功则执行任务;否则返回null,又因为是同步阻塞队列所有在判断workerCanExit的时候(isEmpty永远为true), * 所以worker线程会退出被回收. * // 总结:1.如果线程池繁忙的情况下,每个线程都在执行任务的时候,新任务会新建新的worker线程去执行任务. * 2.假如在提交新任务的时候恰好有线程正在空闲getTask(60s超时内)则会委托空闲线程去做. * 3.如果线程池不繁忙,偶尔来一个任务.则第一个任务会创建一个workder线程,此时执行完毕如果1分钟内还没有任务则该线程会被自动回收.即该线程池最小的线程数目其实是0. * public static ExecutorService newCachedThreadPool() { * return new ThreadPoolExecutor(0, Integer.MAX_VALUE, * 60L, TimeUnit.SECONDS, * new SynchronousQueue()); * } * * 4.RejectPolicy * 1.ThreadPoolExecutor内部预定义了4中拒绝的处理程序策略 * 2.Reject的执行时机: * 1.当提交一个任务t的时候,线程池中数目超出了coreSize->提交至workQueue.此时检查线程池状态不在是running或线程池中突然没有了线程(有可能是额外线程调用了线程池 * 的shutdown/shutdownNow)->ensureQueuedTaskHandled->即如果发现此时状态不是running且可以从workQueue将t移除,则执行拒绝策略.即shutdown的时候 * 新的任务会被拒绝.->后续如果发现是调用了shutdown(线程池状态是SHUTDOWN)->且workQueue不为空且poolSize添加worker线程->即保证 * 队列任务执行完毕.{@link #shutdown}方法会中断空闲的线程(超出coreSize的线程) {@link getTask} * 2.当线程池线程数目超出maximumPoolSize的时候则执行拒绝策略. * 3. * 1.AbortPolicy:终止策略 * // 其是线程池的默认拒绝策略defaultHandler.->execute方法的调用线程则直接抛出异常 * public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { * throw new RejectedExecutionException(); * } * * 2.CallerRunsPolicy:调用者运行策略 * // 直接在execute方法的调用线程运行被拒绝的任务.如果线程池已关闭则丢弃该任务. * // 因为是在execute的调用线程中运行的.所以可简单的减缓新任务的提交速度.即得等到执行完被线程池拒绝的任务后才能提交任务. * public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { * if (!e.isShutdown()) { * r.run(); * } * } * * 3.DiscardOldestPolicy:丢弃最旧的任务策略 * // 放弃最旧的未处理的任务(即队头元素),重新提交执行被拒绝的任务r.如果线程池已关闭则丢弃该任务. * public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { * if (!e.isShutdown()) { * e.getQueue().poll(); * e.execute(r); * } * } * * 4.DiscardPolicy:丢弃策略 * // 空实现,即直接丢弃被拒绝的任务 * public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { * } * * 5.钩子方法: * 1.protected void beforeExecute(Thread t, Runnable r),可在子类覆写.在执行的线程运行任务之前调用的方法.此方法由t调用. * ->(方法结束时,子类通常应该调用super.beforeExecute->嵌套多个重写操作) * 2.protected void afterExecute(Runnable r, Throwable t) ,可在子类覆写,完成给定任务后所调用的方法.此方法由执行任务的worker线程 * 调用.t为执行该任务时导致终止时的异常->该异常会被抛到上层run然后被try.(没有catch).->如果t为null,则表示任务执行顺利. * 注:当提交的任务类似于submit方法提交的(如FutureTask时)->{@link FutureTask$Sync#innerRun}会在内部捕获该异常.所以其不会导致worker * 线程突然终止.而异常也不会传递给该方法。 * ->(方法开始时,子类通常应该调用super.afterExecute->嵌套多个重写操作) * 注:workerDone方法是在worker线程结束时调用的方法->完成任务计数 * 3.protected void terminated,此为线程池终止时调用的方法{@link #tryTerminate}.子类可覆写. * * 6.public BlockingQueue getQueue(),该方法用来访问工作队列.->用于监控和调试目的.->强烈反对出于其他目的而使用此方法. * * 7.public boolean remove(Runnable task),从线程池的内部工作队列中移除此任务.如果其尚未开始,则其不再运行.注:对于通过submit输入的runnable无法移除. * 因为其已经被转换了其他形式如FutureTask. * * 8. public void purge(),尝试从工作队列移除已取消的Future任务.->取消的任务不会再次执行.但是他们可能在工作队列中累积.直到worker线程将其主动移除(从工作队列poll). * 该方法则试图移除他们.如果出现其他线程的干预->则抛出ConcurrentModificationException.则失败. * * 9. 即使用户忘记调用了shutdown关闭线程池:也希望确保可回收线程->设置keepAliveTime/allowCoreThreadTimeOut/corePoolSize为0. * {@link #getTask()} * * 10.public boolean prestartCoreThread() 启动核心线程,使其处于getTask的空闲状态. 如果已启动了线程,则返回true * 从源码上:其内部直接调用了addIfUnderCorePoolSize(null).既如果coreSize不为0,则会启动一个worker线程并处于getTask的等待状态 * * 11.public int prestartAllCoreThreads() 启动所有核心线程,使其处于等待任务的空闲状态 * 从源码上: while (addIfUnderCorePoolSize(null))->即超出coreSize则跳出循环. 返回已启动的线程数 * * 12.Worker#isActive * // runLock是在runTask方法内的锁.而不是run的锁.即如果线程在getTask等待空闲的时候不是active.只有在真正执行任务的时候是active. * boolean isActive() { * return runLock.isLocked(); * } * * 13.public void setCorePoolSize(int corePoolSize) 设置核心线程数 * 从源码上:1.设置coreSize为传入的新值 * 2.如果新值大于旧值,则会添加额外线程,但是启动的线程数目一定不会超过当前工作队列的大小. * 3.如果新值小于旧值,则会遍历当前所有的worker线程,将多出的线程进行interruptIfIdle().其中还有一个条件是workQueue.remainingCapacity() == 0. * 也就是说要求此时工作队列的可附加元素数量为0,则当前工作队列已满. * (个人认为这个条件的添加是因为如果此时工作队列已满,则再次提交任务的时候会在maximum之下继续添加线程的.也就是说在这时候中断一个core线程是没有问题的.) * ->如果条件不满足的时候则多余的现有线程将在下一次空闲时终止(因为poolSize > coreSize).{@link #getTask} * * 14.public void setMaximumPoolSize(int maximumPoolSize) 设置运行最大的线程数 * 从源码上:1.参数maximumPoolSize必须>0 且 >=corePoolSize,否则抛出IllegalArgumentException. * 2.设置新值. * 3.如果新值小于当前值且当前poolSize > maximumPoolSize->则会遍历工作线程,将多余的线程interruptIfIdle. * * 15.线程池在调用了shutdown方法后便不能在提交任务了,因为此时的线程池状态已经不是running了.但是如果线程因为执行任务而异常终止的话,却依然可以提交任务. * 因为此时状态还是running. *
*
* @author landon
*
*/
public class ThreadPoolExecutorExample {
private static final Logger LOGGER = LoggerFactory
.getLogger(ThreadPoolExecutorExample.class);
public static void main(String[] args) throws Exception {
// 固定2个线程的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor1 = new MavsFixedThreadPoolExecutor(
2, new MavsThreadFactory("Example", "FixedThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 从线程池的状态监视器来看:此时poolSize=1/workQueueSize=0,即启动了一个线程,工作队列没有任务
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 从线程池的状态监视器来看:此时poolSize=2/workQueueSize=0,即又启动了一个线程
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 由提交了3个任务,从输出来看:poolSize一直为2.而workQueueSize最多为3->随着任务的执行,workQueueSize变为0
// 所以MavsFixedThreadPoolExecutor这个线程池会保持固定线程数量
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 执行shutdown
// 另外从输出看:发现线程池终止的时候调用了terminate方法
fixedThreadPoolExecutor1.shutdown();
Thread.sleep(1 * 1000);
// 测试shutdown后,还可以执行任务吗?
// 答案当然是NO.因为新建worker线程的条件包括插入队列都必须是在RUNNING状态下.
// 而执行了shutdown后则更改了运行状态为SHUTDOWN
fixedThreadPoolExecutor1.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
LOGGER.debug("");
// cached线程池
MavsCachedThreadPoolExecutor cachedThreadPoolExecutor1 = new MavsCachedThreadPoolExecutor(
new MavsThreadFactory("Example", "CachedThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 从输出可以看到,线程池最多启动了5个线程,workQueueSize一直为0
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
cachedThreadPoolExecutor1.execute(new ThreadPoolTask());
// 暂停2分钟,使得默认空闲1分钟的worker线程退出
Thread.sleep(2 * 60 * 1000);
LOGGER.debug("");
// 从输出可以看到:poolSize=0,即空闲的worker线程被回收了.
// 另外所有的worker线程被回收了,线程池就结束了.
// 因为:ThreadPoolExecutor#void workerDone(Worker w)->
// if (--poolSize ==0)tryTerminate()
// 但是这种线程自然结束的话,并没有调用覆写的terminate方法.因为tryTerminate的实现中是判断当前线程池状态是STOP/SHUTDOWN的时候才执行terminated方法的
LOGGER.debug("cachedThreadPoolExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(cachedThreadPoolExecutor1));
// 单线程线程池,注意这个和{@link
// Executors#newSingleThreadExecutor的区别},后者仅是返回的暴露的ExecutorService接口
MavsFixedThreadPoolExecutor singleExecutor = new MavsFixedThreadPoolExecutor(
1, new MavsThreadFactory("Example", "SingleThreadPool-1"),
new MavsRejectedExecutionPolicy());
// 提交一个可抛出异常的任务
// 从输出看出
// 1:执行了afterExecute方法且其中的Throwable t为不null.此执行任务的时候抛出了异常.
// 2.线程因为异常终止,因指定了线程默认的UncaughtExceptionHandler,所以执行了uncaughtException方法.
singleExecutor.execute(new ThreadPoolExceptionTask());
Thread.sleep(1 * 1000);
// 从输出可以看到:poolSize=0变为了0.即线程终止了.
// 因为Worker线程的run方法只是try/finally,即并没有捕获异常.而runTask向上抛出异常至run,直接到finally.->workerDone->poolSize--
// ->tryTerminate
LOGGER.debug("singleExecutor.state:{}",
MavsThreadPoolStateMonitor.monitor(singleExecutor));
Thread.sleep(1 * 1000);
// 测试线程池异常终止后,还可以执行任务吗?
// 答案是YES.因为此时的线程池状态依然是RUNNING.
singleExecutor.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出发现:poolSize=1,即新增了一个worker线程.另外从线程的名字Mavs-Example-SingleThreadPool-1-2也可看得出.
LOGGER.debug("singleExecutor.state:{}",
MavsThreadPoolStateMonitor.monitor(singleExecutor));
// 这里是提交了一个任务,内部会被封装成->RunnableFuture->FutureTask
// 而其内部run->Sync#innerRun->其内部会被try/catch的->所以理论上结果应该线程应该不会异常终止.
// 从输出看:1.afterExecute方法中的异常参数为null.
// 2.没用调用默认的UncaughtExceptionHandler.也就是说线程正常运行.
singleExecutor.submit(new ThreadPoolExceptionTask());
singleExecutor.shutdown();
// 测试setCoreSize以及setMaximumSize
// 3个固定线程数目的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor2 = new MavsFixedThreadPoolExecutor(
3, new MavsThreadFactory("Example", "FixedThreadPool-2"),
new MavsRejectedExecutionPolicy());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
// 设置核心线程大小为6.
fixedThreadPoolExecutor2.setCorePoolSize(6);
Thread.sleep(1 * 1000);
// 从输出看:poolSize=6
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
// 设置核心线程大小为2
fixedThreadPoolExecutor2.setCorePoolSize(2);
Thread.sleep(1 * 1000);
// 从输出看.poolSize=6
// 因为 workQueue.remainingCapacity()此时不为0,即不会中断多余的空闲线程.
// 另外此时所有的worker线程正在处在等待状态.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出可以看到:此时poolSize=5.因为某个等待线程获得执行机会后再次getTask后->会执行pool(keepAliveTime),则直接回收退出.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
// 继续执行3个任务
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
fixedThreadPoolExecutor2.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出可以发现:此时poolSize=2,因为多余的线程在执行完任务下次getTask判断的时候直接就被回收了.
// 另外:此时maximumSize是3.coreSize为2.也就是说此时的线程池已经不再是固定数量线程的线程池了.
LOGGER.debug("fixedThreadPoolExecutor2.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor2));
fixedThreadPoolExecutor2.shutdown();
// 测试setMaximumPoolSize
// 2个固定线程数目的线程池
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor3 = new MavsFixedThreadPoolExecutor(
2, new MavsThreadFactory("Example", "FixedThreadPool-3"),
new MavsRejectedExecutionPolicy());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 设置最大线程池大小为4
fixedThreadPoolExecutor3.setMaximumPoolSize(4);
// 提交一系列任务
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
fixedThreadPoolExecutor3.execute(new ThreadPoolTask());
Thread.sleep(1 * 1000);
// 从输出看:maximumPoolSize=4/poolSize=2
// 即只是修改了maximumPoolSize的值/poolSize仍然为2.因为用的是无限阻塞队列,所以多余的任务都被放到了队列.
LOGGER.debug("fixedThreadPoolExecutor3.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor3));
try {
// 这里抛出了一个异常,因为1比coreSize 2还要小
fixedThreadPoolExecutor3.setMaximumPoolSize(1);
} catch (Exception e) {
LOGGER.warn("fixedThreadPoolExecutor3.setMaximumPoolSize.err.", e);
}
fixedThreadPoolExecutor3.shutdown();
// 自定义线程池1
// 工作队列为容量3的阻塞队列
// 等待空闲时间为60s
ThreadPoolExecutor userDefinedExecutor1 = new ThreadPoolExecutor(2, 4,
10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),
new MavsThreadFactory("Example", "User-Define-Executor-1"),
new MavsRejectedExecutionPolicy());
// 直接提交很多任务
// 这个测试的目的在于测试拒绝策略.从输出可以看到:
// poolSize=4/workQueueSize=3这个时候,即已经达到了最大线程数目和队列上限,则执行了拒绝策略.
for (int i = 0; i < 20; i++) {
userDefinedExecutor1.execute(new ThreadPoolTask());
}
Thread.sleep(5 * 1000);
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
// 将线程池最大池数目调整为3.此时的poolSize为4.
userDefinedExecutor1.setMaximumPoolSize(3);
Thread.sleep(1 * 1000);
// 从输出看:poolSize还是为4.因为此时所有的worker线程都在poll(timeout)->然后setMaximumPoolSize->会中断一个空闲线程->但是getTask这里
// 被try/catch了.
// 不过多余的线程在空闲的时候都会被回收.
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
Thread.sleep(5 * 1000);
LOGGER.debug("userDefinedExecutor1.state:{}",
MavsThreadPoolStateMonitor.monitor(userDefinedExecutor1));
userDefinedExecutor1.shutdown();
// 测试prestartCoreThread()/prestartAllCoreThreads
MavsFixedThreadPoolExecutor fixedThreadPoolExecutor4 = new MavsFixedThreadPoolExecutor(
3, new MavsThreadFactory("Example", "FixedThreadPool-4"),
new MavsRejectedExecutionPolicy());
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
// 启动一个核心线程
fixedThreadPoolExecutor4.prestartCoreThread();
// 从输出可以看出:poolSize为1,即启动了一个worker.
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
// 启动所有核心线程
// 从输出可以看出:poolSize为3,即现在启动了所有的核心线程
fixedThreadPoolExecutor4.prestartAllCoreThreads();
LOGGER.debug("fixedThreadPoolExecutor4.state:{}",
MavsThreadPoolStateMonitor.monitor(fixedThreadPoolExecutor4));
}
/**
*
* 用于测试的线程池任务
*
* @author landon
*
*/
private static class ThreadPoolTask implements Runnable {
private static final AtomicInteger COUNTER = new AtomicInteger(1);
private int id;
public ThreadPoolTask() {
id = COUNTER.getAndIncrement();
}
@Override
public void run() {
LOGGER.debug(this + " begin");
try {
TimeUnit.MICROSECONDS.sleep(100);
} catch (InterruptedException e) {
LOGGER.warn(this + " was interrupted", e);
}
LOGGER.debug(this + " end");
}
@Override
public String toString() {
return "ThreadPoolTask [id=" + id + "]" + "["
+ Thread.currentThread().getName() + "]";
}
}
/**
*
* 用于测试的线程池异常任务
*
* @author landon
*
*/
private static class ThreadPoolExceptionTask implements Runnable {
private static final AtomicInteger COUNTER = new AtomicInteger(1);
private int id;
public ThreadPoolExceptionTask() {
id = COUNTER.getAndIncrement();
}
@Override
public void run() {
LOGGER.debug(this + " begin");
throw new RuntimeException("ThreadPoolExceptionTask.Exception.");
}
@Override
public String toString() {
return "ThreadPoolExceptionTask [id=" + id + "]" + "["
+ Thread.currentThread().getName() + "]";
}
}
}
2.ExecutorService示例代码
package com.landon.mavs.example.concurrent;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* ExecutorServiceExample
*
* @author landon
*
*/
publicclass ExecutorServiceExample {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(ExecutorServiceExample.class);
publicstaticvoid main(String[] args) {
ExecutorService exeSrv = Executors.newFixedThreadPool(4);
// execute(Runnable command) 执行一个Runnable
exeSrv.execute(new OneRunnable(1));
// Future submit(Runnable task) 提交一个Runable
Future oneRunFuture = exeSrv.submit(new OneRunnable(2));
// Future#isDone 返回任务是否结束
LOGGER.debug("oneRun is complete:"+ oneRunFuture.isDone());
try{
// 等待计算完成,返回计算结果
// 当前成功完成的时候 #get 返回null
LOGGER.debug("oneRun result:"+ oneRunFuture.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneRun#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneRun#get compuation throws a exception");
}
// Future submit(Callable task) 提交一个Callable
Future<String> oneCallFuture = exeSrv.submit(new OneCallable(1));
try{
// V get() throws InterruptedException, ExecutionException
// 等待计算完成,返回计算结果
LOGGER.debug("oneCall result:"+ oneCallFuture.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneCall#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneCall#get compuation throws a exception");
}
Future<String> oneCallFuture2 = exeSrv.submit(new OneCallable(2));
try{
// V get(long timeout, TimeUnit unit) 指定等待超时时间
LOGGER.debug("oneCall2 result:"
+ oneCallFuture2.get(1, TimeUnit.SECONDS));
}catch (InterruptedException e) {
LOGGER.warn("exception_oneCall2#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneCall2#get compuation throws a exception");
}catch (TimeoutException e) {
LOGGER.warn("exception_oneCall2#get timeout");
}
Future<String> oneCallFuture3 = exeSrv.submit(new OneCallable(3));
// boolean cancel(boolean mayInterruptIfRunning)
// 尝试取消任务的执行.如果任务已完成或者已经被取消或者因为其他一些原因被能取消则尝试会失败
// 如果尝试成功且任务还未开始则该任务再也不会运行.如果任务已经启动,mayInterruptIfRunning参数决定任务执行线程是否中断尝试结束任务
boolean isFuture3CancelSuccess = oneCallFuture3.cancel(false);
LOGGER.debug("oneCallFuture3#cancel(false) result:"
+ isFuture3CancelSuccess);
LOGGER.debug("oneCallFuture3#isDone:"+ oneCallFuture3.isDone());
LOGGER.debug("oneCallFuture3#isCancelled:"
+ oneCallFuture3.isCancelled());
Future<String> oneCallFuture4 = exeSrv.submit(new OneCallable(4));
// 主线程暂停2秒后执行cancel
try{
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e) {
}
// 此处cancel传true则表明如果任务已启动则中断执行任务线程尝试结束任务
// 从输出可以看到,即输出了任务开始,但是却没有输出任务结束->且返回true表明任务被中断取消
boolean isFuture4CancelSuccess = oneCallFuture4.cancel(true);
LOGGER.debug("oneCallFuture4#cancel(true) result:"
+ isFuture4CancelSuccess);
LOGGER.debug("oneCallFuture4#isDone:"+ oneCallFuture4.isDone());
LOGGER.debug("oneCallFuture4#isCancelled:"
+ oneCallFuture4.isCancelled());
Future<String> oneCallFuture5 = exeSrv.submit(new OneCallable(5));
// 主线程暂停8秒后执行cancel,此时任务有可能已经执行完毕
try{
TimeUnit.SECONDS.sleep(8);
}catch (InterruptedException e) {
}
// 从输入可以看到,任务5输出了end.即cancel时任务已经完成.所以isFuture5CancelSuccess为false.isDone为true.isCancelled为false
boolean isFuture5CancelSuccess = oneCallFuture5.cancel(true);
LOGGER.debug("oneCallFuture5#cancel(true) result:"
+ isFuture5CancelSuccess);
LOGGER.debug("oneCallFuture5#isDone:"+ oneCallFuture5.isDone());
LOGGER.debug("oneCallFuture5#isCancelled:"
+ oneCallFuture5.isCancelled());
// Future submit(Runnable task, T result) 当任务完成时get方法会返回指定的result
Future<String> oneRun3Future = exeSrv
.submit(new OneRunnable(3), "isOk");
try{
// 从输入可以看到get方法的返回是传入的"isOk"
LOGGER.debug("oneRun3 result:"+ oneRun3Future.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneRun3l#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneRun3#get compuation throws a exception");
}
// 任务集合
List<OneCallable> oneCallList = Arrays.asList(new OneCallable(10),
new OneCallable(11), new OneCallable(12));
try{
// List> invokeAll(Collection>
// tasks) throws InterruptedException;
// 相当于批量执行任务.从方法的异常列表可以看出.此方法会等待(即阻塞)直到所有任务完成
List<Future<String>> oneCallListFutures = exeSrv
.invokeAll(oneCallList);
// 处理完成结果 从输出可以看到->invokeAll确实是在等待所有任务执行完毕.
List<Boolean> resultList =new ArrayList<Boolean>();
for (Iterator<Future<String>> iterator = oneCallListFutures
.iterator(); iterator.hasNext();) {
if (iterator.next().isDone()) {
resultList.add(true);
}
}
LOGGER.debug("oneCallListFutures result: "+ resultList);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList) exception_waiting all task complete was interrupted.");
}
// 任务集合2
List<OneCallable> oneCallList2 = Arrays.asList(new OneCallable(20),
new OneCallable(21), new OneCallable(22));
try{
// T invokeAny(Collection> tasks) throws
// InterruptedException, ExecutionException;
// 批量执行任务->等待直到某个任务已成功完成(注意只要某个任务成功返回则返回结果) 另外注意返回结果是T,而非Future
String oneCallList2Result = exeSrv.invokeAny(oneCallList2);
// 从输出结果可以看到:
// [oneCallList2Result:OneCallable [taskNum=20]OK],即20号任务执行完成即返回了
LOGGER.debug("oneCallList2Result:"+ oneCallList2Result);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAny(oneCallList2) exception_waiting one task complete was interrupted.");
}catch (ExecutionException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList2) exception_one any one task was completed.");
}
// 任务集合3
List<OneCallable> oneCallList3 = Arrays.asList(new OneCallable(30),
new OneCallable(31), new OneCallable(32));
try{
// List> invokeAll(Collection>
// tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 批量执行任务->指定等待超时时间->注意该方法并不会抛出超时异常,即如果没有被打断的情况下,超时后(直接返回),则某些任务只是未完成而已(注返回后会取消尚未完成的任务)
List<Future<String>> oneCallList3Futures = exeSrv.invokeAll(
oneCallList3, 2, TimeUnit.SECONDS);
List<Boolean> oneCallList3Results =new ArrayList<Boolean>();
for (Future<String> future : oneCallList3Futures) {
if (future.isDone()) {
oneCallList3Results.add(true);
}else{
oneCallList3Results.add(false);
}
}
// 从输出看,这个方法很特殊.即返回的Future列表的isDone方法都返回true.且所有的任务没有输出结束end.
// 从API看,即当所有任务完成或者超时(无论哪个首先发生)则返回的Future列表的isDone方法返回true
// 一旦返回后,即取消尚未完成的任务
LOGGER.debug("oneCallList3Results:"+ oneCallList3Results);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList3, 2, TimeUnit.SECONDS) exception_waiting all task compelte was interrupted.");
}
// 任务集合4
List<OneCallable> oneCallList4 = Arrays.asList(new OneCallable(40),
new OneCallable(41), new OneCallable(42));
try{
// T invokeAny(Collection> tasks,long
// timeout, TimeUnit unit) throws InterruptedException,
// ExecutionException, TimeoutException;
// 批量执行任务->指定等待超时时间->注意这个方法抛出了TimeoutException->即在等待超时后会抛出异常
// 从输出结果可以看到,在等待超时后->尚未完成的任务都被取消了,因为输出只有begin没有end
exeSrv.invokeAny(oneCallList4, 1, TimeUnit.SECONDS);
}catch (InterruptedException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_waiting any one task complete was interrupted.");
}catch (ExecutionException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_no any one task was completed");
}catch (TimeoutException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_waiting timeout");
}
// void shutdown() 启动一次顺序关闭,执行以前提交的任务,但是不接受新任务.如果已经关闭,则调用没有其他作用
exeSrv.shutdown();
// boolean isShutdown()
// ThreadPoolExecutor#isShutdown{return runState != RUNNING}
LOGGER.debug("exeSrv#shutdown.isShutdown:"+ exeSrv.isShutdown());
// isTerminated
// ThreadPoolExecutor#isTerminated{return runState == TERMINATED}
// 如果关闭后所有任务都完成,则返回true.注:必须要先调用shutdown/shutdownNow
// 该方法可结合awaitTermination使用awaitTermination,即if(!isTerminated){awaitTermination}
LOGGER.debug("exeSrv#shutdown.isTerminated:"+ exeSrv.isTerminated());
ExecutorService exeSrv2 = Executors.newFixedThreadPool(2);
exeSrv2.submit(new OneRunnable(50));
exeSrv2.submit(new OneCallable(60));
// List shutdownNow()
// 试图终止所有正在执行的活动任务.暂停处理正在等待的任务,并返回等待执行的任务列表
// 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试.如通过Thread.interrupt这种典型的实现来取消->所以任何任务无法响应中断都可能永远无法停止
// 从输出可以看到50号任务被interrupt了(异常被捕获了).而60号的任务其实也被interrupt了,但是异常被抛出到了上层.
exeSrv2.shutdownNow();
LOGGER.debug("exeSrv2#shutdown.isShutdown:"+ exeSrv2.isShutdown());
LOGGER.debug("exeSrv2#shutdown.isTerminated:"+ exeSrv2.isTerminated());
ExecutorService exeSrv3 = Executors.newFixedThreadPool(2);
exeSrv3.submit(new OneRunnable(70));
exeSrv3.submit(new OneRunnable(80));
exeSrv3.shutdown();
// boolean awaitTermination(long timeout, TimeUnit unit) throws
// InterruptedException
// 1.阻塞直到shutdown请求后,所有任务完成 2.阻塞直到超时 3.阻塞直到当前线程被中断
try{
if (!exeSrv3.isTerminated()) {
exeSrv3.awaitTermination(10, TimeUnit.SECONDS);
// 从输出看.任务花费了5秒即执行完毕(多线程并行).所以线程池所有任务任务完成后,awaitTermination也不在阻塞.
LOGGER.debug("exeSrv3EwaitTermination(10, TimeUnit.SECONDS) end.");
}
}catch (InterruptedException e) {
LOGGER.debug("exeSrv3.awaitTermination(10, TimeUnit.SECONDS) was interrupted.");
}
}
privatestaticclass OneRunnable implements Runnable {
privateint taskNum;
public OneRunnable(int taskNum) {
this.taskNum = taskNum;
}
@Override
publicvoid run() {
LOGGER.debug(this+" begin");
// 用sleep模拟业务逻辑耗时
try{
TimeUnit.SECONDS.sleep(5);
}catch (InterruptedException e) {
LOGGER.warn("execute"+this+" was interrupt");
}
LOGGER.debug(this+" end");
}
@Override
public String toString() {
return"OneRunnable [taskNum="+ taskNum +"]";
}
}
privatestaticclass OneCallable implements Callable<String>{
privateint taskNum;
public OneCallable(int taskNum) {
this.taskNum = taskNum;
}
@Override
public String call() throws Exception {
LOGGER.debug(this+" begin");
// 用sleep模拟业务逻辑耗时
Thread.sleep(3*1000);
LOGGER.debug(this+" end");
returnthis+" OK";
}
@Override
public String toString() {
return"OneCallable [taskNum="+ taskNum +"]";
}
}
}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* ExecutorServiceExample
*
* @author landon
*
*/
publicclass ExecutorServiceExample {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(ExecutorServiceExample.class);
publicstaticvoid main(String[] args) {
ExecutorService exeSrv = Executors.newFixedThreadPool(4);
// execute(Runnable command) 执行一个Runnable
exeSrv.execute(new OneRunnable(1));
// Future submit(Runnable task) 提交一个Runable
Future oneRunFuture = exeSrv.submit(new OneRunnable(2));
// Future#isDone 返回任务是否结束
LOGGER.debug("oneRun is complete:"+ oneRunFuture.isDone());
try{
// 等待计算完成,返回计算结果
// 当前成功完成的时候 #get 返回null
LOGGER.debug("oneRun result:"+ oneRunFuture.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneRun#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneRun#get compuation throws a exception");
}
// Future submit(Callable task) 提交一个Callable
Future<String> oneCallFuture = exeSrv.submit(new OneCallable(1));
try{
// V get() throws InterruptedException, ExecutionException
// 等待计算完成,返回计算结果
LOGGER.debug("oneCall result:"+ oneCallFuture.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneCall#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneCall#get compuation throws a exception");
}
Future<String> oneCallFuture2 = exeSrv.submit(new OneCallable(2));
try{
// V get(long timeout, TimeUnit unit) 指定等待超时时间
LOGGER.debug("oneCall2 result:"
+ oneCallFuture2.get(1, TimeUnit.SECONDS));
}catch (InterruptedException e) {
LOGGER.warn("exception_oneCall2#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneCall2#get compuation throws a exception");
}catch (TimeoutException e) {
LOGGER.warn("exception_oneCall2#get timeout");
}
Future<String> oneCallFuture3 = exeSrv.submit(new OneCallable(3));
// boolean cancel(boolean mayInterruptIfRunning)
// 尝试取消任务的执行.如果任务已完成或者已经被取消或者因为其他一些原因被能取消则尝试会失败
// 如果尝试成功且任务还未开始则该任务再也不会运行.如果任务已经启动,mayInterruptIfRunning参数决定任务执行线程是否中断尝试结束任务
boolean isFuture3CancelSuccess = oneCallFuture3.cancel(false);
LOGGER.debug("oneCallFuture3#cancel(false) result:"
+ isFuture3CancelSuccess);
LOGGER.debug("oneCallFuture3#isDone:"+ oneCallFuture3.isDone());
LOGGER.debug("oneCallFuture3#isCancelled:"
+ oneCallFuture3.isCancelled());
Future<String> oneCallFuture4 = exeSrv.submit(new OneCallable(4));
// 主线程暂停2秒后执行cancel
try{
TimeUnit.SECONDS.sleep(2);
}catch (InterruptedException e) {
}
// 此处cancel传true则表明如果任务已启动则中断执行任务线程尝试结束任务
// 从输出可以看到,即输出了任务开始,但是却没有输出任务结束->且返回true表明任务被中断取消
boolean isFuture4CancelSuccess = oneCallFuture4.cancel(true);
LOGGER.debug("oneCallFuture4#cancel(true) result:"
+ isFuture4CancelSuccess);
LOGGER.debug("oneCallFuture4#isDone:"+ oneCallFuture4.isDone());
LOGGER.debug("oneCallFuture4#isCancelled:"
+ oneCallFuture4.isCancelled());
Future<String> oneCallFuture5 = exeSrv.submit(new OneCallable(5));
// 主线程暂停8秒后执行cancel,此时任务有可能已经执行完毕
try{
TimeUnit.SECONDS.sleep(8);
}catch (InterruptedException e) {
}
// 从输入可以看到,任务5输出了end.即cancel时任务已经完成.所以isFuture5CancelSuccess为false.isDone为true.isCancelled为false
boolean isFuture5CancelSuccess = oneCallFuture5.cancel(true);
LOGGER.debug("oneCallFuture5#cancel(true) result:"
+ isFuture5CancelSuccess);
LOGGER.debug("oneCallFuture5#isDone:"+ oneCallFuture5.isDone());
LOGGER.debug("oneCallFuture5#isCancelled:"
+ oneCallFuture5.isCancelled());
// Future submit(Runnable task, T result) 当任务完成时get方法会返回指定的result
Future<String> oneRun3Future = exeSrv
.submit(new OneRunnable(3), "isOk");
try{
// 从输入可以看到get方法的返回是传入的"isOk"
LOGGER.debug("oneRun3 result:"+ oneRun3Future.get());
}catch (InterruptedException e) {
LOGGER.warn("exception_oneRun3l#get is interrupted while waiting result.");
}catch (ExecutionException e) {
LOGGER.warn("exception_oneRun3#get compuation throws a exception");
}
// 任务集合
List<OneCallable> oneCallList = Arrays.asList(new OneCallable(10),
new OneCallable(11), new OneCallable(12));
try{
// List> invokeAll(Collection>
// tasks) throws InterruptedException;
// 相当于批量执行任务.从方法的异常列表可以看出.此方法会等待(即阻塞)直到所有任务完成
List<Future<String>> oneCallListFutures = exeSrv
.invokeAll(oneCallList);
// 处理完成结果 从输出可以看到->invokeAll确实是在等待所有任务执行完毕.
List<Boolean> resultList =new ArrayList<Boolean>();
for (Iterator<Future<String>> iterator = oneCallListFutures
.iterator(); iterator.hasNext();) {
if (iterator.next().isDone()) {
resultList.add(true);
}
}
LOGGER.debug("oneCallListFutures result: "+ resultList);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList) exception_waiting all task complete was interrupted.");
}
// 任务集合2
List<OneCallable> oneCallList2 = Arrays.asList(new OneCallable(20),
new OneCallable(21), new OneCallable(22));
try{
// T invokeAny(Collection> tasks) throws
// InterruptedException, ExecutionException;
// 批量执行任务->等待直到某个任务已成功完成(注意只要某个任务成功返回则返回结果) 另外注意返回结果是T,而非Future
String oneCallList2Result = exeSrv.invokeAny(oneCallList2);
// 从输出结果可以看到:
// [oneCallList2Result:OneCallable [taskNum=20]OK],即20号任务执行完成即返回了
LOGGER.debug("oneCallList2Result:"+ oneCallList2Result);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAny(oneCallList2) exception_waiting one task complete was interrupted.");
}catch (ExecutionException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList2) exception_one any one task was completed.");
}
// 任务集合3
List<OneCallable> oneCallList3 = Arrays.asList(new OneCallable(30),
new OneCallable(31), new OneCallable(32));
try{
// List> invokeAll(Collection>
// tasks, long timeout, TimeUnit unit) throws InterruptedException;
// 批量执行任务->指定等待超时时间->注意该方法并不会抛出超时异常,即如果没有被打断的情况下,超时后(直接返回),则某些任务只是未完成而已(注返回后会取消尚未完成的任务)
List<Future<String>> oneCallList3Futures = exeSrv.invokeAll(
oneCallList3, 2, TimeUnit.SECONDS);
List<Boolean> oneCallList3Results =new ArrayList<Boolean>();
for (Future<String> future : oneCallList3Futures) {
if (future.isDone()) {
oneCallList3Results.add(true);
}else{
oneCallList3Results.add(false);
}
}
// 从输出看,这个方法很特殊.即返回的Future列表的isDone方法都返回true.且所有的任务没有输出结束end.
// 从API看,即当所有任务完成或者超时(无论哪个首先发生)则返回的Future列表的isDone方法返回true
// 一旦返回后,即取消尚未完成的任务
LOGGER.debug("oneCallList3Results:"+ oneCallList3Results);
}catch (InterruptedException e) {
LOGGER.warn("exeSrv#invokeAll(oneCallList3, 2, TimeUnit.SECONDS) exception_waiting all task compelte was interrupted.");
}
// 任务集合4
List<OneCallable> oneCallList4 = Arrays.asList(new OneCallable(40),
new OneCallable(41), new OneCallable(42));
try{
// T invokeAny(Collection> tasks,long
// timeout, TimeUnit unit) throws InterruptedException,
// ExecutionException, TimeoutException;
// 批量执行任务->指定等待超时时间->注意这个方法抛出了TimeoutException->即在等待超时后会抛出异常
// 从输出结果可以看到,在等待超时后->尚未完成的任务都被取消了,因为输出只有begin没有end
exeSrv.invokeAny(oneCallList4, 1, TimeUnit.SECONDS);
}catch (InterruptedException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_waiting any one task complete was interrupted.");
}catch (ExecutionException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_no any one task was completed");
}catch (TimeoutException e) {
LOGGER.debug("exeSrv#invokeAny(oneCallList4, 1, TimeUnit.SECONDS) exception_waiting timeout");
}
// void shutdown() 启动一次顺序关闭,执行以前提交的任务,但是不接受新任务.如果已经关闭,则调用没有其他作用
exeSrv.shutdown();
// boolean isShutdown()
// ThreadPoolExecutor#isShutdown{return runState != RUNNING}
LOGGER.debug("exeSrv#shutdown.isShutdown:"+ exeSrv.isShutdown());
// isTerminated
// ThreadPoolExecutor#isTerminated{return runState == TERMINATED}
// 如果关闭后所有任务都完成,则返回true.注:必须要先调用shutdown/shutdownNow
// 该方法可结合awaitTermination使用awaitTermination,即if(!isTerminated){awaitTermination}
LOGGER.debug("exeSrv#shutdown.isTerminated:"+ exeSrv.isTerminated());
ExecutorService exeSrv2 = Executors.newFixedThreadPool(2);
exeSrv2.submit(new OneRunnable(50));
exeSrv2.submit(new OneCallable(60));
// List shutdownNow()
// 试图终止所有正在执行的活动任务.暂停处理正在等待的任务,并返回等待执行的任务列表
// 无法保证能够停止正在处理的活动执行任务,但是会尽力尝试.如通过Thread.interrupt这种典型的实现来取消->所以任何任务无法响应中断都可能永远无法停止
// 从输出可以看到50号任务被interrupt了(异常被捕获了).而60号的任务其实也被interrupt了,但是异常被抛出到了上层.
exeSrv2.shutdownNow();
LOGGER.debug("exeSrv2#shutdown.isShutdown:"+ exeSrv2.isShutdown());
LOGGER.debug("exeSrv2#shutdown.isTerminated:"+ exeSrv2.isTerminated());
ExecutorService exeSrv3 = Executors.newFixedThreadPool(2);
exeSrv3.submit(new OneRunnable(70));
exeSrv3.submit(new OneRunnable(80));
exeSrv3.shutdown();
// boolean awaitTermination(long timeout, TimeUnit unit) throws
// InterruptedException
// 1.阻塞直到shutdown请求后,所有任务完成 2.阻塞直到超时 3.阻塞直到当前线程被中断
try{
if (!exeSrv3.isTerminated()) {
exeSrv3.awaitTermination(10, TimeUnit.SECONDS);
// 从输出看.任务花费了5秒即执行完毕(多线程并行).所以线程池所有任务任务完成后,awaitTermination也不在阻塞.
LOGGER.debug("exeSrv3EwaitTermination(10, TimeUnit.SECONDS) end.");
}
}catch (InterruptedException e) {
LOGGER.debug("exeSrv3.awaitTermination(10, TimeUnit.SECONDS) was interrupted.");
}
}
privatestaticclass OneRunnable implements Runnable {
privateint taskNum;
public OneRunnable(int taskNum) {
this.taskNum = taskNum;
}
@Override
publicvoid run() {
LOGGER.debug(this+" begin");
// 用sleep模拟业务逻辑耗时
try{
TimeUnit.SECONDS.sleep(5);
}catch (InterruptedException e) {
LOGGER.warn("execute"+this+" was interrupt");
}
LOGGER.debug(this+" end");
}
@Override
public String toString() {
return"OneRunnable [taskNum="+ taskNum +"]";
}
}
privatestaticclass OneCallable implements Callable<String>{
privateint taskNum;
public OneCallable(int taskNum) {
this.taskNum = taskNum;
}
@Override
public String call() throws Exception {
LOGGER.debug(this+" begin");
// 用sleep模拟业务逻辑耗时
Thread.sleep(3*1000);
LOGGER.debug(this+" end");
returnthis+" OK";
}
@Override
public String toString() {
return"OneCallable [taskNum="+ taskNum +"]";
}
}
}
3.附:MavsCachedThreadPoolExecutor/MavsFixedThreadPoolExecutor/MavsRejectedExecutionPolicy/MavsThreadDefaultUncaughtExceptionHandler/
MavsThreadFactory/MavsThreadPoolExecutor/MavsThreadPoolStateMonitor源码
package com.landon.mavs.example.util;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
*
* Mavs Cache线程池 {@link java.util.concurrent.Executors#newCachedThreadPool()}
*
* @author landon
*
*/
publicclass MavsCachedThreadPoolExecutor extends MavsThreadPoolExecutor {
public MavsCachedThreadPoolExecutor() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public MavsCachedThreadPoolExecutor(ThreadFactory threadFactory) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}
public MavsCachedThreadPoolExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectHandler) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory, rejectHandler);
}
}
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
*
* Mavs Cache线程池 {@link java.util.concurrent.Executors#newCachedThreadPool()}
*
* @author landon
*
*/
publicclass MavsCachedThreadPoolExecutor extends MavsThreadPoolExecutor {
public MavsCachedThreadPoolExecutor() {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public MavsCachedThreadPoolExecutor(ThreadFactory threadFactory) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory);
}
public MavsCachedThreadPoolExecutor(ThreadFactory threadFactory,
RejectedExecutionHandler rejectHandler) {
super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), threadFactory, rejectHandler);
}
}
package com.landon.mavs.example.util;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
*
* 封装的固定线程数量的Mavs线程池
* {@link java.util.concurrent.Executors#newFixedThreadPool(int)}
*
* @author landon
*
*/
publicclass MavsFixedThreadPoolExecutor extends MavsThreadPoolExecutor {
public MavsFixedThreadPoolExecutor(int nThreads) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public MavsFixedThreadPoolExecutor(int nThreads, ThreadFactory threadFactory) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
public MavsFixedThreadPoolExecutor(int nThreads,
ThreadFactory threadFactory, RejectedExecutionHandler rejectHandler) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory,
rejectHandler);
}
}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
/**
*
* 封装的固定线程数量的Mavs线程池
* {@link java.util.concurrent.Executors#newFixedThreadPool(int)}
*
* @author landon
*
*/
publicclass MavsFixedThreadPoolExecutor extends MavsThreadPoolExecutor {
public MavsFixedThreadPoolExecutor(int nThreads) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public MavsFixedThreadPoolExecutor(int nThreads, ThreadFactory threadFactory) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory);
}
public MavsFixedThreadPoolExecutor(int nThreads,
ThreadFactory threadFactory, RejectedExecutionHandler rejectHandler) {
super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), threadFactory,
rejectHandler);
}
}
package com.landon.mavs.example.util;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程池拒绝执行策略
*
* @author landon
*
*/
publicclass MavsRejectedExecutionPolicy implements RejectedExecutionHandler {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsRejectedExecutionPolicy.class);
@Override
publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOGGER.debug("task rejectedExecution.ThreadPool.state:{}",
MavsThreadPoolStateMonitor.monitor(executor));
}
}
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程池拒绝执行策略
*
* @author landon
*
*/
publicclass MavsRejectedExecutionPolicy implements RejectedExecutionHandler {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsRejectedExecutionPolicy.class);
@Override
publicvoid rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
LOGGER.debug("task rejectedExecution.ThreadPool.state:{}",
MavsThreadPoolStateMonitor.monitor(executor));
}
}
package com.landon.mavs.example.util;
import java.lang.Thread.UncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程默认的异常终止处理器
* {@link java.lang.ThreadGroup#uncaughtException(Thread, Throwable)}
* 中对于ThreadDeath的处理
*
* @author landon
*
*/
publicclass MavsThreadDefaultUncaughtExceptionHandler implements
UncaughtExceptionHandler {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsThreadDefaultUncaughtExceptionHandler.class);
@Override
publicvoid uncaughtException(Thread t, Throwable e) {
LOGGER.warn("Exception in thread \"" + t.getName() + "\"", e);
}
}
import java.lang.Thread.UncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程默认的异常终止处理器
* {@link java.lang.ThreadGroup#uncaughtException(Thread, Throwable)}
* 中对于ThreadDeath的处理
*
* @author landon
*
*/
publicclass MavsThreadDefaultUncaughtExceptionHandler implements
UncaughtExceptionHandler {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsThreadDefaultUncaughtExceptionHandler.class);
@Override
publicvoid uncaughtException(Thread t, Throwable e) {
LOGGER.warn("Exception in thread \"" + t.getName() + "\"", e);
}
}
package com.landon.mavs.example.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* Mavs线程工厂 {@link java.util.concurrent.Executors#defaultThreadFactory()} 参考
* {@link java.util.concurrent.Executors$DefaultThreadFactory}实现
*
* @author landon
*
*/
publicclass MavsThreadFactory implements ThreadFactory {
privatestaticfinal String MAVS_NAME_PREFIX ="Mavs-";
/** 线程号 */
privatefinal AtomicInteger threadNumber =new AtomicInteger(1);
/** 线程组 */
privatefinal ThreadGroup threadGroup;
/** 线程名字前缀 */
privatefinal String namePrefix;
/**
*
* 构造MavsThreadFactory
*
* @param processPrefix
* 进程前缀
* @param threadName
* 线程名
*/
public MavsThreadFactory(String processPrefix, String threadName) {
SecurityManager sm = System.getSecurityManager();
threadGroup = (sm !=null) ? sm.getThreadGroup() : Thread
.currentThread().getThreadGroup();
namePrefix = MAVS_NAME_PREFIX + processPrefix +"-"+ threadName +"-";
}
@Override
public Thread newThread(Runnable r) {
Thread t =new Thread(threadGroup, r, namePrefix
+ threadNumber.getAndIncrement(), 0);
// 做这两个设置的原因在于线程的daemon/priority属性默认是由Thread.currentThread决定
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
// 设置Mavs线程默认的异常终止处理器
if (Thread.getDefaultUncaughtExceptionHandler() ==null) {
Thread.setDefaultUncaughtExceptionHandler(new MavsThreadDefaultUncaughtExceptionHandler());
}
return t;
}
public String getNamePrefix() {
return namePrefix;
}
}
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* Mavs线程工厂 {@link java.util.concurrent.Executors#defaultThreadFactory()} 参考
* {@link java.util.concurrent.Executors$DefaultThreadFactory}实现
*
* @author landon
*
*/
publicclass MavsThreadFactory implements ThreadFactory {
privatestaticfinal String MAVS_NAME_PREFIX ="Mavs-";
/** 线程号 */
privatefinal AtomicInteger threadNumber =new AtomicInteger(1);
/** 线程组 */
privatefinal ThreadGroup threadGroup;
/** 线程名字前缀 */
privatefinal String namePrefix;
/**
*
* 构造MavsThreadFactory
*
* @param processPrefix
* 进程前缀
* @param threadName
* 线程名
*/
public MavsThreadFactory(String processPrefix, String threadName) {
SecurityManager sm = System.getSecurityManager();
threadGroup = (sm !=null) ? sm.getThreadGroup() : Thread
.currentThread().getThreadGroup();
namePrefix = MAVS_NAME_PREFIX + processPrefix +"-"+ threadName +"-";
}
@Override
public Thread newThread(Runnable r) {
Thread t =new Thread(threadGroup, r, namePrefix
+ threadNumber.getAndIncrement(), 0);
// 做这两个设置的原因在于线程的daemon/priority属性默认是由Thread.currentThread决定
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
// 设置Mavs线程默认的异常终止处理器
if (Thread.getDefaultUncaughtExceptionHandler() ==null) {
Thread.setDefaultUncaughtExceptionHandler(new MavsThreadDefaultUncaughtExceptionHandler());
}
return t;
}
public String getNamePrefix() {
return namePrefix;
}
}
package com.landon.mavs.example.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程池,提供了钩子方法的默认实现
*
* @author landon
*
*/
publicclass MavsThreadPoolExecutor extends ThreadPoolExecutor {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsThreadPoolExecutor.class);
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
@Override
protectedvoid beforeExecute(Thread t, Runnable r) {
LOGGER.info("Thread["+ t.getName() +"]#beforeExecute:{}",
MavsThreadPoolStateMonitor.monitor(this));
super.beforeExecute(t, r);
}
@Override
protectedvoid afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
LOGGER.info("Thread["+ Thread.currentThread().getName()
+"]EfterExecute:{}", MavsThreadPoolStateMonitor.monitor(this));
if (t !=null) {
LOGGER.warn("Worker.runs.task.err", t);
}
}
@Override
protectedvoid terminated() {
super.terminated();
LOGGER.info("terminated:{}", MavsThreadPoolStateMonitor.monitor(this));
}
}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Mavs线程池,提供了钩子方法的默认实现
*
* @author landon
*
*/
publicclass MavsThreadPoolExecutor extends ThreadPoolExecutor {
privatestaticfinal Logger LOGGER = LoggerFactory
.getLogger(MavsThreadPoolExecutor.class);
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
handler);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory);
}
public MavsThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, handler);
}
@Override
protectedvoid beforeExecute(Thread t, Runnable r) {
LOGGER.info("Thread["+ t.getName() +"]#beforeExecute:{}",
MavsThreadPoolStateMonitor.monitor(this));
super.beforeExecute(t, r);
}
@Override
protectedvoid afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
LOGGER.info("Thread["+ Thread.currentThread().getName()
+"]EfterExecute:{}", MavsThreadPoolStateMonitor.monitor(this));
if (t !=null) {
LOGGER.warn("Worker.runs.task.err", t);
}
}
@Override
protectedvoid terminated() {
super.terminated();
LOGGER.info("terminated:{}", MavsThreadPoolStateMonitor.monitor(this));
}
}
package com.landon.mavs.example.util;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
* Mavs线程池状态Monitor
*
* @author landon
*
*/
publicclass MavsThreadPoolStateMonitor {
/**
*
* 线程池状态监视
*
* @param executor
* @return
*/
publicstatic String monitor(ThreadPoolExecutor executor) {
if (executor ==null) {
thrownew NullPointerException();
}
// 核心线程数
int corePoolSize = executor.getCorePoolSize();
// 最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();
// 线程保持活动时间
long keepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
// 当前线程数
int poolSize = executor.getPoolSize();
// 返回活跃(正在执行任务)的近似线程数
int activeThreadCount = executor.getActiveCount();
// 返回曾经同时位于池中的最大线程数(包括已被回收的worker线程计数)
int largestPoolSize = executor.getLargestPoolSize();
// 已完成执行的近似任务总数
long completedTaskCount = executor.getCompletedTaskCount();
// 曾计划完成的近似任务总数(completedTaskCount + 工作队列大小 + 正在执行任务的worker线程数目)
long taskCount = executor.getTaskCount();
// 工作队列大小
int workQueueSize = executor.getQueue().size();
// 是否在非RUNNING状态下
boolean isShutdown = executor.isShutdown();
// 是否是TERMINATED状态
boolean isTerminated = executor.isTerminated();
// 是否是SHUTDOWN或者STOP状态
boolean isTerminating = executor.isTerminating();
String executorName ="Default-ThreadPoolExecutor";
ThreadFactory factory = executor.getThreadFactory();
if (factory !=null&& factory instanceof MavsThreadFactory) {
executorName = ((MavsThreadFactory) factory).getNamePrefix();
}
return executorName +" [corePoolSize="+ corePoolSize
+", maximumPoolSize="+ maximumPoolSize +", keepAliveTime="
+ keepAliveTime +", poolSize="+ poolSize
+", activeThreadCount="+ activeThreadCount
+", largestPoolSize="+ largestPoolSize
+", completedTaskCount="+ completedTaskCount +", taskCount="
+ taskCount +", workQueueSize="+ workQueueSize
+", isShutdown="+ isShutdown +", isTerminated="
+ isTerminated +", isTerminating="+ isTerminating +"]";
}
}
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
*
* Mavs线程池状态Monitor
*
* @author landon
*
*/
publicclass MavsThreadPoolStateMonitor {
/**
*
* 线程池状态监视
*
* @param executor
* @return
*/
publicstatic String monitor(ThreadPoolExecutor executor) {
if (executor ==null) {
thrownew NullPointerException();
}
// 核心线程数
int corePoolSize = executor.getCorePoolSize();
// 最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();
// 线程保持活动时间
long keepAliveTime = executor.getKeepAliveTime(TimeUnit.MILLISECONDS);
// 当前线程数
int poolSize = executor.getPoolSize();
// 返回活跃(正在执行任务)的近似线程数
int activeThreadCount = executor.getActiveCount();
// 返回曾经同时位于池中的最大线程数(包括已被回收的worker线程计数)
int largestPoolSize = executor.getLargestPoolSize();
// 已完成执行的近似任务总数
long completedTaskCount = executor.getCompletedTaskCount();
// 曾计划完成的近似任务总数(completedTaskCount + 工作队列大小 + 正在执行任务的worker线程数目)
long taskCount = executor.getTaskCount();
// 工作队列大小
int workQueueSize = executor.getQueue().size();
// 是否在非RUNNING状态下
boolean isShutdown = executor.isShutdown();
// 是否是TERMINATED状态
boolean isTerminated = executor.isTerminated();
// 是否是SHUTDOWN或者STOP状态
boolean isTerminating = executor.isTerminating();
String executorName ="Default-ThreadPoolExecutor";
ThreadFactory factory = executor.getThreadFactory();
if (factory !=null&& factory instanceof MavsThreadFactory) {
executorName = ((MavsThreadFactory) factory).getNamePrefix();
}
return executorName +" [corePoolSize="+ corePoolSize
+", maximumPoolSize="+ maximumPoolSize +", keepAliveTime="
+ keepAliveTime +", poolSize="+ poolSize
+", activeThreadCount="+ activeThreadCount
+", largestPoolSize="+ largestPoolSize
+", completedTaskCount="+ completedTaskCount +", taskCount="
+ taskCount +", workQueueSize="+ workQueueSize
+", isShutdown="+ isShutdown +", isTerminated="
+ isTerminated +", isTerminating="+ isTerminating +"]";
}
}
三.总结:
本篇结合jdk源码重点讲述了ThreadPoolExecutor和ExecutorService.另外提供了本人封装的一些线程池相关的库源码.
相关推荐
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
weixin138社区互助养老+ssm(论文+源码)_kaic.zip
光纤到户及通信基础设施报装申请表.docx
项目包含完整前后端源码和数据库文件 环境说明: 开发语言:Java 框架:ssm,mybatis JDK版本:JDK1.8 数据库:mysql 5.7 数据库工具:Navicat11 开发软件:eclipse/idea Maven包:Maven3.3 服务器:tomcat7
功能完善的电商数据智能爬虫采集系统项目全套技术资料.zip
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
### Android程序开发初级教程(一):初识Android **平台概述** Google推出的Android操作系统平台已经正式亮相,这是一个基于Linux内核的开源操作系统。对于开发者而言,了解其架构和支持的开发语言至关重要。以下是Android平台的架构概览: **平台架构及功能** 1. **应用框架(Application Framework)**:包含可重用和可替换的组件,确保所有软件在该层面上的平等性。 2. **Dalvik虚拟机(Dalvik Virtual Machine)**:一个基于Linux的虚拟机,为Android应用提供运行环境。 3. **集成浏览器(Integrated Browser)**:基于开源WebKit引擎的浏览器,位于应用层。 4. **优化图形(Optimized Graphics)**:包括自定义的2D图形库和遵循OpenGL ES 1.0标准的3D实现。 5. **SQLite数据库**:用于数据存储。 6. **多媒体支持(Media Support)**:支持通用音频、视频以及多种图片格式(如MPEG4, H.264
内容概要:本文档是《组合数学答案-网络流传版.pdf》的内容,主要包含了排列组合的基础知识以及一些经典的组合数学题目。这些题目涵盖了从排列数计算、二项式定理的应用到容斥原理的实际应用等方面。通过对这些题目的解析,帮助读者加深对组合数学概念和技巧的理解。 适用人群:适合初学者和有一定基础的学习者。 使用场景及目标:可以在学习组合数学课程时作为练习题参考,也可以在复习考试或准备竞赛时使用,目的是提高解决组合数学问题的能力。 其他说明:文档中的题目覆盖了组合数学的基本知识点,适合逐步深入学习。每个题目都有详细的解答步骤,有助于读者掌握解题思路和方法。
.net core mvc在线考试系统asp.net考试系统源码考试管理系统 主要技术: 基于.net core mvc架构和sql server数据库,数据库访问采用EF core code first,前端采用vue.js和bootstrap。 功能模块: 系统包括前台和后台两个部分,分三种角色登录。 管理员登录后台,拥有科目管理,题库管理,考试管理,成绩管理,用户管理等功能。 教师登录后台,可进行题库管理,考试管理和成绩管理。 用户登录前台,可查看考试列表,参加考试,查看已考试的结果,修改密码等。 系统实现了国际化,支持中英两种语言。 源码打包: 包含全套源码,数据库文件,需求分析和代码说明文档。 运行环境: 运行需vs2019或者以上版本,sql server2012或者以上版本。
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
包含了登陆注册、用户管理、部门管理、文件管理、权限管理、日志管理、个人中心、数据字典和代码生成这九个功能模块 系统采用了基于角色的访问控制,角色和菜单关联,一个角色可以配置多个菜单权限;然后再将用户和角色关联,一位用户可以赋予多个角色。这样用户就可以根据角色拿到该有的菜单权限,更方便管理者进行权限管控。 本系统还封装了文件管理功能,在其他模块如若要实现图片/文件上传预览时,前端只需导入现成的 Vue 组件即可实现(使用 viewerjs 依赖实现),后端只需定义 String 类型的实体类变量即可,无需再去研究文件上传预览的相关功能,简化了开发者的工作量。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。
三相10Kw光伏并网逆变器。包含全套理图 PCB 源代码
GJB 5236-2004 军用软件质量度量文档,本称准规定了车用软件产品的质重模型和基本的度量。本标准为确定车用软件质量需求和衡量军用 软件产品的能力提供了一个框架。
基于MATLAB车牌识别系统【GUI含界面】.zip。内容来源于网络分享,如有侵权请联系我删除。另外如果没有积分的同学需要下载,请私信我。
【宿舍管理系统】是一种专为高校或住宿机构设计的信息化解决方案,旨在提高宿舍管理的效率和准确性。该系统包含了多项核心功能,如宿舍管理员管理、宿舍信息维护、查询、卫生检查以及电费缴纳等,旨在实现全面的宿舍运营自动化。 **宿舍管理员管理**功能允许指定的管理员进行用户权限分配和角色设定。这包括对管理员账户的创建、修改和删除,以及设置不同的操作权限,例如只读、编辑或管理员权限。通过这样的权限控制,可以确保数据的安全性和管理的规范性。 **宿舍添加与管理**是系统的基础模块。管理员可以录入宿舍的基本信息,如宿舍号、楼栋、楼层、房间类型(单人间、双人间等)、容纳人数、设施配置等。此外,系统还支持批量导入或导出宿舍信息,方便数据的备份和迁移。 **查询功能**是系统的重要组成部分,它允许管理员和学生根据不同的条件(如宿舍号、楼栋、学生姓名等)快速查找宿舍信息。此外,系统还可以生成各种统计报告,如宿舍占用率、空闲宿舍数量等,以便于决策者进行资源优化。 **卫生检查**功能则是对宿舍卫生状况进行定期评估。管理员可设定检查计划,包括检查周期、评分标准等,并记录每次检查的结果。系统能自动生成卫生报表,用于
YOLO系列算法目标检测数据集,包含标签,可以直接训练模型和验证测试,数据集已经划分好,包含数据集配置文件data.yaml,适用yolov5,yolov8,yolov9,yolov7,yolov10,yolo11算法; 包含两种标签格:yolo格式(txt文件)和voc格式(xml文件),分别保存在两个文件夹中,文件名末尾是部分类别名称; yolo格式:<class> <x_center> <y_center> <width> <height>, 其中: <class> 是目标的类别索引(从0开始)。 <x_center> 和 <y_center> 是目标框中心点的x和y坐标,这些坐标是相对于图像宽度和高度的比例值,范围在0到1之间。 <width> 和 <height> 是目标框的宽度和高度,也是相对于图像宽度和高度的比例值; 【注】可以下拉页面,在资源详情处查看标签具体内容;
九缸星形发动机点火器3D
本项目可以作为小程序毕设项目,主要功能为音乐播放器,主要功能是:可以播放歌曲(采用mp3网络连接实现)、专辑封面播放时可以旋转,能够实现开始和暂停播放,可以点击下一首歌曲,主页面实现动态轮播图
出差审批单(表格模板).docx