- 浏览: 49471 次
文章分类
最新评论
对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。
看一个简单的例子:
收藏代码
1.public class CacheThreadPool {
2. public static void main(String[] args) {
3. ExecutorService exec=Executors.newCachedThreadPool();
4. for(int i=0;i<5;i++)
5. exec.execute(new LiftOff());
6. exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务
7. }
8.}
这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。
一、ExecutorService
先看看ExecutorService,这是一个接口,简单的列一下这个接口:
收藏代码
1.public interface ExecutorService extends Executor {
2.
3. void shutdown();
4.
5. List<Runnable> shutdownNow();
6.
7. boolean isShutdown();
8.
9. boolean isTerminated();
10.
11. boolean awaitTermination(long timeout, TimeUnit unit)
12.
13. <T> Future<T> submit(Callable<T> task);
14.
15. <T> Future<T> submit(Runnable task, T result);
16.
17. Future<?> submit(Runnable task);
18.
19. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
20.
21. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
22.
23. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
24.
25. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
26. long timeout, TimeUnit unit)
27.}
ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法:
收藏代码
1.void execute(Runnable command)
二、Executors
Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用:
Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
•Methods that create and return an ExecutorService set up with commonly useful configuration settings.
•Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
•Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
•Methods that create and return a ThreadFactory that sets newly created threads to a known state.
•Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法:
收藏代码
1.public static ExecutorService newCachedThreadPool() {
2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3. 60L, TimeUnit.SECONDS,
4. new SynchronousQueue<Runnable>());
5. }
在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到
4. ..........
5.}
ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。
线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. public void execute(Runnable command) {
4. if (command == null)
5. throw new NullPointerException();
6. if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
7. if (runState == RUNNING && workQueue.offer(command)) {
8. if (runState != RUNNING || poolSize == 0)
9. ensureQueuedTaskHandled(command);
10. }
11. else if (!addIfUnderMaximumPoolSize(command))
12. reject(command); // is shutdown or saturated
13. }
14. }
15. ..........
16.}
根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
1、workQueue.offer(command)
workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
2、ensureQueuedTaskHandled(command)
这个是线程执行的关键语句。看看它的源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private void ensureQueuedTaskHandled(Runnable command) {
4. final ReentrantLock mainLock = this.mainLock;
5. mainLock.lock();
6. boolean reject = false;
7. Thread t = null;
8. try {
9. int state = runState;
10. if (state != RUNNING && workQueue.remove(command))
11. reject = true;
12. else if (state < STOP &&
13. poolSize < Math.max(corePoolSize, 1) &&
14. !workQueue.isEmpty())
15. t = addThread(null);
16. } finally {
17. mainLock.unlock();
18. }
19. if (reject)
20. reject(command);
21. else if (t != null)
22. t.start();
23. }
24. ..........
25.}
在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private Thread addThread(Runnable firstTask) {
4. Worker w = new Worker(firstTask);
5. Thread t = threadFactory.newThread(w);
6. if (t != null) {
7. w.thread = t;
8. workers.add(w);
9. int nt = ++poolSize;
10. if (nt > largestPoolSize)
11. largestPoolSize = nt;
12. }
13. return t;
14. }
15. ..........
16.}
这里两个重点,很明显:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是个什么结构:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private final class Worker implements Runnable {
4. ..........
5. Worker(Runnable firstTask) {
6. this.firstTask = firstTask;
7. }
8.
9. private Runnable firstTask;
10. ..........
11.
12. public void run() {
13. try {
14. Runnable task = firstTask;
15. firstTask = null;
16. while (task != null || (task = getTask()) != null) {
17. runTask(task);
18. task = null;
19. }
20. } finally {
21. workerDone(this);
22. }
23. }
24. }
25.
26. Runnable getTask() {
27. for (;;) {
28. try {
29. int state = runState;
30. if (state > SHUTDOWN)
31. return null;
32. Runnable r;
33. if (state == SHUTDOWN) // Help drain queue
34. r = workQueue.poll();
35. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
36. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
37. else
38. r = workQueue.take();
39. if (r != null)
40. return r;
41. if (workerCanExit()) {
42. if (runState >= SHUTDOWN) // Wake up others
43. interruptIdleWorkers();
44. return null;
45. }
46. // Else retry
47. } catch (InterruptedException ie) {
48. // On interruption, re-check runState
49. }
50. }
51. }
52. }
53. ..........
54.}
Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
在看看newThread是一个什么方法:
收藏代码
1.public class Executors {
2. ..........
3. static class DefaultThreadFactory implements ThreadFactory {
4. ..........
5. public Thread newThread(Runnable r) {
6. Thread t = new Thread(group, r,
7. namePrefix + threadNumber.getAndIncrement(),
8. 0);
9. if (t.isDaemon())
10. t.setDaemon(false);
11. if (t.getPriority() != Thread.NORM_PRIORITY)
12. t.setPriority(Thread.NORM_PRIORITY);
13. return t;
14. }
15. ..........
16. }
17. ..........
18.}
通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。
之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。
从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。
看一个简单的例子:
收藏代码
1.public class CacheThreadPool {
2. public static void main(String[] args) {
3. ExecutorService exec=Executors.newCachedThreadPool();
4. for(int i=0;i<5;i++)
5. exec.execute(new LiftOff());
6. exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务
7. }
8.}
这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。
一、ExecutorService
先看看ExecutorService,这是一个接口,简单的列一下这个接口:
收藏代码
1.public interface ExecutorService extends Executor {
2.
3. void shutdown();
4.
5. List<Runnable> shutdownNow();
6.
7. boolean isShutdown();
8.
9. boolean isTerminated();
10.
11. boolean awaitTermination(long timeout, TimeUnit unit)
12.
13. <T> Future<T> submit(Callable<T> task);
14.
15. <T> Future<T> submit(Runnable task, T result);
16.
17. Future<?> submit(Runnable task);
18.
19. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
20.
21. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
22.
23. <T> T invokeAny(Collection<? extends Callable<T>> tasks)
24.
25. <T> T invokeAny(Collection<? extends Callable<T>> tasks,
26. long timeout, TimeUnit unit)
27.}
ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法:
收藏代码
1.void execute(Runnable command)
二、Executors
Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用:
Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods:
•Methods that create and return an ExecutorService set up with commonly useful configuration settings.
•Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
•Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
•Methods that create and return a ThreadFactory that sets newly created threads to a known state.
•Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法:
收藏代码
1.public static ExecutorService newCachedThreadPool() {
2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3. 60L, TimeUnit.SECONDS,
4. new SynchronousQueue<Runnable>());
5. }
在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到
4. ..........
5.}
ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。
线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. public void execute(Runnable command) {
4. if (command == null)
5. throw new NullPointerException();
6. if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
7. if (runState == RUNNING && workQueue.offer(command)) {
8. if (runState != RUNNING || poolSize == 0)
9. ensureQueuedTaskHandled(command);
10. }
11. else if (!addIfUnderMaximumPoolSize(command))
12. reject(command); // is shutdown or saturated
13. }
14. }
15. ..........
16.}
根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是:
1、workQueue.offer(command)
workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。
2、ensureQueuedTaskHandled(command)
这个是线程执行的关键语句。看看它的源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private void ensureQueuedTaskHandled(Runnable command) {
4. final ReentrantLock mainLock = this.mainLock;
5. mainLock.lock();
6. boolean reject = false;
7. Thread t = null;
8. try {
9. int state = runState;
10. if (state != RUNNING && workQueue.remove(command))
11. reject = true;
12. else if (state < STOP &&
13. poolSize < Math.max(corePoolSize, 1) &&
14. !workQueue.isEmpty())
15. t = addThread(null);
16. } finally {
17. mainLock.unlock();
18. }
19. if (reject)
20. reject(command);
21. else if (t != null)
22. t.start();
23. }
24. ..........
25.}
在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private Thread addThread(Runnable firstTask) {
4. Worker w = new Worker(firstTask);
5. Thread t = threadFactory.newThread(w);
6. if (t != null) {
7. w.thread = t;
8. workers.add(w);
9. int nt = ++poolSize;
10. if (nt > largestPoolSize)
11. largestPoolSize = nt;
12. }
13. return t;
14. }
15. ..........
16.}
这里两个重点,很明显:
1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)
先看Worker是个什么结构:
收藏代码
1.public class ThreadPoolExecutor extends AbstractExecutorService {
2. ..........
3. private final class Worker implements Runnable {
4. ..........
5. Worker(Runnable firstTask) {
6. this.firstTask = firstTask;
7. }
8.
9. private Runnable firstTask;
10. ..........
11.
12. public void run() {
13. try {
14. Runnable task = firstTask;
15. firstTask = null;
16. while (task != null || (task = getTask()) != null) {
17. runTask(task);
18. task = null;
19. }
20. } finally {
21. workerDone(this);
22. }
23. }
24. }
25.
26. Runnable getTask() {
27. for (;;) {
28. try {
29. int state = runState;
30. if (state > SHUTDOWN)
31. return null;
32. Runnable r;
33. if (state == SHUTDOWN) // Help drain queue
34. r = workQueue.poll();
35. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
36. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
37. else
38. r = workQueue.take();
39. if (r != null)
40. return r;
41. if (workerCanExit()) {
42. if (runState >= SHUTDOWN) // Wake up others
43. interruptIdleWorkers();
44. return null;
45. }
46. // Else retry
47. } catch (InterruptedException ie) {
48. // On interruption, re-check runState
49. }
50. }
51. }
52. }
53. ..........
54.}
Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
在看看newThread是一个什么方法:
收藏代码
1.public class Executors {
2. ..........
3. static class DefaultThreadFactory implements ThreadFactory {
4. ..........
5. public Thread newThread(Runnable r) {
6. Thread t = new Thread(group, r,
7. namePrefix + threadNumber.getAndIncrement(),
8. 0);
9. if (t.isDaemon())
10. t.setDaemon(false);
11. if (t.getPriority() != Thread.NORM_PRIORITY)
12. t.setPriority(Thread.NORM_PRIORITY);
13. return t;
14. }
15. ..........
16. }
17. ..........
18.}
通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。
之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。
从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。
相关推荐
`ExecutorService`通常与`ThreadPoolExecutor`或`Executors`类一起使用,如以下示例所示: ```java import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MyTask ...
this.executorService = Executors.newFixedThreadPool(numThreads); } public String read() throws IOException, InterruptedException, ExecutionException { List<Future<String>> futures = new ArrayList...
executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println(""); } ``` - **service 方法**:循环监听客户端连接请求,接受连接后将每个...
以下是一个简单的示例,演示如何使用`ExecutorService`和`Callable`/`Future`来实现服务器端处理客户端请求的功能: ```java private static final int PORT = 19527; ServerSocket serverListenSocket = new ...
在Android开发中,异步操作是至关重要的,它能让应用程序在执行耗时任务时保持界面...这个"AysncTest"例子可能就是一个演示这些异步处理技术的简单应用,通过分析和学习,开发者可以更好地掌握Android异步编程的精髓。
ExecutorService executor = Executors.newFixedThreadPool(5); Future<?> future = executor.submit(new TaskthreadDemo2()); // future.get()用于获取任务结果,如果任务已完成 executor.shutdown(); // 关闭...
ExecutorService executor = Executors.newFixedThreadPool(5); executor.execute(new MyTask()); // 提交任务到线程池 executor.shutdown(); // 关闭线程池 ``` 在实际开发中,工具库如Apache Commons Lang的`...
3. 使用ExecutorService:Java并发框架的一部分,它允许我们创建线程池,管理和控制线程的生命周期。例如,使用Executors.newFixedThreadPool(int nThreads)创建固定大小的线程池。 三、多线程下载原理 多线程下载...
ExecutorService executor = Executors.newFixedThreadPool(maxThreads); while (true) { // 接受客户端连接 Socket clientSocket = serverSocket.accept(); // 创建处理线程 Runnable worker = new ...
ExecutorService mExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public ImageLoader() { initImageCache(); } private void initImageCache() { final ...
ExecutorService executor = Executors.newCachedThreadPool(); Future<Double> future = executor.submit(new Callable() { public Double call() { return doSomeLongComputation(); } }); doSomethingElse(); ...
在这个例子中,可能使用了`Executors`工具类来创建一个线程池,例如`Executors.newFixedThreadPool(int nThreads)`,创建一个固定大小的线程池,可以处理多个并发请求。 4. **`saleTicket.java`** 这个文件可能...
ExecutorService executor = Executors.newFixedThreadPool(threadCount); for (int i = 0; i ; i++) { executor.submit(() -> { for (int j = 0; j ; j++) { // 对matrix[i][j]进行操作 } }); } executor....
`ExecutorService`是线程池的接口,可以使用`Executors`工厂类创建不同类型的线程池,如固定大小线程池、可缓存线程池、单线程线程池等。 总之,Java多线程与并发编程是Java程序员必须掌握的核心技能,它涉及到操作...
使用`Executors`创建线程池的例子: ```java ExecutorService executor = Executors.newFixedThreadPool(5); // 创建一个包含5个线程的线程池 executor.execute(new Runnable() { @Override public void run() { ...
ExecutorService es = Executors.newFixedThreadPool(50); // 闭锁 CountDownLatch cdl = new CountDownLatch(5000); for (int i = 0; i ; i++) { es.execute(() -> { test.increment(); cdl.countDown(); }...
ExecutorService exec = Executors.newSingleThreadScheduledExecutor(); // ExecutorService exec = Executors.newCachedThreadPool(); //如果添加给线程池中添加足够多的线程,就可以让所有任务都执行,避免...
ExecutorService executor = Executors.newFixedThreadPool(5); executor.submit(() -> { // 执行异步任务 System.out.println("Task executed in thread: " + Thread.currentThread().getName()); }); ``` 这里...
ExecutorService es = Executors.newFixedThreadPool(3); try { Future<String> future1 = es.submit(new MyCallableClass(0)); System.out.println("task1:" + future1.get()); Future<String> future2 = es....