`

Java 线程池的原理与实现

 
阅读更多

最近在学习线程池、内存控制等关于提高程序运行性能方面的编程技术,在网上看到有一哥们写得不错,故和大家一起分享。

[分享]Java 线程池的原理与实现


这几天主要是狂看源程序,在弥补了一些以前知识空白的同时,也学会了不少新的知识(比如 NIO),或者称为新技术吧。
线程池就是其中之一,一提到线程,我们会想到以前《操作系统》的生产者与消费者,信号量,同步控制等等。
一提到池,我们会想到数据库连接池,但是线程池又如何呢?


建议 :在阅读本文前,先理一理同步的知识,特别是syncronized同步关键字的用法。
关于我对同步的认识,要缘于大三年的一本书,书名好像是 Java 实战,这本书写得实在太妙了,真正的从理论到实践,从截图分析到.class字节码分析。哇,我想市场上很难买到这么精致的书了。作为一个Java爱好者,我觉得绝对值得一读。
我对此书印象最深之一的就是:equal()方法,由浅入深,经典!
还有就是同步了,其中提到了我的几个编程误区,以前如何使用同步提高性能等等,通过学习,使我对同步的认识进一步加深。


简单介绍

创建线程有两种方式:继承Thread或实现Runnable。Thread实现了Runnable接口,提供了一个空的run()方法,所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。
一个线程创建后就存在,调用start()方法就开始运行(执行run()方法),调用wait进入等待或调用sleep进入休眠期,顺利运行完毕或休眠被中断或运行过程中出现异常而退出。

wait和sleep比较:
sleep方法有:sleep(long millis),sleep(long millis, long nanos),调用sleep方法后,当前线程进入休眠期,暂停执行,但该线程继续拥有监视资源的所有权。到达休眠时间后线程将继续执行,直到完成。若在休眠期另一线程中断该线程,则该线程退出。
wait方法有:wait(),wait(long timeout),wait(long timeout, long nanos),调用wait方法后,该线程放弃监视资源的所有权进入等待状态;
wait():等待有其它的线程调用notify()或notifyAll()进入调度状态,与其它线程共同争夺监视。wait()相当于wait(0),wait(0, 0)。
wait(long timeout):当其它线程调用notify()或notifyAll(),或时间到达timeout亳秒,或有其它某线程中断该线程,则该线程进入调度状态。
wait(long timeout, long nanos):相当于wait(1000000*timeout + nanos),只不过时间单位为纳秒。



线程池:
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。

如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

 

001 /** 线程池类,工作线程作为其内部类 **/
002
003 package org.ymcn.util;
004
005 import java.util.Collections;
006 import java.util.Date;
007 import java.util.LinkedList;
008 import java.util.List;
009
010 import org.apache.log4j.Logger;
011
012 /**
013 * 线程池
014 * 创建线程池,销毁线程池,添加新任务
015 *
016 * <SPAN class=referer>@author</SPAN> obullxl
017 */
018 public final class ThreadPool {
019 private static Logger logger = Logger.getLogger(ThreadPool.class);
020 private static Logger taskLogger = Logger.getLogger("TaskLogger");
021
022 private static boolean debug = taskLogger.isDebugEnabled();
023 // private static boolean debug = taskLogger.isInfoEnabled();
024 /* 单例 */
025 private static ThreadPool instance = ThreadPool.getInstance();
026
027 public static final int SYSTEM_BUSY_TASK_COUNT = 150;
028 /* 默认池中线程数 */
029 public static int worker_num = 5;
030 /* 已经处理的任务数 */
031 private static int taskCounter = 0;
032
033 public static boolean systemIsBusy = false;
034
035 private static List<Task> taskQueue = Collections
036 .synchronizedList(new LinkedList<Task>());
037 /* 池中的所有线程 */
038 public PoolWorker[] workers;
039
040 private ThreadPool() {
041 workers = new PoolWorker[5];
042 for (int i = 0; i < workers.length; i++) {
043 workers[i] = new PoolWorker(i);
044 }
045 }
046
047 private ThreadPool(int pool_worker_num) {
048 worker_num = pool_worker_num;
049 workers = new PoolWorker[worker_num];
050 for (int i = 0; i < workers.length; i++) {
051 workers[i] = new PoolWorker(i);
052 }
053 }
054
055 public static synchronized ThreadPool getInstance() {
056 if (instance == null)
057 return new ThreadPool();
058 return instance;
059 }
060 /**
061 * 增加新的任务
062 * 每增加一个新任务,都要唤醒任务队列
063 * @param newTask
064 */
065 public void addTask(Task newTask) {
066 synchronized (taskQueue) {
067 newTask.setTaskId(++taskCounter);
068 newTask.setSubmitTime(new Date());
069 taskQueue.add(newTask);
070 /* 唤醒队列, 开始执行 */
071 taskQueue.notifyAll();
072 }
073 logger.info("Submit Task<" + newTask.getTaskId() + ">: "
074 + newTask.info());
075 }
076 /**
077 * 批量增加新任务
078 * @param taskes
079 */
080 public void batchAddTask(Task[] taskes) {
081 if (taskes == null || taskes.length == 0) {
082 return;
083 }
084 synchronized (taskQueue) {
085 for (int i = 0; i < taskes.length; i++) {
086 if (taskes[i] == null) {
087 continue;
088 }
089 taskes[i].setTaskId(++taskCounter);
090 taskes[i].setSubmitTime(new Date());
091 taskQueue.add(taskes[i]);
092 }
093 /* 唤醒队列, 开始执行 */
094 taskQueue.notifyAll();
095 }
096 for (int i = 0; i < taskes.length; i++) {
097 if (taskes[i] == null) {
098 continue;
099 }
100 logger.info("Submit Task<" + taskes[i].getTaskId() + ">: "
101 + taskes[i].info());
102 }
103 }
104 /**
105 * 线程池信息
106 * <SPAN class=referer>@return</SPAN>
107 */
108 public String getInfo() {
109 StringBuffer sb = new StringBuffer();
110 sb.append("\nTask Queue Size:" + taskQueue.size());
111 for (int i = 0; i < workers.length; i++) {
112 sb.append("\nWorker " + i + " is "
113 + ((workers[i].isWaiting()) ? "Waiting." : "Running."));
114 }
115 return sb.toString();
116 }
117 /**
118 * 销毁线程池
119 */
120 public synchronized void destroy() {
121 for (int i = 0; i < worker_num; i++) {
122 workers[i].stopWorker();
123 workers[i] = null;
124 }
125 taskQueue.clear();
126 }
127
128 /**
129 * 池中工作线程
130 *
131 * <SPAN class=referer>@author</SPAN> obullxl
132 */
133 private class PoolWorker extends Thread {
134 private int index = -1;
135 /* 该工作线程是否有效 */
136 private boolean isRunning = true;
137 /* 该工作线程是否可以执行新任务 */
138 private boolean isWaiting = true;
139
140 public PoolWorker(int index) {
141 this.index = index;
142 start();
143 }
144
145 public void stopWorker() {
146 this.isRunning = false;
147 }
148
149 public boolean isWaiting() {
150 return this.isWaiting;
151 }
152 /**
153 * 循环执行任务
154 * 这也许是线程池的关键所在
155 */
156 public void run() {
157 while (isRunning) {
158 Task r = null;
159 synchronized (taskQueue) {
160 while (taskQueue.isEmpty()) {
161 try {
162 /* 任务队列为空,则等待有新任务加入从而被唤醒 */
163 taskQueue.wait(20);
164 } catch (InterruptedException ie) {
165 logger.error(ie);
166 }
167 }
168 /* 取出任务执行 */
169 r = (Task) taskQueue.remove(0);
170 }
171 if (r != null) {
172 isWaiting = false;
173 try {
174 if (debug) {
175 r.setBeginExceuteTime(new Date());
176 taskLogger.debug("Worker<" + index
177 + "> start execute Task<" + r.getTaskId() + ">");
178 if (r.getBeginExceuteTime().getTime()
179 - r.getSubmitTime().getTime() > 1000)
180 taskLogger.debug("longer waiting time. "
181 + r.info() + ",<" + index + ">,time:"
182 + (r.getFinishTime().getTime() - r
183 .getBeginExceuteTime().getTime()));
184 }
185 /* 该任务是否需要立即执行 */
186 if (r.needExecuteImmediate()) {
187 new Thread(r).start();
188 } else {
189 r.run();
190 }
191 if (debug) {
192 r.setFinishTime(new Date());
193 taskLogger.debug("Worker<" + index
194 + "> finish task<" + r.getTaskId() + ">");
195 if (r.getFinishTime().getTime()
196 - r.getBeginExceuteTime().getTime() > 1000)
197 taskLogger.debug("longer execution time. "
198 + r.info() + ",<" + index + ">,time:"
199 + (r.getFinishTime().getTime() - r
200 .getBeginExceuteTime().getTime()));
201 }
202 } catch (Exception e) {
203 e.printStackTrace();
204 logger.error(e);
205 }
206 isWaiting = true;
207 r = null;
208 }
209 }
210 }
211 }
212 }
213
214 /** 任务接口类 **/
215
216 package org.ymcn.util;
217
218 import java.util.Date;
219
220 /**
221 * 所有任务接口
222 * 其他任务必须继承访类
223 *
224 * <SPAN class=referer>@author</SPAN> obullxl
225 */
226 public abstract class Task implements Runnable {
227 // private static Logger logger = Logger.getLogger(Task.class);
228 /* 产生时间 */
229 private Date generateTime = null;
230 /* 提交执行时间 */
231 private Date submitTime = null;
232 /* 开始执行时间 */
233 private Date beginExceuteTime = null;
234 /* 执行完成时间 */
235 private Date finishTime = null;
236
237 private long taskId;
238
239 public Task() {
240 this.generateTime = new Date();
241 }
242
243 /**
244 * 任务执行入口
245 */
246 public void run() {
247 /**
248 * 相关执行代码
249 *
250 * beginTransaction();
251 *
252 * 执行过程中可能产生新的任务 subtask = taskCore();
253 *
254 * commitTransaction();
255 *
256 * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
257 */
258 }
259
260 /**
261 * 所有任务的核心 所以特别的业务逻辑执行之处
262 *
263 * @throws Exception
264 */
265 public abstract Task[] taskCore() throws Exception;
266
267 /**
268 * 是否用到数据库
269 *
270 * <SPAN class=referer>@return</SPAN>
271 */
272 protected abstract boolean useDb();
273
274 /**
275 * 是否需要立即执行
276 *
277 * <SPAN class=referer>@return</SPAN>
278 */
279 protected abstract boolean needExecuteImmediate();
280
281 /**
282 * 任务信息
283 *
284 * <SPAN class=referer>@return</SPAN> String
285 */
286 public abstract String info();
287
288 public Date getGenerateTime() {
289 return generateTime;
290 }
291
292 public Date getBeginExceuteTime() {
293 return beginExceuteTime;
294 }
295
296 public void setBeginExceuteTime(Date beginExceuteTime) {
297 this.beginExceuteTime = beginExceuteTime;
298 }
299
300 public Date getFinishTime() {
301 return finishTime;
302 }
303
304 public void setFinishTime(Date finishTime) {
305 this.finishTime = finishTime;
306 }
307
308 public Date getSubmitTime() {
309 return submitTime;
310 }
311
312 public void setSubmitTime(Date submitTime) {
313 this.submitTime = submitTime;
314 }
315
316 public long getTaskId() {
317 return taskId;
318 }
319
320 public void setTaskId(long taskId) {
321 this.taskId = taskId;
322 }
323
324 }
分享到:
评论

相关推荐

    JAVA线程池的原理与实现.pdf

    Java线程池是一种高效利用系统资源、管理并发执行任务的机制。...总的来说,理解Java线程池的工作原理和实现对于优化并发应用程序至关重要,它可以帮助我们更好地控制系统的并发度,提高系统的响应速度和资源利用率。

    JAVA线程池原理以及几种线程池类型介绍

    ### JAVA线程池原理及几种线程池类型的详细介绍 #### 一、线程池的引入背景及重要性 在现代软件开发中,特别是在基于Java的应用程序设计中,线程池技术已经成为提高系统性能和资源利用率的关键手段之一。线程池...

    JAVA线程池原理以及几种线程池类型介绍.doc

    Java线程池是一种高效管理线程的工具,它允许开发者预先创建一组线程,并复用它们来处理任务,从而降低了创建和销毁线程的开销。线程池的使用尤其适用于处理大量短小任务的场景,例如Web服务器、数据库服务器等。在...

    java线程池封装j

    本文将深入探讨Java线程池的原理、封装以及相关知识点。 ### 1. Java线程池概念 Java线程池由`java.util.concurrent`包中的`ExecutorService`接口和其子类实现。其中,最常用的是`ThreadPoolExecutor`类,它提供了...

    自定义实现Java线程池

    ### 自定义实现Java线程池 #### 一、概述 在深入探讨自定义Java线程池之前,我们先简要回顾一下线程池的基本概念及其重要性。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动...

    java 线程池实现多并发队列后进先出

    Java线程池是一种高效管理并发任务的机制,它允许开发者预先配置一定数量的线程,以便在处理多个并发任务时能有效地复用这些线程,从而避免了频繁创建和销毁线程带来的开销。在Java中,`java.util.concurrent`包下的...

    基于Java线程池技术的数据爬虫设计与实现.pdf

    本文所提及的基于Java线程池技术的数据爬虫设计与实现,不仅涉及到了数据爬虫的原理和架构,还包括了多线程编程的知识点,以及线程池技术在数据爬虫中的具体应用。 首先,数据爬虫的基本原理是模拟用户的点击行为,...

    Java线程池文档

    Java线程池是一种高效管理线程的机制,它允许开发者预先设定线程的数量,并通过池化的方式重用已创建的线程,以提高系统性能,减少线程的创建和销毁开销。线程池在Java中是通过`java.util.concurrent`包下的`...

    Java版线程池实现

    Java线程池是一种高效管理并发任务执行的机制,它通过预先创建并维护一定数量的线程,从而避免了频繁地创建和销毁线程所带来的性能开销。在Java中,线程池的实现主要依赖于`java.util.concurrent`包中的`...

    自定义实现Java线程池1-模拟jdk线程池执行流程1

    【自定义Java线程池实现】 在Java编程中,线程池是一种高效管理线程资源的方式,可以提高系统的性能和响应速度。本篇将探讨如何模拟Java的JDK线程池执行流程,以理解其设计原理。核心知识点包括线程池的执行策略、...

    资料:Java线程池实现原理及其在美团业务中的实践

    资料:Java线程池实现原理及其在美团业务中的实践

    java线程池实例

    Java线程池是一种高效管理线程资源的工具,它通过维护一组可重用的线程来减少创建和销毁线程的开销。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口和它的实现类,如`ThreadPoolExecutor`,来支持...

    Java实现的线程池、消息队列功能

    标题中的“Java实现的...总的来说,这个主题涵盖了Java并发编程中的核心概念,线程池和消息队列的实现原理及应用场景,以及可能用到的相关开发工具。理解并熟练运用这些知识对于提升Java应用的性能和稳定性至关重要。

    JAVA线程池例子

    Java线程池是一种高效管理线程资源的技术,它允许开发者创建一组可重用的工作线程,从而避免频繁地创建和销毁线程带来的性能开销。线程池在Java中主要通过`java.util.concurrent`包中的`ExecutorService`接口及其...

    JAVA线程池原理实例详解

    JAVA线程池原理实例详解 JAVA线程池是java中的一种高效的线程管理机制,它可以控制线程的创建、销毁和复用,从而提高系统的性能和可靠性。下面我们将详细介绍JAVA线程池的原理、创建、使用方法及相关注意事项。 一...

    Java线程池及观察者模式解决多线程意外死亡重启问题

    Java线程池是Java并发编程中的重要组成部分,它允许开发者高效地管理多个并发执行的线程,有效地控制系统的资源消耗,提高系统性能和稳定性。在Java中,`java.util.concurrent`包提供了`ExecutorService`接口及其...

    Java线程池.pdf

    ### Java线程池详解 #### 引言 在现代计算机科学中,线程作为轻量级的进程,已经成为操作系统和应用程序提高并发性、优化资源...理解和掌握Java线程池的原理和使用方法,对于开发高性能、高可靠的Java应用至关重要。

    线程池的实现(JAVA)

    在Java中,`java.util.concurrent` 包提供了对线程池的支持,主要由`ExecutorService`、`ThreadPoolExecutor` 和 `Executors` 类来实现。 1. **ExecutorService**: 这是线程池的核心接口,定义了线程池的主要操作,...

Global site tag (gtag.js) - Google Analytics