`

BIO, NIO, AIO(转)

 
阅读更多

NIO通常采用Reactor模式,AIO通常采用Proactor模式。AIO简化了程序的编写,stream的读取和写入都有OS来完成,不需要像NIO那样子遍历Selector。Windows基于IOCP实现AIO,Linux只有eppoll模拟实现了AIO。

Java7之前的JDK只支持NIO和BIO,从7开始支持AIO。

4种通信方式:TCP/IP+BIO, TCP/IP+NIO, UDP/IP+BIO, UDP/IP+NIO。

TCP/IP+BIO、
Socket和ServerSocket实现,ServerSocket实现Server端端口监听,Socket用于建立网络IO连接。

不适用于处理多个请求 1.生成Socket会消耗过多的本地资源。2. Socket连接的建立一般比较慢。

BIO情况下,能支持的连接数有限,一般都采取accept获取Socket以后采用一个thread来处理,one connection one thread。无论连接是否有真正数据请求,都需要独占一个thread。

可以通过设立Socket池来一定程度上解决问题,但是使用池需要注意的问题是:1. 竞争等待比较多。 2. 需要控制好超时时间。

TCP/IP+NIO
使用Channel(SocketChannel和ServerSocketChannel)和Selector。

Server端通常由一个thread来监听connect事件,另外多个thread来监听读写事件。这样做的好处是这些连接只有在真是请求的时候才会创建thread来处理,one request one thread。这种方式在server端需要支持大量连接但这些连接同时发送请求的峰值不会很多的时候十分有效。

UDP/IP+BIO
DatagramSocket和DatagramPacket。DatagramSocket负责监听端口以及读写数据,DatagramPacket作为数据流对象进行传输。

UDP/IP是无连接的,无法进行双向通信,除非双方都成为UDP Server。

UDP/IP+NIO
通过DatagramChannel和ByteBuffer实现。DatagramChannel负责端口监听及读写。ByteBuffer负责数据流传输。

如果要将消息发送到多台机器,如果为每个目标机器都建立一个连接的话,会有很大的网络流量压力。这时候可以使用基于UDP/IP的Multicast协议传输,Java中可以通过MulticastSocket和DatagramPacket来实现。

Multicast一般多用于多台机器的状态同步,比如JGroups。SRM, URGCP都是Multicast的实现方式。eBay就采用SRM来实现将数据从主数据库同步到各个搜索节点机器。
Java aio(异步网络IO)初探

按照《Unix网络编程》的划分,IO模型可以分为:阻塞IO、非阻塞IO、IO复用、信号驱动IO和异步IO,按照POSIX标准来划分只分为两类:同步IO和异步IO。如何区分呢?首先一个IO操作其实分成了两个步骤:发起IO请求和实际的IO操作,同步IO和异步IO的区别就在于第二个步骤是否阻塞,如果实际的IO读写阻塞请求进程,那么就是同步IO,因此阻塞IO、非阻塞IO、IO服用、信号驱动IO都是同步IO,如果不阻塞,而是操作系统帮你做完IO操作再将结果返回给你,那么就是异步IO。阻塞IO和非阻塞IO的区别在于第一步,发起IO请求是否会被阻塞,如果阻塞直到完成那么就是传统的阻塞IO,如果不阻塞,那么就是非阻塞IO。

Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:

java.nio.channels.AsynchronousChannel
标记一个channel支持异步IO操作。

java.nio.channels.AsynchronousServerSocketChannel
ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。

java.nio.channels.AsynchronousSocketChannel
面向流的异步socket channel,表示一个连接。

java.nio.channels.AsynchronousChannelGroup
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler。AsynchronousServerSocketChannel创建的时候可以传入一个 AsynchronousChannelGroup,那么通过AsynchronousServerSocketChannel创建的 AsynchronousSocketChannel将同属于一个组,共享资源。

java.nio.channels.CompletionHandler
异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future模式或者注册CompletionHandler,我更推荐用CompletionHandler的方式,这些handler的调用是由 AsynchronousChannelGroup的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup允许绑定不同的线程池,通过三个静态方法来创建:

1public static AsynchronousChannelGroup withFixedThreadPool(int nThreads,  
2                                                              ThreadFactory threadFactory)  
3       throws IOException  
4  
5public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor,  
6                                                               int initialSize)  
7  
8public static AsynchronousChannelGroup withThreadPool(ExecutorService executor)  
9       throws IOException  
10 

需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。

在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor模式,Reacot负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责 CompletionHandler的派发,查看一个典型的IO写操作的流程来看两者的区别:

Reactor: send(msg) -> 消息队列是否为空,如果为空 -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API 中,AsynchronousChannelGroup就扮演了Proactor的角色。

CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:

1public interface CompletionHandler<V,A> {  
2  
3     void completed(V result, A attachment);  
4  
5    void failed(Throwable exc, A attachment);  
6  
7     
8    void cancelled(A attachment);  
9}  

其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。

在初步介绍完aio引入的类和接口后,我们看看一个典型的tcp服务端是怎么启动的,怎么接受连接并处理读和写,这里引用的代码都是yanf4j 的aio分支中的代码,可以从svn checkout,svn地址: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio

第一步,创建一个AsynchronousServerSocketChannel,创建之前先创建一个 AsynchronousChannelGroup,上文提到AsynchronousServerSocketChannel可以绑定一个 AsynchronousChannelGroup,那么通过这个AsynchronousServerSocketChannel建立的连接都将同属于一个AsynchronousChannelGroup并共享资源:

1this.asynchronousChannelGroup = AsynchronousChannelGroup  
2                    .withCachedThreadPool(Executors.newCachedThreadPool(),  
3                            this.threadPoolSize);  

然后初始化一个AsynchronousServerSocketChannel,通过open方法:

1this.serverSocketChannel = AsynchronousServerSocketChannel  
2                .open(this.asynchronousChannelGroup);  
3 

通过nio 2.0引入的SocketOption类设置一些TCP选项:

1this.serverSocketChannel  
2                    .setOption(  
3                            StandardSocketOption.SO_REUSEADDR,true);  
4this.serverSocketChannel  
5                    .setOption(  
6                            StandardSocketOption.SO_RCVBUF,16*1024);  
7 

绑定本地地址:

1this.serverSocketChannel  
2                    .bind(new InetSocketAddress("localhost",8080), 100);  

其中的100用于指定等待连接的队列大小(backlog)。完了吗?还没有,最重要的监听工作还没开始,监听端口是为了等待连接上来以便accept产生一个AsynchronousSocketChannel来表示一个新建立的连接,因此需要发起一个accept调用,调用是异步的,操作系统将在连接建立后,将最后的结果——AsynchronousSocketChannel返回给你:

1public void pendingAccept() {  
2        if (this.started && this.serverSocketChannel.isOpen()) {  
3            this.acceptFuture = this.serverSocketChannel.accept(null,  
4                    new AcceptCompletionHandler());  
5  
6        } else {  
7            throw new IllegalStateException("Controller has been closed");  
8        }  
9    }  
10 

注意,重复的accept调用将会抛出PendingAcceptException,后文提到的read和write也是如此。accept方法的第一个参数是你想传给CompletionHandler的attchment,第二个参数就是注册的用于回调的CompletionHandler,最后返回结果Future。你可以对future做处理,这里采用更推荐的方式就是注册一个CompletionHandler。那么accept的CompletionHandler中做些什么工作呢?显然一个赤裸裸的 AsynchronousSocketChannel是不够的,我们需要将它封装成session,一个session表示一个连接(mina里就叫 IoSession了),里面带了一个缓冲的消息队列以及一些其他资源等。在连接建立后,除非你的服务器只准备接受一个连接,不然你需要在后面继续调用pendingAccept来发起另一个accept请求:

1private final class AcceptCompletionHandler implements  
2            CompletionHandler<AsynchronousSocketChannel, Object> {  
3  
4        @Override  
5        public void cancelled(Object attachment) {  
6            logger.warn("Accept operation was canceled");  
7        }  
8  
9        @Override  
10        public void completed(AsynchronousSocketChannel socketChannel,  
11                Object attachment) {  
12            try {  
13                logger.debug("Accept connection from "  
14                        + socketChannel.getRemoteAddress());  
15                configureChannel(socketChannel);  
16                AioSessionConfig sessionConfig = buildSessionConfig(socketChannel);  
17                Session session = new AioTCPSession(sessionConfig,  
18                        AioTCPController.this.configuration  
19                                .getSessionReadBufferSize(),  
20                        AioTCPController.this.sessionTimeout);  
21                session.start();  
22                registerSession(session);  
23            } catch (Exception e) {  
24                e.printStackTrace();  
25                logger.error("Accept error", e);  
26                notifyException(e);  
27            } finally {  
28                <strong>pendingAccept</strong>();  
29            }  
30        }  
31  
32        @Override  
33        public void failed(Throwable exc, Object attachment) {  
34            logger.error("Accept error", exc);  
35            try {  
36                notifyException(exc);  
37            } finally {  
38                <strong>pendingAccept</strong>();  
39            }  
40        }  
41    }  
42 

注意到了吧,我们在failed和completed方法中在最后都调用了pendingAccept来继续发起accept调用,等待新的连接上来。有的同学可能要说了,这样搞是不是递归调用,会不会堆栈溢出?实际上不会,因为发起accept调用的线程与CompletionHandler回调的线程并非同一个,不是一个上下文中,两者之间没有耦合关系。要注意到,CompletionHandler的回调共用的是 AsynchronousChannelGroup绑定的线程池,因此千万别在CompletionHandler回调方法中调用阻塞或者长时间的操作,例如sleep,回调方法最好能支持超时,防止线程池耗尽。

连接建立后,怎么读和写呢?回忆下在nonblocking nio框架中,连接建立后的第一件事是干什么?注册OP_READ事件等待socket可读。异步IO也同样如此,连接建立后马上发起一个异步read调用,等待socket可读,这个是Session.start方法中所做的事情:

1public class AioTCPSession {  
2    protected void start0() {  
3        pendingRead();  
4    }  
5  
6    protected final void pendingRead() {  
7        if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
8            if (!this.readBuffer.hasRemaining()) {  
9                this.readBuffer = ByteBufferUtils  
10                        .increaseBufferCapatity(this.readBuffer);  
11            }  
12            this.readFuture = this.asynchronousSocketChannel.read(  
13                    this.readBuffer, this, this.readCompletionHandler);  
14        } else {  
15            throw new IllegalStateException(  
16                    "Session Or Channel has been closed");  
17        }  
18    }  
19     
20}  
21 

AsynchronousSocketChannel的read调用与AsynchronousServerSocketChannel的accept调用类似,同样是非阻塞的,返回结果也是一个Future,但是写的结果是整数,表示写入了多少字节,因此read调用返回的是 Future,方法的第一个参数是读的缓冲区,操作系统将IO读到数据拷贝到这个缓冲区,第二个参数是传递给 CompletionHandler的attchment,第三个参数就是注册的用于回调的CompletionHandler。这里保存了read的结果Future,这是为了在关闭连接的时候能够主动取消调用,accept也是如此。现在可以看看read的CompletionHandler的实现:

1public final class ReadCompletionHandler implements  
2        CompletionHandler<Integer, AbstractAioSession> {  
3  
4    private static final Logger log = LoggerFactory  
5            .getLogger(ReadCompletionHandler.class);  
6    protected final AioTCPController controller;  
7  
8    public ReadCompletionHandler(AioTCPController controller) {  
9        this.controller = controller;  
10    }  
11  
12    @Override  
13    public void cancelled(AbstractAioSession session) {  
14        log.warn("Session(" + session.getRemoteSocketAddress()  
15                + ") read operation was canceled");  
16    }  
17  
18    @Override  
19    public void completed(Integer result, AbstractAioSession session) {  
20        if (log.isDebugEnabled())  
21            log.debug("Session(" + session.getRemoteSocketAddress()  
22                    + ") read +" + result + " bytes");  
23        if (result < 0) {  
24            session.close();  
25            return;  
26        }  
27        try {  
28            if (result > 0) {  
29                session.updateTimeStamp();  
30                session.getReadBuffer().flip();  
31                session.decode();  
32                session.getReadBuffer().compact();  
33            }  
34        } finally {  
35            try {  
36                session.pendingRead();  
37            } catch (IOException e) {  
38                session.onException(e);  
39                session.close();  
40            }  
41        }  
42        controller.checkSessionTimeout();  
43    }  
44  
45    @Override  
46    public void failed(Throwable exc, AbstractAioSession session) {  
47        log.error("Session read error", exc);  
48        session.onException(exc);  
49        session.close();  
50    }  
51  
52}  
53 

如果IO读失败,会返回失败产生的异常,这种情况下我们就主动关闭连接,通过session.close()方法,这个方法干了两件事情:关闭channel和取消read调用:

1if (null != this.readFuture) {  
2            this.readFuture.cancel(true);  
3        }  
4this.asynchronousSocketChannel.close();  
5 

在读成功的情况下,我们还需要判断结果result是否小于0,如果小于0就表示对端关闭了,这种情况下我们也主动关闭连接并返回。如果读到一定字节,也就是result大于0的情况下,我们就尝试从读缓冲区中decode出消息,并派发给业务处理器的回调方法,最终通过pendingRead继续发起read调用等待socket的下一次可读。可见,我们并不需要自己去调用channel来进行IO读,而是操作系统帮你直接读到了缓冲区,然后给你一个结果表示读入了多少字节,你处理这个结果即可。而nonblocking IO框架中,是reactor通知用户线程socket可读了,然后用户线程自己去调用read进行实际读操作。这里还有个需要注意的地方,就是decode出来的消息的派发给业务处理器工作最好交给一个线程池来处理,避免阻塞group绑定的线程池。

IO写的操作与此类似,不过通常写的话我们会在session中关联一个缓冲队列来处理,没有完全写入或者等待写入的消息都存放在队列中,队列为空的情况下发起write调用:

1protected void write0(WriteMessage message) {  
2      boolean needWrite = false;  
3      synchronized (this.writeQueue) {  
4          needWrite = this.writeQueue.isEmpty();  
5          this.writeQueue.offer(message);  
6      }  
7      if (needWrite) {  
8          pendingWrite(message);  
9      }  
10  }  
11  
12  protected final void pendingWrite(WriteMessage message) {  
13      message = preprocessWriteMessage(message);  
14      if (!isClosed() && this.asynchronousSocketChannel.isOpen()) {  
15          this.asynchronousSocketChannel.write(message.getWriteBuffer(),  
16                  this, this.writeCompletionHandler);  
17      } else {  
18          throw new IllegalStateException(  
19                  "Session Or Channel has been closed");  
20      }  
21  }  
22 

write调用返回的结果与read一样是一个Future,而write的CompletionHandler处理的核心逻辑大概是这样:

1@Override  
2    public void completed(Integer result, AbstractAioSession session) {  
3        if (log.isDebugEnabled())  
4            log.debug("Session(" + session.getRemoteSocketAddress()  
5                    + ") writen " + result + " bytes");  
6                  
7        WriteMessage writeMessage;  
8        Queue<WriteMessage> writeQueue = session.getWriteQueue();  
9        synchronized (writeQueue) {  
10            writeMessage = writeQueue.peek();  
11            if (writeMessage.getWriteBuffer() == null  
12                    || !writeMessage.getWriteBuffer().hasRemaining()) {  
13                writeQueue.remove();  
14                if (writeMessage.getWriteFuture() != null) {  
15                    writeMessage.getWriteFuture().setResult(Boolean.TRUE);  
16                }  
17                try {  
18                    session.getHandler().onMessageSent(session,  
19                            writeMessage.getMessage());  
20                } catch (Exception e) {  
21                    session.onException(e);  
22                }  
23                writeMessage = writeQueue.peek();  
24            }  
25        }  
26        if (writeMessage != null) {  
27            try {  
28                session.pendingWrite(writeMessage);  
29            } catch (IOException e) {  
30                session.onException(e);  
31                session.close();  
32            }  
33        }  
34    }  
35 

compete方法中的result就是实际写入的字节数,然后我们判断消息的缓冲区是否还有剩余,如果没有就将消息从队列中移除,如果队列中还有消息,那么继续发起write调用。

重复一下,这里引用的代码都是yanf4j aio分支中的源码,感兴趣的朋友可以直接check out出来看看: http://yanf4j.googlecode.com/svn/branches/yanf4j-aio。
在引入了aio之后,java对于网络层的支持已经非常完善,该有的都有了,java也已经成为服务器开发的首选语言之一。java的弱项在于对内存的管理上,由于这一切都交给了GC,因此在高性能的网络服务器上还是Cpp的天下。java这种单一堆模型比之erlang的进程内堆模型还是有差距,很难做到高效的垃圾回收和细粒度的内存管理。

这里仅仅是介绍了aio开发的核心流程,对于一个网络框架来说,还需要考虑超时的处理、缓冲buffer的处理、业务层和网络层的切分、可扩展性、性能的可调性以及一定的通用性要求。

分享到:
评论

相关推荐

    bio nio aio demo

    为了处理与外部世界的交互,Java提供了三种不同的I/O模型:BIO( Blocking I/O)、NIO(Non-blocking I/O)和AIO(Asynchronous I/O)。这些模型各有优缺点,适用于不同场景。下面我们将深入探讨这三种I/O模型,并...

    基于java的BIO、NIO、AIO通讯模型代码实现

    Java作为一门广泛使用的开发语言,提供了多种I/O(Input/Output)通信模型,包括传统的阻塞I/O(BIO)、非阻塞I/O(NIO)以及异步I/O(AIO)。这些通信模型在不同的场景下有着各自的优势,理解和掌握它们对于优化...

    java BIO NIO AIO

    Java BIO NIO AIO Java BIO、NIO、AIO是 Java 中的三种 I/O 模式,每种模式都有其特点和应用场景。下面对每种模式进行详细解释。 Java BIO Java BIO( Blocking I/O)是一种同步阻塞式的 I/O 模式,即服务器实现...

    一站式学习Java网络编程 全面理解BIO:NIO:AIO1

    全面理解 Java 网络编程 - BIO、NIO、AIO 本课程旨在帮助学生全面理解 Java 网络编程中的 BIO、NIO、AIO 三剑客,掌握 RPC 编程的基础知识,并结合实战项目巩固所学。 一、网络编程三剑客 - BIO、NIO、AIO BIO...

    BIO,NIO,AIO实现的demo

    这里我们主要探讨三种不同的I/O模型:BIO( Blocking I/O)、NIO(Non-blocking I/O)和AIO(Asynchronous I/O)。这三种模型各有特点,适用于不同的场景。 BIO(阻塞I/O)是Java早期的标准I/O模型,它基于流...

    BIO、NIO、AIO、Netty 、TCP全网最全解析!Netty中提供了哪些线程模型?

    本文将深入探讨BIO( Blocking I/O)、NIO(Non-blocking I/O)、AIO(Asynchronous I/O)以及Netty框架中的线程模型,并与TCP网络协议相结合,为您提供全网最全面的解析。 首先,让我们从基础开始,了解这些I/O...

    JAVA BIO AIO NIO测试代码

    对java io总结时编写的测试代码,包括BIO,NIO,AIO的实现,Java io操作是编程人员经常使用到的,以前只是使用没有对这三种IO做系统的了解,本文将对这三种IO作详细的介绍并附有测试完整代码

    读书笔记:java网络编程BIONIO, AIO 源码示例.zip

    读书笔记:java网络编程BIONIO, AIO 源码示例

    BIO,NIO,AIO,Netty面试题.pdf

    ### BIO、NIO、AIO、Netty 面试题解析 #### 1. Java IO 基础概述 Java中的I/O操作是通过流(Stream)来实现的,所有的数据都通过流的方式被串行化处理。串行化的含义在于数据必须按顺序输入输出。Java中的IO操作...

    SocketIO-BIO-NIO-AIO.zip

    SocketIO-BIO-NIO-AIO.zip是一个压缩包文件,它包含了一个关于Java中三种不同的I/O模型——BIO( Blocking I/O)、NIO(Non-blocking I/O)和AIO(Asynchronous I/O)的深入讲解。这些I/O模型是Java进行网络编程时的...

    2024年Java常见的-BIO,NIO,AIO,Netty面试题

    ### 2024年Java常见BIO、NIO、AIO、Netty面试题解析 #### 一、基础知识概述 1. **IO概念**: - Java中的I/O(Input/Output)指的是输入输出操作,它以流为基础进行数据的输入输出。所有的数据在Java中都是以流的...

    BIO、NIO、AIO

    Java作为一门广泛使用的编程语言,提供了多种I/O模型来处理数据的读写操作,其中包括BIO(Blocking I/O)、NIO(Non-blocking I/O)和AIO(Asynchronous I/O)。这些模型各有特点,适用于不同的场景,理解它们的原理...

    bio-nio-aio.zip

    本文将深入探讨Java中的三种IO模型:传统IO(BIO)、非阻塞IO(NIO)以及反应器模式(Reactor),并结合提供的压缩包文件中的示例代码进行解析。 一、传统IO(BIO) 传统的Java IO基于流(Stream)进行数据传输,它...

    BIO,NIO,AIO,Netty面试题

    本文将深入探讨四个关键的概念:BIO( Blocking I/O)、NIO(Non-blocking I/O)、AIO(Asynchronous I/O)以及Netty,这些都是Java平台上的网络编程模型。在面试中,对这些概念的理解和应用能力常常被用来评估候选...

    25 网络编程高级(同步、异步、NIO、BIO、AIO)

    教程视频:网络编程高级(同步、异步、NIO、BIO、AIO)。

    Java通讯模型-BIO、NIO、AIO综合演练

    本文将深入探讨Java中的三种主要通讯模型:BIO( Blocking I/O)、NIO(Non-blocking I/O)和AIO(Asynchronous I/O),并结合实际的代码示例进行综合演练。 **一、BIO(阻塞I/O)** 1. **概念**:BIO是Java早期的...

    读书笔记:Java网络编程BIO、NIO、AIO.zip

    读书笔记:Java网络编程BIO、NIO、AIO

    一文彻底理解Java中IO的BIO、NIO、AIO

    在Java的发展历程中,IO模型经历了三个主要阶段:BIO(Blocking IO)、NIO(Non-blocking IO)和AIO(Asynchronous IO),这三种模型各自有其特性和适用场景,下面将详细解析它们的工作原理和区别。 **1. BIO(阻塞...

Global site tag (gtag.js) - Google Analytics