`
wode66
  • 浏览: 744016 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

线程池--jetty中QueuedThreadPool分析(一)

阅读更多

jetty版本:jetty-6.1.26

1.由于jetty中的许多组件都实现了LifeCycle接口,先了解下该接口的定义:

 

package org.mortbay.component;

import java.util.EventListener;

/**
 * Contract for a component that can be started and stopped and that
 * reports which phase of that cycle it is currently in.
 * <p>
 * Interested parties can register a {@link Listener} to be called back
 * on life cycle events.
 */
public interface LifeCycle
{
    /* ------------------------------------------------------------ */
    /* Transitions.
     */

    /** Starts the component.
     * @throws Exception if the component fails to start
     */
    void start() throws Exception;

    /** Stops the component.
     * @throws Exception if the component fails to stop
     */
    void stop() throws Exception;

    /* ------------------------------------------------------------ */
    /* State queries.
     */

    /** @return true if the component is running */
    boolean isRunning();

    /** @return true if the component has been started */
    boolean isStarted();

    /** @return true if the component is starting */
    boolean isStarting();

    /** @return true if the component is stopping */
    boolean isStopping();

    /** @return true if the component has been stopped */
    boolean isStopped();

    /** @return true if the component has failed */
    boolean isFailed();

    /* ------------------------------------------------------------ */
    /* Listener registration.
     */

    /** Registers a listener for life cycle events.
     * @param listener the listener to add
     */
    void addLifeCycleListener(LifeCycle.Listener listener);

    /** Unregisters a previously added listener.
     * @param listener the listener to remove
     */
    void removeLifeCycleListener(LifeCycle.Listener listener);

    /* ------------------------------------------------------------ */
    /** Listener.
     * A listener for Lifecycle events. Each callback receives the
     * component the event concerns.
     */
    interface Listener extends EventListener
    {
        void lifeCycleStarting(LifeCycle event);

        void lifeCycleStarted(LifeCycle event);

        /** @param cause the throwable associated with the failure */
        void lifeCycleFailure(LifeCycle event, Throwable cause);

        void lifeCycleStopping(LifeCycle event);

        void lifeCycleStopped(LifeCycle event);
    }
}

2.AbstractLifeCycle抽象类:该类实现了LifeCycle接口,其中start()和stop()两个方法采用模板方法模式实现,具体的启动/停止逻辑由子类覆盖doStart()和doStop()提供:

 

//========================================================================
//$Id: AbstractLifeCycle.java,v 1.3 2005/11/11 22:55:41 gregwilkins Exp $
//Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
//------------------------------------------------------------------------
//Licensed under the Apache License, Version 2.0 (the "License");
//you may not use this file except in compliance with the License.
//You may obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//See the License for the specific language governing permissions and
//limitations under the License.
//========================================================================

package org.mortbay.component;

import org.mortbay.log.Log;
import org.mortbay.util.LazyList;

/**
 * Basic implementation of the life cycle interface for components.
 * <p>
 * {@link #start()} and {@link #stop()} are final: they perform the state
 * bookkeeping and listener notification and delegate the component-specific
 * work to {@link #doStart()} and {@link #doStop()}, which subclasses override.
 * Both run while holding a private monitor, so a start and a stop of the same
 * component never overlap.
 * 
 * @author gregw
 */
public abstract class AbstractLifeCycle implements LifeCycle
{
    /** Monitor held for the whole of start() and stop(). */
    private final Object _lock = new Object();

    /* State codes. Static: they are class-wide constants, not per-instance data. */
    private static final int FAILED = -1, STOPPED = 0, STARTING = 1, STARTED = 2, STOPPING = 3;

    /** Current state; volatile because the isXxx() queries read it without taking _lock. */
    private volatile int _state = STOPPED;

    /** Registered listeners; null until the first listener is added. Reassigned on every add/remove. */
    protected LifeCycle.Listener[] _listeners;

    /**
     * Component-specific start work, called by {@link #start()} after the
     * state became STARTING. The default does nothing.
     * @throws Exception to make the start fail
     */
    protected void doStart() throws Exception
    {
    }

    /**
     * Component-specific stop work, called by {@link #stop()} after the
     * state became STOPPING. The default does nothing.
     * @throws Exception to make the stop fail
     */
    protected void doStop() throws Exception
    {
    }

    /**
     * Starts the component unless it is already starting or started.
     * Any Exception or Error thrown on the way moves the component to the
     * failed state, notifies the listeners and is then rethrown.
     */
    public final void start() throws Exception
    {
        synchronized (_lock)
        {
            try
            {
                if (_state == STARTED || _state == STARTING)
                    return;
                setStarting();
                doStart();
                Log.debug("started {}",this);
                setStarted();
            }
            catch (Exception e)
            {
                setFailed(e);
                throw e;
            }
            catch (Error e)
            {
                setFailed(e);
                throw e;
            }
        }
    }

    /**
     * Stops the component unless it is already stopping or stopped.
     * Any Exception or Error thrown on the way moves the component to the
     * failed state, notifies the listeners and is then rethrown.
     */
    public final void stop() throws Exception
    {
        synchronized (_lock)
        {
            try
            {
                if (_state == STOPPING || _state == STOPPED)
                    return;
                setStopping();
                doStop();
                Log.debug("stopped {}",this);
                setStopped();
            }
            catch (Exception e)
            {
                setFailed(e);
                throw e;
            }
            catch (Error e)
            {
                setFailed(e);
                throw e;
            }
        }
    }

    /** @return true while the component is starting or started */
    public boolean isRunning()
    {
        return _state == STARTED || _state == STARTING;
    }

    public boolean isStarted()
    {
        return _state == STARTED;
    }

    public boolean isStarting()
    {
        return _state == STARTING;
    }

    public boolean isStopping()
    {
        return _state == STOPPING;
    }

    public boolean isStopped()
    {
        return _state == STOPPED;
    }

    public boolean isFailed()
    {
        return _state == FAILED;
    }

    public void addLifeCycleListener(LifeCycle.Listener listener)
    {
        _listeners = (LifeCycle.Listener[])LazyList.addToArray(_listeners,listener,LifeCycle.Listener.class);
    }

    public void removeLifeCycleListener(LifeCycle.Listener listener)
    {
        _listeners = (LifeCycle.Listener[])LazyList.removeFromArray(_listeners,listener);
    }

    /*
     * Each setXxx() below records the new state and then notifies the listeners.
     * The loops walk a local snapshot of _listeners: a callback may add or remove
     * a listener, which reassigns the field, and indexing the field directly
     * would then skip entries of the array that was current when the event fired.
     */

    private void setStarted()
    {
        _state = STARTED;
        final LifeCycle.Listener[] listeners = _listeners;
        if (listeners != null)
        {
            for (int i = 0; i < listeners.length; i++)
            {
                listeners[i].lifeCycleStarted(this);
            }
        }
    }

    private void setStarting()
    {
        _state = STARTING;
        final LifeCycle.Listener[] listeners = _listeners;
        if (listeners != null)
        {
            for (int i = 0; i < listeners.length; i++)
            {
                listeners[i].lifeCycleStarting(this);
            }
        }
    }

    private void setStopping()
    {
        _state = STOPPING;
        final LifeCycle.Listener[] listeners = _listeners;
        if (listeners != null)
        {
            for (int i = 0; i < listeners.length; i++)
            {
                listeners[i].lifeCycleStopping(this);
            }
        }
    }

    private void setStopped()
    {
        _state = STOPPED;
        final LifeCycle.Listener[] listeners = _listeners;
        if (listeners != null)
        {
            for (int i = 0; i < listeners.length; i++)
            {
                listeners[i].lifeCycleStopped(this);
            }
        }
    }

    /**
     * Logs the failure, records the FAILED state and notifies the listeners.
     * @param th the Exception or Error that aborted start() or stop()
     */
    private void setFailed(Throwable th)
    {
        Log.warn("failed "+this+": "+th);
        Log.debug(th);
        _state = FAILED;
        final LifeCycle.Listener[] listeners = _listeners;
        if (listeners != null)
        {
            for (int i = 0; i < listeners.length; i++)
            {
                listeners[i].lifeCycleFailure(this,th);
            }
        }
    }

}

 3.QueuedThreadPool的实现(在jetty7中该类采用了concurrent包中的许多特性,有空可以对比分析下)。

其中主要的方法为:doStart(),doStop(),newThread(),dispatch(),以及内部类PoolThread的run()和dispatch()方法。

 

// ========================================================================
// Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at 
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================================================================

package org.mortbay.thread;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.mortbay.component.AbstractLifeCycle;
import org.mortbay.log.Log;

/* ------------------------------------------------------------ */
/** A pool of threads.
 * <p>
 * Avoids the expense of thread creation by pooling threads after
 * their run methods exit for reuse.
 * <p>
 * If an idle thread is available a job is directly dispatched,
 * otherwise the job is queued.  After queuing a job, if the number of
 * queued jobs exceeds {@link #getSpawnOrShrinkAt()} and the total
 * number of threads is less than the maximum pool size, a new thread 
 * is spawned.
 * <p>
 * The thread set, idle list and job queue are created in
 * {@link #doStart()}; before the first start they are null.
 *
 * @author Greg Wilkins <gregw@mortbay.com>
 */
public class QueuedThreadPool extends AbstractLifeCycle implements Serializable, ThreadPool
{
    private String _name;
    private Set _threads;       // every PoolThread owned by the pool; null until doStart()
    private List _idle;         // PoolThreads waiting for a direct hand-off; null until doStart()
    private Runnable[] _jobs;   // circular buffer of jobs waiting for a thread (the work queue)
    private int _nextJob;       // index in _jobs of the next job to take out
    private int _nextJobSlot;   // index in _jobs where the next job will be stored
    private int _queued;        // number of jobs currently held in _jobs
    private int _maxQueued;     // largest value _queued has reached
    
    private boolean _daemon;
    private int _id;            // running counter used to number thread names

    private final Object _lock = new Lock();        // guards the job queue (_jobs and its cursors) and _idle
    private final Object _threadsLock = new Lock(); // guards _threads
    private final Object _joinLock = new Lock();    // monitor join() waits on and doStop() notifies

    private long _lastShrink;
    private int _maxIdleTimeMs=60000;
    private int _maxThreads=250;
    private int _minThreads=2;
    private boolean _warned=false;
    private int _lowThreads=0;
    private int _priority= Thread.NORM_PRIORITY;
    private int _spawnOrShrinkAt=0;
    private int _maxStopTimeMs;

    
    /* ------------------------------------------------------------------- */
    /* Construct
     */
    public QueuedThreadPool()
    {
        _name="qtp-"+hashCode();
    }
    
    /* ------------------------------------------------------------------- */
    /* Construct
     */
    public QueuedThreadPool(int maxThreads)
    {
        this();
        setMaxThreads(maxThreads);
    }

    /* ------------------------------------------------------------ */
    /** Run job.
     * The job is handed to an idle thread if there is one; otherwise it
     * is appended to the job queue and, if the queue is longer than
     * spawnOrShrinkAt, an additional thread is requested.
     * @param job the job to run
     * @return false if the pool is not running or the job is null, 
     * true once the job has been handed over or queued
     */
    public boolean dispatch(Runnable job) 
    {  
        if (!isRunning() || job==null)
            return false;

        PoolThread thread=null;
        boolean spawn=false;
            
        synchronized(_lock)
        {
            // Look for an idle thread; take the one that went idle most recently
            int idle=_idle.size();
            if (idle>0)
                thread=(PoolThread)_idle.remove(idle-1);
            else
            {
                // queue the job
                _queued++;
                if (_queued>_maxQueued) // keep the high-water mark up to date
                    _maxQueued=_queued;
                _jobs[_nextJobSlot++]=job;
                if (_nextJobSlot==_jobs.length) // wrap the write cursor
                    _nextJobSlot=0;
                // A job was just stored, so the queue is not empty: the write cursor
                // meeting the read cursor here can only mean the buffer is full.
                if (_nextJobSlot==_nextJob)
                {
                    // Grow the job queue by _maxThreads slots (not a doubling) and
                    // unroll the ring so the oldest job ends up at index 0.
                    Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
                    int split=_jobs.length-_nextJob;
                    if (split>0)
                        System.arraycopy(_jobs,_nextJob,jobs,0,split);
                    if (_nextJob!=0)
                        System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
                    
                    _jobs=jobs;
                    _nextJob=0;
                    _nextJobSlot=_queued;
                }
                  
                spawn=_queued>_spawnOrShrinkAt;
            }
        }
        
        // Hand-off and thread creation happen outside _lock
        if (thread!=null)
        {
            thread.dispatch(job);
        }
        else if (spawn)
        {
            newThread();
        }
        return true;
    }

    /* ------------------------------------------------------------ */
    /** Get the number of idle threads in the pool.
     * @see #getThreads
     * @return Number of idle threads, 0 if the pool was never started
     */
    public int getIdleThreads()
    {
        return _idle==null?0:_idle.size();
    }
    
    /* ------------------------------------------------------------ */
    /**
     * @return low resource threads threshhold
     */
    public int getLowThreads()
    {
        return _lowThreads;
    }
    
    /* ------------------------------------------------------------ */
    /**
     * @return maximum queue size
     */
    public int getMaxQueued()
    {
        return _maxQueued;
    }
    
    /* ------------------------------------------------------------ */
    /** Get the maximum thread idle time.
     * Delegated to the named or anonymous Pool.
     * @see #setMaxIdleTimeMs
     * @return Max idle time in ms.
     */
    public int getMaxIdleTimeMs()
    {
        return _maxIdleTimeMs;
    }
    
    /* ------------------------------------------------------------ */
    /** Get the maximum number of threads.
     * Delegated to the named or anonymous Pool.
     * @see #setMaxThreads
     * @return maximum number of threads.
     */
    public int getMaxThreads()
    {
        return _maxThreads;
    }

    /* ------------------------------------------------------------ */
    /** Get the minimum number of threads.
     * Delegated to the named or anonymous Pool.
     * @see #setMinThreads
     * @return minimum number of threads.
     */
    public int getMinThreads()
    {
        return _minThreads;
    }

    /* ------------------------------------------------------------ */
    /** 
     * @return The name of the BoundedThreadPool.
     */
    public String getName()
    {
        return _name;
    }

    /* ------------------------------------------------------------ */
    /** Get the number of threads in the pool.
     * @see #getIdleThreads
     * @return Number of threads, 0 if the pool was never started
     */
    public int getThreads()
    {
        // _threads is only created by doStart(); mirror the null guard of getIdleThreads()
        return _threads==null?0:_threads.size();
    }

    /* ------------------------------------------------------------ */
    /** Get the priority of the pool threads.
     *  @return the priority of the pool threads.
     */
    public int getThreadsPriority()
    {
        return _priority;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return the number of jobs currently waiting in the queue
     */
    public int getQueueSize()
    {
        return _queued;
    }
    
    /* ------------------------------------------------------------ */
    /**
     * @return the spawnOrShrinkAt  The number of queued jobs (or idle threads) needed 
     * before the thread pool is grown (or shrunk)
     */
    public int getSpawnOrShrinkAt()
    {
        return _spawnOrShrinkAt;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param spawnOrShrinkAt The number of queued jobs (or idle threads) needed 
     * before the thread pool is grown (or shrunk)
     */
    public void setSpawnOrShrinkAt(int spawnOrShrinkAt)
    {
        _spawnOrShrinkAt=spawnOrShrinkAt;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return maximum total time that stop() will wait for threads to die.
     */
    public int getMaxStopTimeMs()
    {
        return _maxStopTimeMs;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param stopTimeMs maximum total time that stop() will wait for threads to die.
     */
    public void setMaxStopTimeMs(int stopTimeMs)
    {
        _maxStopTimeMs = stopTimeMs;
    }

    /* ------------------------------------------------------------ */
    /** 
     * Delegated to the named or anonymous Pool.
     */
    public boolean isDaemon()
    {
        return _daemon;
    }

    /* ------------------------------------------------------------ */
    /**
     * @return true if more jobs are queued than the lowThreads threshold
     */
    public boolean isLowOnThreads()
    {
        return _queued>_lowThreads;
    }

    /* ------------------------------------------------------------ */
    /**
     * Blocks the caller until the pool is no longer running, then polls
     * until it has also left the stopping state.
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException
    {
        synchronized (_joinLock)
        {
            // doStop() notifies _joinLock; loop to re-check the state after each wake-up
            while (isRunning()){
                _joinLock.wait();
            }
        }
        
        // TODO remove this semi busy loop!
        while (isStopping()){
            Thread.sleep(100);
        }
    }

    /* ------------------------------------------------------------ */
    /** 
     * Delegated to the named or anonymous Pool.
     */
    public void setDaemon(boolean daemon)
    {
        _daemon=daemon;
    }

    /* ------------------------------------------------------------ */
    /**
     * @param lowThreads low resource threads threshhold
     */
    public void setLowThreads(int lowThreads)
    {
        _lowThreads = lowThreads;
    }
    
    /* ------------------------------------------------------------ */
    /** Set the maximum thread idle time.
     * Threads that are idle for longer than this period may be
     * stopped.
     * Delegated to the named or anonymous Pool.
     * @see #getMaxIdleTimeMs
     * @param maxIdleTimeMs Max idle time in ms.
     */
    public void setMaxIdleTimeMs(int maxIdleTimeMs)
    {
        _maxIdleTimeMs=maxIdleTimeMs;
    }

    /* ------------------------------------------------------------ */
    /** Set the maximum number of threads.
     * Delegated to the named or anonymous Pool.
     * @see #getMaxThreads
     * @param maxThreads maximum number of threads.
     * @throws IllegalArgumentException if the pool is started and 
     * maxThreads is below the current minimum
     */
    public void setMaxThreads(int maxThreads)
    {
        if (isStarted() && maxThreads<_minThreads)
            throw new IllegalArgumentException("!minThreads<maxThreads");
        _maxThreads=maxThreads;
    }

    /* ------------------------------------------------------------ */
    /** Set the minimum number of threads.
     * If the pool is started, threads are created until the new minimum is met.
     * Delegated to the named or anonymous Pool.
     * @see #getMinThreads
     * @param minThreads minimum number of threads
     * @throws IllegalArgumentException if the pool is started and 
     * minThreads is not positive or exceeds the current maximum
     */
    public void setMinThreads(int minThreads)
    {
        if (isStarted() && (minThreads<=0 || minThreads>_maxThreads))
            throw new IllegalArgumentException("!0<=minThreads<maxThreads");
        _minThreads=minThreads;
        synchronized (_threadsLock)
        {
            while (isStarted() && _threads.size()<_minThreads)
            {
                newThread();   
            }
        }
    }

    /* ------------------------------------------------------------ */
    /** 
     * @param name Name of the BoundedThreadPool to use when naming Threads.
     */
    public void setName(String name)
    {
        _name= name;
    }

    /* ------------------------------------------------------------ */
    /** Set the priority of the pool threads.
     *  @param priority the new thread priority.
     */
    public void setThreadsPriority(int priority)
    {
        _priority=priority;
    }

    /* ------------------------------------------------------------ */
    /* Start the BoundedThreadPool.
     * Creates the thread set, the idle list and an empty job queue sized
     * to the maximum thread count, then constructs the minimum number of
     * threads.
     */
    protected void doStart() throws Exception
    {
        if (_maxThreads<_minThreads || _minThreads<=0)
            throw new IllegalArgumentException("!0<minThreads<maxThreads");
        
        _threads=new HashSet();
        synchronized (_lock)
        {
            _idle=new ArrayList();
            _jobs=new Runnable[_maxThreads];
            // The queue array is brand new, so the cursors must be too: values left
            // over from a previous run could index past the end of the fresh array
            // and _queued would count jobs that no longer exist.
            _nextJob=0;
            _nextJobSlot=0;
            _queued=0;
        }
        
        for (int i=0;i<_minThreads;i++)
        {
            newThread();
        }   
    }

    /* ------------------------------------------------------------ */
    /** Stop the BoundedThreadPool.
     * Interrupts every pool thread, repeating for up to 100 rounds with
     * a pause of round*100ms between rounds, until no threads are left
     * or, when maxStopTimeMs is positive, until that time has elapsed.
     * A warning is logged if threads remain. Finally any callers blocked
     * in {@link #join()} are woken up.
     */
    protected void doStop() throws Exception
    {   
        super.doStop();
        
        boolean interrupted=false;
        
        // _threads is null if the pool never got as far as creating it
        // (e.g. stop() after a start that failed validation).
        if (_threads!=null)
        {
            long start=System.currentTimeMillis();
            for (int i=0;i<100;i++)
            {
                synchronized (_threadsLock)
                {
                    Iterator iter = _threads.iterator();
                    while (iter.hasNext())
                        ((Thread)iter.next()).interrupt();
                }
                
                Thread.yield();
                if (_threads.size()==0 || (_maxStopTimeMs>0 && _maxStopTimeMs < (System.currentTimeMillis()-start)))
                   break;
                
                try
                {
                    Thread.sleep(i*100);
                }
                catch(InterruptedException e)
                {
                    // Keep shutting down, but remember the request so it is not lost
                    interrupted=true;
                }
            }

            // TODO perhaps force stops
            if (_threads.size()>0)
                Log.warn(_threads.size()+" threads could not be stopped");
        }
        
        synchronized (_joinLock)
        {
            _joinLock.notifyAll();
        }
        
        // Restore the caller's interrupt status only after the stop sequence is
        // complete; restoring it earlier would make every later sleep fail at once.
        if (interrupted)
            Thread.currentThread().interrupt();
    }

    /* ------------------------------------------------------------ */
    /**
     * Creates, registers and starts one more pool thread, unless the pool
     * already holds maxThreads threads, in which case a debug message is
     * logged the first time this happens.
     */
    protected void newThread()
    {
        synchronized (_threadsLock)
        {
            if (_threads.size()<_maxThreads)
            {
                PoolThread thread =new PoolThread();
                _threads.add(thread);
                // thread name = thread hashCode + "@" + pool name + "-" + increasing sequence number
                thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
                thread.start(); 
            }
            else if (!_warned)    
            {
                _warned=true;
                Log.debug("Max threads for {}",this);
            }
        }
    }

    /* ------------------------------------------------------------ */
    /** Stop a Job.
     * This method is called by the Pool if a job needs to be stopped.
     * The default implementation interrupts the thread, if there is one, 
     * and should be extended by a derived thread pool class if special 
     * action is required.
     * @param thread The thread allocated to the job, or null if no thread allocated.
     * @param job The job object passed to run.
     */
    protected void stopJob(Thread thread, Object job)
    {
        // null is a documented value for thread: nothing to interrupt then
        if (thread!=null)
            thread.interrupt();
    }
    

    /* ------------------------------------------------------------ */
    /**
     * @return one line per pool thread, made of the thread name, a space 
     * and the thread's toString(); empty if the pool was never started
     */
    public String dump()
    {
        StringBuffer buf = new StringBuffer();

        synchronized (_threadsLock)
        {
            if (_threads!=null)
            {
                for (Iterator i=_threads.iterator();i.hasNext();)
                {
                    Thread thread = (Thread)i.next();
                    buf.append(thread.getName()).append(" ").append(thread.toString()).append('\n');
                }
            }
        }
        
        return buf.toString();
    }
    
    /* ------------------------------------------------------------ */
    /**
     * Calls Thread.stop() on the first pool thread with the given name.
     * NOTE(review): Thread.stop() is itself deprecated in the JDK and is 
     * documented to be unsupported on recent Java releases - confirm 
     * against the target runtime before relying on this method.
     * @param name The thread name to stop.
     * @return true if the thread was found and stopped.
     * @deprecated Use {@link #interruptThread(String)} in preference
     */
    public boolean stopThread(String name)
    {
        synchronized (_threadsLock)
        {
            if (_threads==null)
                return false;
            for (Iterator i=_threads.iterator();i.hasNext();)
            {
                Thread thread = (Thread)i.next();
                if (name.equals(thread.getName()))
                {
                    thread.stop();
                    return true;
                }
            }
        }
        return false;
    }
    
    /* ------------------------------------------------------------ */
    /**
     * Interrupts the first pool thread with the given name.
     * @param name The thread name to interrupt.
     * @return true if the thread was found and interrupted.
     */
    public boolean interruptThread(String name)
    {
        synchronized (_threadsLock)
        {
            if (_threads==null)
                return false;
            for (Iterator i=_threads.iterator();i.hasNext();)
            {
                Thread thread = (Thread)i.next();
                if (name.equals(thread.getName()))
                {
                    thread.interrupt();
                    return true;
                }
            }
        }
        return false;
    }

    /* ------------------------------------------------------------ */
    /** Pool Thread class.
     * The PoolThread allows the threads job to be
     * retrieved and active status to be indicated.
     */
    public class PoolThread extends Thread 
    {
        // Hand-off slot: the pool stores a job here (see dispatch) and the thread picks it up in run()
        Runnable _job=null;

        /* ------------------------------------------------------------ */
        PoolThread()
        {
            // These resolve to Thread.setDaemon/setPriority, fed from the pool's settings
            setDaemon(_daemon);
            setPriority(_priority);
        }
        
        /* ------------------------------------------------------------ */
        /** BoundedThreadPool run.
         * Loop getting jobs and handling them until idle or stopped.
         * Each round runs the job in hand, then takes the next queued job
         * if there is one; otherwise the thread may retire (shrink) or 
         * registers as idle and waits for a direct hand-off.
         */
        public void run()
        {
            boolean idle=false; // true while this thread has put itself on the idle list
            Runnable job=null;  // job in hand; a local, separate from the _job hand-off slot
            try
            {
                while (isRunning())
                {   
                    // Run any job that we have.
                    if (job!=null)
                    {
                        final Runnable todo=job;
                        job=null;
                        idle=false;
                        todo.run();
                    }
                    
                    synchronized(_lock)
                    {
                        // is there a queued job?
                        if (_queued>0)
                        {
                            _queued--;
                            job=_jobs[_nextJob];
                            _jobs[_nextJob++]=null; // clear the slot so the queue does not retain the job
                            if (_nextJob==_jobs.length)
                                _nextJob=0;
                            continue;
                        }

                        // Should we shrink?
                        final int threads=_threads.size();
                        if (threads>_minThreads && 
                            (threads>_maxThreads || 
                             _idle.size()>_spawnOrShrinkAt))   
                        {
                            // At most one thread retires per max-idle period
                            long now = System.currentTimeMillis();
                            if ((now-_lastShrink)>getMaxIdleTimeMs())
                            {
                                _lastShrink=now;
                                _idle.remove(this);
                                return;
                            }
                        }

                        if (!idle)
                        {   
                            // Add ourselves to the idle set.
                            _idle.add(this);
                            idle=true;
                        }
                    }

                    // We are idle
                    // wait for a dispatched job
                    synchronized (this)
                    {
                        if (_job==null)
                            this.wait(getMaxIdleTimeMs());
                        job=_job;
                        _job=null;
                    }
                }
            }
            catch (InterruptedException e)
            {
                Log.ignore(e);
            }
            finally
            {
                synchronized (_lock)
                {
                    _idle.remove(this);
                }
                synchronized (_threadsLock)
                {
                    _threads.remove(this);
                }
                synchronized (this)
                {
                    job=_job;
                }
                
                // we died with a job! reschedule it
                if (job!=null)
                {
                    // The qualifier is required: PoolThread declares its own dispatch(Runnable),
                    // so an unqualified call would target this thread instead of the pool.
                    QueuedThreadPool.this.dispatch(job);
                }
            }
        }
        
        /* ------------------------------------------------------------ */
        /**
         * Stores the job in the hand-off slot and wakes this thread.
         * @param job the job this thread should run next
         */
        void dispatch(Runnable job)
        {
            synchronized (this)
            {
                _job=job;
                this.notify();
            }
        }
    }

    /** Dedicated type for the pool's monitor objects. */
    private class Lock{}
}

  4.QueuedThreadPoolTest测试类,调用QueuedThreadPool对象的start()方法启动线程池,调用dispatch()方法分发任务。

 

package com.iteye.suo.jetty.thread;
import org.mortbay.thread.QueuedThreadPool;
/**
 * Small driver for QueuedThreadPool: starts a pool, dispatches a batch of
 * jobs that each print one line, waits for them to finish and stops the pool.
 */
public class QueuedThreadPoolTest {

	/** Number of jobs handed to the pool. */
	private static final int JOBS = 20;

	/** Upper bound on how long main waits for the jobs before stopping anyway. */
	private static final long MAX_WAIT_MS = 10000L;

	public static void main(String[] args) throws Exception {
		QueuedThreadPool pool = new QueuedThreadPool();
		pool.start();

		// Completion tracking: 'remaining' is only touched while holding 'done'.
		final Object done = new Object();
		final int[] remaining = { JOBS };

		try {
			for (int i = 0; i < JOBS; i++) {
				final int num = i;
				boolean accepted = pool.dispatch(new Runnable() {
					public void run() {
						try {
							System.out.println(Thread.currentThread().getName() + " loop of " + num);
						} finally {
							synchronized (done) {
								remaining[0]--;
								done.notifyAll();
							}
						}
					}
				});
				if (!accepted) {
					// A rejected job will never run, so it must not be waited for.
					synchronized (done) {
						remaining[0]--;
					}
				}
			}
			System.out.println("done!");

			// Stopping right away would drop jobs still sitting in the queue and
			// interrupt the ones in progress, so wait (bounded) for the batch first.
			long deadline = System.currentTimeMillis() + MAX_WAIT_MS;
			synchronized (done) {
				while (remaining[0] > 0) {
					long left = deadline - System.currentTimeMillis();
					if (left <= 0) {
						break;
					}
					done.wait(left);
				}
			}
		} finally {
			pool.stop();
		}
	}
}
 

 

 

分享到:
评论
3 楼 wjsr0409 2013-04-23  
XzMarine 写道
非常感谢你的大作。下面是我的疑问:
_nextJobSlot==_nextJob能推出_jobs队列已满吗?当_jobs中没有queued的任务时,_nextJobSlot==_nextJob也是成立的吧?


这里不是判断是否满吧 其实这里是一个消费者和生产者模式 _nextJobSlot==_nextJob表示的是目前消费的速度已经超过生产的速度了,所以要扩容
2 楼 XzMarine 2012-02-27  
还有,为什么不直接用_queued来做判断?当_queued == _jobs.length()的时候,不是很明显就能看出_jobs队列已满吗?
1 楼 XzMarine 2012-02-27  
非常感谢你的大作。下面是我的疑问:
_nextJobSlot==_nextJob能推出_jobs队列已满吗?当_jobs中没有queued的任务时,_nextJobSlot==_nextJob也是成立的吧?

相关推荐

Global site tag (gtag.js) - Google Analytics