- 浏览: 1336242 次
- 性别:
- 来自: 成都
文章分类
- 全部博客 (471)
- 原创文章 (4)
- Database (84)
- J2SE (63)
- Web (26)
- Javascript (30)
- Lucene (11)
- os (13)
- 算法 (8)
- Webservice (1)
- Open projects (18)
- Hibernate (18)
- Spring (15)
- Css (2)
- J2ee (2)
- 综合技术 (18)
- 安全管理 (13)
- PatternsInJava (27)
- NIO (5)
- Ibatis (2)
- 书籍收藏 (1)
- quartz (7)
- 并发编程 (15)
- oracle问题 (2)
- ios (60)
- coco2d-iphone (3)
- C++ (6)
- Zookeeper (2)
- golang (4)
- animation (2)
- android (1)
最新评论
-
dandingge123:
【引用】限制UITextField输入长度的方法 -
qja:
...
对List顺序,逆序,随机排列实例代码 -
安静听歌:
现在在搞这个,,,,,哎~头都大了,,,又freemarker ...
通用大型网站页面静态化解决方案(一) -
springdata-jpa:
java quartz定时任务demo教程源代码下载,地址:h ...
Quartz 配置参考 -
马清天:
[b][/b][list][*]引用[u][/u][/list ...
通用大型网站页面静态化解决方案(一)
使用线程池
* 什么样的任务需要特殊的执行策略呢?
1) 有依赖关系的任务.
想想如果提交给线程池的任务之间是有依赖关系的, 怎么办?
2) 在原本单线程访问外部资源的情况下, 如果突然改成了线程池呢?
3) 考虑即时响应性的程序.
如果在用户界面提交了一个长时间运行的任务而线程池是单线程的 , 或者提交N个长时间运行的任务, 而线程池只有N-1的线程数, 会怎样?
4) 不要在线程池的线程里面使用ThreadLocal对象
线程池可以自由使用/重用/替换某个工作线程, 因而可能造成工作线程所访问的ThreadLocal风格的对象被调换或重置或清除. 这一点很严重. 除非ThreadLocal的对象有特定的生命周期, 并且可以在线程池对线程的"借出"周期内完成, 才可以谨 慎考虑使用.
考虑下Hibernate? 很多情况下, 如果用EJB容器+Hibernate, 情况会怎样? hibernate是否会在session退出的时候清理ThreadLocal对象?
* 线程池最理想的工作情景是: 所处理的任务全部是同类的, 或者全部是不相关的
* 考虑线程死锁
当然这发生在有相互关系的线程身上. 如果提交的任务之间有结果依赖关系的时候, 要考虑进去
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
// Will deadlock -- task waiting for result of subtask
// get()是取自一个阻塞队列的数据?
return header.get() + page + footer.get();
}
}
}
* 如果确实有需要长时间运行的任务, 并且客户端需要及时的响应, 可以考虑使用有时间限制版本的线程API, 很多API如Thread.join, BlockingQueue.put, CountDownLatch.await, and Selector.select都有不限时和定时的版本. 对于超时的情况可以及时反馈给客户端
* 考虑线程池的大小, 太大造成资源过度消耗, 太小造成吞吐量不足
1) 计算机的cpu数量, 内存
2) 任务分为哪些种类? (可以为不同类型的任务使用不同的线程池)
3) cpu利用率
@? EJB规范对线程的限制? 在session bean调用之后不能启动新的线程?
对于可能出现阻塞操作的线程, 线程池的容量应该要大一些, 以适应吞吐量的要求. 比如使用N[cpu]+1个线程的线程池, 就可以在"某一个"线程被阻塞或者出现问题的时候, 仍有空闲的线程来处理"额外"的请求.
要更准确的估计线程池容量的大小, 就要考虑到线程执行所需大概的时间, 这个时间不一定很精确, 但是可以通过性能测试来比较, 试着调整线程池的容量, 在高并发的情况下, 吞吐量和cpu,内存使用率之间需要一个折中的结果.
给出一个估算的"公式"
N[cpu] : Number of cpu, 表示cpu个数(从Runtime.availableProcessors获得)
U[cpu] : Utilization of cpu, 表示期望的cpu使用率, 1%~100%
W/C : Wait time/Computing time, 可能出现的等待时间除以实际计算时间
在这三个因素的影响下, 线程数的"公式"为
N[thread] = N[cpu] * U[cpu] * (1+W/C)
比如一个双核CPU的计算机且希望cpu利用率为30%, 且某一个任务需要等待数据库连接返回结果0.2s, 实际计算时间0.1s
N[thread] = 2 * 30% * (1+0.2/0.1) = 1.8, 即为2线程.
但是如果我们期待数据库的等待时间为0.05, 那么我们就要把这个代价分摊开, 需要提高响应的线程数(当然同时提高了CPU占用率).
还有些有趣的情况, 如果这些任务每一个都需要使用一个从连接池获取的连接, 那么其实线程池的容量已经被连接池的容量所限制了, 连接池的容量便成为了线程池容量的上限, 因为如果过多的线程也无法获取连接, 只能阻塞.
* 核心线程数(core pool size), 最大线程数
核心线程数其实就是目标线程数,即使没有任务可以执行, 线程池也尝试维护这个数量的线程. 如果工作队列的满了再创建更多的线程. 最大线程数是不言而喻的, 当任务超过这个数量的时候, 多余的任务讲被挂起. 如果一个线程空闲下来, 超过了空闲时间限制, 就有可能被回收.
Whena ThreadPoolExecutor is initially created, the core threads are not started immediately but instead as tasks are submitted, unless you call prestartAllCoreThreads
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }
Executors.newFixedThreadPool: 核心线程数=最大线程数=设定的线程数
newCachedThreadPool : 核心线程数=设定的线程数
最大线程数=Integer.MAX_VALUE
超时时间=1分钟, 且会自动收缩
* 使用请求队列的好处: 防止大量并发的请求对服务器端造成冲击, 并且在压力过大的情况下抛弃某些请求.
ThreadPoolExecutor allows you to supply a BlockingQueue to hold tasks awaiting execution. There are three basic approaches to task queueing: unbounded queue, bounded queue, and synchronous handoff
newFixedThreadPool和newSingleThreadExecutor 固定了线程池的线程数, 但是用的任务队列是"无限"的队列, 可以提交任意多的任务, 但这样带来的问题就是内存使用率的直线上升
使用有容量限制的ArrayBlockingQueue, LinkedBlockingQueue , Priority-BlockingQueue利于降低内存,cpu使用率
使用SynchronousQueue其实是忽略了在producer/consumer之间的队列,这是一种在线程间传递请求的机制. 要求是必须有可用的线程, 或者能按需创建新线程, 否则请求会被拒绝. 实际上, SynchronousQueue适用于线程池是无限大或者可以拒绝请求的情况.
* 再提一次:
LinkedBlockingQueue or ArrayBlockingQueue 按顺序处理
PriorityBlockingQueue: 要求任务实现Comparable或提供一个Comparator
* 饱和策略
说的是, 当任务队列满了的时候, 该怎么办? 通常是拒绝, 但是具体怎么拒绝, 也有一些讲究
ThreadPoolExecutor.setRejectedExecutionHandler 可用的有 AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy
AbortPolicy: 抛出异常 RejectedExecutionException
CallerRunsPolicy: "借用"调用者的线程来执行, 即不使用线程池内的线程
DiscardPolicy: 忽略任务
DiscardOldestPolicy: 忽略即将执行的下一个任务
如果同PriorityBlockingQueue合用, 那么被忽略的将是优先级最高的.
ThreadPoolExecutor executor= new ThreadPoolExecutor(
N_THREADS, N_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
也可以使用Semaphore来代替有界队列
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
* Thread Factories
线程池是使用Thread Factory来提供线程的, 当有需要的时候, 会调用ThreadFactory.newThread
public interface ThreadFactory {
Thread newThread(Runnable r);
}
显然是为了自己一些目的才实现自己的ThreadFactory
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}
如果在线程池里面创建一个这样的线程, 那么这个线程池就有了log和异常处理的功能
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created "+getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting "+getName());
}
}
public static int getThreadsCreated() { return created.get(); }
public static int getThreadsAlive() { return alive.get(); }
public static boolean getDebug() { return debugLifecycle; }
public static void setDebug(boolean b) { debugLifecycle = b; }
}
当然, ThreadExecutor创建后还是可以做一些额外的配置
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops, bad assumption");
不过注意, Executors.newFixedThreadPool()和newSingleThreadPool()不能重新配置
* ThreadPoolExecutor的扩展
可覆盖的方法有beforeExecute, afterExecute, and terminate
很简单, 看代码就行
public class TimingThreadPool extends ThreadPoolExecutor {
private final ThreadLocal<Long> startTime
= new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
* 并行算法, 有点意思
串行方式:
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
最简单的并行, 不过只适用于各个任务是相对独立的, 并且每个任务不需要返回值:
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() { process(e); }
});
}
* 递归算法的简单并行化
这是一种很简单的情况, 每个递归不需要返回值. 当然, 在需要返回值的情况下, 可以考虑把值存放在其他地方
原始算法:
public<T> void sequentialRecursive(List<Node<T>> nodes,Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
并行化:
public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
我原来就做过类似这样的产品, 涉及到这类递归, 但是更复杂一些, 因为递归的对象不是一棵树, 而是一张图, 也就是说有着多个起点, 每一个节点可以有多个父节点... 想来兴许可以考虑把它并行化
看一个例子, 搜索迷宫出口的代码, 用并发的方式来实现
public class ConcurrentPuzzleSolver<P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
final ValueLatch<Node<P, M>> solution
= new ValueLatch<Node<P, M>>();
...
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// block until solution found
/*留意这里是一个阻塞方法, 如果这个迷宫没有出路, 这里就永远阻塞了*/
Node<P, M> solnNode = solution.getValue();
return (solnNode == null) ? null : solnNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, Node<P,M> n) {
return new SolverTask(p, m, n);
}
class SolverTask extends Node<P, M> implements Runnable {
...
public void run() {
if (solution.isSet()
|| seen.putIfAbsent(pos, true) != null)
return; // already solved or seen this position
if (puzzle.isGoal(pos))
solution.setValue(this);
else
for (M m : puzzle.legalMoves(pos))
exec.execute(
newTask(puzzle.move(pos, m), m, this));
}
}
}
当然, 在上面的代码里面, 为了解决可能出现的阻塞问题, 可以使用一个计数器,保存每个任务运行的状态.
private final AtomicInteger taskCount = new AtomicInteger(0);
每创建一个线程, 就增加1, 每执行完毕一次, 就减1, 当taskCount为0的时候,清空solution即可
if (taskCount.decrementAndGet() == 0)
solution.setValue(null);
转自:http://hi.baidu.com/iwishyou2/blog/item/479388649e96fbf8f6365404.html
发表评论
-
通过 Terracotta实现基于Tomcat的Web应用集群教程
2011-09-28 17:08 4167http://forums.terracotta.org/fo ... -
Future接口的应用
2011-04-07 14:16 1208import java.util.concurren ... -
ScheduledThreadPool
2011-04-07 14:16 2483使用延迟线程池可以指定任务在特定的时延之后执行。下面是一个例子 ... -
多线程摘录 009
2011-04-07 14:11 1290Atomic Variable & nonblocki ... -
多线程摘录 008
2011-04-07 14:10 1655* !!! synchronized使得在等待锁定的线程无法被 ... -
多线程摘录 007
2011-04-07 14:09 668* 测试多线程程序的安全性和生存型 - 不要做出& ... -
多线程摘录 006
2011-04-07 14:08 1237生存性的问题* 死锁最常见的情况, 就是一组线程处理任务的时候 ... -
多线程摘录 004
2011-04-07 14:06 1519* 使用哪种模式的并发?观察一下简单的服务器方式一: ... -
用ReentrantLock模拟宴会的热闹情景
2011-04-07 14:04 1999一个简单的ReentrantLock的例子, 情景是几个朋友吃 ... -
多线程摘录 003
2011-04-07 14:03 1229同步器A synchronizer is any objec ... -
多线程摘录 002
2011-04-07 14:02 1235设计线程安全的类需要 ... -
多线程摘录 001
2011-04-07 14:00 1337需要考虑什么?* 正确性. 程序必须能得到正确的结果* 生存性 ... -
java 5.0 多线程编程实践
2011-04-07 13:49 1149Java5增加了新的类库并发集java.util.concu ... -
Java ExecutorService线程
2011-04-07 13:48 5844ExecutorService 建立多线程的步骤: 1。 ...
相关推荐
在Delphi编程环境中,多线程技术是一种强大的工具,它允许程序同时执行多个任务,从而提高应用程序的响应性和效率。本教程将引导你深入理解Delphi中的多线程编程,并通过实例来帮助你掌握相关技能。 一、多线程概念...
《C语言多进程多线程编程》是一本深入探讨C语言在并发编程领域的专业书籍。在计算机科学中,进程和线程是操作系统中并行执行任务的基本单位,理解和掌握它们对于提升程序性能和优化资源利用至关重要。这本书籍针对...
从内容摘录中无法直接得知程序是如何处理线程同步问题的,但可以确定在多线程环境中运行排序算法时,这一点尤为重要。 图形化展示排序过程的核心是ShowData方法。这个方法使用了Windows Forms中的Graphics类来绘制...
摘要:VC/C++源码,系统相关,多线程 与VC++爱好者们分享一个很不错的多线程MultiThread应用例子,你可将之中的模块摘录出来供需要者使用。本多线程模块实例以新建文件写入数据为操作目的,创建多个线程分别创建文件,...
同时,他可能会讨论Java的特性,如多线程、反射、泛型等,以及这些特性和设计模式的结合使用。 通过对《Java与模式 阎宏 摘录》的学习,开发者不仅可以提升自己的编程技巧,还能深入了解如何在实际工作中选择和应用...
A:晕,这最终还是调用了老汉多线程……那和线程也没什么区别吧……你应该再试一试线程池…… B:不完全是,因为纤程要先ConvertThreadToFiber,才能CreateFiber,VB中就一个线程,你把它Convert成纤程,那纤程...
这部分可能涉及到在多线程环境中保护GUI资源的同步机制。在Windows环境下,可能使用标准的互斥对象来实现这一功能,而在其他环境(如单片机)中,可能需要自定义的解决方案。 总的来说,ucgui是一个提供图形用户...
包括了c++经典笔试题,多线程编程,操作系统,数据库,网络相关知识。以及一些经典面经
ACE Reactor框架是其核心部分之一,它是一个事件多路分离器,能够在一个进程或线程中处理多个客户端连接的事件。使用Reactor框架开发用户程序相对简单,主要包括三个步骤:首先,从`ACE_Event_Handler`派生出子类并...
根据提供的文件信息,我们可以推断出这部分内容主要讨论了多线程编程中的关键概念与技术。下面将基于这些有限的信息,展开对多线程编程中的一些核心知识点进行详细阐述。 ### 多线程编程概述 多线程编程是现代软件...
《精通VB.pdf》:这是一本高级教程,旨在帮助读者从熟悉VB到精通,内容可能涉及更复杂的编程概念和技术,如面向对象编程、高级控件应用、多线程处理、网络编程等,有助于提升编程技能。 《VB-函数-速查手册.pdf》:...
2. **控制共享资源的访问**:在多线程环境下,可以确保共享资源被正确地管理,避免资源竞争问题。 3. **简化配置过程**:单例模式下的对象通常作为配置或参数传递给其他对象使用,简化了配置过程。 #### 实现方式 ...
可以通过锁(lock)机制、读写锁、线程静态字段(ThreadStaticAttribute)等方式确保多线程环境下的安全性。 #### 4.3 什么是装箱与拆箱? 装箱是指将值类型转换为引用类型的过程;拆箱则是相反的过程。这在使用泛型...
相反,多线程技术可以在较少核心的情况下提供更好的性能提升,因为它可以在同一核心上同时执行多个线程,增加处理效率,并且在开发难度、功耗和面积上,多线程技术更有优势。作者提出未来CPU发展不应只是在多核上...
本资源是关于Python语言程序设计的教程,总共分为10章,涵盖了Python编程语言的基础知识、数据类型、控制结构、函数、模块、文件处理、异常处理、面向对象编程、多线程编程等方面的内容。 第一章至第四章主要介绍了...
线程.rar可能包含的是关于CvtCNKI的多线程下载技术文档,这通常意味着软件可能利用多线程技术加速从CNKI等网站下载文献,提高效率,尤其对于大量文献的下载来说,这是一个非常实用的功能。 CvtCNKI-v2.0.1是软件的...
并发编程是一个广泛的话题,涉及到线程的创建、管理、数据共享、同步、内存模型、设计无锁数据结构、设计基于锁的数据结构、并发代码设计、高级线程管理和多线程应用程序的测试与调试等多个方面。 在描述中,书籍的...
在IT行业中,电子阅读器是一种专门用于阅读电子书的软件或设备,它们通常具有文本...这个修复过程涉及到了C#中对剪贴板的操作、可能的多线程同步问题以及异常处理策略,这些都是开发高质量软件时必须考虑的关键因素。