`

Java多线程高并发高级篇(四)--线程池工作流程篇

 
阅读更多

前三篇说了线程池一些核心东西,这一篇,我想就线程池的使用从全流程的角度来说下,线程池是如何工作的。


 
 我们以一个调用示例,来逐步深入。

一、固定大小线程池调用示例

1、我们先创建一个corePoolSize大小为5的线程池,然后提交10个任务。

 

public class ThreadPoolDemo {
	
	public static class MyTask implements Runnable{

		@Override
		public void run() {
			// TODO Auto-generated method stub
			System.out.println(System.currentTimeMillis()+"--Thread Id:"+Thread.currentThread().getId());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}
	
	public static void main(String[] args) {
		MyTask task = new MyTask();
		//定义corePoolSize和maxiumPoolSize大小为5的线程池
		ExecutorService es = Executors.newFixedThreadPool(5);
		for (int i = 0; i < 10; i++) {
			es.execute(task);
		}
	}
}

 首先,我们要知道的是,在示例中,我们初始化的线程池内容是什么样子的。我们使用的是大小固定的线程池Executors.newFixedThreadPool(5)。

 

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

 

初始化完成,各属性值见下图,清晰明白,其他不多说。

注意两个属性(要不然看源码都不知道找谁):threadFactory为Executors中的静态内部类DefaultThreadFactory;handler(拒绝策略处理器)使用的是默认的ThreadPoolExecutor中的静态内部类AbortPolicy。

 


 

 

2、我们从es.execute(task)开始分析整个的执行流程。execute方法是ThreadPoolExecutor中的方法,源码内容如下:

 

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

 

①初始时,poolSize值为0,条件poolSize >= corePoolSize不成立,走的都是方法addIfUnderCorePoolSize(command),其中command是提交的需要执行的任务。

该方法源码如下:

 

/**
     * Creates and starts a new thread running firstTask as its first
     * task, only if fewer than corePoolSize threads are running
     * and the pool is not shut down.
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return true if successful
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

 

核心代码中关键方法为:addThread(firstTask),源码如下:

 

/**
     * Creates and returns a new thread running firstTask as its first
     * task. Call only while holding mainLock.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none)
     * @return the new thread, or null if threadFactory fails to create thread
     */
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;
            workers.add(w);
            int nt = ++poolSize;
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

 它的作用是在线程池中的线程数量(poolsize)还没有达到指定的corePoolSize之前,将任务提交给线程池中新创建的线程处理(Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w);),这里做了一层封装,把任务包装成Worker,然后使用Work对象在threadFactory创建新的线程,也就是使用这个线程去处理work。由于这个过程都能创建新线程处理任务,所以if (t == null)不成立,返回的是true,所以条件 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))不成立,下面的逻辑不会执行(这也是设计巧妙的地方)。其他逻辑很简单(每创建一个新线程,poolsize都会自增1,统计目前线程池的大小)。

 

②当线程池大小达到corePoolSize时,也就是条件poolSize >= corePoolSize成立,不会再执行addIfUnderCorePoolSize方法,因为只要线程池不关闭,接收新任务和处理等待队列的任务过程就不会停,所以runState在这个过程始终是0,也就是RUNNING状态(RUNNING-0:  Accept new tasks and process queued tasks)。

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }

左半条件成立后,那么接着就要进行 workQueue.offer(command)了。等待任务队列(workQueue)这回就要派上用场了。因为固定大小的线程池使用的等待任务队列是LinkedBlockingQueue,所以我们看下它的offer方法,核心代码就一行-----insert(e),只要插入成功,就返回true(c变为0,c>=0肯定是成立的)。其实跟我们前面介绍队列术时候的put方法一样,就是放任务,前面帖子已经说过,这里不再赘述,不清楚的可以回过去看。

public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                insert(e);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

 

我们使用固定大小线程池,涉及到核心方法只有addIfUnderCorePoolSize,因为固定线程池使用的等待任务队列是LinkedBlockingQueue,接近于无限队列,因此在没有达到Integer.MAX_VALUE个任务时,是不会执行到addIfUnderMaximumPoolSize方法的。为了看下addIfUnderMaximumPoolSize做了什么,我们还需要举一例,使用newCachedThreadPool。

 

二、newCachedThreadPool使用举例

为什么要用它作为举例,因为它有个特殊之处,workQueue是使用SynchronousQueue,无大小,也就是人们说的直接提交队列(不明白的回头看我写的队列术解密),因此很容易走到方法addIfUnderMaximumPoolSize(if under maximun pool size,add command)。

我们看下它的源码:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

 

这段源码,核心代码也就一段:

 if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);

当等待任务队列已满时,那么就需要判断线程池中处理任务的线程个数是否达到了规定的最大线程池大小maxinumPoolSize,如果没有达到,则将任务使用work对象包裹任务,然后利用work对象创建新线程至线程池中,处理该任务;如果线程池大小达到了maxinumPoolSize,那么就需要执行reject(command)--执行拒绝策略了。

 

附示例及示例程序执行过程图:

public static void main(String[] args) {
		MyTask task = new MyTask();
		//定义corePoolSize和maxiumPoolSize大小为5的线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for (int i = 0; i < 10; i++) {
			es.execute(task);
		}
	}

 

执行过程程序图:


 


 

 

  • 大小: 46.6 KB
  • 大小: 52.3 KB
  • 大小: 37.1 KB
  • 大小: 28.4 KB
分享到:
评论

相关推荐

    java多线程、并发及线程池介绍收藏的几篇文档

    Java多线程、并发以及线程池是Java编程中至关重要的概念,特别是在处理高并发、高性能的系统设计时。以下是对这些主题的详细说明: 1. **Java 程序中的多线程** - 多线程允许一个程序同时执行多个任务,提高程序...

    Java多线程与高并发入门到精通-视频教程网盘链接提取码下载.txt

    综上所述,《Java多线程与高并发入门到精通》是一门全面涵盖Java多线程编程基础及高级技术的课程。无论您是初学者还是有一定基础的开发者,都能够从中获得宝贵的知识和经验。通过学习这门课程,您将能够掌握多线程的...

    java 多线程高并发相关资料收集

    本文将围绕“Java多线程高并发相关资料收集”这一主题,详细探讨这两个领域的核心知识点。 首先,多线程是指在单个程序中同时执行多个线程。Java提供了一个强大的多线程支持,允许开发者创建、管理和控制多个执行...

    Java多线程实战精讲-带你一次搞明白Java多线程高并发

    Java多线程实战精讲是Java开发者必备的技能之一,特别是在处理高并发场景时,它的重要性不言而喻。本文将深入探讨Java多线程的相关知识点,帮助你全面理解并掌握这一核心概念。 1. **线程基础** - **线程定义**:...

    Java 模拟线程并发

    Java 模拟线程并发是编程领域中的一个重要概念,尤其在多核处理器和高并发应用中,理解并熟练掌握线程并发技术对于提升程序性能至关重要。在Java中,线程并发可以通过多种方式实现,包括继承Thread类、实现Runnable...

    CVI学习文件-多线程 线程池(修改增加学习版)

    在IT行业中,多线程和线程池是高级编程中不可或缺的部分,特别是在视觉计算领域,如CVI(Cooperative Visual Inspection)系统。多线程允许程序同时执行多个任务,提高系统的并发性能,而线程池则是一种管理和优化...

    Java多线程编程实战指南-核心篇

    《Java多线程编程实战指南-...通过《Java多线程编程实战指南-核心篇》,你可以深入了解Java并发编程的各个方面,提升在高并发场景下的编程能力。书中的案例和练习将帮助你更好地掌握理论知识,并将其应用于实际项目中。

    java并发库高级应用源码--张孝祥

    在《java并发库高级应用源码--张孝祥》中,我们将会深入探讨Java中的线程管理和并发控制策略,这对于我们理解和优化多线程程序至关重要。 首先,Java中的`Thread`类是实现并发的基础,它代表了一个独立的执行线程。...

    JAVA线程高级-线程按序交替执行

    在Java编程中,多线程是并发编程的重要组成部分,它允许程序同时执行多个任务,从而提高了系统的效率和响应性。然而,在某些场景下,我们可能需要控制线程的执行顺序,确保它们按照特定的顺序交替运行,这在并发编程...

    Socket网络编程学习笔记之---使用线程池提高性能

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池可以有效地控制运行的线程数量,当线程数量过多时,可以适当控制新的线程的创建,避免过多线程导致系统资源的...

    java多线程和并发.pdf

    Java多线程与并发编程是Java语言中用于处理多任务执行的关键技术,它能够帮助开发者设计出能够有效应对高并发请求的应用程序。在现代的线上(Online)和离线(Offline)应用中,合理利用多线程技术可以大幅提高系统...

    java多线程并发

    ### Java多线程并发知识点详解 #### 一、Java多线程并发简介 在现代软件开发中,特别是在Java这样的主流编程语言中,多线程并发技术是提高程序执行效率、优化资源利用的关键手段之一。本篇文章将深入探讨Java中的...

    多线程精品资源--高并发-高可靠-高性能three-high-import导入系统-高并发多线程进阶.zip

    "多线程精品资源--高并发-高可靠-高性能three-high-import导入系统-高并发多线程进阶.zip" 这个压缩包文件名暗示了其内容可能包含了一系列关于如何在复杂系统中有效地利用多线程来达到高并发、高可靠性和高性能的...

    Java多线程与并发库高级应用

    ### Java多线程与并发库高级应用 #### 一、Java多线程基础 在深入探讨Java多线程与并发库的高级应用之前,我们首先需要回顾一下Java多线程的基础概念和技术要点。 ##### 1.1 线程的概念 在计算机科学中,线程是...

    人工智能-项目实践-多线程-Java多线程高并发实例.zip

    通过分析和运行这些代码,你将能够更深入地理解Java多线程在高并发场景下的实际运用,从而在你的人工智能项目中实现更高效、更稳定的数据处理。 总之,这个项目实例旨在帮助开发者掌握Java多线程技术,提升处理高...

    curl线程池多线程调用

    在实际操作中,我们可以通过编写脚本或者使用编程语言(如Python、Java等)来调用`curl`,并利用这些语言提供的多线程库来构建线程池。例如,Python中的`concurrent.futures.ThreadPoolExecutor`可以用来创建线程池...

    java多线程视频教程(共七套)

    06、【深度进阶】【高级原理实战】java高级多线程高并发编程实战(三个阶段) 07、【高并发项目实战】多线程并发分布式并发项目实战 共74个G 失效的话及时回复,或发送邮件至yn_gizarm@qq.com说明

    张孝祥Java多线程与并发库高级应用视频教程练习代码

    本教程的焦点在于“张孝祥Java多线程与并发库高级应用视频教程”的实践代码,旨在帮助开发者深入理解并熟练掌握这些关键概念。 首先,我们要明确多线程的概念。在单处理器系统中,多线程允许程序同时执行多个任务,...

    黑马程序员_张孝祥_Java多线程与并发库 视频+代码+资料

    根据给定文件的信息,我们...通过以上知识点的学习,开发者可以深入理解Java多线程编程的核心概念和技术细节,为实际开发工作打下坚实的基础。同时,掌握这些高级并发技术对于构建高性能、高可靠性的应用系统至关重要。

    Java 线程池的原理与实现

    Java线程池是一种高级的多线程处理框架,它是Java并发编程中非常重要的一个组件。线程池的原理和实现涉及到操作系统调度、内存管理和并发控制等多个方面。理解线程池的工作原理有助于优化程序性能,避免过度创建和...

Global site tag (gtag.js) - Google Analytics