论坛首页 Java企业应用论坛

线程池代码完全剖析(三)

浏览 6516 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2011-03-16  

线程池代码完全剖析(三)

一、上篇文章剖析了设计线程池的第一条思路,下面分析第二条和第三条思路

 

第二条思路:用户请求的任务,而线程池是如何分配的线程给请求用户或者说是以什么样的策略方式

 

当用户调用代码

 QueuedThreadPool tp= new QueuedThreadPool();
 tp.setMinThreads(5);
        tp.setMaxThreads(10);
        tp.setMaxIdleTimeMs(1000);
        tp.setSpawnOrShrinkAt(2);
        tp.setThreadsPriority(Thread.NORM_PRIORITY-1);
        
        tp.start();
        tp.dispatch(_job);

 

 

就开始调用任务,具体流程请看dispatch方法:

 

 public boolean dispatch(Runnable job) 
    {  
        if (!isRunning() || job==null)
            return false;

        PoolThread thread=null;
        boolean spawn=false;
            
        synchronized(_lock)
        {
            // Look for an idle thread
            int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);
            else
            {
                // queue the job
                _queued++;
                if (_queued>_maxQueued)
                    _maxQueued=_queued;
                _jobs[_nextJobSlot++]=job;
                //循环的存放待留的任务
                if (_nextJobSlot==_jobs.length)
                    _nextJobSlot=0;
                /***
                 * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                 * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                 * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                 * 就扩充数组的长度即:_nextJobSlot==_nextJob
                 */
                if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要?
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  //当队列任务超出了_spawnOrShrinkAt变量值时,就要创建新的线程
                spawn=_queued>_spawnOrShrinkAt;
            }
        }
        
        if (thread!=null)
        {
            thread.dispatch(job);
        }
        else if (spawn)
        {
            newThread();
        }
        return true;
    }

 

注意有的代码不明白请看上面代码的注释:这个也是通过调式后才能理解设计者的意图

1、如果闲散集合中中有闲散线程,那么就取出一个线程处理请求的任务,代码:

          int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);

               if (thread!=null)
              {
                  thread.dispatch(job);
             }
2、如何闲散集合没有线程,那么就把请求的任务放在队列中(等待有空闲的线程,注意该队列是一个循环的队列),

      代码:  _jobs[_nextJobSlot++]=job;

 

3、如果队列中存放任务已经满了,就扩充这个队列,代码:

     if (_nextJobSlot==_nextJob)
                { //扩充任务队列
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务
                    if (split>0)//把队列剩下的任务复制给扩展的队列中
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)//

                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                   
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }

4、如何spanw=_queued>_spawnOrShrinkAt即当前队列任务数量超过了变量限制的数目时,线程池就要新建一个 线   程;代码:

        else if (spawn)
        {
            newThread();
        }

5、说明调用ThreadPool的dispatch方法,这个方法只能是在闲散集合中有线程时候才会调用,代码:

 

       /* 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

 

二、主要是将ThreadPool类

 

 public class PoolThread extends Thread 
    {
    	//this是保持_job的同步,这个变量主要是等待分配任务
        Runnable _job=null;

        /* ------------------------------------------------------------ */
        PoolThread()
        {
            setDaemon(_daemon);
            setPriority(_priority);
        }
        
        /* ------------------------------------------------------------ */
        /** BoundedThreadPool run.
         * Loop getting jobs and handling them until idle or stopped.
         */
        public void run()
        {
            boolean idle=false;
            Runnable job=null;
            try
            {
                while (isRunning())
                {   
                    // Run any job that we have.
                    if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }
                    
                    synchronized(_lock)
                    {
                        // is there a queued job?
                        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

                        // Should we shrink?
                        final int threads=_threads.size();
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))   
                        {
                        	
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }
                        /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {   
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            
//                            System.out.println(_idle.size());
                            idle=true;
                        }
                    }

                    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                // we died with a job! reschedule it
                if (job!=null)
                {
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
        /* ------------------------------------------------------------ 
         * 这个是从闲散的数组中取出来的线程
         * */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }
    }

 

该类是QueueThreadPool的内部类,主要是创建线程维护线程池的状态,该类的run方法,下面会详细剖析,该方法是该类的一个核心方法,非常经典。。。

 

 三、下面说第三条思路线程池中线程的生命周期

 

线程的生命周期也就是想说一下ThreadPool类中的run()方法。。。。

1、如果有一个工作任务的时候,线程就要执行该任务;

         if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;//这个标志等任务调度完后,在放到闲散线程集合里
                        todo.run();
                    }

那么这个工作任务是从什么地方来的呢?

 

来源一:就是从队列中取出任务

来源二:客户端调用dispatch方法时,线程池中还有闲散线程,于是就执行了ThreadPool中的dispatch方法,

                void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
              /*
               * 这行代码是与上面的代码 this.wait(getMaxIdleTimeMs());
               * 表示有任务要调度,所以不需要等待
               */
                this.notify();

            }
        }

