ThreadPoolExecutor机制 -
Jedis 与 ShardedJedis 设计
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /** * False if should cancel/suppress periodic tasks on shutdown. */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * False if should cancel non-periodic tasks on shutdown. */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * Creates a new ScheduledThreadPoolExecutor with the given core * pool size. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if threadFactory is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if handler is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } }
- corePoolSize: 任务执行线程池大小
- RejectedExecutionHandler: 任务拒绝策略,当线程池shutdown时,任务处理策略
- DelayedWorkQueue: 无界延迟队列,提交任务都加入队列中,由队列实现延迟执行功能
- MaximumPoolSize: 由于DelayedWorkQueue为无界队列,所以该值没有意义
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); schedule(command, 0, TimeUnit.NANOSECONDS); } // Override AbstractExecutorService methods public Future<?> submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, TimeUnit.NANOSECONDS); }
ExecutorService的任务提交方式都有schedule(task,0,TimeUnit. NANOSECONDS)以延迟时间为0任务实现
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Boolean>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay))); delayedExecute(t); return t; }
- 封装任务:decorateTask()方法封装后,返回RunnableScheduledFuture
- 延迟执行:delayedExecute()延迟任务执行
/** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
- 线程池关闭(runState!=RUNNING),由拒绝策略决定
- 否则,线程池小于核心线程池大小,启动新线程
- 添加任务到等待队列
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. If the * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has * been set <tt>false</tt>, existing delayed tasks whose delays * have not yet elapsed are cancelled. And unless the * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has * been set <tt>true</tt>, future executions of existing periodic * tasks will be cancelled. */ public void shutdown() { cancelUnwantedTasks(); super.shutdown(); } /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution. Each * element of this list is a {@link ScheduledFuture}, * including those tasks submitted using <tt>execute</tt>, which * are for scheduling purposes used as the basis of a zero-delay * <tt>ScheduledFuture</tt>. * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { return super.shutdownNow(); }
- 先调用cancelUnwantedTasks(),决定未执行任务时候继续执行
- 调用父类ThreadPoolExecutor方法
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. */ private void cancelUnwantedTasks() { boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) super.getQueue().clear(); else if (keepDelayed || keepPeriodic) { Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; purge(); } }
- executeExistingDelayedTasksAfterShutdown : 执行已经存在与延迟队列任务
- continueExistingPeriodicTasksAfterShutdown : 继续执行周期任务
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ private final long period; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } //... }
- time:任务延迟执行时间
- period:任务重复执行周期;正值:固定频率执行;负值:定时延迟执行;零: 不重复执行
public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); }
- 首先调用runAndReset(),保证任务重复执行(任务执行后,重置state=0)
- 根据runAndReset返回状态,以及线程池是否关闭或者线程池shutdown且getContinueExistingPeriodicTasksAfterShutdownPolicy()是否继续执行已有中期任务策略;将任务放入任务队列
- 否则,若是最后延迟任务,则唤醒空闲线程检查
java concurrent包中任务执行框架分析
提到java并发编程必然绕不过java的线程和任务接口
JDK1.8源码分析 相关的原始码分析结果会以注解的形式体现到原始码中 已完成部分: ReentrantLock CountDownLatch Semaphore HashMap TreeMap LinkedHashMap ConcurrentHashMap 执行器 ...
7. 异步结果源码分析 异步结果通常通过 `Future` 接口获取,它提供了获取任务结果、判断任务是否完成、取消任务等功能。`ScheduledThreadPoolExecutor` 的 `delayedExecute()` 方法中,如果任务被取消,会调用 `task...
常见的实现类有`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`,前者用于执行提交的任务,后者则支持定时及周期性任务。 2. `Semaphore`:信号量,用于控制同时访问特定资源的线程数量,通过acquire()获取一...
在实际开发中,源码分析对于理解定时任务的工作原理至关重要。例如,通过阅读`ScheduledThreadPoolExecutor`的源码,我们可以了解其调度策略和线程池的管理方式。同时,利用IDE(如IntelliJ IDEA或Eclipse)的调试...
#### 六、JDK源码分析 深入理解JDK源码可以帮助开发者更好地理解Java语言的底层实现。 - **重要源码分析**: - List、Map、Set等集合类的实现细节。 - 多线程相关的类如`Thread`、`Lock`等。 - I/O相关的类如`...
【Tomcat源码分析】 Tomcat是Java应用服务器,其源码剖析有助于理解类加载器的工作原理以及如何处理HTTP请求。了解Tomcat的内部机制对于优化Web应用性能和排查问题非常有帮助。 【NIO与Netty】 非阻塞I/O (NIO) ...
这本书结合了丰富的源码分析,旨在帮助读者理解和掌握Java多线程与并发机制的核心概念。 在Java编程中,并发处理是构建高性能、可扩展应用的关键技术之一。Java并发库(java.util.concurrent)提供了一系列高效、...
9. **源码分析**: - 对于面试中常问的开源库,如Spring、MyBatis等,理解其核心原理和工作流程。 10. **工具**: - Maven或Gradle构建工具的使用,理解它们的项目管理和依赖管理功能。 - JUnit测试框架,单元...
了解JDK源码对于优化至关重要,例如,通过研究`ScheduledThreadPoolExecutor`的源码,可以发现其任务取消机制的细节,进而理解为何任务未从队列中移除。 **结论** Java性能调优是一个涉及多个层面的过程,需要结合...
### Java进阶知识点详解 #### 第一章:基本语法 ##### 关键字 - **static**:用于定义...以上是对Java进阶知识点的详细解析,覆盖了基本语法、JDK源码分析等多个方面,有助于深入理解Java语言的核心机制及高级特性。
源码分析显示,Spring Task默认使用了`ScheduledThreadPoolExecutor`,并设置核心线程数为1,这意味着所有任务会在同一线程中依次执行。`ScheduledThreadPoolExecutor`结合`DelayedWorkQueue`实现了任务的延迟或定时...
而“工具”可能是指一些辅助进行多线程调试、分析性能的工具,如JConsole或VisualVM,它们可以帮助开发者监控线程状态,找出死锁、竞态条件等问题。 从压缩包内的文件名来看,我们涵盖了多个与Java多线程相关的主题...