`

ExecutorService与Executors例子的简单剖析[转]

 
阅读更多

转:http://www.cnblogs.com/yezhenhan/archive/2012/01/07/2315645.html

 

对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了 Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

        看一个简单的例子: 

Java代码  收藏代码
  1. public class CacheThreadPool {  
  2.     public static void main(String[] args) {  
  3.         ExecutorService exec=Executors.newCachedThreadPool();  
  4.         for(int i=0;i<5;i++)  
  5.             exec.execute(new LiftOff());  
  6.         exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  
  7.     }  
  8. }  


        这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

        一、ExecutorService 
        先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

Java代码  收藏代码
  1. public interface ExecutorService extends Executor {  
  2.   
  3.     void shutdown();  
  4.   
  5.     List<Runnable> shutdownNow();  
  6.   
  7.     boolean isShutdown();  
  8.   
  9.     boolean isTerminated();  
  10.   
  11.     boolean awaitTermination(long timeout, TimeUnit unit)  
  12.   
  13.     <T> Future<T> submit(Callable<T> task);  
  14.   
  15.     <T> Future<T> submit(Runnable task, T result);  
  16.   
  17.     Future<?> submit(Runnable task);  
  18.   
  19.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
  20.   
  21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  
  22.   
  23.     <T> T invokeAny(Collection<? extends Callable<T>> tasks)  
  24.   
  25.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
  26.                     long timeout, TimeUnit unit)  
  27. }  


        ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

Java代码  收藏代码
  1. void execute(Runnable command)  



        二、Executors 
        Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 
       

              Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

 

           
  • Methods that create and return an ExecutorService set up with commonly useful configuration settings.
  •        
  • Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.
  •        
  • Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.
  •        
  • Methods that create and return a ThreadFactory that sets newly created threads to a known state.
  •        
  • Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.
  •        


        在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

Java代码  收藏代码
  1. public static ExecutorService newCachedThreadPool() {  
  2.         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
  3.                                       60L, TimeUnit.SECONDS,  
  4.                                       new SynchronousQueue<Runnable>());  
  5.     }  


        在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

Java代码  收藏代码
  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ..........  
  3.     private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  
  4.     ..........  
  5. }  


        ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现 了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

        线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

Java代码  收藏代码
  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ..........  
  3.     public void execute(Runnable command) {  
  4.         if (command == null)  
  5.             throw new NullPointerException();  
  6.         if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  7.             if (runState == RUNNING && workQueue.offer(command)) {  
  8.                 if (runState != RUNNING || poolSize == 0)  
  9.                     ensureQueuedTaskHandled(command);  
  10.             }  
  11.             else if (!addIfUnderMaximumPoolSize(command))  
  12.                 reject(command); // is shutdown or saturated  
  13.         }  
  14.     }  
  15.     ..........  
  16. }  


        根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 
        1、workQueue.offer(command) 
        workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 
        2、ensureQueuedTaskHandled(command) 
        这个是线程执行的关键语句。看看它的源码: 

Java代码  收藏代码
  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ..........  
  3.     private void ensureQueuedTaskHandled(Runnable command) {  
  4.         final ReentrantLock mainLock = this.mainLock;  
  5.         mainLock.lock();  
  6.         boolean reject = false;  
  7.         Thread t = null;  
  8.         try {  
  9.             int state = runState;  
  10.             if (state != RUNNING && workQueue.remove(command))  
  11.                 reject = true;  
  12.             else if (state < STOP &&  
  13.                      poolSize < Math.max(corePoolSize, 1) &&  
  14.                      !workQueue.isEmpty())  
  15.                 t = addThread(null);  
  16.         } finally {  
  17.             mainLock.unlock();  
  18.         }  
  19.         if (reject)  
  20.             reject(command);  
  21.         else if (t != null)  
  22.             t.start();  
  23.     }  
  24.     ..........  
  25. }  


        在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

Java代码  收藏代码
  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ..........  
  3.     private Thread addThread(Runnable firstTask) {  
  4.         Worker w = new Worker(firstTask);  
  5.         Thread t = threadFactory.newThread(w);  
  6.         if (t != null) {  
  7.             w.thread = t;  
  8.             workers.add(w);  
  9.             int nt = ++poolSize;  
  10.             if (nt > largestPoolSize)  
  11.                 largestPoolSize = nt;  
  12.         }  
  13.         return t;  
  14.     }  
  15.     ..........  
  16. }  


        这里两个重点,很明显: 
        1、Worker w = new Worker(firstTask) 
        2、Thread t = threadFactory.newThread(w) 
        先看Worker是个什么结构: 

Java代码  收藏代码
  1. public class ThreadPoolExecutor extends AbstractExecutorService {  
  2.     ..........  
  3.     private final class Worker implements Runnable {  
  4.         ..........  
  5.         Worker(Runnable firstTask) {  
  6.             this.firstTask = firstTask;  
  7.         }  
  8.   
  9.         private Runnable firstTask;  
  10.         ..........  
  11.   
  12.         public void run() {  
  13.             try {  
  14.                 Runnable task = firstTask;  
  15.                 firstTask = null;  
  16.                 while (task != null || (task = getTask()) != null) {  
  17.                     runTask(task);  
  18.                     task = null;  
  19.                 }  
  20.             } finally {  
  21.                 workerDone(this);  
  22.             }  
  23.         }  
  24.     }  
  25.   
  26.     Runnable getTask() {  
  27.         for (;;) {  
  28.             try {  
  29.                 int state = runState;  
  30.                 if (state > SHUTDOWN)  
  31.                     return null;  
  32.                 Runnable r;  
  33.                 if (state == SHUTDOWN)  // Help drain queue  
  34.                     r = workQueue.poll();  
  35.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  
  36.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  
  37.                 else  
  38.                     r = workQueue.take();  
  39.                 if (r != null)  
  40.                     return r;  
  41.                 if (workerCanExit()) {  
  42.                     if (runState >= SHUTDOWN) // Wake up others  
  43.                         interruptIdleWorkers();  
  44.                     return null;  
  45.                 }  
  46.                 // Else retry  
  47.             } catch (InterruptedException ie) {  
  48.                 // On interruption, re-check runState  
  49.             }  
  50.         }  
  51.     }  
  52.     }  
  53.     ..........  
  54. }  


        Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。
        在看看newThread是一个什么方法: 

Java代码  收藏代码
  1. public class Executors {  
  2.     ..........  
  3.     static class DefaultThreadFactory implements ThreadFactory {  
  4.         ..........  
  5.         public Thread newThread(Runnable r) {  
  6.             Thread t = new Thread(group, r,  
  7.                                   namePrefix + threadNumber.getAndIncrement(),  
  8.                                   0);  
  9.             if (t.isDaemon())  
  10.                 t.setDaemon(false);  
  11.             if (t.getPriority() != Thread.NORM_PRIORITY)  
  12.                 t.setPriority(Thread.NORM_PRIORITY);  
  13.             return t;  
  14.         }  
  15.         ..........  
  16.     }  
  17.     ..........  
  18. }  


        通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

        之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知 道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 
        从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask 是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实 例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把 Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

分享到:
评论

相关推荐

    Java+创建线程的例子源代码学习

    `ExecutorService`通常与`ThreadPoolExecutor`或`Executors`类一起使用,如以下示例所示: ```java import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MyTask ...

    多线程读取文件

    this.executorService = Executors.newFixedThreadPool(numThreads); } public String read() throws IOException, InterruptedException, ExecutionException { List&lt;Future&lt;String&gt;&gt; futures = new ArrayList...

    socket实例

    executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * POOL_SIZE); System.out.println(""); } ``` - **service 方法**:循环监听客户端连接请求,接受连接后将每个...

    Java 5_0 多线程编程实践

    以下是一个简单的示例,演示如何使用`ExecutorService`和`Callable`/`Future`来实现服务器端处理客户端请求的功能: ```java private static final int PORT = 19527; ServerSocket serverListenSocket = new ...

    android异步操作例子

    在Android开发中,异步操作是至关重要的,它能让应用程序在执行耗时任务时保持界面...这个"AysncTest"例子可能就是一个演示这些异步处理技术的简单应用,通过分析和学习,开发者可以更好地掌握Android异步编程的精髓。

    TaskthreadDemo2.rar_java programming

    ExecutorService executor = Executors.newFixedThreadPool(5); Future&lt;?&gt; future = executor.submit(new TaskthreadDemo2()); // future.get()用于获取任务结果,如果任务已完成 executor.shutdown(); // 关闭...

    Android 文件下载功能Java多线程下载功能的例子实例源码

    3. 使用ExecutorService:Java并发框架的一部分,它允许我们创建线程池,管理和控制线程的生命周期。例如,使用Executors.newFixedThreadPool(int nThreads)创建固定大小的线程池。 三、多线程下载原理 多线程下载...

    多线程实例片段

    ExecutorService executor = Executors.newFixedThreadPool(5); executor.execute(new MyTask()); // 提交任务到线程池 executor.shutdown(); // 关闭线程池 ``` 在实际开发中,工具库如Apache Commons Lang的`...

    基于socket java 语言网络通讯机制和程序设计

    ExecutorService executor = Executors.newFixedThreadPool(maxThreads); while (true) { // 接受客户端连接 Socket clientSocket = serverSocket.accept(); // 创建处理线程 Runnable worker = new ...

    Android源码设计模式解析与实战》

    ExecutorService mExecutorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); public ImageLoader() { initImageCache(); } private void initImageCache() { final ...

    Java Lambda 编程之道

    ExecutorService executor = Executors.newFixedThreadPool(5); executor.submit(() -&gt; { // 执行异步任务 System.out.println("Task executed in thread: " + Thread.currentThread().getName()); }); ``` 这里...

    多线程java买票系统

    在这个例子中,可能使用了`Executors`工具类来创建一个线程池,例如`Executors.newFixedThreadPool(int nThreads)`,创建一个固定大小的线程池,可以处理多个并发请求。 4. **`saleTicket.java`** 这个文件可能...

    Java组合式异步编程方法详解.pdf

    ExecutorService executor = Executors.newCachedThreadPool(); Future&lt;Double&gt; future = executor.submit(new Callable() { public Double call() { return doSomeLongComputation(); } }); doSomethingElse(); ...

    2dloop2darray_pl_multest.7z

    ExecutorService executor = Executors.newFixedThreadPool(threadCount); for (int i = 0; i ; i++) { executor.submit(() -&gt; { for (int j = 0; j ; j++) { // 对matrix[i][j]进行操作 } }); } executor....

    Java 多线程与并发编程总结.doc

    `ExecutorService`是线程池的接口,可以使用`Executors`工厂类创建不同类型的线程池,如固定大小线程池、可缓存线程池、单线程线程池等。 总之,Java多线程与并发编程是Java程序员必须掌握的核心技能,它涉及到操作...

    深入java线程池的使用详解

    使用`Executors`创建线程池的例子: ```java ExecutorService executor = Executors.newFixedThreadPool(5); // 创建一个包含5个线程的线程池 executor.execute(new Runnable() { @Override public void run() { ...

    JAVA中锁的解决方案.docx

    ExecutorService es = Executors.newFixedThreadPool(50); // 闭锁 CountDownLatch cdl = new CountDownLatch(5000); for (int i = 0; i ; i++) { es.execute(() -&gt; { test.increment(); cdl.countDown(); }...

    Java多线程饥饿与公平介绍及代码示例

    ExecutorService exec = Executors.newSingleThreadScheduledExecutor(); // ExecutorService exec = Executors.newCachedThreadPool(); //如果添加给线程池中添加足够多的线程,就可以让所有任务都执行,避免...

    Java开发:实现网站信息批量析取

    ExecutorService executor = Executors.newFixedThreadPool(10); // 创建固定大小的线程池 for (String url : urls) { executor.submit(() -&gt; { // 在这里执行每个网址的抓取任务 }); } executor.shutdown();...

    java callable(多线程)

    ExecutorService es = Executors.newFixedThreadPool(3); try { Future&lt;String&gt; future1 = es.submit(new MyCallableClass(0)); System.out.println("task1:" + future1.get()); Future&lt;String&gt; future2 = es....

Global site tag (gtag.js) - Google Analytics