`

线程池

 
阅读更多

 

个人学习笔记,如有错误欢迎指正。。

 

一 线程池的介绍

      如果需要线程来执行的任务,可以使用 new Thread 来创建线程,Thread.start()方法执行线程,线程执行完之后,不可再利用。还有其它任务需要线程来执行,需要重复上述步骤。

        这样就形成,一个任务创建一条线程来执行,对于大量任务来说,频繁创建和销毁线程都需要耗费系统资源,这种情况可以使用线程池来提升性能减少开销。

 

       线程池中的线程创建完成后,可以重复的执行不同的任务(Runnable或Callable),实现了线程的重复利用,减少了创线程的数量。

       线程池实现的简要示意:

public class ThreadPool {
	private final BlockingQueue<Runnable> queue ;
	public ThreadPool (int threadNum){
		this.queue = new ArrayBlockingQueue<Runnable>(10);
		for(int i=0;i<threadNum;i++){
			new WorkerThread().start();
		}
	}
	public void execute(Runnable runnable) throws Exception{
		this.queue.put(runnable);
	}
	private final class WorkerThread extends Thread{
		@Override
		public void run() {
			// TODO Auto-generated method stub
			try{
				queue.take().run();//不断的从任务队列中取出任务来执行
			}catch(Exception e){}
		}
		
	}
}

 

 

 线程池的构建器:

  

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
          用给定的初始参数和默认的线程工厂及处理程序创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
          用给定的初始参数创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
          用给定的初始参数创建新的 ThreadPoolExecutor

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
          用给定的初始参数创建新的 ThreadPoolExecutor

 

 

 

参数说明:

    corePoolSize 核心线程数 就是线程池中创建多少条线程来处理任务。

workQueue 任务队列,用于存放需要处理的任务。线程从这个任务队列中取出需要处理的任务。

maximumPoolSize 最大线程数,当任务积压时(任务队列已满,再也存不下更多的任务时),最多创建多少条线程来来处理任务。

keepAliveTime 线程空闲超时时间,当实际线程数大于  corePoolSize ,且线程空闲时间超过 keepAliveTime ,则销毁该线程,用于保持实际线程数==  corePoolSize

 threadFactory 线程工厂,用于创建线程 。

 handler 当线程池不再理处理更的线程时,对于当于任务调用handler.rejectedExecution(command, this)来处理。

线程的创建过程:

成员变量     private volatile int   poolSize; 表示当前实际线程数据。

    execute(Runnable command)方法被调用的时候:

如果 poolSize<corePoolSize ,则创建新的线程来运行 Runnable任务,同时poolSize++;

如果 poolSize>=corePoolSize,则将 Runnable任务存入工作队列workQueue,当前实际运行的线程数==corePoolSize,这些线程运行完当前任务后,会从 workQueue.take()出任务来执行。

如果 workQueue已满,任务无法再加入队列,如果maximumPoolSize> poolSize,则创那建一条线程来运行当前任务并poolSize++; 如果 poolSize== maximumPoolSize,则当前已经创建了最大的线程数,不能再创建线程了。 对于当前的任务则调用handler.rejectedExecution(command, this);线程池提供了四种 handler:

(未指定时默认使用ThreadPoolExecutor.AbortPolicy

 

 

static class ThreadPoolExecutor.AbortPolicy
          它将抛出 RejectedExecutionException.
static class ThreadPoolExecutor.CallerRunsPolicy
          它直接在 execute 方法的调用线程中运行任务,当前执行的execute方法会直接调用Runnable.run方法,直至任务执行完成,excecute方法才会返回;如果执行程序已关闭,则会丢弃该任务。
static class ThreadPoolExecutor.DiscardOldestPolicy
          它放弃最旧的未处理请求,即将工作队列头上的任务舍弃,然后重试 execute(当前任务);如果执行程序已关闭,则会丢弃该任务。
static class ThreadPoolExecutor.DiscardPolicy
          默认情况下它将放弃当前任务。

 

二  exectue方法源码分析

exectue方法源码分析:

 

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {//poolSize < corePoolSize,调用addIfUnderCorePoolSize创那建新的线程来运行任务
            if (runState == RUNNING && workQueue.offer(command)) {//poolSize = corePoolSize//任务加入工作队列
                if (runState != RUNNING || poolSize == 0)//
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command)) //poolSize < maximumPoolSize时增加线程来运行任务,
                reject(command); //当poolSize == maximumPoolSize时,调用handler.rejectedExecution(command, this);
        }
    }

 

 

 

 

 现在看一下线程池中的线程对象,是如何运行任务的:

    private final class Worker implements Runnable {

        private final ReentrantLock runLock = new ReentrantLock();

        private Runnable firstTask;

        Thread thread;

        Worker(Runnable firstTask) {//第一次运行的任务
            this.firstTask = firstTask;
        }

        /**
         * Runs a single task between before/after methods.
         */
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                
                boolean ran = false;
                beforeExecute(thread, task);//扩展点,运行任务之前调用
                try {
                    task.run();//运行任务
                    ran = true;
                    afterExecute(task, null);;//扩展点,运行完任务后调用
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

        /**
         * Main run loop
         */
        public void run() {//线程执行入口
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {//无线循环,获取任务
                    runTask(task);//运行任务
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }
    }
     Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;//线程STOP或TERMINATED状态,返回NULL来退出当前线程
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)//实际线程数>corePoolSize或 允许线程销毁(通过 public void allowCoreThreadTimeOut(boolean value)方法设定)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);//线程空闲超时时间内未得到任务,则返回null,用于当前线程退出,如果得到任务则运行
                else
                    r = workQueue.take();//队列有任务则取出,没有任务则阻塞
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

 

三 线程池的扩展点:

protected void beforeExecute(Thread t, Runnable r) { }

--任务运行之前调用

protected void afterExecute(Runnable r, Throwable t) { }

--任务运行之后调用。

 protected void terminated() { }

--线程池终止结束

 

四 线程池的四种状态:

RUNNING:接收新的任务并处理队列任务

SHUTDOWN:不接收新的任务,但处理队列任务

STOP:不接收新的任务,不处理队列任务,并中断所有进行中的任务。

TERMINATED:与STOP相同并所有线程结束

 

五 线程池的关闭:

1.shutdown()方法

调用shutdown()方法之后,更改线程池的状态为 shutdown并设置中断标志等操作后就返回了,这时线程池并没有立即停止运行:exectue对于新入的任务调用 reject(command)来拒绝任务,各个线程需要把工作队列中的任务都处理完成,才能退出。

 

  源码分析:

    

    public void shutdown() {
        

	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < SHUTDOWN)
                runState = SHUTDOWN;//更改状态为SHUTDOWN

            try {
                for (Worker w : workers) {
                    w.interruptIfIdle();//如果线程没有运行任务,则设置线程中断标志
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            tryTerminate(); // 如果线程数为0,并任务队列为空则结束
        } finally {
            mainLock.unlock();
        }
    }

  void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {//线程未执行任务,则设定线程中断标志
                try {
                    thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }

//线程在shutdown状态下的运行情况及对中断的响应
Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // 如果shutdown了,则直接取任务队列中的任务,队列为空则返回null,而不是调用TASK(没有任务时等待)
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();//线程被唤醒后,检测到中断标志,抛出异常InterruptedException,从而进入下一个循环
                if (r != null)//如果有工作任务,则继续执行任务
                    return r;
                if (workerCanExit()) {//如果没有工作任务,再次判断工人队列为空,则当前线程可以退出
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;//退出当前线程
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }


 

 

 2. List<Runnable> shutdownNow()

    shutdownNow方法运行后,更改线程池状态为STOP,并将所有线程设置中断,工作队列清空,返回未处理的任务列表。这时线程池未必已停止,因为可能有正在运行中的线程,需要线程都退出之后,线程池才真正停止。

源码分析:

public List<Runnable> shutdownNow() {
	SecurityManager security = System.getSecurityManager();
	if (security != null)
            security.checkPermission(shutdownPerm);

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (security != null) { // Check if caller can modify our threads
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            }

            int state = runState;
            if (state < STOP)
                runState = STOP;//设置STOP状态

            try {
                for (Worker w : workers) {
                    w.interruptNow();//所有线程设置中断标志
                }
            } catch (SecurityException se) { // Try to back out
                runState = state;
                // tryTerminate() here would be a no-op
                throw se;
            }

            List<Runnable> tasks = drainQueue();//清空任务队列
            tryTerminate(); // 如果当前没有线程运行,则试着完全停止
            return tasks;//返回未处理的任务列表
        } finally {
            mainLock.unlock();
        }
    }

//Stop状态下线程的运行情况及对中断的响应
  Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)//如果是stop状态则直接返回null从而退出线程
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();//take方法会响应中断,抛出异常,并从循环开始再次执行,从而退出
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }
 
  

 

上述两个停止方法 shutDown() 和 shutDownNow()被调用后,线程池未必立即停止运行,那什么时候线程池已经停止运行了呢? awaitTermination方法返回真,则已停止。

源码分析:

 

   

 public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runState == TERMINATED)
                    return true;//线程池已停止立即返回真
                if (nanos <= 0)//等待时间已用完或设定的等待时间>=0,则立即返回假
                    return false;
                nanos = termination.awaitNanos(nanos);//阻塞等待线程池停止,如果已停止则被唤醒,超时也被唤醒
            }
        } finally {
            mainLock.unlock();
        }
    }

 

 六 可获取线程任务执行结果的方法 sumit(Callable task)

    方法 public <T> Future<T> submit(Callable<T> task) ,提交一个任务让线程池执行,并返回Future对象用于获取任务的执行结果。submit()方法执行后会立即返回,不会等待任务执行完成。 可以通过isDone()判断线程是否执行完成。get()方法获取结果(任务未执完则阻塞)

   future可用方法:

 

boolean cancel(boolean mayInterruptIfRunning)
          试图取消对此任务的执行。
 V

get()
          如果任务未完成,则阻塞,直至任务完成获取其结果。

    异常:

 

CancellationException - 如果计算被取消。ExecutionException - 如果计算抛出异常。InterruptedException - 如果当前的线程在等待时被中断。

 

 V

get(long timeout, TimeUnit unit)

         如果任务未完成,则阻塞指定时间,如指定时间内任务完成,则获取其结果。

      异常:

CancellationException - 如果计算被取消。
ExecutionException - 如果计算抛出异常。
InterruptedException - 如果当前的线程在等待时被中断。
TimeoutException - 如果等待超时。
<!-- ========= END OF CLASS DATA ========= -->
 boolean isCancelled()
          如果在任务正常完成前将其取消,则返回 true
 boolean isDone()
          如果任务已完成,则返回 true

 

 

 ThreadPoolExecutor 线程池 submit 方法 将Callable对象转为Runnable对象并调用 execute(Runnable)方法来执行任务。

源码分析:

 

 public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);//将Callable对象转为Runnable对象,调用execute(Runnable)方法执行任务
        return ftask;
    }


 

  RunnableFuture 类实现了Future,提供了get()等方法。

  RunnableFuture 类实现了Runnable,线程池通过execute方法调用它的run()方法来执行任务。

  主要使用了AbstractQueuedSynchronizer实现阻塞及唤醒。

  源码分析看看run方法做了什么:

  

   	    void innerRun() {//run方法直接调用这个方法
            if (!compareAndSetState(0, RUNNING))//如果state为0,则改为Running,state初始化状态为0,如果不为0,则说明该任务被(可能其它线程)调用过了,不能再调用立即返回。
                return;
            try {
                runner = Thread.currentThread();
                if (getState() == RUNNING) // 重新检查线程状态
                    innerSet(callable.call());//callable.call()调用任务,结束后调用innerSet设置结果
                else
                    releaseShared(0); // cancel
            } catch (Throwable ex) {
                innerSetException(ex);
            }
        }
    }    
    
  void innerSet(V v) {
	 for (;;) {
		int s = getState();
		if (s == RAN)//任务状态为已完成则直接返回
		    return;
                if (s == CANCELLED) {
                    releaseShared(0);//任务取消,释放线程共锁
                    return;
                }
		if (compareAndSetState(s, RAN)) {//将state状态变为已完成
                    result = v;//结果赋值
                    releaseShared(0);//调用tryReleaseShared方法,tryReleaseShared方法返回真,唤醒被阻塞的线程
                    done();
		    return;
                }
            }
    }
    
     protected boolean tryReleaseShared(int ignore) {
            runner = null;
            return true;//返回真,唤醒调用GET方法阻塞线程返回任务结果
        }
        
   }
    
    //get结果方法直接调用本方法
      V innerGet() throws InterruptedException, ExecutionException {
            acquireSharedInterruptibly(0);//方法内部调用tryAcquireShared,如果任务未完成,则阻塞,直至被唤醒,唤醒时任务已完成了,才能顺序执行,返回结果或抛出异常
            if (getState() == CANCELLED)
                throw new CancellationException();
            if (exception != null)
                throw new ExecutionException(exception);
            return result;
        }
        //方法返回-1,则表示任务未完成,线程阻塞,等待唤醒
        //方法返回1,表示任务已完成,程序顺序执行
        protected int tryAcquireShared(int ignore) {
            return innerIsDone()? 1 : -1;
        }
  其它submit方法:
 
Future<?>

submit(Runnable task)
          提交一个 Runnable 任务用于执行,并返回一个表示该任务的 Future。

         该 Future.get()方法阻塞直至任务完成,返回null. 

        (关心的是任务何时完成,并不关心任务的结果)

<T> Future<T>
submit(Runnable task, T result)
          提交一个 Runnable 任务用于执行,并返回一个 Future,该 Future.get()方法阻塞直至任务完成,返回方法参数中的result

 

 七 批量执行任务并返回结果的方法

 

 

<T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
          执行给定的任务,阻塞直至所有任务完成时,返回保持任务状态和结果的 Future 列表。Future.get()会立即返回结果,因为任务已执行完成了。 
<T> List<Future<T>>

invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          执行给定的任务,阻塞直至所有任务在超时前执行完成,返回所有任务的Future 列表。这时所有的Future.isDone==true;

       如果有任务未在超时时间内完成,由于超时返回所有任务的Future 列表,

并取消未完成的任务。Future.isDone==true;为完成任务,Future.isDone==false为未完成的任务。

 

<T> T
invokeAny(Collection<Callable<T>> tasks)
          执行给定的任务,阻塞直至某一个任务最先完成(也就是未抛出异常),则返回该任务的执行结果,并取消其它任务
<T> T

invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
          执行给定的任务,阻塞直至某一个任务最先完成,返回该任务的结果,并取消其它任务。

     如果给定的超时期满前没有任何任务完成,抛出TimeoutException。

 

 

 八 线程池任务抛出异常时的处理

   如果任务抛出异常即 Runnable任务对象的run方法抛出异常,会导至执行该任务的线程退出。 减少了线程数。 有新任务进行时,发现pooSize<corePoolSize ,则创那建新线程。 

   如果很多任务抛出异常,又有新任务进入,就导至线程的频繁的创建与退出。 

  如果很多任务抛出异常,没有新任务进入,则不会创建新的线程,会导至可利用的线程越来越少,对于大的队列,影响了并发,使整体任务处理速度变慢。

  当前所有线程任务 抛出异常时,这些线程会都退出,但任务队列中还有需要处理的线程,则线程池只会创建一条新的线程执行任务。

  可能通过监控getCorePoolSize()方法的返回值确定实际线程数,需要增加线程可以调用prestartCoreThread 来启动一条核心线程 或prestartAllCoreThreads

 

 源码分析:

 

   public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);//抛出异常,则该方法被调用,该方法执行完成后,就退出了run方法,当前线程不可再利用了。
            }
        }
    void workerDone(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);//线程列表中移走该线程
            if (--poolSize == 0)//实际线程数-1
                tryTerminate();//如果所有线程任务都报错,则poolSize会等于0,则进行tryTerminate方法
        } finally {
            mainLock.unlock();
        }
    } 
    
   private void tryTerminate() {
        if (poolSize == 0) {如果所有线程任务都报错,则poolSize会等于0,状态是Running
            int state = runState;
            //如果工作队列中没有任务,则不做任何处理,新入任务execute方法会启用新的线程
            if (state < STOP && !workQueue.isEmpty()) {//工作队列中有任务
                state = RUNNING; // disable termination check below
                Thread t = addThread(null);//创建一条新的线程用来处理队列中的任务
                if (t != null)
                    t.start();
            }
            if (state == STOP || state == SHUTDOWN) {
                runState = TERMINATED;
                termination.signalAll();
                terminated();
            }
        }
    }    

 九 定时执行或需要周期性重复执行的任务(ScheduledThreadPoolExecutor

 

方法说明:

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的 ScheduledFuture。
 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的一次性操作。
 ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
          创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。
 ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
          创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

 

内部实现主要使用延迟队列DelayQueue,延迟队列DelayQueue是Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,则出现期满。此队列不允许使用 null 元素。

  DelayQueue使用装饰模式,在内部队列PriorityQueue的基础上提供的时间延迟的功能

源码分析:

public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                return null;//虽然first != null, first.getDelay(TimeUnit.NANOSECONDS) > 0 说明未到期则返回NULL
            else {
                E x = q.poll();
                assert x != null;
                if (q.size() != 0)//队列不为空,则唤醒其它线程,如TAKE方当阻塞的线程
                    available.signalAll();
                return x;
            }
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();//队列中没有对象则阻塞
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {//队列中有队象,但未到期,也阻塞
                        long tl = available.awaitNanos(delay);
                    } else {
                        E x = q.poll();
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // wake up other takers
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }
 

 

 DelayQueue 队列中的对象,只有到期才能TAKE出来,调度线程池依据此功能实现了 延迟指定时间再执行任务的功能,如:

 

<V> ScheduledFuture<V>
schedule(Callable<V> callable, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的 ScheduledFuture。
 ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
          创建并执行在给定延迟后启用的一次性操作。

 

 

 

那么重复周期性执行的任务又如何实现的呢? 

看源码:

 

        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset();//调用RUN方法,并将任务状态初始化,以便下次再调用
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isTerminating()))) {
                long p = period;
                if (p > 0)
                    time += p;//计算下一次调用时间
                else
                    time = now() - p;//计算下一次调用时间
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
//重新加入队列,等待下一周期TAKE出来执行。
            }
            // This might have been the final executed delayed
            // task.  Wake up threads to check.
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic())//如果需要周期性执行
                runPeriodic();
            else
                ScheduledFutureTask.super.run();//只需一次执行
        }
    }
 

 

 

 

 十 提供线程池实例化功能的工厂类 

 

static ExecutorService newCachedThreadPool()
          创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
          创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们,并在需要时使用提供的 ThreadFactory 创建新线程。
static ExecutorService newFixedThreadPool(int nThreads)
          创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
          创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程,在需要时使用提供的 ThreadFactory 创建新线程。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
          创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)
          创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
static ExecutorService newSingleThreadExecutor()
          创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
          创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程,并在需要时使用提供的 ThreadFactory 创建新线程。
static ScheduledExecutorService newSingleThreadScheduledExecutor()
          创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)
          创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。

 

 

 十一  可立即获得线程任务执行结果的服务类ExecutorCompletionService

ThreadPoolExecutor  提供了执行一组线程任务的方法:

List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)

该方法阻塞直至所有任务完成。

每个线程任务执行时间长短不一,即使最短线程执行完成,也不能返回结果,需要等待所有线程完成时才返回,这些线程中可能存在需要执行很长时间的线程。 

 ExecutorCompletionService 类提供了某条线程任务执行完成后,可立即获得其结果的功能,而不需要等待其它线程的完成。 

实现原理是:每个线程任务(FutureTask)执行完成后,将FutureTask存入BlockingQueue,并对外提供了BlockingQueue的poll及task方法。

API方法:

 

Future<V>

poll()
         调用  BlockingQueue.poll 获取已完成的任务(移出队列),如果队列为空则返回NULL.

         队列为空的原因:

1.此时没有任务完成

2.所有完成任务都已队列中移走 (需要根据任务数量判断是否再次poll)

 

 

 Future<V> poll(long timeout, TimeUnit unit)
      调用  BlockingQueue.poll 获取已完成的任务(移出队列),等待指定时间后队列仍为空则返回NULL.

 队列为空的原因:

1.此时没有任务完成

2.所有完成任务都已队列中移走 (需要根据任务数量判断是否再次poll)

 Future<V> submit(Callable<V> task)
          将Callable转换为 FutureTask ,提交给线程池执行,任务执行完成后,将任务 FutureTask 存入 BlockingQueue队列
 Future<V> submit(Runnable task, V result)
            将Runnable转换为 FutureTask ,提交给线程池执行,任务执行完成后,将任务 FutureTask 存入 BlockingQueue队列
 Future<V>

take()
          调用  BlockingQueue.take获取已完成的任务 (移出队列),如果队列为空则阻塞,直至有任务完成。 

   注意:需要根据任务数量判断是否再次take,如果没有任务了,再take会一至阻塞

 

 

 

实现原理:

   submit方法提交任务时将runnable或Callable类转换为 FutureTask ,  FutureTask 实现了Runnable。

线程池执行任务时是调用  FutureTask 的run方法,run方法执行结束时调用done方法, done方法是属于模版模式,供子类重写。

 

 void innerSet(V v) {
	    for (;;) {
		int s = getState();
		if (s == RAN)
		    return;
                if (s == CANCELLED) {
		    // aggressively release to set runner to null,
		    // in case we are racing with a cancel request
		    // that will try to interrupt runner
                    releaseShared(0);
                    return;
                }
		if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    done();//任务完成时调用
		    return;
                }
            }
        }

 

 protected void done() { }

 

 ExecutorCompletionService  中实现了done方法,方法中将任务存入队列:

 private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }

 

 

分享到:
评论

相关推荐

    线程池  

    线程池是一种多线程处理形式,通过预先创建一定数量的线程并管理它们,以提高系统的效率和响应性。在计算机科学中,特别是在软件开发领域,线程池是操作系统或者编程语言中的一种资源管理技术。它允许程序预先启动一...

    java线程池概念.txt

    corePoolSize:核心池的大小,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; ...

    阻塞线程池 阻塞线程池 阻塞线程池

    阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池阻塞线程池...

    C++实现线程池详解(基于boost源码以及封装等线程池)

    一、要实现高效的线程池,可以考虑以下几点 二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、...

    线程池原理及创建(C++实现)

    ### 线程池原理及创建(C++实现) #### 一、线程池的重要性 在现代计算环境中,网络服务器面临着处理大量并发请求的挑战,其中包括但不限于Web服务器、电子邮件服务器和数据库服务器。这类服务器通常需要在短时间...

    java线程池使用后到底要关闭吗

    java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...

    Django异步任务线程池实现原理

    文章通过实例展示了如何创建一个全局线程池类,该类中封装了线程池对象,并提供了向线程池提交任务、检查任务是否在运行等方法。全局线程池的生命周期与Django主线程的生命周期一致,确保了线程资源的合理释放。 5....

    windows线程池,使用Windows自带的线程池api功能,比你写的线程池性能好得多

    线程池是多线程编程中的一个重要概念,它是一种线程使用模式,通过预先创建一组线程并维护一个线程集合来处理并发任务。在Windows操作系统中,内建的线程池API(Thread Pool API)提供了高效且灵活的线程管理机制,...

    一个简单线程池的实现

    线程池是一种在多线程编程中非常重要的概念,它能有效地管理和调度系统中的线程资源,从而提高系统的效率和响应速度。在这个简单的线程池实现中,我们可以通过`pthread_pool.cpp`、`MainFunctionForTest.cpp`、`...

    VC++ 线程池(ThreadPool)实现

    在编程领域,线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池在C++中是提高程序效率和资源管理的重要工具,尤其在处理大量并发操作时。本文将深入探讨VC++中...

    Java8并行流中自定义线程池操作示例

    Java8并行流中自定义线程池操作示例 Java8并行流中自定义线程池操作示例主要介绍了Java8并行流中自定义线程池操作,结合实例形式分析了并行流的相关概念、定义及自定义线程池的相关操作技巧。 1. 概览 Java8引入了...

    linux线程池创建c实现

    Linux 线程池创建 C 实现 线程池是一种常用的并发编程技术,它可以提高应用程序的性能和响应速度。在 Linux 系统中,使用 C 语言创建线程池可以实现高效的并发处理。 什么时候需要创建线程池呢?简单的说,如果一...

    DELPHI的ThreadPool的线程池DEMO

    DELPHI的线程池(ThreadPool)是一种高效管理并发任务的技术,它允许程序在需要时创建线程,而不是每次需要执行任务时都手动创建。线程池通过预先创建一组线程,然后根据需要分配任务,减少了线程创建和销毁的开销,...

    多线程写法(精易模块线程池和鱼刺模块线程池)

    本篇文章将重点探讨两种线程池实现:精易模块线程池和鱼刺模块线程池,并通过源码分析来展示它们的特点和用法。 首先,精易模块(SanYe Module)是由中国程序员SanYe开发的一系列开源模块,其中包含了线程池的实现...

    Linux线程池目录拷贝

    在Linux系统中,线程池是一种高效的进程管理方式,它允许多个任务并行执行,同时限制了系统中并发线程的数量,以优化资源分配和调度。本项目实现了利用线程池进行目录拷贝的功能,这涉及到多个重要的编程概念和技术...

    仿ACE线程池机制实现的线程池类

    线程池是一种优化资源管理的机制,通过预先创建并维护一组可重用的线程,避免频繁地创建和销毁线程带来的性能开销。在Java、C++等编程语言中,线程池广泛应用于并发处理,提高系统效率,降低系统的资源消耗。本项目...

    Python 使用threading+Queue实现线程池示例

    一、线程池 1、为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率。 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3&gt;T2,那...

    线程池管理多线程上传

    线程池管理和多线程上传是并发编程中的一个重要实践,特别是在大数据传输和网络服务中。在Java等编程语言中,线程池通过有效地管理和复用线程资源,避免了频繁创建和销毁线程带来的开销,提升了系统性能。下面将详细...

    c++ 多线程线程池 demo

    线程池是多线程编程中一个重要的概念,它能够优化系统资源的使用,提高系统的响应速度和效率。本篇文章将深入探讨C++中的线程池实现,并通过名为“OEasyPool-1.0”的示例来展示其工作原理。 线程池是预先创建并维护...

    C#线程池 所有线程运行完毕

    在C#编程中,线程池(ThreadPool)是一种高效的线程管理机制,它允许开发者创建并管理多个线程,而无需直接操作线程对象。线程池中的线程可以复用,减少了创建和销毁线程的开销。当我们需要执行大量短生命周期的任务...

Global site tag (gtag.js) - Google Analytics