- 浏览: 98531 次
- 性别:
- 来自: 北京
最新评论
-
LUCKYZHOUSTAR:
牛逼,楼主怎么学习的呢
Jedis 与 ShardedJedis 设计 -
demoxshiroki:
总结的不错
ThreadPoolExecutor机制 -
bluky999:
bluky999 写道opentan 写道zhangfeikr ...
Jedis 与 ShardedJedis 设计 -
bluky999:
opentan 写道zhangfeikr 写道使用Sharde ...
Jedis 与 ShardedJedis 设计 -
opentan:
zhangfeikr 写道使用ShardedJedis,一台挂 ...
Jedis 与 ShardedJedis 设计
ScheduledThreadPoolExecutor实现:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { /** * False if should cancel/suppress periodic tasks on shutdown. */ private volatile boolean continueExistingPeriodicTasksAfterShutdown; /** * False if should cancel non-periodic tasks on shutdown. */ private volatile boolean executeExistingDelayedTasksAfterShutdown = true; /** * Creates a new ScheduledThreadPoolExecutor with the given core * pool size. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue()); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if threadFactory is null */ public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), threadFactory); } /** * Creates a new ScheduledThreadPoolExecutor with the given * initial parameters. * * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if <tt>corePoolSize < 0</tt> * @throws NullPointerException if handler is null */ public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new DelayedWorkQueue(), handler); } }
ScheduledThreadPoolExecutor构造参数:
- corePoolSize: 任务执行线程池大小
- RejectedExecutionHandler: 任务拒绝策略,当线程池shutdown时,任务处理策略
- DelayedWorkQueue: 无界延迟队列,提交任务都加入队列中,由队列实现延迟执行功能
- MaximumPoolSize: 由于DelayedWorkQueue为无界队列,所以该值没有意义
提交任务:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); schedule(command, 0, TimeUnit.NANOSECONDS); } // Override AbstractExecutorService methods public Future<?> submit(Runnable task) { return schedule(task, 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, TimeUnit.NANOSECONDS); } public <T> Future<T> submit(Callable<T> task) { return schedule(task, 0, TimeUnit.NANOSECONDS); }
ExecutorService的任务提交方式都有schedule(task,0,TimeUnit. NANOSECONDS)以延迟时间为0任务实现
任务计划执行方法:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Object>(command, null, triggerTime(initialDelay, unit), unit.toNanos(period))); delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (delay <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Boolean>(command, null, triggerTime(initialDelay, unit), unit.toNanos(-delay))); delayedExecute(t); return t; }
任务计划执行步骤:
- 封装任务:decorateTask()方法封装后,返回RunnableScheduledFuture
- 延迟执行:delayedExecute()延迟任务执行
/** * Specialized variant of ThreadPoolExecutor.execute for delayed tasks. */ private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } // Prestart a thread if necessary. We cannot prestart it // running the task because the task (probably) shouldn't be // run yet, so thread will just idle until delay elapses. if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
延迟执行步骤:
- 线程池关闭(runState!=RUNNING),由拒绝策略决定
- 否则,线程池小于核心线程池大小,启动新线程
- 添加任务到等待队列
ScheduledThreadPoolExecutor线程池关闭
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. If the * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has * been set <tt>false</tt>, existing delayed tasks whose delays * have not yet elapsed are cancelled. And unless the * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has * been set <tt>true</tt>, future executions of existing periodic * tasks will be cancelled. */ public void shutdown() { cancelUnwantedTasks(); super.shutdown(); } /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @return list of tasks that never commenced execution. Each * element of this list is a {@link ScheduledFuture}, * including those tasks submitted using <tt>execute</tt>, which * are for scheduling purposes used as the basis of a zero-delay * <tt>ScheduledFuture</tt>. * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { return super.shutdownNow(); }
shutdownNow()方式与ThreadPoolExecutor一样
shutdown()方法:不在接收新提交任务
- 先调用cancelUnwantedTasks(),决定未执行任务时候继续执行
- 调用父类ThreadPoolExecutor方法
/** * Cancels and clears the queue of all tasks that should not be run * due to shutdown policy. */ private void cancelUnwantedTasks() { boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) super.getQueue().clear(); else if (keepDelayed || keepPeriodic) { Object[] entries = super.getQueue().toArray(); for (int i = 0; i < entries.length; ++i) { Object e = entries[i]; if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e; if (t.isPeriodic()? !keepPeriodic : !keepDelayed) t.cancel(false); } } entries = null; purge(); } }
等待队列中任务处理取决于:
- executeExistingDelayedTasksAfterShutdown : 执行已经存在与延迟队列任务
- continueExistingPeriodicTasksAfterShutdown : 继续执行周期任务
ScheduledFutureTask任务实现:
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; /** * Period in nanoseconds for repeating tasks. A positive * value indicates fixed-rate execution. A negative value * indicates fixed-delay execution. A value of 0 indicates a * non-repeating task. */ private final long period; /** * Creates a one-shot action with given nanoTime-based trigger time. */ ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable<V> callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } //... }
任务继承自FutureTask,同时实现了RunnableScheduledFuture接口(任务延迟执行时间getDelay()/任务周期之执行isPeriodic())
实例变量:
- time:任务延迟执行时间
- period:任务重复执行周期;正值:固定频率执行;负值:定时延迟执行;零: 不重复执行
任务执行:
public void run() { if (isPeriodic()) runPeriodic(); else ScheduledFutureTask.super.run(); }
非周期任务:直接调用FutureTask.run()方法执行
周期任务:调用runPeriodic()周期执行
private void runPeriodic() { boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0) time += p; else time = triggerTime(-p); ScheduledThreadPoolExecutor.super.getQueue().add(this); } // This might have been the final executed delayed // task. Wake up threads to check. else if (down) interruptIdleWorkers(); }
runPeriodic实现:
- 首先调用runAndReset(),保证任务重复执行(任务执行后,重置state=0)
- 根据runAndReset返回状态,以及线程池是否关闭或者线程池shutdown且getContinueExistingPeriodicTasksAfterShutdownPolicy()是否继续执行已有中期任务策略;将任务放入任务队列
- 否则,若是最后延迟任务,则唤醒空闲线程检查
DelayedWorkQueue实现:
包装了DelayQueue作为实现
发表评论
-
FutureTask实现分析
2012-11-15 22:27 1450FutureTask实现一个可以取消的异步计算任务。 ... -
ExecutorCompletionService实现解析
2012-11-15 22:26 1315ExecutorCompletionService解耦异步任务 ... -
ThreadPoolExecutor分析
2012-11-15 22:26 1651ThreadPoolExecutor 状态: RUNN ... -
ThreadPoolExecutor机制
2012-04-28 09:38 3528ThreadPoolExecutor作为java.util.c ... -
java concurrent包中任务执行框架分析
2012-04-26 22:43 2809提到java并发编程必然绕不过java的线程和任务接口;那么, ...
相关推荐
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
JDK1.8源码分析 相关的原始码分析结果会以注解的形式体现到原始码中 已完成部分: ReentrantLock CountDownLatch Semaphore HashMap TreeMap LinkedHashMap ConcurrentHashMap 执行器 ...
文档内容丰富,既包括了Java的基本语法、源码分析、多线程处理、IO流操作、设计模式、常用框架、数据库技术、数据结构与算法、JVM原理、Web开发技术,也包括了Linux操作系统、Redis数据库、UML绘图以及JDK的新特性等...
7. 异步结果源码分析 异步结果通常通过 `Future` 接口获取,它提供了获取任务结果、判断任务是否完成、取消任务等功能。`ScheduledThreadPoolExecutor` 的 `delayedExecute()` 方法中,如果任务被取消,会调用 `task...
常见的实现类有`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`,前者用于执行提交的任务,后者则支持定时及周期性任务。 2. `Semaphore`:信号量,用于控制同时访问特定资源的线程数量,通过acquire()获取一...
在实际开发中,源码分析对于理解定时任务的工作原理至关重要。例如,通过阅读`ScheduledThreadPoolExecutor`的源码,我们可以了解其调度策略和线程池的管理方式。同时,利用IDE(如IntelliJ IDEA或Eclipse)的调试...
通过阅读本书的源码,你可以更深入地理解并发编程的实现细节,而PDF版则提供了清晰的理论解释和实例分析。无论你是初学者还是经验丰富的开发者,这个压缩包都是一份宝贵的参考资料,能助你在Java并发编程的道路上...
#### 六、JDK源码分析 深入理解JDK源码可以帮助开发者更好地理解Java语言的底层实现。 - **重要源码分析**: - List、Map、Set等集合类的实现细节。 - 多线程相关的类如`Thread`、`Lock`等。 - I/O相关的类如`...
【Tomcat源码分析】 Tomcat是Java应用服务器,其源码剖析有助于理解类加载器的工作原理以及如何处理HTTP请求。了解Tomcat的内部机制对于优化Web应用性能和排查问题非常有帮助。 【NIO与Netty】 非阻塞I/O (NIO) ...
这本书结合了丰富的源码分析,旨在帮助读者理解和掌握Java多线程与并发机制的核心概念。 在Java编程中,并发处理是构建高性能、可扩展应用的关键技术之一。Java并发库(java.util.concurrent)提供了一系列高效、...
9. **源码分析**: - 对于面试中常问的开源库,如Spring、MyBatis等,理解其核心原理和工作流程。 10. **工具**: - Maven或Gradle构建工具的使用,理解它们的项目管理和依赖管理功能。 - JUnit测试框架,单元...
了解JDK源码对于优化至关重要,例如,通过研究`ScheduledThreadPoolExecutor`的源码,可以发现其任务取消机制的细节,进而理解为何任务未从队列中移除。 **结论** Java性能调优是一个涉及多个层面的过程,需要结合...
在Java编程中,线程池是一种管理线程的机制,它可以帮助我们更有效地调度和执行并发任务。...结合`SurgeryThreadPool.java`源码分析和`Java.jpg`中的示例,我们可以进一步理解线程池在实际项目中的具体实现和应用。
### Java进阶知识点详解 #### 第一章:基本语法 ##### 关键字 - **static**:用于定义...以上是对Java进阶知识点的详细解析,覆盖了基本语法、JDK源码分析等多个方面,有助于深入理解Java语言的核心机制及高级特性。
在Java编程中,线程是...这可能包括分析线程安全的代码示例,例如使用Collections.synchronizedList()、ConcurrentHashMap等线程安全的数据结构,或者理解线程池的工作原理,如ExecutorService、ThreadPoolExecutor和...
源码分析显示,Spring Task默认使用了`ScheduledThreadPoolExecutor`,并设置核心线程数为1,这意味着所有任务会在同一线程中依次执行。`ScheduledThreadPoolExecutor`结合`DelayedWorkQueue`实现了任务的延迟或定时...
7. **并发集合**:分析了并发环境下集合类的使用,如ConcurrentHashMap、CopyOnWriteArrayList、ConcurrentLinkedQueue等,它们是如何实现线程安全的。 8. **并发设计模式**:介绍了生产者消费者模式、读写锁模式、...
在IT行业中,任务管理是一项至关重要的技能,尤其是在软件开发领域。Java作为一种广泛应用的...在`task-management-main`项目中,很可能包含了这些概念的实际应用,通过源码分析可以深入理解如何在Java中实现任务管理。
而“工具”可能是指一些辅助进行多线程调试、分析性能的工具,如JConsole或VisualVM,它们可以帮助开发者监控线程状态,找出死锁、竞态条件等问题。 从压缩包内的文件名来看,我们涵盖了多个与Java多线程相关的主题...