`
coolxing
  • 浏览: 875137 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
9a45b66b-c585-3a35-8680-2e466b75e3f8
Java Concurre...
浏览量:97648
社区版块
存档分类
最新评论

task与execution--JCIPC08读书笔记

阅读更多

[本文是我对Java Concurrency In Practice C08的归纳和总结.  转载请注明作者和出处,  如有谬误, 欢迎在评论中指正. ]

task和线程池执行机制之间隐式的耦合

前面曾提到过, 线程池的应用解耦了task的提交和执行. 事实上, 这有所夸大, 因为不是所有的task都适用于所有的执行机制, 某些task要求在特定的线程池中执行:

1. 非独立task, 指的是依赖于其他task的任务. 

2. 要求在单线程中运行的task. 某些task不是线程安全的, 无法并发运行. Executors.newSingleThreadExecutor()方法返回的线程池只包含单个线程, 提交给该线程池的task将缓存在一个无界队列中, 线程池中所包含的单个线程将依次从队列中取出task运行.

3. 响应时间敏感的task. 某些task要求必须在极短的时间内开始执行, 比如GUI应用中处理用户点击操作的task. 假如提交给某一线程池的task既包含long-running task, 也包含响应时间敏感的task, 那么响应时间敏感的task可能无法在极短的时间内得到执行. 

4. 使用了ThreadLocal类的task. 线程池的标准实现可能会在空闲时销毁多余的线程, 繁忙时创建更多的线程, 更有可能重用线程. 所以使用了ThreadLocal的task不应该提交给线程池运行, 除非ThreadLocal的使用只限定在单个task内, 不用于多个task之间通信.

 

线程饥饿死锁

如果提交给线程池运行的task之间不是相互独立的, 就有可能出现线程饥饿死锁. 比如提交给SingleThreadExecutor执行的2个task, task A在执行过程中需要等待task B的执行结果才能继续, 而此时没有多余的线程用于执行task B, 如此就发生了线程饥饿死锁.

public class StarvationDeadLock {
	public static void main(String[] args) {
		final ExecutorService executor = Executors.newSingleThreadExecutor();
		final Runnable taskB = new Runnable() {
			@Override
			public void run() {
				//...
			}
		};
		Runnable taskA = new Runnable() {
			@Override
			public void run() {
				Future<?> future = executor.submit(taskB);
				try {
					System.out.println("waiting for taskB complete");
					// get方法将阻塞, 直到taskB执行完成
					// 但是由于线程池中只有一个线程, 而该线程已经被taskA占用, 所以taskB将没有机会执行. 
					// 此时就发生了线程饥饿死锁
					future.get();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				} catch (ExecutionException e) {
					e.printStackTrace();
				}
				//...
			}
		};
		executor.submit(taskA);
	}
}

不仅SingleThreadExecutor执行相互依赖的task时会发生死锁, 其他线程池执行相互依赖的task时也可能发生死锁:

public class StarvationDeadLock {
	public static void main(String[] args) {
		final ExecutorService executor = Executors.newFixedThreadPool(3);
		// 设定await在Barrier对象上的线程数达到4个时, 其await方法才释放
		final CyclicBarrier barrier = new CyclicBarrier(4);
		
		// 重复提交4个task, 每个task都await在barrier对象上
		// barrier的await方法将一直阻塞, 直到4个线程都到达await点.
		// 但是线程池中只有3个线程, 不可能出现4个线程都达到await点的情形, 所以依然会发生死锁
		for (int i = 0; i < 4; i++) {
			executor.submit(new Runnable() {
				@Override
				public void run() {
					try {
						System.out.println("waiting for other tasks arriving at common point");
						barrier.await();
					} catch (InterruptedException e) {
						Thread.currentThread().interrupt();
					} catch (BrokenBarrierException e) {
						e.printStackTrace();
					}
				}
			});
		}
	}
} 

避免相互依赖的task提交给同一线程池执行时发生死锁的唯一方法是: 线程池中的线程足够多. 

 

确定线程池的size

如果线程池的size过大, 将造成内存等资源的浪费, 甚至使得资源耗尽. 如果线程池的size过小, 将造成CPU的利用率不高. 确定合适的size需要考虑:CPU数, 内存, 是计算密集型task还是I/O密集型task, 是否需要获取稀缺资源(比如数据库连接)等.

对于计算密集型task, 合适的size大约为CPU数量+1. 对于I/O占较大比例的task, 合适的size可以通过以下公式确定: size = CPU数量 * CPU利用率 * (1 + I/O时间比例). Runtime.getRuntime().availableProcessors()返回CPU的个数.

当然, 实际开发中size还受到内存, 文件句柄, socket, 数据库连接数等稀缺资源的约束. 将总的稀缺资源除以每一个task使用的资源数, 能得到线程数的上限. 

 

循环并行化

如果循环体所进行的操作是相互独立的, 这样的循环可以并发的运行:

// 循环操作
void processSequentially(List<Element> elements) {
	for (Element e : elements)
		process(e);
}

// 将相互独立的循环操作转变为并发操作
void processInParallel(Executor exec, List<Element> elements) {
	for (final Element e : elements) {
		exec.execute(new Runnable() {
			public void run() {
				process(e);
			}
		});
	}
	exec.shutdown(); 
	exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} 

如果希望同时提交一系列task, 并且等待它们执行完毕, 可以调用ExecutorService.invokeAll方法.

如果希望task执行完毕之后就获取其执行结果, 可以使用CompletionService.

 

1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics