`

JAVA线程池ThreadPoolExecutor与阻塞队列BlockingQueue

 
阅读更多

从Java5开始,Java提供了自己的线程池。每次只执行指定数量的线程,java.util.concurrent.ThreadPoolExecutor 就是这样的线程池。以下是我的学习过程。

首先是构造函数签名如下:

 

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); 

参数介绍:

 

corePoolSize 核心线程数,指保留的线程池大小(不超过maximumPoolSize值时,线程池中最多有corePoolSize 个线程工作)。 
maximumPoolSize 指的是线程池的最大大小(线程池中最大有corePoolSize 个线程可运行)。 
keepAliveTime 指的是空闲线程结束的超时时间(当一个线程不工作时,过keepAliveTime 长时间将停止该线程)。 
unit 是一个枚举,表示 keepAliveTime 的单位(有NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS,7个可选值)。 
workQueue 表示存放任务的队列(存放需要被线程池执行的线程队列)。 
handler 拒绝策略(添加任务失败后如何处理该任务).

1、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
    c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
    d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
3、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
       这个过程说明,并不是先加入任务就一定会先执行。假设队列大小为 4,corePoolSize为2,maximumPoolSize为6,那么当加入15个任务时,执行的顺序类似这样:首先执行任务 1、2,然后任务3~6被放入队列。这时候队列满了,任务7、8、9、10 会被马上执行,而任务 11~15 则会抛出异常。最终顺序是:1、2、7、8、9、10、3、4、5、6。当然这个过程是针对指定大小的ArrayBlockingQueue<Runnable>来说,如果是LinkedBlockingQueue<Runnable>,因为该队列无大小限制,所以不存在上述问题。

示例一,LinkedBlockingQueue<Runnable>队列使用:

 

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2011-12-28
 * <p>Description: [Java 线程池学习]</p>
 * @author         shixing_11@sina.com
 */
public class ThreadPoolTest implements Runnable { 
     public void run() { 
          synchronized(this) { 
            try{
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(3000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
          } 
     } 
     
     public static void main(String[] args) { 
         BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
         }   
         executor.shutdown();  
     }
} 

输出结果如下:

 

线程队列大小为-->0
线程名称:pool-1-thread-1
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
线程队列大小为-->5
线程队列大小为-->6
线程队列大小为-->7
线程队列大小为-->8
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2

可见,线程队列最大为8,共执行了10个线线程。因为是从线程池里运行的线程,所以虽然将线程的名称设为"TestThread".concat(""+i),但输出后还是变成了pool-1-thread-x。

示例二,LinkedBlockingQueue<Runnable>队列使用:

 

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created on 2011-12-28
 * <p>Description: [Java 线程池学习]</p>
 * @author         shixing_11@sina.com
 */
public class ThreadPoolTest implements Runnable { 
     public void run() { 
          synchronized(this) { 
            try{
                System.out.println("线程名称:"+Thread.currentThread().getName());
                Thread.sleep(3000); //休眠是为了让该线程不至于执行完毕后从线程池里释放
            }catch (InterruptedException e){
                e.printStackTrace();
            }
          } 
     } 
     
     public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定为4的线程队列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
         }   
         executor.shutdown();  
     }
}

输出结果如下:

 

线程队列大小为-->0
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
线程队列大小为-->4
线程队列大小为-->4
线程队列大小为-->4
线程名称:pool-1-thread-1
线程名称:pool-1-thread-3
线程名称:pool-1-thread-2
线程名称:pool-1-thread-4
线程队列大小为-->4
线程名称:pool-1-thread-5
线程名称:pool-1-thread-6
线程名称:pool-1-thread-5
线程名称:pool-1-thread-6
线程名称:pool-1-thread-4
线程名称:pool-1-thread-2

可见,总共10个线程,因为核心线程数为2,2个线程被立即运行,线程队列大小为4,所以4个线程被加入队列,最大线程数为6,还能运行6-2=4个,其10个线程的其余4个线程又立即运行了。

如果将我们要运行的线程数10改为11,则由于最大线程数6+线程队列大小4=10<11,则根据线程池工作原则,最后一个线程将被拒绝策略拒绝,将示例二的main方法改为如下:

 

     public static void main(String[] args) throws InterruptedException { 
         ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定为4的线程队列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 11; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
         }   
         executor.shutdown();  
     }

输出结果:

 

线程队列大小为-->0
线程名称:pool-1-thread-1
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
线程名称:pool-1-thread-2
线程队列大小为-->4
线程名称:pool-1-thread-3
线程队列大小为-->4
线程名称:pool-1-thread-4
线程队列大小为-->4
线程名称:pool-1-thread-5
线程队列大小为-->4
线程名称:pool-1-thread-6
Exception in thread "main" java.util.concurrent.RejectedExecutionException
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at ths.ThreadPoolTest.main(ThreadPoolTest.java:30)

线程名称:pool-1-thread-1
线程名称:pool-1-thread-3
线程名称:pool-1-thread-2
线程名称:pool-1-thread-4

很明显,抛RejectedExecutionException异常了,被拒绝策略拒绝了,这就说明线程超出了线程池的总容量(线程队列大小+最大线程数)。

         对于 java.util.concurrent.BlockingQueue 类有有三种方法将线程添加到线程队列里面,然而如何区别三种方法的不同呢,其实在队列未满的情况下结果相同,都是将线程添加到线程队列里面,区分就在于当线程队列已经满的时候,此时

public boolean add(E e) 方法将抛出IllegalStateException异常,说明队列已满。

public boolean offer(E e) 方法则不会抛异常,只会返回boolean值,告诉你添加成功与否,队列已满,当然返回false。

public void put(E e) throws InterruptedException 方法则一直阻塞(即等待,直到线程池中有线程运行完毕,可以加入队列为止)。

为了证明对上面这三个方法的描述,我们将示例二改为如下、public boolean add(E e)方法测试程序:

 

     public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定为4的线程队列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
             if (threadSize==4){
                 queue.add(new Runnable() {  //队列已满,抛异常
                     @Override
                     public void run(){
                         System.out.println("我是新线程,看看能不能搭个车加进去!");
                         
                     }
                 });
             }
         }   
         executor.shutdown();  
     }

运行结果:

 

线程队列大小为-->0
线程名称:pool-1-thread-1
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
线程名称:pool-1-thread-2
Exception in thread "main" java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(Unknown Source)
at java.util.concurrent.ArrayBlockingQueue.add(Unknown Source)
at ths.ThreadPoolTest.main(ThreadPoolTest.java:35)

线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1

很明显,当线程队列已满,即线程队列里的线程数为4时,抛了异常,add线程失败。再来看public boolean offer(E e) 方法测试程序:

 

public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定为4的线程队列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
             if (threadSize==4){
                 final boolean flag = queue.offer(new Runnable() {
                     @Override
                     public void run(){
                         System.out.println("我是新线程,看看能不能搭个车加进去!");
                         
                     }
                 });
                 System.out.println("添加新线程标志为-->"+flag);
             }
         }   
         executor.shutdown();  
     }

运行结果如下:

 

线程队列大小为-->0
线程名称:pool-1-thread-1
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
添加新线程标志为-->false
线程队列大小为-->4
线程名称:pool-1-thread-3
添加新线程标志为-->false
线程名称:pool-1-thread-2
线程队列大小为-->4
添加新线程标志为-->false
线程名称:pool-1-thread-4
线程队列大小为-->4
添加新线程标志为-->false
线程名称:pool-1-thread-5
线程队列大小为-->4
添加新线程标志为-->false
线程名称:pool-1-thread-6
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-3
线程名称:pool-1-thread-4


可以看到,当线程队列已满的时候,线程没有被添加到线程队列,程序也没有抛异常。继续看public void put(E e) throws InterruptedException;方法测试程序:

 

public static void main(String[] args) throws InterruptedException { 
         BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(4); //固定为4的线程队列
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);
         for (int i = 0; i < 10; i++) {   
             executor.execute(new Thread(new ThreadPoolTest(), "TestThread".concat(""+i)));   
             int threadSize = queue.size();
             System.out.println("线程队列大小为-->"+threadSize);
             if (threadSize==4){
                 queue.put(new Runnable() {
                     @Override
                     public void run(){
                         System.out.println("我是新线程,看看能不能搭个车加进去!");
                     }
                 });
             }
         }   
         executor.shutdown();  
     }

结果如下:

 

线程队列大小为-->0
线程队列大小为-->0
线程队列大小为-->1
线程队列大小为-->2
线程队列大小为-->3
线程队列大小为-->4
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程名称:pool-1-thread-1
线程名称:pool-1-thread-2
线程队列大小为-->3
线程队列大小为-->4
线程名称:pool-1-thread-3
线程名称:pool-1-thread-2
线程队列大小为-->4
线程名称:pool-1-thread-1
线程队列大小为-->4
我是新线程,看看能不能搭个车加进去!
线程名称:pool-1-thread-3
线程名称:pool-1-thread-4
我是新线程,看看能不能搭个车加进去!
线程名称:pool-1-thread-3
我是新线程,看看能不能搭个车加进去!
我是新线程,看看能不能搭个车加进去!

很明显,尝试了四次才加进去,前面三次尝试添加,但由于线程sleep(3000),所以没有执行完,线程队列一直处于满的状态,直到某个线程执行完,队列有空位,新线程才加进去,没空位之前一直阻塞(即等待),我能加进去为止。

 

那么线程池的排除策略是什么样呢,一般按如下规律执行:

A.  如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
B.  如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
C.  如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

 

总结:

1. 线程池可立即运行的最大线程数 即maximumPoolSize 参数。

2. 线程池能包含的最大线程数 = 可立即运行的最大线程数 + 线程队列大小 (一部分立即运行,一部分装队列里等待)

3. 核心线程数可理解为建议值,即建议使用的线程数,或者依据CPU核数

4. add,offer,put三种添加线程到队列的方法只在队列满的时候有区别,add为抛异常,offer返回boolean值,put直到添加成功为止。

5.同理remove,poll, take三种移除队列中线程的方法只在队列为空的时候有区别, remove为抛异常,poll为返回boolean值, take等待直到有线程可以被移除。

看看下面这张图就清楚了:


记在这里做为学习的过程,以后偶尔有空翻起来容易。

 

分享到:
评论

相关推荐

    java 线程池实现多并发队列后进先出

    - **监控与调优**:使用`ThreadPoolExecutor`提供的`getPoolSize`、`getActiveCount`、`getCompletedTaskCount`等方法监控线程池状态,根据实际情况进行调优。 在实际应用中,理解线程池的工作原理以及如何定制...

    并发-线程池和阻塞队列

    在Java编程中,"并发-线程池和...总之,理解和掌握线程池与阻塞队列的原理和使用方法,是提升Java并发编程能力的重要一步。它们为开发人员提供了一种强大而灵活的工具,能够在处理并发问题时保证系统的稳定性和性能。

    10、阻塞队列BlockingQueue实战及其原理分析

    阻塞队列BlockingQueue是Java并发编程中一个重要的数据结构,它是线程安全的队列,主要用于生产者消费者模型中的数据交换。在Java的`java.util.concurrent`包中,提供了多种实现阻塞队列的类,如`ArrayBlockingQueue...

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    java多线程程序设计:Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 CompletionService log4j+slf4j日志 实现的...

    java中线程队列BlockingQueue的用法

    在Java编程中,`BlockingQueue`(阻塞队列)是一种重要的并发工具,它结合了队列的数据结构和线程同步机制。`BlockingQueue`接口位于`java.util.concurrent`包中,提供了线程安全的数据结构,可以用于实现生产者-...

    java 线程池

    线程池在Java中由`java.util.concurrent`包下的`ExecutorService`接口及其实现类,尤其是`ThreadPoolExecutor`类提供。这篇博文可能详细介绍了线程池的工作原理、配置参数以及如何使用。 线程池的核心组件包括: 1....

    基于java开发的NIO+多线程实现聊天室+源码+项目解析(毕业设计&课程设计&项目开发)

    阻塞队列BlockingQueue,生产者消费者模式 Selector Channel ByteBuffer ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子变量 内置锁 CompletionService log4j+slf4j日志 实现的...

    java线程池概念.txt

    线程池的排队策略与BlockingQueue有关。 threadFactory:线程工厂,主要用来创建线程:默认值 DefaultThreadFactory; handler:表示当拒绝处理任务时的策略,就是上面提及的reject操作;有以下四种取值:  ...

    Java线程池编程

    这是因为Java的线程池采用了阻塞队列(BlockingQueue)来处理任务的入队和出队,因此主线程不会被阻塞,这是与某些自定义实现的线程池(如Tomcat的)的一个区别。 然而,固定大小的线程池可能会导致资源浪费,因为...

    线程池java

    在Java中,`ThreadPoolExecutor`是线程池的核心实现类之一,它提供了丰富的配置选项来满足不同的应用场景需求。该类继承自`AbstractExecutorService`,实现了线程池的主要功能。 **构造方法详解**: ```java ...

    java线程池的使用方式

    ### Java线程池的使用方式 #### 一、简介 线程在Java中扮演着至关重要的角色。在早期的JDK版本(如JDK 1.4及之前)中,线程池的功能相对简单,使用起来不够灵活。然而,自JDK 1.5开始,随着`java.util.concurrent`...

    Java NIO+多线程实现聊天室

    阻塞队列BlockingQueue,生产者消费者模式 选择器 渠道 字节缓冲区 ProtoStuff 高性能序列化 HttpClient连接池 Spring依赖注入 lombok简化POJO开发 原子指标 内置锁 竣工服务 log4j+slf4j日志 实现的功能 登录注销 ...

    java.util.concurrent 实现线程池队列

    在Java中,`java.util.concurrent.ExecutorService` 接口代表了一个线程池服务,而`ThreadPoolExecutor` 是它的具体实现,我们可以自定义线程池的核心参数,如核心线程数、最大线程数、线程存活时间、线程队列等。...

    Java开发基于多线程和NIO实现聊天室源码+项目说明(含服务端+客户端).zip

    - 阻塞队列BlockingQueue,生产者消费者模式 - Selector - Channel - ByteBuffer - ProtoStuff 高性能序列化 - HttpClient连接池 - Spring依赖注入 - lombok简化POJO开发 - 原子变量 - 内置锁 - ...

    JAVA线程池应用.pptx

    - **BlockingQueue**:阻塞队列,用于存放Runnable任务,主要有ArrayBlockingQueue、LinkedBlockingQueue和SynchronousQueue三种类型。 #### 六、拒绝执行策略 当线程池无法处理更多任务时,需要采取一定的策略来...

    java线程池详解文档

    `ThreadPoolExecutor`是Java中实现线程池功能的核心类,提供了丰富的构造方法和配置选项,可以满足各种复杂的业务场景需求。 ##### 构造方法参数解析 ```java public ThreadPoolExecutor(int corePoolSize, int ...

    并发编程demo测试包含线程池、锁、队列、信号、cas等等

    Java中的ArrayBlockingQueue和LinkedBlockingQueue是两种常见的阻塞队列实现。 信号(信号量/Semaphore)是另一种同步原语,用于限制同时访问特定资源的线程数量。它可以看作是“许可证”,当线程获取一个许可证后...

    java并发工具包 java.util.concurrent中文版用户指南pdf

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 SynchronousQueue 8. 阻塞...

    java并发工具包详解

    2. 阻塞队列 BlockingQueue 3. 数组阻塞队列 ArrayBlockingQueue 4. 延迟队列 DelayQueue 5. 链阻塞队列 LinkedBlockingQueue 6. 具有优先级的阻塞队列 PriorityBlockingQueue 7. 同步队列 Synchronou sQueue 8. ...

Global site tag (gtag.js) - Google Analytics