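I have used thread pools several times, and each time the workload turned out to be too large: tasks the pool could not really absorb were forced in anyway. One could of course set a queue size and handle the overflow with a RejectedExecutionHandler, but I did not want to go that far. What I wanted was a blocking thread pool, in which submit blocks. A quick search showed that others already had this need, so I am reposting the article here.
Reposted from: http://today.java.net/pub/a/today/2008/10/23/creating-a-notifying-blocking-thread-pool-executor.html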
Creating a NotifyingBlockingThreadPoolExecutor
A thread pool is a useful tool for performing a collection of tasks in
parallel. This becomes more and more relevant as CPUs introduce multi-core
architectures that can benefit from parallelizing our programs. Java 5
introduced this framework as part of the new concurrency support, with the
ThreadPoolExecutor class and other assisting classes. The ThreadPoolExecutor
framework is powerful and flexible, allowing user-specific configurations and
providing relevant hooks and saturation strategies to deal with a full queue.
To best follow this article, you may find it useful to open the
ThreadPoolExecutor Java API in a parallel tab.
The Need for a Blocking Thread Pool
Recently, my colleague Yaneeve Shekel had the need for a thread pool that
would work on several tasks in parallel but would wait to add new tasks until a
free thread was there to handle them. This is really not something bizarre: in
fact, this need is quite common. Yaneeve needed it to analyze a huge directory
with a very long list of files, where there was no point in piling on more and
more FileAnalyzeTask instances without a free thread to handle them. The
analyze operation takes some time, while the speed at which we can queue files
for analysis is much higher. Thus, not controlling for thread availability
would create a huge queue with a possible memory problem, and for no benefit.
Other cases in which you'd need a thread pool that can wait to add new tasks:
- Doing some in-memory task on a long list of database records. You would not want to turn each record into a task in the ThreadPoolExecutor queue while the threads are busy with some long operation on previous records, as doing this would exhaust your memory. The right way to do it is to query the database, run over the result set and create enough tasks for a fixed-size queue, and then wait until there is room in the queue. You can use a cursor to represent the result set, but even if you get back a dynamic result set, the database will not reply with the entire bulk of records; it sends a limited number of records and refills your result set object as you iterate forward through it. When the queue is ready for more tasks, you read the next records from the database.
- Analyzing a long file with "independent lines": each line can be analyzed separately by a different thread. Again, there is no sense in reading the entire file into LineTask objects if there is no available thread to handle them. This scenario is in fact a true need raised in a forum asking for a recommended solution.
The problem is that ThreadPoolExecutor doesn't give you the required
behavior -- blocking when the queue is full -- out of the box. A feature
request was even submitted to the Java Bug database (Bug Id 6648211, "Need for
blocking ThreadPoolExecutor"), but it was put on "very low priority," as the
user is supposedly able to quite easily implement this behavior.
At first glance this looks odd; you would think that a ThreadPoolExecutor
with a bounded BlockingQueue gives you exactly this behavior. But it does not.
In fact, by default it throws RejectedExecutionException if a task is submitted
while the queue is full (and the pool cannot start another thread). This
happens because ThreadPoolExecutor.execute(Runnable) does not call the blocking
method BlockingQueue.put(...) when queuing a task, but rather the non-blocking
Queue.offer(...), which means "try but do not wait." If the result is false
(the offer failed), it calls the saturation policy -- the
RejectedExecutionHandler assigned to this thread pool -- and the default
handler (AbortPolicy) throws an exception. Though it may seem that there is no
real logic in this, it is in fact a design decision, allowing the user to react
to the fact that a task is rejected rather than just deciding in the framework
to wait or block.
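The rejection described above is easy to reproduce. The following is a minimal sketch, not part of the original article: the class name RejectionDemo and its method are made up for illustration. It submits tasks that block on a latch, so the workers stay busy and the bounded queue fills up until execute() throws instead of waiting.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    // Submits blocked tasks until the pool rejects one;
    // returns how many tasks were accepted before the rejection.
    static int acceptedBeforeRejection(int poolSize, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity)); // default handler: AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy so the queue fills up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException expected) {
            // execute() did not block: offer() failed and the default policy threw
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted before rejection: " + acceptedBeforeRejection(2, 3));
    }
}
```

With 2 threads and a queue of 3, five tasks are accepted (two running, three queued) and the sixth is rejected immediately; the producer is never made to wait.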
Suggested Solutions
There are several ways to allow blocking on a full queue:
- We may implement our own BlockingThreadPoolExecutor and override the execute(...) method, so that it calls BlockingQueue.put(...) instead of BlockingQueue.offer(...). But this may not be so elegant, as we interfere quite brutally in how execute() works (and we cannot call super.execute(...) since we do the queuing ourselves).
- There is the option to create a ThreadPoolExecutor with the CallerRunsPolicy rejection strategy. This strategy, in the case of a full queue, sends the exceeding task to be executed by the thread that called execute() (the producer), thus killing two birds with one stone: the task is handled, and the producer is busy handling the task rather than overloading the queue with additional tasks. There are, however, two flaws in this strategy. First, the task is not handled in the order it was produced; this is usually not so problematic, as there is no real guarantee on the order of context switches between the worker threads, which influences task progress and order anyway. Second, while the producer is working on its task, no one fills the queue. So if one or more of the worker threads finish their tasks while the producer is still working, they become idle. It takes fine tuning of the queue size to minimize this, and you can never guarantee to avoid the situation. It would have been nice if there were a way to set the ThreadPoolExecutor in a true Leader-Followers manner (a design pattern in which the producer gets to run the task while a thread from the pool becomes the new producer), but the CallerRunsPolicy strategy does not work like that. (The C++ ACE framework, for example, implemented the Leader-Followers pattern. For more details on the pattern, see the presentation listed in the Resources section.)
- One can implement a simple "counting" ThreadPoolExecutor that uses a Semaphore initialized to the bound that we want to set. It is decremented by calling acquire() in execute(...), and incremented back by calling release() in the afterExecute() hook method, as well as in a catch at the end of execute(...) for the reject scenario. The semaphore acts in this way as a block on the call to execute(...), and you can in fact use an unbounded BlockingQueue in this case. This solution was suggested by Brian Goetz in a forum reply, and is also discussed in his book Java Concurrency in Practice, by Goetz et al., in listing 8.4. Here is how it will look:

    public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
        private final Semaphore semaphore;

        public BlockingThreadPoolExecutor(..., int bound, ...) {
            super(...);
            this.semaphore = new Semaphore(bound);
        }

        @Override
        public void execute(Runnable task) {
            boolean acquired = false;
            do {
                try {
                    semaphore.acquire();
                    acquired = true;
                } catch (InterruptedException e) {
                    // wait forever!
                }
            } while (!acquired);
            try {
                super.execute(task);
            } catch (RuntimeException e) {
                // specifically, handle RejectedExecutionException
                semaphore.release();
                throw e;
            } catch (Error e) {
                semaphore.release();
                throw e;
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            semaphore.release();
        }
    }

  This is a nice solution. A nice adaptation may be to use tryAcquire(timeout, unit), as it is always a better practice to allow a timeout on blocking operations. But anyway, I personally don't like self-managing the blocking operation when the ThreadPoolExecutor may have its own bounded queue. It doesn't make sense to me. I prefer the following solution, which uses the blocking of the bounded queue together with the saturation policy.
- The fourth solution is to create a ThreadPoolExecutor with a bounded queue and our own RejectedExecutionHandler that blocks on the queue, waiting for it to be ready to take new tasks. We prefer to wait on the queue with a timeout and to notify the user if the timeout occurs, so that we will not wait forever in case of some problem in pulling the tasks from the queue. However, for most reasonable scenarios, the caller will not have to take any action when the queue is full, as the producer thread will just wait on the queue. I prefer this approach, as it seems the simplest one and uses the original design of ThreadPoolExecutor.
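The behavior of the second option (CallerRunsPolicy) can be sketched as follows. This is an illustrative example, not code from the original article; the class CallerRunsDemo and its method are hypothetical names. One slow task occupies the single worker, a second task fills the one-slot queue, and the third task is rejected and therefore runs on the producer thread.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {

    // Returns, for task ids 0..2, whether the task ran on the producer thread.
    static Map<Integer, Boolean> whereTasksRan() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        Thread producer = Thread.currentThread();
        CountDownLatch release = new CountDownLatch(1);
        Map<Integer, Boolean> onProducer = new ConcurrentHashMap<>();

        // Task 0 occupies the single worker until the latch is released.
        pool.execute(() -> {
            onProducer.put(0, Thread.currentThread() == producer);
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Task 1 fills the queue; task 2 is rejected and runs in the caller.
        for (int id = 1; id <= 2; id++) {
            final int taskId = id;
            pool.execute(() -> onProducer.put(taskId, Thread.currentThread() == producer));
        }

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return onProducer;
    }

    public static void main(String[] args) {
        System.out.println(whereTasksRan());
    }
}
```

Only task 2 runs on the producer. While the producer is busy with it, nothing else is submitted, which is the idle-worker flaw described in the list above. The fourth option avoids this by making the producer wait on the queue itself.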
The fourth approach brings us to this code (see the Resources section to download it):
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutor
        extends ThreadPoolExecutor {

    public BlockingThreadPoolExecutor(
            int poolSize,
            int queueSize,
            long keepAliveTime,
            TimeUnit keepAliveTimeUnit,
            long maxBlockingTime,
            TimeUnit maxBlockingTimeUnit,
            Callable<Boolean> blockingTimeCallback) {
        super(
            poolSize, // Core size
            poolSize, // Max size
            keepAliveTime,
            keepAliveTimeUnit,
            new ArrayBlockingQueue<Runnable>(
                // to avoid redundant threads
                Math.max(poolSize, queueSize)
            ),
            // our own RejectedExecutionHandler - see below
            new BlockThenRunPolicy(
                maxBlockingTime,
                maxBlockingTimeUnit,
                blockingTimeCallback
            )
        );
        // let idle core threads time out as well
        super.allowCoreThreadTimeOut(true);
    }

    @Override
    public void setRejectedExecutionHandler(RejectedExecutionHandler h) {
        // the blocking policy is what defines this class, so it cannot be replaced
        throw new UnsupportedOperationException(
            "setRejectedExecutionHandler is not allowed on this class.");
    }

    // ...
}
This is our new blocking thread pool. But as you may see, the real thing is
still missing, and that is our own new RejectedExecutionHandler. In the
constructor we pass parameters to our super, ThreadPoolExecutor. We use the
full version of the constructor, since the most important parameter that we
wish to pass to our base class is the RejectedExecutionHandler, which is the
last parameter. We create a new object of the type BlockThenRunPolicy, our own
class (presented in a moment). The name of this saturation policy means exactly
what it does: if a task is rejected due to saturation, block on the task
submission in the producer thread context, and when there is enough capacity to
take the task, accept it. We implement the BlockThenRunPolicy class as a
private inner class inside our BlockingThreadPoolExecutor, as no one else
should know it.
// --------------------------------------------------
// Inner private class of BlockingThreadPoolExecutor
// A reject policy that waits on the queue
// (also needs imports of java.util.concurrent.BlockingQueue
// and java.util.concurrent.RejectedExecutionException)
// --------------------------------------------------
private static class BlockThenRunPolicy
        implements RejectedExecutionHandler {

    private long blockTimeout;
    private TimeUnit blockTimeoutUnit;
    private Callable<Boolean> blockTimeoutCallback;

    // Straight-forward constructor
    public BlockThenRunPolicy(...){...}

    // --------------------------------------------------
    @Override
    public void rejectedExecution(
            Runnable task,
            ThreadPoolExecutor executor) {

        BlockingQueue<Runnable> queue = executor.getQueue();
        boolean taskSent = false;

        while (!taskSent) {

            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "ThreadPoolExecutor has shutdown while attempting to offer a new task.");
            }

            try {
                // offer the task to the queue, for a blocking-timeout
                if (queue.offer(task, blockTimeout, blockTimeoutUnit)) {
                    taskSent = true;
                }
                else {
                    // task was not accepted - call the user's Callback
                    Boolean result = null;
                    try {
                        result = blockTimeoutCallback.call();
                    }
                    catch (Exception e) {
                        // wrap the Callback exception and re-throw
                        throw new RejectedExecutionException(e);
                    }
                    // check the Callback result
                    // (a null result is treated like false, avoiding a NullPointerException)
                    if (!Boolean.TRUE.equals(result)) {
                        throw new RejectedExecutionException(
                            "User decided to stop waiting for task insertion");
                    }
                    else {
                        // user decided to keep waiting (may log it)
                        continue;
                    }
                }
            }
            catch (InterruptedException e) {
                // we need to go back to the offer call...
            }

        } // end of while for InterruptedException

    } // end of method rejectedExecution

    // --------------------------------------------------

} // end of inner private class BlockThenRunPolicy
Note that we may get a timeout when waiting on the queue, on the call to
queue.offer(...). It is always the right practice to use a timeout-enabled
version of a blocking call, rather than any "wait-forever" version. This way it
is easier to be aware of and troubleshoot cases of thread starvation and
deadlocks. In this case, we do not log the event of getting the timeout, as we
do not have a logger at hand. But still, this is a major event, especially if
we set a long timeout that we do not expect to happen. This is why we ask the
user to provide a callback, so we can report the event and let the user decide
whether to just log and keep waiting or stop the wait.
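To see the block-with-timeout idea and the callback working end to end, here is a compact, self-contained sketch. It is not the article's downloadable code: the names BlockingSubmitDemo, blockWithTimeout and keepWaiting are made up for illustration, and unlike the listing above it gives up on interruption instead of retrying. The demo uses a deliberately short timeout so that the callback has a chance to fire.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BlockingSubmitDemo {

    // Hypothetical compact handler: waits for queue space; after every timeout
    // the callback decides whether to keep waiting (true) or give up (false).
    static RejectedExecutionHandler blockWithTimeout(
            long timeout, TimeUnit unit, Callable<Boolean> keepWaiting) {
        return (task, executor) -> {
            try {
                while (!executor.getQueue().offer(task, timeout, unit)) {
                    if (executor.isShutdown() || !Boolean.TRUE.equals(keepWaiting.call())) {
                        throw new RejectedExecutionException("gave up waiting for queue space");
                    }
                }
            } catch (RejectedExecutionException e) {
                throw e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // assumption: do not retry on interrupt
                throw new RejectedExecutionException(e);
            } catch (Exception e) {
                throw new RejectedExecutionException(e); // the callback itself failed
            }
        };
    }

    // Runs `tasks` slow tasks through a 2-thread pool with a queue of 2.
    // Returns {tasks completed, timeout callbacks fired}.
    static int[] run(int tasks) {
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger timeouts = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                blockWithTimeout(10, TimeUnit.MILLISECONDS, () -> {
                    timeouts.incrementAndGet(); // a real callback might log here
                    return true;                // keep waiting
                }));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {            // the producer waits here while the queue is full
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow(); // no-op if the pool already terminated
        }
        return new int[] {completed.get(), timeouts.get()};
    }

    public static void main(String[] args) {
        int[] result = run(10);
        System.out.println("completed=" + result[0] + ", timeouts=" + result[1]);
    }
}
```

No task is lost and none is rejected: the producer simply slows down to the pace of the workers. How often the callback fires depends on timing, so the sketch reports that count without relying on it.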
Our solution preserves the default behavior of ThreadPoolExecutor, except
for the saturation policy. Since we use inheritance, any setter or getter of
the original ThreadPoolExecutor can be used, excluding
setRejectedExecutionHandler, which we forbid by throwing an exception if it is
called. Prometheus, another open source approach to the blocking thread pool
problem, used a wrapper solution as a straightforward approach (its API is
listed in the Resources section). However, the wrapper solution requires
implementing all ExecutorService interface methods -- in order to be a common
ExecutorService -- resulting in a quite cumbersome solution compared to our
more organic extension.
Almost Done
We have a BlockingThreadPoolExecutor. But bear with me for a few more
moments, as we are about to ask for more.
Remember our problem. We have a huge directory filled with files and we wanted to block on the queue if it is full. But we need something more. When all files are sent to the queue, the producer thread knows it is done sending all the files, but it still needs to wait for the worker threads to finish. And we do not want to shut down the thread pool and wait for it to finish that way, as we are going to use it in a few moments again. What we need is a way to wait for the final tasks sent to the thread pool to complete.
To do that we add a "synchronizer" object for the producer to wait on. The
producer waits on a new method we create, called await(); underneath, there is
a condition that waits for a signal, and this is our Synchronizer. The thread
pool signals the Synchronizer when it is idle; that is, all worker threads are
idle. To have this info we simply count the tasks currently in process (queued
or running). We do not rely on the getActiveCount() method, as its contract and
definition are not clear enough; we prefer to simply do it ourselves, using an
AtomicInteger to make sure that increment and decrement operations are done
atomically, without a need to synchronize around ++ or --.
Here we use the beforeExecute() and afterExecute() hook methods, but must take
care of tasks that failed at the execute point, before assuming position in the
queue, in which case the counter must be decreased. Our Synchronizer class
manages the blocking wait in the await() method, by waiting on a Condition that
is signaled only when no tasks remain in process.
The resulting code is this:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

public class NotifyingThreadPoolExecutor
        extends ThreadPoolExecutor {

    private final AtomicInteger tasksInProcess = new AtomicInteger();

    // using our own private inner class, see below
    private final Synchronizer synchronizer = new Synchronizer();

    // (constructors are not shown here)

    @Override
    public void execute(Runnable task) {
        // count a new task in process
        tasksInProcess.incrementAndGet();
        try {
            super.execute(task);
        } catch (RuntimeException e) {
            // specifically, handle RejectedExecutionException
            tasksInProcess.decrementAndGet();
            throw e;
        } catch (Error e) {
            tasksInProcess.decrementAndGet();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // synchronizing on the pool (and all its threads)
        // we need the synchronization to avoid more than one signal
        // if two or more threads decrement almost together and come
        // to the if with 0 tasks together
        synchronized (this) {
            if (tasksInProcess.decrementAndGet() == 0) {
                synchronizer.signalAll();
            }
        }
    }

    public void await() throws InterruptedException {
        synchronizer.await();
    }

    // (there is also an await with timeout, see the full source code)
}
We now need to provide the Synchronizer class that does the actual locking
and synchronization work. We prefer to implement the Synchronizer class as a
private inner class inside our NotifyingThreadPoolExecutor, as no one else
should know it.
//--------------------------------------------------------------
// Inner private class of NotifyingThreadPoolExecutor
// for signaling when queue is idle
// (needs imports of Condition, Lock and ReentrantLock
// from java.util.concurrent.locks)
//--------------------------------------------------------------
private class Synchronizer {

    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private boolean isDone = false;

    // called from the containing class NotifyingThreadPoolExecutor
    private void signalAll() {
        lock.lock(); // MUST lock!
        try {
            isDone = true;
            done.signalAll();
        }
        finally {
            lock.unlock(); // unlock even in case of an exception
        }
    }

    public void await() throws InterruptedException {
        lock.lock(); // MUST lock!
        try {
            while (!isDone) { // guard against 'spurious' wake-ups
                done.await();
            }
        }
        finally {
            isDone = false; // for next call to await
            lock.unlock(); // unlock even in case of an exception
        }
    }

    // (there is also an await with timeout, see the full source code)

} // end of private inner class Synchronizer
//--------------------------------------------------------------
As we needed both the notifying and the blocking features together, we
combined them into a NotifyingBlockingThreadPoolExecutor, whose code and an
example of use can be found in the example source code.
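The "wait until idle, then reuse the pool" pattern can be tried out with the following self-contained sketch. It is a simplified variant, not the article's NotifyingBlockingThreadPoolExecutor: the class IdleAwarePool and its methods are hypothetical names, and it uses the task counter itself as the condition predicate (guarded by a lock) instead of a separate isDone flag, so that waiting on an already idle pool returns at once.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class IdleAwarePool extends ThreadPoolExecutor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition idle = lock.newCondition();
    private int tasksInProcess = 0; // queued + running, guarded by lock

    IdleAwarePool(int threads) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    public void execute(Runnable task) {
        lock.lock();
        try {
            tasksInProcess++;
        } finally {
            lock.unlock();
        }
        try {
            super.execute(task);
        } catch (RuntimeException | Error e) {
            taskFinished(); // the task was rejected and will never reach afterExecute()
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskFinished();
    }

    private void taskFinished() {
        lock.lock();
        try {
            if (--tasksInProcess == 0) {
                idle.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until no task is queued or running; returns false on timeout.
    boolean awaitIdle(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (tasksInProcess > 0) {
                if (nanos <= 0) {
                    return false;
                }
                nanos = idle.awaitNanos(nanos);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Usage: run two batches on the same pool, waiting for idleness in between.
    static int[] runTwoBatches(int tasksPerBatch) {
        IdleAwarePool pool = new IdleAwarePool(4);
        AtomicInteger done = new AtomicInteger();
        int[] doneAfterBatch = new int[2];
        try {
            for (int batch = 0; batch < 2; batch++) {
                for (int i = 0; i < tasksPerBatch; i++) {
                    pool.execute(() -> done.incrementAndGet());
                }
                pool.awaitIdle(5, TimeUnit.SECONDS);
                doneAfterBatch[batch] = done.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return doneAfterBatch;
    }

    public static void main(String[] args) {
        int[] counts = runTwoBatches(20);
        System.out.println(counts[0] + " tasks after batch 1, " + counts[1] + " after batch 2");
    }
}
```

The producer never shuts the pool down between batches; it only waits for the in-process count to drop to zero, which is the behavior the notifying executor is meant to provide.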
Conclusions
Occasionally there is a need to accomplish something that is not supported out of the box in a framework at hand. The first thought is usually: "it ought to be there, I must have missed something!" Then, after a while, after we search and investigate, we realize that the thing is indeed missing. At this point we are close to convincing ourselves that there is a reason for not having this ability, and we probably don't really need it. ("There must be a reason for not having it there. Who are we to argue?!") But the bravest of us would not compromise. Good frameworks are built to be extended, and so be it.
In this article we presented the need we faced for a blocking thread pool.
This need is not a whim; other people, as can be seen in the Resources list,
have already raised it. It is a bit surprising that the Java API does not
provide this ability, but as seen, there are a couple of good extensions that
can be implemented to support this need.
While implementing this feature, we went through the difference between
offer() and put() on a BlockingQueue, the rejection policy of the thread pool
framework, the beforeExecute() and afterExecute() hook methods, and some other
related java.util.concurrent players, such as Lock, Condition, and
AtomicInteger.
The implementation presented here, together with the sample code provided,
may serve both as a good solution for the blocking thread pool need, as well as
a reference for other related ThreadPoolExecutor extensions.
Acknowledgments
I would like to thank Yaneeve Shekel for bringing this problem to me and working with me on parts of the code presented here.
Resources
- Sample code for this article
- ThreadPoolExecutor: Java 6 API
- Java Bug ID 6648211 on the lack of this required feature
- IBM DeveloperWorks forum entry about this topic
- Java Concurrency in Practice by Brian Goetz et al.
- Prometheus: An open source library with a straightforward approach for solving our need. The guide was empty when I last checked it, but you can download the code or view the API. I personally prefer our more organic extension.
- On the Leader-Followers design pattern: see slides 16-18.
- Another forum entry on the subject
- Yet another post requesting this feature
- And, yet another forum discussion on the exact same subject