Topic: The multi-Selector structure in NIO frameworks
Posted: 2009-10-06
As concurrency climbs, the traditional NIO design of a single Selector managing and firing the events of a huge number of connections has run into a bottleneck, so the new versions of the various NIO frameworks all adopt a structure in which multiple Selectors coexist and share the management of the connections evenly. I'll take the Mina and Grizzly implementations as examples.

In Mina, each NioProcessor opens a Selector of its own in its constructor:

    public NioProcessor(Executor executor) {
        super(executor);
        try {
            // Open a new selector
            selector = Selector.open();
        } catch (IOException e) {
            throw new RuntimeIoException("Failed to open a selector.", e);
        }
    }

    protected int select(long timeout) throws Exception {
        return selector.select(timeout);
    }

    protected boolean isInterestedInRead(NioSession session) {
        SelectionKey key = session.getSelectionKey();
        return key.isValid() && (key.interestOps() & SelectionKey.OP_READ) != 0;
    }

    protected boolean isInterestedInWrite(NioSession session) {
        SelectionKey key = session.getSelectionKey();
        return key.isValid() && (key.interestOps() & SelectionKey.OP_WRITE) != 0;
    }

    protected int read(NioSession session, IoBuffer buf) throws Exception {
        return session.getChannel().read(buf.buf());
    }

    protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
        if (buf.remaining() <= length) {
            return session.getChannel().write(buf.buf());
        } else {
            int oldLimit = buf.limit();
            buf.limit(buf.position() + length);
            try {
                return session.getChannel().write(buf.buf());
            } finally {
                buf.limit(oldLimit);
            }
        }
    }

These methods are all driven by AbstractPollingIoProcessor, a class in which you can see the core logic of any NIO framework: register, select, dispatch. Since the details are beyond the scope of this post, I won't expand on them here. NioProcessor is initialized from the constructor of NioSocketAcceptor:

    public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }
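As an aside: since the register/select/dispatch loop inside AbstractPollingIoProcessor is only mentioned in passing above, here is a minimal sketch of what such a per-processor loop looks like. This is my own illustration under assumed names, not Mina's code; only the JDK classes (Selector, SelectionKey) are real:

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.util.Iterator;

    // Sketch of one processor's event loop: the instance owns a private
    // Selector, blocks in select(), then dispatches the ready keys.
    public class MiniProcessorLoop implements Runnable {
        private final Selector selector;
        private volatile boolean running = true;

        public MiniProcessorLoop() throws IOException {
            // One Selector per processor, as in Mina's NioProcessor.
            this.selector = Selector.open();
        }

        public void run() {
            while (running) {
                try {
                    // Wait up to one second for ready events.
                    if (selector.select(1000) == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        dispatch(key);
                    }
                } catch (IOException e) {
                    e.printStackTrace(); // a real framework notifies a handler chain
                }
            }
        }

        private void dispatch(SelectionKey key) {
            if (!key.isValid()) {
                return;
            }
            if (key.isReadable()) { /* hand off to a read handler */ }
            if (key.isWritable()) { /* flush the session's write queue */ }
        }

        public void stop() {
            running = false;
            selector.wakeup(); // unblock select() so the loop can exit
        }
    }

With that picture in mind, back to Mina's initialization chain.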
The super constructor invoked there is AbstractPollingIoAcceptor's:

    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
    }

This is really a composite pattern: SimpleIoProcessorPool and NioProcessor both implement the IoProcessor interface, one being a processor pool formed by composition, the other a standalone class. The SimpleIoProcessorPool constructor that gets called looks like this:

    private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;

    public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
        this(processorType, null, DEFAULT_SIZE);
    }

So by default the pool holds one processor per CPU, plus one. On the accept side, each new connection is wrapped in a session and handed to one of the pooled processors:

    protected NioSession accept(IoProcessor<NioSession> processor,
            ServerSocketChannel handle) throws Exception {
        SelectionKey key = handle.keyFor(selector);

        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }

        // accept the connection from the client
        SocketChannel ch = handle.accept();

        if (ch == null) {
            return null;
        }

        return new NioSocketSession(this, processor, ch);
    }

    private void processHandles(Iterator<H> handles) throws Exception {
        while (handles.hasNext()) {
            H handle = handles.next();
            handles.remove();

            // Associates a new created connection to a processor,
            // and get back a session
            T session = accept(processor, handle);

            if (session == null) {
                break;
            }

            initSession(session, null, null);

            // add the session to the SocketIoProcessor
            session.getProcessor().add(session);
        }
    }
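To make the composite structure concrete, here is a toy version under assumed names (Processor, SingleProcessor and ProcessorPool are mine, not Mina's). The point is that the pool and the single processor expose one and the same interface, so the acceptor never needs to know which of the two it is talking to:

    import java.util.concurrent.atomic.AtomicInteger;

    // The shared interface; stands in for Mina's IoProcessor.
    interface Processor {
        void add(String session); // stands in for IoProcessor#add(IoSession)
    }

    // The leaf: a single processor handling sessions itself.
    class SingleProcessor implements Processor {
        public void add(String session) {
            System.out.println(Thread.currentThread().getName() + " handles " + session);
        }
    }

    // The composite: a pool that merely delegates to its members.
    class ProcessorPool implements Processor {
        // Same default as SimpleIoProcessorPool: CPU count + 1.
        private static final int DEFAULT_SIZE =
                Runtime.getRuntime().availableProcessors() + 1;

        private final Processor[] pool = new Processor[DEFAULT_SIZE];
        private final AtomicInteger distributor = new AtomicInteger();

        ProcessorPool() {
            for (int i = 0; i < pool.length; i++) {
                pool[i] = new SingleProcessor();
            }
        }

        // Delegates to one member, picked round-robin.
        public void add(String session) {
            pool[Math.abs(distributor.getAndIncrement()) % pool.length].add(session);
        }
    }

This is exactly what lets Mina swap a single processor for a pool of them without touching any calling code.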
session.getProcessor() returns the processor bound to that session. The pool picks one round-robin through nextProcessor() and binds it to the session on first use:

    private IoProcessor<T> nextProcessor() {
        checkDisposal();
        return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];
    }

    if (p == null) {
        p = nextProcessor();
        IoProcessor<T> oldp =
                (IoProcessor<T>) session.setAttributeIfAbsent(PROCESSOR, p);
        if (oldp != null) {
            p = oldp;
        }
    }

Now for Grizzly. Its select loop lives in SelectorRunner, whose doSelect method looks like this:

    protected boolean doSelect() {
        selectorHandler = transport.getSelectorHandler();
        selectionKeyHandler = transport.getSelectionKeyHandler();
        strategy = transport.getStrategy();

        try {
            if (isResume) {
                // If resume SelectorRunner - finish postponed keys
                isResume = false;
                if (keyReadyOps != 0) {
                    if (!iterateKeyEvents()) return false;
                }
                if (!iterateKeys()) return false;
            }

            lastSelectedKeysCount = 0;

            selectorHandler.preSelect(this);

            readyKeys = selectorHandler.select(this);

            if (stateHolder.getState(false) == State.STOPPING) return false;

            lastSelectedKeysCount = readyKeys.size();

            if (lastSelectedKeysCount != 0) {
                iterator = readyKeys.iterator();
                if (!iterateKeys()) return false;
            }

            selectorHandler.postSelect(this);
        } catch (ClosedSelectorException e) {
            notifyConnectionException(key,
                    "Selector was unexpectedly closed", e,
                    Severity.TRANSPORT, Level.SEVERE, Level.FINE);
        } catch (Exception e) {
            notifyConnectionException(key,
                    "doSelect exception", e,
                    Severity.UNKNOWN, Level.SEVERE, Level.FINE);
        } catch (Throwable t) {
            logger.log(Level.SEVERE, "doSelect exception", t);
            transport.notifyException(Severity.FATAL, t);
        }

        return true;
    }
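A detail in the Mina fragment above is worth a pause: setAttributeIfAbsent has putIfAbsent semantics, so if two threads race to bind a processor to the same session, the first writer wins and the loser adopts the stored value through the oldp check. The result is that a given session is always serviced by the same Selector thread. A minimal sketch of that idiom with a plain ConcurrentHashMap (all names here are illustrative, not Mina's):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sticky session-to-processor binding: the first bind wins, and
    // later callers adopt whatever was stored, mirroring the oldp check.
    class StickyBinding {
        private final ConcurrentMap<Long, String> processorOf =
                new ConcurrentHashMap<Long, String>();

        String processorFor(long sessionId, String candidate) {
            // putIfAbsent returns null if we won the race,
            // otherwise the previously bound processor.
            String old = processorOf.putIfAbsent(sessionId, candidate);
            return old != null ? old : candidate;
        }
    }

Now back to Grizzly.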
The SelectorRunners themselves are created when the transport starts:

    private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;

    public void start() throws IOException {
        if (selectorRunnersCount <= 0) {
            selectorRunnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
        }
        startSelectorRunners();
    }

    protected void startSelectorRunners() throws IOException {
        selectorRunners = new SelectorRunner[selectorRunnersCount];

        synchronized (selectorRunners) {
            for (int i = 0; i < selectorRunnersCount; i++) {
                SelectorRunner runner =
                        new SelectorRunner(this, SelectorFactory.instance().create());
                runner.start();
                selectorRunners[i] = runner;
            }
        }
    }
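Putting those pieces together, startup amounts to opening N private Selectors and running N loops on a thread pool. Below is a condensed analogue under assumed names (MiniTransport and its start method are mine; only Selector and the executor classes are real JDK API), not Grizzly's actual implementation:

    import java.io.IOException;
    import java.nio.channels.Selector;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Condensed analogue of startSelectorRunners(): N runners, each
    // looping over its own private Selector on a pooled thread.
    class MiniTransport {
        private static final int DEFAULT_SELECTOR_RUNNERS_COUNT = 2;
        private final ExecutorService threadPool = Executors.newCachedThreadPool();

        void start(int runnersCount) throws IOException {
            if (runnersCount <= 0) {
                runnersCount = DEFAULT_SELECTOR_RUNNERS_COUNT;
            }
            for (int i = 0; i < runnersCount; i++) {
                final Selector selector = Selector.open(); // one Selector per runner
                threadPool.submit(new Runnable() {
                    public void run() {
                        try {
                            while (!Thread.currentThread().isInterrupted()) {
                                selector.select(1000); // each runner blocks on its own Selector
                            }
                        } catch (IOException ignored) {
                            // a real transport would report this
                        }
                    }
                });
            }
        }
    }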
As you can see, Grizzly does not manage its SelectorRunners through a separate pool object; it keeps them directly in an array, two by default. SelectorRunner implements Runnable, and its start method submits itself to a thread pool. I mentioned earlier that Grizzly dedicates a single Selector to accepting connections; where does that show up? The answer is in the RoundRobinConnectionDistributor class, which dispatches registration events to the appropriate SelectorRunner. It distributes them like this:

    public Future<RegisterChannelResult> registerChannelAsync(
            SelectableChannel channel, int interestOps, Object attachment,
            CompletionHandler completionHandler) throws IOException {
        SelectorRunner runner = getSelectorRunner(interestOps);
        return transport.getSelectorHandler().registerChannelAsync(
                runner, channel, interestOps, attachment, completionHandler);
    }

    private SelectorRunner getSelectorRunner(int interestOps) {
        SelectorRunner[] runners = getTransportSelectorRunners();
        int index;

        if (interestOps == SelectionKey.OP_ACCEPT || runners.length == 1) {
            index = 0;
        } else {
            index = (counter.incrementAndGet() % (runners.length - 1)) + 1;
        }

        return runners[index];
    }
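In other words, OP_ACCEPT registrations always land on runner 0, while read/write interest rotates round-robin over runners 1 through n-1. The little demo below replays getSelectorRunner's index arithmetic outside of Grizzly (the class is mine; the arithmetic is copied from the snippet above):

    import java.nio.channels.SelectionKey;
    import java.util.concurrent.atomic.AtomicInteger;

    // Replays the distributor's index math: with 6 runners, OP_ACCEPT
    // is pinned to runner 0 and other ops cycle over runners 1..5.
    public class DistributionDemo {
        private static final AtomicInteger counter = new AtomicInteger();

        static int pick(int interestOps, int runnerCount) {
            if (interestOps == SelectionKey.OP_ACCEPT || runnerCount == 1) {
                return 0;
            }
            return (counter.incrementAndGet() % (runnerCount - 1)) + 1;
        }

        public static void main(String[] args) {
            System.out.println(pick(SelectionKey.OP_ACCEPT, 6)); // always 0
            for (int i = 0; i < 6; i++) {
                System.out.print(pick(SelectionKey.OP_READ, 6) + " "); // 2 3 4 5 1 2
            }
        }
    }

So with six runners, for example, one Selector is reserved for accepting connections and the other five share the read/write traffic.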
Posted: 2010-01-06
Such a good article, and somehow nobody has chimed in to support it. Keep it up, OP!
Posted: 2010-01-20
Very well written. I'm working on this area myself and look forward to exchanging ideas with you.
Posted: 2010-10-05
In the multi-Selector case, say six Selectors: does OP_ACCEPT get assigned to only one of them, with the remaining five given to read/write events? How do you handle this?