ThreadPoolExecutor类学习
Java中的线程池技术主要用的是ThreadPoolExecutor 这个类。先来看这个类的构造函数,
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 线程池维护线程的最少数量
maximumPoolSize 线程池维护线程的最大数量
keepAliveTime 线程池维护线程所允许的空闲时间
workQueue 任务队列,用来存放我们所定义的任务处理线程
threadFactory 线程创建工厂
handler 线程池对拒绝任务的处理策略
ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize 设置的边界自动调整池大小。当新任务在方法execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程。 如果设置的corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。
ThreadPoolExecutor是Executors类的实现,Executors类里面提供了一些静态工厂,生成一些常用的线程池,主要有以下几个:
newSingleThreadExecutor:创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
newFixedThreadPool:创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
newCachedThreadPool:创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。
在实际的项目中,我们会使用得到比较多的是newFixedThreadPool,创建固定大小的线程池,但是这个方法在真实的线上环境中还是会有很多问题,这个将会在下面一节中详细讲到。
当任务源源不断的过来,而我们的系统又处理不过来的时候,我们要采取的策略是拒绝服务。RejectedExecutionHandler接口提供了拒绝任务处理的自定义方法的机会。在ThreadPoolExecutor中已经包含四种处理策略。
1)CallerRunsPolicy:线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
这个策略显然不想放弃执行任务。但是由于池中已经没有任何资源了,那么就直接使用调用该execute的线程本身来执行。
2)AbortPolicy:处理程序遭到拒绝将抛出运行时 RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
这种策略直接抛出异常,丢弃任务。
3)DiscardPolicy:不能执行的任务将被删除
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
这种策略和AbortPolicy几乎一样,也是丢弃任务,只不过他不抛出异常。
4)DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,
则重复此过程)
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
该策略就稍微复杂一些,在pool没有关闭的前提下首先丢掉缓存在队列中的最早的任务,然后重新尝试运行该任务。这个策略需要适当小心。
3. ThreadPoolExecutor无界队列使用
public class ThreadPool { private final static String poolName = "mypool"; static private ThreadPool threadFixedPool = new ThreadPool(2); private ExecutorService executor; static public ThreadPool getFixedInstance() { return threadFixedPool; } private ThreadPool(int num) { executor = Executors.newFixedThreadPool(num, new DaemonThreadFactory(poolName)); } public void execute(Runnable r) { executor.execute(r); } public static void main(String[] params) { class MyRunnable implements Runnable { public void run() { System.out.println("OK!"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } for (int i = 0; i < 10; i++) { ThreadPool.getFixedInstance().execute(new MyRunnable()); } try { Thread.sleep(2000); System.out.println("Process end."); } catch (InterruptedException e) { e.printStackTrace(); } } }
在这段代码中,我们发现我们用到了Executors.newFixedThreadPool()函数,这个函数的实现是这样子的:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
它实际上是创建了一个无界队列的固定大小的线程池。执行这段代码,我们发现所有的任务都正常处理了。但是在真实的线上环境中会存在这样的一个问题,前端的用户请求源源不断的过来,后端的处理线程如果处理时间变长,无法快速的将用户请求处理完返回结果给前端,那么任务队列中将堵塞大量的请求。这些请求在前端都是有超时时间设置的,假设请求是通过套接字过来,当我们的后端处理进程处理完一个请求后,从队列中拿下一个任务,发现这个任务的套接字已经无效了,这是因为在用户端已经超时,将套接字建立的连接关闭了。这样一来我们这边的处理程序再去读取套接字时,就会发生I/0 Exception. 恶性循环,导致我们所有的处理服务线程读的都是超时的套接字,所有的请求过来都抛I/O异常,这样等于我们整个系统都挂掉了,已经无法对外提供正常的服务了。
对于海量数据的处理,现在业界都是采用集群系统来进行处理,当请求的数量不断加大的时候,我们可以通过增加处理节点,反正现在硬件设备相对便宜。但是要保证系统的可靠性和稳定性,在程序方面我们还是可以进一步的优化的,我们下一节要讲述的就是针对线上出现的这类问题的一种处理策略。
4. ThreadPoolExecutor有界队列使用
public class ThreadPool { private final static String poolName = "mypool"; static private ThreadPool threadFixedPool = null; public ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2); private ExecutorService executor; static public ThreadPool getFixedInstance() { return threadFixedPool; } private ThreadPool(int num) { executor = new ThreadPoolExecutor(2, 4,60,TimeUnit.SECONDS, queue,new DaemonThreadFactory(poolName), new ThreadPoolExecutor.AbortPolicy()); } public void execute(Runnable r) { executor.execute(r); } public static void main(String[] params) { class MyRunnable implements Runnable { public void run() { System.out.println("OK!"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } } int count = 0; for (int i = 0; i < 10; i++) { try { ThreadPool.getFixedInstance().execute(new MyRunnable()); } catch (RejectedExecutionException e) { e.printStackTrace(); count++; } } try { log.info("queue size:" + ThreadPool.getFixedInstance().queue.size()); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Reject task: " + count); } }
首先我们来看下这段代码几个重要的参数,corePoolSize 为2,maximumPoolSize为4,任务队列大小为2,每个任务平均处理时间为10ms,一共有10个并发任务。
执行这段代码,我们会发现,有4个任务失败了。这里就验证了我们在上面提到有界队列时候线程池的执行顺序。当新任务在方法 execute(Runnable) 中提交时, 如果运行的线程少于 corePoolSize,则创建新线程来处理请求。 如果运行的线程多于corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程,如果此时线程数量达到maximumPoolSize,并且队列已经满,就会拒绝继续进来的请求。
现在我们调整一下代码中的几个参数,将并发任务数改为200,执行结果Reject task: 182,说明有18个任务成功了,线程处理完一个请求后会接着去处理下一个过来的请求。在真实的线上环境中,会源源不断的有新的请求过来,当前的被拒绝了,但只要线程池线程把当下的任务处理完之后还是可以处理下一个发送过来的请求。
通过有界队列可以实现系统的过载保护,在高压的情况下,我们的系统处理能力不会变为0,还能正常对外进行服务,虽然有些服务可能会被拒绝,至于如何减少被拒绝的数量以及对拒绝的请求采取何种处理策略我将会在下一篇文章《系统的过载保护》中继续阐述。
相关推荐
Executors 部分提供了一些线程池类,例如 ThreadPoolExecutor、ScheduledThreadPoolExecutor 等,这些类可以帮助开发者管理线程池。 JUC 框架的类结构可以分为五个部分: * Lock 框架和 Tools 类 * Collections * ...
提供工厂方法来创建不同类型的线程池,这篇文章主要介绍了Java ThreadPoolExecutor 线程池的使用介绍,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来...
文档可能会详细解释如何使用Java的ExecutorService和ThreadPoolExecutor类来创建线程池,以及如何配置线程池参数以优化性能。通过实际编写100行代码,读者可以亲身体验线程池的实现过程,理解其工作原理。 最后,...
- 线程池:ExecutorService接口和ThreadPoolExecutor类用于管理和控制线程池。 5. **网络编程**: - Socket和ServerSocket:提供客户端和服务器端的通信机制。 - URL和URLConnection:用于访问网络资源。 6. **...
你可能找到关于Java并发API的详细讲解,包括`java.util.concurrent`包下的Thread、Runnable、ExecutorService、Semaphore、CountDownLatch、CyclicBarrier、ThreadPoolExecutor等类的使用方法。此外,还可能探讨了...
这些只是Java学习笔记可能涵盖的部分主题,实际笔记可能还会包括其他高级特性,如注解、Lambda表达式、并发工具类、JDBC数据库操作、Spring框架、Maven项目管理等。通过深入学习这些内容,你可以逐步提升Java编程...
在Java中,`java.util.concurrent.ExecutorService`接口提供了创建和管理线程池的能力,而`ThreadPoolExecutor`类则是其最常用的实现之一。`ThreadPoolExecutor`允许我们自定义线程池的关键参数,如核心线程数、最大...
学习如何使用ThreadPoolExecutor等实现类来管理和控制线程执行。 - **Callable与Future:** Callable接口用于返回结果的任务,Future接口用于获取Callable任务的结果。学习如何使用它们实现异步编程。 - **...
10. 并发工具类:如CountDownLatch、CyclicBarrier、Semaphore、ThreadPoolExecutor等,用于多线程协作和管理。 11. Java内存模型:理解JVM内存结构,包括堆、栈、方法区等,以及垃圾回收机制。 12. 性能优化:...
- 线程池:了解ExecutorService接口和ThreadPoolExecutor类,优化线程管理。 7. **反射** - 反射机制:学习如何获取类的信息,创建并调用动态对象。 8. **字符串处理** - String类:理解String的不可变性,以及...
- **自定义异常**:学习如何创建自己的异常类。 7. **IO流**: - **文件操作**:读写文件,了解File类和BufferedReader/Writer。 - **网络通信**:使用Socket进行客户端-服务器通信。 8. **多线程**: - **...
- 文件操作:学习文件的创建、读写和复制,掌握File类的使用。 - Filter流:学习缓冲流、转换流等过滤流,提高I/O效率。 - NIO(非阻塞I/O):介绍Channel、Buffer、Selector的概念,理解其在高并发场景的优势。 第...
Java专题学习笔记主要...7. **多线程**:Java提供了丰富的多线程支持,包括Thread类、Runnable接口、同步机制(synchronized关键字、wait()、notify()、notifyAll())、线程池(ExecutorService、ThreadPoolExecutor、...
7. **线程与锁优化**:理解线程池的配置(如ThreadPoolExecutor的corePoolSize, maximumPoolSize, keepAliveTime等参数),死锁的检测和避免,以及锁的升级过程(从偏向锁到轻量级锁再到重量级锁)。 8. **编译优化...
Java 中的线程池类是 `java.util.concurrent.ThreadPoolExecutor`,它提供了一个线程池的实现。该类有一个构造方法,参数包括: * `corePoolSize`:线程池维护线程的最少数量 * `maximumPoolSize`:线程池维护线程...
3. **Executor框架** - "20 其实不用造轮子—Executor框架详解-慕课专栏.html":讲解了Java的`ExecutorService`和`ThreadPoolExecutor`,这是管理和控制线程执行的重要工具,可以有效地管理线程池,提高系统性能。...
- **反射机制**:学习如何在运行时获取类的信息,动态创建对象,调用方法。 - **注解**:理解注解的使用,包括元注解、自定义注解及其处理器。 9. **Java高级特性**: - **Lambda表达式**:掌握Java 8引入的函数...
- Executor框架:使用ExecutorService、ThreadPoolExecutor等工具类来管理线程池。 7. **IO流与NIO** - Java NIO(New IO):了解非阻塞I/O模型,使用选择器(Selector)进行多路复用。 8. **网络编程** - ...
3. 类加载器:学习不同的类加载器,如Bootstrap ClassLoader,Extension ClassLoader和App ClassLoader。 4. 垃圾回收:深入理解垃圾回收机制,如分代收集,可达性分析,以及不同GC算法,如Serial、Parallel、CMS和...
- **线程池**:ExecutorService接口和ThreadPoolExecutor类用于管理和调度线程,提高系统效率。 7. **网络编程** - **Socket编程**:创建服务器端Socket和客户端Socket,实现数据的发送和接收。 - **URL和...