- 浏览: 980956 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等) :
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
ScheduledThreadPoolExecutor解析一(调度任务,任务队列):
http://donald-draper.iteye.com/blog/2367332
ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593
ScheduledThreadPoolExecutor解析三(关闭线程池):
http://donald-draper.iteye.com/blog/2367698
从上面来看Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。
再来看newCachedThreadPool
从newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,
工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法
创建的线程适合需要执行大量执行时间短的异步任务场景。
再来看调度执行器创建:
从上面来看单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。
根据核心线程池数量和线程工厂,构造调度线程池:
不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
默认的线程工厂:这个前面已说,这里仅贴出代码,保证完整性
其他方法:
总结:
Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法创建的线程适合需要执行大量执行时间短的异步任务场景。单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。newScheduledThreadPool返回的调度执行器为ScheduledThreadPoolExecutor。不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
附:
下面的是Executors其余部分,将Runnable和Callable形式的线程包装在与当前线程具有相同控制权限和类加载器的环境下执行,
这一部分可以当扩展。
//PrivilegedThreadFactory
再来看PrivilegedAction的定义:
//PrivilegedExceptionAction,与PrivilegedAction基本相同,不同的是在赋予权限后,可能会
抛出检查异常
从上面可以看出PrivilegedThreadFactory,权限线程工厂创建的线程与创建权限线程工厂的线程具有相同的上下文访问控制权限和上下文类加载器。权限线程工厂创建的线程线程时首先检查当前线程是否有获取和设置上下文类加载器权限,有则,将权限赋予创建的线程,并在权限动作PrivilegedAction中,设置任务线程的上下类加载器与当前线程相同,并执行任务。如果想要创建线程具有与当前线程具有相同的ThreadLocal和InheritableThreadLocal变量的,则可以重写ThreadPoolExecutor#beforeExecute方法,在beforeExecute中设置或重置ThreadLocal和InheritableThreadLocal变量。
简单看一下ThreadLocal和InheritableThreadLocal的javaDoc,具体代码以后有时间,我们在深入;
//ThreadLocal
//InheritableThreadLocal
再来看执行器Executors的其他方法:
http://donald-draper.iteye.com/blog/2366934
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等):
http://donald-draper.iteye.com/blog/2367064
ThreadPoolExecutor解析三(线程池执行提交任务):
http://donald-draper.iteye.com/blog/2367199
ThreadPoolExecutor解析四(线程池关闭):
http://donald-draper.iteye.com/blog/2367246
ScheduledThreadPoolExecutor解析一(调度任务,任务队列):
http://donald-draper.iteye.com/blog/2367332
ScheduledThreadPoolExecutor解析二(任务调度):
http://donald-draper.iteye.com/blog/2367593
ScheduledThreadPoolExecutor解析三(关闭线程池):
http://donald-draper.iteye.com/blog/2367698
package java.util.concurrent; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.security.PrivilegedActionException; import java.security.AccessControlException; import sun.security.util.SecurityConstants; /** * Factory and utility methods for {@link Executor}, {@link * ExecutorService}, {@link ScheduledExecutorService}, {@link * ThreadFactory}, and {@link Callable} classes defined in this * package. This class supports the following kinds of methods: * Executors提供了工厂方法和有效方法为Executor,ExecutorService, ScheduledExecutorService,ThreadFactory,Callable。提供一下方法: * [list] * <li> Methods that create and return an {@link ExecutorService} * set up with commonly useful configuration settings. 根据通用的配置参数,创建并返回一个ExecutorService * <li> Methods that create and return a {@link ScheduledExecutorService} * set up with commonly useful configuration settings. 根据通用的配置参数,创建并返回一个ScheduledExecutorService * <li> Methods that create and return a "wrapped" ExecutorService, that * disables reconfiguration by making implementation-specific methods * inaccessible. 创建并返回一个包装的ExecutorService,不能重新配置 * <li> Methods that create and return a {@link ThreadFactory} * that sets newly created threads to a known state. 创建并返回一个ThreadFactory,设置创建线程为指定状态 * <li> Methods that create and return a {@link Callable} * out of other closure-like forms, so they can be used * in execution methods requiring <tt>Callable</tt>. 创建并返回一个闭包形式的Callable,以便可以在执行方法中执行需要的形式的Callable。 * [/list] * * @since 1.5 * @author Doug Lea */ public class Executors { /** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * <tt>nThreads</tt> threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * 创建一个可以重用的固定数量的工作线程和无界的共享队列的线程池。在任何时候, 最多有nThreads个工作线程。如果所有的工作线程在执行任务,新提交的任务 将会在任务队列中等待,直到有工作线程可利用。如果在线程池关闭之前,如果 有任务在执行中失败并结束,需要的话,则一个新工作线程将会创建,替代旧的 工作线程。直到线程池关闭,线程池中的工作线程才退出。 * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } //与上一个方法的区别增加了线程池参数ThreadFactory public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); } /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * <tt>newFixedThreadPool(1)</tt> the returned executor is * guaranteed not to be reconfigurable to use additional threads. * 创建单工作线程与无界队列的执行器(如果工作线程在线程池关闭之前,执行任务的过程, 工作线程由于失败结束,则一个新工作线程将会创建,替代旧的工作线程)。 * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } //FinalizableDelegatedExecutorService,所有操作委托给执行器代理 static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { FinalizableDelegatedExecutorService(ExecutorService executor) { super(executor); } //回收时关闭执行器 protected void finalize() { super.shutdown(); } } /** 执行器静态代理,所有方法委托给内部执行器 * A wrapper class that exposes only the ExecutorService methods * of an ExecutorService implementation. */ static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List<Runnable> shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future<?> submit(Runnable task) { return e.submit(task); } public <T> Future<T> submit(Callable<T> task) { return e.submit(task); } public <T> Future<T> submit(Runnable task, T result) { return e.submit(task, result); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { return e.invokeAll(tasks); } public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { return e.invokeAll(tasks, timeout, unit); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { return e.invokeAny(tasks); } public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return e.invokeAny(tasks, timeout, unit); } } }
//创建单线程执行器,与前一个方法的不同为,添加了线程工厂参数ThreadFactory public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
从上面来看Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。
再来看newCachedThreadPool
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to <tt>execute</tt> will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * 创建一个线程池,可以根据需要创建工作线程,如果有空闲工作线程,并且可用, 则重用工作线程。这个线程池可以显著地改善执行大量执行时间短的异步任务场景的性能。 当调用execute方法时,将重用空闲的工作线程。如果没有工作线程可利用,则 将创建新的工作线程,添加到线程池。如果工作线程在60之内,没有执行任务,那个将会 从工作线程缓存中移除。这样工作线程空闲足够的时间等待任务,并不会消耗太多的资源。 * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } //带线程工厂ThreadFactory参数的newCachedThreadPool方法 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
从newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,
工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法
创建的线程适合需要执行大量执行时间短的异步任务场景。
再来看调度执行器创建:
/** * Creates a single-threaded executor that can schedule commands * to run after a given delay, or to execute periodically. * (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * <tt>newScheduledThreadPool(1)</tt> the returned executor is * guaranteed not to be reconfigurable to use additional threads. 创建一个单线程的执行器可以执行延时任务和间歇性的任务(如果工作线程在线程池关闭之前,执行任务的过程, 工作线程由于失败结束,则一个新工作线程将会创建,替代旧的工作线程)。 保证任务按顺序执行,在任何时候不会有两个任务同时执行。 * @return the newly created scheduled executor */ public static ScheduledExecutorService newSingleThreadScheduledExecutor() { //返回调度执行器代理 return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); }
/** 调度执行器静态代理,所有的方法通过内部的调度执行器 * A wrapper class that exposes only the ScheduledExecutorService * methods of a ScheduledExecutorService implementation. */ static class DelegatedScheduledExecutorService extends DelegatedExecutorService implements ScheduledExecutorService { private final ScheduledExecutorService e; DelegatedScheduledExecutorService(ScheduledExecutorService executor) { super(executor); e = executor; } public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { return e.schedule(command, delay, unit); } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { return e.schedule(callable, delay, unit); } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { return e.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { return e.scheduleWithFixedDelay(command, initialDelay, delay, unit); } }
//此方与上一个方法的不同为,待线程工厂参数 public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
从上面来看单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。
根据核心线程池数量和线程工厂,构造调度线程池:
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle. * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } //不可重新配置的线程池执行器 public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); } //不可重新配置的调度线程池执行器 public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedScheduledExecutorService(executor); }
不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
默认的线程工厂:这个前面已说,这里仅贴出代码,保证完整性
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } /** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
其他方法:
//根据Runnable和结果类型result创建一个Callable public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } //根据Runnable创建一个无返回结果的Callable public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } //RunnableAdapter /** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
总结:
Executors创建固定线程池,实际为ThreadPoolExecutor,核心线程池数量和最大线程池数量相等并且固定,任务队列为LinkedBlockingQueue;创建单线程执行器,通过单线程执行器代理,实际为线程池执行器的ThreadPoolExecutor静态代理,核心线程池数量和最大线程池数量相等并且为1,任务队列为LinkedBlockingQueue。newCachedThreadPool方法来看,核心线程池为0,最大线程池为Integer.MAX_VALUE,工作线程可空闲时间为60秒,任务队列为SynchronousQueue。newCachedThreadPool方法创建的线程适合需要执行大量执行时间短的异步任务场景。单线程调度器实际为调度执行器静态代理,实际的调度执行器为ScheduledThreadPoolExecutor。newScheduledThreadPool返回的调度执行器为ScheduledThreadPoolExecutor。不可重新配置的线程池执行器和调度线程池执行器,通过线程池执行器和调度线程池执行器的静态代理来实现。
附:
下面的是Executors其余部分,将Runnable和Callable形式的线程包装在与当前线程具有相同控制权限和类加载器的环境下执行,
这一部分可以当扩展。
/** * Returns a thread factory used to create new threads that * have the same permissions as the current thread. * This factory creates threads with the same settings as {@link * Executors#defaultThreadFactory}, additionally setting the * AccessControlContext and contextClassLoader of new threads to * be the same as the thread invoking this * <tt>privilegedThreadFactory</tt> method. A new * <tt>privilegedThreadFactory</tt> can be created within an * {@link AccessController#doPrivileged} action setting the * current thread's access control context to create threads with * the selected permission settings holding within that action. 返回一个与当前线程具有相同权限的线程工厂,用于创建线程。线程工厂与 默认执行器有着相同的配置,不同的是新创建线程的访问控制上下文AccessControlContex和 上下文类加载器contextClassLoader与调用privilegedThreadFactory的线程相同。 在AccessController#doPrivileged动作设置当前可以访问控制创建线程的已选择的 配置权限时,PrivilegedThreadFactory被创建。 * * <p> Note that while tasks running within such threads will have * the same access control and class loader settings as the * current thread, they need not have the same {@link * java.lang.ThreadLocal} or {@link * java.lang.InheritableThreadLocal} values. If necessary, * particular values of thread locals can be set or reset before * any task runs in {@link ThreadPoolExecutor} subclasses using * {@link ThreadPoolExecutor#beforeExecute}. Also, if it is * necessary to initialize worker threads to have the same * InheritableThreadLocal settings as some other designated * thread, you can create a custom ThreadFactory in which that * thread waits for and services requests to create others that * will inherit its values. * 当任务线程执行时,将会有和当前线程一样的访问控制权限和类加载器, 而不需要相同的ThreadLocal和InheritableThreadLocal。如果需要,在任务 被工作线程执行的ThreadPoolExecutor#beforeExecute的方法中可以设置或重置 thread locals值。如果需要初始化工作线程与一些特定的线程具有相同的 InheritableThreadLocal,你可以创建一个定制的线程工厂,当线程等待请求时, 请求过来,创建一个新的线程处理请求,新的线程将会继承创建它的线程的属性值。 * @return a thread factory * @throws AccessControlException if the current access control * context does not have permission to both get and set context * class loader. */ public static ThreadFactory privilegedThreadFactory() { return new PrivilegedThreadFactory(); }
//PrivilegedThreadFactory
/** * Thread factory capturing access control context and class loader */ static class PrivilegedThreadFactory extends DefaultThreadFactory { private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedThreadFactory() { super(); SecurityManager sm = System.getSecurityManager(); if (sm != null) { //检查获取和设置上下文类加载器权限 // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Fail fast sm.checkPermission(new RuntimePermission("setContextClassLoader")); } //获取当前线程的访问控制上下文,和上下文类加载器 this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { //设置当前线程的访问控制权限 AccessController.doPrivileged(new PrivilegedAction<Void>() { public Void run() { //设置创建线程的上下文类加载器 Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } }
再来看PrivilegedAction的定义:
/** * A computation to be performed with privileges enabled. The computation is * performed by invoking <code>AccessController.doPrivileged</code> on the * <code>PrivilegedAction</code> object. This interface is used only for * computations that do not throw checked exceptions; computations that * throw checked exceptions must use <code>PrivilegedExceptionAction</code> * instead. * 一个计算操作可以,以赋予的权限执行。这个计算操作在AccessController.doPrivileged方法 的PrivilegedAction中执行。这个接口不会抛出检查异常,如果操作需要抛出检查异常, 则用PrivilegedExceptionAction。 * @see AccessController * @see AccessController#doPrivileged(PrivilegedAction) * @see PrivilegedExceptionAction */ public interface PrivilegedAction<T> { /** * Performs the computation. This method will be called by * <code>AccessController.doPrivileged</code> after enabling privileges. * 此方放将会在AccessController.doPrivileged开启权限后执行。 * @return a class-dependent value that may represent the results of the * computation. Each class that implements * <code>PrivilegedAction</code> * should document what (if anything) this value represents. * @see AccessController#doPrivileged(PrivilegedAction) * @see AccessController#doPrivileged(PrivilegedAction, * AccessControlContext) */ T run(); }
//PrivilegedExceptionAction,与PrivilegedAction基本相同,不同的是在赋予权限后,可能会
抛出检查异常
package java.security; /** * A computation to be performed with privileges enabled, that throws one or * more checked exceptions. The computation is performed by invoking * <code>AccessController.doPrivileged</code> on the * <code>PrivilegedExceptionAction</code> object. This interface is * used only for computations that throw checked exceptions; * computations that do not throw * checked exceptions should use <code>PrivilegedAction</code> instead. * * @see AccessController * @see AccessController#doPrivileged(PrivilegedExceptionAction) * @see AccessController#doPrivileged(PrivilegedExceptionAction, * AccessControlContext) * @see PrivilegedAction */ public interface PrivilegedExceptionAction<T> { /** * Performs the computation. This method will be called by * <code>AccessController.doPrivileged</code> after enabling privileges. * * @return a class-dependent value that may represent the results of the * computation. Each class that implements * <code>PrivilegedExceptionAction</code> should document what * (if anything) this value represents. * @throws Exception an exceptional condition has occurred. Each class * that implements <code>PrivilegedExceptionAction</code> should * document the exceptions that its run method can throw. * @see AccessController#doPrivileged(PrivilegedExceptionAction) * @see AccessController#doPrivileged(PrivilegedExceptionAction,AccessControlContext) */ T run() throws Exception; }
从上面可以看出PrivilegedThreadFactory,权限线程工厂创建的线程与创建权限线程工厂的线程具有相同的上下文访问控制权限和上下文类加载器。权限线程工厂创建的线程线程时首先检查当前线程是否有获取和设置上下文类加载器权限,有则,将权限赋予创建的线程,并在权限动作PrivilegedAction中,设置任务线程的上下类加载器与当前线程相同,并执行任务。如果想要创建线程具有与当前线程具有相同的ThreadLocal和InheritableThreadLocal变量的,则可以重写ThreadPoolExecutor#beforeExecute方法,在beforeExecute中设置或重置ThreadLocal和InheritableThreadLocal变量。
简单看一下ThreadLocal和InheritableThreadLocal的javaDoc,具体代码以后有时间,我们在深入;
//ThreadLocal
package java.lang; import java.lang.ref.*; import java.util.concurrent.atomic.AtomicInteger; /** * This class provides thread-local variables. These variables differ from * their normal counterparts in that each thread that accesses one (via its * <tt>get</tt> or <tt>set</tt> method) has its own, independently initialized * copy of the variable. <tt>ThreadLocal</tt> instances are typically private * static fields in classes that wish to associate state with a thread (e.g., * a user ID or Transaction ID). * ThreadLocal提供线程本地变量。这些变量不同于线程一般用get和set方法获取的变量, 它是一个独立的变量初始化拷贝。ThreadLocal实例是典型的私有静态访问fields,比如 我们希望每个线程关联一个状态,如用户或事物的ID * <p>For example, the class below generates unique identifiers local to each * thread. * A thread's id is assigned the first time it invokes <tt>ThreadId.get()</tt> * and remains unchanged on subsequent calls. 举个例子,ThreadId保证了每个线程用于一个本地唯一的标识。如果一个线程的id,在以第一次调用 ThreadId.get()方法时指定,接下来将不能改变 * <pre> * import java.util.concurrent.atomic.AtomicInteger; * * public class ThreadId { * // Atomic integer containing the next thread ID to be assigned * private static final AtomicInteger nextId = new AtomicInteger(0); * * // Thread local variable containing each thread's ID * private static final ThreadLocal<Integer> threadId = * new ThreadLocal<Integer>() { * @Override protected Integer initialValue() { * return nextId.getAndIncrement(); * } * }; * * // Returns the current thread's unique ID, assigning it if necessary * public static int get() { * return threadId.get(); * } * } * </pre> * <p>Each thread holds an implicit reference to its copy of a thread-local * variable as long as the thread is alive and the <tt>ThreadLocal</tt> * instance is accessible; after a thread goes away, all of its copies of * thread-local instances are subject to garbage collection (unless other * references to these copies exist). * 只要线程存活,每个线程将拥有一个隐式的线程本地变量ThreadLocal的副本,在一个线程结束 之后所有的线程本地变量将会被垃圾回收器回收,除非有其他副本引用。 * @author Josh Bloch and Doug Lea * @since 1.2 */ public class ThreadLocal<T> { //获取线程本地变量的拷贝 public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) return (T)e.value; } return setInitialValue(); } //设置初始化值 private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } protected T initialValue() { //待子类扩展 return null; } 这个我们这里不多讲,以后我们会单写一篇文章。 }
//InheritableThreadLocal
package java.lang; import java.lang.ref.*; /** * This class extends <tt>ThreadLocal</tt> to provide inheritance of values * from parent thread to child thread: when a child thread is created, the * child receives initial values for all inheritable thread-local variables * for which the parent has values. Normally the child's values will be * identical to the parent's; however, the child's value can be made an * arbitrary function of the parent's by overriding the <tt>childValue</tt> * method in this class. * InheritableThreadLocal继承了ThreadLocal,提供了子线程继承父线程本地变量的实现; 当子线程被创建,子线程将会拥有父线程所有可继承的线程本地变量。一般情况子线程的本地 变量值与父线程相同;如果子线程重写的父线程的childValue,将不能保证。 * <p>Inheritable thread-local variables are used in preference to * ordinary thread-local variables when the per-thread-attribute being * maintained in the variable (e.g., User ID, Transaction ID) must be * automatically transmitted to any child threads that are created. 当每个线程的属性保存在一个变量中如用户ID和事务ID,如果是可继承的线程本地变量 必须自动的传给所有创建的子线程,可继承的线程本地变量优先于一般的线程变量被使用。 * @author Josh Bloch and Doug Lea * @see ThreadLocal * @since 1.2 */ public class InheritableThreadLocal<T> extends ThreadLocal<T> { /** * Computes the child's initial value for this inheritable thread-local * variable as a function of the parent's value at the time the child * thread is created. This method is called from within the parent * thread before the child is started. 在子线程创建时,初始化子线程从父线程继承的线程本地变量。这个方法在子线程启动之前, 父线程调用。 * <p> * This method merely returns its input argument, and should be overridden * if a different behavior is desired. * * @param parentValue the parent thread's value * @return the child thread's initial value */ protected T childValue(T parentValue) { return parentValue; } /** * Get the map associated with a ThreadLocal. * 获取线程关联ThreadLocal * @param t the current thread */ ThreadLocalMap getMap(Thread t) { return t.inheritableThreadLocals; } /** * Create the map associated with a ThreadLocal. * 创建一个线程关联ThreadLocal的Map * @param t the current thread * @param firstValue value for the initial entry of the table. * @param map the map to store. */ void createMap(Thread t, T firstValue) { t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue); } }
再来看执行器Executors的其他方法:
/** * Returns a {@link Callable} object that, when * called, runs the given privileged action and returns its result. 返回 一个包装权限动作的Callable,执行时,运行权限动作的run,并返回结果 * @param action the privileged action to run * @return a callable object * @throws NullPointerException if action null */ public static Callable<Object> callable(final PrivilegedAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() { return action.run(); }}; } /** * Returns a {@link Callable} object that, when * called, runs the given privileged exception action and returns * its result. 返回 一个包装权限异常动作的Callable,执行时,运行权限异常动作的run,并返回结果 * @param action the privileged exception action to run * @return a callable object * @throws NullPointerException if action null */ public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() throws Exception { return action.run(); }}; } /** * Returns a {@link Callable} object that will, when * called, execute the given <tt>callable</tt> under the current * access control context. This method should normally be * invoked within an {@link AccessController#doPrivileged} action * to create callables that will, if possible, execute under the * selected permission settings holding within that action; or if * not possible, throw an associated {@link * AccessControlException}. 返回一个Callable,Callable可以在与当前线程相同访问控制权限上下文的情况,执行。 Callable方法将会在AccessController#doPrivileged的创建的PrivilegedCallable中,执行, 如果可能话,将会在已选的权限Action中执行,否则抛出AccessControlException。 * @param callable the underlying task * @return a callable object * @throws NullPointerException if callable null * */ public static <T> Callable<T> privilegedCallable(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallable<T>(callable); } /** * A callable that runs under established access control settings 在已经建立的访问控制权限下,执行Callable,与权限线程工厂基本相同, 只是权限工厂创建出来的线程除了相同访问控制权限外,还有相同的类加载器。 */ static final class PrivilegedCallable<T> implements Callable<T> { private final Callable<T> task; private final AccessControlContext acc; PrivilegedCallable(Callable<T> task) { this.task = task; this.acc = AccessController.getContext(); } //有了前面的知识,这个应该很容易理解 public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction<T>() { public T run() throws Exception { return task.call(); } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } }
/** * Returns a {@link Callable} object that will, when * called, execute the given <tt>callable</tt> under the current * access control context, with the current context class loader * as the context class loader. This method should normally be * invoked within an {@link AccessController#doPrivileged} action * to create callables that will, if possible, execute under the * selected permission settings holding within that action; or if * not possible, throw an associated {@link * AccessControlException}. 返回一个Callable,Callable可以在与当前线程相同访问控制权限上下文和类加载器的情况,执行。Callable方法将会在AccessController#doPrivileged的创建的PrivilegedCallable中,执行,如果可能话,将会在已选的权限Action中执行,否则抛出AccessControlException。 * @param callable the underlying task * * @return a callable object * @throws NullPointerException if callable null * @throws AccessControlException if the current access control * context does not have permission to both set and get context * class loader. */ public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallableUsingCurrentClassLoader<T>(callable); } /** * A callable that runs under established access control settings and * current ClassLoader 在已经建立的访问控制权限和当前类加载器下,执行Callable,与权限线程工厂基本相同, 相同访问控制权限外,还有相同的类加载器。 */ static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> { private final Callable<T> task; private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) { SecurityManager sm = System.getSecurityManager(); if (sm != null) { //检查获取和设置类加载的权限 // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Whether setContextClassLoader turns out to be necessary // or not, we fail fast if permission is not available. sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.task = task; //访问控制上下文 this.acc = AccessController.getContext(); //当前线程类加载器 this.ccl = Thread.currentThread().getContextClassLoader(); } public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction<T>() { public T run() throws Exception { Thread t = Thread.currentThread(); ClassLoader cl = t.getContextClassLoader(); if (ccl == cl) { //只有在与当前线程具有相同控制权限和类加载器的情况, //才执行任务 return task.call(); } else { t.setContextClassLoader(ccl); try { return task.call(); } finally { t.setContextClassLoader(cl); } } } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } }
发表评论
-
ScheduledThreadPoolExecutor解析三(关闭线程池)
2017-04-06 20:52 4451ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析二(任务调度)
2017-04-06 12:56 2117ScheduledThreadPoolExecutor解析一( ... -
ScheduledThreadPoolExecutor解析一(调度任务,任务队列)
2017-04-04 22:59 4987Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析四(线程池关闭)
2017-04-03 23:02 9100Executor接口的定义:http: ... -
ThreadPoolExecutor解析三(线程池执行提交任务)
2017-04-03 12:06 6082Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析二(线程工厂、工作线程,拒绝策略等)
2017-04-01 17:12 3036Executor接口的定义:http://donald-dra ... -
ThreadPoolExecutor解析一(核心线程池数量、线程池状态等)
2017-03-31 22:01 20514Executor接口的定义:http://donald-dra ... -
ScheduledExecutorService接口定义
2017-03-29 12:53 1503Executor接口的定义:http://donald-dra ... -
AbstractExecutorService解析
2017-03-29 08:27 1071Executor接口的定义:http: ... -
ExecutorCompletionService解析
2017-03-28 14:27 1586Executor接口的定义:http://donald-dra ... -
CompletionService接口定义
2017-03-28 12:39 1061Executor接口的定义:http://donald-dra ... -
FutureTask解析
2017-03-27 12:59 1325package java.util.concurrent; ... -
Future接口定义
2017-03-26 09:40 1193/* * Written by Doug Lea with ... -
ExecutorService接口定义
2017-03-25 22:14 1159Executor接口的定义:http://donald-dra ... -
Executor接口的定义
2017-03-24 23:24 1672package java.util.concurrent; ... -
简单测试线程池拒绝执行任务策略
2017-03-24 22:37 2025线程池多余任务的拒绝执行策略有四中,分别是直接丢弃任务Disc ... -
JAVA集合类简单综述
2017-03-23 22:51 920Queue接口定义:http://donald-draper. ... -
DelayQueue解析
2017-03-23 11:00 1733Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析下-TransferQueue
2017-03-22 22:20 2133Queue接口定义:http://donald-draper. ... -
SynchronousQueue解析上-TransferStack
2017-03-21 22:08 3048Queue接口定义:http://donald-draper. ...
相关推荐
《深入解析Java Executors源码》 在Java多线程编程中,`java.util.concurrent.Executors`类扮演着至关重要的角色。它提供了一系列静态工厂方法,用于创建和管理线程池,以及与之相关的ExecutorService、...
分别为1、storm项目-流数据监控系列1《设计文档》2、storm项目-流数据监控系列2《代码解析》 3、storm项目-流数据监控系列3《实例运行》4、storm项目-流数据监控系列4《MetaQ接口》5、storm项目-流数据监控系列5...
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); // 更新歌词的定时任务 Runnable updateLyricTask = () -> { long currentTime = mediaPlayer.getCurrentPosition(); // ...
# 掌握并发的钥匙:Java Executor框架深度解析 Java作为一种广泛应用的编程语言,自1995年由Sun Microsystems公司(现属Oracle公司)首次发布以来,已经发展成为软件开发领域的重要工具。Java的设计目标包括跨平台...
ExecutorService mExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public ImageLoader() { initImageCache(); } private void initImageCache() { final ...
- 使用`ExecutorService`创建固定数量的工作线程,如`Executors.newFixedThreadPool(threadCount)`。 - 每个工作线程执行一个下载任务,下载对应的数据块。 4. **实现下载任务**: - 在每个下载任务中,首先根据...
以下是对JUC库的详细解析: 1. **线程池(ExecutorService)**: - `ExecutorService`是线程池的核心接口,它负责管理线程的创建和执行。通过`Executors`类提供的静态工厂方法,可以创建不同类型的线程池,如`...
3. Spark Driver程序初始化,创建SparkContext,解析用户应用程序。 4. SparkContext向Master注册应用程序,并请求计算资源来启动executors。 5. Master节点接收到请求后,分配Worker节点资源,并启动相应的...
而`BackgroundThread`则根据当前是否为Android运行环境来决定使用`java.util.concurrent.Executors`还是`AndroidExecutors`创建线程池,确保资源的有效利用。 引入Bolts库到Android项目中,只需要在Gradle依赖中...
例如,"nunaliit2-upload" 可用于处理用户上传的文件,然后"mogwee executors" 可以在后台异步处理这些文件,如进行文件解析、转换、存储等操作,同时保持应用的响应性。 综上所述,这两个开源项目为开发者提供了...
- **Executors工厂方法**:提供newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool和newWorkStealingPool等便捷的线程池创建方式。 6. **线程安全** - **线程不安全的集合**:如ArrayList、...
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); ``` #### 4.3 可缓存线程池(CachedThreadPool) - 特点:线程数量无限制,空闲线程存活时间短,适合执行短期异步任务。 -...
Java FutureTask类使用案例解析 Java FutureTask类是一种异步计算的工具,用于执行长时间的任务并获取结果。它实现了Runnable和Future接口,既可以作为一个Runnable对象提交给Executor执行,也可以作为一个Future...
// 解析数据报,获取发送者的信息和数据 String received = new String(packet.getData(), 0, packet.getLength()); String senderIP = packet.getAddress().getHostAddress(); int senderPort = packet.getPort...
在实际开发中,Java还提供了一些预定义的线程池,如`Executors.newFixedThreadPool(int nThreads)`创建固定大小的线程池,`Executors.newSingleThreadExecutor()`创建只有一个线程的线程池等。这些预定义的线程池...
ExecutorService executor = Executors.newFixedThreadPool(10); List<Future<String>> futures = new ArrayList(); for (String ip : ips) { Future<String> future = executor.submit(() -> query(ip)); ...
ExecutorService executor = Executors.newFixedThreadPool(10); // 创建固定大小的线程池 for (String url : urls) { executor.submit(() -> { // 在这里执行每个网址的抓取任务 }); } executor.shutdown();...
- **自定义插件**:支持与其他框架(如JUnit、TestNG、Cucumber等)集成,可扩展报告的生成和解析能力。 - **多语言支持**:支持多种语言,便于国际化团队协作。 2. **Allure Report的使用流程:** - **安装与...
可以使用`Executors`工厂类创建不同类型的线程池。 4. **线程上下文切换**:线程上下文切换是CPU在不同线程间切换的过程,涉及保存和恢复线程状态。上下文切换开销包括CPU时间、内存占用等。常见的上下文切换原因有...
ExecutorService executorService = Executors.newFixedThreadPool(1000); IntStream.range(0, 1000).forEach(i -> executorService.execute(() -> stringRedisTemplate.opsForValue().increment("lcl",1))); ...