然后就通知run方法中:

 

    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }

这样就获得了一个任务

2、任务执行完后,要归还给闲散集合

                /**
                         * 线程池中在刚刚在池中创建线程的时候没有任务安排
                         * 所以同时也是闲散线程的数目,同时任务完成后,该线程就要归还
                         * 给闲散集合
                         */
                        if (!idle)
                        {  
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            idle=true;
                        }

3、当队列中有任务时,队列就取出任务来执行任务

        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob++];
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

4、run方法中主要是while循环,只有当线程池需要缩减线程数目时,该线程的生命才会结束,即跳出循环体

       final int threads=_threads.size();
                        if (threads>_minThreads &&
                            (threads>_maxThreads ||
                             _idle.size()>_spawnOrShrinkAt))  
                        {
                         
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }

只有当线程数目大于线程池限定的最小数目并且当前线程数目大于线程池限定的最大数目或者闲散集合的数目大于线程池中变量_spawnOrShrinkAt时,同时还要满足((now-_lastShrink)>getMaxIdleTimeMs()这个要求才会让线程生命周期结束。

 

已经详细说明了三个思路。。。

 

四、下面还想说明一下,如何安全的关闭线程池

 

 /** Stop the BoundedThreadPool.
     * New jobs are no longer accepted,idle threads are interrupted
     * and stopJob is called on active threads.
     * The method then waits 
     * min(getMaxStopTimeMs(),getMaxIdleTimeMs()), for all jobs to
     * stop, at which time killJob is called.
     */
    protected void doStop() throws Exception
    {   
        super.doStop();
        
        long start=System.currentTimeMillis();
        for (int i=0;i<100;i++)
        {
            synchronized (_threadsLock)
            {
                Iterator iter = _threads.iterator();
                while (iter.hasNext())
                    ((Thread)iter.next()).interrupt();
            }
             Thread.yield();
            if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
               break;
            
            try
            {
                Thread.sleep(i*100);
            }
            catch(InterruptedException e){}
            
            
        }

        // TODO perhaps force stops
        if (_threads.size()>0)
            Log.warn(_threads.size()+" threads could not be stopped");
        /***
         * 通知那些让出CPU权限的线程
         */
        synchronized (_joinLock)
        {
            _joinLock.notifyAll();
        }
    }

 

 关闭的时候新的工作不在接收,中断闲散集合中的线程;停止正在工作任务的线程,设计者主要处理方法时根据时间和循环,来杀死工作的线程

 

五、总结

 

我认为(这个只是我个人的看法)

该停止线程池中的线程工作还是有缺点,如何优雅的关闭线程池中正在工作的线程,主要做的工作:

1、应该要知道有哪些线程正在执行

2、哪些工作的线程需要马上中断

3、哪些工作线程必须要处理完才能中断

4、必须做到这些点式非常必要的。。。。

 

反正里面的设计思想非常值得学习和思考,为什么要这样设计

 

下面有Junit测试代码和源程序,我会上传过来。。。

 

 

   发表时间:2011-03-18  
最烦的就是一上来就上代码或配置文件的。多少来点自己的思想啊
0 请登录后投票
   发表时间:2011-04-13  
/***
                * 由于队列是一个由数组组成的循环数组,而_nextJobSlot指针是任务加入
                * 队列数组添加时候就++,而_nextJob指针只要从队列数组中取数据的时候
                * ++;也就是说当加入任务的数量比取出任务的数量要快队列的数组长度时候,
                * 就扩充数组的长度即:_nextJobSlot==_nextJob
                */ 
               if (_nextJobSlot==_nextJob) 
               { //扩充任务队列 
                   Runnable[] jobs= new Runnable[_jobs.length+_maxThreads]; 
                   int split=_jobs.length-_nextJob;//未增加队列时 队列还有多少个任务 
                   if (split>0)//把队列剩下的任务复制给扩展的队列中 
                       System.arraycopy(_jobs,_nextJob,jobs,0,split); 
                   if (_nextJob!=0)//为什么还要复制呢?不是前面的任务已经执行完,应该没有必要? 
我的理解,lz这段代码的理解有问题。
1.因为是循环数组,_nextJobSlot==_nextJob的情况是_nextJobSlot比_nextJob多了一个循环,就像跑步时跑得快的要拉开跑慢的整整一圈时候的情况,通常我们叫套圈。
在这里,表示的意思是整个jobs数据已经都放满了待处理的任务,所以需要处理。
2.而下面两个数组的拷贝,我理解,只是纯粹得改变一下数组中任务的位置。
0 请登录后投票
   发表时间:2011-04-14  
谢谢你提的建议 确实是理解有点偏差  还有想说的是,本人现在在读研究生想在六月左右找一份实习的工作:
  主要兴趣是对于高并发感兴趣,对多线程和网络的理论有深入的研究,希望有一个实践的平台

希望各位推荐推荐 谢谢
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics