QueuedThreadPool
doStart()方法负责初始化并启动_minThreads个空线程,等待任务的到来。其中,
_theads用来存放当前池中所有线程;
_idle中存放当前空闲线程;
_jobs为存放任务的队列,先来先服务。
protected void doStart() throws Exception
{
if (_maxThreads<_minThreads || _minThreads<=0)
throw new IllegalArgumentException("!0<minThreads<maxThreads");
_threads=new HashSet();
_idle=new ArrayList();
_jobs=new Runnable[_maxThreads];
for (int i=0;i<_minThreads;i++)
{
newThread();
}
}
protected void newThread()
{
synchronized (_threadsLock)
{
if (_threads.size()<_maxThreads)
{
PoolThread thread =new PoolThread();
_threads.add(thread);
thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
thread.start();
}
else if (!_warned)
{
_warned=true;
Log.debug("Max threads for {}",this);
}
}
}
初始化完毕,可调用dispatch(Runnable job)方法给线程池分配任务。
如果_idle.size() > 0,即有空闲线程,直接把job分给最后一个空闲线程,并从_idle中删除之。至于为什么是给最后一个?没有为什么,只要你愿意,随便哪一个都行。
如果_idle.size() = 0,即没有空闲线程,那就只好到_jobs中排队啦。
_jobs的示意图如下图,这是一初始大小为5的队列。
_nextJob指向_jobs队列中当前可取任务地址,每取一次任务,_nextJob自增1;而_nextJobSlot指向当前可以放置任务的地址,也就是下一个任务进队列,要放在哪一个位置,每进一个任务,_nextJobSlot自增1。它们初始位置当然都是0。
一般来说,当_nextJobSlot总是处在_nextJob前面的,即_nextJobSlot >_nextJob。
当_nextJob == _nextJobSlot时,就说明_jobs队列排满了,没位置了,怎么办?增加队列长度呗。QueuedThreadPool中,每调整一次,增加_maxThreads个空位置。 调整的过程中还涉及到数据的复制和两个指针位置的调整。
一旦发现,队列中的任务数_queued大于_spawnOrShrinkAt这个阀值,说明任务太多,当前线程数偏少,我们需要增加更多的工作线程来执行任务,最多_maxThreads个。
public boolean dispatch(Runnable job)
{
if (!isRunning() || job==null)
return false;
PoolThread thread=null;
boolean spawn=false;
synchronized(_lock)
{
// Look for an idle thread
int idle=_idle.size();
if (idle>0) //有空闲线程,直接分配
thread=(PoolThread)_idle.remove(idle-1);
else
{
// queue the job
_queued++;
if (_queued>_maxQueued)
_maxQueued=_queued;
_jobs[_nextJobSlot++]=job;
if (_nextJobSlot==_jobs.length)
_nextJobSlot=0;
if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
{
//在原有基础上增加_maxThreads
Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
int split=_jobs.length-_nextJob;
if (split>0)
System.arraycopy(_jobs,_nextJob,jobs,0,split);
if (_nextJob!=0)
System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
_jobs=jobs;
_nextJob=0;
_nextJobSlot=_queued;
}
// 队列任务数超过阀值,在可能的情况下,增加工作线程数目
spawn=_queued>_spawnOrShrinkAt;
}
}
if (thread!=null)
{
thread.dispatch(job);
}
else if (spawn)
{
newThread();
}
return true;
}
PoolThread
上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。
PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。
当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。
public void run()
{
boolean idle=false;
Runnable job=null;
try
{
while (isRunning())
{
// Run any job that we have.
if (job!=null)
{
final Runnable todo=job;
job=null;
idle=false;
todo.run();
}
synchronized(_lock)
{
// is there a queued job?
if (_queued>0)
{
_queued--;
job=_jobs[_nextJob];
_jobs[_nextJob++]=null;
if (_nextJob==_jobs.length)
_nextJob=0;
continue;
}
// Should we shrink?
final int threads=_threads.size();
// 我不知道为什么_threads.size()会有可能大于_maxThreads
// 知道的兄弟请告诉我!!
if (threads>_minThreads &&
(threads>_maxThreads ||
_idle.size()>_spawnOrShrinkAt))
{
long now = System.currentTimeMillis();
if ((now-_lastShrink)>getMaxIdleTimeMs())
{
_lastShrink=now;
_idle.remove(this);
return;
}
}
if (!idle)
{
// Add ourselves to the idle set.
_idle.add(this);
idle=true;
}
}
// We are idle
// wait for a dispatched job
synchronized (this)
{
if (_job==null)
this.wait(getMaxIdleTimeMs());
job=_job;
_job=null;
}
}
}
catch (InterruptedException e)
{
Log.ignore(e);
}
finally
{
synchronized (_lock)
{
_idle.remove(this);
}
synchronized (_threadsLock)
{
_threads.remove(this);
}
synchronized (this)
{
job=_job;
}
// we died with a job! reschedule it
if (job!=null)
{
QueuedThreadPool.this.dispatch(job);
}
}
}
参考资料:
http://suo.iteye.com/blog/1390134
- 大小: 13 KB
分享到:
相关推荐
Jetty是一款轻量级、高性能的Java Web服务器和Servlet容器,它允许开发者将Web服务器功能直接集成到他们的Java应用程序中。这种内嵌式部署模式在开发和调试阶段尤其有用,因为它提供了灵活的控制和快速的反馈循环。...
<New id="ThreadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool"> <!-- 线程池的相关配置项 --> ``` 此处的`<New>`标签用于创建一个新的对象实例,并通过`class`属性指定该对象的类名。...
例如,`org.eclipse.jetty.util.thread`包下的`QueuedThreadPool`是Jetty的默认线程池,它优化了线程的创建和销毁,提高了服务器性能。 4. **连接器(Connector)**:Jetty通过`Connector`组件与网络通信,如`...
- **嵌入式使用**:Jetty的一大特点是它可以被嵌入到其他Java应用中,无需单独部署和管理。 - **连接器技术**:Jetty支持多种连接器,如NIO(非阻塞I/O)、EPOLL(Linux下高效I/O)等,适应不同场景下的性能需求。 -...
Jetty中默认使用的是`QueuedThreadPool`类来实现线程池功能,该类实现了`SizedThreadPool`接口。 **初始化代码示例:** ```java public Server(@Name("threadpool") ThreadPool pool) { _threadPool = (pool != ...
启动Jetty可以通过命令行方式执行Jetty安装目录下的启动脚本(如`start.jar`),或者通过编程的方式在Java应用程序中嵌入Jetty服务器。具体方法可以根据实际情况选择。 #### 五、总结 通过上述步骤,我们可以轻松...
- **QueuedThreadPool**:Jetty默认的线程池,具有智能调度和资源管理特性。 6. **源码阅读技巧** - **了解设计模式**:Jetty大量使用了工厂模式、装饰者模式和责任链模式,理解这些模式有助于阅读源码。 - **...
### Jetty服务器性能调整过程分析 #### 一、目标 Jetty服务器采用了非阻塞I/O(NIO)加线程池的技术方案来实现在高并发场景下的高性能表现。本篇文章的目标是通过调整Jetty服务器的各项配置参数,来观察并评估其对...
2. **线程模型**:Jetty利用高效的线程模型来优化性能,如`QueuedThreadPool`,允许应用程序根据需要动态调整线程数量。 3. **异步I/O**:支持NIO(非阻塞I/O)和EPOLL(在Linux系统上的高效I/O)模型,提高服务端的...
2. **线程模型**:Jetty使用一种高效的线程模型,如Selectors和QueuedThreadPool,以处理大量并发请求。 3. **Servlet 3.x支持**:Jetty全面支持Servlet规范,包括最新的Servlet 3.x版本,提供异步处理和WebSocket...