import java.util.concurrent.TimeUnit; public class ThreadPoolTask implements Runnable { private final Object threadPoolTaskData; private static long consumerTaskSleepTime = 2L; public ThreadPoolTask(Object tasks) { this.threadPoolTaskData = tasks; } @Override public void run() { System.out.println("start :" + threadPoolTaskData); try { TimeUnit.SECONDS.sleep(consumerTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } System.out.println("finish " + threadPoolTaskData); } }
Abort(中止)策略:该策略会抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。代码如下:
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPool { private static int executePrograms = 0; private static int produceTaskMaxNumber = 10; /** * @param args */ public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.AbortPolicy()); for(int i = 0 ; i < produceTaskMaxNumber; i ++) { try { String task = "task@ " + i; System.out.println("put " + task); threadPoolExecutor.execute(new ThreadPoolTask(task)); TimeUnit.SECONDS.sleep(executePrograms); } catch (Exception e) { e.printStackTrace(); } } } }
程序的执行结果如下:
put task@ 0 put task@ 1 start :task@ 0 put task@ 2 put task@ 3 put task@ 4 put task@ 5 start :task@ 1 put task@ 6 start :task@ 5 put task@ 7 start :task@ 6 put task@ 8 java.util.concurrent.RejectedExecutionException put task@ 9 at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22) java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22) java.util.concurrent.RejectedExecutionException at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1774) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:768) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:656) at com.bohai.thread.pool.ThreadPool.main(ThreadPool.java:22) finish task@ 0 start :task@ 2 finish task@ 1 start :task@ 3 finish task@ 5 start :task@ 4 finish task@ 6 finish task@ 2 finish task@ 3 finish task@ 4
分析运行结果:
corepoolsize = 2 ,maxpoolsize = 4,queue size = 3。运行中的线程数+队列中等待的线程数 = 7,及提交的线程数大于7时,会抛出异常。
DiscardPolicy 策略会悄悄抛弃新提交的任务:代码如下:
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPool { private static int executePrograms = 0; private static int produceTaskMaxNumber = 10; /** * @param args */ public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardPolicy()); for(int i = 0 ; i < produceTaskMaxNumber; i ++) { try { String task = "task@ " + i; System.out.println("put " + task); threadPoolExecutor.execute(new ThreadPoolTask(task)); TimeUnit.SECONDS.sleep(executePrograms); } catch (Exception e) { e.printStackTrace(); } } } }
执行结果如下:
put task@ 0 put task@ 1 start :task@ 0 put task@ 2 put task@ 3 put task@ 4 put task@ 5 start :task@ 1 put task@ 6 put task@ 7 put task@ 8 put task@ 9 start :task@ 6 start :task@ 5 finish task@ 0 finish task@ 6 start :task@ 2 finish task@ 1 start :task@ 3 start :task@ 4 finish task@ 5 finish task@ 2 finish task@ 3 finish task@ 4
分析结果:
提交的任务数有10个,开始和结束的任务数7个。抛弃了新提交的任务7、8、9。
DiscardOldestPolicy 抛弃旧的线程。代码如下:
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPool { private static int executePrograms = 0; private static int produceTaskMaxNumber = 10; /** * @param args */ public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i = 0 ; i < produceTaskMaxNumber; i ++) { try { String task = "task@ " + i; System.out.println("put " + task); threadPoolExecutor.execute(new ThreadPoolTask(task)); TimeUnit.SECONDS.sleep(executePrograms); } catch (Exception e) { e.printStackTrace(); } } } }
执行结果如下:
put task@ 0 put task@ 1 put task@ 2 put task@ 3 put task@ 4 put task@ 5 put task@ 6 put task@ 7 put task@ 8 put task@ 9 start :task@ 0 start :task@ 5 start :task@ 1 start :task@ 6 finish task@ 0 start :task@ 7 finish task@ 5 start :task@ 8 finish task@ 1 start :task@ 9 finish task@ 6 finish task@ 7 finish task@ 8 finish task@ 9
对结果进行分析:
线程池抛弃了2、3、4,线程0、1没有进入队列,进行开始执行。所以不会被抛出。
CallerRunsPolicy 策略实现一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。代码如下:
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class ThreadPool { private static int executePrograms = 0; private static int produceTaskMaxNumber = 10; /** * @param args */ public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3),new ThreadPoolExecutor.CallerRunsPolicy()); for(int i = 0 ; i < produceTaskMaxNumber; i ++) { try { String task = "task@ " + i; System.out.println("put " + task); threadPoolExecutor.execute(new ThreadPoolTask(task)); TimeUnit.SECONDS.sleep(executePrograms); } catch (Exception e) { e.printStackTrace(); } } } }
运行结果如下:
put task@ 0 put task@ 1 put task@ 2 put task@ 3 put task@ 4 put task@ 5 start :task@ 1 put task@ 6 put task@ 7 start :task@ 7 start :task@ 6 start :task@ 0 start :task@ 5 finish task@ 7 put task@ 8 finish task@ 1 start :task@ 2 finish task@ 6 finish task@ 0 finish task@ 5 put task@ 9 start :task@ 3 start :task@ 8 start :task@ 4 finish task@ 2 start :task@ 9 finish task@ 3 finish task@ 8 finish task@ 4 finish task@ 9
对结果进行分析:
所有的线程都执行了。
相关推荐
JAVA线程池是一种高效管理线程的技术,它允许开发者预设线程数量,避免无限制地创建和销毁线程带来的开销。...通过合理配置线程池参数,结合业务场景选择合适的阻塞队列和饱和策略,可以实现高性能的并发处理。
ThreadPoolExecutor饱和策略有哪些 可达性分析 成员变量与局部变量的区别有那些? HashMap 的长度为什么是2的幂次方 谈谈JVM中,对类加载器的认识 你对线程优先级的理解是什么? 构造器(constructor)是否可...
- **饱和策略**:当所有线程都在工作且任务队列也已满时,线程池会根据配置的饱和策略来处理无法接收的任务。 - **线程销毁**:当任务减少时,超过核心线程数的空闲线程会在一定时间内被销毁,而核心线程则会一直...
本文将深入探讨名为“PollDemo”的示例,通过分析其源代码,帮助开发者理解和应用线程池中的Poll策略。 “PollDemo.rar”是一个压缩包,包含了一个用于演示线程池Poll操作的程序。由于文件过多,无法逐一验证每个...
通过线程池,可以避免频繁创建和销毁线程带来的开销,同时可以配置线程池的拒绝策略,应对任务提交过快导致的资源饱和问题。 五、Java并发工具 Java提供了一系列并发工具类,如`Semaphore`(信号量)、`...
5. **源码分析**:对于"工具"标签,可能意味着这篇博文深入解析了`ThreadPoolExecutor`的源码,包括线程的创建、调度策略、任务的提交和取消等方面,这对于理解和优化线程池的使用非常有帮助。 6. **优化实践**:...
饱和策略 AbortPolicy DiscardPolicy DiscardOldestPolicy CallerRunsPolicy 线程工厂 在调用构造函数后再定制ThreadPoolExecutor 扩展 ThreadPoolExecutor afterExecute(Runnable r,...
项目描述:对java.util.concurrent包下线程池相关源码进行重新实现,深入研究和学习线程池超时机制、饱和策略、生命周期等知识 ThreadPoolExecutor类下部分方法和内部类介绍: 1、Worker类: 描述:Worker类实现...
8.3.3 饱和策略144 8.3.4 线程工厂146 8.3.5 在调用构造函数后再定制ThreadPoolExecutor147 8.4 扩展 ThreadPoolExecutor148 8.5 递归算法的并行化149 第9章 图形用户界面应用程序156 9.1 为什么GUI是单线程...
- `handler`:饱和策略 - `RejectedExecutionHandler`:拒绝策略 #### 五、Java并发工具类 - **`ConcurrentHashMap`**:线程安全的哈希表实现,使用分段锁来减少锁竞争。 - **`ConcurrentLinkedQueue`**:线程...
- 对于图片加载,可以采用图像压缩技术减少内存占用,并使用LRU缓存策略来管理图片缓存。 - 及时释放不再使用的资源,如关闭数据库连接、释放Bitmap资源等。 - 使用工具如MAT (Memory Analyzer Tool) 来监控和...
- **第28章:有界队列的饱和策略** 探讨不同类型的拒绝策略,如`AbortPolicy`、`DiscardPolicy`、`DiscardOldestPolicy`和`CallerRunsPolicy`,以及如何使用`Semaphore`来控制队列的大小。 - **第29章:死锁的成因...