Jdk1.6 JUC源码解析(20)-Executors
作者:大飞
功能简介:
- Executors是JUC包提供的一个工具性质的帮助类,它针对ExecutorService、ScheduledExecutorService、ThreadFactory和Callable提供了一系列工厂方法和工具方法。
源码分析:
- 首先看下针对ExecutorService提供的一些工厂方法:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }通过之前文章中对ThreadPoolExecutor的分析可知:
1.这个方法创建了一个核心线程数量和最大线程数量一致的,并且任务队列是无界队列的线程池。
2.由于默认核心线程不会超时,所以超时相关的参数也没有意义。
3.如果在线程关闭之前,一个工作线程由于某种原因挂了,那么线程池会自动补上一个新的工作线程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }这个工厂方法看上去有点类似newFixedThreadPool(1) ,但有一点儿区别,这个不能重新调整配置(比如动态增大核心线程数量)了,由于方法内返回的不是ThreadPoolExecutor实例,而是一个包装类:
static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } protected void finalize() { super.shutdown();//被垃圾回收时,关闭线程池。 } } /** * 包装类,方法代理到内部的ExecutorService,只暴漏ExecutorService定义的方法。 */ static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } }
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }通过之前文章中对ThreadPoolExecutor的分析可知:
1.这个方法创建了一个核心线程数量为0,最大线程(可以认为)无上限,并且任务队列是同步队列(无实际容量)的线程池。
2.针对每一个新任务,如果当前没有空闲线程,都会创建一个新的工作线程来处理任务。工作线程默认空闲超过60秒超时被回收。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory);除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); }
DelegatedExecutorService这面已经看到过,这个方法就相当于将一个ExecutorService包装成一个不可配置的ExecutorService。
- 继续看下针对ScheduledExecutorService提供的一些工厂方法:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
通过之前文章中对ScheduledThreadPoolExecutor的分析可知:
1.这个方法创建了一个给定(核心)线程数量的ScheduledThreadPoolExecutor(由于其内部的任务队列是无界的,所以尽管继承自ThreadPoolExecutor,但最大线程数量无意义)。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }类似newScheduledThreadPool(1) (这个其实看起来更像是一个加强版的Timer),但不能调整配置:
static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedScheduledExecutorService(executor); }相当于将一个ScheduledExecutorService包装成一个不可配置的ScheduledExecutorService。
- 再看下针对ThreadFactory提供的一些工厂方法:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }
返回一个DefaultThreadFactory实例。在创建ThreadPoolExecutor和ScheduledThreadPoolExecutor时如果没有显式指定ThreadFactory,会默认使用这个,看下实现:
static class DefaultThreadFactory implements ThreadFactory { static final AtomicInteger poolNumber = new AtomicInteger(1); final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null)? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
public static ThreadFactory privilegedThreadFactory() { return new PrivilegedThreadFactory(); } static class PrivilegedThreadFactory extends DefaultThreadFactory { private final ClassLoader ccl; private final AccessControlContext acc; PrivilegedThreadFactory() { super(); this.ccl = Thread.currentThread().getContextClassLoader(); this.acc = AccessController.getContext(); acc.checkPermission(new RuntimePermission("setContextClassLoader")); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } }
privilegedThreadFactory和defaultThreadFactory返回的工厂类会创建设置相同的Thread,只是PrivilegedThreadFactory创建的Thread会使用和当前线程(创建线程)相同的访问控制和类加载器。
- 最后看下针对Callable提供的一些工具方法:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
将一个Runnable和一个返回值包装成一个Callable,返回的这个适配类之前也见过:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); }
上面方法的重载版本,返回值默认为null。
当然也会涉及到有访问控制和类加载器设定的工具方法:
public static Callable<Object> callable(final PrivilegedAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() { return action.run(); }}; } public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() throws Exception { return action.run(); }}; } public static <T> Callable<T> privilegedCallable(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallable<T>(callable); } public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallableUsingCurrentClassLoader<T>(callable); } static final class PrivilegedCallable<T> implements Callable<T> { private final AccessControlContext acc; private final Callable<T> task; private T result; private Exception exception; PrivilegedCallable(Callable<T> task) { this.task = task; this.acc = AccessController.getContext(); } public T call() throws Exception { AccessController.doPrivileged(new PrivilegedAction<T>() { public T run() { try { result = task.call(); } catch (Exception ex) { exception = ex; } return null; } }, acc); if (exception != null) throw exception; else return result; } } static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> { private final ClassLoader ccl; private final AccessControlContext acc; private final Callable<T> task; private T result; private Exception exception; PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) { this.task = task; this.ccl = Thread.currentThread().getContextClassLoader(); this.acc = AccessController.getContext(); acc.checkPermission(new RuntimePermission("getContextClassLoader")); acc.checkPermission(new RuntimePermission("setContextClassLoader")); } public T call() throws Exception { AccessController.doPrivileged(new PrivilegedAction<T>() { public T run() { Thread t = Thread.currentThread(); try { ClassLoader cl = t.getContextClassLoader(); if (ccl == cl) { result = task.call(); } else { t.setContextClassLoader(ccl); try { result = task.call(); } finally { t.setContextClassLoader(cl); } } } catch (Exception ex) { exception = ex; } return null; } }, acc); if (exception != null) throw exception; else return result; } }
Executors的代码解析完毕!
相关推荐
aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-15.8.0-jdk1.6aspose-words-...
2部分: jdk-1.6-windows-64-01 jdk-1.6-windows-64-02
1.okhttp3.8源码使用jdk1.6重新编译,已集成了okio,在javaweb项目中使用,未在安卓项目中使用 2.okhttp3.8源码使用jdk1.6重新编译_okhttp3.8.0-jdk1.6.jar
1. 解压缩"java-jdk1.6-jdk-6u45-windows-x64.zip"文件,这将释放出"jdk-6u45-windows-x64.exe"可执行文件。 2. 双击运行"jdk-6u45-windows-x64.exe",安装向导会引导你完成安装过程。通常,你需要选择安装路径,...
下载的压缩包文件"jdk-6u45-windows-x64(1.6 64).exe"是Windows 64位系统的安装程序。安装过程中,用户需要选择安装路径,并设置环境变量,包括`JAVA_HOME`指向JDK的安装目录,`PATH`添加JDK的bin目录,确保系统可以...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的"jdk-jdk1.6.0.24-windows-i586.exe"是一个Java Development Kit(JDK)的安装程序,适用于Windows操作系统且为32位版本。JDK是Oracle公司提供的一个用于开发和运行Java应用程序的软件包。这个特定的版本,...
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
标题中的“jdk1.6集成jjwt的问题”指的是在Java Development Kit (JDK) 版本1.6的环境下,尝试整合JSON Web Token (JWT) 库jjwt时遇到的挑战。JWT是一种开放标准(RFC 7519),用于在各方之间安全地传输信息作为 ...
logback-cfca-jdk1.6-3.1.0.0.jar
三部分: jdk-1.6-windows-32-1 jdk-1.6-windows-32-2 jdk-1.6-windows-32-3
三部分: jdk-1.6-linux-64-1 jdk-1.6-linux-64-2 jdk-1.6-linux-64-3
### JDK1.6安装及与JDK-1.5版本共存 #### 一、前言 随着软件开发环境的变化和技术的进步,不同的项目可能需要不同的Java版本来支持其运行。例如,在某些特定环境下,可能既需要使用JDK1.5(Java Development Kit ...
- 这可能是ZXing库的完整源码包,专门针对JDK1.6编译,包含了所有必要的源文件和资源,供开发者进行更深度的定制和集成。 总之,ZXing库是一个强大的条形码和二维码工具,这个特别适配JDK1.6的版本为那些仍在使用...
jdk-1.6-linux-32-1 jdk-1.6-linux-32-2 jdk-1.6-linux-32-3
JDK1.6是Oracle公司发布的一个较早版本,适用于Windows操作系统。在这个解压版中,用户无需进行安装过程,可以直接在Windows环境下使用JDK的各个工具。 JDK1.6包含的主要组件有: 1. **Java编译器**(javac):...
这个压缩包文件"jdk-6u45-linux-x64.zip"包含的是JDK 1.6.0_45(也被称为6u45或1.6u45)的64位Linux版本。JDK 1.6是Java平台标准版的一个重要版本,它提供了许多功能和性能改进,是许多企业级应用的基础。 JDK 1.6u...
压缩包中的文件`jdk-6u45-windows-i586.exe`是JDK 1.6更新45的Windows 32位安装程序。安装步骤通常包括: 1. 下载并运行安装程序。 2. 遵循安装向导的提示,选择安装路径和组件。 3. 设置环境变量,包括`JAVA_HOME`...
java环境搭建 jdk6(包含jre)64位 jdk-6u45-windows-x64
《OkHttp3.8.0-JDK1.6:低版本环境下的高效网络通信库》 OkHttp3.8.0-jdk1.6.zip是一个专门为Java Web项目设计的网络通信库,它针对JDK1.6进行了优化和重新编译,确保在较低版本的Java环境中也能稳定运行。OkHttp,...