`
maosheng
  • 浏览: 565034 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Netty 解析

    博客分类:
  • Java
nio 
阅读更多
Linux网络IO模型:

Linux的内核将所有外部设备都可以看做一个文件来操作,那么我们对与外部设备的操作都可以看做对文件进行操作。我们对一个文件的读写,都通过调用内核提供的系统调用;内核给我们返回一个文件描述符file descriptor(fd)。

描述符就是一个数字,指向内核中一个机构体

Linux IO 复用模型:

Linux 提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select,这样select/poll可以帮我们侦测许多fd是否就绪。但是select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,默认值2048。

Linux 还提供了一个epoll系统调用,epoll是基于事件驱动方式,而不是顺序扫描,当有fd就绪时,立即回调函数rollback。

IO 复用常见的应用场景:
1)服务器需要同时处理多个处于监听状态和多个连接状态的套接字
2)服务器需要处理多种网络协议的套接字

epoll与select原理类似,只不过,epoll作出了一些重大改进,具体如下:

1.支持一个进程打开的socket描述符(fd)不受限制(仅受限于操作系统的最大文件句柄数);select有个比较大的缺陷就是一个进程所打开的fd是有一定限制的,由FD_SETSIZE设置,默认值是2048(epoll在1GB内存的机器上大约是10万左右)。

2.IO效率可能随着fd数目增加而线性下降;传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,由于网络延时,任一时间只有部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,他只会对“活跃”的socket进行操作(这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的)。那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle状态的socket则不会。

3.使用mmap加速内核与用户空间的消息传递;无论是select/poll还是epoll都需要内核把fd消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的

4.epoll的API更加简单:包括创建一个epoll描述符,添加监听事件,阻塞等待所监听的时间发生,关闭epoll描述符。


传统的BIO编程:

    网络编程的基本模型是Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP 地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
    在基于传统同步阻塞模型开发中,ServerSocket 负责绑定IP 地址,启动监听端口;Socket 负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。

    采用BIO 通信模型的服务端,通常由一个独立的Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的一请求一应答通信模型。





    该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1 的正比关系,由于线程是Java 虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。

    为了解决同步阻塞I/O 面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N 的比例关系,其中M 可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

    采用线程池和任务队列可以实现一种叫做伪异步的I/O 通信框架,它的模型如下所示。





    当有新的客户端接入的时候,将客户端的Socket 封装成一个Task(该任务实现java.lang.Runnable 接口)投递到后端的线程池中进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

    伪异步I/O 实际上仅仅只是对之前I/O 线程模型的一个简单优化,它无法从根本上解决同步I/O 导致的通信线程阻塞问题。下面我们就简单分析下如果通信对方返回应答时间过长,会引起的级联故障。

        1. 服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms。
        2. 采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
        3. 假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
        4. 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
        5. 由于前端只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
        6. 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。


几种I/O 模型的功能和特性对比:




从JDK1.4开始,JDK提供了一套专门的类库支持非阻塞I/O(NIO),NIO API由三个主要部分组成:缓存区(Buffers)、通道(Channels)和Selector组成。

NIO是基于事件驱动思想来实现的,他采用Reactor模式实现,主要用来解决BIO模型中一个服务端无法同时并发处理大量客户端连接的问题。

NIO基于Selector进行轮询,当socket有数据可读、可写、连接完成、新的TCP请求接入事件时,操作系统内核会触发Selector返回准备就绪的SelectionKey的集合,通过SelectableChannel进行读写操作。由于JDK的Selector底层基于epoll实现,因此不受2048连接数的限制,理论上可以同时处理操作系统最大文件句柄个数的连接。

SelectableChannel的读写操作都是异步非阻塞的,当由于数据没有就绪导致读半包时,立即返回,不会同步阻塞等待数据就绪,当TCP缓存区数据就绪之后,会触发Selector的读事件,驱动下一次读操作。因此,一个Reactor线程就可以同时处理N个客户端的连接,这就解决了之前BIO的一连接一线程的弊端,使Java服务端的并发读写能力得到极大的提升。


业界主流的NIO框架Netty:

   



    Netty是一个异步、非阻塞、事件驱动的网络应用框架。基于Netty,可以快速的开发和部署高性能、高可用的网络服务端和客户端应用。

Netty服务端时序图:



Netty服务端创建时序图:




Netty客户端时序图:




Netty客户端创建时序图:




Netty架构:

Netty采用了比较典型的三层网络架构进行设计和开发,逻辑架构如下:



第一层:业务逻辑编排层,业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特性协议相关的会话和链路管理。

第二层;职责链PipeLine,他负责事件在职责链中的有序传播,同时负责动态的编排职责链,职责链可以选择监听和处理自己关心的事件,他可以拦截处理和向后/向前传播事件,不同的应用的Handler节点的功能也不同,通常情况下,往往会开发编解码Handler用于消息的编解码,他可以将外部的协议消息转换成内部的POJO对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分离隔离。

第三层:Reactor通信调度层,他由一系列辅助类完成,包括Reactor线程NioEventLoop以及起父类、NioSocketChannel/NioserverSocketChannel以及其父类、ByteBuffer以及由其衍生出来的各种Buffer、Unsafe以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓存区中,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等等,将这些时间出发到PipeLine中,由PipeLine充当的职责链来进行后续的处理。

在Netty里,所有事件都来自ChannelEvent 接口,这些事件涵盖监听端口、建立连接、读写数
据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler ,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler 来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。

在Netty里, Channel 是通讯的载体(连接的通道),是ChannelEvent的产生者,而ChannelHandler 负责Channel中的逻辑处理。

ChannelPipeline 是ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

在Netty中, ChannelEvent 是数据或者状态的载体,例如传输的数据对应MessageEvent ,状态的改变对应ChannelStateEvent 。当对Channel进行操作时,会产生一个ChannelEvent,并
发送到ChannelPipeline 。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。





Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline 接口包含了两个重要的方法: sendUpstream(ChannelEvent e) 和sendDownstream(ChannelEvent e) ,就分别对应了
Upstream和Downstream。

对应的,ChannelPipeline里包含的ChannelHandler也包含两类: ChannelUpstreamHandler 和ChannelDownstreamHandler 。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法: ChannelUpstreamHandler.handleUpstream 和ChannelDownstreamHandler.handleDownstream 。





在一条“流”里,一个ChannelEvent 并不会主动的”流”经所有的Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream 产生,并Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream 产生,并交给下一个Handler处理。也就是说,每个Handler接收到一个ChannelEvent,并处理结束后,如果需要继续处理,那么它需要调用sendUp(Down)stream 新发起一个事件。如果它不再发起事件,那么处理就到此结束,即使它后面仍然有Handler没有执行。这个机制可以保证最大的灵活性,当然对Handler的先后顺序也有了更严格的要求。

DefaultChannelPipeline的内部结构ChannelPipeline 的主要的实现代码在DefaultChannelPipeline 类里。

列一下DefaultChannelPipeline的主要字段:

public class DefaultChannelPipeline implements ChannelPipeline {
  private volatile Channel channel;
  private volatile ChannelSink sink;
  private volatile DefaultChannelHandlerContext head;
  private volatile DefaultChannelHandlerContext tail;
  private final Map<String, DefaultChannelHandlerContext> name2ctx =
  new HashMap<String, DefaultChannelHandlerContext>(4);
}

ChannelHandlerContext保存了Netty与Handler相关的的上下文信息。而这里的DefaultChannelHandlerContext ,则是对ChannelHandler 的一个包装。一个DefaultChannelHandlerContext 内部,除了包含一个ChannelHandler ,还保存了”next”和”prev”两个指针,从而形成一个双向链表。

因此,在DefaultChannelPipeline 中,我们看到的是对DefaultChannelHandlerContext 的引
用,而不是对ChannelHandler 的直接引用。这里包含”head”和”tail”两个引用,分别指向链表的头和尾。而name2ctx则是一个按名字索引DefaultChannelHandlerContext用户的一个
map,主要在按照名称删除或者添加ChannelHandler时使用。

sendUpstream和sendDownstream是ChannelPipeline 接口的两个重要的方法:sendUpstream(ChannelEvent e) 和sendDownstream(ChannelEvent e) 。所有事件的发起都
是基于这两个方法进行的。Channels 类有一系列fireChannelBound 之类的fireXXXX 方法,其实都是对这两个方法的facade包装。

先看DefaultChannelHandlerContext.sendUpstream(对代码做了一些简化,保留主逻辑):

public void sendUpstream(ChannelEvent e) {

  DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
  head.getHandler().handleUpstream(head, e);
}
private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
  DefaultChannelHandlerContext realCtx = ctx;
  while (!realCtx.canHandleUpstream()) {
     realCtx = realCtx.next;
     if (realCtx == null) {
        return null;
     }
   }
   return realCtx;
}

这里最终调用了ChannelUpstreamHandler.handleUpstream 来处理这个ChannelEvent。有意思的是,这里我们看不到任何”将Handler向后移一位”的操作,但是我们总不能每次都用同一个Handler来进行处理啊?实际上,我们更为常用的是ChannelHandlerContext.handleUpstream 方法(实现是DefaultChannelHandlerContext.sendUpstream 方法):

public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
    DefaultChannelPipeline.this.sendUpstream(next, e);
}

可以看到,这里最终仍然调用了ChannelPipeline.sendUpstream 方法,但是它会将Handler指
针后移。

我们接下来看看DefaultChannelHandlerContext.sendDownstream :

public void sendDownstream(ChannelEvent e) {
   DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
   if (prev == null) {
       try {
             getSink().eventSunk(DefaultChannelPipeline.this, e);
       } catch (Throwable t) {
             notifyHandlerException(e, t);
       }
   } else {
       DefaultChannelPipeline.this.sendDownstream(prev, e);
   }
}


与sendUpstream好像不大相同哦?这里有两点:一是到达末尾时,会调用ChannelSink进行处理;二是这里指针是往前移的,所以我们知道了:UpstreamHandler是从前往后执行的,DownstreamHandler是从后往前执行的。在ChannelPipeline里添加时需要注意顺序了!

ChannelSink包含一个重要方法ChannelSink.eventSunk ,可以接受任意ChannelEvent,实际上,它的作用确实是这样,也可以换个说法:”处于末尾的万能Handler”。








  • 大小: 123.6 KB
  • 大小: 134.9 KB
  • 大小: 63.3 KB
  • 大小: 176.7 KB
  • 大小: 103.6 KB
  • 大小: 224.4 KB
  • 大小: 73.2 KB
  • 大小: 63 KB
  • 大小: 66.1 KB
  • 大小: 56.2 KB
  • 大小: 205.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics