背景
前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章中主要是记录一下学习ThreadPoolExecutor过程中容易被人忽略的点,Doug Lea的整个类设计还是非常nice的
正文
先看一副图,描述了ThreadPoolExecutor的工作机制:
整个ThreadPoolExecutor的任务处理有4步操作:
第一步,初始的poolSize < corePoolSize,提交的runnable任务,会直接做为new一个Thread的参数,立马执行
第二步,当提交的任务数超过了corePoolSize,就进入了第二步操作。会将当前的runable提交到一个block queue中
第三步,如果block queue是个有界队列,当队列满了之后就进入了第三步。如果poolSize < maximumPoolsize时,会尝试new 一个Thread的进行救急处理,立马执行对应的runnable任务
第四步,如果第三步救急方案也无法处理了,就会走到第四步执行reject操作。
几点说明:(相信这些网上一搜一大把,我这里简单介绍下,为后面做一下铺垫) block queue有以下几种实现:
1. ArrayBlockingQueue : 有界的数组队列
2. LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
3. PriorityBlockingQueue : 优先队列,可以针对任务排序
4. SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。
RejectExecutionHandler是针对任务无法处理时的一些自保护处理:
1. Reject 直接抛出Reject exception
2. Discard 直接忽略该runnable,不可取
3. DiscardOldest 丢弃最早入队列的的任务
4. CallsRun 直接让原先的client thread做为worker线程,进行执行
容易被人忽略的点: 1. pool threads启动后,以后的任务获取都会通过block queue中,获取堆积的runnable task.
所以建议: block size >= corePoolSize ,不然线程池就没任何意义2. corePoolSize 和 maximumPoolSize的区别, 和大家正常理解的数据库连接池不太一样。 * 据dbcp pool为例,会有minIdle , maxActive配置。minIdle代表是常驻内存中的threads数量,maxActive代表是工作的最大线程数。 * 这里的corePoolSize就是连接池的maxActive的概念,它没有minIdle的概念(每个线程可以设置keepAliveTime,超过多少时间多有任务后销毁线程,但不会固定保持一定数量的threads)。 * 这里的maximumPoolSize,是一种救急措施的第一层。当threadPoolExecutor的工作threads存在满负荷,并且block queue队列也满了,这时代表接近崩溃边缘。这时允许临时起一批threads,用来处理runnable,处理完后立马退出。
所以建议: maximumPoolSize >= corePoolSize =期望的最大线程数。 (我曾经配置了corePoolSize=1, maximumPoolSize=20, blockqueue为无界队列,最后就成了单线程工作的pool。典型的配置错误)
3. 善用blockqueue和reject组合. 这里要重点推荐下CallsRun的Rejected Handler,从字面意思就是让调用者自己来运行。 我们经常会在线上使用一些线程池做异步处理,比如我前面做的 (业务层)异步并行加载技术分析和设计, 将原本串行的请求都变为了并行操作,但过多的并行会增加系统的负载(比如软中断,上下文切换)。所以肯定需要对线程池做一个size限制。但是为了引入异步操作后,避免因在block queue的等待时间过长,所以需要在队列满的时,执行一个callsRun的策略,并行的操作又转为一个串行处理,这样就可以保证尽量少的延迟影响。
所以建议: RejectExecutionHandler = CallsRun , blockqueue size = 2 * poolSize (为啥是2倍poolSize,主要一个考虑就是瞬间高峰处理,允许一个thread等待一个runnable任务)
Btrace容量规划
再提供一个btrace脚本,分析线上的thread pool容量规划是否合理,可以运行时输出poolSize等一些数据。
import static com.sun.btrace.BTraceUtils.addToAggregation;import static com.sun.btrace.BTraceUtils.field;import static com.sun.btrace.BTraceUtils.get;import static com.sun.btrace.BTraceUtils.newAggregation;import static com.sun.btrace.BTraceUtils.newAggregationKey;import static com.sun.btrace.BTraceUtils.printAggregation;import static com.sun.btrace.BTraceUtils.println;import static com.sun.btrace.BTraceUtils.str;import static com.sun.btrace.BTraceUtils.strcat;import java.lang.reflect.Field;import java.util.concurrent.atomic.AtomicInteger;import com.sun.btrace.BTraceUtils;import com.sun.btrace.aggregation.Aggregation;import com.sun.btrace.aggregation.AggregationFunction;import com.sun.btrace.aggregation.AggregationKey;import com.sun.btrace.annotations.BTrace;import com.sun.btrace.annotations.Kind;import com.sun.btrace.annotations.Location;import com.sun.btrace.annotations.OnEvent;import com.sun.btrace.annotations.OnMethod;import com.sun.btrace.annotations.OnTimer;import com.sun.btrace.annotations.Self;/** * 并行加载监控 * * @author jianghang 2011-4-7 下午10:59:53 */@BTracepublic class AsyncLoadTracer { private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0); private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE); private static Aggregation average = newAggregation(AggregationFunction.AVERAGE); private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM); private static Aggregation min = newAggregation(AggregationFunction.MINIMUM); private static Aggregation sum = newAggregation(AggregationFunction.SUM); private static Aggregation count = newAggregation(AggregationFunction.COUNT); @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY)) public static void executeMonitor(@Self Object self) { Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize"); Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize"); Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue"); Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count"); int poolSize = (Integer) get(poolSizeField, self); int largestPoolSize = (Integer) get(largestPoolSizeField, self); int queueSize = (Integer) get(countField, get(workQueueField, self)); println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "), str(largestPoolSize)), " queueSize : "), str(queueSize))); } @OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY)) public static void rejectMonitor(@Self Object self) { String name = str(self); if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) { BTraceUtils.incrementAndGet(rejecctCount); } } @OnTimer(1000) public static void rejectPrintln() { int reject = BTraceUtils.getAndSet(rejecctCount, 0); println(strcat("reject count in 1000 msec: ", str(reject))); AggregationKey key = newAggregationKey("rejectCount"); addToAggregation(histogram, key, reject); addToAggregation(average, key, reject); addToAggregation(max, key, reject); addToAggregation(min, key, reject); addToAggregation(sum, key, reject); addToAggregation(count, key, reject); } @OnEvent public static void onEvent() { BTraceUtils.truncateAggregation(histogram, 10); println("---------------------------------------------"); printAggregation("Count", count); printAggregation("Min", min); printAggregation("Max", max); printAggregation("Average", average); printAggregation("Sum", sum); printAggregation("Histogram", histogram); println("---------------------------------------------"); }}
运行结果:
1 poolSize : 1, largestPoolSize = 10, queueSize = 10
2 reject count in 1000msec: 0
说明:
1. poolSize 代表为当前的线程数
2. largestPoolSize 代表为历史最大的线程数
3. queueSize 代表blockqueue的当前堆积的size
4. reject count 代表在1000ms内的被reject的数量。
转自:http://www.iteye.com/topic/1118660
分享到:
相关推荐
在使用ThreadPoolExecutor时,还需要注意以下几点: * 核心线程数和最大线程数的设置:核心线程数和最大线程数需要根据实际情况进行设置,过小可能导致线程池无法处理任务,过大可能导致系统资源浪费。 * 任务队列...
在Java并发编程中,主要涉及以下几个关键知识点: 1. **线程与进程**:线程是程序执行的最小单元,一个进程中可以有多个线程同时执行。理解线程的概念和生命周期对于并发编程至关重要。 2. **线程安全**:当多个...
在Java中构建数据采集系统时,通常会涉及到以下几个关键知识点: 1. **网络编程**:数据采集往往涉及到从网络上的不同接口抓取数据,如HTTP、HTTPS等。Java的`HttpURLConnection`或第三方库如Apache HttpClient可以...
该文档可能涵盖了以下几个关键知识点: 1. **线程基础**:解释了如何创建和管理线程,包括使用Thread类和实现Runnable接口的方式。还可能讨论了线程的生命周期,如新建、就绪、运行、阻塞和死亡等状态。 2. **同步...
在分析源码时,我们需要关注以下几个关键点: 1. `execute()`方法的入口,它是提交任务的主要途径。 2. `addWorker()`方法,负责创建和添加新线程到线程池。 3. `getTask()`方法,线程从工作队列中获取任务。 4. `...
线程池可以显著提高多线程应用的性能和效率,主要体现在以下几点: 1. **资源重用**:避免了频繁创建和销毁线程所带来的开销。 2. **控制并发性**:限制系统中的最大并发线程数,防止过多的线程消耗过多的系统资源...
**Java核心知识点** Java作为一款跨平台的编程语言,其核心知识点是学习Java的基础。这包括但不限于以下几个方面: 1. **基本语法**:包括变量、数据类型、运算符、流程控制(if-else,switch,for,while)、方法...
在Java进阶学习中,你需要掌握以下几个关键的知识点: 1. 高级数据结构与算法:理解并能熟练运用ArrayList、LinkedList、HashMap等集合框架,深入研究Set、Queue、Stack等接口及其实现类。同时,学习并应用高级排序...
Java环境配置是安装和设置Java开发环境的过程,包括以下几个关键步骤: 1. **下载JDK**:首先,从Oracle官网下载适用于您操作系统的Java Development Kit(JDK)。 2. **安装JDK**:运行下载的安装程序,按照向导...
书中可能涵盖了以下几个关键知识点: 1. **线程的创建与启动**:通过创建Thread对象或实现Runnable接口,然后调用start()方法启动线程。 2. **同步机制**:包括synchronized关键字、wait()、notify()和notifyAll()...
这种情况下,我们通常会使用`ExecutorService`或`ThreadPoolExecutor`来管理和调度线程,而不是直接创建`Thread`对象。这样做可以更有效地控制线程池的大小,防止资源过度消耗,并且提供了更好的异常处理机制。 在...
下面我们将深入探讨Java中线程的应用实例,以及在实际开发中需要注意的关键点。 首先,让我们来看看`Thread.java`这个文件,它通常包含了定义和实现线程的方法。在Java中,创建线程有两种主要方式:继承`Thread`类...
使用线程池时,需要注意以下几点: 1. **合理设置线程池参数**:过大可能导致资源浪费,过小可能影响并发性能。应根据系统资源和任务特性进行调整。 2. **避免长时间持有任务**:线程池中的线程如果长时间处理任务,...
- **线程池**:ExecutorService、ThreadPoolExecutor和Future接口的使用。 6. **内存管理**: - **垃圾收集**:理解Java垃圾收集机制,包括GC的工作原理和几种常见的垃圾收集器。 - **内存区域**:堆、栈、方法...
本文将深入探讨Java中的线程池实现及其相关知识点。 ### 1. Java线程池的实现 Java通过`java.util.concurrent`包中的`ExecutorService`接口和`ThreadPoolExecutor`类来实现线程池。`ExecutorService`是线程池的主要...
在这个Java实现的代理服务器程序中,我们可以深入探讨几个关键的技术点。 首先,代理服务器的核心是网络通信。Java提供了丰富的网络编程API,如java.net包中的Socket和ServerSocket类。ServerSocket用于监听客户端...
本篇文章将深入探讨如何使用Java实现多线程文件传输,并涵盖以下几个关键知识点: 1. **线程基础**:在Java中,线程是程序执行的基本单元,可以通过实现`Runnable`接口或继承`Thread`类来创建。`Runnable`接口更...
Java是一种广泛使用的编程语言,Java面试题涵盖了Java语言的方方面面,本文将对Java面试题中的知识点进行系统的总结和解释。 一、Java并发编程 Java并发编程是指在Java程序中处理多个线程并行执行的技术。Java中的...
在Java JDK1.6的学习笔记中,可能会涉及以下几个关键知识点: 1. **基础语法**:包括变量、数据类型、运算符、流程控制语句(如if-else,switch,for,while等)、方法定义和调用、类与对象的概念、封装、继承和多...
- **线程池**:ExecutorService、ThreadPoolExecutor和ScheduledExecutorService的理解和使用。 8. **I/O流**: - **流的分类**:字节流(InputStream/OutputStream)和字符流(Reader/Writer),以及它们之间的...