`
thihy
  • 浏览: 69184 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java源代码阅读笔记之线程池

    博客分类:
  • java
 
阅读更多

线程池

Java中,有两个主流的线程池实现,分别为ThreadPoolExecutor和ScheduledThreadPoolExecutor。它们的继承关系如下:

ExecutorService <----- AbstractExecutorService <------ ThreadPoolExecutor <----- ScheduledThreadPoolExecutor

 

ScheduledExecutorService  <----- ScheduledThreadPoolExecutor

 

ThreadPoolExecutor支持execute(...)方法,ScheduledThreadPoolExecutor额外支持schedule(...)方法。

 

ThreadPoolExecutor

先来看线程池内部的一些变量。

 

  • BlockingQueue<Runnable> workQueue : 暂时保存不能立刻执行的任务
  • HashSet<Worker> workers = new HashSet<Worker>()
    • Worker相当于对Thead的状态,占据一个线程,连续处理多个任务
  • int poolSize: 当前存在的线程的数目
  • int corePoolSize : 核心的线程数目
    • 线程池会尽量保持corePoolSize个线程(可能少于)。当poolSize大于corePoolSize时,除非逼不得已,否则不会创建线程,而是将线程存入等待队列。
  • int maximumPoolSize: 最多的线程数目,也即不管咋样,不能超过这么多线程。
  • int runState: 当前的状态(状态变化见图),尤其注意状态都是不可逆的。
    • RUNNING: 可以接受新的任务,并执行现有的任务。
    • SHUTDOWN: 不可以接受新的任务,但是可以执行现有的任务。
    • STOP: 不可以接受新的任务,不再执行队列中的任务,中断了所有正在执行的任务。
    • TERMINATED: 所有线程都终结了。
  • ReentrantLock mainLock = new ReentrantLock()
    • 用来保护:poolSize, corePoolSize, maximumPoolSize, runState, and workers

 

添加新任务

当添加新任务时,线程池会尝试立刻执行此任务,或者将任务放入队列中等待以后执行,否则会调用RejectedExecutionHandler.rejectedExecution拒绝任务。

 

 

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize    // 超过核心线程数目,则肯定不能立刻执行
            || !addIfUnderCorePoolSize(command) // [线程安全]尝试立刻执行任务
           ) {
            if (runState == RUNNING && workQueue.offer(command)) { // 如果当前状态时RUNNING(可以接受新任务),则尝试把命令放入队列
                // 再次验证此任务,有可能在offer的同时,线程池中的线程全部执行完毕并退出了,从而导致此任务不被处理,所以要再次验证一下。
                if (runState != RUNNING  // 线程池被关闭了,则应该拒绝此任务(其实也可以不拒绝?)
                   || poolSize == 0) // 线程池空了,则应该启用一个线程,以执行此任务(这是主要的)
                    ensureQueuedTaskHandled(command); // [线程安全]拒绝任务或者保证有一个线程执行此任务
            }
            else if (!addIfUnderMaximumPoolSize(command))// [线程安全]尝试创建新的线程来允许此任务,此操作会导致poolSize大于corePoolSize。
                reject(command); 
        }
    }

 

从中可以看到

 

  1. 如果workQueue是无限的,则poolSize是不会大于corePoolSize的。
  2. 每一个路径下,最后总是一个线程安全的。这些线程安全的方法保证了每个新任务要么被立刻执行,要么被放入队列,要么被拒绝。

任务的执行

 

ThreadPoolExecutor.Worker对Thread进行了包装,是任务执行的实际类。

        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) { // 获取任务
                    runTask(task); // 执行任务
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

Future

每个任务在提交后,可以返回Future对象,用来代表并行计算的结果。可以通过Future对象来取消任务,等待任务执行完毕,获取任务的返回值等。

 

在AbstractExecutorService中

 

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask); // 即上面提到的execute方法
        return ftask;
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

 FutureTask的代码是:

 

public class FutureTask<V> implements RunnableFuture<V> {
  // 同步控制
  private final Sync sync;
  // ....
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

 

 

 从上面的代码可以了解到,Future的实现是通过对传入的Runnable任务封装成RunnableFuture任务,从而可以有额外的接口来控制或监控任务(比如取消,等待任务完成等)。

FutureTask中会把实际的逻辑委托给Sync子类完成。Sync继承自AbstractQueuedSynchronizer,是具备线程安全保证的。

private final class Sync extends AbstractQueuedSynchronizer {
        // private volatile int state;// 在父类中被声明
  
        // 表示正在运行(0表示等待运行,未被作为常量声明)
        private static final int RUNNING   = 1;
        // 表示已经运行了
        private static final int RAN       = 2;
        // 表示被取消运行
        private static final int CANCELLED = 4;
        // 用户提交的任务
        private final Callable<V> callable;
        // 任务的返回值
        private V result;
        // 任务执行期间抛出的异常
        private Throwable exception;
  
        // runner表示运行任务的线程。
        // 当set或者cancel后,runner会被赋值为null,标志着可以使用result对象或exception异常(也即任务正式结束)。
        // 之所以使用runner,而不是state,是因为state是通过compareAndSet方式设置的,
        // 这种方式要求:必须在state成功设置成RAN后,才能给result赋值,从而不能通过state来判断result对象是否可用。
        private volatile Thread runner;
  
        Sync(Callable<V> callable) {
            this.callable = callable;
        }
 
        private boolean ranOrCancelled(int state) {
            // 使用位操作来快速判断,等价于state == RAN || state == CANCELLED。
            // 之所以不采用此形式,是因为有两个等于判断,不是线程安全的。
            return (state & (RAN | CANCELLED)) != 0;
        }
        boolean innerIsCancelled() {
            return getState() == CANCELLED;
        }
        
        boolean innerIsDone() {
            // runner为null才标识着任务的正式结束
            return ranOrCancelled(getState()) && runner == null;
        }
          
        // 尝试获取锁:任务完成返回1,否则返回-1。
        @Override
        protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
 
        // 释放锁,设置runner为null,标记任务完成
        protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;
        }
 
 
        V innerGet() throws InterruptedException, ExecutionException {
            // 获取锁,内部会调用tryAcquireShared方法,当其返回值>=0时,才表示成功获取锁
            acquireSharedInterruptibly(0);
            // 此时,任务应该已经成功完成,state应该不会再变化
            // 如果state为CANCELLED,表示任务被取消,抛出CancellationException异常
            if (getState() == CANCELLED)
                throw new CancellationException();
            // 否则state为RAN(见ranOrCancelled方法),表示任务成功执行完毕,
            // 先判断是否有执行异常,否则直接返回任务的返回值
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
        
        // 与innerGet()类似
        V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
            if (!tryAcquireSharedNanos(0, nanosTimeout))
                throw new TimeoutException();
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
 
        // 设置任务的返回值
        void innerSet(V v) {
            // compareAndSet方式对state赋值
        for (;;) {
        int s = getState();
        if (s == RAN)
            return;
                if (s == CANCELLED) {
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
                // 此时s==RUNNING或者0
        if (compareAndSetState(s, RAN)) {
                    // state成功设置为RAN后,才能对result复制
                    result = v;
                    // releaseShared会调用tryReleaseShared,从而设置runner为null
                    releaseShared(0);
                    done();
            return;
                }
            }
        }
        // 与innerSet(V v)类似
        void innerSetException(Throwable t) {
        for (;;) {
        int s = getState();
        if (s == RAN)
            return;
                if (s == CANCELLED) {
            // aggressively release to set runner to null,
            // in case we are racing with a cancel request
            // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
        if (compareAndSetState(s, RAN)) {
                    exception = t;
                    result = null;
                    releaseShared(0);
                    done();
            return;
                }
        }
        }
 
        // 取消任务执行,mayInterruptIfRunning表示中断runner
        boolean innerCancel(boolean mayInterruptIfRunning) {
        for (;;) {
        int s = getState();
        if (ranOrCancelled(s))
            return false;
                // 此时state只能为0或RUNNING
        if (compareAndSetState(s, CANCELLED))
            break;
        }
            // 如有必要,中断线程
            if (mayInterruptIfRunning) {
                Thread r = runner;
                if (r != null)
                    r.interrupt();
            }
            releaseShared(0);
            done();
            return true;
        }
        // 开始执行任务
        void innerRun() {
            // 0表示初始状态,先尝试把state设置为RUNNING
            if (!compareAndSetState(0, RUNNING))
                return;
            try {
                runner = Thread.currentThread();
                // 会再次检查state的状态,检查的目的是防止无意义地调用callable.call()。
                if (getState() == RUNNING)
                    innerSet(callable.call());
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
        // 执行任务,执行完毕后,标记任务为初始状态。只有当任务被调度为重复运行时,才会调用此方法。
        // 由于是重复运行,所以不设置result值,Future的get方法也无定义。
        boolean innerRunAndReset() {
            if (!compareAndSetState(0, RUNNING))
                return false;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING)
                    callable.call(); // don't set result
                runner = null;
                return compareAndSetState(RUNNING, 0);
            } catch (Throwable ex) {
                innerSetException(ex);
                return false;
            }
        }
}

 

  • 大小: 34.1 KB
分享到:
评论

相关推荐

    java学习笔记的源代码

    这份"java学习笔记的源代码"提供了与学习资料配套的示例代码,旨在帮助初学者和有经验的开发者深入理解Java语言的核心概念和实践技巧。 1. **面向对象编程基础** Java是完全基于面向对象编程(OOP)的,它支持类、...

    动力节点JAVA基础+进阶源代码和笔记.rar

    动力节点的JAVA基础与进阶课程源代码和笔记集合提供了全面深入的学习材料,旨在帮助初学者及有经验的开发者巩固和提升Java编程技能。这个压缩包包含了一系列的代码示例和学习笔记,覆盖了从基础到高级的Java知识点。...

    Java demo 算法笔记

    通过阅读和理解这些框架的源码,开发者可以深入理解框架的工作原理,提高代码设计和优化的能力。 线程池是Java并发编程的重要组成部分,它通过复用已创建的线程来减少线程创建和销毁的开销。Java的ExecutorService...

    java笔记,上课笔记

    Java中可以通过ExecutorService和ThreadPoolExecutor来创建和管理线程池。 17. **UDP编程**: - DatagramSocket和DatagramPacket用于UDP(无连接)网络编程,适用于一次性传输少量数据的场景。 以上内容仅涵盖了...

    黑马程序员-Java语言进阶-源码、教程笔记.zip

    day07_等待与唤醒案例、线程池、Lambda表达式 day08_File类、递归 day09_字节流、字符流 day10_缓冲流、转换流、序列化流、Files day11_网络编程 day12_函数式接口 day13_Stream流、方法引用 Java基础小节练习题答案

    [北京圣思园Java培训教学视频]Java.SE.LessionCode(上课所有源代码)

    通过这些源代码和笔记,学习者可以深入理解Java编程的各个方面,并通过实践来提升编程技能。同时,对于初学者来说,这是一个很好的自我学习和提升的途径,对于有经验的开发者,也可以作为复习和扩展知识的参考资料。

    Java-J2SE学习笔记

    Java-J2SE(Java Standard Edition)是Java平台的核心部分,主要面向桌面应用和服务器端开发。这份学习笔记将深入探讨Java编程语言的基础、核心...通过阅读和研究这份笔记,开发者将能够构建高效、可靠的Java应用程序。

    韩顺平java从入门到精通上课所有笔记源码

    【标题】:“韩顺平java从入门到精通上课所有笔记源码”是一个全面学习Java编程的资源集合,其中包含了韩顺平老师在教授Java技术时的详细笔记和配套源代码。这个标题暗示了该压缩包内容是针对初学者和进阶者设计的,...

    JAVA网络通信系统的研究与开发(源代码+论文+开题报告).zip

    通过阅读源代码,你可以学习如何设计和管理线程池,优化服务器性能。 五、异常处理与安全 网络通信中,错误处理和安全性是不可忽视的。Java的异常处理机制使得程序在面对错误时能优雅地处理。同时,Java也提供了SSL...

    JAVA SMART系统-系统框架设计与开发(源代码+文).zip

    这个压缩包包含了该系统的源代码和相关的文档资料,对于学习和理解Java系统框架设计与开发具有很高的参考价值。 一、Java框架设计基础 Java框架设计是构建应用程序的基础,它提供了一套预定义的规则和结构,帮助...

    JAVA网络通信系统的研究与开发(文+源代码+开题报告).zip

    通过阅读这份文档,我们可以更深入地理解源代码的实现细节,并学习如何运行和测试系统。 总的来说,"JAVA网络通信系统的研究与开发"项目涵盖了Java网络编程的多个重要方面,包括网络通信的基本原理、Java API的使用...

    java多线程学习笔记

    在本文中,我们将深入探讨Java多线程的相关知识点,并结合提供的源代码进行学习。 1. **线程的创建** - **实现Runnable接口**:创建一个类实现Runnable接口,然后将其实例传递给Thread类的构造函数,如`Thread t =...

    java笔记

    【Java笔记】是一份详尽记录Core Java核心概念...这些笔记是学习和复习Java基础知识的宝贵资源,通过深入阅读和实践,可以提升对Java语言的理解和应用能力。对于初学者和有经验的开发者来说,这些内容都是不可或缺的。

    java学习笔记

    5. **变量注解(Annotations)**:这是一种元数据,可以附加到源代码的元素上,用于提供编译器或运行时的信息,如编译时检查、运行时代码生成等。 6. **类型推断**:编译器可以根据上下文推断局部变量的类型,尤其...

    JAVA图书馆书库管理系统设计(文+源代码).zip

    源代码的提供让我们有机会深入研究其设计模式、数据结构和算法的应用。 1. **Java编程基础**:本项目基于Java编程语言,涉及到面向对象编程的基本概念,包括类、对象、继承、多态和封装。通过分析源码,我们可以...

    张龙 java se课程笔记

    【Java SE课程笔记详解】 Java SE(Standard Edition)是Java平台的核心版本,它为开发桌面应用、服务器端应用以及分布式网络应用提供了基础框架。张龙的Java SE课程笔记是针对初学者和进阶者的一份宝贵资料,涵盖...

    JAVA公共资源模块的设计与开发(源代码+l文).zip

    压缩包中的"说明.pdf"可能包含详细的步骤指导和最佳实践,而"JAVA公共资源模块的设计与开发(源代码+l文)"则提供了实际的代码示例和相关的技术文档。通过学习这些资料,开发者不仅可以了解公共资源模块的基本概念,还...

    java详细笔记

    - 分析开源项目或Java标准库的源代码,有助于深入理解Java的设计思想和实现方式,提升编程技能。 这份笔记详尽地介绍了Java编程语言的各个方面,无论是初学者还是有经验的开发者,都能从中获得宝贵的启示和知识。...

    java 讲师笔记

    4.14 增强型for循环:Java提供了增强型for循环来简化遍历集合和数组的代码。 4.15 List高级-数据结构:Queue队列和Deque栈是Java集合框架中的接口,提供了先进先出和后进先出的数据结构。 4.16 Set集合的实现类...

Global site tag (gtag.js) - Google Analytics