- 浏览: 378141 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
真的全站唯一:
描述的能不能准确一点,我也以为bigDecimal性能比dou ...
【性能】Java BigDecimal和double性能比较 -
zhanggang807:
学习到了。。以后会考虑往这方面设计
【java规范】Java spi机制浅谈 -
Xiong506:
xiyuan1025 写道你这是在linux下吗,我在linu ...
[监控]Btrace监控简单笔记 -
Xiong506:
xiyuan1025 写道你这是在linux下吗,我在linu ...
[监控]Btrace监控简单笔记 -
Bll:
找不到实现类
【java规范】Java spi机制浅谈
【java并发】juc Executor框架详解
Executor 框架是 juc 里提供的线程池的实现。前两天看了下 Executor 框架的一些源码,做个简单的总结。
线程池大概的思路是维护一个的线程池用于执行提交的任务。我理解池的技术的主要意义有两个:
1. 资源的控制,如并发量限制。像连接池这种是对数据库资源的保护。
2. 资源的有效利用,如线程复用,避免频繁创建线程和线程上下文切换。
那么想象中设计一个线程池就需要有线程池大小、线程生命周期管理、等待队列等等功能,下面结合代码看看原理。
Excutor
整体结构如下:
Executor 接口定义了最基本的 execute 方法,用于接收用户提交任务。 ExecutorService 定义了线程池终止和创建及提交 futureTask 任务支持的方法。
AbstractExecutorService 是抽象类,主要实现了 ExecutorService 和 futureTask 相关的一些任务创建和提交的方法。
ThreadPoolExecutor 是最核心的一个类,是线程池的内部实现。线程池的功能都在这里实现了,平时用的最多的基本就是这个了。其源码很精练,远没当时想象的多。
ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上提供了支持定时调度的功能。线程任务可以在一定延时时间后才被触发执行。
1.ThreadPoolExecutor 原理
1.1 ThreadPoolExecutor内部的几个重要属性
1.线程池本身的状态
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
2.等待任务队列和工作集
private final BlockingQueue<Runnable> workQueue; //等待被执行的Runnable任务 private final HashSet<Worker> workers = new HashSet<Worker>(); //正在被执行的Worker任务集
3.线程池的主要状态锁。线程池内部的状态变化 ( 如线程大小 ) 都需要基于此锁。
private final ReentrantLock mainLock = new ReentrantLock();
4.线程的存活时间和大小
private volatile long keepAliveTime;// 线程存活时间 private volatile boolean allowCoreThreadTimeOut;// 是否允许核心线程存活 private volatile int corePoolSize;// 核心池大小 private volatile int maximumPoolSize; // 最大池大小 private volatile int poolSize; //当前池大小 private int largestPoolSize; //最大池大小,区别于maximumPoolSize,是用于记录线程池曾经达到过的最大并发,理论上小于等于maximumPoolSize。
5.线程工厂和拒绝策略
private volatile RejectedExecutionHandler handler;// 拒绝策略,用于当线程池无法承载新线程是的处理策略。 private volatile ThreadFactory threadFactory;// 线程工厂,用于在线程池需要新创建线程的时候创建线程
6.线程池完成任务数
private long completedTaskCount;//线程池运行到当前完成的任务数总和
1.2 ThreadPoolExecutor 的内部工作原理
有了以上定义好的数据,下面来看看内部是如何实现的 。 Doug Lea 的整个思路总结起来就是 5 句话:
1. 如果当前池大小 poolSize 小于 corePoolSize ,则创建新线程执行任务。
2. 如果当前池大小 poolSize 大于 corePoolSize ,且等待队列未满,则进入等待队列
3. 如果当前池大小 poolSize 大于 corePoolSize 且小于 maximumPoolSize ,且等待队列已满,则创建新线程执行任务。
4. 如果当前池大小 poolSize 大于 corePoolSize 且大于 maximumPoolSize ,且等待队列已满,则调用拒绝策略来处理该任务。
5. 线程池里的每个线程执行完任务后不会立刻退出,而是会去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。
下面看看代码实现 :
线程池最重要的方法是由 Executor 接口定义的 execute 方法 , 是任务提交的入口。
我们看看 ThreadPoolExecutor.execute(Runnable cmd) 的实现:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
解释如下:
当提交一个新的 Runnable 任务:
分支1 : 如果当前池大小小于 corePoolSize, 执行 addIfUnderCorePoolSize(command) , 如果线程池处于运行状态且 poolSize < corePoolSize addIfUnderCorePoolSize(command) 会做如下事情,将 Runnable 任务封装成 Worker 任务 , 创建新的 Thread ,执行 Worker 任务。如果不满足条件,则返回 false 。代码如下:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
分支2 : 如果大于 corePoolSize 或 1 失败失败,则:
-
如果等待队列未满,把
Runnable
任务加入到
workQueue
等待队列
workQueue .offer(command)
- 如多等待队列已经满了,调用 addIfUnderMaximumPoolSize(command) ,和 addIfUnderCorePoolSize 基本类似,只不过判断条件是 poolSize < maximumPoolSize 。如果大于 maximumPoolSize ,则把 Runnable 任务交由 RejectedExecutionHandler 来处理。
问题:如何实现线程的复用 ?
Doug Lea 的实现思路是 线程池里的每个线程执行完任务后不立刻退出,而是去检查下等待队列里是否还有线程任务需要执行,如果在 keepAliveTime 里等不到新的任务了,那么线程就会退出。这个功能的实现 关键在于 Worker 。线程池在执行 Runnable 任务的时候,并不单纯把 Runnable 任务交给创建一个 Thread 。而是会把 Runnable 任务封装成 Worker 任务。
下面看看 Worker 的实现:
代码很简单,可以看出, worker 里面包装了 firstTask 属性,在构造worker 的时候传进来的那个 Runnable 任务就是 firstTask 。 同时也实现了Runnable接口,所以是个代理模式,看看代理增加了哪些功能。 关键看 woker 的 run 方法:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
可以看出 worker 的 run 方法是一个循环,第一次循环运行的必然是 firstTask ,在运行完 firstTask 之后,并不会立刻结束,而是会调用 getTask 获取新的任务( getTask 会从等待队列里获取等待中的任务),如果 keepAliveTime 时间内得到新任务则继续执行,得不到新任务则那么线程才会退出。这样就保证了多个任务可以复用一个线程,而不是每次都创建新任务。 keepAliveTime 的逻辑在哪里实现的呢?主要是利用了 BlockingQueue 的 poll 方法支持等待。可看 getTask 的代码段:
if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take();
2.ThreadFactory 和R ejectedExecutionHandler
ThreadFactory 和 RejectedExecutionHandler是ThreadPoolExecutor的两个属性,也 可以认为是两个简单的扩展点 . ThreadFactory 是创建线程的工厂。
默认的线程工厂会创建一个带有“ pool-poolNumber-thread-threadNumber ”为名字的线程,如果我们有特别的需要,如线程组命名、优先级等,可以定制自己的 ThreadFactory 。
RejectedExecutionHandler 是拒绝的策略。常见有以下几种:
AbortPolicy :不执行,会抛出 RejectedExecutionException 异常。
CallerRunsPolicy :由调用者(调用线程池的主线程)执行。
DiscardOldestPolicy :抛弃等待队列中最老的。
DiscardPolicy: 不做任何处理,即抛弃当前任务。
3.ScheduledThreadPoolExecutor
ScheduleThreadPoolExecutor 是对ThreadPoolExecutor的集成。增加了定时触发线程任务的功能。需要注意
从内部实现看, ScheduleThreadPoolExecutor 使用的是 corePoolSize 线程和一个无界队列的固定大小的池,所以调整 maximumPoolSize 没有效果。无界队列是一个内部自定义的 DelayedWorkQueue 。
ScheduleThreadPoolExecutor 线程池接收定时任务的方法是 schedule ,看看内部实现:
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; }
以上代码会初始化一个 RunnableScheduledFuture 类型的任务 t, 并交给 delayedExecute 方法。 delayedExecute(t) 方法实现如下:
private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
如果当前线程池大小 poolSize 小于 CorePoolSize ,则创建一个新的线程,注意这里创建的线程是空的,不会把任务直接交给线程来做,而是把线程任务放到队列里。因为任务是要定时触发的,所以不能直接交给线程去执行。
问题: 那如何做到定时触发呢?
关键在于DelayedWorkQueue,它代理了 DelayQueue 。可以认为 DelayQueue 是这样一个队列(具体可以去看下源码,不详细分析):
1. 队列里的元素按照任务的 delay 时间长短升序排序, delay 时间短的在队头, delay 时间长的在队尾。
2. 从 DelayQueue 里 FIFO 的获取一个元素的时候,不会直接返回 head 。可能会阻塞,等到 head 节点到达 delay 时间后才能被获取。可以看下 DelayQueue 的 take 方法实现:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await(); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay);//等待delay时间 } else { E x = q.poll(); assert x != null; if (q.size() != 0) available.signalAll(); // wake up other takers return x; } } } } finally { lock.unlock(); } }
4.线程池使用策略
通过以上的详解基本上能够定制出自己需要的策略了,下面简单介绍下Executors里面提供的一些常见线程池策略:
1.FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize相等的线程池。
2.SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
实际上就是个不支持keepalivetime,且corePoolSize和maximumPoolSize都等1的线程池。
3.CachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
实际上就是个支持keepalivetime时间是60秒(线程空闲存活时间),且corePoolSize为0,maximumPoolSize无穷大的线程池。
4.SingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
实际上是个corePoolSize为1的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。
5.ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
实际上是corePoolSize课设定的ScheduledExecutor。上文说过ScheduledExecutor采用无界等待队列,所以maximumPoolSize没有作用。
以上还不一定满足你的需要,完全可以根据自己需要去定制。
5.参考资料
juc executor源码
评论
发表评论
-
Xml ResourceBundle简单实现
2012-04-17 21:45 4452ResourceBundle主要是用于和本地语言环境相关的一些 ... -
【maven】多子模块maven模板工程archetype创建过程
2012-04-02 20:55 17654最近项目里需要创建一 ... -
【java基础】如何设计java应用程序的平滑停止
2012-03-05 23:44 10999java应用程序退出的触发机制有: 1.自动结束:应用没有存 ... -
【java并发】juc高级锁机制探讨
2012-02-23 00:52 8694最近在看一些j ... -
【java并发】基于JUC CAS原理,自己实现简单独占锁
2012-02-14 13:47 7849synchronized的基本原理回 ... -
[NoSQL]MongoDB初体验
2012-01-05 16:06 3964因为未来业务发展的一 ... -
【JVM】HotSpot JVM内存管理和GC策略总结
2011-12-13 22:05 15952JVM的相关知识是学习java ... -
32位机器下的一个java.lang.OutOfMemoryError错误分析
2011-10-17 11:19 2595昨天在本人windows机器( ... -
[监控]Btrace监控简单笔记
2011-09-09 10:57 5037前阵子看了公司网站的一个cache 命中率统计的btrace监 ... -
DBCP数据源配置项记录
2011-09-01 20:22 2994网站最近发生了数据库连接爆掉的问题。排查了下各个应用存在 ... -
【性能】Java BigDecimal和double性能比较
2011-08-28 20:06 14247我们知道 java 里面有个 BigDecimal ... -
【Spring】IOC容器并发条件下,可能发生死锁
2011-08-28 17:07 69211.背景 上周在生产环境应用启 ... -
JDK7 AIO 初体验
2011-08-17 19:20 2578JDK7 AIO初体验 JDK7已经releas ... -
如果要用java实现算法,一定慎用递归
2011-04-06 20:41 12947现象 : 递归是我们很经典的一种算法实现,可以很好的 ... -
java日志,需要知道的几件事(commons-logging,log4j,slf4j,logback)
2011-02-28 17:12 46353java日志,需要知道的几件事 如果对于comm ... -
JVM问题诊断常用命令:jinfo,jmap,jstack
2010-08-17 17:55 124821.jinfo 描述:输出给定 java ... -
java 浮点数为什么精度会丢失
2010-07-15 22:30 4915由于对float或double 的使用不当,可能会出现精度 ... -
一个枚举类的方法设计
2010-06-21 15:28 1685public enum ActionType { A ... -
java内部字符编码浅析
2010-06-07 21:43 6850java内部字符编码浅析 本周遇到一个 ... -
JDK反射之JDK动态proxy
2010-06-07 21:27 4114JDK动态代理 JDK 动态代理是 java 反射的一 ...
相关推荐
自己总结的,有需要的朋友可以借鉴借鉴,大家互相学习,相互进步,感觉有问题的地方,欢迎大家留言~~~
根据提供的信息,我们可以推断出该视频资源主要围绕“Java并发编程”这一主题展开。由于提供的链接和代码片段无法直接访问或解读具体内容,本篇将基于标题、描述以及标签所暗示的内容来构建相关的知识点。 ### Java...
java高级技术JUC高并发编程教程2021(1.5G) 〖课程介绍〗: java高级技术JUC高并发编程教程2021(1.5G) 〖课程目录〗: 01-JUC高并发编程-课程介绍.mp4 02-JUC高并发编程-JUC概述和进程线程概念(1).mp4 03-JUC...
Java并发编程是Java开发中的重要领域,而Java并发工具包(Java Concurrency Utility,简称JUC)则是Java标准库提供的一套强大而丰富的工具,它极大地简化了多线程环境下的编程工作。JUC主要包含在`java.util....
Java并发工具包(J.U.C)是Java编程语言中用于并发编程的一系列工具包的统称,它包含了一系列方便实现多线程编程的类和接口,使得开发者可以更加方便地编写高效、线程安全的程序。本文将深入浅出地探讨J.U.C的原理和...
JUC(Java.util.concurrent)是Java并发编程库的一个重要组成部分,专门提供了在多线程环境下用于控制并发执行的工具类和接口。本篇内容将详细介绍JUC中的一些核心组件及其使用方法,包括CountDownLatch、Executors...
尚硅谷作为一家知名的IT在线教育平台,提供了丰富的Java编程课程资源,其中“尚硅谷Java视频_JUC视频教程”是针对Java并发编程的一个系列教程。该教程旨在帮助学习者掌握Java多线程编程的核心技术和高级特性,通过一...
阿里专家级并发编程架构师级课程,完成课程的学习可以帮助同学们解决非常多的JAVA并发编程疑难杂症,极大的提高JAVA并发编程的效率。课程内容包括了JAVA手写线程池,UC线程池API详解,线程安全根因详解,锁与原子类...
- **并发工具类的高级用法**:Executor框架、Future、Callable、CyclicBarrier、Phaser等,帮助构建高效的并发程序。 - **并发设计原则**:如最小化锁的使用,正确处理中断,避免活跃性问题等。 - **并发测试**:...
在Java并发编程中,尤其是在处理多线程同步时,一个重要的概念是“虚假唤醒”(Spurious Wakeup)。这是一种较为罕见的现象,但在实际开发过程中仍然值得高度关注。虚假唤醒指的是,在没有其他线程调用`notify()`...
Java并发编程是Java开发中的重要领域,而Java.util.concurrent(JUC)工具包则是Java并发编程的核心组件。这个集合提供了一系列高效、线程安全的类和接口,用于简化多线程环境下的编程任务。本资源"JUC代码收集,...
5. **Java并发工具包(JUC)**:包括原子类、并发容器(如`ConcurrentHashMap`)、阻塞队列(如`ArrayBlockingQueue`)以及锁机制。 6. **Lock和AQS**:Lock接口提供了比`synchronized`更灵活的锁机制,而...
Java并发编程中的JUC线程池是Java程序员必须掌握的关键技术之一,它允许开发者高效地管理并发执行的任务,充分利用多核处理器的性能。线程池的出现解决了在并发环境中线程创建、销毁带来的开销,提高了系统资源的...
JUC并发编程知识点梳理思维导图
Java并发编程中的多线程协作机制 在 Java 并发编程中,多线程协作机制是非常重要的一部分。多线程协作机制是指在多线程编程中,多个线程之间如何协作、同步和通信,以达到共同完成某个任务的目的。Java 提供了多种...
Java并发编程是Java平台的重要特性,它为多线程编程提供了强大的支持。JUC,全称为Java Util Concurrency,是Java并发包的简称,包含了大量用于处理并发问题的类和接口,极大地简化了多线程环境下的编程。在这个深度...
Java-JUC-多线程进阶resources是 Java 并发编程的高级课程,涵盖了 Java 中的并发编程概念、线程安全、锁机制、集合类、线程池、函数式接口、Stream流式计算等多个方面。 什么是JUC JUC(Java Utilities for ...
Java并发编程中的`JUC`(Java Util Concurrency)库是Java平台中用于处理多线程问题的核心工具包,它提供了一系列高效、线程安全的工具类,帮助开发者编写并发应用程序。`AQS`(AbstractQueuedSynchronizer)是JUC库中的...
根据提供的文档内容,以下是关于Java多线程并发编程与JUC知识点的详细解读: 1. 线程的状态 Java线程在其生命周期中可以拥有不同的状态。线程可能处于以下几种状态: - 新建(New):线程被创建时的状态。 - 就绪...