`
mlzboy
  • 浏览: 724880 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

自制线程池4

阅读更多

需求:

有一种任务需要定时的执行,而且非常的耗时,因此我把它放到线程池中执行,并设置线程池为1,如果该任务已经在队列中或正在执行该任务,则不要再将该任务加入线程池中了。

测试代码如下

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> --> 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading;
 6 using ThreadPool2;
 7 
 8 namespace ThreadPoolTest.MyThreadPool2Test
 9 {
10     class Class6
11     {
12         static void Main(string[] args)
13         {
14             MyThreadPool2 pool=new MyThreadPool2(1,true,30000);
15             object obj=new object();
16             Random rnd=new Random();
17             for (var i = 0; i < 20;i++ )
18                 pool.QueueUserWorkItem(call, obj, rnd.Next(1,4).ToString(), succ, err);
19             Console.ReadLine();
20         }
21 
22         private static void err(object state)
23         {
24                 Console.WriteLine("err");
25         }
26 
27         private static void succ(object state, object result)
28         {
29             Console.WriteLine("succ");
30         }
31 
32         private static object call(object state)
33         {
34             while(true)
35             {
36                 Thread.Sleep(2000);
37                 Console.WriteLine("exec");
38             }
39         }
40     }
41 }
42 

 

线程池代码如下,

 

<!--<br /> <br /> Code highlighting produced by Actipro CodeHighlighter (freeware)<br /> http://www.CodeHighlighter.com/<br /> <br /> -->using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using Amib.Threading.Internal;
using Rhino.Commons;

namespace ThreadPool2
{
    
public delegate object WaitCallback2(object state);
    
public delegate void SuccCallback(object state, object result);
    
public delegate void ErrCallback(object state);
    
/**//// <summary>
    
/// 此线程池的作用是将某一类特殊的任务交给此线程池执行,
    
/// 可以设定该线程池的最大线程数,
    
/// 这类线程池的优点时,占用的资源少,优先级低,
    
/// 适合于执行任务需要长期执行,不考虑时间因素的任务
    
/// 同时根据在传入线程池时的标记key,可以Aborted指定任务,
    
/// 若该任务正在执行或尚在执行队列中
    
/// </summary>

    public class MyThreadPool2
    
{
        
/**//// <summary>
        
/// 任务执行队列
        
/// </summary>

        //static ThreadSafeQueue<WorkerThread> queue = new ThreadSafeQueue<WorkerThread>();
        List<WorkerThread> queue = new List<WorkerThread>();
        
/**//// <summary>
        
/// 目前暂定为只使用一个线程,以免耗近资源
        
/// </summary>

        SynchronizedDictionary<string, WorkerThread> dict = new SynchronizedDictionary<string, WorkerThread>();
        
private object state;
        AutoResetEvent wait 
= new AutoResetEvent(false);
        AutoResetEvent wait2 
= new AutoResetEvent(false);
        
private int MaxLimitedTime getset; }
        
private bool IsLimitedExecTime getset; }
        
private int IdleTimeout getset; }
        
//private static int _maxThreadNum = 1;
        private int MaxThreadNum
        
{
            
//get { return _maxThreadNum; }
            
//set { _maxThreadNum = value; }
            get;
            
set;
        }

        
private MyThreadPool2()
        
{
            
//System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state, 2000,true);
            
//SetMaxThreadNum(2);
            
//SetMaxExecTime(false, 10000);
        }

        
/**//// <summary>
        
/// 设置专用线程池的初始参数
        
/// </summary>
        
/// <param name="num">线程池的最大线程数,最小为1</param>
        
/// <param name="b">是否起用限制最大单个任务执行时间设定</param>
        
/// <param name="MaxLimitedTime">单个任务执行的最大时间</param>

        public MyThreadPool2(int num, bool b, int MaxLimitedTime)
        
{
            System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
new WaitOrTimerCallback(aa), state, 2000true);
            
if (num < 1)
                num 
= 1;
            MaxThreadNum 
= num;
            IsLimitedExecTime 
= b;
            
this.MaxLimitedTime = MaxLimitedTime;
            
if (IsLimitedExecTime)
                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait2, 
new WaitOrTimerCallback(bb), state,
                                                                        
this.MaxLimitedTime, true);
        }


        
/**//// <summary>
        
/// 定时将队列中的数据装载到线程中执行,如果还没有到达最大线程数还有任务则创建线程
        
/// </summary>
        
/// <param name="state"></param>
        
/// <param name="timedOut"></param>

        private void aa(object state, bool timedOut)
        
{
            
//Console.WriteLine("执行aa()将队列中的任务加到线程中");
            lock(WorkerThread.Manual){
            WorkerThread.Manual.Reset();
            
lock (queue)
            
{
                Console.WriteLine(
"queue count={0}",queue.Count);
                
//判断任务队列中有无积压的任务且有无空闲的线程,如果符合上述条件则执行之
                List<string> removeKey = new List<string>();
                List
<WorkerThread> newTask = new List<WorkerThread>();
                List
<string> tasks = new List<string>();
                
//Dictionary<string,WorkerThread> addDict=new Dictionary<string, WorkerThread>();
                foreach (var kvp in dict)
                
{//kvp.Value.ThreadState == ThreadState.Unstarted || 
                    
//if (kvp.Value.Thread.ThreadState == ThreadState.Suspended)

                    
//将不活动的线程记录下来并移除
                    if (!kvp.Value.Thread.IsAlive)
                        tasks.Add(kvp.Key);
                    
//将活动且空闲的线程赋于新的任务
                    if (kvp.Value.Thread.IsAlive == true && kvp.Value.CurrentThreadState == WorkerThreadState.Idle)
                    
{
                        
//dict.Remove(kvp.Key);//cancle because of lock

                        WorkerThread a 
= queue.FirstOrDefault();
                        
if (a != null)
                        
{
                            removeKey.Add(kvp.Key);
                            
//addDict.Add(a.Key, kvp.Value.Change(a));
                            newTask.Add(kvp.Value.Change(a));
                            
//a.Thread = kvp.Value.Thread;
                            
//newTask.Add(a);
                            queue.RemoveAt(0);
                            
//dict.Add(a.Key, kvp.Value.Change(a));//cancle because of lock
                            
//将参数加到线程中,并改变线程的状态
                            
//dict[a.Key].Thread.Resume();
                        }

                        
else
                            
break;
                        
//else
                        
//{
                        
//    System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, new WaitOrTimerCallback(aa), state,
                        
//                                                            2000, true);
                        
//    return;
                        
//}

                    }

                }

                tasks.ForEach(t 
=> 
                

                    dict.Remove(t);
                    Debug.WriteLine(
"移除销毁线程对应的dict中的键值项,key="+t);
                }
);
                removeKey.ForEach(t 
=> dict.Remove(t));
                newTask.ForEach(t 
=>
                
{
                    Debug.WriteLine(
"复用线程用于执行新任务"+t.Key);
                    dict.Add(t.Key, t);
                    
//t.StartExecTime = DateTime.Now;
                    t.Auto.Set();
                    
//t.CurrentThreadState = WorkerThreadState.Busy;
                    
//t.Thread.Resume();
                }
);
                
while (queue.Count > 0 && dict.Count < MaxThreadNum)
                
{
                    
//未到线程池最大池程数时,增加线程
                    WorkerThread b = queue.FirstOrDefault();
                    
if (b != null)
                    
{
                        queue.RemoveAt(
0);
                        
//Thread thd = new Thread(new ThreadStart(b.Exec));
                        
//thd.Priority = ThreadPriority.Lowest;
                        
//dict.Add(b.Key, thd);
                        
//thd.Start();
                        WorkerThread wt = new WorkerThread();
                        wt.Start(b);
                        dict.Add(wt.Key, wt);
                        wt.Thread.Start();
                        Debug.WriteLine(
"新建线程用于执行新任务"+ wt.Key);


                        
//将参数加到线程中,并改变线程的状态
                    }



                }

                System.Threading.ThreadPool.RegisterWaitForSingleObject(wait, 
new WaitOrTimerCallback(aa), state, 2000,
http://www.cnblogs.com/Imag
分享到:
评论

相关推荐

    Asynchronous-Server:使用非阻塞 IO 和自制线程池实现的异步服务器,能够使用有限的线程集处理数百个并发网络连接

    4. **自制线程池**:虽然Java提供内置的线程池实现,但有时根据特定需求,开发者可能需要自定义线程池。自定义线程池可以精确控制线程的数量、工作队列的大小、线程的优先级等,以满足特定性能和资源管理的要求。 5...

    自制网盘搜索器Ⅱ(使用CefSharp内嵌Chrome浏览器内核)

    开发者可能需要使用到线程池来并发处理多个搜索请求,提高响应速度,同时也要处理好异常情况,保证程序的稳定运行。 在实际使用中,XiLinJieSearch这个名字可能代表了该搜索器的特定版本或作者的标识。用户在使用时...

    CLR.via.C#.(中文第3版)(自制详细书签)

    CLR.via.C#.(中文第3版)(自制详细书签)Part2 CLR via C#(第3版) Jeffrey Richter 著 周靖 译 出版时间:2010年09月 页数:800 介绍 享有全球盛誉的编程专家Jeffrey Richter,这位与Microsoft .NET开发团队...

    CLR.via.C#.(中文第3版)(自制详细书签)Part1

    CLR.via.C#.(中文第3版)(自制详细书签) CLR via C#(第3版) Jeffrey Richter 著 周靖 译 出版时间:2010年09月 页数:800 介绍 享有全球盛誉的编程专家Jeffrey Richter,这位与Microsoft .NET开发团队合作长达...

    CLR.via.C#.(中文第3版)(自制详细书签)Part3

    CLR.via.C#.(中文第3版)(自制详细书签)Part3 CLR via C#(第3版) Jeffrey Richter 著 周靖 译 出版时间:2010年09月 页数:800 介绍 享有全球盛誉的编程专家Jeffrey Richter,这位与Microsoft .NET开发团队合作...

    CLR.via.C#.(中文第3版)(自制详细书签)Part2

    CLR.via.C#.(中文第3版)(自制详细书签)Part2 CLR via C#(第3版) Jeffrey Richter 著 周靖 译 出版时间:2010年09月 页数:800 介绍 享有全球盛誉的编程专家Jeffrey Richter,这位与Microsoft .NET开发团队合作...

    My_Software:所有自制软件的源代码

    6. **并发与多线程**:Java提供了强大的并发处理能力,源代码可能展示了如何处理多线程问题,如同步机制、线程池等。 7. **网络编程**:如果软件涉及网络通信,那么可能会看到Socket编程或者HTTP客户端/服务器端...

    Java核心技术,卷1(原书第8版).pdf 中文 自制完整书签

    根据提供的信息,“Java核心技术,卷1(原书第8版).pdf 中文 自制完整书签”这份资料主要聚焦于Java编程语言的核心概念和技术。由于实际内容并未给出,以下将根据该书可能涵盖的主要章节来推测其核心知识点,并对这些...

    串口开发升级版.zip 自制的用于编写串口助手或者上位机的类

    `BSerialComs`可能采用了线程池或者事件循环模型来处理串口事件,保证了主线程的流畅运行,避免了因为串口操作阻塞而影响用户体验。此外,多线程还能让程序在等待串口响应时执行其他任务,提高整体效率。 事件驱动...

    MyQQ.zip_MYQQ

    2. **多线程与并发处理**:为了处理多个用户连接和消息同步,程序需要支持多线程和并发操作,这可能涉及到线程池、锁机制、异步编程模型等概念。 3. **数据库管理**:用户信息、聊天记录等数据通常存储在数据库中,...

    Java咖啡厅系统

    【Java咖啡厅系统】是一款基于Java技术开发的自制咖啡厅管理系统,旨在提高咖啡厅的运营效率和服务质量。这个系统集成了订单管理、库存控制、客户关系管理、员工调度等多种功能,为小型咖啡厅提供了一站式的后台解决...

    JAVA 范例大全 光盘 资源

    实例132 执行任务(线程池) 378 实例133 碰撞的球(多线程) 382 实例134 钟表(多线程) 387 实例135 模拟生产者与消费者 392 实例136 仿迅雷下载文件 396 第15章 图形编程 403 实例137 多变的按钮 403 ...

    Java范例开发大全(全书源程序)

    实例242 手术任务(线程池) 462 实例243 模拟人工服务台(线程连接池) 466 13.6 线程应用实例 471 实例244 下雪的村庄 472 实例245 小飞侠 474 实例246 飞流直下 477 实例247 多线程断点续传 479 实例248 ...

    Java范例开发大全 (源程序)

     1.2.2 安装JDK 4  1.2.3 配置环境 5  1.2.4 测试JDK配置是否成功 7  实例1 开发第一个Java程序 7  第2章 Java基础类型与运算符(教学视频:39分钟) 9  2.1 基础类型 9  实例2 自动提升 9  实例3 ...

    java范例开发大全(pdf&源码)

    实例242 手术任务(线程池) 462 实例243 模拟人工服务台(线程连接池) 466 13.6 线程应用实例 471 实例244 下雪的村庄 472 实例245 小飞侠 474 实例246 飞流直下 477 实例247 多线程断点续传 479 实例248 滚动的...

    Linux多线程服务端编程:使用muduo C++网络库

    3.3.2线程池. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . . 63 3.3.3推荐模式. . . . . . . . . . . . . . . . . . . .. . . . . . . . . . . . 64 3.4进程间通信只用TCP . . . . . . . . . . ....

    java范例开发大全源代码

     1.2.2 安装JDK 4  1.2.3 配置环境 5  1.2.4 测试JDK配置是否成功 7  实例1 开发第一个Java程序 7  第2章 Java基础类型与运算符(教学视频:39分钟) 9  2.1 基础类型 9  实例2 自动提升 9  ...

    java范例开发大全

    实例242 手术任务(线程池) 462 实例243 模拟人工服务台(线程连接池) 466 13.6 线程应用实例 471 实例244 下雪的村庄 472 实例245 小飞侠 474 实例246 飞流直下 477 实例247 多线程断点续传 479 实例248 滚动的...

Global site tag (gtag.js) - Google Analytics