线程池
Java中,有两个主流的线程池实现,分别为ThreadPoolExecutor和ScheduledThreadPoolExecutor。它们的继承关系如下:
ExecutorService <----- AbstractExecutorService <------ ThreadPoolExecutor <----- ScheduledThreadPoolExecutor
ScheduledExecutorService <----- ScheduledThreadPoolExecutor
ThreadPoolExecutor支持execute(...)方法,ScheduledThreadPoolExecutor额外支持schedule(...)方法。
ThreadPoolExecutor
先来看线程池内部的一些变量。
- BlockingQueue<Runnable> workQueue : 暂时保存不能立刻执行的任务
- HashSet<Worker> workers = new HashSet<Worker>()
- Worker相当于对Thead的状态,占据一个线程,连续处理多个任务
- int poolSize: 当前存在的线程的数目
- int corePoolSize : 核心的线程数目
- 线程池会尽量保持corePoolSize个线程(可能少于)。当poolSize大于corePoolSize时,除非逼不得已,否则不会创建线程,而是将线程存入等待队列。
- int maximumPoolSize: 最多的线程数目,也即不管咋样,不能超过这么多线程。
- int runState: 当前的状态(状态变化见图),尤其注意状态都是不可逆的。
- RUNNING: 可以接受新的任务,并执行现有的任务。
- SHUTDOWN: 不可以接受新的任务,但是可以执行现有的任务。
- STOP: 不可以接受新的任务,不再执行队列中的任务,中断了所有正在执行的任务。
- TERMINATED: 所有线程都终结了。
- ReentrantLock mainLock = new ReentrantLock()
- 用来保护:poolSize, corePoolSize, maximumPoolSize, runState, and workers
添加新任务
当添加新任务时,线程池会尝试立刻执行此任务,或者将任务放入队列中等待以后执行,否则会调用RejectedExecutionHandler.rejectedExecution拒绝任务。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize // 超过核心线程数目,则肯定不能立刻执行 || !addIfUnderCorePoolSize(command) // [线程安全]尝试立刻执行任务 ) { if (runState == RUNNING && workQueue.offer(command)) { // 如果当前状态时RUNNING(可以接受新任务),则尝试把命令放入队列 // 再次验证此任务,有可能在offer的同时,线程池中的线程全部执行完毕并退出了,从而导致此任务不被处理,所以要再次验证一下。 if (runState != RUNNING // 线程池被关闭了,则应该拒绝此任务(其实也可以不拒绝?) || poolSize == 0) // 线程池空了,则应该启用一个线程,以执行此任务(这是主要的) ensureQueuedTaskHandled(command); // [线程安全]拒绝任务或者保证有一个线程执行此任务 } else if (!addIfUnderMaximumPoolSize(command))// [线程安全]尝试创建新的线程来允许此任务,此操作会导致poolSize大于corePoolSize。 reject(command); } }
从中可以看到
- 如果workQueue是无限的,则poolSize是不会大于corePoolSize的。
- 每一个路径下,最后总是一个线程安全的。这些线程安全的方法保证了每个新任务要么被立刻执行,要么被放入队列,要么被拒绝。
任务的执行
ThreadPoolExecutor.Worker对Thread进行了包装,是任务执行的实际类。
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { // 获取任务 runTask(task); // 执行任务 task = null; } } finally { workerDone(this); } }
Future
每个任务在提交后,可以返回Future对象,用来代表并行计算的结果。可以通过Future对象来取消任务,等待任务执行完毕,获取任务的返回值等。
在AbstractExecutorService中
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); // 即上面提到的execute方法 return ftask; } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
FutureTask的代码是:
public class FutureTask<V> implements RunnableFuture<V> { // 同步控制 private final Sync sync; // .... } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
从上面的代码可以了解到,Future
的实现是通过对传入的Runnable
任务封装成RunnableFuture
任务,从而可以有额外的接口来控制或监控任务(比如取消,等待任务完成等)。
FutureTask
中会把实际的逻辑委托给Sync子类完成。Sync
继承自AbstractQueuedSynchronizer
,是具备线程安全保证的。
private final class Sync extends AbstractQueuedSynchronizer { // private volatile int state;// 在父类中被声明 // 表示正在运行(0表示等待运行,未被作为常量声明) private static final int RUNNING = 1; // 表示已经运行了 private static final int RAN = 2; // 表示被取消运行 private static final int CANCELLED = 4; // 用户提交的任务 private final Callable<V> callable; // 任务的返回值 private V result; // 任务执行期间抛出的异常 private Throwable exception; // runner表示运行任务的线程。 // 当set或者cancel后,runner会被赋值为null,标志着可以使用result对象或exception异常(也即任务正式结束)。 // 之所以使用runner,而不是state,是因为state是通过compareAndSet方式设置的, // 这种方式要求:必须在state成功设置成RAN后,才能给result赋值,从而不能通过state来判断result对象是否可用。 private volatile Thread runner; Sync(Callable<V> callable) { this.callable = callable; } private boolean ranOrCancelled(int state) { // 使用位操作来快速判断,等价于state == RAN || state == CANCELLED。 // 之所以不采用此形式,是因为有两个等于判断,不是线程安全的。 return (state & (RAN | CANCELLED)) != 0; } boolean innerIsCancelled() { return getState() == CANCELLED; } boolean innerIsDone() { // runner为null才标识着任务的正式结束 return ranOrCancelled(getState()) && runner == null; } // 尝试获取锁:任务完成返回1,否则返回-1。 @Override protected int tryAcquireShared(int ignore) { return innerIsDone()? 1 : -1; } // 释放锁,设置runner为null,标记任务完成 protected boolean tryReleaseShared(int ignore) { runner = null; return true; } V innerGet() throws InterruptedException, ExecutionException { // 获取锁,内部会调用tryAcquireShared方法,当其返回值>=0时,才表示成功获取锁 acquireSharedInterruptibly(0); // 此时,任务应该已经成功完成,state应该不会再变化 // 如果state为CANCELLED,表示任务被取消,抛出CancellationException异常 if (getState() == CANCELLED) throw new CancellationException(); // 否则state为RAN(见ranOrCancelled方法),表示任务成功执行完毕, // 先判断是否有执行异常,否则直接返回任务的返回值 if (exception != null) throw new ExecutionException(exception); return result; } // 与innerGet()类似 V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException { if (!tryAcquireSharedNanos(0, nanosTimeout)) throw new TimeoutException(); if (getState() == CANCELLED) throw new CancellationException(); if (exception != null) throw new ExecutionException(exception); return result; } // 设置任务的返回值 void innerSet(V v) { // compareAndSet方式对state赋值 for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } // 此时s==RUNNING或者0 if (compareAndSetState(s, RAN)) { // state成功设置为RAN后,才能对result复制 result = v; // releaseShared会调用tryReleaseShared,从而设置runner为null releaseShared(0); done(); return; } } } // 与innerSet(V v)类似 void innerSetException(Throwable t) { for (;;) { int s = getState(); if (s == RAN) return; if (s == CANCELLED) { // aggressively release to set runner to null, // in case we are racing with a cancel request // that will try to interrupt runner releaseShared(0); return; } if (compareAndSetState(s, RAN)) { exception = t; result = null; releaseShared(0); done(); return; } } } // 取消任务执行,mayInterruptIfRunning表示中断runner boolean innerCancel(boolean mayInterruptIfRunning) { for (;;) { int s = getState(); if (ranOrCancelled(s)) return false; // 此时state只能为0或RUNNING if (compareAndSetState(s, CANCELLED)) break; } // 如有必要,中断线程 if (mayInterruptIfRunning) { Thread r = runner; if (r != null) r.interrupt(); } releaseShared(0); done(); return true; } // 开始执行任务 void innerRun() { // 0表示初始状态,先尝试把state设置为RUNNING if (!compareAndSetState(0, RUNNING)) return; try { runner = Thread.currentThread(); // 会再次检查state的状态,检查的目的是防止无意义地调用callable.call()。 if (getState() == RUNNING) innerSet(callable.call()); else releaseShared(0); // cancel } catch (Throwable ex) { innerSetException(ex); } } // 执行任务,执行完毕后,标记任务为初始状态。只有当任务被调度为重复运行时,才会调用此方法。 // 由于是重复运行,所以不设置result值,Future的get方法也无定义。 boolean innerRunAndReset() { if (!compareAndSetState(0, RUNNING)) return false; try { runner = Thread.currentThread(); if (getState() == RUNNING) callable.call(); // don't set result runner = null; return compareAndSetState(RUNNING, 0); } catch (Throwable ex) { innerSetException(ex); return false; } } }
相关推荐
这份"java学习笔记的源代码"提供了与学习资料配套的示例代码,旨在帮助初学者和有经验的开发者深入理解Java语言的核心概念和实践技巧。 1. **面向对象编程基础** Java是完全基于面向对象编程(OOP)的,它支持类、...
动力节点的JAVA基础与进阶课程源代码和笔记集合提供了全面深入的学习材料,旨在帮助初学者及有经验的开发者巩固和提升Java编程技能。这个压缩包包含了一系列的代码示例和学习笔记,覆盖了从基础到高级的Java知识点。...
通过阅读和理解这些框架的源码,开发者可以深入理解框架的工作原理,提高代码设计和优化的能力。 线程池是Java并发编程的重要组成部分,它通过复用已创建的线程来减少线程创建和销毁的开销。Java的ExecutorService...
Java中可以通过ExecutorService和ThreadPoolExecutor来创建和管理线程池。 17. **UDP编程**: - DatagramSocket和DatagramPacket用于UDP(无连接)网络编程,适用于一次性传输少量数据的场景。 以上内容仅涵盖了...
day07_等待与唤醒案例、线程池、Lambda表达式 day08_File类、递归 day09_字节流、字符流 day10_缓冲流、转换流、序列化流、Files day11_网络编程 day12_函数式接口 day13_Stream流、方法引用 Java基础小节练习题答案
通过这些源代码和笔记,学习者可以深入理解Java编程的各个方面,并通过实践来提升编程技能。同时,对于初学者来说,这是一个很好的自我学习和提升的途径,对于有经验的开发者,也可以作为复习和扩展知识的参考资料。
Java-J2SE(Java Standard Edition)是Java平台的核心部分,主要面向桌面应用和服务器端开发。这份学习笔记将深入探讨Java编程语言的基础、核心...通过阅读和研究这份笔记,开发者将能够构建高效、可靠的Java应用程序。
【标题】:“韩顺平java从入门到精通上课所有笔记源码”是一个全面学习Java编程的资源集合,其中包含了韩顺平老师在教授Java技术时的详细笔记和配套源代码。这个标题暗示了该压缩包内容是针对初学者和进阶者设计的,...
通过阅读源代码,你可以学习如何设计和管理线程池,优化服务器性能。 五、异常处理与安全 网络通信中,错误处理和安全性是不可忽视的。Java的异常处理机制使得程序在面对错误时能优雅地处理。同时,Java也提供了SSL...
这个压缩包包含了该系统的源代码和相关的文档资料,对于学习和理解Java系统框架设计与开发具有很高的参考价值。 一、Java框架设计基础 Java框架设计是构建应用程序的基础,它提供了一套预定义的规则和结构,帮助...
通过阅读这份文档,我们可以更深入地理解源代码的实现细节,并学习如何运行和测试系统。 总的来说,"JAVA网络通信系统的研究与开发"项目涵盖了Java网络编程的多个重要方面,包括网络通信的基本原理、Java API的使用...
在本文中,我们将深入探讨Java多线程的相关知识点,并结合提供的源代码进行学习。 1. **线程的创建** - **实现Runnable接口**:创建一个类实现Runnable接口,然后将其实例传递给Thread类的构造函数,如`Thread t =...
【Java笔记】是一份详尽记录Core Java核心概念...这些笔记是学习和复习Java基础知识的宝贵资源,通过深入阅读和实践,可以提升对Java语言的理解和应用能力。对于初学者和有经验的开发者来说,这些内容都是不可或缺的。
5. **变量注解(Annotations)**:这是一种元数据,可以附加到源代码的元素上,用于提供编译器或运行时的信息,如编译时检查、运行时代码生成等。 6. **类型推断**:编译器可以根据上下文推断局部变量的类型,尤其...
源代码的提供让我们有机会深入研究其设计模式、数据结构和算法的应用。 1. **Java编程基础**:本项目基于Java编程语言,涉及到面向对象编程的基本概念,包括类、对象、继承、多态和封装。通过分析源码,我们可以...
【Java SE课程笔记详解】 Java SE(Standard Edition)是Java平台的核心版本,它为开发桌面应用、服务器端应用以及分布式网络应用提供了基础框架。张龙的Java SE课程笔记是针对初学者和进阶者的一份宝贵资料,涵盖...
压缩包中的"说明.pdf"可能包含详细的步骤指导和最佳实践,而"JAVA公共资源模块的设计与开发(源代码+l文)"则提供了实际的代码示例和相关的技术文档。通过学习这些资料,开发者不仅可以了解公共资源模块的基本概念,还...
- 分析开源项目或Java标准库的源代码,有助于深入理解Java的设计思想和实现方式,提升编程技能。 这份笔记详尽地介绍了Java编程语言的各个方面,无论是初学者还是有经验的开发者,都能从中获得宝贵的启示和知识。...
4.14 增强型for循环:Java提供了增强型for循环来简化遍历集合和数组的代码。 4.15 List高级-数据结构:Queue队列和Deque栈是Java集合框架中的接口,提供了先进先出和后进先出的数据结构。 4.16 Set集合的实现类...