- 浏览: 777503 次
- 性别:
- 来自: 杭州
文章分类
最新评论
-
Fanatic357:
同问,请问这个 曲线 是用什么工具 监测得到的?
RocketMQ性能压测分析 -
sunshine_love:
8核 16G, 单master TPS 4w+,2m-2s- ...
RocketMQ性能压测分析 -
assertmyself:
很好,,获益良多!
jstack和线程dump分析 -
zhaoxiaoxiao:
非常赞,帮助理解了问题。今天也是遇到了这样的问题
hessian序列化bug -
wjg_java:
打不开 宕机了
博客停止更新
一:ThreadPoolExecutor
从 Java 5 开始,Java 提供了自己的线程池。线程池就是一个线程的容器,每次只执行额定数量的线程java.util.concurrent.ThreadPoolExecutor 就是这样的线程池。它很灵活,但使用起来也比较复杂。 首先是构造函数。以最简单的构造函数为例:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
看起来挺复杂的。这里介绍一下。
corePoolSize 指的是保留的线程池大小。
maximumPoolSize 指的是线程池的最大大小。
keepAliveTime 指的是空闲线程结束的超时时间。
unit 是一个枚举,表示 keepAliveTime 的单位。
workQueue 表示存放任务的队列。
可以从线程池的工作过程中了解这些参数的意义。线程池的工作过程如下:
1 、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2 、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
3 、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4 、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。下面是一个线程池使用的例子:
public static void main(String[] args) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.DAYS, queue);
for (int i = 0; i < 20; i++) {
executor.execute(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(String.format("thread %d finished", this.hashCode()));
}
});
}
executor.shutdown();
}
对这个例子的说明如下:
1、BlockingQueue 只是一个接口,常用的实现类有 LinkedBlockingQueue 和 ArrayBlockingQueue。用 LinkedBlockingQueue 的好处在于没有大小限制。这样的话,因为队列不会满,所以 execute() 不会抛出异常,而线程池中运行的线程数也永远不会超过 corePoolSize 个,keepAliveTime 参数也就没有意义了。
2、shutdown() 方法不会阻塞。调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。
到这里对于这个线程池还只是介绍了一小部分。ThreadPoolExecutor 具有很强的可扩展性,不过扩展它的前提是要熟悉它的工作方式。 java.util.concurrent.ThreadPoolExecutor 类提供了丰富的可扩展性。你可以通过创建它的子类来自定义它的行为。例如,我希望当每个任务结束之后打印一条消息,但我又无法修改任务对象,那么我可以这样写: 除了 afterExecute 方法之外,ThreadPoolExecutor 类还有 beforeExecute() 和 terminated() 方法可以重写,分别是在任务执行之前和整个线程池停止之后执行。
除了可以添加任务执行前后的动作之外, ThreadPoolExecutor 还允许你自定义当添加任务失败后的执行策略。你可以调用线程池的 setRejectedExecutionHandler() 方法,用自定义的 RejectedExecutionHandler 对象替换现有的策略。 ThreadPoolExecutor 提供 4 个现有的策略,分别是:
ThreadPoolExecutor.AbortPolicy:表示拒绝任务并抛出异常
ThreadPoolExecutor.DiscardPolicy:表示拒绝任务但不做任何动作
ThreadPoolExecutor.CallerRunsPolicy:表示拒绝任务,并在调用者的线程中直接执行该任务
ThreadPoolExecutor.DiscardOldestPolicy:表示先丢弃任务队列中的第一个任务,然后把这个任务加进队列。
这里是一个例子:
ThreadPoolExecutor executor = new ThreadPoolExecutor(size, maxSize, 1, TimeUnit.DAYS, queue);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
除此之外,你也可以通过实现 RejectedExecutionHandler 接口来编写自己的策略。下面是一个例子:
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 6, 1, TimeUnit.SECONDS, queue,
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(String.format("Task %d rejected.", r.hashCode()));
}
}
);
二:ThreadPoolExecutor 实现的接口 Executer
看jdk的javadoc,ThreadPoolExecutor实现了两个接口:Executer,ExecuterService,注意, ExecuterService继承了 Executer, 看看javadoc对 Executer的描述 :
public interface Executor
执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用
new Thread(new(RunnableTask())).start():
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}
更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。
三:ThreadPoolExecutor 实现的接口 ExecutorService
public interface ExecutorService extends Executor
ExecutorService
提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。 可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。
通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)。 Executors 类提供了用于此包中所提供的执行程序服务的工厂方法。
用法示例
下面给出了一个网络服务的简单结构,这里线程池中的线程作为传入的请求。它使用了预先配置的 Executors.newFixedThreadPool(int) 工厂方法:
class NetworkService implements Runnable {
private final ServerSocket serverSocket;
private final ExecutorService pool;
public NetworkService(int port, int poolSize)
throws IOException {
serverSocket = new ServerSocket(port);
pool = Executors.newFixedThreadPool(poolSize);
}
public void run() { // run the service
try {
for (;;) {
pool.execute(new Handler(serverSocket.accept()));
}
} catch (IOException ex) {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final Socket socket;
Handler(Socket socket) { this.socket = socket; }
public void run() {
// read and service request on socket
}
}
下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}
内存一致性效果:线程中向 ExecutorService 提交 Runnable 或 Callable 任务之前的操作 happen-before 由该任务所提取的所有操作,后者依次 happen-before 通过 Future.get() 获取的结果。
四:异步任务 Future接口
一般情况下,调用线程池的execute方法执行的任务都是不需要返回结果的,但是在某些情况下,我们可能需要执行异步的任务,就是提交一个任务,然后我做自己的事情,不管它,一会儿我再查看这个任务返回的结果,那么这个时候,就要用到异步任务,异步任务都实现了Future接口。
它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
用法示例(注意,下列各类都是构造好的。)
interface ArchiveSearcher { String search(String target); }
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target)
throws InterruptedException {
Future<String> future
= executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) { cleanup(); return; }
}
}
FutureTask 类是 Future 的一个实现,并且FutureTask 实现 了Runnable接口,所以可通过 Executor 来执行。例如,可用下列内容替换上面带有 submit 的构造:
FutureTask<String> future =
new FutureTask<String>(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
executor.execute(future);
String result=future.get();
System.out.println("result is:"+result);
发表评论
-
dubbo问题总结
2012-03-14 10:00 2991任何诡异的现象必然能找到问题原因,程序是不会骗人的 ... -
memcached客户端源码分析
2011-09-08 17:28 19967memcached的java客户端有好 ... -
说说单例模式
2011-05-23 11:12 3362单例模式?多么简单!也许吧,可是要通过简单的现象, ... -
jstack和线程dump分析
2011-05-12 13:48 180202一:jstack jstack命令的语法格式: js ... -
说说new Integer和Integer.valueOf
2010-11-11 15:04 6608看看这两个语句 Integer a=new Integ ... -
线程安全总结(二)
2010-11-11 12:36 5622关于线程安全总结(-)请看 http://www.iteye ... -
java线程安全总结
2010-11-09 20:48 15649最近想将java基 ... -
hadoop架构
2010-09-07 19:41 2694该文章我转自IBM开发者社区 ... -
HashMap深入分析
2010-09-03 19:36 5840java.util.HashMap是很常见的 ... -
CountDownLatch
2010-09-02 20:03 2973java的并发包真 ... -
随便说说
2010-09-01 19:29 2105这两天给系统 ... -
一波三折的rmi调用
2010-08-18 18:02 9864很久以前写了基于rmi的分布式java程序,现 ... -
java内存查看与分析
2010-08-07 17:03 22507业界有很多强 ... -
java动态代理之cglib
2010-06-22 17:27 2811cglib是一个 ... -
java动态代理随笔二
2010-06-22 16:29 1885jdk的动态代 ... -
java动态代理随笔一
2010-06-22 14:49 2086先说一下java class的加载机制和与cla ... -
关于hashcode和equals
2010-04-19 14:58 3396前几天有个同事问我,String a=" ... -
建设银行对接(五)
2010-02-09 17:34 2568public static void testVerify ... -
建设银行对接(四)
2010-02-09 17:32 3103上接“建设银行对接(三)”,javaeye的文章字数限制也太少 ... -
建设银行对接(三)
2010-02-09 17:24 3487前面两章请见我的博客 对建行返回的数据进行数字签名 ...
相关推荐
对于线程池的核心类ThreadPoolExecutor来说,有哪些重要的属性和内部类为线程池的正确运行提供重要的保障呢? ThreadPoolExecutor类中的重要属性 在ThreadPoolExecutor类中,存在几个非常重要的属性和方法,接下来,...
线程池通过ThreadPoolExecutor类实现,这是一个高度可配置的工具,能够根据具体需求定制线程的创建、管理和销毁策略。 ThreadPoolExecutor的核心参数包括: 1. corePoolSize:核心线程数,这是线程池在非繁忙状态下...
分析这个测试文件可以帮助我们更好地理解和应用`BlockingQueue`。实际操作中,我们可以通过监控和调优这些代码来优化并发性能,比如调整队列大小,选择适合的实现类,或者优化生产者和消费者的处理速度。
本篇将详细探讨多线程相关的知识点,结合给定的标签"源码"和"工具",我们将讨论多线程的基本原理、在实际开发中的应用以及相关工具的使用。 一、多线程基本概念 1. 线程:线程是操作系统调度的基本单位,每个线程都...
通过分析如ThreadPoolExecutor等关键类的源码,读者可以理解线程池的创建和执行机制,以及线程的管理方式。 **线程与线程池**部分介绍了操作系统中的线程概念,指出线程是CPU调度的基本单位,而多线程则能充分利用...
Java中的ExecutorService和ThreadPoolExecutor可以创建线程池,通过合理调度任务,避免过多线程导致资源浪费和系统不稳定。使用线程池可以控制并发数量,优化内存管理,提高性能。 3. **日志(Log)**: 日志是...
synchronized 关键字、ReentrantLock 等)、并发容器(如 ConcurrentHashMap、ConcurrentLinkedQueue 等)、原子变量(AtomicInteger、AtomicReference 等)、线程池(ThreadPoolExecutor 和其相关类)以及线程间的...
首先,Worker类是ThreadPoolExecutor的一个内部类,它继承了AbstractQueuedSynchronizer(AQS),这是一个抽象的同步队列,用于实现锁和其他同步组件的基础框架。同时,Worker也实现了Runnable接口,这意味着它是一...
AQS相关应用(CountDownLatch、CyclicBarrier、Semaphore等),executor(ThreadPoolExecutor、ScheduledThreadPoolExecutor、FutureTask等),collection(ConcurrentHashMap、CopyOnWriteArrayList等), 对每个类的...
3. GUI界面编程:由于文件中出现了`JPanel`、`Timer`、`ActionListener`等GUI相关类的引用,这说明编程作业中包括了图形用户界面的设计与实现。用户界面可能是聊天室的可视部分,例如显示消息的窗口和聊天输入区域。...
在Java中,`java.util.concurrent.ExecutorService`和`java.util.concurrent.ThreadPoolExecutor`是实现线程池的主要类。开发者可以自定义线程池的大小、任务队列、拒绝策略等参数,以适应不同的性能需求。 地图...
本文将深入探讨JDK性能相关的知识,主要涉及源码分析和工具使用。 首先,了解JDK的源码对于性能优化至关重要。通过阅读和理解JDK源码,我们可以得知Java内部的运行机制,从而找到性能瓶颈并进行针对性优化。例如,...
- 这个包包含了许多并发编程相关的工具类,如`ExecutorService`, `Semaphore`, `CountDownLatch`, `CyclicBarrier`等,以及线程池`ThreadPoolExecutor`。 7. `java.util.Random`: - 提供随机数生成的功能,广泛...
对于I/O操作,文档讨论了java.io包中的基本I/O类,比如用于读写数据的InputStream和OutputStream类、Reader和Writer类,以及序列化和反序列化的相关类。 文档还介绍了java.math包下的BigInteger和BigDecimal类,这...
18. **java.applet**:Applet小程序相关类,不过现在由于浏览器安全策略,Applet已逐渐被淘汰。 19. **java.rmi**:远程方法调用(Remote Method Invocation),用于分布式计算。 20. **java.awt.image**:图像...
12. **ExecutorService 和 ThreadPoolExecutor**: Java并发编程的重要工具,用于管理线程池,提高系统资源利用率。 13. **Optional**: 自Java 8引入,用于表示可能为null的对象,有助于避免空指针异常。 14. **...
`ExecutorService`是线程池的主要接口,而`ThreadPoolExecutor`是其主要的实现类,提供了创建、管理和关闭线程池的方法。 ### 2. ThreadPoolExecutor构造参数 `ThreadPoolExecutor`的构造函数接收五个参数: - `...
3. **线程池相关类**:除了ThreadPoolExecutor,还有ScheduledThreadPoolExecutor,用于定时或周期性执行任务。WorkStealingPool是ForkJoinPool的一种形式,适合大规模并行计算。 4. **并发集合**:JUC提供了线程...
在Java中,线程池主要由`java.util.concurrent`包下的`ThreadPoolExecutor`类实现。 `ThreadPoolExecutor`的构造方法接受五个参数,用于定义线程池的行为: 1. `corePoolSize`: 初始化时创建的线程数。 2. `...
在深入理解Java反射的同时,了解并发处理和线程池的内部机制也是非常有益的,因为这两个主题都与程序的性能和可扩展性密切相关。 总的来说,Java反射提供了强大的能力,让我们能够在运行时动态地操控代码,但同时也...