`
kqy929
  • 浏览: 19483 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

一个简单的线程池实现

阅读更多

     最近一直在写爬虫,于是写了以下一个简单的线程池。在这,抛砖引玉,忘大家多多指点。

     项目开始初,在查阅javaeye论坛中,曾看到一句这样的提示性设计:

             线程是被动,由别人(监工)分配,激发

             有一个主线程--“监工”:负责查看任务队列中是否有任务,如果有,取出一个任务,设置到一个“空闲”的线程中,并notify该线程。(http://www.iteye.com/topic/104432)

     在这,我也运用这个思想,详见下面的具体实现。

     先来看池程池类:

    

package jk.spider.core.task.threading;

import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.dispatch.DispatcherTask;

import org.apache.log4j.Logger;

/**
 * 线程池
 * @author kqy
 * @date 2008-12-30
 * @version 2.0
 */

public class WorkerThreadPool extends ThreadGroup {
	protected static final Logger log = Logger.getLogger(WorkerThreadPool.class);
	protected DispatcherThread dispatcherThread;

	protected WorkerThread[] pool;

	protected int poolSize;

	public WorkerThreadPool(String poolName, String threadName, int poolSize) {
		super(poolName);
		this.poolSize = poolSize;
		//事件分发线程
		dispatcherThread = new DispatcherThread(this, threadName + " dispathcer");
		pool = new WorkerThread[poolSize];
		for (int i = 0; i < poolSize; i++) {
			pool[i] = new WorkerThread(this, threadName, i);
			synchronized (this) {
				try {
					//启动线程,随即让其休眠
					pool[i].start(); 
					wait();
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			}
		}
	}

	/**
	 * 分配工作任务至线程池,线程池将选择工作线程执行任务
	 * 首先检查池程池中是否有空闲的工作线程,
	 * 如果存在,唤醒该线程; 否,休眠,直到有空闲线程时唤醒
	 * @param task
	 */
	public synchronized void assign(WorkerTask task) {
		while (true) {
			for (int i = 0; i < poolSize; i++) {
				if (pool[i].isAvailable()) {  //判断该线程是否可以分配任务
					pool[i].assign(task); //唤醒该线程
					return;
				}
			}
			try {
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * 分配分发任务至线程池
	 * 
	 * @param task
	 */
	public void assignGroupTask(DispatcherTask task) {
		dispatcherThread.assign(task);
	}

	/**
	 * 关闭线程池
	 * 
	 */
	public void stopAll() {
		for (int i = 0; i < pool.length; i++) {
			WorkerThread thread = pool[i];
			thread.stopRunning();
		}
	}

	public int getSize() {
		return poolSize;
	}
}

 

 

   下面再看工作线程:

   

package jk.spider.core.task.threading;

import jk.spider.core.task.WorkerTask;

import org.apache.log4j.Logger;

/**
 * 工作线程
 * @author kqy
 * @date 2008-12-30
 * @version 2.0
 */
public class WorkerThread extends Thread {
	protected static final Logger log = Logger.getLogger(WorkerThread.class);
	//空闲的线程
	public static final int WORKERTHREAD_IDLE = 0;
	//阻塞的
	public static final int WORKERTHREAD_BLOCKED = 1;
	//繁忙的
	public static final int WORKERHTREAD_BUSY = 2;
	
	protected int state;
	//是否有工作任务分配至该线程
	protected boolean assigned;
	//该线程是否被激活,正在运行
	protected boolean running;
	
	protected WorkerThreadPool pool;
	
	protected WorkerTask task;
	
	public WorkerThread(WorkerThreadPool pool, String name, int i) {
		super(pool, name + " " + i);
		this.pool = pool;
		running = false;
		assigned = false;
		state = WORKERTHREAD_IDLE;
	}
	
	/**
	 * 判断是否还可分配任务至该线程
	 * @return
	 */
	public boolean isAvailable() {
		return (!assigned) && running;
	}
	
	public boolean isOccupied() {
		return assigned;
	}
	
	/**
	 * 分配一个新的任务,并告知这个线程不接受任何新的任务
	 * @param task
	 */
	public synchronized void assign(WorkerTask task) {
		if(!running) {
			throw new RuntimeException("THREAD NOT RUNNING, CANNOT ASSIGN TASK !!!");
		}
		if(assigned) {
			throw new RuntimeException("THREAD ALREADY ASSIGNED !!!");
		}
		
		this.task = task;
		assigned = true;
		notify();
	}
	
	public int getStates() {
		return state;
	}
	
	public synchronized void run() {
		running = true;
		log.info("Worker thread ( " + this.getName() + " ) born...");
		
		synchronized(pool) {
			pool.notify(); //唤醒下一个线程
		}
		
		while(running) {
			if(assigned) {
				state = WORKERTHREAD_BLOCKED;
				task.prepare(); //前期准备
				state = WORKERHTREAD_BUSY;
				try {
					task.execute();//执行任务
				} catch (Exception e) {
					log.fatal("PANIC! Task " + task + " threw an excpetion!", e);
				}
			
				synchronized(pool) {
					assigned = false;
					task = null;
					state = WORKERTHREAD_IDLE;
					pool.notify(); //唤醒池程池中待分配的任务
					this.notify();
				}
			}
			try {
				wait();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
		log.info("Worker thread (" + this.getName() + ") dying");
	}
	
	/**
	 * 关闭所有线程
	 */
	public synchronized void stopRunning() {
		if( !running ) {
			throw new RuntimeException ("THREAD NOT RUNNING - CANNOT STOP !");
		}
		if ( assigned ) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        running = false;
        notify();
	}

	
}

 

   现还缺一个监工,负责分配,激发线程,这个就很简单的,主要是调用WorkerThreadPool 中的assign方法。

  

package jk.spider.core.task.threading;

import jk.spider.core.task.dispatch.DispatcherTask;

public class DispatcherThread extends Thread {
	protected DispatcherTask task;
	public DispatcherThread(ThreadGroup group, String name) {
		super(group, name);
	}
	
	public void assign(DispatcherTask task) {
		this.task = task;
		start();
	}
	
	public void run() {
		synchronized(task) {
			task.execute();
			task.notify();
		}
	}
}

 

   DispatcherTask 类,抽象类,因为在爬虫,我设计了任务分发器,都是从DispatcherTask中继承下来,在这,具体看负责分发抓取的任务分发类。

  

package jk.spider.core.task.dispatch;

import jk.spider.core.SpiderController;
import jk.spider.core.task.Task;
import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.threading.WorkerThreadPool;

public abstract class DispatcherTask implements WorkerTask {
	protected SpiderController controller;
	protected WorkerThreadPool pool;
	protected boolean running;
	
	public DispatcherTask(SpiderController controller, WorkerThreadPool pool) {
		this.controller = controller;
		this.pool = pool;
		this.running = true;
	}
	
	public SpiderController getSpiderController() {
		return this.controller;
	}
	
	public void shutdown() {
		this.running = false;
	}
}

 

  DispatchSpiderTasks.java 具体分发任务,唤醒线程,化被动通知线程池执行为主动激发线程池中线程处理

 

package jk.spider.core.task.dispatch;

import jk.spider.core.SpiderController;
import jk.spider.core.task.WorkerTask;
import jk.spider.core.task.threading.WorkerThreadPool;

import org.apache.log4j.Logger;

public class DispatchSpiderTasks extends DispatcherTask {
	protected static final Logger log = Logger.getLogger(DispatchSpiderTasks.class);
	protected WorkerThreadPool spiders;

	public DispatchSpiderTasks(SpiderController controller, WorkerThreadPool spiders) {
		super(controller, spiders);
	}

	public int getType() {
		return WorkerTask.WORKERTASK_SPIDERTASK;
	}

	public void prepare() { }

	public void execute() {
		log.info("Spider task dispatcher running ...");
		while(running) {
			try {
				//从Scheduler中得到一个任务,队列是使用JDK1.5中的LinkedBlockingQueue
				spiders.assign(controller.getContent().getSpiderTask());
			} catch (InterruptedException e) {
				log.warn("DispatchSpiderTasks InterruptedException -> ", e);
				running = false;
			}
		}
		log.info("Spider task dispatcher dying ...");
	}
}

 

  Task.java  这个类,大家一看就会明白,我就不细说了。

 

package jk.spider.core.task;

/**
 * Spider Task 提供统一接口,并将任务添加至Scheduler
 * 由线程池从Scheduler中取任务
 * @author kqy
 *
 */


public interface Task {
	

	/**
	 * 执行任务,线程池将会调用该方法执行任务
	 */
	public void execute();
}

 

       写了一大篇幅,而现在JDK1.5以上都提供了线程池的实现,使用起来更加方法,但为了更扎实自己对线程的理解,在参考了一些资料自己现实该简单的线程池,收获不少,不再被其中的wait(),notify(),notifyAll()搞得晕头转向。

       实践才是真理啊...

       经过一段时间,爬虫也已完工。其中慢慢的不断改善,已具体爬虫必备的良好扩展及可配置。

       现回顾,总结,又是一次学习,发现其中很多的设计是那么的傻。

       水平未到家,继续需努力...

 

2
1
分享到:
评论
2 楼 kqy929 2009-02-26  
fjlyxx 写道

&nbsp;&nbsp;&nbsp; 有一个主线程--“监工”:负责查看任务队列中是否有任务,如果有,取出一个任务,设置到一个“空闲”的线程中,并notify该线程。非标准LF线程模型,监工何必要固定一个人做呢&nbsp; 轮流着做不好吗??

YES,GOOD
记录下了,将在下一个版本中修改这个问题。
1 楼 fjlyxx 2009-02-25  
    有一个主线程--“监工”:负责查看任务队列中是否有任务,如果有,取出一个任务,设置到一个“空闲”的线程中,并notify该线程。

非标准LF线程模型,监工何必要固定一个人做呢  轮流着做不好吗??

相关推荐

    一个简单线程池的实现

    在这个简单的线程池实现中,我们可以通过`pthread_pool.cpp`、`MainFunctionForTest.cpp`、`twork_work.cpp`、`pthread_pool.h`和`twork_work.h`这五个文件来理解其基本架构和工作原理。 首先,`pthread_pool.h`...

    这是一个使用C++实现的简易线程池.zip

    这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易线程池.zip这是一个使用C++实现的简易...

    C++11 简易线程池实现

    简易的C++11线程池实现,线程池为固定方式,后面会新增非固定模式。

    简单C++线程池实现

    本文将深入探讨如何实现一个简单的线程池,并通过提供的文件名`_threadpool.cpp`, `_promain.cpp`, `_threadpool.h`来解释其核心概念和组件。 首先,`_threadpool.h`是头文件,通常包含类定义和相关函数声明。...

    易语言真正的线程池简易实现

    易语言简易线程池的实现。 ——V雪落有声V原创。转载请保留。前文:。为了能充分理解本篇文章的内容,需要了解的知识如下:。1.事件对象的使用:http://baike.baidu.com/view/751499.htm。2.信号量的使用:...

    基于UNIX C语言的一种线程池实现.pdf

    该线程池方案的设计目标是提供一个简单、快速、可靠的线程池实现。它能够满足项目的需求,提供一个高效的线程池解决方案。 四、线程池的功能定义 该线程池方案具有以下几个功能: 1. 支持线程池中的最小工作线程数...

    C语言实现简单线程池.zip

    总的来说,这个项目提供了一个基础的C语言线程池实现,展示了如何利用POSIX线程在Linux环境下进行多线程编程。理解和实现这样的线程池不仅可以提高程序的效率,还能帮助开发者深入理解操作系统级的并发编程。

    线程池 这是一个简单的 C++11 线程池实现

    线程池 这是一个简单的 C++11 线程池实现。

    使用Vector实现简单线程池

    不过,实际生产环境中的线程池实现往往会选择更高效的数据结构,例如`BlockingQueue`,并配合`ExecutorService`的高级特性,如定时任务、工作队列大小控制等。 在实际项目中,使用标准库提供的`ExecutorService`和`...

    线程池实现,通过C语言实现

    在给定的资源中,"200行C代码实现简单线程池.doc"可能包含了详细的设计和实现步骤,以及如何在实际项目中应用线程池的文档。而"threadpool.c"则是实际的C语言源代码文件,实现了线程池的核心功能。下面我们将探讨...

    linux 实现一个简单的线程池及工作

    本实例将深入探讨如何在Linux下实现一个简单的线程池,并介绍相关的关键知识点。 1. **线程与线程池的概念** - **线程**:是操作系统分配CPU时间片的基本单位,是程序执行的流,一个进程中可以包含多个线程,它们...

    C++实现线程池详解(基于boost源码以及封装等线程池)

    一、要实现高效的线程池,可以考虑以下几点 二、实现线程池可以按照以下步骤进行 三、简单的C++线程池代码示例 四、 基于boost编写的源码库 - 线程池 4.1 基于boost编写的源码库地址 4.2 boost线程池的先进先出、...

    C语言实现的简单线程池

    C语言实现的简单线程池

    简单线程池实现,凑字数

    一个简单的线程池实现通常包括以下几个关键部分: 1. **线程池初始化**:创建线程池时,需要设定一些参数,如核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程存活时间(keepAliveTime)以及单位...

    简单线程池与线程池检查的实现

    标题 "简单线程池与线程池检查的实现" 涉及到的是计算机编程中的多线程管理和优化,特别是线程池的概念及其检查机制。线程池是一种多线程处理形式,预先创建一定数量的线程,放入池中,当有任务需要执行时,直接从池...

    线程池的简单实现

    线程池中的线程由`WorkThread`类实现,每个`WorkThread`对象代表一个可复用的工作线程,它们等待在任务队列中取出待处理的任务。 `Task`接口是任务的抽象,它定义了线程需要执行的操作。在实际应用中,我们可以实现...

    19 自己动手丰衣足食—简单线程池实现.pdf

    【描述】:本章节将探讨Java并发编程中的线程池实现,通过自己动手创建一个简单的线程池,来理解线程池的工作原理及其优势。 【知识点详解】: 1. **线程池的必要性**: - 并发编程的核心在于任务的并行处理,但...

    Linux下C线程池实现

    在Linux下用C写的一个简易线程池。系统是RedHat 9,gcc版本"gcc version 4.1.2 20071124 (Red Hat 4.1.2-42)"。文件夹里的源码是按工程组织好的,在文件夹下的test目录下面有一个小的测试程序和Makefile,编译后即可...

    简单的线程池的.net实现

    本项目"简单的线程池的.net实现"提供了一个自定义线程池的示例,帮助开发者更好地理解和运用线程池。 首先,线程池的主要优点在于其资源管理效率。当任务到达时,线程池会检查是否有空闲线程可供使用,如果有的话,...

    c++简单线程池的实现

    本文将深入探讨如何在Visual Studio 2015(VS2015)环境下实现一个简单的线程池,并解析提供的`TestThreadPool.sln`解决方案和`TestThreadPool`源代码文件。 线程池的基本思想是维护一组预先创建的线程,这些线程...

Global site tag (gtag.js) - Google Analytics