`
XzMarine
  • 浏览: 3342 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

jetty中的QueuedThreadPool

阅读更多

QueuedThreadPool

doStart()方法负责初始化并启动_minThreads个空线程,等待任务的到来。其中,

_theads用来存放当前池中所有线程;

_idle中存放当前空闲线程;

_jobs为存放任务的队列,先来先服务。

protected void doStart() throws Exception
{
	if (_maxThreads<_minThreads || _minThreads<=0)
		throw new IllegalArgumentException("!0<minThreads<maxThreads");
	
	_threads=new HashSet();
	_idle=new ArrayList();
	_jobs=new Runnable[_maxThreads];
	
	for (int i=0;i<_minThreads;i++)
	{
		newThread();
	}   
}

protected void newThread()
{
	synchronized (_threadsLock)
	{
		if (_threads.size()<_maxThreads)
		{
			PoolThread thread =new PoolThread();
			_threads.add(thread);
			thread.setName(thread.hashCode()+"@"+_name+"-"+_id++);
			thread.start(); 
		}
		else if (!_warned)    
		{
			_warned=true;
			Log.debug("Max threads for {}",this);
		}
	}
}

 

初始化完毕,可调用dispatch(Runnable job)方法给线程池分配任务。

如果_idle.size() > 0,即有空闲线程,直接把job分给最后一个空闲线程,并从_idle中删除之。至于为什么是给最后一个?没有为什么,只要你愿意,随便哪一个都行。

如果_idle.size() = 0,即没有空闲线程,那就只好到_jobs中排队啦。

_jobs的示意图如下图,这是一初始大小为5的队列。



_nextJob指向_jobs队列中当前可取任务地址,每取一次任务,_nextJob自增1;而_nextJobSlot指向当前可以放置任务的地址,也就是下一个任务进队列,要放在哪一个位置,每进一个任务,_nextJobSlot自增1。它们初始位置当然都是0。

一般来说,当_nextJobSlot总是处在_nextJob前面的,即_nextJobSlot >_nextJob。
_nextJob == _nextJobSlot时,就说明_jobs队列排满了,没位置了,怎么办?增加队列长度呗。QueuedThreadPool中,每调整一次,增加_maxThreads个空位置。 调整的过程中还涉及到数据的复制和两个指针位置的调整。

一旦发现,队列中的任务数_queued大于_spawnOrShrinkAt这个阀值,说明任务太多,当前线程数偏少,我们需要增加更多的工作线程来执行任务,最多_maxThreads个。

public boolean dispatch(Runnable job) 
{  
	if (!isRunning() || job==null)
		return false;

	PoolThread thread=null;
	boolean spawn=false;
		
	synchronized(_lock)
	{
		// Look for an idle thread
		int idle=_idle.size();
		if (idle>0) //有空闲线程,直接分配
			thread=(PoolThread)_idle.remove(idle-1);
		else
		{
			// queue the job
			_queued++;
			if (_queued>_maxQueued)
				_maxQueued=_queued;
			_jobs[_nextJobSlot++]=job;
			if (_nextJobSlot==_jobs.length)
				_nextJobSlot=0;
			if (_nextJobSlot==_nextJob) //环形队列_jobs已满,需要调整大小
			{
				//在原有基础上增加_maxThreads
				Runnable[] jobs= new Runnable[_jobs.length+_maxThreads];
				int split=_jobs.length-_nextJob;
				if (split>0)
					System.arraycopy(_jobs,_nextJob,jobs,0,split);
				if (_nextJob!=0)
					System.arraycopy(_jobs,0,jobs,split,_nextJobSlot);
				
				_jobs=jobs;
				_nextJob=0;
				_nextJobSlot=_queued;
			}
			// 队列任务数超过阀值,在可能的情况下,增加工作线程数目  
			spawn=_queued>_spawnOrShrinkAt;
		}
	}
	
	if (thread!=null)
	{
		thread.dispatch(job);
	}
	else if (spawn)
	{
		newThread();
	}
	return true;
}

 

PoolThread

 

上面说了,一旦没有空闲线程,任务就仅仅是加入到_jobs队列中,那么负责从队列中取任务就是PoolThread的事了。

PoolThread在自己的任务完成之后会自觉的到_jobs环形队列中领取任务。有任务,执行之;没有,把自己加入_idle中,等待_maxIdleTimeMs的时间,或QueuedThreadPool的唤醒(调用PoolThread的dispatch(Runnable job)方法)。

当线程处于空闲状态,并发现当前空闲线程数多于_spawnOrShrinkAt阀值时,它就得考虑要不要自我了断了。条件就是距离上次缩减(shrink)空闲线程时间超过_maxIdleTimeMs。

 

public void run()
{
	boolean idle=false;
	Runnable job=null;
	try
	{
		while (isRunning())
		{   
			// Run any job that we have.
			if (job!=null)
			{
				final Runnable todo=job;
				job=null;
				idle=false;
				todo.run();
			}
			
			synchronized(_lock)
			{
				// is there a queued job?
				if (_queued>0)
				{
					_queued--;
					job=_jobs[_nextJob];
					_jobs[_nextJob++]=null;
					if (_nextJob==_jobs.length)
						_nextJob=0;
					continue;
				}

				// Should we shrink?
				final int threads=_threads.size();
				
				// 我不知道为什么_threads.size()会有可能大于_maxThreads
				// 知道的兄弟请告诉我!!
				if (threads>_minThreads && 
					(threads>_maxThreads || 
					 _idle.size()>_spawnOrShrinkAt))   
				{
					long now = System.currentTimeMillis();
					if ((now-_lastShrink)>getMaxIdleTimeMs())
					{
						_lastShrink=now;
						_idle.remove(this);
						return;
					}
				}

				if (!idle)
				{   
					// Add ourselves to the idle set.
					_idle.add(this);
					idle=true;
				}
			}

			// We are idle
			// wait for a dispatched job
			synchronized (this)
			{
				if (_job==null)
					this.wait(getMaxIdleTimeMs());
				job=_job;
				_job=null;
			}
		}
	}
	catch (InterruptedException e)
	{
		Log.ignore(e);
	}
	finally
	{
		synchronized (_lock)
		{
			_idle.remove(this);
		}
		synchronized (_threadsLock)
		{
			_threads.remove(this);
		}
		synchronized (this)
		{
			job=_job;
		}
		
		// we died with a job! reschedule it
		if (job!=null)
		{
			QueuedThreadPool.this.dispatch(job);
		}
	}
}

 

 

参考资料:

http://suo.iteye.com/blog/1390134

  • 大小: 13 KB
分享到:
评论

相关推荐

    jetty内嵌到java代码启动

    Jetty是一款轻量级、高性能的Java Web服务器和Servlet容器,它允许开发者将Web服务器功能直接集成到他们的Java应用程序中。这种内嵌式部署模式在开发和调试阶段尤其有用,因为它提供了灵活的控制和快速的反馈循环。...

    jetty启动器资源配置文件

    &lt;New id="ThreadPool" class="org.eclipse.jetty.util.thread.QueuedThreadPool"&gt; &lt;!-- 线程池的相关配置项 --&gt; ``` 此处的`&lt;New&gt;`标签用于创建一个新的对象实例,并通过`class`属性指定该对象的类名。...

    jetty-6.1.26源码

    例如,`org.eclipse.jetty.util.thread`包下的`QueuedThreadPool`是Jetty的默认线程池,它优化了线程的创建和销毁,提高了服务器性能。 4. **连接器(Connector)**:Jetty通过`Connector`组件与网络通信,如`...

    jetty源代码下载

    - **嵌入式使用**:Jetty的一大特点是它可以被嵌入到其他Java应用中,无需单独部署和管理。 - **连接器技术**:Jetty支持多种连接器,如NIO(非阻塞I/O)、EPOLL(Linux下高效I/O)等,适应不同场景下的性能需求。 -...

    PDF的JETTY文档

    Jetty中默认使用的是`QueuedThreadPool`类来实现线程池功能,该类实现了`SizedThreadPool`接口。 **初始化代码示例:** ```java public Server(@Name("threadpool") ThreadPool pool) { _threadPool = (pool != ...

    Jetty使用总结

    启动Jetty可以通过命令行方式执行Jetty安装目录下的启动脚本(如`start.jar`),或者通过编程的方式在Java应用程序中嵌入Jetty服务器。具体方法可以根据实际情况选择。 #### 五、总结 通过上述步骤,我们可以轻松...

    jetty源码查阅

    - **QueuedThreadPool**:Jetty默认的线程池,具有智能调度和资源管理特性。 6. **源码阅读技巧** - **了解设计模式**:Jetty大量使用了工厂模式、装饰者模式和责任链模式,理解这些模式有助于阅读源码。 - **...

    jetty服务器性能调整过程分析

    ### Jetty服务器性能调整过程分析 #### 一、目标 Jetty服务器采用了非阻塞I/O(NIO)加线程池的技术方案来实现在高并发场景下的高性能表现。本篇文章的目标是通过调整Jetty服务器的各项配置参数,来观察并评估其对...

    jetty-util-9.4.0.M0.zip

    2. **线程模型**:Jetty利用高效的线程模型来优化性能,如`QueuedThreadPool`,允许应用程序根据需要动态调整线程数量。 3. **异步I/O**:支持NIO(非阻塞I/O)和EPOLL(在Linux系统上的高效I/O)模型,提高服务端的...

    Netty_Jetty

    2. **线程模型**:Jetty使用一种高效的线程模型,如Selectors和QueuedThreadPool,以处理大量并发请求。 3. **Servlet 3.x支持**:Jetty全面支持Servlet规范,包括最新的Servlet 3.x版本,提供异步处理和WebSocket...

Global site tag (gtag.js) - Google Analytics