ExecutorService
abstraction has been around since Java 5. We are talking about 2004 here. Just a quick reminder: both Java 5 and 6 are no longer supported, Java 7won't be in half a year. The reason I'm bringing this up is that many Java programmers still don't fully understand how ExecutorService
works. There are many places to learn that, today I wanted to share few lesser known features and practices. However this article is still aimed toward intermediate programmers, nothing especially advanced.
1. Name pool threads
I can't emphasize this. When dumping threads of a running JVM or during debugging, default thread pool naming scheme is pool-N-thread-M
, where N
stands for pool sequence number (every time you create a new thread pool, global N
counter is incremented) and M
is a thread sequence number within a pool. For example pool-2-thread-3
means third thread in second pool created in the JVM lifecycle. See:Executors.defaultThreadFactory()
. Not very descriptive. JDK makes it slightly complex to properly name threads because naming strategy is hidden insideThreadFactory
. Luckily Guava has a helper class for that:
1
2
3
4
5
6
7
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat( "Orders-%d" )
.setDaemon( true )
.build();
final ExecutorService executorService = Executors.newFixedThreadPool( 10 , threadFactory);
|
By default thread pools create non-daemon threads, decide whether this suits you or not.
2. Switch names according to context
This is a trick I learnt from Supercharged jstack: How to Debug Your Servers at 100mph. Once we remember about thread names, we can actually change them at runtime whenever we want! It makes sense because thread dumps show classes and method names, not parameters and local variables. By adjusting thread name to keep some essential transaction identifier we can easily track which message/record/query/etc. is slow or caused deadlock. Example:
1
2
3
4
5
6
7
8
9
10
11
12
|
private void process(String messageId) {
executorService.submit(() -> {
final Thread currentThread = Thread.currentThread();
final String oldName = currentThread.getName();
currentThread.setName( "Processing-" + messageId);
try {
//real logic here...
} finally {
currentThread.setName(oldName);
}
});
} |
Inside try
-finally
block current thread is named Processing-WHATEVER-MESSAGE-ID-IS
. This might come in handy when tracking down message flow through the system.
3. Explicit and safe shutdown
Between client threads and thread pool there is a queue of tasks. When your application shuts down, you must take care of two things: what is happening with queued tasks and how already running tasks are behaving (more on that later). Surprisingly many developers are not shutting down thread pool properly or consciously. There are two techniques: either let all queued tasks to execute (shutdown()
) or drop them (shutdownNow()
) - it totally depends on your use case. For example if we submitted a bunch of tasks and want to return as soon as all of them are done, use shutdown()
:
1
2
3
4
5
6
7
8
|
private void sendAllEmails(List<String> emails) throws InterruptedException {
emails.forEach(email ->
executorService.submit(() ->
sendEmail(email)));
executorService.shutdown();
final boolean done = executorService.awaitTermination( 1 , TimeUnit.MINUTES);
log.debug( "All e-mails were sent so far? {}" , done);
} |
In this case we send a bunch of e-mails, each as a separate task in a thread pool. After submitting these tasks we shut down pool so that it no longer accepts any new tasks. Then we wait at most one minute until all these tasks are completed. However if some tasks are still pending, awaitTermination()
will simply return false
. Moreover, pending tasks will continue processing. I know hipsters would go for:
1
|
emails.parallelStream().forEach( this ::sendEmail);
|
Call me old fashioned, but I like to control the number of parallel threads. Never mind, an alternative to graceful shutdown()
is shutdownNow()
:
1
2
|
final List<Runnable> rejected = executorService.shutdownNow();
log.debug( "Rejected tasks: {}" , rejected.size());
|
This time all queued tasks are discarded and returned. Already running jobs are allowed to continue.
4. Handle interruption with care
Lesser known feature of Future
interface is cancelling. Rather than repeating myself, check out my older article: InterruptedException and interrupting threads explained
5. Monitor queue length and keep it bounded
Incorrectly sized thread pools may cause slowness, instability and memory leaks. If you configure too few threads, the queue will build up, consuming a lot of memory. Too many threads on the other hand will slow down the whole system due to excessive context switches - and lead to same symptoms. It's important to look at depth of queue and keep it bounded, so that overloaded thread pool simply rejects new tasks temporarily:
1
2
3
4
|
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>( 100 );
executorService = new ThreadPoolExecutor(n, n,
0L, TimeUnit.MILLISECONDS,
queue);
|
Code above is equivalent to Executors.newFixedThreadPool(n)
, however instead of default unlimited LinkedBlockingQueue
we use ArrayBlockingQueue
with fixed capacity of 100
. This means that if 100 tasks are already queued (and n
being executed), new task will be rejected with RejectedExecutionException
. Also sincequeue
is now available externally, we can periodically call size()
and put it in logs/JMX/whatever monitoring mechanism you use.
6. Remember about exception handling
What will be the result of the following snippet?
1
2
3
|
executorService.submit(() -> { System.out.println( 1 / 0 );
}); |
I got bitten by that too many times: it won't print anything. No sign ofjava.lang.ArithmeticException: / by zero
, nothing. Thread pool just swallows this exception, as if it never happened. If it was a good'ol java.lang.Thread
created from scratch, UncaughtExceptionHandler
could work. But with thread pools you must be more careful. If you are submitting Runnable
(without any result, like above), youmust surround whole body with try
-catch
and at least log it. If you are submittingCallable<Integer>
, ensure you always dereference it using blocking get()
to re-throw exception:
1
2
3
|
final Future<Integer> division = executorService.submit(() -> 1 / 0 );
//below will throw ExecutionException caused by ArithmeticException division.get(); |
Interestingly even Spring framework made this bug with @Async
, see: SPR-8995 andSPR-12090.
7. Monitor waiting time in a queue
Monitoring work queue depth is one side. However when troubleshooting single transaction/task it's worthwhile to see how much time passed between submitting task and actual execution. This duration should preferably be close to 0 (when there was some idle thread in a pool), however it will grow when task has to be queued. Moreover if pool doesn't have a fixed number of threads, running new task might require spawning thread, also consuming short amount of time. In order to cleanly monitor this metric, wrap original ExecutorService
with something similar to this:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class WaitTimeMonitoringExecutorService implements ExecutorService {
private final ExecutorService target;
public WaitTimeMonitoringExecutorService(ExecutorService target) {
this .target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
final long startTime = System.currentTimeMillis();
return target.submit(() -> {
final long queueDuration = System.currentTimeMillis() - startTime;
log.debug( "Task {} spent {}ms in queue" , task, queueDuration);
return task.call();
}
);
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return submit(() -> {
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return submit( new Callable<Void>() {
@Override
public Void call() throws Exception {
task.run();
return null ;
}
});
}
//...
} |
This is not a complete implementation, but you get the basic idea. The moment we submit a task to a thread pool, we immediately start measuring time. We stop as soon as task was picked up and begins execution. Don't be fooled by close proximity ofstartTime
and queueDuration
in source code. In fact these two lines are evaluated in different threads, probably milliseconds or even seconds apart, e.g.:
1
|
Task com.nurkiewicz.MyTask @7c7f3894 spent 9883ms in queue
|
8. Preserve client stack trace
Reactive programming seems to get a lot of attention these days. Reactive manifesto,reactive streams, RxJava (just released 1.0!), Clojure agents, scala.rx... They all work great, but stack trace are no longer your friend, they are at most useless. Take for example an exception happening in a task submitted to thread pool:
1
2
3
4
5
6
7
|
java.lang.NullPointerException: null at com.nurkiewicz.MyTask.call(Main.java:76) ~[classes/:na]
at com.nurkiewicz.MyTask.call(Main.java:72) ~[classes/:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0]
at java.lang.Thread.run(Thread.java:744) ~[na:1.8.0]
|
We can easily discover that MyTask
threw NPE at line 76. But we have no idea who submitted this task, because stack trace reveals only Thread
andThreadPoolExecutor
. We can technically navigate through the source code in hope to find just one place where MyTask
is created. But without threads (not to mention event-drivent, reactive, actor-ninja-programming) we would immediately see full picture. What if we could preserve stack trace of client code (the one which submitted task) and show it, e.g. in case of failure? The idea isn't new, for example Hazelcastpropagates exceptions from owner node to client code. This is how naïve support for keeping client stack trace in case of failure could look:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class ExecutorServiceWithClientTrace implements ExecutorService {
protected final ExecutorService target;
public ExecutorServiceWithClientTrace(ExecutorService target) {
this .target = target;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return target.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private <T> Callable<T> wrap( final Callable<T> task, final Exception clientStack, String clientThreadName) {
return () -> {
try {
return task.call();
} catch (Exception e) {
log.error( "Exception {} in task submitted from thrad {} here:" , e, clientThreadName, clientStack);
throw e;
}
};
}
private Exception clientTrace() {
return new Exception( "Client stack trace" );
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return tasks.stream().map( this ::submit).collect(toList());
}
//...
} |
This time in case of failure we will retrieve full stack trace and thread name of a place where task was submitted. Much more valuable compared to standard exception seen earlier:
1
2
3
4
5
6
7
8
9
10
|
Exception java.lang.NullPointerException in task submitted from thrad main here: java.lang.Exception: Client stack trace at com.nurkiewicz.ExecutorServiceWithClientTrace.clientTrace(ExecutorServiceWithClientTrace.java:43) ~[classes/:na]
at com.nurkiewicz.ExecutorServiceWithClientTrace.submit(ExecutorServiceWithClientTrace.java:28) ~[classes/:na]
at com.nurkiewicz.Main.main(Main.java:31) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0]
at java.lang.reflect.Method.invoke(Method.java:483) ~[na:1.8.0]
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) ~[idea_rt.jar:na]
|
9. Prefer CompletableFuture
In Java 8 more powerful CompletableFuture
was introduced. Please use it whenever possible. ExecutorService
wasn't extended to support this enhanced abstraction, so you have to take care of it yourself. Instead of:
1
2
|
final Future<BigDecimal> future =
executorService.submit( this ::calculate);
|
do:
1
2
|
final CompletableFuture<BigDecimal> future =
CompletableFuture.supplyAsync( this ::calculate, executorService);
|
CompletableFuture
extends Future
so everything works as it used to. But more advanced consumers of your API will truly appreciate extended functionality given byCompletableFuture
.
10. Synchronous queue
SynchronousQueue
is an interesting BlockingQueue
that's not really a queue. It's not even a data structure per se. It's best explained as a queue with capacity of 0. Quoting JavaDoc:
eachinsert
operation must wait for a correspondingremove
operation by another thread, and vice versa. A synchronous queue does not have any internal capacity, not even a capacity of one. You cannot peek at a synchronous queue because an element is only present when you try to remove it; you cannot insert an element (using any method) unless another thread is trying to remove it; you cannot iterate as there is nothing to iterate. [...]
Synchronous queues are similar to rendezvous channels used in CSP and Ada.
How is this related to thread pools? Try using SynchronousQueue
withThreadPoolExecutor
:
1
2
3
4
|
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor( 2 , 2 ,
0L, TimeUnit.MILLISECONDS,
queue);
|
We created a thread pool with two threads and a SynchronousQueue
in front of it. Because SynchronousQueue
is essentially a queue with 0 capacity, suchExecutorService
will only accept new tasks if there is an idle thread available. If all threads are busy, new task will be rejected immediately and will never wait. This behavior might be desirable when processing in background must start immediately or be discarded.
That's it, I hope you found at least one interesting feature!
相关推荐
### Java--330--Tips:实用开发指南 #### 前言 “Java--330--Tips”是一本汇集了330个实用Java技巧的手册,这些技巧覆盖了多个方面,包括网络编程、多线程处理、图形用户界面(GUI)、数据库操作等。该书的目标读者...
并发该库提供了接口和相关类的实现,这些类旨在支持长时间运行的阻塞任务(通常为I / O绑定)。 此功能增强了唯一的Java 8内置实现 ,该实现主要支持计算任务。 此外,该库还可以帮助解决许多异步编程难题,例如处理...
NULL 博文链接:https://x125858805.iteye.com/blog/2191873
ExecutorService方法案例文件.zip
Java并发编程中的ExecutorService、Callable和Future Java并发编程中,ExecutorService、Callable和Future是三大核心组件,它们之间紧密相连,共同实现了高效、安全的并发编程。下面我们将详细介绍这些组件的作用和...
标题“SSD8-Network-And-Distributed-computing-master2_java_”表明这是一个关于网络和分布式计算的项目,主要使用Java语言实现。这个项目可能是为西北工业大学(西工大)的学生设计的,目的是帮助他们理解和实践...
接口 java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。壹個 ExecutorService 实例因此特别像壹個线程池。事实上,在 java.util.concurrent 包中的 ExecutorService 的实现...
在 Spring Boot 中使用 Java 线程池 ExecutorService 的讲解 Spring Boot 作为一个流行的 Java 框架,提供了许多便捷的功能来帮助开发者快速构建应用程序。其中之一就是使用 Java 线程池 ExecutorService 来管理...
ExecutorService线程池是Java并发编程中的核心组件,它位于`java.util.concurrent`包下,是`java.util.concurrent.Executor`接口的一个实现。ExecutorService提供了一种管理线程的方式,允许我们创建、管理和控制...
【Executor、Executors和ExecutorService详解】 在Java并发编程中,`Executor`、`Executors`和`ExecutorService`是核心组件,它们帮助开发者高效管理线程资源,提高程序的并发性能。理解这三个概念的区别和用途是...
ExecutorService10个要诀和技巧编程开发技术共9页.pdf.zip
ExecutorService 和 CompletionService 是Java并发处理中的两种工具,它们用于管理和执行多个任务。ExecutorService 是一个接口,它是java.util.concurrent.Executor 接口的扩展,提供了一组方法来管理和控制线程池...
在Java多线程编程中,`ExecutorService`是线程池的核心接口,它提供了一种管理线程的方式,包括创建线程、调度线程执行以及控制线程的生命周期。`ExecutorService`通过`execute()`和`submit()`这两个方法来提交任务...
ExecutorService executor = Executors.newFixedThreadPool(10); Runnable runnableTask = () -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); } }...
在Java多线程编程中,`ExecutorService`是线程池的一个重要接口,它提供了管理和控制线程执行的能力。当我们创建一个`ExecutorService`实例并提交任务时,我们可以通过调用`shutdown()`方法来关闭线程池。然而,标题...
这个"java(1-10章)经典案例开发集锦"涵盖了Java学习的初步到进阶阶段,旨在帮助开发者掌握核心概念和实战技能。以下是根据标题、描述和标签提炼出的一些关键知识点,以及可能在压缩包中的文件"java案例开发集锦.pdf...
运用JAVA的concurrent.ExecutorService线程池实现socket的TCP和UDP连接
Java中的`ExecutorService`是Java并发编程的重要组成部分,它提供了线程池的管理,使得开发者可以更有效地控制并发任务的执行。在Java的`java.util.concurrent`包中,`ExecutorService`接口作为线程池的核心接口,...
《Java源码解析:Loan Collection and Recovery Services》 在Java编程领域,理解并掌握系统级的贷款收集和恢复服务的源码是至关重要的。这涉及到金融应用开发中的核心功能,如贷款管理、逾期处理和债务回收等。在...
本资源“Concurrent-Programming-with-Java-and-OpenMP”提供了Java和OpenMP两种并发编程技术的源码实例,帮助开发者深入理解并发编程的实践与原理。 在Java中,Java并发API(Java Concurrency API)是实现并发编程...