- 浏览: 11600 次
- 性别:
文章分类
最新评论
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介收藏
在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多对并发特性的支持,例如:线程池。
一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
二、一般用法举例
说明:
1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。
如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。
如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。
因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
4、通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制,改变这两个值就可以观察不同速率下程序的工作情况。
5、通过调整4中所指的数据,再加上调整任务丢弃策略,换上其他三种策略,就可以看出不同策略下的不同处理方式。
6、对于其他的使用方法,参看jdk的帮助,很容易理解和使用。
JDK1.5新特性 (三) - 线程池(2)
2008-08-14 14:38
3.2 JDK1.5中的线程池
3.2.1 简单介绍
在J2SE(TM)5.0 中,Doug Lea 编写了一个优秀的并发实用程序开放源码库 util.concurrent,它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类集合类以及几个工作队列实现。该包中的 PooledExecutor 类是一种有效的、广泛使用的以工作队列为基础的线程池的正确实现。Util.concurrent 定义一个 Executor 接口,以异步地执行 Runnable,另外还定义了 Executor 的几个实现,它们具有不同的调度特征。将一个任务排入 executor 的队列非常简单:
Executor executor = new QueuedExecutor();
…
Runnable runnable = … ;
executor.execute(runnable);
PooledExecutor 是一个复杂的线程池实现,它不但提供工作线程(worker thread)池中任务的调度,而且还可灵活地调整池的大小,同时还提供了线程生命周期管理,这个实现可以限制工作队列中任务的数目,以防止队列中的任务耗尽所有可用内存,另外还提供了多种可用的关闭和饱和度策略(阻塞、废弃、抛出、废弃最老的、在调用者中运行等)。所有的 Executor 实现为您管理线程的创建和销毁,包括当关闭 executor 时,关闭所有线程,
3.2.2 线程池的使用
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
· corePoolSize
线程池维护线程的最少数量
· maximumPoolSiz
线程池维护线程的最大数量
· keepAliveTime
线程池维护线程所允许的空闲时间
· unit
线程池维护线程所允许的空闲时间的单位
· workQueue
线程池所使用的缓冲队列
· handler
线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
· ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
· ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
· ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
· ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
用法举例
对这两段程序的说明:
1. 在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2. 一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3. 在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
o 这个小组里面队员至少有两个,如果他们两个忙不过来, 任务就被放到任务列表里面。
o 如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员, 至多只能雇佣 4个。
o 如果四个队员都在忙时,再有新的任务, 这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发, 直到接受这个任务为止(更残忍!呵呵)。
o 因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
4. 通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制, 改变这两个值就可以观察不同速率下程序的工作情况。
5. 通过调整4中所指的数据,再加上调整任务丢弃策略, 换上其他三种策略,就可以看出不同策略下的不同处理方式。
6. 对于其他的使用方法,参看jdk的帮助,很容易理解和使用。
JDK1.5新特性 (3) - 线程池
来源:互联网 作者:不详 时间:2008-06-04 Tag:JDK 1.5 线程池 点击: 7
3. 线程池
3.1简单的线程池实现
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。以下代码实现了具有线程池的工作队列。
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
用线程池执行任务
如果你开发项目的时候用到很多的short-lived任务,这里推荐使用"线程池"这项技术。你可以创建一个线程池来执行池中的的任务,来取代每次执行任务是都要为新的任务来new和discard。如果一个线程在池中是可用状态,那么任务将立即执行。执行完成之后线程返回池中,否则,任务将一直等待直到有线程处在可用状态。
J2SE 5.0为大家提供了一个新的java.util.concurrent package,并且在这个报中提供了一个pre-built 的线程池架构。在java.util.concurrent中提供了一个Executor 接口,里面有一个execute的方法,参数是Runnable 类型
public interface Executor {
public void execute(Runnable command);
}
使用线程池架构,你就必须创建一个Executor实例,然后你给他分配一些runnable任务,例如:
然后你创建或者找到Executor的实现类,实现类可以立即(或者连续)执行分配的任务,例如:
concurrency utilities也包括了一个ThreadPoolExecutor类,它提供了很多对线程的一般性操作,提供了四个构造函数,每个都可以指定如:线程池大小,持续时间,一个线程factory,和拒绝线程的handler。
但是你不必声明构造函数,Executors类会为你创建一个线程池。在一种最简单的情况下,你在Executors类中声明了newFixedThreadPool方法,并且在池中分配了许多线程。你可以使用ExecutorService(继承Executor的一个接口),去execute和submit 那些Runnable任务,使用ExecutorService中的submit方法可以得到一个返回结果,当然submit也可以返回一个Future对象用来检查任务是否执行。
让我们来先做一个Runnable类,名字为NamePrinter,它通知你运行、暂停、和耗费的时间。
然后下面是我们测试的项目UsePool,它创建一个有三个线程的线程池,分配了10个任务给它(运行10次NamePrinter),UsePool在被shutdown 和 awaitTermination之前将等待并执行分配的任务。一个ExecutorService必须要在terminated之前执行shutdown,shutdownNow方法是立即尝试shutdown操作。shutdownNow 方法将返回没有被执行的任务。
注意前三个NamePrinter对象启动的非查的快,之后的NamePrinter对象每次启动都要等待前面的执行完成。
在J2SE 5.0有非常多的pooling framework可以用,例如,你可以创建一个scheduled线程池……
更多信息还是看官方的concurrency utilities,地址:http://java.sun.com/j2se/1.5.0/docs/guide/concurrency/
jdk5.0多线程学习笔记(一)
关键字: jdk5 线程 并发
先来复习一下什么是线程:
线程有时称为 轻量级进程。与进程一样,它们拥有通过程序运行的独立的并发路径,并且每个线程都有自己的程序计数器,称为堆栈和本地变量。然而,线程存在于进程中,它们与同一进程内的其他线程共享内存、文件句柄以及每进程状态。
一个进程中的线程是在同一个地址空间中执行的,所以多个线程可以同时访问相同对象,并且它们从同一堆栈中分配对象。
在 JDK 5.0 之前,确保线程安全的主要机制是 synchronized 原语。访问共享变量(那些可以由多个线程访问的变量)的线程必须使用同步来协调对共享变量的读写访问。
创建线程的方法:
可以用两种方法创建线程,通过扩展 Thread 和覆盖 run() 方法,或者通过实现 Runnable 接口和使用 Thread(Runnable) 构造函数:
创建线程会使用相当一部分内存,其中包括有两个堆栈(Java 和 C),以及每线程数据结构。如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源。
下面的代码就是一段不好的利用线程的代码:
当服务器被请求吞没时,UnreliableWebServer 类不能很好地处理这种情况。每次有请求时,就会创建新的类。根据操作系统和可用内存,可以创建的线程数是有限的。不幸的是,您通常不知道限制是多少 —— 只有当应用程序因为 OutOfMemoryError 而崩溃时才发现。如果足够快地向这台服务器上抛出请求的话,最终其中一个线程创建将失败,生成的 Error 会关闭整个应用程序。
为任务创建新的线程并不一定不好,但是如果创建任务的频率高,而平均任务持续时间低,我们可以看到每项任务创建一个新的线程将产生性能(如果负载不可预知,还有稳定性)问题.
使用线程池解决问题
管理一大组小任务的标准机制是组合工作队列和线程池。工作队列就是要处理的任务的队列,线程池是线程的集合,每个线程都提取公用工作队列。当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理。如果有,它会转移到下一个任务,并开始处理。作为一种额外好处,因为请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟。因此,可以立即处理请求,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。
说了半天,上段代码
java.util.concurrent 包中包含灵活的线程池实现,Executor就是这个包中的。(以后会对其进行详细的介绍,但不在本文内)
创建 Executor 时,人们普遍会问的一个问题是"线程池应该有多大?"
用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有 N*(1+WT/ST) 个线程。
本文只是列举部分jdk5中有关线程的东西,大部分是概念上的引导。
jdk5.0多线程学习笔记(二)
关键字: jdk5 线程 并发
在学习jdk5的新特性之前,先看一个多线程的模式:Future Pattern
去蛋糕店买蛋糕,不需要等蛋糕做出来(假设现做要很长时间),只需要领个提货单就可以了(去干别的事情),等到蛋糕做好了,再拿提货单取蛋糕就可以了。future模式与这个场景类似。
假设有一个需要执行一段时间的方法,我们可以不必等待结果出来,而是获取一个替代的"提货单"。因为获取"提货单"不需要花时间,这时这个"提货单"就是future参与者。
获取future参与者的线程会在事后再去获取执行结果,就好像拿提货单去取蛋糕一样。如果有执行结果了,就可以马上拿到数据。如果没有结果,就等到有结果。
下面看一段代码:
这里的main类就相当于"顾客",host就相当于"蛋糕店",顾客向"蛋糕店"定蛋糕就相当于"发请求request",返回的数据data就相当于"提货单"而不是真正的"蛋糕"。在过一段时间后(sleep一段时间后),再去凭"提货单"取蛋糕"data1.getContent()"。
下面来看一下,顾客定蛋糕后,蛋糕店做了什么:
host("蛋糕店")在接到请求后,先生成了"提货单"FutureData的实例future,然后命令"蛋糕师傅"RealData去做蛋糕,realdata相当于起个线程去做蛋糕了。然后host返回给顾客的仅仅是"提货单"future,而不是蛋糕。当蛋糕做好后,蛋糕师傅才能给对应的"提货单"蛋糕,也就是future.setRealData(realdata)。
下面来看看蛋糕师傅是怎么做蛋糕的:
现在来看看"提货单"future是怎么与蛋糕"content"对应的:
本文只是简单介绍一下future pattern,本人也是初学,如果要深入了解,还需要研究研究,本文代码并不优,只是做个说明性的例子。在以后将继续学习多线程。
jdk5.0 多线程学习笔记(三)
关键字: jdk5.0 多线程 producer consumer
在进一步学习jdk5.0的多线程编程以前,先介绍一下生产者--消费者模式(producer-consumer)
生产者是指:生产数据的线程
消费者是指:使用数据的线程
生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个"桥梁参与者",用于缓冲二者之间处理数据的速度差。
下面用代码来说明:
看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):
之所以在这里要介绍这个模式,是为了更好的理解jdk5的线程编程。
jdk5.0 多线程学习笔记(四)
关键字: jdk5.0 多线程 线程池
学了这么久,终于进入jdk5.0的线程编程了。
先来看一段代码:
看看工作线程:
从执行的结果可以看出:有两个线程在执行操作,因为我们的线程池中就只有两个线程。
这里要注意一下:
tpes.execute(workers[i]);
这里不是启动一个新线程,而是在仅仅是调用了run方法,并没有新建线程。这一点可以参看如下代码(节选自jdk5):
请注意task.run(); 这句, 这儿并没有启动线程 而是简单的调用了一个普通对象的一个方法
从多线程设计的角度来讲,jdk5中的线程池应该是基于worker模式的。下一节将对worker模式进行介绍,以加深对jdk5中多线程编程的理解。
jdk5.0 多线程学习笔记(五)
关键字: jdk5.0 多线程 模式
今天,我们来学学worker模式,大家也好对jdk5.0的线程池有一个更好的理解。
先来看看代码:
再来看看发送请求的client代码:
clientthread建立请求,并把请求传给了channel,下面来看看channel类(相当于线程池类)
channel类把传给他的请求放入队列中,等待worker去取请求,下面看看worker(即工作线程,线程池中已经初始话好的线程)
在工作线程中会从线程池的队列里取出请求,并对请求进行处理。这里的workerthread相当于背景线程,他一直都在运行,当有请求的时候,他就会进行处理,这里处理请求的线程是已经存在在channel(线程池里的线程),他不会因为请求的增加而增加(这是本例中的情况),不会来一个请求就新建立一个线程,节省了资源。
再看看请求的代码:
参考(多线程四)中所写的 ExecutorService,其就相当于channel,即线程池。至于其实现当然要比channel复杂多了,channel只是举个例子。而WorkerThread可不是工作线程,他相当于发送到channel的请求,也就是request,当执行代码:tpes.execute(workers[i]);时,相当于向线程池加入一个请求,而WorkerThread中的run则相当于request中的execute,这也是当执行tpes.execute(workers[i]);时,并不会产生新的线程的原因。(多线程四)中的写法是让人有些迷糊的。ExecutorService中产生的背景线程(相当于本篇的WorkerThread )我们是看不到的。
jdk5中的多线程还有很多需要进一步学习,其实现也反应了多线程的设计模式。本篇的worker模式只是其中的一种。
jdk5.0 多线程学习笔记(六)
关键字: jdk1.5 多线程 线程池
从前面的文章可以看出,jdk1.5为我们提供了很多线程池
这里做一下简要的说明:
类Executors,提供了一些创建线程池的方法
newFixedThreadPool(int nThreads)
创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。
如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。
newCachedThreadPool()
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。
调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。
终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
注意,可以使用 ThreadPoolExecutor类 构造方法创建具有类似属性但细节不同,(例如超时参数)的线程池。
其实executors的线程池也是基于ThreadPoolExecutor扩展的。
newSingleThreadExecutor()
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,
一个新线程将代替它执行后续的任务)。
可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
(与其他等效的 newFixedThreadPool(1) 不同,
可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。)
类ThreadPoolExecutor
Java代码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
核心和最大池大小
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小。
当新任务在方法 execute(java.lang.Runnable)中提交时,
如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
如果运行的线程多于corePoolSize 而少于maximumPoolSize,则仅当队列满时才创建新线程。
如果设置的corePoolSize 和maximumPoolSize 相同,则创建了固定大小的线程池。
如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),
则允许池适应任意数量的并发任务。
在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)和
setMaximumPoolSize(int)进行动态更改。
默认情况下,即使核心线程也只是在新任务需要时才创建和启动的。
threadFactory - 执行程序创建新线程时使用的工厂
使用 ThreadFactory创建新线程。
如果没有另外说明,则在同一个ThreadGroup中一律使用Executors.defaultThreadFactory()创建线程,
并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
通过提供不同的ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。
如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
(保持活动时间)
unit - keepAliveTime 参数的时间单位。
如果池中当前有多于corePoolSize的线程,则这些多出的线程在空闲时间超过keepAliveTime时将会终止。
这提供了当池处于非活动状态时减少资源消耗的方法。
如果池后来变得更为活动,则可以创建新的线程。
也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit)动态地更改此参数。
使用 Long.MAX_VALUE TimeUnit.NANOSECONDS的值在关闭前有效地从以前的终止状态禁用空闲线程。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
所有BlockingQueue都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
1.如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
2.如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
3.如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,
在这种情况下,任务将被拒绝。
排队有三种通用策略:
1.直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。
在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。
此策略可以避免在处理可能具有内部依赖性的请求集合时出现锁定。
直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。
当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
2.无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)
将导致在所有 corePoolSize 线程都忙的情况下将新任务加入队列。
这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)
当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。
这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
3.有界队列。当使用有限的 maximumPoolSizes 时,
有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。
队列大小和最大池大小可能需要相互折衷:
使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
但是可能导致人工降低吞吐量。
如果任务频繁阻塞(例如,如果它们是 I/O 边界),
则系统可能为超过您许可的更多线程安排时间。
使用小型队列通常要求较大的池大小,CPU 使用率较高,
但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 (被拒绝的任务 )
当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,
在方法 execute(java.lang.Runnable)中提交的新任务将被拒绝。
在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandler的
Java代码
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
方法。
下面提供了四种预定义的处理程序策略:
1.在默认的 ThreadPoolExecutor.AbortPolicy中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
2.在 ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的 execute 本身。
此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
3. 在 ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。
4.在 ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序尚未关闭,
则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 RejectedExecutionHandler类也是可能的,但这样做需要非常小心,
尤其是当策略仅用于特定容量或排队策略时。
这些只是jdk中的介绍,要想更好的使用线程池,就需要对这些参数有所了解。
在多线程大师Doug Lea的贡献下,在JDK1.5中加入了许多对并发特性的支持,例如:线程池。
一、简介
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
二、一般用法举例
//TestThreadPool.java //package cn.simplelife.exercise; import java.io.Serializable; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPool { private static int produceTaskSleepTime = 2; private static int consumeTaskSleepTime = 2000; private static int produceTaskMaxNumber = 10; public static void main(String[] args) { //构造一个线程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3), new ThreadPoolExecutor.DiscardOldestPolicy()); for(int i=1;i<=produceTaskMaxNumber;i++){ try { //产生一个任务,并将其加入到线程池 String task = "task@ " + i; System.out.println("put " + task); threadPool.execute(new ThreadPoolTask(task)); //便于观察,等待一段时间 Thread.sleep(produceTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } } } /** * 线程池执行的任务 * @author hdpan */ public static class ThreadPoolTask implements Runnable,Serializable{ private static final long serialVersionUID = 0; //保存任务所需要的数据 private Object threadPoolTaskData; ThreadPoolTask(Object tasks){ this.threadPoolTaskData = tasks; } public void run(){ //处理一个任务,这里的处理方式太简单了,仅仅是一个打印语句 System.out.println("start .."+threadPoolTaskData); try { ////便于观察,等待一段时间 Thread.sleep(consumeTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } threadPoolTaskData = null; } public Object getTask(){ return this.threadPoolTaskData; } } }
说明:
1、在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2、一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3、在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
这个小组里面队员至少有两个,如果他们两个忙不过来,任务就被放到任务列表里面。
如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员,至多只能雇佣 4个。
如果四个队员都在忙时,再有新的任务,这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发,直到接受这个任务为止(更残忍!呵呵)。
因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
4、通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制,改变这两个值就可以观察不同速率下程序的工作情况。
5、通过调整4中所指的数据,再加上调整任务丢弃策略,换上其他三种策略,就可以看出不同策略下的不同处理方式。
6、对于其他的使用方法,参看jdk的帮助,很容易理解和使用。
JDK1.5新特性 (三) - 线程池(2)
2008-08-14 14:38
3.2 JDK1.5中的线程池
3.2.1 简单介绍
在J2SE(TM)5.0 中,Doug Lea 编写了一个优秀的并发实用程序开放源码库 util.concurrent,它包括互斥、信号量、诸如在并发访问下执行得很好的队列和散列表之类集合类以及几个工作队列实现。该包中的 PooledExecutor 类是一种有效的、广泛使用的以工作队列为基础的线程池的正确实现。Util.concurrent 定义一个 Executor 接口,以异步地执行 Runnable,另外还定义了 Executor 的几个实现,它们具有不同的调度特征。将一个任务排入 executor 的队列非常简单:
Executor executor = new QueuedExecutor();
…
Runnable runnable = … ;
executor.execute(runnable);
PooledExecutor 是一个复杂的线程池实现,它不但提供工作线程(worker thread)池中任务的调度,而且还可灵活地调整池的大小,同时还提供了线程生命周期管理,这个实现可以限制工作队列中任务的数目,以防止队列中的任务耗尽所有可用内存,另外还提供了多种可用的关闭和饱和度策略(阻塞、废弃、抛出、废弃最老的、在调用者中运行等)。所有的 Executor 实现为您管理线程的创建和销毁,包括当关闭 executor 时,关闭所有线程,
3.2.2 线程池的使用
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
· corePoolSize
线程池维护线程的最少数量
· maximumPoolSiz
线程池维护线程的最大数量
· keepAliveTime
线程池维护线程所允许的空闲时间
· unit
线程池维护线程所允许的空闲时间的单位
· workQueue
线程池所使用的缓冲队列
· handler
线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是 Runnable类型对象的run()方法。
当一个任务通过execute(Runnable)方法欲添加到线程池时:
如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。
如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。
如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。
也就是:处理任务的优先级为:
核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。
unit可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四个选择:
· ThreadPoolExecutor.AbortPolicy()
抛出java.util.concurrent.RejectedExecutionException异常
· ThreadPoolExecutor.CallerRunsPolicy()
重试添加当前的任务,他会自动重复调用execute()方法
· ThreadPoolExecutor.DiscardOldestPolicy()
抛弃旧的任务
· ThreadPoolExecutor.DiscardPolicy()
抛弃当前的任务
用法举例
package cn.simplelife.exercise; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPool { private static int produceTaskSleepTime = 2; public static void main(String[] args) { //构造一个线程池 ThreadPoolExecutor producerPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue(3), new ThreadPoolExecutor.DiscardOldestPolicy()); //每隔produceTaskSleepTime的时间向线程池派送一个任务。 int i=1; while(true){ try { Thread.sleep(produceTaskSleepTime); String task = "task@ " + i; System.out.println("put " + task); producerPool.execute(new ThreadPoolTask(task)); i++; } catch (Exception e) { e.printStackTrace(); } } } } package cn.simplelife.exercise; import java.io.Serializable; /** * 线程池执行的任务 * @author hdpan */ public class ThreadPoolTask implements Runnable,Serializable{ //JDK1.5中,每个实现Serializable接口的类都推荐声明这样的一个ID private static final long serialVersionUID = 0; private static int consumeTaskSleepTime = 2000; private Object threadPoolTaskData; ThreadPoolTask(Object tasks){ this.threadPoolTaskData = tasks; } //每个任务的执行过程,现在是什么都没做,除了print和sleep,:) public void run(){ System.out.println("start .."+threadPoolTaskData); try { //便于观察现象,等待一段时间 Thread.sleep(consumeTaskSleepTime); } catch (Exception e) { e.printStackTrace(); } threadPoolTaskData = null; } }
对这两段程序的说明:
1. 在这段程序中,一个任务就是一个Runnable类型的对象,也就是一个ThreadPoolTask类型的对象。
2. 一般来说任务除了处理方式外,还需要处理的数据,处理的数据通过构造方法传给任务。
3. 在这段程序中,main()方法相当于一个残忍的领导,他派发出许多任务,丢给一个叫 threadPool的任劳任怨的小组来做。
o 这个小组里面队员至少有两个,如果他们两个忙不过来, 任务就被放到任务列表里面。
o 如果积压的任务过多,多到任务列表都装不下(超过3个)的时候,就雇佣新的队员来帮忙。但是基于成本的考虑,不能雇佣太多的队员, 至多只能雇佣 4个。
o 如果四个队员都在忙时,再有新的任务, 这个小组就处理不了了,任务就会被通过一种策略来处理,我们的处理方式是不停的派发, 直到接受这个任务为止(更残忍!呵呵)。
o 因为队员工作是需要成本的,如果工作很闲,闲到 3SECONDS都没有新的任务了,那么有的队员就会被解雇了,但是,为了小组的正常运转,即使工作再闲,小组的队员也不能少于两个。
4. 通过调整 produceTaskSleepTime和 consumeTaskSleepTime的大小来实现对派发任务和处理任务的速度的控制, 改变这两个值就可以观察不同速率下程序的工作情况。
5. 通过调整4中所指的数据,再加上调整任务丢弃策略, 换上其他三种策略,就可以看出不同策略下的不同处理方式。
6. 对于其他的使用方法,参看jdk的帮助,很容易理解和使用。
JDK1.5新特性 (3) - 线程池
来源:互联网 作者:不详 时间:2008-06-04 Tag:JDK 1.5 线程池 点击: 7
3. 线程池
3.1简单的线程池实现
我们通常想要的是同一组固定的工作线程相结合的工作队列,它使用 wait() 和 notify() 来通知等待线程新的工作已经到达了。该工作队列通常被实现成具有相关监视器对象的某种链表。以下代码实现了具有线程池的工作队列。
public class WorkQueue { private final int nThreads; private final PoolWorker[] threads; private final LinkedList queue; public WorkQueue(int nThreads) { this.nThreads = nThreads; queue = new LinkedList(); threads = new PoolWorker[nThreads]; for (int i=0; i<nThreads; i++) { threads[i] = new PoolWorker(); threads[i].start(); } } public void execute(Runnable r) { synchronized(queue) { queue.addLast(r); queue.notify(); } } private class PoolWorker extends Thread { public void run() { Runnable r; while (true) { synchronized(queue) { while (queue.isEmpty()) { try { queue.wait(); } catch (InterruptedException ignored) { } } r = (Runnable) queue.removeFirst(); } // If we don't catch RuntimeException, // the pool could leak threads try { r.run(); } catch (RuntimeException e) { // You might want to log something here } } } } }
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。用线程池构建的应用程序容易遭受任何其它多线程应用程序容易遭受的所有并发风险,诸如同步错误和死锁,它还容易遭受特定于线程池的少数其它风险,诸如与池有关的死锁、资源不足和线程泄漏。
用线程池执行任务
如果你开发项目的时候用到很多的short-lived任务,这里推荐使用"线程池"这项技术。你可以创建一个线程池来执行池中的的任务,来取代每次执行任务是都要为新的任务来new和discard。如果一个线程在池中是可用状态,那么任务将立即执行。执行完成之后线程返回池中,否则,任务将一直等待直到有线程处在可用状态。
J2SE 5.0为大家提供了一个新的java.util.concurrent package,并且在这个报中提供了一个pre-built 的线程池架构。在java.util.concurrent中提供了一个Executor 接口,里面有一个execute的方法,参数是Runnable 类型
public interface Executor {
public void execute(Runnable command);
}
使用线程池架构,你就必须创建一个Executor实例,然后你给他分配一些runnable任务,例如:
Executor executor = ...; executor.execute(aRunnable1); executor.execute(aRunnable2); Executor executor = ...; executor.execute(aRunnable1); executor.execute(aRunnable2);
然后你创建或者找到Executor的实现类,实现类可以立即(或者连续)执行分配的任务,例如:
class MyExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } }
concurrency utilities也包括了一个ThreadPoolExecutor类,它提供了很多对线程的一般性操作,提供了四个构造函数,每个都可以指定如:线程池大小,持续时间,一个线程factory,和拒绝线程的handler。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
但是你不必声明构造函数,Executors类会为你创建一个线程池。在一种最简单的情况下,你在Executors类中声明了newFixedThreadPool方法,并且在池中分配了许多线程。你可以使用ExecutorService(继承Executor的一个接口),去execute和submit 那些Runnable任务,使用ExecutorService中的submit方法可以得到一个返回结果,当然submit也可以返回一个Future对象用来检查任务是否执行。
让我们来先做一个Runnable类,名字为NamePrinter,它通知你运行、暂停、和耗费的时间。
public class NamePrinter implements Runnable { private final String name; private final int delay; public NamePrinter(String name, int delay) { this.name = name; this.delay = delay; } public void run() { System.out.println("Starting: " + name); try { Thread.sleep(delay); } catch (InterruptedException ignored) { } System.out.println("Done with: " + name); } }
然后下面是我们测试的项目UsePool,它创建一个有三个线程的线程池,分配了10个任务给它(运行10次NamePrinter),UsePool在被shutdown 和 awaitTermination之前将等待并执行分配的任务。一个ExecutorService必须要在terminated之前执行shutdown,shutdownNow方法是立即尝试shutdown操作。shutdownNow 方法将返回没有被执行的任务。
import java.util.concurrent.*; import java.util.Random; public class UsePool { public static void main(String args[]) { Random random = new Random(); ExecutorService executor = Executors.newFixedThreadPool(3); // Sum up wait times to know when to shutdown int waitTime = 500; for (int i=0; i<10; i++) { String name = "NamePrinter " + i; int time = random.nextInt(1000); waitTime += time; Runnable runner = new NamePrinter(name, time); System.out.println("Adding: " + name + " / " + time); executor.execute(runner); } try { Thread.sleep(waitTime); executor.shutdown(); executor.awaitTermination (waitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } System.exit(0); } } import java.util.concurrent.*; import java.util.Random; public class UsePool { public static void main(String args[]) { Random random = new Random(); ExecutorService executor = Executors.newFixedThreadPool(3); // Sum up wait times to know when to shutdown int waitTime = 500; for (int i=0; i<10; i++) { String name = "NamePrinter " + i; int time = random.nextInt(1000); waitTime += time; Runnable runner = new NamePrinter(name, time); System.out.println("Adding: " + name + " / " + time); executor.execute(runner); } try { Thread.sleep(waitTime); executor.shutdown(); executor.awaitTermination (waitTime, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { } System.exit(0); } } 输出的结果是: Adding: NamePrinter 0 / 30 Adding: NamePrinter 1 / 727 Adding: NamePrinter 2 / 980 Starting: NamePrinter 0 Starting: NamePrinter 1 Starting: NamePrinter 2 Adding: NamePrinter 3 / 409 Adding: NamePrinter 4 / 49 Adding: NamePrinter 5 / 802 Adding: NamePrinter 6 / 211 Adding: NamePrinter 7 / 459 Adding: NamePrinter 8 / 994 Adding: NamePrinter 9 / 459 Done with: NamePrinter 0 Starting: NamePrinter 3 Done with: NamePrinter 3 Starting: NamePrinter 4 Done with: NamePrinter 4 Starting: NamePrinter 5 Done with: NamePrinter 1 Starting: NamePrinter 6 Done with: NamePrinter 6 Starting: NamePrinter 7 Done with: NamePrinter 2 Starting: NamePrinter 8 Done with: NamePrinter 5 Starting: NamePrinter 9 Done with: NamePrinter 7 Done with: NamePrinter 9 Done with: NamePrinter 8
注意前三个NamePrinter对象启动的非查的快,之后的NamePrinter对象每次启动都要等待前面的执行完成。
在J2SE 5.0有非常多的pooling framework可以用,例如,你可以创建一个scheduled线程池……
更多信息还是看官方的concurrency utilities,地址:http://java.sun.com/j2se/1.5.0/docs/guide/concurrency/
public class PoolAsynService extends BaseService implements Runnable { private Thread thread = new Thread(this); private List waitToList = (List) Collections.synchronizedList(new LinkedList()); // ////////////线程池参数///////////////// private int corePoolSize = 5;// : 线程池维护线程的最少数量 private int maximumPoolSize = 10;// :线程池维护线程的最大数量 private long keepAliveTime = 60;// : 线程池维护线程所允许的空闲时间 private TimeUnit unit = TimeUnit.SECONDS;// : 线程池维护线程所允许的空闲时间的单位 private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(10);// : // 线程池所使用的缓冲队列 private RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();// : // 线程池对拒绝任务的处理策略 // //////////线程池参数///////////////// private ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); public void run() { while (!thread.isInterrupted()) { if (!waitToList.isEmpty()) { try { threadPool.execute(new Executor()); } catch (Exception e) { logger.error("pool execute error!!!", e); } } Tools.block(25); } } public void doAsync(Object executor, Object... objects) { Throwable t = new Throwable(); StackTraceElement[] elements = t.getStackTrace(); StackTraceElement element = elements[1]; String method = element.getMethodName(); AsyncContext ctx = new AsyncContext(); ctx.args = objects; ctx.executor = executor; ctx.method = method; if (method.endsWith("PA")) { waitToList.add(ctx); } else { logger.warn("async method name is not good!"); } } private class AsyncContext { String method; Object executor; Object[] args; } private class Executor implements Runnable { public void run() { if (!waitToList.isEmpty()) { try { Object task = waitToList.remove(0); AsyncContext ctx = (AsyncContext) task; doTaskByCtx(ctx); } catch (Exception e) { logger.error("async error!!!", e); } } } private void doTaskByCtx(AsyncContext ctx) { String targetMethodName = ctx.method.substring(0, ctx.method .length() - 2); Method targetMethod = null; Class clazz = null; try { clazz = ctx.executor.getClass(); Method[] methods = clazz.getDeclaredMethods(); if (methods != null) { for (int i = 0; i < methods.length; i++) { String name = methods[i].getName(); if (name.equals(targetMethodName)) { targetMethod = methods[i]; break; } } if (targetMethod != null) { targetMethod.invoke(ctx.executor, ctx.args); } } } catch (Exception e) { logger.error( "do async fail! " + clazz + ":" + targetMethodName, e); } } } @Override public void destroy() { thread.interrupt(); threadPool.shutdown(); logger.info("thread pool asynService shut down"); } @Override public void init() { thread.start(); logger.info("thread pool asynService start"); } }
jdk5.0多线程学习笔记(一)
关键字: jdk5 线程 并发
先来复习一下什么是线程:
线程有时称为 轻量级进程。与进程一样,它们拥有通过程序运行的独立的并发路径,并且每个线程都有自己的程序计数器,称为堆栈和本地变量。然而,线程存在于进程中,它们与同一进程内的其他线程共享内存、文件句柄以及每进程状态。
一个进程中的线程是在同一个地址空间中执行的,所以多个线程可以同时访问相同对象,并且它们从同一堆栈中分配对象。
在 JDK 5.0 之前,确保线程安全的主要机制是 synchronized 原语。访问共享变量(那些可以由多个线程访问的变量)的线程必须使用同步来协调对共享变量的读写访问。
创建线程的方法:
可以用两种方法创建线程,通过扩展 Thread 和覆盖 run() 方法,或者通过实现 Runnable 接口和使用 Thread(Runnable) 构造函数:
class WorkerThread extends Thread { public void run() { /* do work */ } } Thread t = new WorkerThread(); t.start(); class WorkerThread extends Thread { public void run() { /* do work */ } } Thread t = new WorkerThread(); t.start(); 或是 Java代码 Thread t = new Thread(new Runnable() { public void run() { /* do work */ } } t.start(); Thread t = new Thread(new Runnable() { public void run() { /* do work */ } } t.start();
创建线程会使用相当一部分内存,其中包括有两个堆栈(Java 和 C),以及每线程数据结构。如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源。
下面的代码就是一段不好的利用线程的代码:
class UnreliableWebServer { public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; // Don't do this! new Thread(r).start(); } } } class UnreliableWebServer { public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; // Don't do this! new Thread(r).start(); } } }
当服务器被请求吞没时,UnreliableWebServer 类不能很好地处理这种情况。每次有请求时,就会创建新的类。根据操作系统和可用内存,可以创建的线程数是有限的。不幸的是,您通常不知道限制是多少 —— 只有当应用程序因为 OutOfMemoryError 而崩溃时才发现。如果足够快地向这台服务器上抛出请求的话,最终其中一个线程创建将失败,生成的 Error 会关闭整个应用程序。
为任务创建新的线程并不一定不好,但是如果创建任务的频率高,而平均任务持续时间低,我们可以看到每项任务创建一个新的线程将产生性能(如果负载不可预知,还有稳定性)问题.
使用线程池解决问题
管理一大组小任务的标准机制是组合工作队列和线程池。工作队列就是要处理的任务的队列,线程池是线程的集合,每个线程都提取公用工作队列。当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理。如果有,它会转移到下一个任务,并开始处理。作为一种额外好处,因为请求到达时,线程已经存在,从而可以消除由创建线程引起的延迟。因此,可以立即处理请求,使应用程序更易响应。而且,通过正确调整线程池中的线程数,可以强制超出特定限制的任何请求等待,直到有线程可以处理它,它们等待时所消耗的资源要少于使用额外线程所消耗的资源,这样可以防止资源崩溃。
说了半天,上段代码
class ReliableWebServer { Executor pool = Executors.newFixedThreadPool(7); public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; pool.execute(r); } } } class ReliableWebServer { Executor pool = Executors.newFixedThreadPool(7); public static void main(String[] args) { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; pool.execute(r); } } }
java.util.concurrent 包中包含灵活的线程池实现,Executor就是这个包中的。(以后会对其进行详细的介绍,但不在本文内)
创建 Executor 时,人们普遍会问的一个问题是"线程池应该有多大?"
用 WT 表示每项任务的平均等待时间,ST 表示每项任务的平均服务时间(计算时间)。则 WT/ST 是每项任务等待所用时间的百分比。对于 N 处理器系统,池中可以近似有 N*(1+WT/ST) 个线程。
本文只是列举部分jdk5中有关线程的东西,大部分是概念上的引导。
jdk5.0多线程学习笔记(二)
关键字: jdk5 线程 并发
在学习jdk5的新特性之前,先看一个多线程的模式:Future Pattern
去蛋糕店买蛋糕,不需要等蛋糕做出来(假设现做要很长时间),只需要领个提货单就可以了(去干别的事情),等到蛋糕做好了,再拿提货单取蛋糕就可以了。future模式与这个场景类似。
假设有一个需要执行一段时间的方法,我们可以不必等待结果出来,而是获取一个替代的"提货单"。因为获取"提货单"不需要花时间,这时这个"提货单"就是future参与者。
获取future参与者的线程会在事后再去获取执行结果,就好像拿提货单去取蛋糕一样。如果有执行结果了,就可以马上拿到数据。如果没有结果,就等到有结果。
下面看一段代码:
public class Main { public static void main(String[] args) { System.out.println("main BEGIN"); Host host = new Host(); Data data1 = host.request(10, 'A'); Data data2 = host.request(20, 'B'); Data data3 = host.request(30, 'C'); System.out.println("main otherJob BEGIN"); try { Thread.sleep(200); } catch (InterruptedException e) { } System.out.println("main otherJob END"); System.out.println("data1 = " + data1.getContent()); System.out.println("data2 = " + data2.getContent()); System.out.println("data3 = " + data3.getContent()); System.out.println("main END"); } } public class Main { public static void main(String[] args) { System.out.println("main BEGIN"); Host host = new Host(); Data data1 = host.request(10, 'A'); Data data2 = host.request(20, 'B'); Data data3 = host.request(30, 'C'); System.out.println("main otherJob BEGIN"); try { Thread.sleep(200); } catch (InterruptedException e) { } System.out.println("main otherJob END"); System.out.println("data1 = " + data1.getContent()); System.out.println("data2 = " + data2.getContent()); System.out.println("data3 = " + data3.getContent()); System.out.println("main END"); } }
这里的main类就相当于"顾客",host就相当于"蛋糕店",顾客向"蛋糕店"定蛋糕就相当于"发请求request",返回的数据data就相当于"提货单"而不是真正的"蛋糕"。在过一段时间后(sleep一段时间后),再去凭"提货单"取蛋糕"data1.getContent()"。
下面来看一下,顾客定蛋糕后,蛋糕店做了什么:
public class Host { public Data request(final int count, final char c) { System.out.println(" request(" + count + ", " + c + ") BEGIN"); // (1) 建立FutureData的实体 final FutureData future = new FutureData(); // (2) 为了建立RealData的实体,启动新的线程 new Thread() { public void run() { RealData realdata = new RealData(count, c); future.setRealData(realdata); } }.start(); System.out.println(" request(" + count + ", " + c + ") END"); // (3) 取回FutureData实体,作为传回值 return future; } } public class Host { public Data request(final int count, final char c) { System.out.println(" request(" + count + ", " + c + ") BEGIN"); // (1) 建立FutureData的实体 final FutureData future = new FutureData(); // (2) 为了建立RealData的实体,启动新的线程 new Thread() { public void run() { RealData realdata = new RealData(count, c); future.setRealData(realdata); } }.start(); System.out.println(" request(" + count + ", " + c + ") END"); // (3) 取回FutureData实体,作为传回值 return future; } }
host("蛋糕店")在接到请求后,先生成了"提货单"FutureData的实例future,然后命令"蛋糕师傅"RealData去做蛋糕,realdata相当于起个线程去做蛋糕了。然后host返回给顾客的仅仅是"提货单"future,而不是蛋糕。当蛋糕做好后,蛋糕师傅才能给对应的"提货单"蛋糕,也就是future.setRealData(realdata)。
下面来看看蛋糕师傅是怎么做蛋糕的:
public class RealData implements Data { private final String content; public RealData(int count, char c) { System.out.println(" making RealData(" + count + ", " + c + ") BEGIN"); char[] buffer = new char[count]; for (int i = 0; i < count; i++) { buffer[i] = c; try { Thread.sleep(1000); } catch (InterruptedException e) { } } System.out.println(" making RealData(" + count + ", " + c + ") END"); this.content = new String(buffer); } public String getContent() { return content; } } public class RealData implements Data { private final String content; public RealData(int count, char c) { System.out.println(" making RealData(" + count + ", " + c + ") BEGIN"); char[] buffer = new char[count]; for (int i = 0; i < count; i++) { buffer[i] = c; try { Thread.sleep(1000); } catch (InterruptedException e) { } } System.out.println(" making RealData(" + count + ", " + c + ") END"); this.content = new String(buffer); } public String getContent() { return content; } }
现在来看看"提货单"future是怎么与蛋糕"content"对应的:
public class FutureData implements Data { private RealData realdata = null; private boolean ready = false; //将提货单与蛋糕师傅对应也就是与蛋糕对应,一个蛋糕师傅做一个订单 public synchronized void setRealData(RealData realdata) { if (ready) { return; // balk } this.realdata = realdata; this.ready = true; notifyAll(); } public synchronized String getContent() { while (!ready) { try { wait(); } catch (InterruptedException e) { } } return realdata.getContent(); } } public class FutureData implements Data { private RealData realdata = null; private boolean ready = false; //将提货单与蛋糕师傅对应也就是与蛋糕对应,一个蛋糕师傅做一个订单 public synchronized void setRealData(RealData realdata) { if (ready) { return; // balk } this.realdata = realdata; this.ready = true; notifyAll(); } public synchronized String getContent() { while (!ready) { try { wait(); } catch (InterruptedException e) { } } return realdata.getContent(); } } 顾客做完自己的事情后,会拿着自己的"提货单"来取蛋糕: Java代码 System.out.println("data1 = " + data1.getContent()); System.out.println("data1 = " + data1.getContent()); 这时候如果蛋糕没做好,就只好等了: Java代码 while (!ready) { try { wait(); } catch (InterruptedException e) { } //等做好后才能取到 return realdata.getContent(); while (!ready) { try { wait(); } catch (InterruptedException e) { } //等做好后才能取到 return realdata.getContent();
本文只是简单介绍一下future pattern,本人也是初学,如果要深入了解,还需要研究研究,本文代码并不优,只是做个说明性的例子。在以后将继续学习多线程。
jdk5.0 多线程学习笔记(三)
关键字: jdk5.0 多线程 producer consumer
在进一步学习jdk5.0的多线程编程以前,先介绍一下生产者--消费者模式(producer-consumer)
生产者是指:生产数据的线程
消费者是指:使用数据的线程
生产者和消费者是不同的线程,他们处理数据的速度是不一样的,一般在二者之间还要加个"桥梁参与者",用于缓冲二者之间处理数据的速度差。
下面用代码来说明:
//生产者 public class MakerThread extends Thread { private final Random random; private final Table table; private static int id = 0; public MakerThread(String name, Table table, long seed) { super(name); this.table = table;//table就是桥梁参与者 this.random = new Random(seed); } public void run() { try { while (true) { Thread.sleep(random.nextInt(1000));//生产数据要耗费时间 String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";//生产数据 table.put(cake);//将数据存入桥梁参与者 } } catch (InterruptedException e) { } } private static synchronized int nextId() { return id++; } } //生产者 public class MakerThread extends Thread { private final Random random; private final Table table; private static int id = 0; public MakerThread(String name, Table table, long seed) { super(name); this.table = table;//table就是桥梁参与者 this.random = new Random(seed); } public void run() { try { while (true) { Thread.sleep(random.nextInt(1000));//生产数据要耗费时间 String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";//生产数据 table.put(cake);//将数据存入桥梁参与者 } } catch (InterruptedException e) { } } private static synchronized int nextId() { return id++; } } 再来看看消费者: Java代码 //消费者线程 public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { String cake = table.take();//从桥梁参与者中取数据 Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间 } } catch (InterruptedException e) { } } } //消费者线程 public class EaterThread extends Thread { private final Random random; private final Table table; public EaterThread(String name, Table table, long seed) { super(name); this.table = table; this.random = new Random(seed); } public void run() { try { while (true) { String cake = table.take();//从桥梁参与者中取数据 Thread.sleep(random.nextInt(1000));//消费者消费数据要花时间 } } catch (InterruptedException e) { } } }
看来在这个模式里table是个很重要的角色啊,让我们来看看他吧(这里只给出个简单的):
public class Table { private final String[] buffer; private int tail; /下一个放put(数据)的地方 private int head; //下一个那曲take(数据)的地方 private int count; // buffer内的数据数量 public Table(int count) { this.buffer = new String[count];//总量是确定的 this.head = 0; this.tail = 0; this.count = 0; } // 放置数据 public synchronized void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " puts " + cake); while (count >= buffer.length) {//数据放满了就只能等待 wait(); } buffer[tail] = cake; tail = (tail + 1) % buffer.length; count++; notifyAll();//有数据了,唤醒线程去取数据 } // 取得数据 public synchronized String take() throws InterruptedException { while (count <= 0) {//没有数据就只能等待 wait(); } String cake = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll();//有位置可以放数据了,唤醒线程,不等了 System.out.println(Thread.currentThread().getName() + " takes " + cake); return cake; } }i public class Table { private final String[] buffer; private int tail; /下一个放put(数据)的地方 private int head; //下一个那曲take(数据)的地方 private int count; // buffer内的数据数量 public Table(int count) { this.buffer = new String[count];//总量是确定的 this.head = 0; this.tail = 0; this.count = 0; } // 放置数据 public synchronized void put(String cake) throws InterruptedException { System.out.println(Thread.currentThread().getName() + " puts " + cake); while (count >= buffer.length) {//数据放满了就只能等待 wait(); } buffer[tail] = cake; tail = (tail + 1) % buffer.length; count++; notifyAll();//有数据了,唤醒线程去取数据 } // 取得数据 public synchronized String take() throws InterruptedException { while (count <= 0) {//没有数据就只能等待 wait(); } String cake = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll();//有位置可以放数据了,唤醒线程,不等了 System.out.println(Thread.currentThread().getName() + " takes " + cake); return cake; } }i 好了我们来实验吧: Java代码 public class Main { public static void main(String[] args) { Table table = new Table(3); // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。 new MakerThread("MakerThread-1", table, 31415).start();//生产数据 new MakerThread("MakerThread-2", table, 92653).start(); new MakerThread("MakerThread-3", table, 58979).start(); new EaterThread("EaterThread-1", table, 32384).start();//消费数据 new EaterThread("EaterThread-2", table, 62643).start(); new EaterThread("EaterThread-3", table, 38327).start(); } } public class Main { public static void main(String[] args) { Table table = new Table(3); // 建立可以放置数据的桥梁参与者,3是他所能放置的最大数量的数据。 new MakerThread("MakerThread-1", table, 31415).start();//生产数据 new MakerThread("MakerThread-2", table, 92653).start(); new MakerThread("MakerThread-3", table, 58979).start(); new EaterThread("EaterThread-1", table, 32384).start();//消费数据 new EaterThread("EaterThread-2", table, 62643).start(); new EaterThread("EaterThread-3", table, 38327).start(); } }
之所以在这里要介绍这个模式,是为了更好的理解jdk5的线程编程。
jdk5.0 多线程学习笔记(四)
关键字: jdk5.0 多线程 线程池
学了这么久,终于进入jdk5.0的线程编程了。
先来看一段代码:
public class ThreadPoolTest { public static void main(String[] args) { int numWorkers = 10;//工作线程数 int threadPoolSize = 2;//线程池大小 ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize);//初始化线程池 WorkerThread[] workers = new WorkerThread[numWorkers]; for (int i = 0; i < numWorkers; i++) { workers[i] = new WorkerThread(i);//初始一个任务 tpes.execute(workers[i]);//执行任务 } tpes.shutdown();//所有线程执行完毕后才关闭。 // tpes.shutdownNow();//立即关闭 } } public class ThreadPoolTest { public static void main(String[] args) { int numWorkers = 10;//工作线程数 int threadPoolSize = 2;//线程池大小 ExecutorService tpes = Executors.newFixedThreadPool(threadPoolSize);//初始化线程池 WorkerThread[] workers = new WorkerThread[numWorkers]; for (int i = 0; i < numWorkers; i++) { workers[i] = new WorkerThread(i);//初始一个任务 tpes.execute(workers[i]);//执行任务 } tpes.shutdown();//所有线程执行完毕后才关闭。 // tpes.shutdownNow();//立即关闭 } }
看看工作线程:
public class WorkerThread implements Runnable { private int workerNumber; WorkerThread(int number) { workerNumber = number; } public void run() { for (int i=0;i<=100;i+=20) { //Perform some work... System.out.format("Worker number: %d, percent complete: %d%n", workerNumber, i); try { Thread.sleep((int)(Math.random() * 1000)); } catch (InterruptedException e) { } } } } public class WorkerThread implements Runnable { private int workerNumber; WorkerThread(int number) { workerNumber = number; } public void run() { for (int i=0;i<=100;i+=20) { //Perform some work... System.out.format("Worker number: %d, percent complete: %d%n", workerNumber, i); try { Thread.sleep((int)(Math.random() * 1000)); } catch (InterruptedException e) { } } } }
从执行的结果可以看出:有两个线程在执行操作,因为我们的线程池中就只有两个线程。
这里要注意一下:
tpes.execute(workers[i]);
这里不是启动一个新线程,而是在仅仅是调用了run方法,并没有新建线程。这一点可以参看如下代码(节选自jdk5):
** * Run a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { // Abort now if immediate cancel. Otherwise, we have // committed to run this task. if (runState == STOP) return; Thread.interrupted(); // clear interrupt status on entry boolean ran = false; beforeExecute(thread, task); try { task.run(); //调用的是run()方法 而不是start() ran = true; afterExecute(task, null); ++completedTasks; } catch(RuntimeException ex) { if (!ran) afterExecute(task, ex); // Else the exception occurred within // afterExecute itself in which case we don't // want to call it again. throw ex; } } finally { runLock.unlock(); } } ** * Run a single task between before/after methods. */ private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { // Abort now if immediate cancel. Otherwise, we have // committed to run this task. if (runState == STOP) return; Thread.interrupted(); // clear interrupt status on entry boolean ran = false; beforeExecute(thread, task); try { task.run(); //调用的是run()方法 而不是start() ran = true; afterExecute(task, null); ++completedTasks; } catch(RuntimeException ex) { if (!ran) afterExecute(task, ex); // Else the exception occurred within // afterExecute itself in which case we don't // want to call it again. throw ex; } } finally { runLock.unlock(); } }
请注意task.run(); 这句, 这儿并没有启动线程 而是简单的调用了一个普通对象的一个方法
从多线程设计的角度来讲,jdk5中的线程池应该是基于worker模式的。下一节将对worker模式进行介绍,以加深对jdk5中多线程编程的理解。
jdk5.0 多线程学习笔记(五)
关键字: jdk5.0 多线程 模式
今天,我们来学学worker模式,大家也好对jdk5.0的线程池有一个更好的理解。
先来看看代码:
public class Main { public static void main(String[] args) { Channel channel = new Channel(5); // 工人线程的數量,即线程池内的线程数目 channel.startWorkers();//启动线程池内的线程 new ClientThread("Alice", channel).start();//发送请求的线程,相当于向队列加入请求 new ClientThread("Bobby", channel).start(); new ClientThread("Chris", channel).start(); } } public class Main { public static void main(String[] args) { Channel channel = new Channel(5); // 工人线程的數量,即线程池内的线程数目 channel.startWorkers();//启动线程池内的线程 new ClientThread("Alice", channel).start();//发送请求的线程,相当于向队列加入请求 new ClientThread("Bobby", channel).start(); new ClientThread("Chris", channel).start(); } }
再来看看发送请求的client代码:
public class ClientThread extends Thread { private final Channel channel;//相当于线程池 private static final Random random = new Random(); public ClientThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { try { int i = 0; Request request = new Request(getName(), i);//生成请求 channel.putRequest(request);//向队列中放入请求,也即把请求传给线程池 Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } } public class ClientThread extends Thread { private final Channel channel;//相当于线程池 private static final Random random = new Random(); public ClientThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { try { int i = 0; Request request = new Request(getName(), i);//生成请求 channel.putRequest(request);//向队列中放入请求,也即把请求传给线程池 Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } }
clientthread建立请求,并把请求传给了channel,下面来看看channel类(相当于线程池类)
public class Channel { private static final int MAX_REQUEST = 100; private final Request[] requestQueue;//存放请求的队列 private int tail; // 下一个putRequest的地方 private int head; // 下一个takeRequest的地方 private int count; // Request的数量 private final WorkerThread[] threadPool; public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; threadPool = new WorkerThread[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new WorkerThread("Worker-" + i, this);//生成线程池中的线程 } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start();//启动线程池中的线程 } } public synchronized void putRequest(Request request) {//向队列中存入请求 while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest() {//从队列取出请求 while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } Request request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } } public class Channel { private static final int MAX_REQUEST = 100; private final Request[] requestQueue;//存放请求的队列 private int tail; // 下一个putRequest的地方 private int head; // 下一个takeRequest的地方 private int count; // Request的数量 private final WorkerThread[] threadPool; public Channel(int threads) { this.requestQueue = new Request[MAX_REQUEST]; this.head = 0; this.tail = 0; this.count = 0; threadPool = new WorkerThread[threads]; for (int i = 0; i < threadPool.length; i++) { threadPool[i] = new WorkerThread("Worker-" + i, this);//生成线程池中的线程 } } public void startWorkers() { for (int i = 0; i < threadPool.length; i++) { threadPool[i].start();//启动线程池中的线程 } } public synchronized void putRequest(Request request) {//向队列中存入请求 while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1) % requestQueue.length; count++; notifyAll(); } public synchronized Request takeRequest() {//从队列取出请求 while (count <= 0) { try { wait(); } catch (InterruptedException e) { } } Request request = requestQueue[head]; head = (head + 1) % requestQueue.length; count--; notifyAll(); return request; } }
channel类把传给他的请求放入队列中,等待worker去取请求,下面看看worker(即工作线程,线程池中已经初始话好的线程)
public class WorkerThread extends Thread { private final Channel channel; public WorkerThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { while (true) { Request request = channel.takeRequest();//取出请求 request.execute();//处理请求 } } } public class WorkerThread extends Thread { private final Channel channel; public WorkerThread(String name, Channel channel) { super(name); this.channel = channel; } public void run() { while (true) { Request request = channel.takeRequest();//取出请求 request.execute();//处理请求 } } }
在工作线程中会从线程池的队列里取出请求,并对请求进行处理。这里的workerthread相当于背景线程,他一直都在运行,当有请求的时候,他就会进行处理,这里处理请求的线程是已经存在在channel(线程池里的线程),他不会因为请求的增加而增加(这是本例中的情况),不会来一个请求就新建立一个线程,节省了资源。
再看看请求的代码:
public class Request { private final String name; // 委托者 private final int number; // 请求编号 private static final Random random = new Random(); public Request(String name, int number) { this.name = name; this.number = number; } public void execute() {//执行请求 System.out.println(Thread.currentThread().getName() + " executes " + this); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } public String toString() { return "[ Request from " + name + " No." + number + " ]"; } } public class Request { private final String name; // 委托者 private final int number; // 请求编号 private static final Random random = new Random(); public Request(String name, int number) { this.name = name; this.number = number; } public void execute() {//执行请求 System.out.println(Thread.currentThread().getName() + " executes " + this); try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { } } public String toString() { return "[ Request from " + name + " No." + number + " ]"; } }
参考(多线程四)中所写的 ExecutorService,其就相当于channel,即线程池。至于其实现当然要比channel复杂多了,channel只是举个例子。而WorkerThread可不是工作线程,他相当于发送到channel的请求,也就是request,当执行代码:tpes.execute(workers[i]);时,相当于向线程池加入一个请求,而WorkerThread中的run则相当于request中的execute,这也是当执行tpes.execute(workers[i]);时,并不会产生新的线程的原因。(多线程四)中的写法是让人有些迷糊的。ExecutorService中产生的背景线程(相当于本篇的WorkerThread )我们是看不到的。
jdk5中的多线程还有很多需要进一步学习,其实现也反应了多线程的设计模式。本篇的worker模式只是其中的一种。
jdk5.0 多线程学习笔记(六)
关键字: jdk1.5 多线程 线程池
从前面的文章可以看出,jdk1.5为我们提供了很多线程池
这里做一下简要的说明:
类Executors,提供了一些创建线程池的方法
newFixedThreadPool(int nThreads)
创建一个可重用固定线程集合的线程池,以共享的无界队列方式来运行这些线程。
如果在关闭前的执行期间由于失败而导致任何线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。
newCachedThreadPool()
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。
调用 execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。
终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。
注意,可以使用 ThreadPoolExecutor类 构造方法创建具有类似属性但细节不同,(例如超时参数)的线程池。
其实executors的线程池也是基于ThreadPoolExecutor扩展的。
newSingleThreadExecutor()
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
(注意,如果因为在关闭前的执行期间出现失败而终止了此单个线程,那么如果需要,
一个新线程将代替它执行后续的任务)。
可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的。
(与其他等效的 newFixedThreadPool(1) 不同,
可保证无需重新配置此方法所返回的执行程序即可使用其他的线程。)
类ThreadPoolExecutor
Java代码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
核心和最大池大小
corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize - 池中允许的最大线程数。
ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小。
当新任务在方法 execute(java.lang.Runnable)中提交时,
如果运行的线程少于corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的。
如果运行的线程多于corePoolSize 而少于maximumPoolSize,则仅当队列满时才创建新线程。
如果设置的corePoolSize 和maximumPoolSize 相同,则创建了固定大小的线程池。
如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),
则允许池适应任意数量的并发任务。
在大多数情况下,核心和最大池大小仅基于构造来设置,不过也可以使用 setCorePoolSize(int)和
setMaximumPoolSize(int)进行动态更改。
默认情况下,即使核心线程也只是在新任务需要时才创建和启动的。
threadFactory - 执行程序创建新线程时使用的工厂
使用 ThreadFactory创建新线程。
如果没有另外说明,则在同一个ThreadGroup中一律使用Executors.defaultThreadFactory()创建线程,
并且这些线程具有相同的 NORM_PRIORITY 优先级和非守护进程状态。
通过提供不同的ThreadFactory,可以改变线程的名称、线程组、优先级、守护进程状态,等等。
如果从 newThread 返回 null 时 ThreadFactory 未能创建线程,则执行程序将继续运行,但不能执行任何任务。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
(保持活动时间)
unit - keepAliveTime 参数的时间单位。
如果池中当前有多于corePoolSize的线程,则这些多出的线程在空闲时间超过keepAliveTime时将会终止。
这提供了当池处于非活动状态时减少资源消耗的方法。
如果池后来变得更为活动,则可以创建新的线程。
也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit)动态地更改此参数。
使用 Long.MAX_VALUE TimeUnit.NANOSECONDS的值在关闭前有效地从以前的终止状态禁用空闲线程。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。
所有BlockingQueue都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:
1.如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。
2.如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。
3.如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,
在这种情况下,任务将被拒绝。
排队有三种通用策略:
1.直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。
在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。
此策略可以避免在处理可能具有内部依赖性的请求集合时出现锁定。
直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。
当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
2.无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)
将导致在所有 corePoolSize 线程都忙的情况下将新任务加入队列。
这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)
当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。
这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
3.有界队列。当使用有限的 maximumPoolSizes 时,
有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。
队列大小和最大池大小可能需要相互折衷:
使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,
但是可能导致人工降低吞吐量。
如果任务频繁阻塞(例如,如果它们是 I/O 边界),
则系统可能为超过您许可的更多线程安排时间。
使用小型队列通常要求较大的池大小,CPU 使用率较高,
但是可能遇到不可接受的调度开销,这样也会降低吞吐量。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序 (被拒绝的任务 )
当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,
在方法 execute(java.lang.Runnable)中提交的新任务将被拒绝。
在以上两种情况下,execute 方法都将调用其 RejectedExecutionHandler的
Java代码
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,
java.util.concurrent.ThreadPoolExecutor)
方法。
下面提供了四种预定义的处理程序策略:
1.在默认的 ThreadPoolExecutor.AbortPolicy中,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。
2.在 ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的 execute 本身。
此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
3. 在 ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。
4.在 ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序尚未关闭,
则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 RejectedExecutionHandler类也是可能的,但这样做需要非常小心,
尤其是当策略仅用于特定容量或排队策略时。
这些只是jdk中的介绍,要想更好的使用线程池,就需要对这些参数有所了解。
相关推荐
使用jdk1.5 实现的线程池. 可以定制人物和其它特性. 下载后可以自己进行相关功能完善. 欢迎加QQ:934547801一起讨论
JDK1.5的线程池讲解,示例代码,很精辟~
JDK 1.5引入了java.util.concurrent包,其中包含了线程池的实现,使得并发编程更加便捷和高效。线程池的核心在于它的设计策略,包括核心线程数、最大线程数、线程存活时间、工作队列以及拒绝策略。 线程池的主要类...
JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介
在并发处理上,JDK1.5引入了并发工具类(java.util.concurrent),包括线程池(ExecutorService)、并发容器(如ConcurrentHashMap)以及Future接口等,这些工具极大地提高了多线程环境下的程序设计效率和性能。...
对于并发编程,JDK1.5引入了java.util.concurrent包,其中包括了线程池、Future、Callable接口以及CyclicBarrier和Semaphore等同步工具类,极大地丰富了并发处理能力,提升了多线程环境下的性能和可维护性。...
10. **NIO.2(New IO 2.0)**:虽然不是JDK1.5的一部分,但在后续的Java版本中,NIO(非阻塞I/O)进行了大量增强,增加了文件通道、文件属性和异步I/O等功能。 11. **异常链(Exception Chaining)**:当一个异常在...
"JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用" JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用是Java多线程编程中的一种重要概念。随着多线程编程的普及,线程池的使用变得...
安装程序"jdk1.5 for windows(32位)安装程序.exe"将引导用户完成配置步骤,包括选择安装目录、设置环境变量(如JAVA_HOME、PATH和CLASSPATH)以及选择是否创建桌面快捷方式。 请注意,由于JDK 1.5已经过时,可能...
**JDK1.5免安装详解** Java Development Kit(JDK)是Oracle公司提供的用于开发和运行Java应用程序的工具集合。JDK1.5,也称为Java 5.0,是一个重要的版本,它引入了许多创新特性,提升了Java编程的效率和性能。...
标题"jdk1.5 and tomcat5.5"指的是Java Development Kit (JDK) 1.5版本与Apache Tomcat 5.5版本的组合。这两个组件是开发和运行Java应用程序的关键工具,特别是对于Web应用程序。 描述中提到的"jdk-1_5_0_22-...
JDK 1.5的内存模型和并发工具也得到了增强,添加了如`java.util.concurrent`包,其中包含了线程池、同步容器、并发队列等高效并发编程工具,极大地提升了多线程环境下应用程序的性能和可维护性。 至于文件列表中的`...
在Java编程语言中,线程并发和线程池是多任务执行的核心概念,尤其是在JDK 1.5及以后的版本中得到了显著增强。线程并发允许程序同时执行多个任务,提高了程序的效率和响应性。线程池是管理线程资源的有效方式,通过...
以上就是JDK 1.5中的一些关键特性,这些改变对Java语言的发展产生了深远影响,使得Java成为更强大、更易用的开发平台。如果你正准备安装JDK 1.5,记得检查系统兼容性,遵循官方的安装指南进行操作。
**Java Development Kit (JDK)...总之,JDK 1.5.0.22 64位是一个重要的历史版本,它在Java发展历程中扮演了关键角色,引入了许多至今仍广泛使用的特性。然而,为了保持软件的最新特性和安全性,建议使用更新的JDK版本。
这些并发新特性使得在 JDK 1.5 中编写多线程程序变得更加简单和高效,减少了锁竞争,提高了并发性能,并降低了死锁和数据不一致的风险。通过熟练掌握这些工具,开发者可以更好地应对复杂的并发场景,构建健壮的并发...
《Java JDK1.5实例宝典》是一本深入解析JDK1.5版本中关键特性和类库的实践指南,涵盖了Java Mail、Servlet、JSP以及多线程等多个核心领域。这本书通过丰富的实例来帮助读者理解和掌握Java编程的精髓。 在JDK1.5中,...
在并发处理方面,JDK1.5引入了并发工具包(java.util.concurrent),包括线程池、并发容器和同步原语等,提供了高效且易于使用的多线程编程工具,大大改善了Java的并发性能。 最后,JDK1.5对垃圾回收机制进行了优化...
资源很不错