`

ThreadPool的实现

阅读更多

最近看hadoop的时候,无意中看到了concurrent包中的类,于是打算好好研究一下线程安全方面的东西。先自己实现一个线程池。然后和sun自带的threadpool比较一下,看自己的实现有什么问题。

 1. ThreadPool 类,用于存储工作线程,在构造函数中创建线程,并启动线程

package com.fnk.threadpool;

import java.util.Vector;

public class ThreadPool {
	private Vector<Thread> threads;
	private static final int DEFAULT_THREAD_SIZE = 5;
	
	ThreadPool(ThreadWorkQueue workQueue) throws InvalidThreadParamException{
		this(workQueue,DEFAULT_THREAD_SIZE);
	}
	
	ThreadPool(ThreadWorkQueue workQueue,int threadSize) throws InvalidThreadParamException{
		if(workQueue == null || threadSize <= 0){
			throw new InvalidThreadParamException();
		}
		this.threads = new Vector<Thread>();
		for(int i = 0 ; i < threadSize; i++){
			threads.add(new WorkThread(workQueue));
			threads.get(i).start();
		}
	}
}

 

 2.任务队列类

package com.fnk.threadpool;

import java.util.Vector;
/*
 * 任务队列类,用于存储任务。先到的任务,先执行
 */
public class ThreadWorkQueue {
	private Vector<WorkIntf> works;
	public  Object mutex = new Object();   
	
	ThreadWorkQueue(){
		works = new Vector<WorkIntf>();
	}
	/*
	 * 任务进队列,如果原来的队列中的任务数为0,唤醒任务线程
	 */
	public void enQueue(WorkIntf work){
		if(works.size() == 0){
			works.add(work);
			synchronized (mutex) {
				mutex.notifyAll();
			}
		}else{
			works.add(work);
		}
		
	}
	/*
	 * 任务出队列,如果队列中的任务数为0,让线程等待
	 */
	public WorkIntf deQueue(){
		if(works.size() == 0){
			synchronized (mutex) {
				try {
					mutex.wait();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			return null;
		}else{
			return works.remove(0);
		}
	}
}

 

 3.任务接口类

package com.fnk.threadpool;

public interface WorkIntf {
	public boolean doWork();
}

 4. 工作线程类

package com.fnk.threadpool;
//任务线程类 
public class WorkThread extends Thread {
	ThreadWorkQueue workQueue;

	WorkThread(ThreadWorkQueue workQueue) {
		this.workQueue = workQueue;
	}
	
	public void run() {
		while (true) {
			WorkIntf work = null;
			//获取任务,如果任务队列中,
			work = workQueue.deQueue();
			//如果 有任务 ,那么就工作 
			if (work != null) {
				work.doWork();
			}
		}
	}
}

   5. 异常类

package com.fnk.threadpool;

public class InvalidThreadParamException extends Exception {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	public InvalidThreadParamException() {
		super();
	}

	public InvalidThreadParamException(String message) {
		super(message);
	}

	public InvalidThreadParamException(String message, Throwable cause) {
		super(message, cause);
	}

	public InvalidThreadParamException(Throwable cause) {
		super(cause);
	}
}

  6. 测试

package com.fnk.threadpool;

public class TestThreadPool {
	public static void main(String[] args) {
		
		try {
			ThreadWorkQueue workQueue = new ThreadWorkQueue();
			ThreadPool tp = new ThreadPool(workQueue);
			for(int i = 0; i < 10 ; i++){
				workQueue.enQueue(new WorkImpl(i));
			}
			
		} catch (InvalidThreadParamException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

 

 

 总结:缺点是不能动态的控制线程的个数,在线程池开启的时候就必须创建所以线程。

 

 

 

下面这段代码可能会导致dequeue线程永远被阻塞,这种情况出现在dequeue变成wait状态,而works队列一直有数据,建议换成blockqueue,参照JDK的ThreadPool

	/*
	 * 任务进队列,如果原来的队列中的任务数为0,唤醒任务线程
	 */
	public void enQueue(WorkIntf work){
		if(works.size() == 0){
			works.add(work);
			synchronized (mutex) {
				mutex.notifyAll();
			}
		}else{
			works.add(work);
		}
		
	}
	/*
	 * 任务出队列,如果队列中的任务数为0,让线程等待
	 */
	public WorkIntf deQueue(){
		if(works.size() == 0){
			synchronized (mutex) {
				try {
					mutex.wait();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			return null;
		}else{
			return works.remove(0);
		}
	}
分享到:
评论

相关推荐

    基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST

    【作品名称】:基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST 【适用人群】:适用于希望学习不同技术领域的小白或进阶学习者。可作为毕设项目、课程设计、大作业、工程实训或初期项目立项。 ...

    基于C使用 epoll threadpool 实现的 we.zip

    基于C使用 epoll threadpool 实现的 we

    基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST.zip

    基于C++使用 epoll + threadpool 实现的 webServer,支持GET、POST C++是一种广泛使用的编程语言,它是由Bjarne Stroustrup于1979年在新泽西州美利山贝尔实验室开始设计开发的。C++是C语言的扩展,旨在提供更...

    python线程池threadpool实现篇

    线程池Threadpool实现篇中会涉及到以下术语和概念: 1. 工作线程(worker):线程池启动时创建指定数量的工作线程,这些线程主要负责从任务队列中获取任务并执行。 2. 任务(requests):任务是工作线程所处理的...

    简单threadpool实现

    由于最近需要用多线程处理一些问题,一开始我用了.net默认的...于是我自己实现了一个简单的ThreadPool。 写的比较简单,有兴趣的朋友一起看看,共同改进。 代码主要由ThreadPoolEx,WorkItem,WorkQueue组成。

    threadPool的实现代码

    threadPool的实现代码

    CustomThreadPool:用于服务器上的计算工作的自定义ThreadPool实现(示例计算是第N个斐波那契数)

    用于服务器上的计算工作的自定义ThreadPool实现(示例计算是第N个斐波纳契数)。 请参阅页面底部的基准测试结果。 设计注意事项: 服务器应用程序可以选择池设置,例如最小和最大线程,线程空闲时间等。 随着工作...

    DICOM:DICOM开源库多线程分析之“LF_ThreadPool in DCM4CHEE”源码

    本文将重点解析一个开源库DCM4CHEE中的LF_ThreadPool实现,这是一个线程池设计,它为高效、有序的多线程处理提供了基础。 LF_ThreadPool是DCM4CHEE项目中的一个组件,它采用了Linux内核启发的线程池设计理念。...

    ThreadPool:一个简单的C ++线程池实现

    这是C ++中的简单ThreadPool实现。 此实现提供以下功能: 每个池实例的可配置线程数。 动态线程数修改SetThreadCount(size_t) 两种ThreadPool停止模式:同步和异步 main.cpp提供了使用此ThreadPool的示例代码。 ...

    threadpool

    在给定的文件中,我们看到`threadpool.c`、`threadpool_test.c`、`threadpool.h`和`Makefile`,这表明是一个C语言实现的线程池项目。`threadpool.c`很可能包含了线程池的主体实现,包括线程池的初始化、任务的添加、...

    ThreadPool C++实现

    在给定的"ThreadPool"文件中,可能包含了实现这样一个线程池的代码,包括上述各个组件的详细实现。通过对这些代码的分析和学习,可以深入了解线程池的工作原理,以及如何在C++中有效地利用多线程技术。

    ThreadPool

    标题中的"ThreadPool"指的是线程池,这是一个编程概念,特别是在多线程编程中非常关键。线程池是一种线程使用模式,它维护着一个工作线程的集合,用于执行一系列的任务。通过线程池,可以有效地管理和控制并发执行的...

    c++ head only thread pool

    在progschj的ThreadPool实现中,线程池类定义了一个std::queue来存储待处理的任务,并通过条件变量来同步线程与任务队列的状态。 首先,我们来看ThreadPool的构造函数,它会根据指定的线程数目创建相应数量的工作...

    C++ 实现线程池ThreadPool

    线程池(ThreadPool)是多线程编程中的一个重要概念,它是一种线程使用模式,用于高效地管理和调度线程资源。在C++中实现线程池可以帮助开发者优化并发执行的任务,减少线程创建和销毁的开销,提高系统效率。下面...

    zl_threadpool, Linux平台下C (C 98、C 03、C 11)实现的线程池.zip

    《Linux平台下C语言实现线程池:zl_threadpool》 在现代计算机系统中,多线程编程已经成为提升效率和优化资源使用的关键技术。线程池作为一种管理线程的机制,能够有效地解决频繁创建和销毁线程所带来的开销,提高...

Global site tag (gtag.js) - Google Analytics