`
季铵盐
  • 浏览: 58839 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

MINA,xSocket同样的性能缺陷及陷阱,Grizzly better

阅读更多
本文转自博客:http://www.blogjava.net/adapterofcoms/articles/314560.html
因最近想重构下底层的服务器代码,特意查了下开源框架nio的东西 下面的不错 大家一起学校下 [url][/url]

MINA,Grizzly[grizzly-nio-framework],xSocket都是基于 java nio的 server framework.
这里的性能缺陷的焦点是指当一条channel上的SelectionKey.OP_READ ready时,1.是由select thread读完数据之后再分发给应用程序的handler,2.还是直接就分发,由handler thread来负责读数据和handle.
mina,xsocket是1. grizzly-nio-framework是2.
尽管读channel buffer中bytes是很快的,但是如果我们放大,当连接channel达到上万数量级,甚至更多,这种延迟响应的效果将会愈加明显.
MINA:
for all selectedKeys
{
    read data then fireMessageReceived.
}
xSocket:
for all selectedKeys
{
    read data ,append it to readQueue then performOnData.
}
其中mina在fireMessageReceived时没有使用threadpool来分发,所以需要应用程序在handler.messageReceived中再分发.而xsocket的performOnData默认是分发给threadpool[WorkerPool],WorkerPool虽然解决了线程池中的线程不能充到最大的问题[跟tomcat6的做法一样],但是它的调度机制依然缺乏灵活性.
Grizzly:
for all selectedKeys
{
   [NIOContext---filterChain.execute--->our filter.execute]<------run In DefaultThreadPool
}
grizzly的DefaultThreadPool几乎重写了java util concurrent threadpool,并使用自己的LinkedTransferQueue,但同样缺乏灵活的池中线程的调度机制.

下面分别是MINA,xSocket,Grizzly的源码分析:
Apache MINA (mina-2.0.0-M6源码为例):
    我们使用mina nio tcp最常用的样例如下:
        NioSocketAcceptor acceptor = new NioSocketAcceptor(/*NioProcessorPool's size*/);
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();       
        //chain.addLast("codec", new ProtocolCodecFilter(
                //new TextLineCodecFactory()));
        ......
        // Bind
        acceptor.setHandler(/*our IoHandler*/);
        acceptor.bind(new InetSocketAddress(port));
------------------------------------------------------------------------------------
    首先从NioSocketAcceptor(extends AbstractPollingIoAcceptor)开始,
bind(SocketAddress)--->bindInternal--->startupAcceptor:启动AbstractPollingIoAcceptor.Acceptor.run使用executor[Executor]的线程,注册[interestOps:SelectionKey.OP_ACCEPT],然后wakeup selector.
一旦有连接进来就构建NioSocketSession--对应--channal,然后session.getProcessor().add(session)将当前的channal加入到NioProcessor的selector中去[interestOps:SelectionKey.OP_READ],这样每个连接中有请求过来就由相应的NioProcessor来处理.

这里有几点要说明的是:
1.一个NioSocketAcceptor对应了多个NioProcessor,比如NioSocketAcceptor就使用了SimpleIoProcessorPool DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1.当然这个size在new NioSocketAcceptor的时候可以设定.
2.一个NioSocketAcceptor对应一个java nio selector[OP_ACCEPT],一个NioProcessor也对应一个java nio selector[OP_READ].
3.一个NioSocketAcceptor对应一个内部的AbstractPollingIoAcceptor.Acceptor---thread.
4.一个NioProcessor也对应一个内部的AbstractPollingIoProcessor.Processor---thread.
5.在new NioSocketAcceptor的时候如果你不提供Executor(线程池)的话,那么默认使用Executors.newCachedThreadPool().
这个Executor将被NioSocketAcceptor和NioProcessor公用,也就是说上面的Acceptor---thread(一条)和Processor---thread(多条)都是源于这个Executor.
      当一个连接java nio channal--NioSession被加到ProcessorPool[i]--NioProcessor中去后就转入了AbstractPollingIoProcessor.Processor.run,
AbstractPollingIoProcessor.Processor.run方法是运行在上面的Executor中的一条线程中的,当前的NioProcessor将处理注册在它的selector上的所有连接的请求[interestOps:SelectionKey.OP_READ].

AbstractPollingIoProcessor.Processor.run的主要执行流程:
for (;;) {     
       ......
       int selected = selector(final SELECT_TIMEOUT = 1000L);
       .......
       if (selected > 0) {
          process();
       }
       ......
}

process()-->for all session-channal:OP_READ -->read(session):这个read方法是AbstractPollingIoProcessor.private void read(T session)方法.
read(session)的主要执行流程是read channal-data to buf,if readBytes>0 then IoFilterChain.fireMessageReceived(buf)/*我们的IoHandler.messageReceived将在其中被调用*/;
    到此mina Nio 处理请求的流程已经明了.
    mina处理请求的线程模型也出来了,性能问题也来了,那就是在AbstractPollingIoProcessor.Processor.run-->process-->read(per session)中,在process的时候mina是for all selected-channals 逐次read data再fireMessageReceived到我们的IoHandler.messageReceived中,而不是并发处理,这样一来很明显后来的请求将被延迟处理.
我们假设:如果NioProcessorPool's size=2 现在有200个客户端同时连接过来,假设每个NioProcessor都注册了100个连接,对于每个NioProcessor将依次顺序处理这100个请求,那么这其中的第100个请求要得到处理,那它只有等到前面的99个被处理完了.
    有人提出了改进方案,那就是在我们自己的IoHandler.messageReceived中利用线程池再进行分发dispatching,这个当然是个好主意.
    但是请求还是被延迟处理了,因为还有read data所消耗的时间,这样第100个请求它的数据要被读,就要等前面的99个都被读完才行,即便是增加ProcessorPool的尺寸也不能解决这个问题.
    此外mina的陷阱(这个词较时髦)也出来了,就是在read(session)中,在说这个陷阱之前先说明一下,我们的client端向server端发送一个消息体的时候不一定是完整的只发送一次,可能分多次发送,特别是在client端忙或要发送的消息体的长度较长的时候.而mina在这种情况下就会call我们的IoHandler.messageReceived多次,结果就是消息体被分割了若干份,等于我们在IoHandler.messageReceived中每次处理的数据都是不完整的,这会导致数据丢失,无效.
下面是read(session)的源码:
private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());

        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();

        try {
            int readBytes = 0;
            int ret;

            try {
                if (hasFragmentation/*hasFragmentation一定为ture,也许mina的开发人员也意识到了传输数据的碎片问题,但是靠下面的处理是远远不够的,因为client一旦间隔发送,ret就可能为0,退出while,不完整的readBytes将被fire*/) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }

            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            IoFilterChain filterChain = session.getFilterChain();
            filterChain.fireExceptionCaught(e);
        }
    }
这个陷阱大家可以测试一下,看会不会一个完整的消息被多次发送,你的IoHandler.messageReceived有没有被多次调用.
要保持我们应用程序消息体的完整性也很简单只需创建一个断点breakpoint,然后set it to the current IoSession,一旦消息体数据完整就dispatching it and remove it from the current session.
--------------------------------------------------------------------------------------------------
下面以xSocket v2_8_8源码为例:
tcp usage e.g:
IServer srv = new Server(8090, new EchoHandler());
srv.start() or run();
-----------------------------------------------------------------------
class EchoHandler implements IDataHandler {  
    public boolean onData(INonBlockingConnection nbc)
             throws IOException,
             BufferUnderflowException,
             MaxReadSizeExceededException {
       String data = nbc.readStringByDelimiter("\r\n");
       nbc.write(data + "\r\n");
       return true;
    }
  }
------------------------------------------------------------------------
说明1.Server:Acceptor:IDataHandler ------1:1:1
Server.run-->IoAcceptor.accept()在port上阻塞,一旦有channel就从IoSocketDispatcherPool中获取一个IoSocketDispatcher,同时构建一个IoSocketHandler和NonBlockingConnection,调用Server.LifeCycleHandler.onConnectionAccepted(ioHandler)  initialize the IoSocketHandler.注意:IoSocketDispatcherPool.size默认为2,也就是说只有2条do select的线程和相应的2个IoSocketDispatcher.这个和MINA的NioProcessor数是一样的.
说明2.IoSocketDispatcher[java nio Selector]:IoSocketHandler:NonBlockingConnection------1:1:1
在IoSocketDispatcher[对应一个Selector].run中--->IoSocketDispatcher.handleReadWriteKeys:
for all selectedKeys
{
    IoSocketHandler.onReadableEvent/onWriteableEvent.
}
IoSocketHandler.onReadableEvent的处理过程如下:
1.readSocket();
2.NonBlockingConnection.IoHandlerCallback.onData
NonBlockingConnection.onData--->appendDataToReadBuffer: readQueue append data
3.NonBlockingConnection.IoHandlerCallback.onPostData
NonBlockingConnection.onPostData--->HandlerAdapter.onData[our dataHandler] performOnData in WorkerPool[threadpool].

因为是把channel中的数据读到readQueue中,应用程序的dataHandler.onData会被多次调用直到readQueue中的数据读完为止.所以依然存在类似mina的陷阱.解决的方法依然类似,因为这里有NonBlockingConnection.
----------------------------------------------------------------------------------------------
再下面以grizzly-nio-framework v1.9.18源码为例:
tcp usage e.g:
Controller sel = new Controller();
         sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler(){
             public ProtocolChain poll() {
                 ProtocolChain protocolChain = protocolChains.poll();
                 if (protocolChain == null){
                     protocolChain = new DefaultProtocolChain();
                     //protocolChain.addFilter(our app's filter/*应用程序的处理从filter开始,类似mina.ioHandler,xSocket.dataHandler*/);
                     //protocolChain.addFilter(new ReadFilter());
                 }
                 return protocolChain;
             }
         });
         //如果你不增加自己的SelectorHandler,Controller就默认使用TCPSelectorHandler port:18888
         sel.addSelectorHandler(our app's selectorHandler on special port);        
  sel.start();
------------------------------------------------------------------------------------------------------------
说明1.Controller:ProtocolChain:Filter------1:1:n,Controller:SelectorHandler------1:n,
SelectorHandler[对应一个Selector]:SelectorHandlerRunner------1:1,
Controller. start()--->for per SelectorHandler start SelectorHandlerRunner to run.
SelectorHandlerRunner.run()--->selectorHandler.select()  then handleSelectedKeys:
for all selectedKeys
{
   NIOContext.execute:dispatching to threadpool for ProtocolChain.execute--->our filter.execute.
}

你会发现这里没有read data from channel的动作,因为这将由你的filter来完成.所以自然没有mina,xsocket它们的陷阱问题,分发提前了.但是你要注意SelectorHandler:Selector:SelectorHandlerRunner:Thread[SelectorHandlerRunner.run]都是1:1:1:1,也就是说只有一条线程在doSelect then handleSelectedKeys.

    相比之下虽然grizzly在并发性能上更优,但是在易用性方面却不如mina,xsocket,比如类似mina,xsocket中表示当前连接或会话的IoSession,INonBlockingConnection对象在grizzly中由NIOContext来负责,但是NIOContext并没有提供session/connection lifecycle event,以及常规的read/write操作,这些都需要你自己去扩展SelectorHandler和ProtocolFilter,从另一个方面也可以说明grizzly的可扩展性,灵活性更胜一筹.

///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

一般网络通讯程序是,客户端请求,然后服务端回复响应,即key.isReadable()时,服务器端接收请求,key.isWriteable()时,服务端返回响应!一般我们在处理完接收后,我们就注册写事件,然后有写事件时处理写事件,正常网上的实例都是这样写的,可是这样的话,客户端第二次以后发送时,服务器端再也没有进入到读事件中,还写事件回复的数据也很怪异,返回很多重复的数据。

再改进一点的是,写事件完事以后,又注册一个读事件,这样又可以读了,然后读了以后再注册写,这样反复重复,的确,能很好的解决问题了。可是是因为什么原因呢?当时我是这样解决的,凑合用着,但是不知道为什么!

昨天偶尔想起这个问题,然后查看mina源码时,发现其只注册过读事件,而对写事件并没有注册过,只是发现有一些key.interestOps(SelectionKey.OP_WRITE)的操作,不是很明白,后来查看帮忙文档,知道这是管道当前感兴趣的事件时突然有些明白了,后来又查看jdk源码,发现,每当重复注册时,都会检测事件是否已经注册过,如果已经注册过的事件,会把当前感兴趣的事件切换成现在感兴趣的事件,这样所有的都明白了。

即:管道注册事件,可以有四种事件,它可以注册所有的事件,但是管道每次只能对一种事件有效,即注册读事件时,此时只对读事件有效,注册写事件时,此时只对写事件有效,这样上面的笨方法来回注册刚好达到这种效果,多的只是每次注册时的一次判断是否已经注册过。当然我们也有更好的方法,那就是通过key.interestOps方法来切换当前感兴趣的事件,这样就可以避免每次注册时都需要判断了。
分享到:
评论

相关推荐

    mina权威性能测试例子

    Apache Mina是一个开源的网络通信应用框架,主要应用于构建高性能、高可用性的网络服务器和客户端。这个"mina权威性能测试例子"是针对Apache Mina的一个实际性能测试案例,旨在展示在特定环境下Mina如何处理大量并发...

    MINA NIO 高性能异步并发网络通讯框架

    由于这一系列的重大改进,使得 2.0.x 成为十分令人期待的一个版本 我们在惊叹 MINA 可以带来多么大便利的同时,还不得不为其卓越的性能而骄傲,据称使用MINA开发服务器程序的性能已经逼近使用 C/C++ 语言开发的...

    基于MINA构建简单高性能的NIO应用.pdf

    "基于MINA构建简单高性能的NIO应用.pdf" 这个标题指出,文档主要讨论如何使用MINA框架来构建一个基于非阻塞I/O (Non-blocking I/O, NIO) 的应用程序。MINA是一个开源的Java框架,专为网络通信设计,特别是高性能、高...

    基于java的开发源码-mina高性能Java网络框架.zip

    基于java的开发源码-mina高性能Java网络框架.zip 基于java的开发源码-mina高性能Java网络框架.zip 基于java的开发源码-mina高性能Java网络框架.zip 基于java的开发源码-mina高性能Java网络框架.zip 基于java的开发...

    基于MINA构建高性能的NIO应用

    ### 基于MINA构建高性能的NIO应用 #### 概述 MINA作为一款优秀的客户端/服务器架构下的Java服务器框架,凭借其强大的功能和灵活性,在开发高性能网络应用程序方面表现突出。本文将深入探讨MINA的核心概念、优势...

    高性能网络架构Mina框架 下载

    ### 高性能网络架构Mina框架简介 #### 一、Mina框架概述 Mina(Multithreaded Internet Network Application)框架是由Apache软件基金会提供的一个高性能、可伸缩的网络编程框架,它主要应用于Java NIO环境下的...

    高性能Java网络框架 MINA

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,专为开发基于TCP/IP和UDP/IP协议的应用程序而设计。MINA的目标是简化网络编程,使得开发者...

    apache-mina-2.0.4.rar_apache mina_mina

    Apache Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"apache-mina-2.0.4.rar"压缩包包含的是Apache Mina 2.0.4版本的源代码,是深入理解和定制Mina的...

    mina apach 网络通信框架高性能例子

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高级网络通信框架,主要针对Java平台设计,提供了高度可扩展和高性能的网络应用开发环境。MINA旨在简化网络编程,特别是对于TCP/IP和UDP...

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...

    使用 Apache MINA 开发高性能网络应用程序

    Apache MINA 是一个高度可扩展的网络应用框架,专注于提供高性能和高可用性的服务。它主要基于 Java NIO(非阻塞I/O)技术,允许开发者构建TCP/UDP网络服务以及串口通信程序。MINA 提供了一套丰富的组件和过滤器系统...

    一种基于Mina状态机的高性能服务器的设计方法

    Mina 服务器设计方法基于状态机的高性能服务器设计方法 Mina 服务器设计方法是基于状态机的高性能服务器设计方法,旨在提高服务器的性能和可靠性。该方法使用 Mina 网络应用程序框架和有限状态机,对服务器的设计和...

    MINA_API+MINA_DOC+mina

    MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...

    高性能Java网络框架 MINA.7z

    MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。MINA由Apache软件基金会开发,并且是其顶级项目之一...

    mina连接 mina心跳连接 mina断线重连

    Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...

    给予mina 协议进行大数据传输

    MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高性能的网络应用开发框架,适用于多种传输协议,如TCP/IP和UDP/IP,以及SSL/TLS加密。在大数据传输场景中,MINA因其非阻塞I/O模型而被广泛采用,能够...

    apache-mina-2.0.4架包及源码各pdf学习教程

    apache-mina-2.0.4 架包 源码 学习教程.apache mina是Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序...

    Mina+Socket通信

    Mina(全称“MINA: Minimalistic Application Networking API”)是Apache软件基金会的一个开源项目,它为开发者提供了一种简单而高效的方式来构建高性能、跨平台的网络应用。Mina的核心优势在于它的事件驱动和异步I...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...

Global site tag (gtag.js) - Google Analytics