`
he_wen
  • 浏览: 239498 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

线程池代码完全剖析(三)

阅读更多

线程池代码完全剖析(三)

一、上篇文章剖析了设计线程池的第一条思路,下面分析第二条和第三条思路

 

第二条思路:用户请求的任务,而线程池是如何分配的线程给请求用户或者说是以什么样的策略方式

 

当用户调用代码

 QueuedThreadPool tp= new QueuedThreadPool();
 tp.setMinThreads(5);
        tp.setMaxThreads(10);
        tp.setMaxIdleTimeMs(1000);
        tp.setSpawnOrShrinkAt(2);
        tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
        
        tp.start();
        tp.dispatch(_job);

 

 

就开始调用任务,具体流程请看dispatch方法:

 

 public boolean dispatch(Runnable job) 
    {  
        if (!isRunning() || job==null)
            return false;

        PoolThread thread=null;
        boolean spawn=false;
            
        synchronized(_lock)
        {
            // Look for an idle thread
            int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);
            else
            {
                // queue the job
                _queued++;
                if (_queued>_maxQueued)
                    _maxQueued=_queued;
                _jobs[_nextJobSlot++]=job;
                //循环的存放待留的任务
                if (_nextJobSlot==_jobs.length)
                    _nextJobSlot=0;
                /***
                 * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                 * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                 * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                 * 就扩充数组的长度即:_nextJobSlot==_nextJob
                 */
                if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要?
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  //当队列任务超出了_spawnOrShrinkAt变量值时,就要创建新的线程
                spawn=_queued>_spawnOrShrinkAt;
            }
        }
        
        if (thread!=null)
        {
            thread.dispatch(job);
        }
        else if (spawn)
        {
            newThread();
        }
        return true;
    }

 

注意有的代码不明白请看上面代码的注释:这个也是通过调式后才能理解设计者的意图

1、如果闲散集合中中有闲散线程,那么就取出一个线程处理请求的任务,代码:

          int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);

               if (thread!=null)
              {
                  thread.dispatch(job);
             }
2、如何闲散集合没有线程,那么就把请求的任务放在队列中(等待有空闲的线程,注意该队列是一个循环的队列),

      代码:  _jobs[_nextJobSlot++]=job;

 

3、如果队列中存放任务已经满了,就扩充这个队列,代码:

     if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//

                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                   
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }

4、如何spanw=_queued>_spawnOrShrinkAt即当前队列任务数量超过了变量限制的数目时,线程池就要新建一个 线   程;代码:

        else if (spawn)
        {
            newThread();
        }

5、说明调用ThreadPool的dispatch方法,这个方法只能是在闲散集合中有线程时候才会调用,代码:

 

       /* 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

 

二、主要是将ThreadPool类

 

 public class PoolThread extends Thread 
    {
    	//this是保持_job的同步,这个变量主要是等待分配任务
        Runnable _job=null;

        /* ------------------------------------------------------------ */
        PoolThread()
        {
            setDaemon(_daemon);
            setPriority(_priority);
        }
        
        /* ------------------------------------------------------------ */
        /** BoundedThreadPool run.
         * Loop getting jobs and handling them until idle or stopped.
         */
        public void run()
        {
            boolean idle=false;
            Runnable job=null;
            try
            {
                while (isRunning())
                {   
                    // Run any job that we have.
                    if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }
                    
                    synchronized(_lock)
                    {
                        // is there a queued job?
                        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

                        // Should we shrink?
                        final int threads=_threads.size();
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))   
                        {
                        	
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }
                        /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {   
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            
//                            System.out.println(_idle.size());
                            idle=true;
                        }
                    }

                    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                // we died with a job! reschedule it
                if (job!=null)
                {
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
        /* ------------------------------------------------------------ 
         * 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }
    }

 

该类是QueueThreadPool的内部类,主要是创建线程维护线程池的状态,该类的run方法,下面会详细剖析,该方法是该类的一个核心方法,非常经典。。。

 

 三、下面说第三条思路线程池中线程的生命周期

 

线程的生命周期也就是想说一下ThreadPool类中的run()方法。。。。

1、如果有一个工作任务的时候,线程就要执行该任务;

         if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }

那么这个工作任务是从什么地方来的呢?

 

来源一:就是从队列中取出任务

来源二:客户端调用dispatch方法时,线程池中还有闲散线程,于是就执行了ThreadPool中的dispatch方法,

                void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

然后就通知run方法中:

 

    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }

这样就获得了一个任务

2、任务执行完后,要归还给闲散集合

                /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {  
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            idle=true;
                        }

3、当队列中有任务时,队列就取出任务来执行任务

        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

4、run方法中主要是while循环,只有当线程池需要缩减线程数目时,该线程的生命才会结束,即跳出循环体

       final int threads=_threads.size();
                        if (threads>_minThreads &&
                            (threads>_maxThreads ||
                             _idle.size()>_spawnOrShrinkAt))  
                        {
                         
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }

只有当线程数目大于线程池限定的最小数目并且当前线程数目大于线程池限定的最大数目或者闲散集合的数目大于线程池中变量_spawnOrShrinkAt时,同时还要满足((now-_lastShrink)>getMaxIdleTimeMs()这个要求才会让线程生命周期结束。

 

已经详细说明了三个思路。。。

 

四、下面还想说明一下,如何安全的关闭线程池

 

 /** Stop the BoundedThreadPool.
     * New jobs are no longer accepted,idle threads are interrupted
     * and stopJob is called on active threads.
     * The method then waits 
     * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
     * stop, at which time killJob is called.
     */
    protected void doStop() throws Exception
    {   
        super.doStop();
        
        long start=System.currentTimeMillis();
        for (int i=0;i<100;i++)
        {
            synchronized (_threadsLock)
            {
                Iterator iter = _threads.iterator();
                while (iter.hasNext())
                    ((Thread)iter.next()).interrupt();
            }
             Thread.yield();
            if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
               break;
            
            try
            {
                Thread.sleep(i*100);
            }
            catch(InterruptedException e){}
            
            
        }

        // TODO perhaps force stops
        if (_threads.size()>0)
            Log.warn(_threads.size()+" threads could not be stopped");
        /***
         * 通知那些让出CPU权限的线程
         */
        synchronized (_joinLock)
        {
            _joinLock.notifyAll();
        }
    }

 

 关闭的时候新的工作不在接收,中断闲散集合中的线程;停止正在工作任务的线程,设计者主要处理方法时根据时间和循环,来杀死工作的线程

 

五、总结

 

我认为(这个只是我个人的看法)

该停止线程池中的线程工作还是有缺点,如何优雅的关闭线程池中正在工作的线程,主要做的工作:

1、应该要知道有哪些线程正在执行

2、哪些工作的线程需要马上中断

3、哪些工作线程必须要处理完才能中断

4、必须做到这些点式非常必要的。。。。

 

反正里面的设计思想非常值得学习和思考,为什么要这样设计

 

下面有Junit测试代码和源程序,我会上传过来。。。

 

 

分享到:
评论
4 楼 he_wen 2011-04-14  
谢谢你提的建议 确实是理解有点偏差  还有想说的是,本人现在在读研究生想在六月左右找一份实习的工作:
  主要兴趣是对于高并发感兴趣,对多线程和网络的理论有深入的研究,希望有一个实践的平台

希望各位推荐推荐 谢谢
3 楼 feidian1028 2011-04-13  
/***
                * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                * 就扩充数组的长度即:_nextJobSlot==_nextJob
                */ 
               if (_nextJobSlot==_nextJob) 
               { //扩充任务队列 
                   Runnable[] jobs= new Runnable[_jobs.length+_maxThreads]; 
                   int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务 
                   if (split>0)//把队列剩下的任务复制给扩展的队列中 
                       System.arraycopy(_jobs,_nextJob,jobs,0,split); 
                   if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要? 
我的理解,lz这段代码的理解有问题。
1.因为是循环数组,_nextJobSlot==_nextJob的情况是_nextJobSlot比_nextJob多了一个循环,就像跑步时跑得快的要拉开跑慢的整整一圈时候的情况,通常我们叫套圈。
在这里,表示的意思是整个jobs数据已经都放满了待处理的任务,所以需要处理。
2.而下面两个数组的拷贝,我理解,只是纯粹得改变一下数组中任务的位置。
2 楼 beforezero 2011-03-18  
最烦的就是一上来就上代码或配置文件的。多少来点自己的思想啊
1 楼 sebatinsky 2011-03-18  
占个位置,先看看,,,,

相关推荐

    Java 线程池框架核心代码分析1

    5. `TERMINATED`:`terminated()`方法执行完毕,线程池完全停止。 线程池提供了`execute()`方法来提交任务,根据线程池的状态和工作队列的状况,决定如何处理新任务。如果当前线程数量小于`corePoolSize`,会创建新...

    线程池demo

    5. 示例代码可能还包括日志打印,以便于调试和分析线程池的工作状态。 理解并熟练运用线程池对于提高Android应用的性能和稳定性至关重要。通过合理地调整线程池参数,我们可以有效地控制并发程度,防止过度消耗系统...

    《Java完全自学宝典》 光盘代码

    《Java完全自学宝典》是一本深度覆盖Java编程语言的自学教材,其光盘代码包含了大量的实例和项目,旨在帮助读者通过实践加深对Java的理解。这本书涵盖了从基础语法到高级特性的全面讲解,是初学者入门和进阶的宝贵...

    C++线程池源代码

    在IT领域,线程池是一种优化...通过对"ThreadPool"源代码的分析,我们可以学习如何设计和实现这样的高效并发结构,这对于提升系统的可扩展性和性能大有裨益。同时,无锁线程池也是理解和实践无锁编程的一个理想起点。

    雍俊海编著《java 程序设计》书目源代码

    通过分析和运行这些代码,读者可以更直观地理解Java编程的概念和技术。 1. **面向对象编程**:Java是一种完全面向对象的语言,书中可能通过类的设计、对象的创建和方法的定义来演示如何进行面向对象的思考和编程。...

    disruptor 代码分析

    ### disruptor 代码分析 #### 重要知识点概览 Disruptor框架是LMAX交易所开源的一个高性能、低延迟的消息队列实现,它采用无锁化编程技术,利用环形缓冲区(Ring Buffer)来实现高效的多生产者多消费者模型。本文...

    《Java 宝典》源代码

    源代码是书中理论知识的实践体现,通过分析和运行这些代码,读者可以更好地理解Java编程的精髓。 在Java编程中,以下几个重要的知识点不容忽视: 1. **Java基础语法**:包括变量、数据类型、运算符、流程控制(如...

    Visual Studio 2010完全使用手册

    最后,手册还会涉及一些高级主题,如性能优化、单元测试和代码分析,帮助开发者编写更高效、更可靠的代码。同时,也会讲解如何使用Visual Studio的扩展性功能,如插件和宏,来定制和增强IDE的功能。 总之,《Visual...

    阿里巴巴开源在线分析诊断工具Arthas(阿尔萨斯)

    Arthas是完全开源的,遵循Apache License 2.0协议,代码托管在GitHub上。开发者可以自由地参与到项目中,提交问题、提供建议或贡献代码。此外,Arthas有活跃的社区,提供了丰富的文档和教程,方便用户学习和交流。 ...

    经典算法代码实现.zip

    《经典算法代码实现》压缩包包含了多个经典的计算机科学与信息技术领域的算法实现,这些算法在软件开发、数据分析以及机器学习等领域有着广泛的应用。以下是对每个算法的详细解释: 1. **快速排序**:由C.A.R. ...

    精易模块[源码] V5.15

    6、新增“类_任务栏”可以显示隐藏任何第三方窗口图标,相当于易中的(不在任务栏显示),带【实例】演示。 7、新增“类_线程池1”中的“等待”方法。 8、修复“编码_Utf8到Ansi“分配内存失败BUG,感谢易友【仁鹰】...

    多线程采集C#源代码(C#)

    在IT领域,多线程编程是一项关键技能,尤其是在高性能、高并发的应用中。本文将深入探讨C#语言中实现多线程数据采集的核心概念和技术。...通过分析这些代码,开发者可以学习到如何在C#中构建稳定、高效的多线程应用。

    并行程序设计 曼德罗伯特集源代码

    源代码完全可编译意味着这些程序已经过测试,可以直接在各种编程环境中运行,比如C++、Python、Java或Julia等。对于初学者,这是一份宝贵的参考资料,可以学习如何实现并行计算,以及如何将并行化技术应用到实际问题...

    C#程序开发范例宝典源代码7(共20章)

    2. **面向对象编程**:C#是一种完全面向对象的语言,包括类、对象、继承、多态、封装等核心概念。通过实例,我们可以看到如何设计和实现类,理解继承如何扩展基类功能,以及多态如何提供代码的灵活性。 3. **接口与...

    java代码答案

    对于每个习题,解题过程往往包括需求分析、设计合适的数据结构和算法、编写和调试代码、以及最后的测试和优化。分析部分则会帮助学习者理解为什么要这样设计,以及在遇到类似问题时如何做出决策。通过这种方式,你...

    《Java并发编程的艺术》源代码

    Java线程之间的通信对程序员完全透明,内存可见性问题很容易困扰Java程序员,本章试图揭开Java内存模型的神秘面纱。 第4章从介绍多线程技术带来的好处开始,讲述了如何启动和终止线程以及线程的状态,详细阐述了多...

    loadrunner分析内存泄露

    进一步分析代码发现,Mule框架中的`XFireServiceComponent`类的`setDescriptor`方法每隔一段时间就会被调用一次,导致大量全局对象的创建。 #### 八、配置调整 通过对Mule框架的配置进行仔细审查,发现问题可能与...

    关于Java在软件开发中的误区分析.zip

    Java的自动内存管理机制使得开发者不必像C++那样手动管理内存,但这并不意味着可以完全忽略内存问题。不当使用对象、内存泄漏或过度依赖大对象可能导致性能下降。理解垃圾回收的工作原理,以及如何编写内存效率高的...

    javaSE代码实例

    13.6.3 利用正则式对字符串进行分析 268 13.7 小结 269 第14章 集合框架——强大的对象管理器 270 14.1 Object类——所有类的超类 270 14.1.1 toString方法的重写 270 14.1.2 equals方法的意义 271 ...

    java问题定位技术+性能优化

    #### 三、Java内存泄漏分析和堆内存设置 - **3.1 Java内存泄漏的背景知识** - **3.1.1 Java对象的size** - 32位平台上,对象头、实例数据和对齐填充构成了对象的实际大小。 - **3.1.2 Java对象及其引用** - 引用...

Global site tag (gtag.js) - Google Analytics