`

java.util.concurrent.ExecutorService与Executors例子的简单剖析

    博客分类:
  • java
阅读更多

java.util.concurrent.ExecutorService与Executors例子的简单剖析

 

对于多线程有了一点了解之后,那么来看看java.lang.concurrent包下面的一些东西。在此之前,我们运行一个线程都是显式调用了Thread的start()方法。我们用concurrent下面的类来实现一下线程的运行,而且这将成为以后常用的方法或者实现思路。 

 

        看一个简单的例子: 

 

Java代码  

public class CacheThreadPool {  

    public static void main(String[] args) {  

        ExecutorService exec=Executors.newCachedThreadPool();  

        for(int i=0;i<5;i++)  

            exec.execute(new LiftOff());  

        exec.shutdown();//并不是终止线程的运行,而是禁止在这个Executor中添加新的任务  

    }  

}  

 

        这个例子其实很容易看懂,ExecutorService中有一个execute方法,这个方法的参数是Runnable类型。也就是说,将一个实现了Runnable类型的类的实例作为参数传入execute方法并执行,那么线程就相应的执行了。 

 

        一、ExecutorService 

        先看看ExecutorService,这是一个接口,简单的列一下这个接口: 

 

Java代码  

public interface ExecutorService extends Executor {  

  

    void shutdown();  

  

    List<Runnable> shutdownNow();  

  

    boolean isShutdown();  

  

    boolean isTerminated();  

  

    boolean awaitTermination(long timeout, TimeUnit unit)  

  

    <T> Future<T> submit(Callable<T> task);  

  

    <T> Future<T> submit(Runnable task, T result);  

  

    Future<?> submit(Runnable task);  

  

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  

  

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)  

  

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)  

  

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,  

                    long timeout, TimeUnit unit)  

}  

 

        ExecuteService继承了Executor,Executor也是一个接口,里面只有一个方法: 

 

Java代码  

void execute(Runnable command)  

 

 

        二、Executors 

        Executors是一个类,直接援引JDK文档的说明来说一下这个类的作用: 

       

 

        Factory and utility methods for Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, and Callable classes defined in this package. This class supports the following kinds of methods: 

 

 

       

Methods that create and return an ExecutorService set up with commonly useful configuration settings.

       

Methods that create and return a ScheduledExecutorService set up with commonly useful configuration settings.

       

Methods that create and return a "wrapped" ExecutorService, that disables reconfiguration by making implementation-specific methods inaccessible.

       

Methods that create and return a ThreadFactory that sets newly created threads to a known state.

       

Methods that create and return a Callable out of other closure-like forms, so they can be used in execution methods requiring Callable.

       

 

        在上面的例子中,我们用到了newCachedThreadPool()方法。看一下这个方法: 

 

Java代码  

public static ExecutorService newCachedThreadPool() {  

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  

                                      60L, TimeUnit.SECONDS,  

                                      new SynchronousQueue<Runnable>());  

    }  

 

        在源码中我们可以知道两点,1、这个方法返回类型是ExecutorService;2、此方法返回值实际是另一个类的实例。看一下这个类的信息: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private final BlockingQueue<Runnable> workQueue;//这个变量在下面会提到  

    ..........  

}  

 

        ThreadPoolExecutor继承了AbstractExecutorService,而AbstractExecutorService又实现了ExecutorService接口。所以,根据多态,ThreadPoolExecutor可以看作是ExecutorService类型。 

 

        线程执行的最关键的一步是执行了executor方法,根据java的动态绑定,实际执行的是ThreadPoolExecutor所实现的executor方法。看看源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    public void execute(Runnable command) {  

        if (command == null)  

            throw new NullPointerException();  

        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  

            if (runState == RUNNING && workQueue.offer(command)) {  

                if (runState != RUNNING || poolSize == 0)  

                    ensureQueuedTaskHandled(command);  

            }  

            else if (!addIfUnderMaximumPoolSize(command))  

                reject(command); // is shutdown or saturated  

        }  

    }  

    ..........  

}  

 

        根据程序正常执行的路线来看,这个方法中比较重要的两个地方分别是: 

        1、workQueue.offer(command) 

        workQueue在上面提到过,是BlockingQueue<Runnable>类型的变量,这条语句就是将Runnable类型的实例加入到队列中。 

        2、ensureQueuedTaskHandled(command) 

        这个是线程执行的关键语句。看看它的源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private void ensureQueuedTaskHandled(Runnable command) {  

        final ReentrantLock mainLock = this.mainLock;  

        mainLock.lock();  

        boolean reject = false;  

        Thread t = null;  

        try {  

            int state = runState;  

            if (state != RUNNING && workQueue.remove(command))  

                reject = true;  

            else if (state < STOP &&  

                     poolSize < Math.max(corePoolSize, 1) &&  

                     !workQueue.isEmpty())  

                t = addThread(null);  

        } finally {  

            mainLock.unlock();  

        }  

        if (reject)  

            reject(command);  

        else if (t != null)  

            t.start();  

    }  

    ..........  

}  

 

        在这里我们就可以看到最终执行了t.start()方法来运行线程。在这之前的重点是t=addThread(null)方法,看看addThread方法的源码: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private Thread addThread(Runnable firstTask) {  

        Worker w = new Worker(firstTask);  

        Thread t = threadFactory.newThread(w);  

        if (t != null) {  

            w.thread = t;  

            workers.add(w);  

            int nt = ++poolSize;  

            if (nt > largestPoolSize)  

                largestPoolSize = nt;  

        }  

        return t;  

    }  

    ..........  

}  

 

        这里两个重点,很明显: 

        1、Worker w = new Worker(firstTask) 

        2、Thread t = threadFactory.newThread(w) 

        先看Worker是个什么结构: 

 

Java代码  

public class ThreadPoolExecutor extends AbstractExecutorService {  

    ..........  

    private final class Worker implements Runnable {  

        ..........  

        Worker(Runnable firstTask) {  

            this.firstTask = firstTask;  

        }  

  

        private Runnable firstTask;  

        ..........  

  

        public void run() {  

            try {  

                Runnable task = firstTask;  

                firstTask = null;  

                while (task != null || (task = getTask()) != null) {  

                    runTask(task);  

                    task = null;  

                }  

            } finally {  

                workerDone(this);  

            }  

        }  

    }  

  

    Runnable getTask() {  

        for (;;) {  

            try {  

                int state = runState;  

                if (state > SHUTDOWN)  

                    return null;  

                Runnable r;  

                if (state == SHUTDOWN)  // Help drain queue  

                    r = workQueue.poll();  

                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  

                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);  

                else  

                    r = workQueue.take();  

                if (r != null)  

                    return r;  

                if (workerCanExit()) {  

                    if (runState >= SHUTDOWN) // Wake up others  

                        interruptIdleWorkers();  

                    return null;  

                }  

                // Else retry  

            } catch (InterruptedException ie) {  

                // On interruption, re-check runState  

            }  

        }  

    }  

    }  

    ..........  

}  

 

        Worker是一个内部类。根据之前可以知道,传入addThread的参数是null,也就是说Work中firstTask为null。

        在看看newThread是一个什么方法: 

 

Java代码  

public class Executors {  

    ..........  

    static class DefaultThreadFactory implements ThreadFactory {  

        ..........  

        public Thread newThread(Runnable r) {  

            Thread t = new Thread(group, r,  

                                  namePrefix + threadNumber.getAndIncrement(),  

                                  0);  

            if (t.isDaemon())  

                t.setDaemon(false);  

            if (t.getPriority() != Thread.NORM_PRIORITY)  

                t.setPriority(Thread.NORM_PRIORITY);  

            return t;  

        }  

        ..........  

    }  

    ..........  

}  

 

        通过源码可以得知threadFactory的实际类型是DefaultThreadFactory,而DefaultThreadFactory是Executors的一个嵌套内部类。 

 

        之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。 

        从这里可以看出,t.start()实际上执行的是Worker内部的run方法。run()内部会在if条件里面使用“短路”:判断firstTask是否为null,若不是null则直接执行firstTask的run方法;如果是null,则调用getTask()方法来获取Runnable类型实例。从哪里获取呢?workQueue!在execute方法中,执行ensureQueuedTaskHandled(command)之前就已经把Runnable类型实例放入到workQueue中了,所以这里可以从workQueue中获取到。

分享到:
评论

相关推荐

    java.util.concurrent 学习ppt

    Java.util.concurrent是Java 5.0引入的一个重要包,它为多线程编程提供了一组高级并发工具。这个包的设计者是Doug Lea,它的出现是JSR-166的一部分,也被称作Tiger更新。Java.util.concurrent的引入是为了解决传统...

    java.util.concurrent介绍(重要).pdf

    总之,`java.util.concurrent` 提供的工具使得并发编程变得更加容易和高效,是 Java 并发编程的基石,无论是对于初学者还是经验丰富的开发者,理解和掌握这个包都是非常重要的。通过熟练运用这些工具,开发者可以...

    SimpleDateFormat线程不安全的5种解决方案.docx

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ...

    JAVA课程学习笔记.doc

    - `java.util.concurrent.Executors`:提供了一些静态工厂方法,用于创建不同类型的线程池实例,如 `newFixedThreadPool()`、`newSingleThreadExecutor()`。 - `java.util.concurrent.CompletionService`:允许获取...

    Server100 代码

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.io.*; class Server{ static ServerSocket ss=null; static Socket s=null; static List l=new ArrayList();...

    java 实现调度器

    这通常是通过Java中的`java.util.Timer`类或者`java.util.concurrent.ScheduledExecutorService`来实现的。这两个工具提供了不同的功能和使用场景,让我们一一进行深入探讨。 首先,我们来看`java.util.Timer`类。...

    java获取压缩文件的名称并解压

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.zip.ZipEntry; import java.util.zip.ZipFile; public class ...

    java 资源搜索并下载(线程等待和通报)

    每个线程负责下载不同的资源,`java.util.concurrent`包提供了丰富的工具,如`ExecutorService`和`Future`,可以方便地管理线程池和控制并发执行。线程池可以避免频繁创建和销毁线程带来的开销。 四、线程等待和...

    Executor,Executors,ExecutorService比较.docx

    `Executors`是`java.util.concurrent`包下的工具类,它提供了多个静态工厂方法,用于创建不同类型的线程池。常见的线程池类型有: - **newFixedThreadPool(int nThreads)**:创建一个固定大小的线程池。线程池的...

    java的concurrent用法详解

    #### 二、`java.util.concurrent`包的关键概念与组件 ##### 2.1 Executor框架 `Executor`框架是`java.util.concurrent`的核心组件之一,它为任务的执行提供了一个统一的接口。其中最重要的接口是`ExecutorService`...

    lambda-change-java.rar_java lambda_lambda

    在并发编程中,`java.util.concurrent`包中的`ExecutorService`可以接受`Runnable`的Lambda表达式,轻松地创建线程执行任务: ```java ExecutorService executor = Executors.newFixedThreadPool(2); executor....

    java定时器\多线程(池)\java队列Demo

    线程池(java.util.concurrent.ExecutorService) 为了解决频繁创建和销毁线程的开销,Java提供了线程池的概念。线程池是一组预先创建的线程,用于处理到来的任务。`ExecutorService` 接口和它的实现类(如 `...

    java线程-Atomic的含义及示例_.docx

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class Test { public...

    Java异步编程最佳实践_.docx

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public interface DataManager { // 同步方法 public String getDataSynchronously(); // 异步方法 public Future...

    java 银行业务队列简单模拟

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Main { public static void main(String[] args) { BankService bankService = new BankService(); ...

    java并发编程分享

    `java.util.concurrent.ExecutorService`和`java.util.concurrent.Executors`类提供了线程池的创建和管理。线程池可以有效地重用已创建的线程,减少线程创建和销毁的开销,同时提供了定时任务和固定线程数等功能。 ...

    米哈游笔试题目-Java方向.docx

    import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AsyncTaskFramework&lt;T&gt; { private final ExecutorService executor; ...

    java 线程池管理类:Executors_.docx

    `java.util.concurrent.Executors` 继承自 `java.lang.Object`,作为一个工具类,它提供了一系列用于创建和管理线程池的方法,包括`ExecutorService`、`ScheduledExecutorService`、`ThreadFactory`和`Callable`等...

    JAVA多线程框架.pdf

    线程池是Java多线程框架的核心组件,通过`java.util.concurrent.ExecutorService`接口和其子类实现。在示例代码中,`Executors.newFixedThreadPool(2)`创建了一个固定大小为2的线程池,这意味着线程池最多同时运行两...

    Java Concurrent处理并发需求

    #### 一、Java并发基础与Concurrent API介绍 在现代软件开发中,尤其是在服务器端应用中,对并发处理的需求日益增长。为了满足这种需求,Java平台提供了一系列强大的工具和API来支持多线程编程。其中,`java.util....

Global site tag (gtag.js) - Google Analytics