前段时间一个项目中因为涉及大量的线程开发,把jdk cocurrent的代码重新再过了一遍。这篇文章中主要是记录一下学习ThreadPoolExecutor过程中容易被人忽略的点,Doug Lea的整个类设计还是非常nice的 先看一副图,描述了ThreadPoolExecutor的工作机制: 整个ThreadPoolExecutor的任务处理有4步操作: 再提供一个btrace脚本,分析线上的thread pool容量规划是否合理,可以运行时输出poolSize等一些数据。 运行结果: 说明: 1. poolSize 代表为当前的线程数 2. largestPoolSize 代表为历史最大的线程数 3. queueSize 代表blockqueue的当前堆积的size 4. reject count 代表在1000ms内的被reject的数量 这是我对ThreadPoolExecutor使用过程中的一些经验总结,希望能对大家有所帮助,如有描述不对的地方欢迎拍砖。背景
正文
1. ArrayBlockingQueue : 有界的数组队列
2. LinkedBlockingQueue : 可支持有界/无界的队列,使用链表实现
3. PriorityBlockingQueue : 优先队列,可以针对任务排序
4. SynchronousQueue : 队列长度为1的队列,和Array有点区别就是:client thread提交到block queue会是一个阻塞过程,直到有一个worker thread连接上来poll task。
1. Reject 直接抛出Reject exception
2. Discard 直接忽略该runnable,不可取
3. DiscardOldest 丢弃最早入队列的的任务
4. CallsRun 直接让原先的client thread做为worker线程,进行执行Btrace容量规划
import static com.sun.btrace.BTraceUtils.addToAggregation;
import static com.sun.btrace.BTraceUtils.field;
import static com.sun.btrace.BTraceUtils.get;
import static com.sun.btrace.BTraceUtils.newAggregation;
import static com.sun.btrace.BTraceUtils.newAggregationKey;
import static com.sun.btrace.BTraceUtils.printAggregation;
import static com.sun.btrace.BTraceUtils.println;
import static com.sun.btrace.BTraceUtils.str;
import static com.sun.btrace.BTraceUtils.strcat;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicInteger;
import com.sun.btrace.BTraceUtils;
import com.sun.btrace.aggregation.Aggregation;
import com.sun.btrace.aggregation.AggregationFunction;
import com.sun.btrace.aggregation.AggregationKey;
import com.sun.btrace.annotations.BTrace;
import com.sun.btrace.annotations.Kind;
import com.sun.btrace.annotations.Location;
import com.sun.btrace.annotations.OnEvent;
import com.sun.btrace.annotations.OnMethod;
import com.sun.btrace.annotations.OnTimer;
import com.sun.btrace.annotations.Self;
/**
* 并行加载监控
*
* @author jianghang 2011-4-7 下午10:59:53
*/
@BTrace
public class AsyncLoadTracer {
private static AtomicInteger rejecctCount = BTraceUtils.newAtomicInteger(0);
private static Aggregation histogram = newAggregation(AggregationFunction.QUANTIZE);
private static Aggregation average = newAggregation(AggregationFunction.AVERAGE);
private static Aggregation max = newAggregation(AggregationFunction.MAXIMUM);
private static Aggregation min = newAggregation(AggregationFunction.MINIMUM);
private static Aggregation sum = newAggregation(AggregationFunction.SUM);
private static Aggregation count = newAggregation(AggregationFunction.COUNT);
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "execute", location = @Location(value = Kind.ENTRY))
public static void executeMonitor(@Self Object self) {
Field poolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "poolSize");
Field largestPoolSizeField = field("java.util.concurrent.ThreadPoolExecutor", "largestPoolSize");
Field workQueueField = field("java.util.concurrent.ThreadPoolExecutor", "workQueue");
Field countField = field("java.util.concurrent.ArrayBlockingQueue", "count");
int poolSize = (Integer) get(poolSizeField, self);
int largestPoolSize = (Integer) get(largestPoolSizeField, self);
int queueSize = (Integer) get(countField, get(workQueueField, self));
println(strcat(strcat(strcat(strcat(strcat("poolSize : ", str(poolSize)), " largestPoolSize : "),
str(largestPoolSize)), " queueSize : "), str(queueSize)));
}
@OnMethod(clazz = "java.util.concurrent.ThreadPoolExecutor", method = "reject", location = @Location(value = Kind.ENTRY))
public static void rejectMonitor(@Self Object self) {
String name = str(self);
if (BTraceUtils.startsWith(name, "com.alibaba.pivot.common.asyncload.impl.pool.AsyncLoadThreadPool")) {
BTraceUtils.incrementAndGet(rejecctCount);
}
}
@OnTimer(1000)
public static void rejectPrintln() {
int reject = BTraceUtils.getAndSet(rejecctCount, 0);
println(strcat("reject count in 1000 msec: ", str(reject)));
AggregationKey key = newAggregationKey("rejectCount");
addToAggregation(histogram, key, reject);
addToAggregation(average, key, reject);
addToAggregation(max, key, reject);
addToAggregation(min, key, reject);
addToAggregation(sum, key, reject);
addToAggregation(count, key, reject);
}
@OnEvent
public static void onEvent() {
BTraceUtils.truncateAggregation(histogram, 10);
println("---------------------------------------------");
printAggregation("Count", count);
printAggregation("Min", min);
printAggregation("Max", max);
printAggregation("Average", average);
printAggregation("Sum", sum);
printAggregation("Histogram", histogram);
println("---------------------------------------------");
}
}
poolSize : 1 , largestPoolSize = 10 , queueSize = 10
reject count in 1000 msec: 0
最后
相关推荐
ThreadPoolExecutor是Java并发编程中非常重要的一个组件,它位于`java.util.concurrent`包下,用于管理线程资源,实现线程池服务。...理解和熟练使用ThreadPoolExecutor对于编写高性能的多线程Java程序至关重要。
理解其源码有助于我们更好地控制并发环境下的任务执行,提高系统的效率和稳定性。 1. **线程池状态** 线程池有五种状态: - **RUNNING**:表示线程池处于运行状态,可以接受新任务,同时处理队列中的任务。 - **...
Java ThreadPoolExecutor参数深入理解 Java ThreadPoolExecutor是Java并发编程中一个非常重要的组件,它提供了一种灵活的方式来管理线程池。ThreadPoolExecutor的参数深入理解是Java开发人员需要掌握的重要知识点,...
通过对构造函数参数的理解,我们可以精确地控制线程池的行为,从而更好地适应不同的应用需求。此外,通过了解 `newFixedThreadPool` 和 `newCachedThreadPool` 等预定义线程池的特点,我们可以更高效地利用系统资源...
内容概要:本篇文章深入探讨了Java中多线程管理的一个关键...阅读建议:文中不仅有理论讲解还有实操案例,因此推荐读者一边阅读相关章节,一边亲手实验每一步的操作步骤,这样才能深刻理解线程池的设计思路和使用技巧。
所以它的重要性不言而喻,但是它的复杂性也大,理解上可能会有问题,不过作为安卓工程师,了解这个也是必然的。 ThreadPoolExecutor有几个构造函数,最多参数的构造函数最常用,下面会详细介绍各个参数的含义及其几...
本文将深入解析ThreadPoolExecutor的execute()方法执行流程,以帮助我们理解线程池的工作原理。 当一个任务被提交到线程池,线程池的执行策略主要分为四步: 1. 首先,线程池会检查当前的核心线程数是否已达到设定...
本文将围绕 `ThreadPoolExecutor` 的核心方法 `execute()` 进行深入解析,帮助读者更好地理解其内部机制。 #### 二、构造方法 `ThreadPoolExecutor` 提供了一个构造函数,用于初始化线程池: ```java public ...
1. RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务 2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保
在IT行业中,线程池是多线程编程中一个重要的概念,它可以帮助我们高效地管理和控制并发执行的任务。...通过深入理解线程池的工作机制和源码,我们可以更好地设计和优化我们的并发程序,实现高效的数据抓取。
Java线程池是Java并发编程中...总结来说,理解并正确使用Java线程池和ThreadPoolExecutor对于优化Java应用程序的并发性能至关重要。通过调整线程池的参数,可以平衡资源利用率和系统响应时间,从而提高整体的系统效率。
通过分析如ThreadPoolExecutor等关键类的源码,读者可以理解线程池的创建和执行机制,以及线程的管理方式。 **线程与线程池**部分介绍了操作系统中的线程概念,指出线程是CPU调度的基本单位,而多线程则能充分利用...
"java 中ThreadPoolExecutor 原理分析" ThreadPoolExecutor 是 Java 并发编程中的一种高级线程池实现,它提供了一个灵活的线程池管理机制,允许开发者根据需要配置线程池的参数以满足不同的需求。在这篇文章中,...
5. **线程池原理**:ThreadPoolExecutor的构造参数,如corePoolSize、maximumPoolSize、keepAliveTime以及workQueue的作用。理解线程池的拒绝策略,如AbortPolicy、CallerRunsPolicy、DiscardPolicy和...
Java线程池是一种高效管理线程资源的工具,它的核心组件是`ThreadPoolExecutor`类,它在Java的`java.util.concurrent`包中。...理解其构造参数和工作原理,能够帮助开发者根据实际需求定制适合的线程池,优化系统性能。
这就是"java线程理解小程序"的初衷,帮助开发者学习如何在Android环境中正确地管理和使用线程。 首先,我们需要理解Java中的线程创建方式。最基础的是通过实现`Runnable`接口或继承`Thread`类来创建线程。实现`...
在实际开发中,还有其他工具和库可以帮助我们更高效地管理线程,如AsyncTask、IntentService、Runnable、ThreadPoolExecutor等。每个都有其特定的适用场景和优缺点,开发者需要根据项目需求选择合适的方法。 总之,...
- **线程池**:ExecutorService、ThreadPoolExecutor和ScheduledThreadPoolExecutor的使用及配置。 - **锁机制**:synchronized、Lock(ReentrantLock、ReadWriteLock),以及乐观锁和悲观锁的概念。 5. **设计...
ThreadPoolExecutor类是Java中最重要的线程池实现类,了解该类是理解Java线程池的关键。 ThreadPoolExecutor类的构造方法中有四个参数:corePoolSize、maximumPoolSize、keepAliveTime和unit。其中,corePoolSize...