`
mlzboy
  • 浏览: 724855 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

自已实现线程池

阅读更多

.net内置的threadpool对于加入执行队列的任务,或是正在执行的任务无法取消,这对于我的项目来说有问题,因此要自定义一个线程池。

我的项目中具体的应用情节如下,某一个操作会非常耗时(将网址插入bdb中),如果将其加入线程池中,很可能将线程池中的资源耗尽,因此我希望我可以定义一个maxThreadNum用来控制执行此在操作最大允许同时执行的线程数,同时设定线程等级为最低ThreadPriority.Lowest,我只要这个操作慢慢执行就行了。

 

代码还需要重构,高手请指点下,对锁的机制还是不太清楚,

 

Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Rhino.Commons;

namespace ThreadPool
{
    
public static class MyThreadPool
    {
        
static object obj = new object();
        
static AutoResetEvent wait = new AutoResetEvent(false);
        
static MyThreadPool()
        {
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
new WaitOrTimerCallback(aa), state, 2000,true);
            SetMaxThreadNum(
1);
        }

        
private static void aa(object state, bool timedOut)
        {
            
lock (obj)
            {
                
//判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List<string> removeKey = new List<string>();
                List
<WorkerThread> newTask=new List<WorkerThread>();
                
//Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                foreach (var kvp in dict)
                {
//kvp.Value.ThreadState == ThreadState.Unstarted || 
                    if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)
                    {
                        
//dict.Remove(kvp.Key);//cancle because of lock

                        WorkerThread a
=queue.FirstOrDefault();
                        
if (a != null)
                        {
                            removeKey.Add(kvp.Key);
                            
//addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));

                            queue.RemoveAt(
0);
                            
//dict.Add(a.Key, kvp.Value.Change(a));//cancle because of lock
                            
//将参数加到线程中,并改变线程的状态
                            
//dict[a.Key].Thread.Resume();
                        }
                        
else
                            
break;
                        
//else
                        
//{
                        
//    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                        
//                                                            2000, true);
                        
//    return;
                        
//}

                    }
                }
                removeKey.ForEach(t
=>dict.Remove(t));
                newTask.ForEach(t 
=>
                {
                    dict.Add(t.Key, t);
                    t.Thread.Resume();
                });
                
while (queue.Count > 0 && dict.Count < MaxThreadNum)
                {
                    
//未到线程池最大池程数时,增加线程
                    WorkerThread b = queue.FirstOrDefault();
                    
if (b!=null)
                    {
                        queue.RemoveAt(
0);
                        
//Thread thd = new Thread(new ThreadStart(b.Exec));
                        
//thd.Priority = ThreadPriority.Lowest;
                        
//dict.Add(b.Key, thd);
                        
//thd.Start();
                        WorkerThread wt = new WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();

                        
//将参数加到线程中,并改变线程的状态
                    }


                }
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
new WaitOrTimerCallback(aa), state, 2000true);
            }
        }


        
//private static int _maxThreadNum = 1;
        public static int MaxThreadNum
        {
            
//get { return _maxThreadNum; }
            
//set { _maxThreadNum = value; }
            getset;
        }
        
public static void SetMaxThreadNum(int num)
        {
            
if (num < 1)
                num 
= 1;
            MaxThreadNum 
= num;
        }

        
/// <summary>
        
/// 任务执行队列
        
/// </summary>
        //static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        
        
static List<WorkerThread> queue=new List<WorkerThread>();
        
/// <summary>
        
/// 目前暂定为只使用一个线程,以免耗近资源
        
/// </summary>
        static Dictionary<string, WorkerThread> dict = new Dictionary<string, WorkerThread>(1);

        
private static object state;

        
public static void Aborted(string key)
        {
            
lock (obj)
            {
                WorkerThread v;
                
if (dict.TryGetValue(key, out v))
                {
                    v.Thread.Abort();
                    dict.Remove(key);
                }
                
int index = queue.FindIndex(t => t.Key == key);
                
if (index>-1)
                    queue.RemoveAt(index);
                wait.Set();
            }
        }

        
public static void QueueUserWorkItem(WaitCallback callback, object state, string key)
        {
            WorkerThread p 
= new WorkerThread()
                        {
                            Callback 
= callback,
                            State 
= state,
                            Key 
= key
                        };
            
//queue.Enqueue(p);
            queue.Add(p);
            wait.Set();
        }


    }

}
public class WorkerThread
{
    
public Thread Thread { getset; }
    
public string Key { getset; }
    
public WaitCallback Callback { getset; }
    
public Object State { getset; }
    
public void Exec()
    {
        
while (true)
        {
            
this.Callback(this.State);
            
this.Thread.Suspend();
        }
    }
    
public WorkerThread Change(WorkerThread wt)
    {
        
this.Key = wt.Key;
        
this.Callback = wt.Callback;
        
this.State = wt.State;
        
return this;
    }
    
public void Start(WorkerThread wt)
    {
        
this.Change(wt);
        
this.Thread = new Thread(new ThreadStart(this.Exec));
        
this.Thread.Priority = ThreadPriority.Lowest;
    }

    
//public void Start(WaitCallback callback,Object state)
    
//{
    
//    this.Callback = callback;
    
//    this.State = state;
    
//    if(this.Thread==null){
    
//        this.Thread = new Thread(new ThreadStart(this.Exec));
    
//        this.Thread.Priority = ThreadPriority.Lowest;
    
//        this.Thread.IsBackground = true;
    
//        this.Thread.Start();
    
//        return;
    
//    }
    
//    if(this.Thread.ThreadState==ThreadState.Suspended)
    
//    {
    
//        this.Thread.Resume();
    
//    }
    
//}
}

 

测试代码

 

Code
<!--<br /><br />Code highlighting produced by Actipro CodeHighlighter (freeware)<br />http://www.CodeHighlighter.com/<br /><br />-->using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadPoolTest
{
    
public class Class1
    {
        
static ManualResetEvent wait=new ManualResetEvent(false);
        
static void Main(string[] args)
        {
            
object state=new object();
            ThreadPool.MyThreadPool.QueueUserWorkItem(
new WaitCallback(test), state, "aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(
new WaitCallback(test2), state, "bb");
            System.Threading.Thread.Sleep(
10000);
            wait.Set();
            System.Threading.Thread.Sleep(
10000);
            Console.WriteLine(
"aborted aa");
            ThreadPool.MyThreadPool.Aborted(
"aa");
            ThreadPool.MyThreadPool.QueueUserWorkItem(
new WaitCallback(test3), state, "cc");
            System.Threading.Thread.Sleep(
10000);
            Console.WriteLine(
"aborted bb");
            ThreadPool.MyThreadPool.Aborted(
"bb");
          
            ThreadPool.MyThreadPool.QueueUserWorkItem(
new WaitCallback(test4), state, "dd");
            Console.ReadLine();
        }

        
private static void test4(object state)
        {
            Console.WriteLine(
"test4");
        }

        
private static void test3(object state)
        {
            Console.WriteLine(
"test3");
        }

        
private static void test2(object state)
        {
            
while(true)
            {
                Console.WriteLine(
"test2");
                Thread.Sleep(
2000);
            }
        }

        
private static void test(object state)
        {
            
while (true)
            {
                Console.WriteLine(
"test");
                Thread.Sleep(
2000);
                wait.WaitOne();
//只执行一次
            }
        }
    }
}
分享到:
评论

相关推荐

    自已实现spring ioc功能代码 jdk1.6的一些新特性

    在本主题中,我们将深入探讨如何使用JDOM库在Java中实现Spring的IoC(Inversion of Control,控制反转)功能,并结合Java 1.6的一些新特性来增强这一过程。Spring框架的核心特性之一就是其IoC容器,它负责管理对象的...

    自已写的DATASNAPA DEMO

    DataSnap是Embarcadero Delphi提供的一种强大的技术,它允许开发者构建分布式应用程序,实现数据的远程访问和处理。这个DEMO旨在展示如何在客户端和服务端之间进行数据交互,包括增删改查等基本数据库操作。 描述中...

    如何实现udp组播时,自已发组播自已不收组播,其他机子就能收到组播.但是自已发组播自已也收组播,其他机子就收不到组播.原因是?

    udp组播时: 自已发自已不收,其他机子就能收到组播. 自已发自已也收,其他机子就收不到...如何实现udp组播时,自已发组播自已不收组播,其他机子就能收到组播.但是自已发组播自已也收组播,其他机子就收不到组播.这是为什么?

    对一个二维数组进行Zig-Zag方式扫描(C++,包含一个自已实现的类)

    综上所述,本例涉及的核心知识点包括:二维数组的声明和动态分配、Zig-Zag扫描的算法实现,以及自定义类的封装。通过理解这些概念,开发者可以有效地处理类似的问题,无论是对二维数组进行特定的遍历,还是设计可...

    linux下ping程序实现

    本文将深入探讨如何在Linux环境下实现一个基本的`ping`程序,涉及的知识点包括网络套接字编程、ICMP协议、原始套接字以及信号处理。 首先,`ping`程序的核心是使用原始套接字(`SOCK_RAW`)来发送和接收Internet...

    文本框加下拉可实现combox样式,可自已编辑,一个文本框加下拉

    可实现combox样式,可自已编辑,一个文本框加下拉

    毕业设计-(基于python和定向爬虫的商品比价系统的实现)

    (基于python3.6和定向爬虫的商品比价系统的实现) 三个文件夹: first文件夹是用面向对象实现的代码,实现了数据库和图形界面。 second使用面向过程的代码,实现了数据库和图形界面。 third是面向过程的代码,实现...

    做自已的网盘

    标题“做自已的网盘”暗示我们正在讨论一个个人或小型团队使用的本地网络存储解决方案,它可以替代商业网盘服务,提供数据共享和权限管理功能。这个系统是绿色免安装的,意味着它不需要复杂的安装过程,只需在拥有...

    自已写记事本 MyTXT

    总的来说,“自已写记事本 MyTXT”涵盖了文本编辑器的基本实现、Windows平台下的程序开发、注册表操作等多个方面,对于学习和理解计算机程序设计有着实际的价值。无论是新手还是经验丰富的开发者,都能从中获取不同...

    pb自已做的视频监控

    在本项目“pb自已做的视频监控”中,开发者利用PB9.0版本创建了一个视频监控系统。下面将详细阐述PB9.0在视频监控系统开发中的应用、相关技术及可能涉及的知识点。 1. PowerBuilder 9.0 简介: PowerBuilder 9.0是...

    ViewPager+自定义控件实现的日历控件CalenderView

    1. 创建一个新的View类,继承自已有的View或者 ViewGroup。 2. 在新类中重写必要的方法,如onDraw()来绘制视图,onMeasure()来测量尺寸,onLayout()来布局子视图等。 3. 实现特定的功能,例如在这个日历控件中,可能...

    JSP网站模板真正实现自已架设网站

    在构建Web应用时,JSP(JavaServer Pages)是一种广泛使用的服务器端脚本语言,它允许开发者使用HTML或XML语法中嵌入Java代码,从而动态生成网页内容。"JSP网站模板"则是一种预设计的网页布局和样式,用于简化和加速...

    chboss自已用API访问rpt

    在IT行业中,API(应用程序编程...总的来说,"chboss自已用API访问rpt"的主题涵盖了API接口的使用、HTTP协议的基础、数据交换格式的理解以及实际的编程实现。这是一个涉及技术理解、文档阅读和问题解决的综合实践过程。

    自已写的图形项界面, 主要功能就是在一个view中,显示item,并可对item进行操作,还有些缩放平移功能没有实现,没有动画效

    自已写的图形项界面, 主要功能就是在一个view中,显示item,并可对item进行操作,还有些缩放平移功能没有实现,没有动画效果.zip

    基于Python+MongoDB 实现(Web)当代数据管理系统(网上书城)【100010289】

    实现一个提供网上购书功能的网站后端。 网站支持书商在上面开商店,购买者可能通过网站购买。 买家和卖家都可以注册自己的账号。 一个卖家可以开一个或多个网上商店, 买家可以为自已的账户充值,在任意商店购买图书...

    能够自已设定时间,到时间就自动关机的软件!!!

    能够自已设定时间,到时间就自动关机!!!

    用Dreamweaver MX打造自已的Blog(CHM)

    《用Dreamweaver MX打造自已的Blog》是关于使用Adobe Dreamweaver MX这款强大的网页设计工具创建个人博客的教程。Dreamweaver MX是2002年版本的Dreamweaver,它集成了HTML编辑、视觉设计和网站管理功能,为用户提供...

    自已写上传组件

    标题中的“自已写上传组件”意味着我们要讨论的是如何创建一个自定义的文件上传功能,这通常涉及到前端和后端的交互,以及处理文件上传的流程。在IT领域,文件上传是网页应用的一个常见功能,它允许用户将本地文件...

    自已写的Webmvc框架

    标题为“自已写的Webmvc框架”,这表明作者尝试创建了一个类似Spring MVC的自定义框架,用于处理Web应用程序的请求、路由和视图渲染。这里我们将详细探讨这一主题,包括相关的技术、设计原则和实现步骤。 1. **MVC...

    自已制作与应用模块简化_集成常用JS网页对话框[代码实现:实例版]

    1、集成提示对话框、用户选择对话框、用户单信息输入对话框和用户多...2、完全用JS+CSS样式实现; 3、实例版:文件中实现了各个对话框的功能转换操作,并且也给出了参数调用方法; 4、在使用中请尊重作者劳动成果。

Global site tag (gtag.js) - Google Analytics