最近一直在写爬虫,于是写了以下一个简单的线程池。在这,抛砖引玉,忘大家多多指点。
项目开始初,在查阅javaeye论坛中,曾看到一句这样的提示性设计:
线程是被动,由别人(监工)分配,激发
有一个主线程--“监工”:负责查看任务队列中是否有任务,如果有,取出一个任务,设置到一个“空闲”的线程中,并notify该线程。(http://www.iteye.com/topic/104432)
在这,我也运用这个思想,详见下面的具体实现。
先来看池程池类:
package jk.spider.core.task.threading;
import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.dispatch.DispatcherTask;
import org.apache.log4j.Logger;
/**
* 线程池
* @author kqy
* @date 2008-12-30
* @version 2.0
*/
public class WorkerThreadPool extends ThreadGroup {
protected static final Logger log = Logger.getLogger(WorkerThreadPool.class);
protected DispatcherThread dispatcherThread;
protected WorkerThread[] pool;
protected int poolSize;
public WorkerThreadPool(String poolName, String threadName, int poolSize) {
super(poolName);
this.poolSize = poolSize;
//事件分发线程
dispatcherThread = new DispatcherThread(this, threadName + " dispathcer");
pool = new WorkerThread[poolSize];
for (int i = 0; i < poolSize; i++) {
pool[i] = new WorkerThread(this, threadName, i);
synchronized (this) {
try {
//启动线程,随即让其休眠
pool[i].start();
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* 分配工作任务至线程池,线程池将选择工作线程执行任务
* 首先检查池程池中是否有空闲的工作线程,
* 如果存在,唤醒该线程; 否,休眠,直到有空闲线程时唤醒
* @param task
*/
public synchronized void assign(WorkerTask task) {
while (true) {
for (int i = 0; i < poolSize; i++) {
if (pool[i].isAvailable()) { //判断该线程是否可以分配任务
pool[i].assign(task); //唤醒该线程
return;
}
}
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
/**
* 分配分发任务至线程池
*
* @param task
*/
public void assignGroupTask(DispatcherTask task) {
dispatcherThread.assign(task);
}
/**
* 关闭线程池
*
*/
public void stopAll() {
for (int i = 0; i < pool.length; i++) {
WorkerThread thread = pool[i];
thread.stopRunning();
}
}
public int getSize() {
return poolSize;
}
}
下面再看工作线程:
package jk.spider.core.task.threading;
import jk.spider.core.task.WorkerTask;
import org.apache.log4j.Logger;
/**
* 工作线程
* @author kqy
* @date 2008-12-30
* @version 2.0
*/
public class WorkerThread extends Thread {
protected static final Logger log = Logger.getLogger(WorkerThread.class);
//空闲的线程
public static final int WORKERTHREAD_IDLE = 0;
//阻塞的
public static final int WORKERTHREAD_BLOCKED = 1;
//繁忙的
public static final int WORKERHTREAD_BUSY = 2;
protected int state;
//是否有工作任务分配至该线程
protected boolean assigned;
//该线程是否被激活,正在运行
protected boolean running;
protected WorkerThreadPool pool;
protected WorkerTask task;
public WorkerThread(WorkerThreadPool pool, String name, int i) {
super(pool, name + " " + i);
this.pool = pool;
running = false;
assigned = false;
state = WORKERTHREAD_IDLE;
}
/**
* 判断是否还可分配任务至该线程
* @return
*/
public boolean isAvailable() {
return (!assigned) && running;
}
public boolean isOccupied() {
return assigned;
}
/**
* 分配一个新的任务,并告知这个线程不接受任何新的任务
* @param task
*/
public synchronized void assign(WorkerTask task) {
if(!running) {
throw new RuntimeException("THREAD NOT RUNNING, CANNOT ASSIGN TASK !!!");
}
if(assigned) {
throw new RuntimeException("THREAD ALREADY ASSIGNED !!!");
}
this.task = task;
assigned = true;
notify();
}
public int getStates() {
return state;
}
public synchronized void run() {
running = true;
log.info("Worker thread ( " + this.getName() + " ) born...");
synchronized(pool) {
pool.notify(); //唤醒下一个线程
}
while(running) {
if(assigned) {
state = WORKERTHREAD_BLOCKED;
task.prepare(); //前期准备
state = WORKERHTREAD_BUSY;
try {
task.execute();//执行任务
} catch (Exception e) {
log.fatal("PANIC! Task " + task + " threw an excpetion!", e);
}
synchronized(pool) {
assigned = false;
task = null;
state = WORKERTHREAD_IDLE;
pool.notify(); //唤醒池程池中待分配的任务
this.notify();
}
}
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
log.info("Worker thread (" + this.getName() + ") dying");
}
/**
* 关闭所有线程
*/
public synchronized void stopRunning() {
if( !running ) {
throw new RuntimeException ("THREAD NOT RUNNING - CANNOT STOP !");
}
if ( assigned ) {
try {
this.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
running = false;
notify();
}
}
现还缺一个监工,负责分配,激发线程,这个就很简单的,主要是调用WorkerThreadPool 中的assign方法。
package jk.spider.core.task.threading;
import jk.spider.core.task.dispatch.DispatcherTask;
public class DispatcherThread extends Thread {
protected DispatcherTask task;
public DispatcherThread(ThreadGroup group, String name) {
super(group, name);
}
public void assign(DispatcherTask task) {
this.task = task;
start();
}
public void run() {
synchronized(task) {
task.execute();
task.notify();
}
}
}
DispatcherTask 类,抽象类,因为在爬虫,我设计了任务分发器,都是从DispatcherTask中继承下来,在这,具体看负责分发抓取的任务分发类。
package jk.spider.core.task.dispatch;
import jk.spider.core.SpiderController;
import jk.spider.core.task.Task;
import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.threading.WorkerThreadPool;
public abstract class DispatcherTask implements WorkerTask {
protected SpiderController controller;
protected WorkerThreadPool pool;
protected boolean running;
public DispatcherTask(SpiderController controller, WorkerThreadPool pool) {
this.controller = controller;
this.pool = pool;
this.running = true;
}
public SpiderController getSpiderController() {
return this.controller;
}
public void shutdown() {
this.running = false;
}
}
DispatchSpiderTasks.java 具体分发任务,唤醒线程,化被动通知线程池执行为主动激发线程池中线程处理
package jk.spider.core.task.dispatch;
import jk.spider.core.SpiderController;
import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.threading.WorkerThreadPool;
import org.apache.log4j.Logger;
public class DispatchSpiderTasks extends DispatcherTask {
protected static final Logger log = Logger.getLogger(DispatchSpiderTasks.class);
protected WorkerThreadPool spiders;
public DispatchSpiderTasks(SpiderController controller, WorkerThreadPool spiders) {
super(controller, spiders);
}
public int getType() {
return WorkerTask.WORKERTASK_SPIDERTASK;
}
public void prepare() { }
public void execute() {
log.info("Spider task dispatcher running ...");
while(running) {
try {
//从Scheduler中得到一个任务,队列是使用JDK1.5中的LinkedBlockingQueue
spiders.assign(controller.getContent().getSpiderTask());
} catch (InterruptedException e) {
log.warn("DispatchSpiderTasks InterruptedException -> ", e);
running = false;
}
}
log.info("Spider task dispatcher dying ...");
}
}
Task.java 这个类,大家一看就会明白,我就不细说了。
package jk.spider.core.task;
/**
* Spider Task 提供统一接口,并将任务添加至Scheduler
* 由线程池从Scheduler中取任务
* @author kqy
*
*/
public interface Task {
/**
* 执行任务,线程池将会调用该方法执行任务
*/
public void execute();
}
写了一大篇幅,而现在JDK1.5以上都提供了线程池的实现,使用起来更加方法,但为了更扎实自己对线程的理解,在参考了一些资料自己现实该简单的线程池,收获不少,不再被其中的wait(),notify(),notifyAll()搞得晕头转向。
实践才是真理啊...
经过一段时间,爬虫也已完工。其中慢慢的不断改善,已具体爬虫必备的良好扩展及可配置。
现回顾,总结,又是一次学习,发现其中很多的设计是那么的傻。
水平未到家,继续需努力...
分享到:
相关推荐
在这个简单的线程池实现中,我们可以通过`pthread_pool.cpp`、`MainFunctionForTest.cpp`、`twork_work.cpp`、`pthread_pool.h`和`twork_work.h`这五个文件来理解其基本架构和工作原理。 首先,`pthread_pool.h`...
这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易...
简易的C++11线程池实现,线程池为固定方式,后面会新增非固定模式。
本文将深入探讨如何实现一个简单的线程池,并通过提供的文件名`_threadpool.cpp`, `_promain.cpp`, `_threadpool.h`来解释其核心概念和组件。 首先,`_threadpool.h`是头文件,通常包含类定义和相关函数声明。...
易语言简易线程池的实现。 ——V雪落有声V原创。转载请保留。前文:。为了能充分理解本篇文章的内容,需要了解的知识如下:。1.事件对象的使用:http://baike.baidu.com/view/751499.htm。2.信号量的使用:...
该线程池方案的设计目标是提供一个简单、快速、可靠的线程池实现。它能够满足项目的需求,提供一个高效的线程池解决方案。 四、线程池的功能定义 该线程池方案具有以下几个功能: 1. 支持线程池中的最小工作线程数...
总的来说,这个项目提供了一个基础的C语言线程池实现,展示了如何利用POSIX线程在Linux环境下进行多线程编程。理解和实现这样的线程池不仅可以提高程序的效率,还能帮助开发者深入理解操作系统级的并发编程。
线程池 这是一个简单的 C++11 线程池实现。
不过,实际生产环境中的线程池实现往往会选择更高效的数据结构,例如`BlockingQueue`,并配合`ExecutorService`的高级特性,如定时任务、工作队列大小控制等。 在实际项目中,使用标准库提供的`ExecutorService`和`...
在给定的资源中,"200行C代码实现简单线程池.doc"可能包含了详细的设计和实现步骤,以及如何在实际项目中应用线程池的文档。而"threadpool.c"则是实际的C语言源代码文件,实现了线程池的核心功能。下面我们将探讨...
本实例将深入探讨如何在Linux下实现一个简单的线程池,并介绍相关的关键知识点。 1. **线程与线程池的概念** - **线程**:是操作系统分配CPU时间片的基本单位,是程序执行的流,一个进程中可以包含多个线程,它们...
一、要实现高效的线程池,可以考虑以下几点 二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、...
C语言实现的简单线程池
一个简单的线程池实现通常包括以下几个关键部分: 1. **线程池初始化**:创建线程池时,需要设定一些参数,如核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程存活时间(keepAliveTime)以及单位...
标题 "简单线程池与线程池检查的实现" 涉及到的是计算机编程中的多线程管理和优化,特别是线程池的概念及其检查机制。线程池是一种多线程处理形式,预先创建一定数量的线程,放入池中,当有任务需要执行时,直接从池...
线程池中的线程由`WorkThread`类实现,每个`WorkThread`对象代表一个可复用的工作线程,它们等待在任务队列中取出待处理的任务。 `Task`接口是任务的抽象,它定义了线程需要执行的操作。在实际应用中,我们可以实现...
【描述】:本章节将探讨Java并发编程中的线程池实现,通过自己动手创建一个简单的线程池,来理解线程池的工作原理及其优势。 【知识点详解】: 1. **线程池的必要性**: - 并发编程的核心在于任务的并行处理,但...
在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...
本项目"简单的线程池的.net实现"提供了一个自定义线程池的示例,帮助开发者更好地理解和运用线程池。 首先,线程池的主要优点在于其资源管理效率。当任务到达时,线程池会检查是否有空闲线程可供使用,如果有的话,...
本文将深入探讨如何在Visual Studio 2015(VS2015)环境下实现一个简单的线程池,并解析提供的`TestThreadPool.sln`解决方案和`TestThreadPool`源代码文件。 线程池的基本思想是维护一组预先创建的线程,这些线程...