锁定老帖子 主题:Java aio(异步网络IO)初探
该帖已经被评为良好帖
|
|
---|---|
作者 | 正文 |
发表时间:2009-09-20
最后修改:2009-09-21
按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。 public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize) public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException 需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。 public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); void cancelled(A attachment); }
this.asynchronousChannelGroup = AsynchronousChannelGroup .withCachedThreadPool(Executors.newCachedThreadPool(), this.threadPoolSize); 然后初始化一个AsynchronousServerSocketChannel,通过open方法: this.serverSocketChannel = AsynchronousServerSocketChannel .open(this.asynchronousChannelGroup); 通过nio 2.0引入的SocketOption类设置一些TCP选项: this.serverSocketChannel .setOption( StandardSocketOption.SO_REUSEADDR,true); this.serverSocketChannel .setOption( StandardSocketOption.SO_RCVBUF,16*1024);
this.serverSocketChannel .bind(new InetSocketAddress("localhost",8080), 100); public void pendingAccept() { if (this.started && this.serverSocketChannel.isOpen()) { this.acceptFuture = this.serverSocketChannel.accept(null, new AcceptCompletionHandler()); } else { throw new IllegalStateException("Controller has been closed"); } }
private final class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Object> { @Override public void cancelled(Object attachment) { logger.warn("Accept operation was canceled"); } @Override public void completed(AsynchronousSocketChannel socketChannel, Object attachment) { try { logger.debug("Accept connection from " + socketChannel.getRemoteAddress()); configureChannel(socketChannel); AioSessionConfig sessionConfig = buildSessionConfig(socketChannel); Session session = new AioTCPSession(sessionConfig, AioTCPController.this.configuration .getSessionReadBufferSize(), AioTCPController.this.sessionTimeout); session.start(); registerSession(session); } catch (Exception e) { e.printStackTrace(); logger.error("Accept error", e); notifyException(e); } finally { pendingAccept(); } } @Override public void failed(Throwable exc, Object attachment) { logger.error("Accept error", exc); try { notifyException(exc); } finally { pendingAccept(); } } } public class AioTCPSession { protected void start0() { pendingRead(); } protected final void pendingRead() { if (!isClosed() && this.asynchronousSocketChannel.isOpen()) { if (!this.readBuffer.hasRemaining()) { this.readBuffer = ByteBufferUtils .increaseBufferCapatity(this.readBuffer); } this.readFuture = this.asynchronousSocketChannel.read( this.readBuffer, this, this.readCompletionHandler); } else { throw new IllegalStateException( "Session Or Channel has been closed"); } } } AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用类似,同样是非阻塞的,返回结果也是一个Future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是 Future<Integer>,方法的第一个参数是读的缓冲区,操作系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给 CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的CompletionHandler的实现: public final class ReadCompletionHandler implements CompletionHandler<Integer, AbstractAioSession> { private static final Logger log = LoggerFactory .getLogger(ReadCompletionHandler.class); protected final AioTCPController controller; public ReadCompletionHandler(AioTCPController controller) { this.controller = controller; } @Override public void cancelled(AbstractAioSession session) { log.warn("Session(" + session.getRemoteSocketAddress() + ") read operation was canceled"); } @Override public void completed(Integer result, AbstractAioSession session) { if (log.isDebugEnabled()) log.debug("Session(" + session.getRemoteSocketAddress() + ") read +" + result + " bytes"); if (result < 0) { session.close(); return; } try { if (result > 0) { session.updateTimeStamp(); session.getReadBuffer().flip(); session.decode(); session.getReadBuffer().compact(); } } finally { try { session.pendingRead(); } catch (IOException e) { session.onException(e); session.close(); } } controller.checkSessionTimeout(); } @Override public void failed(Throwable exc, AbstractAioSession session) { log.error("Session read error", exc); session.onException(exc); session.close(); } } 如果IO读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用: if (null != this.readFuture) { this.readFuture.cancel(true); } this.asynchronousSocketChannel.close(); 在读成功的情况下,我们还需要判断结果result是否小于0,如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终通过pendingRead继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行IO读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking IO框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。 protected void write0(WriteMessage message) { boolean needWrite = false; synchronized (this.writeQueue) { needWrite = this.writeQueue.isEmpty(); this.writeQueue.offer(message); } if (needWrite) { pendingWrite(message); } } protected final void pendingWrite(WriteMessage message) { message = preprocessWriteMessage(message); if (!isClosed() && this.asynchronousSocketChannel.isOpen()) { this.asynchronousSocketChannel.write(message.getWriteBuffer(), this, this.writeCompletionHandler); } else { throw new IllegalStateException( "Session Or Channel has been closed"); } } write调用返回的结果与read一样是一个Future<Integer>,而write的CompletionHandler处理的核心逻辑大概是这样: @Override public void completed(Integer result, AbstractAioSession session) { if (log.isDebugEnabled()) log.debug("Session(" + session.getRemoteSocketAddress() + ") writen " + result + " bytes"); WriteMessage writeMessage; Queue<WriteMessage> writeQueue = session.getWriteQueue(); synchronized (writeQueue) { writeMessage = writeQueue.peek(); if (writeMessage.getWriteBuffer() == null || !writeMessage.getWriteBuffer().hasRemaining()) { writeQueue.remove(); if (writeMessage.getWriteFuture() != null) { writeMessage.getWriteFuture().setResult(Boolean.TRUE); } try { session.getHandler().onMessageSent(session, writeMessage.getMessage()); } catch (Exception e) { session.onException(e); } writeMessage = writeQueue.peek(); } } if (writeMessage != null) { try { session.pendingWrite(writeMessage); } catch (IOException e) { session.onException(e); session.close(); } } }
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2009-09-21
最后修改:2009-09-21
刚看了一点,第一行有个错别字,不是IO服用,是复用,嘿嘿
老大终于开始介绍java NIO了。。 还有一点,看过一些源代码,对事件驱动还是理解不深,也请老大介绍下把。 |
|
返回顶楼 | |
发表时间:2009-09-21
rain2005 写道 刚看了一点,第一行有个错别字,不是IO服用,是复用,嘿嘿
老大终于开始介绍java NIO了。。 还有一点,看过一些源代码,对事件驱动还是理解不深,也请老大介绍下把。 多谢指正,关于事件机制,我会画个UML图可能比较清晰 |
|
返回顶楼 | |
发表时间:2009-09-21
最后修改:2009-09-21
上面提到几个类 怎么在jdk6中没发现那
是不是要下载第三方框架 正好想研究一下 收藏啦 |
|
返回顶楼 | |
发表时间:2009-09-21
xly_971223 写道 上面提到几个类 怎么在jdk6中没发现那
是不是要下载第三方框架 正好想研究一下 收藏啦 aio是在jdk7引入的,请下载JDK7 preview1版本,或者open jdk |
|
返回顶楼 | |
发表时间:2009-09-22
dennis_zane 写道 xly_971223 写道 上面提到几个类 怎么在jdk6中没发现那
是不是要下载第三方框架 正好想研究一下 收藏啦 aio是在jdk7引入的,请下载JDK7 preview1版本,或者open jdk 是不是这个意思。 jdk7以前的nio是非阻塞IO,操作系统底层比方说linux,是用IO复用select实现的 jdk7用的是真正的异步IO,操作系统底层是用epoll实现的 是这样的吗? |
|
返回顶楼 | |
发表时间:2009-09-22
rain2005 写道 dennis_zane 写道 xly_971223 写道 上面提到几个类 怎么在jdk6中没发现那
是不是要下载第三方框架 正好想研究一下 收藏啦 aio是在jdk7引入的,请下载JDK7 preview1版本,或者open jdk 是不是这个意思。 jdk7以前的nio是非阻塞IO,操作系统底层比方说linux,是用IO复用select实现的 jdk7用的是真正的异步IO,操作系统底层是用epoll实现的 是这样的吗? epoll也不是异步IO啊。异步IO在linux上目前仅限于文件系统,并且还没有得到广泛应用,很多平台都没有这玩意。 java aio在windows上是利用iocp实现的,这是真正的异步IO。而在linux上,是通过epoll模拟的。 |
|
返回顶楼 | |
发表时间:2010-01-26
楼主,你好,我写的server是p2p的应用,恩,想请教一下,因为我对数据库这一块的操作并不是特别的多,基本是客户自己也有很多是服务器,不知道是否可以在事件响应的当前线程来对数据库操作呢?前提是把线程池子的数目设的大些?或者用那个JDK提供的可自己增加线程的池子?
因为如果在弄个池子来处理数据库的话,担心线程太多了, 你如果时间充分心情好的话,真希望你能讲解一下和别人公用一台服务器(主机)是怎么用的呢。。。 总之要谢谢你对这段代码的讲解,kang sang mi da |
|
返回顶楼 | |
发表时间:2010-01-27
wujingsong 写道 楼主,你好,我写的server是p2p的应用,恩,想请教一下,因为我对数据库这一块的操作并不是特别的多,基本是客户自己也有很多是服务器,不知道是否可以在事件响应的当前线程来对数据库操作呢?前提是把线程池子的数目设的大些?或者用那个JDK提供的可自己增加线程的池子?
因为如果在弄个池子来处理数据库的话,担心线程太多了, 你如果时间充分心情好的话,真希望你能讲解一下和别人公用一台服务器(主机)是怎么用的呢。。。 总之要谢谢你对这段代码的讲解,kang sang mi da 按我的经验来说,类似数据库操作这样的IO操作,最好还是起个线程池来处理,防止阻塞框架内部的处理线程。如果这样的操作不是特别多,那么直接在响应线程处理也未尝不可,还是建议你自己搞两个版本性能对比一下。 wujingsong 写道 和别人公用一台是怎么用的呢? 我还真不明白什么意思,现在我们的应用基本都跑在虚拟机上了,几个应用跑在一个物理机上。虚拟化我不懂,就不乱弹了。 |
|
返回顶楼 | |
发表时间:2010-01-29
kang sa mi da,谢谢楼主的回复,确实是个很好的建议.
闲聊啊,今天无意看到Google 上一个音乐的图片链接,点进去后,看到了一个不大容易理解的词,说什么 "在南中国常年保持高收听率的极有个性的节目", 费解.....广东那边是这么叫的吗?不大可能吧.. |
|
返回顶楼 | |