`
longgangbai
  • 浏览: 7348571 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论
阅读更多

转载自 http://www.iteye.com/topic/366591

 

Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们的关系为:


 

并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。

一、创建线程池

Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

public static ExecutorService newFixedThreadPool(int nThreads)

创建固定数目线程的线程池。

public static ExecutorService newCachedThreadPool()

创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

public static ExecutorService newSingleThreadExecutor()

创建一个单线程化的Executor。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

Java代码 复制代码 收藏代码
  1. Executor executor = Executors.newFixedThreadPool(10);   
  2. Runnable task = new Runnable() {   
  3.     @Override  
  4.     public void run() {   
  5.         System.out.println("task over");   
  6.     }   
  7. };   
  8. executor.execute(task);   
  9.   
  10. executor = Executors.newScheduledThreadPool(10);   
  11. ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;   
  12. scheduler.scheduleAtFixedRate(task, 1010, TimeUnit.SECONDS);  
Executor executor = Executors.newFixedThreadPool(10);
Runnable task = new Runnable() {
	@Override
	public void run() {
		System.out.println("task over");
	}
};
executor.execute(task);

executor = Executors.newScheduledThreadPool(10);
ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;
scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);

 二、ExecutorService与生命周期

ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行关闭终止 。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。

如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

Java代码 复制代码 收藏代码
  1. ExecutorService executorService = (ExecutorService) executor;   
  2. while (!executorService.isShutdown()) {   
  3.     try {   
  4.         executorService.execute(task);   
  5.     } catch (RejectedExecutionException ignored) {   
  6.            
  7.     }   
  8. }   
  9. executorService.shutdown();  
ExecutorService executorService = (ExecutorService) executor;
while (!executorService.isShutdown()) {
	try {
		executorService.execute(task);
	} catch (RejectedExecutionException ignored) {
		
	}
}
executorService.shutdown();

 三、使用Callable,Future返回结果

Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

Java代码 复制代码 收藏代码
  1. Callable<Integer> func = new Callable<Integer>(){   
  2.     public Integer call() throws Exception {   
  3.         System.out.println("inside callable");   
  4.         Thread.sleep(1000);   
  5.         return new Integer(8);   
  6.     }          
  7. };         
  8. FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);   
  9. Thread newThread = new Thread(futureTask);   
  10. newThread.start();   
  11.   
  12. try {   
  13.     System.out.println("blocking here");   
  14.     Integer result = futureTask.get();   
  15.     System.out.println(result);   
  16. catch (InterruptedException ignored) {   
  17. catch (ExecutionException ignored) {   
  18. }  
		Callable<Integer> func = new Callable<Integer>(){
			public Integer call() throws Exception {
				System.out.println("inside callable");
				Thread.sleep(1000);
				return new Integer(8);
			}		
		};		
		FutureTask<Integer> futureTask  = new FutureTask<Integer>(func);
		Thread newThread = new Thread(futureTask);
		newThread.start();
		
		try {
			System.out.println("blocking here");
			Integer result = futureTask.get();
			System.out.println(result);
		} catch (InterruptedException ignored) {
		} catch (ExecutionException ignored) {
		}

 ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

例子:并行计算数组的和。

Java代码 复制代码 收藏代码
  1. package executorservice;   
  2.   
  3. import java.util.ArrayList;   
  4. import java.util.List;   
  5. import java.util.concurrent.Callable;   
  6. import java.util.concurrent.ExecutionException;   
  7. import java.util.concurrent.ExecutorService;   
  8. import java.util.concurrent.Executors;   
  9. import java.util.concurrent.Future;   
  10. import java.util.concurrent.FutureTask;   
  11.   
  12. public class ConcurrentCalculator {   
  13.   
  14.     private ExecutorService exec;   
  15.     private int cpuCoreNumber;   
  16.     private List<Future<Long>> tasks = new ArrayList<Future<Long>>();   
  17.   
  18.     // 内部类   
  19.     class SumCalculator implements Callable<Long> {   
  20.         private int[] numbers;   
  21.         private int start;   
  22.         private int end;   
  23.   
  24.         public SumCalculator(final int[] numbers, int start, int end) {   
  25.             this.numbers = numbers;   
  26.             this.start = start;   
  27.             this.end = end;   
  28.         }   
  29.   
  30.         public Long call() throws Exception {   
  31.             Long sum = 0l;   
  32.             for (int i = start; i < end; i++) {   
  33.                 sum += numbers[i];   
  34.             }   
  35.             return sum;   
  36.         }   
  37.     }   
  38.   
  39.     public ConcurrentCalculator() {   
  40.         cpuCoreNumber = Runtime.getRuntime().availableProcessors();   
  41.         exec = Executors.newFixedThreadPool(cpuCoreNumber);   
  42.     }   
  43.   
  44.     public Long sum(final int[] numbers) {   
  45.         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor   
  46.         for (int i = 0; i < cpuCoreNumber; i++) {   
  47.             int increment = numbers.length / cpuCoreNumber + 1;   
  48.             int start = increment * i;   
  49.             int end = increment * i + increment;   
  50.             if (end > numbers.length)   
  51.                 end = numbers.length;   
  52.             SumCalculator subCalc = new SumCalculator(numbers, start, end);   
  53.             FutureTask<Long> task = new FutureTask<Long>(subCalc);   
  54.             tasks.add(task);   
  55.             if (!exec.isShutdown()) {   
  56.                 exec.submit(task);   
  57.             }   
  58.         }   
  59.         return getResult();   
  60.     }   
  61.   
  62.     /**  
  63.      * 迭代每个只任务,获得部分和,相加返回  
  64.      *   
  65.      * @return  
  66.      */  
  67.     public Long getResult() {   
  68.         Long result = 0l;   
  69.         for (Future<Long> task : tasks) {   
  70.             try {   
  71.                 // 如果计算未完成则阻塞   
  72.                 Long subSum = task.get();   
  73.                 result += subSum;   
  74.             } catch (InterruptedException e) {   
  75.                 e.printStackTrace();   
  76.             } catch (ExecutionException e) {   
  77.                 e.printStackTrace();   
  78.             }   
  79.         }   
  80.         return result;   
  81.     }   
  82.   
  83.     public void close() {   
  84.         exec.shutdown();   
  85.     }   
  86. }  
package executorservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class ConcurrentCalculator {

	private ExecutorService exec;
	private int cpuCoreNumber;
	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();

	// 内部类
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}

	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			FutureTask<Long> task = new FutureTask<Long>(subCalc);
			tasks.add(task);
			if (!exec.isShutdown()) {
				exec.submit(task);
			}
		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (Future<Long> task : tasks) {
			try {
				// 如果计算未完成则阻塞
				Long subSum = task.get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

 Main

Java代码 复制代码 收藏代码
  1. int[] numbers = new int[] { 123456781011 };   
  2. ConcurrentCalculator calc = new ConcurrentCalculator();   
  3. Long sum = calc.sum(numbers);   
  4. System.out.println(sum);   
  5. calc.close();  
int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
ConcurrentCalculator calc = new ConcurrentCalculator();
Long sum = calc.sum(numbers);
System.out.println(sum);
calc.close();

 四、CompletionService

在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。修改刚才的例子使用CompletionService:

Java代码 复制代码 收藏代码
  1. public class ConcurrentCalculator2 {   
  2.   
  3.     private ExecutorService exec;   
  4.     private CompletionService<Long> completionService;   
  5.   
  6.   
  7.     private int cpuCoreNumber;   
  8.   
  9.     // 内部类   
  10.     class SumCalculator implements Callable<Long> {   
  11.         ......   
  12.     }   
  13.   
  14.     public ConcurrentCalculator2() {   
  15.         cpuCoreNumber = Runtime.getRuntime().availableProcessors();   
  16.         exec = Executors.newFixedThreadPool(cpuCoreNumber);   
  17.         completionService = new ExecutorCompletionService<Long>(exec);   
  18.   
  19.   
  20.     }   
  21.   
  22.     public Long sum(final int[] numbers) {   
  23.         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor   
  24.         for (int i = 0; i < cpuCoreNumber; i++) {   
  25.             int increment = numbers.length / cpuCoreNumber + 1;   
  26.             int start = increment * i;   
  27.             int end = increment * i + increment;   
  28.             if (end > numbers.length)   
  29.                 end = numbers.length;   
  30.             SumCalculator subCalc = new SumCalculator(numbers, start, end);    
  31.             if (!exec.isShutdown()) {   
  32.                 completionService.submit(subCalc);   
  33.   
  34.   
  35.             }   
  36.                
  37.         }   
  38.         return getResult();   
  39.     }   
  40.   
  41.     /**  
  42.      * 迭代每个只任务,获得部分和,相加返回  
  43.      *   
  44.      * @return  
  45.      */  
  46.     public Long getResult() {   
  47.         Long result = 0l;   
  48.         for (int i = 0; i < cpuCoreNumber; i++) {               
  49.             try {   
  50.                 Long subSum = completionService.take().get();   
  51.                 result += subSum;              
  52.             } catch (InterruptedException e) {   
  53.                 e.printStackTrace();   
  54.             } catch (ExecutionException e) {   
  55.                 e.printStackTrace();   
  56.             }   
  57.         }   
  58.         return result;   
  59.     }   
  60.   
  61.     public void close() {   
  62.         exec.shutdown();   
  63.     }   
  64. }  
public class ConcurrentCalculator2 {

	private ExecutorService exec;
	private CompletionService<Long> completionService;


	private int cpuCoreNumber;

	// 内部类
	class SumCalculator implements Callable<Long> {
		......
	}

	public ConcurrentCalculator2() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);


	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);	
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);


			}
			
		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {			
			try {
				Long subSum = completionService.take().get();
				result += subSum;			
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

 

 

 

 

 

1. ExecutorService

Java从1.5开始正式提供了并发包,而这个并发包里面除了原子变量,synchronizer,并发容器,另外一个非常重要的特性就是线程池.对于线程池的意义,我们这边不再多说.

上图是线程池的主体类图,ThreadPoolExecutor是应用最为广泛的一个线程池实现(我也将在接下来的文字中详细描述我对这个类的理解和执行机制),ScheduledThreadPoolExecutor则在ThreadPoolExecutor上提供了定时执行的等附加功能,这个可以从ScheduledExecutorService接口的定义中看出来.Executors则类似工厂方法,提供了几个非常常用的线程池初始化方法.

ThreadPoolExecutor

这个类继承了AbstractExecutorService抽象类, AbstractExecutorService主要的职责有2部分,一部分定义和实现提交任务的方法(3个submit方法的实现) ,实例化FutureTask并且交给子类执行,另外一部分实现invokeAny,invokeAll方法.留给子类的方法为execute方法,也就是Executor接口定义的方法.

//实例化一个FutureTask,交给子类的execute方法执行.这种设计能够保证callable和runnable的执行接口方法的一致性(FutureTask包装了这个差别)
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

关于FutureTask这个类的实现,我在前面的JAVA LOCK代码浅析有讲过其实现原理,主要的思想就是关注任务完成与未完成的状态,任务提交线程get()结果时被park住,等待任务执行完成被唤醒,任务执行线程在任务执行完毕后设置结果,并且unpark对应线程并且让其得到执行结果.

回到ThreadPoolExecutor类.ThreadPoolExecutor需要实现除了我们刚才说的execute(Runnable command)方法外,还得实现ExecutorService接口定义的部分方法.但ThreadPoolExecutor所提供的不光是这些,以下根据我的理解来列一下它所具有的特性
1. execute流程
2. 池
3. 工作队列
4. 饱和拒绝策略
5. 线程工厂
6. beforeExecute和afterExecute扩展

execute方法的实现有个机制非常重要,当当前线程池线程数量小于corePoolSize,那么生成一个新的worker并把提交的任务置为这个工作线程的头一个执行任务,如果大于corePoolSize,那么会试着将提交的任务塞到workQueue里面供线程池里面的worker稍后执行,并不是直接再起一个worker,但是当workQueue也满,并且当前线程池小于maxPoolSize,那么起一个新的worker并将该任务设为该worker执行的第一个任务执行,大于maxPoolSize,workQueue也满负荷,那么调用饱和策略里面的行为.

worker线程在执行完一个任务之后并不会立刻关闭,而是尝试着去workQueue里面取任务,如果取不到,根据策略关闭或者保持空闲状态.所以submit任务的时候,提交的顺序为核心线程池——工作队列——扩展线程池.

池包括核心池,扩展池(2者的线程在同一个hashset中,这里只是为了方便才这么称呼,并不是分离的),核心池在池内worker没有用完的情况下,只要有任务提交都会创建新的线程,其代表线程池正常处理任务的能力.扩展池,是在核心线程池用完,并且工作队列也已排满任务的情况下才会开始初始化线程,其代表的是线程池超出正常负载时的解决方案,一旦任务完成,并且试图从workQueue取不到任务,那么会比较当前线程池与核心线程池的大小,大于核心线程池数的worker将被销毁.

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            //>SHUTDOWN就是STOP或者TERMINATED
            //直接返回
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            //如果是SHUTDOWN状态,那么取任务,如果有
              //将剩余任务执行完毕,否则就结束了
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            //如果不是以上状态的(也就是RUNNING状态的),那么如果当前池大于核心池数量,
            //或者允许核心线程池取任务超时就可以关闭,那么从任务队列取任务,
            //如果超出keepAliveTime,那么就返回null了,也就意味着这个worker结束了
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            //如果当前池小于核心池,并且不允许核心线程池取任务超时就关闭,那么take(),直到拿到任务或者被interrupt
            else
                r = workQueue.take();
            //如果经过以上判定,任务不为空,那么返回任务
            if (r != null)
                return r;
            //如果取到任务为空,那么判定是否可以退出
            if (workerCanExit()) {
                //如果整个线程池状态变为SHUTDOWN或者TERMINATED,那么将所有worker interrupt (如果正在执行,那继续让其执行)
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
}
    }

//worker从workQueue中取不到数据的时候调用此方法,以决定自己是否跳出取任务的无限循环,从而结束此worker的运行
private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    try {
        /**//*
        *线程池状态为stop或者terminated,
        *或者任务队列里面任务已经为空,
        *或者允许线程池线程空闲超时(实现方式是从工作队列拿最多keepAliveTime的任务,超过这个时间就返回null了)并且
         *当前线程池大于corePoolSize(>1)
        *那么允许线程结束
        *static final int RUNNING    = 0;
        *static final int SHUTDOWN   = 1;
        *static final int STOP       = 2;
        *static final int TERMINATED = 3;
        */
        canExit = runState >= STOP ||
        workQueue.isEmpty() ||
       (allowCoreThreadTimeOut &&
        poolSize > Math.max(1,corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

当提交任务是,线程池都已满,并且工作队列也无空闲位置的情况下,ThreadPoolExecutor会执行reject操作,JDK提供了四种reject策略,包括AbortPolicy(直接抛RejectedException Exception),CallerRunsPolicy(提交任务线程自己执行,当然这时剩余任务也将无法提交),DiscardOldestPolicy(将线程池的workQueue任务队列里面最老的任务剔除,将新任务丢入),DiscardPolicy(无视,忽略此任务,并且立即返回).实例化ThreadPoolExecutor时,如果不指定任何饱和策略,默认将使用AbortPolicy.

个人认为这些饱和策略并不十分理想,特别是在应用既要保证快速,又要高可用的情况下,我的想法是能够加入超时等待策略,也就是提交线程时线程池满,能够park住提交任务的线程,一旦有空闲,能在第一时间通知到等待线程. 这个实际上和主线程执行相似,但是主线程执行期间即使线程池有大量空闲也不会立即可以提交任务,效率上后者可能会比较低,特别是执行慢速任务.

实例化Worker的时候会调用ThreadFactory的addThread(Runnable r)方法返回一个Thread,这个线程工厂是可以在ThreadPoolExecutor实例化的时候指定的,如果不指定,那么将会使用DefaultThreadFactory, 这个也就是提供给使用者命名线程,线程归组,是否是demon等线程相关属性设置的机会.

beforeExecute和afterExecute是提供给使用者扩展的,这两个方法会在worker runTask之前和run完毕之后分别调用.JDK注释里 Doug Lea(concurrent包作者)展示了beforeExecute一个很有趣的示例.代码如下.

class PausableThreadPoolExecutor extends ThreadPoolExecutor {
    private boolean isPaused;
    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

public PausableThreadPoolExecutor() { super(); }

protected void beforeExecute(Thread t, Runnable r) {
    super.beforeExecute(t, r);
    pauseLock.lock();
    try {
        while (isPaused) unpaused.await();
    } catch (InterruptedException ie) {
        t.interrupt();
    } finally {
        pauseLock.unlock();
    }
}

public void pause() {
    pauseLock.lock();
    try {
        isPaused = true;
    } finally {
        pauseLock.unlock();
    }
}

public void resume() {
    pauseLock.lock();
    try {
        isPaused = false;
        unpaused.signalAll();
    } finally {
        pauseLock.unlock();
    }
}
  }

使用这个线程池,用户可以随时调用pause中止剩余任务执行,当然也可以使用resume重新开始执行剩余任务.

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor是一个很实用的类,它的实现核心是基于DelayedWorkQueue.从ScheduledThreadPoolExecutor的继承结构上来看,各位应该能够看出些端倪来,就是ScheduledThreadPoolExecutor将ThreadPoolExecutor中的任务队列设置成了DelayedWorkQueue,这也就是说,线程池Worker从任务队列中取的一个任务,需要等待这个队列中最短超时任务的超时,也就是实现定时的效果.所以ScheduledThreadPoolExecutor所做的工作其实是比较少的.主要就是实现任务的实例化并加入工作队列,以及支持scheduleAtFixedRate和scheduleAtFixedDelay这种周期性任务执行.

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
           super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);
}

对于scheduleAfFixedRate和scheduleAtFiexedDelay这种周期性任务支持,是由ScheduledThreadPoolExecutor内部封装任务的ScheduledFutureTask来实现的.这个类在执行任务后,对于周期性任务,它会处理周期时间,并将自己再次丢入线程池的工作队列,从而达到周期执行的目的.

private void runPeriodic() {
          boolean ok = ScheduledFutureTask.super.runAndReset();
          boolean down = isShutdown();
          // Reschedule if not cancelled and not shutdown or policy allows
      if (ok && (!down ||(getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) {
                long p = period;
                if (p > 0)
                      time += p;
                else
                      time = triggerTime(-p);

                ScheduledThreadPoolExecutor.super.getQueue().add(this);
         }
         // This might have been the final executed delayed
        // task.  Wake up threads to check.
        else if (down)
              interruptIdleWorkers();
}

2. CompletionService

ExecutorCompletionService

CompletionService定义了线程池执行任务集,可以依次拿到任务执行完毕的Future,ExecutorCompletionService是其实现类,先举个例子,如下代码,这个例子中,需要注意ThreadPoolExecutor核心池一定保证能够让任务提交并且马上执行,而不是放到等待队列中去,那样次序将会无法控制,CompletionService也将失去效果(其实核心池中的任务完成顺序还是准确的).

public static void main(String[] args) throws InterruptedException, ExecutionException{
    ThreadPoolExecutor es=new ThreadPoolExecutor(10, 15, 2000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.AbortPolicy());
    CompletionService<String> cs=new ExecutorCompletionService<String>(es);
    cs.submit(new Callable<String>() {
     @Override
     public String call() throws Exception {
         Thread.currentThread().sleep(1000);
         return "i am sleeped 1000 milliseconds";
     }
    });

    cs.submit(new Callable<String>() {
     @Override
     public String call() throws Exception {
         Thread.currentThread().sleep(5000);
         return "i am sleeped 5000 milliseconds";
     }
    });

    cs.submit(new Callable<String>() {
     @Override
     public String call() throws Exception {
         Thread.currentThread().sleep(4000);
         return "i am sleeped 4000 milliseconds";
     }
    });

    cs.submit(new Callable<String>() {
     @Override
         public String call() throws Exception {
          Thread.currentThread().sleep(2000);
              return "i am sleeped 2000 milliseconds";
      }
});

    for(int i=0;i<4;i++){
        Future<String> fu=cs.take();
        System.out.println(fu.get());
    }
}

执行结果:
i am sleeped 1000 milliseconds
i am sleeped 2000 milliseconds
i am sleeped 4000 milliseconds
i am sleeped 5000 milliseconds
从执行结果看来,我们发现先完成的任务先被拿出来了,直到所有任务被执行完毕,也就是CompletionService的效果达到了.

ExecutorCompletionService并不复杂,关键的一个点就是它的内部类QueueingFuture继承了FutureTask类,并且实现了done()方法,done()方法是在线程池任务执行完毕,最后调用FutureTask的方法(这在 JAVA LOCK代码浅析(http://www.blogjava.net/BucketLi/archive/2010/09/30/333471.html)一文中对于FutureTask代码解析有提到)

QueueingFuture的done()方法实现是将执行完的任务(FutureTask)丢入全局的完成队列中(completionQueue),那么take是从这个blockingqueue中取元素.也就是任务完成就会有元素,即生产者消费者.

这种实现的思想是将原本在单个FutureTask上的等待转化为在BlockingQueue上的等待,即对全部FutureTask的等待,从而达到哪个先完成,哪个就可取执行结果的效果.

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        his.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

总结:
JUC提供的线程池体系核心是在ThreadPoolExecutor, 而ScheduledThreadPoolExecutor和ExecutorCompletionService只是对其扩展,这里没有去细讲Executors这个便捷类,这个类提供很多便捷的线程池构建方法.各位使用的时候不妨去看下.

 

分享到:
评论

相关推荐

    java并发编程-从入门到精通

    Java并发编程是Java开发者必须掌握的关键技能之一,尤其是在开发高性能、多线程的应用时。本教程“java并发编程-从入门到精通”旨在帮助你深入理解这个领域,并逐步提升你的编程能力。 首先,我们要理解Java并发的...

    java并发编程实战源码,java并发编程实战pdf,Java

    《Java并发编程实战》是Java并发编程领域的一本经典著作,它深入浅出地介绍了如何在Java平台上进行高效的多线程编程。这本书的源码提供了丰富的示例,可以帮助读者更好地理解书中的理论知识并将其应用到实际项目中。...

    java并发编程-构建块

    "java并发编程-构建块"这个主题涵盖了使程序能够同时处理多个任务的关键概念和技术。在这个主题下,我们将深入探讨Java中用于构建高效并发应用的核心工具和概念。 1. **线程**:Java中的线程是并发编程的基础,每个...

    Java 并发编程实战.pdf

    《Java并发编程实战》这本书是关于Java语言中并发编程技术的经典著作。它详细介绍了如何在Java环境中有效地实现多线程程序和并发控制机制。在Java平台上,由于其本身提供了强大的并发编程支持,因此,掌握并发编程...

    Java并发编程利器:Executor框架深度解析与应用实践

    在现代Java应用开发中,多线程并发编程已成为提升程序性能的关键技术之一。Java通过引入Executor框架,为并发任务的执行提供了一种高效、灵活的管理机制。本文将深入探讨Executor框架的设计哲学、核心组件,并结合...

    JAVA并发编程实践-中文-高清-带书签-完整版

    《JAVA并发编程实践》是Java开发人员深入理解并发编程的一本经典著作,由Doug Lea撰写,本书中文版高清完整,包含丰富的书签,便于读者查阅和学习。这本书旨在帮助开发者掌握在Java平台上进行高效、安全并发编程的...

    Java-Executor并发框架.docx

    Java并发框架中的Executor服务是Java 1.5引入的核心组件,位于`java.util.concurrent`包下,极大地简化了多线程编程。Executor接口虽然历史悠久,但其重要性不言而喻,很多开发者对其背后的原理并不十分了解。本文将...

    java 并发编程的艺术pdf清晰完整版 源码

    《Java并发编程的艺术》这本书是Java开发者深入理解并发编程的重要参考书籍。这本书全面地介绍了Java平台上的并发和多线程编程技术,旨在帮助开发者解决在实际工作中遇到的并发问题,提高程序的性能和可伸缩性。 ...

    JAVA并发编程艺术pdf版

    《JAVA并发编程艺术》是Java开发者深入理解和掌握并发编程的一本重要著作,它涵盖了Java并发领域的核心概念和技术。这本书详细阐述了如何在多线程环境下有效地编写高效、可靠的代码,对于提升Java程序员的技能水平...

    13-Java并发编程学习宝典.zip

    Java并发编程是软件开发中的重要领域,特别是在大型系统和高并发场景中不可或缺。"13-Java并发编程学习宝典.zip" 包含了一系列关于Java并发编程的学习资源,旨在帮助开发者掌握多线程编程的核心技术和最佳实践。以下...

    java并发编程内部分享PPT

    Java并发编程是Java开发中的重要领域,特别是在多核处理器和分布式系统中,高效地利用并发可以极大地提升程序的性能和响应速度。这份“java并发编程内部分享PPT”显然是一个深入探讨这一主题的资料,旨在帮助开发者...

    Java 并发编程实战-随书源码

    《Java并发编程实战》这本书是Java开发者深入理解并发编程的重要参考书籍。本书旨在帮助程序员解决在多线程环境中遇到的实际问题,提升系统性能并保证其稳定性。随书源码提供了丰富的示例,让读者能够动手实践,加深...

    Java并发编程-并发编程知识点总结.docx

    ### Java并发编程知识点总结 #### 1. 什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。程序员可以通过线程进行多处理器编程,利用多线程对运算密集型任务...

    Java并发编程从入门到精通(pdf)(附源码)

    《Java并发编程从入门到精通》是一本专为Java开发者设计的深度学习并发编程的书籍。作者韩剑锋,凭借其12年的IT行业经验,曾担任多家IT公司的研发总监和技术总监,以其丰富的实战经验和深厚的理论知识,为读者提供了...

    java并发编程书籍

    Java并发编程是软件开发中的一个关键领域,尤其是在大型企业级应用和分布式系统中。通过学习相关的书籍,开发者可以深入理解如何有效地设计和实现高效的多线程应用程序,避免并发问题,如竞态条件、死锁、活锁等。...

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    Java并发编程实践.pdf

    ### Java并发编程实践 #### 一、并发编程基础 ##### 1.1 并发与并行的区别 在Java并发编程中,首先需要理解“并发”(Concurrency)和“并行”(Parallelism)的区别。“并发”指的是多个任务在同一时间段内交替...

    Java并发编程实践-电子书1-9章pdf

    《Java并发编程实践》是Java开发者深入理解并发编程的重要参考资料,尤其对于想要提升多线程应用设计和性能优化技能的程序员来说,这本书提供了丰富的实践经验和深入的理论知识。以下是根据提供的章节内容概述的一些...

    JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf

    根据提供的文件信息,“JAVA并发编程实践 中文 高清 带书签 完整版 Doug Lea .pdf”,我们可以推断出这份文档主要聚焦于Java并发编程的技术实践与理论探讨。下面将从多个角度来解析这个文档可能涵盖的关键知识点。 ...

Global site tag (gtag.js) - Google Analytics