`

Java aio(异步网络IO)初探

    博客分类:
  • java
 
阅读更多

按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。

Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

java.nio.channels.AsynchronousChannel
标记一个channel支持异步IO操作。

java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

java.nio.channels.AsynchronousSocketChannel
面向流的异步socket channel,表示一个连接。

java.nio.channels.AsynchronousChannelGroup
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler。AsynchronousServerSocketChannel创建的时候可以传入一个 AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的 AsynchronousSocketChannel将同属于一个组,共享资源。

java.nio.channels.CompletionHandler
异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,我更推荐用CompletionHandler的方式,这些handler的调用是由 AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

  1. publicstaticAsynchronousChannelGroupwithFixedThreadPool(intnThreads,
  2. ThreadFactorythreadFactory)
  3. throwsIOException
  4. publicstaticAsynchronousChannelGroupwithCachedThreadPool(ExecutorServiceexecutor,
  5. intinitialSize)
  6. publicstaticAsynchronousChannelGroupwithThreadPool(ExecutorServiceexecutor)
  7. throwsIOException

需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reacot负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责 CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

Reactor: send(msg) -> 消息队列是否为空,如果为空 -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色

CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

  1. publicinterfaceCompletionHandler<V,A>{
  2. voidcompleted(Vresult,Aattachment);
  3. voidfailed(Throwableexc,Aattachment);
  4. voidcancelled(Aattachment);
  5. }


其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

在初步介绍完aio引入的类和接口后,我们看看一个典型的tcp服务端是怎么启动的,怎么接受连接并处理读和写,这里引用的代码都是yanf4j 的aio分支中的代码,可以从svn checkout,svn地址:http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

第一步,创建一个AsynchronousServerSocketChannel,创建之前先创建一个 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以绑定一个 AsynchronousChannelGroup,那么通过这个AsynchronousServerSocketChannel建立的连接都将同属于一个AsynchronousChannelGroup并共享资源:

  1. this.asynchronousChannelGroup=AsynchronousChannelGroup
  2. .withCachedThreadPool(Executors.newCachedThreadPool(),
  3. this.threadPoolSize);

然后初始化一个AsynchronousServerSocketChannel,通过open方法:

  1. this.serverSocketChannel=AsynchronousServerSocketChannel
  2. .open(this.asynchronousChannelGroup);

通过nio 2.0引入的SocketOption类设置一些TCP选项:

  1. this.serverSocketChannel
  2. .setOption(
  3. StandardSocketOption.SO_REUSEADDR,true);
  4. this.serverSocketChannel
  5. .setOption(
  6. StandardSocketOption.SO_RCVBUF,16*1024);


绑定本地地址:

  1. this.serverSocketChannel
  2. .bind(newInetSocketAddress("localhost",8080),100);


其中的100用于指定等待连接的队列大小(backlog)。完了吗?还没有,最重要的监听工作还没开始,监听端口是为了等待连接上来以便accept产生一个AsynchronousSocketChannel来表示一个新建立的连接,因此需要发起一个accept调用,调用是异步的,操作系统将在连接建立后,将最后的结果——AsynchronousSocketChannel返回给你:

  1. publicvoidpendingAccept(){
  2. if(this.started&&this.serverSocketChannel.isOpen()){
  3. this.acceptFuture=this.serverSocketChannel.accept(null,
  4. newAcceptCompletionHandler());
  5. }else{
  6. thrownewIllegalStateException("Controllerhasbeenclosed");
  7. }
  8. }


注意,重复的accept调用将会抛出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一个参数是你想传给CompletionHandler的attchment,第二个参数就是注册的用于回调的CompletionHandler,最后返回结果Future<AsynchronousSocketChannel>。你可以对future做处理,这里采用更推荐的方式就是注册一个CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?显然一个赤裸裸的 AsynchronousSocketChannel是不够的,我们需要将它封装成session,一个session表示一个连接(mina里就叫 IoSession了),里面带了一个缓冲的消息队列以及一些其他资源等。在连接建立后,除非你的服务器只准备接受一个连接,不然你需要在后面继续调用pendingAccept来发起另一个accept请求

  1. privatefinalclassAcceptCompletionHandlerimplements
  2. CompletionHandler<AsynchronousSocketChannel,Object>{
  3. @Override
  4. publicvoidcancelled(Objectattachment){
  5. logger.warn("Acceptoperationwascanceled");
  6. }
  7. @Override
  8. publicvoidcompleted(AsynchronousSocketChannelsocketChannel,
  9. Objectattachment){
  10. try{
  11. logger.debug("Acceptconnectionfrom"
  12. +socketChannel.getRemoteAddress());
  13. configureChannel(socketChannel);
  14. AioSessionConfigsessionConfig=buildSessionConfig(socketChannel);
  15. Sessionsession=newAioTCPSession(sessionConfig,
  16. AioTCPController.this.configuration
  17. .getSessionReadBufferSize(),
  18. AioTCPController.this.sessionTimeout);
  19. session.start();
  20. registerSession(session);
  21. }catch(Exceptione){
  22. e.printStackTrace();
  23. logger.error("Accepterror",e);
  24. notifyException(e);
  25. }finally{
  26. <strong>pendingAccept</strong>();
  27. }
  28. }
  29. @Override
  30. publicvoidfailed(Throwableexc,Objectattachment){
  31. logger.error("Accepterror",exc);
  32. try{
  33. notifyException(exc);
  34. }finally{
  35. <strong>pendingAccept</strong>();
  36. }
  37. }
  38. }


注意到了吧,我们在failed和completed方法中在最后都调用了pendingAccept来继续发起accept调用,等待新的连接上来。有的同学可能要说了,这样搞是不是递归调用,会不会堆栈溢出?实际上不会,因为发起accept调用的线程与CompletionHandler回调的线程并非同一个,不是一个上下文中,两者之间没有耦合关系。要注意到,CompletionHandler的回调共用的是 AsynchronousChannelGroup绑定的线程池,因此千万别在CompletionHandler回调方法中调用阻塞或者长时间的操作,例如sleep,回调方法最好能支持超时,防止线程池耗尽。

连接建立后,怎么读和写呢?回忆下在nonblocking nio框架中,连接建立后的第一件事是干什么?注册OP_READ事件等待socket可读。异步IO也同样如此,连接建立后马上发起一个异步read调用,等待socket可读,这个是Session.start方法中所做的事情:

  1. publicclassAioTCPSession{
  2. protectedvoidstart0(){
  3. pendingRead();
  4. }
  5. protectedfinalvoidpendingRead(){
  6. if(!isClosed()&&this.asynchronousSocketChannel.isOpen()){
  7. if(!this.readBuffer.hasRemaining()){
  8. this.readBuffer=ByteBufferUtils
  9. .increaseBufferCapatity(this.readBuffer);
  10. }
  11. this.readFuture=this.asynchronousSocketChannel.read(
  12. this.readBuffer,this,this.readCompletionHandler);
  13. }else{
  14. thrownewIllegalStateException(
  15. "SessionOrChannelhasbeenclosed");
  16. }
  17. }
  18. }

AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用类似,同样是非阻塞的,返回结果也是一个Future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是Future<Integer>,方法的第一个参数是读的缓冲区,操作系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给 CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的CompletionHandler的实现:

  1. publicfinalclassReadCompletionHandlerimplements
  2. CompletionHandler<Integer,AbstractAioSession>{
  3. privatestaticfinalLoggerlog=LoggerFactory
  4. .getLogger(ReadCompletionHandler.class);
  5. protectedfinalAioTCPControllercontroller;
  6. publicReadCompletionHandler(AioTCPControllercontroller){
  7. this.controller=controller;
  8. }
  9. @Override
  10. publicvoidcancelled(AbstractAioSessionsession){
  11. log.warn("Session("+session.getRemoteSocketAddress()
  12. +")readoperationwascanceled");
  13. }
  14. @Override
  15. publicvoidcompleted(Integerresult,AbstractAioSessionsession){
  16. if(log.isDebugEnabled())
  17. log.debug("Session("+session.getRemoteSocketAddress()
  18. +")read+"+result+"bytes");
  19. if(result<0){
  20. session.close();
  21. return;
  22. }
  23. try{
  24. if(result>0){
  25. session.updateTimeStamp();
  26. session.getReadBuffer().flip();
  27. session.decode();
  28. session.getReadBuffer().compact();
  29. }
  30. }finally{
  31. try{
  32. session.pendingRead();
  33. }catch(IOExceptione){
  34. session.onException(e);
  35. session.close();
  36. }
  37. }
  38. controller.checkSessionTimeout();
  39. }
  40. @Override
  41. publicvoidfailed(Throwableexc,AbstractAioSessionsession){
  42. log.error("Sessionreaderror",exc);
  43. session.onException(exc);
  44. session.close();
  45. }
  46. }

如果IO读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:

  1. if(null!=this.readFuture){
  2. this.readFuture.cancel(true);
  3. }
  4. this.asynchronousSocketChannel.close();

在读成功的情况下,我们还需要判断结果result是否小于0,如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终通过pendingRead继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行IO读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking IO框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。

IO写的操作与此类似,不过通常写的话我们会在session中关联一个缓冲队列来处理,没有完全写入或者等待写入的消息都存放在队列中,队列为空的情况下发起write调用:

  1. protectedvoidwrite0(WriteMessagemessage){
  2. booleanneedWrite=false;
  3. synchronized(this.writeQueue){
  4. needWrite=this.writeQueue.isEmpty();
  5. this.writeQueue.offer(message);
  6. }
  7. if(needWrite){
  8. pendingWrite(message);
  9. }
  10. }
  11. protectedfinalvoidpendingWrite(WriteMessagemessage){
  12. message=preprocessWriteMessage(message);
  13. if(!isClosed()&&this.asynchronousSocketChannel.isOpen()){
  14. this.asynchronousSocketChannel.write(message.getWriteBuffer(),
  15. this,this.writeCompletionHandler);
  16. }else{
  17. thrownewIllegalStateException(
  18. "SessionOrChannelhasbeenclosed");
  19. }
  20. }

write调用返回的结果与read一样是一个Future<Integer>,而write的CompletionHandler处理的核心逻辑大概是这样:

  1. @Override
  2. publicvoidcompleted(Integerresult,AbstractAioSessionsession){
  3. if(log.isDebugEnabled())
  4. log.debug("Session("+session.getRemoteSocketAddress()
  5. +")writen"+result+"bytes");
  6. WriteMessagewriteMessage;
  7. Queue<WriteMessage>writeQueue=session.getWriteQueue();
  8. synchronized(writeQueue){
  9. writeMessage=writeQueue.peek();
  10. if(writeMessage.getWriteBuffer()==null
  11. ||!writeMessage.getWriteBuffer().hasRemaining()){
  12. writeQueue.remove();
  13. if(writeMessage.getWriteFuture()!=null){
  14. writeMessage.getWriteFuture().setResult(Boolean.TRUE);
  15. }
  16. try{
  17. session.getHandler().onMessageSent(session,
  18. writeMessage.getMessage());
  19. }catch(Exceptione){
  20. session.onException(e);
  21. }
  22. writeMessage=writeQueue.peek();
  23. }
  24. }
  25. if(writeMessage!=null){
  26. try{
  27. session.pendingWrite(writeMessage);
  28. }catch(IOExceptione){
  29. session.onException(e);
  30. session.close();
  31. }
  32. }
  33. }


compete方法中的result就是实际写入的字节数,然后我们判断消息的缓冲区是否还有剩余,如果没有就将消息从队列中移除,如果队列中还有消息,那么继续发起write调用。

重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友可以直接check out出来看看:http://yanf4j.googlecode.com/svn/branches/yanf4j-aio
在引入了aio之后,java对于网络层的支持已经非常完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,由于这一切都交给了GC,因此在高性能的网络服务器上还是Cpp的天下。java这种单一堆模型比之erlang的进程内堆模型还是有差距,很难做到高效的垃圾回收和细粒度的内存管理。

这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来说,还需要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及一定的通用性要求。

 

分享到:
评论

相关推荐

    网络IO模型:同步IO和异步IO,阻塞IO和非阻塞IO

    - **AIO(Asynchronous IO)**:也称为NIO.2,它提供了异步I/O操作,进一步提升了处理能力,使得程序无需等待I/O操作完成即可继续执行。 4. IO模型的选择: - 对于需要高并发、低延迟的服务器应用,如聊天服务器...

    java IO NIO AIO.xmind

    涉及到java io, nio, aio相关知识点,学习过程中的一些总结,持续更新中,xmind 格式

    Java AIO 实例(转)

    Java AIO,全称为Asynchronous Input/Output,是Java NIO的一个扩展,它引入了非阻塞的异步I/O操作,使得Java开发者能够更高效地处理I/O事件。AIO在Java 7中被引入,相较于传统的IO模型,它的优势在于能够提高并发...

    Java 网络IO简介

    Java 网络IO简介: bio nio aio

    高性能的java AIO通信框架 物联网参考

    Java AIO,全称为Asynchronous Input/Output,是Java NIO的一个扩展,它引入了非阻塞的异步I/O模型,使得在处理高并发、大数据传输时性能更优,尤其适合于物联网(IoT)场景,其中设备之间的通信效率至关重要。...

    基于java的开发源码-smart-socket 开源的Java AIO框架.zip

    基于java的开发源码-smart-socket 开源的Java AIO框架.zip 基于java的开发源码-smart-socket 开源的Java AIO框架.zip 基于java的开发源码-smart-socket 开源的Java AIO框架.zip 基于java的开发源码-smart-socket ...

    基于Java的异步IO框架 Cindy.zip

    Java平台上的异步I/O(Asynchronous Input/Output,简称AIO)框架是开发者处理高并发、低延迟场景的重要工具。Cindy是一个这样的框架,它旨在简化Java中的异步编程模型,提供更好的性能和可扩展性。让我们深入探讨...

    aio.zip_aio_asynchronous io_linux aio

    在Linux操作系统中,AIO(Asynchronous Input/Output,异步I/O)是一种高级I/O接口,它允许程序在发起I/O操作后立即返回,而不是等待操作完成。AIO允许程序执行其他任务,提高系统效率,特别是在处理大量并发I/O请求...

    基于java aio研发的网络编程框架,从收集到的案例来看,用t-io做物联网、IM、客服的比较多,堪称殿堂级网络开发框架

    T-io就是一个基于Java AIO实现的高性能、易用且稳定的网络编程框架。它的设计目标是简化网络编程的复杂性,为开发者提供一种殿堂级的网络开发体验,尤其适用于物联网、即时通讯(IM)和客服系统等领域。 首先,我们...

    linux异步IO.pdf

    ### Linux异步IO详解 #### 引言 在Linux环境下,输入/输出(I/O)操作是系统资源管理和数据交互的核心部分。传统上,Linux采用的最常见I/O模型是同步I/O,其中应用程序在发出请求后会阻塞,直至请求完成。然而,...

    java IO、NIO、AIO详解.docx

    Java AIO(Asynchronous I/O)是 Java 语言中最新的输入/输出机制,使用异步 IO 模式实现输入/输出操作。在 AIO 中,输入/输出操作是异步式的,即应用程序可以继续执行其他任务,而不需要等待输入/输出操作的完成。...

    java 之异步套接字编程实例(AIO)

    Java中的异步套接字编程,也称为非阻塞I/O(Non-blocking I/O, NIO)或异步I/O(Asynchronous I/O, AIO),是Java在JDK 7引入的一种高级I/O模型,它极大地提高了网络编程的效率。AIO的主要目标是提供一种方法,使得...

    Java IO应届生培训讲义

    Java IO应届生培训讲义是一份面向刚毕业的大学生进行Java IO相关知识的培训资料,它涵盖了Java IO的基础知识、不同的IO模型以及Java中的BIO、NIO和AIO高级IO类库。下面详细解释这些知识点: 1. 用户空间和内核空间 ...

    一站式学习Java网络编程 全面理解BIO:NIO:AIO1

    全面理解 Java 网络编程 - BIO、NIO、AIO 本课程旨在帮助学生全面理解 Java 网络编程中的 BIO、NIO、AIO 三剑客,掌握 RPC 编程的基础知识,并结合实战项目巩固所学。 一、网络编程三剑客 - BIO、NIO、AIO BIO...

    Java编程中的IO模型详解:BIO,NIO,AIO的区别与实际应用场景分析

    AIO,也称为NIO.2,是Java 7引入的一种异步非阻塞IO模型。在AIO中,服务器不需要等待数据准备好,而是先发起一个读/写请求,然后操作系统在数据准备完成时回调通知服务器。这种模型适用于连接数量多且连接时间较长...

    java基础之IO流

    - **2.3 AIO(异步非阻塞IO)** - **工作原理**:AIO是JDK 1.7引入的新特性,它允许应用程序发起异步的读写请求,并通过CompletionHandler来接收完成的通知。 - **优缺点**: - **优点**:进一步提高了系统的并发...

    java异步server

    在Java中实现异步服务器,通常会利用NIO(Non-blocking Input/Output)或者Java 7引入的AIO(Asynchronous I/O)来实现。这两种技术都允许服务器在一个线程中处理多个客户端请求,而不是为每个连接创建一个新的线程...

    AIO异步网络编程模型-TIO.md

    tio源码分析socket服务启动,接收消息流程。适合人群高级开发软件工程师

    t-io是基于java aio研发的网络编程框架,从收集到的案例来看,用t-io做物联网、IM、客服的比较多,堪称殿堂级网络开发框架

    t-io是基于java aio研发的网络编程框架,从收集到的案例来看,用t-io做物联网、IM、客服的比较多,堪称殿堂级网络开发框架

Global site tag (gtag.js) - Google Analytics