`

自己动手实现自定义线程池

阅读更多

老赵在前几次的POST里分析了.NET的自带线程池,由于.NET自带的线程池在底层通过win32api调用的windows的进程附带的线程池,所以对于进程,这个线程池是唯一的,而且很不幸的是很多.NET自身的操作也需要通过这个线程池来完成,比如timmer。所以我们来尝试自己写一个线程池,这个线程池不是静态的,一个进程里可以出现多个线程池的实例,我们可以随时放入要执行的操作,由于没有系统线程池的创建线程的频率的限制,对于大量突发线程的频繁操作来说自定义的线程池会比较好用。

首先我们来分析一下实现的原理。线程池,顾名思义就是在一个“池”中保存了一组可以重复利用的线程对象,从而可以节省创建线程的开销。那么首要需要解决的问题就是复用线程了。在.NET中我们创建一个线程的方式可以是:

Thread T = new Thread(Method);
<!-- Code inserted with Steve Dunn's Windows Live Writer Code Formatter Plugin. http://dunnhq.com -->

我们可以把线程看作是方法的一个包装,那么我们要复用线程就需要能够动态的改变线程体的方法。为了达到这个目的,我们需要把具体要执行的方法包装一下,这里我们就通过delegate来包装,然后用另外一个方法来代替被包装的方法称为线程体。如下:

        static Thread T = new Thread(TbodyWrapper);
        static object context;
        static WaitCallback TbodyInstance = new WaitCallback(Tbody);
        static void TbodyWrapper()
        {
            while (true)
            {
                //TO DO 阻塞线程使其挂起
                TbodyInstance(context);
            }
        }
        static void Tbody(object arg)
        {
            //TO DO 真正的线程体
        }
<!-- Code inserted with Steve Dunn's Windows Live Writer Code Formatter Plugin. http://dunnhq.com -->

如此这般,当线程被挂起的时候,我们就能够修改TbodyInstance,那么就等于修改了线程体要执行的代码,而线程被挂起之后就相当于是闲置起来了。

接下来要解决的问题就是如何阻塞线程,首先我们把上面的代码放到一个类中,我们称这个类为线程包装器类,在这个类中包装了一个Thread对象,而线程体的wapper方法就是这个类中的一个方法,我们在类中添加一个AutoResetEvent的对象,通过它我们就可以在线程体的包装方法一进入和每次执行完一次循环就阻塞,在另一个方法中set就可以唤醒线程,如下:

    class Task : IDisposable
    {

        private AutoResetEvent locks;
        private Thread T;
        public WaitCallback taskWorkItem;//真实要执行的线程体
        private bool working;
        public object contextdata
        {
            get;
            set;
        }
        public event Action<Task> WorkComplete; //当执行完成后通知线程池执行后续操作的事件

        public Task()
        {
            locks = new AutoResetEvent(false);
            T = new Thread(Work);
            T.IsBackground = true;
            working = true;
            contextdata = new object();
            T.Start();
        }

        public void Active()
        {
            locks.Set();
        }

        public void SetWorkItem(WaitCallback action, object context)
        {
            taskWorkItem = action;
            contextdata = context;
        }

        private void Work()
        {
            while (working)
            {
                locks.WaitOne();
                taskWorkItem(contextdata);
                WorkComplete(this);
            }
        }

        public void Close()
        {
            working = false;
        }

        public void Dispose()
        {
            //throw new NotImplementedException();
            try
            {
                T.Abort();
            }
            catch { }
        }
    }
<!-- Code inserted with Steve Dunn's Windows Live Writer Code Formatter Plugin. http://dunnhq.com -->

如此这般,当线程包装器初始化的时候,线程就启动并被阻塞挂起,这个时候我们可以设置线程执行的方法体委派,并且指定传递给线程的参数.当执行Active方法后线程被唤醒,并开始执行线程体,当线程体执行完之后会开始新循环并被继续挂起,如此这般周而复始。由此我们完成了对线程对象的服用。

----------------------我是分割线------------------------

接下来我们需要一个容器来保存和管理这些可复用的线程包装器,这个容器也就是所谓的线程池了。为了方便描述,以下我们简称线程包装器类为线程。

我们首先需要一个对象来保存所有已经创建出来的线程。为了同时方便定位特定的线程,我们给每个线程增加一个ID的属性,在创建的时候用GUID来赋值,这样我们就能用Dictionary<K,V>来存储所有已经创建出来的线程引用。

为了减少遍历,我们将正在工作的线程也放在一个dictionary中,最后把所有空闲的线程放在一个Queue里头。

由于不能无限量的增加线程,所以设置了最大线程数的限制,所以如果当需要执行的线程超过的时候为了不抛出异常,我们需要用一个结构来吧要执行的操作和数据加入队列,在有空闲线程的时候好取出来继续执行。

所以我们需要重点实现的就是两个过程,一个是加入线程,一个是当线程执行完毕后所执行的操作。

最后我们来看看完整实现的代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace TaskPoolTest
{
    public class TaskPool : IDisposable
    {

        private int max = 25; //最大线程数
        private int min = 4;  //最小线程数
        private int increment = 2; //当活动线程不足的时候新增线程的增量

        private Dictionary<string, Task> publicpool; //所有的线程
        private Queue<Task> freequeue;  //空闲线程队列
        private Dictionary<string, Task> working;   //正在工作的线程
        private Queue<Waititem> waitlist;  //等待执行工作队列

        //设置最大线程数
        public void Setmaxthread(int Value)
        {
            lock (this)
            {
                max = Value;
            }
        }
        //设置最小线程数
        public void Setminthread(int Value)
        {
            lock (this)
            {
                min = Value;
            }
        }
        //设置增量
        public void Setincrement(int Value)
        {
            lock (this)
            {
                increment = Value;
            }
        }
        //初始化线程池
        public TaskPool()
        {
            publicpool = new Dictionary<string, Task>();
            working = new Dictionary<string, Task>();
            freequeue = new Queue<Task>();
            waitlist = new Queue<Waititem>();
            Task t = null;
			//先创建最小线程数的线程
            for (int i = 0; i < min; i++)
            {
                t = new Task();
				//注册线程完成时触发的事件
                t.WorkComplete += new Action<Task>(t_WorkComplete);
				//加入到所有线程的字典中
                publicpool.Add(t.Key, t);
				//因为还没加入具体的工作委托就先放入空闲队列
                freequeue.Enqueue(t);
            }

        }
        //线程执行完毕后的触发事件
        void t_WorkComplete(Task obj)
        {
            lock (this)
            {
			    //首先因为工作执行完了,所以从正在工作字典里删除
                working.Remove(obj.Key);
				//检查是否有等待执行的操作,如果有等待的优先执行等待的任务
                if (waitlist.Count > 0)
                {
					//先要注销当前的线程,将其从线程字典删除
                    publicpool.Remove(obj.Key);
                    obj.Close();
					//从等待任务队列提取一个任务
                    Waititem item = waitlist.Dequeue();
                    Task nt = null;
					//如果有空闲的线程,就是用空闲的线程来处理
                    if (freequeue.Count > 0)
                    {
                        nt = freequeue.Dequeue();
                    }
                    else
                    {
						//如果没有空闲的线程就再新创建一个线程来执行
                        nt = new Task();
                        publicpool.Add(nt.Key, nt);
                        nt.WorkComplete += new Action<Task>(t_WorkComplete);
                    }
					//设置线程的执行委托对象和上下文对象
                    nt.taskWorkItem = item.Works;
                    nt.contextdata = item.Context;
					//添加到工作字典中
                    working.Add(nt.Key, nt);
					//唤醒线程开始执行
                    nt.Active();
                }
                else
                {
					//如果没有等待执行的操作就回收多余的工作线程
                    if (freequeue.Count > min)
                    {
						//当空闲线程超过最小线程数就回收多余的这一个
                        publicpool.Remove(obj.Key);
                        obj.Close();
                    }
                    else
                    {
						//如果没超过就把线程从工作字典放入空闲队列
                        obj.contextdata = null;
                        freequeue.Enqueue(obj);
                    }
                }
            }
        }
        //添加工作委托的方法
        public void AddTaskItem(WaitCallback TaskItem, object Context)
        {
            lock (this)
            {
                Task t = null;
                int len = publicpool.Values.Count;
				//如果线程没有到达最大值
                if (len < max)
                {
					//如果空闲列表非空
                    if (freequeue.Count > 0)
                    {
						//从空闲队列pop一个线程
                        t = freequeue.Dequeue();
						//加入工作字典
                        working.Add(t.Key, t);
						//设置执行委托
                        t.taskWorkItem = TaskItem;
						//设置状态对象
                        t.contextdata = Context;
						//唤醒线程开始执行
                        t.Active();
                        return;
                    }
                    else
                    {
						//如果没有空闲队列了,就根据增量创建线程
                        for (int i = 0; i < increment; i++)
                        {
						    //判断线程的总量不能超过max
                            if ((len + i) <= max)
                            {
                                t = new Task();
								//设置完成响应事件
                                t.WorkComplete += new Action<Task>(t_WorkComplete);
								//加入线程字典
                                publicpool.Add(t.Key, t);
								//加入空闲队列
                                freequeue.Enqueue(t);
                            }
                            else
                            {
                                break;
                            }
                        }
						//从空闲队列提出出来设置后开始执行
                        t = freequeue.Dequeue();
                        working.Add(t.Key, t);
                        t.taskWorkItem = TaskItem;
                        t.contextdata = Context;
                        t.Active();
                        return;
                    }
                }
                else
                {
					//如果线程达到max就把任务加入等待队列
                    waitlist.Enqueue(new Waititem() { Context = Context, Works = TaskItem });
                }
            }
        }

        //回收资源
        public void Dispose()
        {
            //throw new NotImplementedException();
            foreach (Task t in publicpool.Values)
            {
				//关闭所有的线程
                using (t) { t.Close(); }
            }
            publicpool.Clear();
            working.Clear();
            waitlist.Clear();
            freequeue.Clear();
        }
		//存储等待队列的类
        class Waititem
        {
            public WaitCallback Works { get; set; }
            public object Context { get; set; }
        }
    }
	//线程包装器类
    class Task : IDisposable
    {
        private AutoResetEvent locks; //线程锁
        private Thread T;  //线程对象
        public WaitCallback taskWorkItem; //线程体委托
        private bool working;  //线程是否工作
        public object contextdata
        {
            get;
            set;
        }
        public event Action<Task> WorkComplete;  //线程完成一次操作的事件
		//用于字典的Key
        public string Key
        {
            get;
            set;
        }
        //初始化包装器
        public Task()
        {
			//设置线程一进入就阻塞
            locks = new AutoResetEvent(false);
            Key = Guid.NewGuid().ToString();
            //初始化线程对象
            T = new Thread(Work);
            T.IsBackground = true;
            working = true;
            contextdata = new object();
			//开启线程
            T.Start();
        }
        //唤醒线程
        public void Active()
        {
            locks.Set();
        }
        //设置执行委托和状态对象
        public void SetWorkItem(WaitCallback action, object context)
        {
            taskWorkItem = action;
            contextdata = context;
        }
        //线程体包装方法
        private void Work()
        {
            while (working)
            {
				//阻塞线程
                locks.WaitOne();
                taskWorkItem(contextdata);
				//完成一次执行,触发事件
                WorkComplete(this);
            }
        }
        //关闭线程
        public void Close()
        {
            working = false;
        }
		//回收资源
        public void Dispose()
        {
            //throw new NotImplementedException();
            try
            {
                T.Abort();
            }
            catch { }
        }
    }
}
<!-- Code inserted with Steve Dunn's Windows Live Writer Code Formatter Plugin. http://dunnhq.com -->

最后看看如何使用这个线程池类:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace TaskPoolTest
{
    class Program
    {
        static void Main(string[] args)
        {
            TaskPool pool = new TaskPool();
            for (int i = 0; i < 50; i++)
            {
                pool.AddTaskItem(
                    x =>
                    {
                        for (int j = 0; j < 5; j++)
                        {
                            Thread.Sleep(10);
                            Console.WriteLine("Thread " + (int)x + " print " + j);
                        }
                    }, i);
            }
            Console.ReadKey();
        }
    }
}
<!-- Code inserted with Steve Dunn's Windows Live Writer Code Formatter Plugin. http://dunnhq.com -->

执行的结果

image

分享到:
评论

相关推荐

    19 自己动手丰衣足食—简单线程池实现.pdf

    通过自定义线程池,我们可以深入理解线程池的工作机制,这对于在实际项目中使用Java内置的`ExecutorService`或其他高级线程池实现(如`ThreadPoolExecutor`)非常有帮助。同时,这也为理解并发编程中的线程管理、...

    简单的线程池的.net实现

    本项目"简单的线程池的.net实现"提供了一个自定义线程池的示例,帮助开发者更好地理解和运用线程池。 首先,线程池的主要优点在于其资源管理效率。当任务到达时,线程池会检查是否有空闲线程可供使用,如果有的话,...

    CustomThreadPool.rar

    然而,为了更好地理解线程池的工作原理,我们可以自己动手实现一个简化版的线程池。 1. **线程池基础** - **工作原理**:线程池维护一组可重用的线程,当有任务提交时,线程池会从空闲线程中选择一个执行任务,而...

    自己动手写Tomcat

    当我们谈论“自己动手写Tomcat”时,我们实际上是在探讨如何理解和实现一个小型的Web服务器,就像Apache Tomcat那样。Tomcat是一款开源的Java Servlet容器,它实现了Java EE(现在称为Jakarta EE)中的Servlet、JSP...

    EventBusDemo

    EventBus 支持多种线程模型,如默认的 `ThreadMode.POSTING`(发布线程)、`ThreadMode.MAIN`(主线程)、`ThreadMode.BACKGROUND`(后台线程)以及自定义线程池。通过设置线程模式,可以控制事件的处理位置,确保 ...

    多线程.rar

    7. **线程池应用**:创建自定义线程池,处理大量异步任务,展示线程池如何管理和复用线程,提高系统性能。 通过这些理论讲解和实例代码,你可以全面了解多线程的概念、原理以及在实际编程中的应用。在学习过程中,...

    Android应用开发揭秘(附源码)

    可能包括Handler、Looper、AsyncTask的使用,以及如何创建和管理自定义线程池。随着Android版本的更新,学习使用协程(Coroutines)进行异步编程也是现代Android开发的重要内容。 除此之外,Android的权限管理、...

    自己动手让springboot异步处理浏览器发送的请求(只需要使用ConcurrentLinkedQueue即可)

    // 这里可以自定义一个线程池配置,例如设置核心线程数、最大线程数等 @Bean(name = "asyncExecutor") public TaskExecutor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()...

    Java经典编程50道

    《Java经典编程50道》是一份集合了Java编程中...对于每个类,阅读源码、理解其实现逻辑并尝试自己动手实现,是提高编程能力的有效途径。在学习过程中,结合Javadoc文档和官方教程,可以更深入地理解Java的相关特性。

    vc++ 应用源码包_1

    该实例可进行局域网的聊天、一对多、多对一、和多对多的传送和续传,理论上这是我本人的实现目的,而且目前经测试已基本实现了上述功能,而且网速一般有几M/S。另外有只打开一个应用程序、CRichEdit的使用、最小到...

    面向对象程序设计袁绍新-实验代码

    面向对象程序设计是一种重要的编程范式,它基于“对象”的概念,强调数据和...在学习过程中,读者应动手实践,逐步理解每个代码示例背后的原理,并尝试自己编写和改进代码,这样才能更好地掌握面向对象程序设计的精髓。

    vc++ 应用源码包_2

    该实例可进行局域网的聊天、一对多、多对一、和多对多的传送和续传,理论上这是我本人的实现目的,而且目前经测试已基本实现了上述功能,而且网速一般有几M/S。另外有只打开一个应用程序、CRichEdit的使用、最小到...

    vc++ 应用源码包_6

    该实例可进行局域网的聊天、一对多、多对一、和多对多的传送和续传,理论上这是我本人的实现目的,而且目前经测试已基本实现了上述功能,而且网速一般有几M/S。另外有只打开一个应用程序、CRichEdit的使用、最小到...

    vc++ 应用源码包_5

    该实例可进行局域网的聊天、一对多、多对一、和多对多的传送和续传,理论上这是我本人的实现目的,而且目前经测试已基本实现了上述功能,而且网速一般有几M/S。另外有只打开一个应用程序、CRichEdit的使用、最小到...

    vc++ 应用源码包_3

    该实例可进行局域网的聊天、一对多、多对一、和多对多的传送和续传,理论上这是我本人的实现目的,而且目前经测试已基本实现了上述功能,而且网速一般有几M/S。另外有只打开一个应用程序、CRichEdit的使用、最小到...

    安卓仿windows气泡动态壁纸源码

    【安卓仿Windows气泡动态壁纸源码详解】 在Android开发领域,动态壁纸是提升用户界面交互体验的一种方式,它能够使...同时,这也是一个动手实践的好项目,你可以根据自己的需求修改源码,创造出更具个性化的动态壁纸。

    WebServer_Java.rar_java webserver_javawebserver开发

    深入研究这个源代码,可以更好地理解上述知识点的实际应用,并且可以动手修改和扩展,提升自己的编程技能。 总之,这个Java WebServer项目是一个很好的学习平台,可以帮助我们理解网络编程的核心概念,提升Java IO...

    java工程基础学习资料

    - 自定义队列的数据结构实现方法。 #### 第十节:Java集合框架 - **主要内容**: - 集合框架的总体结构及分类。 - Set接口和List接口的特点及区别。 - Set接口实现类的具体用法。 - List接口的具体用法及其...

    netty实战-nettyPlayground.zip

    4. **编码与解码**:实现自定义的编解码器,例如将 Java 对象序列化为字节流,或者解析特定格式的数据包。 5. **心跳机制**:设置心跳包来检测连接是否活跃,防止因长时间无数据交换导致的连接断开。 6. **线程...

    vc++ 开发实例源码包

    注释非常详细,相信会帮助大家设计好自己的个性窗体。 C++_Primer_第4版_中文+英文 原书源码+课后习题答案。 CameraController(云界面) 实现了自绘控件,云端控制主要在CnComm类多线程串口通讯库, camerads-...

Global site tag (gtag.js) - Google Analytics