浏览 3825 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2008-10-23
java 1.4给我们带来了NIO编程模型,由于它的读写操作都是无阻塞的,这样使我们能够只用一个线程处理所有的IO事件,当然我们不会真的只用一个线程来处理,比较常见的编写NIO网络服务程序的模型是半同步-半异步模式,其实现原理大体上是单线程同步处理网络IO请求,当有请求到达时,将该请求放入一个工作队列中,由另外的线程处理,主线程继续等待新的网络IO请求,这种编程模型的缺点主要是: 1.使用工作队列带来的内存的动态分配问题 2.每次网络IO请求,总是分配给另外一个线程处理,这样频繁的线程的context switching也会 影响性能 解决的方案是使用Leader-Follower线程模型,它的基本思想是所有的线程被分配成两种角色: Leader和Follower,一般同时只有一个Leader线程,所有的Follower线程排队等待成为Leader线程,线程池启动时自动产生一个Leader负责等待网络IO事件,当有一个事件产生时,Leader线程首先通知一个Follower线程,并将其提拔为新的Leader,然后自己去处理这个网络事件,处理完毕后加入Follower线程等待队列,等待重新成为Leader. 这个线程模型主要解决了内存的动态分配问题,我们不需要不断的将到来的网络IO事件放入队列,并且等待网络IO事件的线程在等到网络事件的发生后,是自己处理的这个事件,也就是说没有context switching的过程. 下面是简单的代码演示 首先定义我们的事件模型Event public interface Event { SelectableChannel getChannel(); } 它的实现: public class DefaultEventImpl implements Event { private SelectableChannel channel; public DefaultEventImpl(SelectableChannel channel) { this.channel = channel; } @Override public SelectableChannel getChannel() { return this.channel; } } 然后定义我们的EventHandler public interface EventHandler { /** * this can be blocked * * @return Event */ Event pollEvent(); void handleEvent(Event e); } 其中的pollEvent()方法会阻塞,当有网络请求时返回一个我们封装的Event对象,其实里面大体上就是select()方法 接下来是我们的核心部分,Leader-Follower线程池的定义 public interface LFThreadPool { void start(); void promoteLeader(); void waitToBeLeader(); } 通过start方法启动线程池,然后调用一次promoteLeader产生了一个Leader线程等待网络事件的到达,其余线程则是waitToBeLeader的状态. 我们还定义了一个Worker线程模型 public interface WorkerThread extends Runnable { void start(); } start方法主要是启动所有的workers,其实觉得可以省去这个接口的 然后看一下我们的worker thread实现 public class DefaultWorkerThread implements WorkerThread { private LFThreadPool pool; private EventHandler handler; private boolean isActive; public DefaultWorkerThread(DefaultLFThreadPoolImpl pool, EventHandler handler) { this.pool = pool; this.handler = handler; } @Override public void run() { while (isActive) { this.pool.waitToBeLeader(); //阻塞等待pollEvent返回待处理的事件,此处等待的是Leader线程 Event event = handler.pollEvent(); //notify 唤醒一个正在等待的线程成为leader,由它来等待新的 //网络事件到达,自己则去处理当前的IO事件 this.pool.promoteLeader(); handler.handleEvent(event); //处理完毕后you重新waitToBeLeader等待成为Leader } } @Override public synchronized void start() { if (!isActive) { isActive = true; } new Thread(this).start(); } } 最后是我们的线程池的实现部分,看一下LFThreadPool的实现 public class DefaultLFThreadPoolImpl implements LFThreadPool { private WorkerThread[] workers; private boolean isActive; private Object semaphore = new Object(); public DefaultLFThreadPoolImpl(int poolSize, EventHandler handler) { workers = new DefaultWorkerThread[poolSize]; for (int i = 0; i < poolSize; i++) { workers[i] = new DefaultWorkerThread(this, handler); } } @Override public synchronized void start() { if (!isActive) { //启动所有的workers for (int i = 0; i < workers.length; i++) { workers[i].start(); } //保证所有的workers已经启动 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //产生一个Leader线程 promoteLeader(); isActive = true; } } @Override public void promoteLeader() { synchronized (semaphore) { semaphore.notify(); } } @Override public void waitToBeLeader() { synchronized (semaphore) { try { semaphore.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 整个过程中没有动态内存分配,也没有频繁的context switching,应该可以提高部分效率,后续会发出完整的程序,欢迎大家指正. 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |