论坛首页 Java企业应用论坛

介绍一个基于NIO的异步框架Cindy

浏览 44446 次
该帖已经被评为精华帖
作者 正文
   发表时间:2005-03-05  
Cindy是一个基于java nio的I/O框架,支持TCP/UDP单播/UDP多播/Pipe,为应用程序提供了一个统一的接口去实现异步和同步的网络操作。

    java io包提供了一个简单的模型去处理网络流,它所有的读写方法均为阻塞操作,因此在一般的应用里,用户总是开启一个独立线程或一个线程池去处理这些操作。java io模型非常简单易用,但在扩展性和效率上存在着一些问题。如果用户只需要一两个网络连接,开启几个独立线程操作无伤大雅,但是如果面对服务器端的成百上千个连接,采用java io提供的机制,就需要同时开启成百上千个线程,即使能够处理过来,花在线程上下文切换的时间也远远多于网络操作的用时。

    在JDK 1.4中,Java引入了nio包,除开nio提供的一些其他功能外,最吸引人的就是引入了非阻塞IO的实现。但是直接使用nio中非阻塞I/O需要处理众多异常,维护状态,为应用带来不必要的复杂性。并且JDK 1.4中的nio包只实现了普通TCP/UDP单播/Pipe,JDK 5.0中引入了SSLEngine类,使得基于非阻塞I/O的TCP可以支持SSL和TLS,在未来JDK 6.0中才会加入对非阻塞I/O UDP多播的实现。如果应用程序只想使用同一个模型去操作网络I/O,而不想关心这些诸多限制,那么Cindy是一个很好的选择。

    目前基于nio的开源I/O框架很多,Cindy并不是唯一的选择,比较好实现如Netty2、MINA等等。由于是Cindy的作者,对Cindy的了解程度要高于对其他框架的了解程度,这里所做的比较只是基于作者自身的看法,可能会有失偏颇。在你进行选择之前,你应该从你自身应用的角度进行仔细的比较。

[list]
  • Netty2具有很好的构架,并且该框架非常有名,使用该框架的项目不少,这意味着开发团队持续更新的动力非常大(BTW,刚知道Netty2的开发已经停止了,抱歉)。同时Netty2的文档非常齐全,并且支持JMX(这是我不熟悉的领域,不做评论)。Netty2的缺点就是实现代码的质量不是非常好,最大的缺点就是只支持普通的TCP。
  • MINA是Netty2作者的一个新项目,该项目目前实现的版本已经支持TCP/UDP/Pipe,并且由它的Roadmap上可得知将来会加入对JMX和容器的支持。我未曾使用过该项目,不过既然是Netty2作者的新项目,应该也吸收了Netty2的优点,并且对Netty2中的缺点做了改良。从该项目带的Example来看,使用起来应该是非常简单的。
  • Cindy起源于Netty2之后,借鉴了Netty2中MessageRecognizer类的设计,在当前的版本中已经全面支持普通TCP/Secure TCP/UDP单播/UDP多播/Pipe,可以使用同一个模型进行同步/异步IO处理,并且成功的应用在JML项目中(Java Msn Messenger Library,开源项目,目前发布的版本基于Cindy老的版本),并且在目前公司的项目中也有实际应用。Cindy目前并不打算加入对JMX的支持(不熟悉),也没有计划加入对容器的支持(这个应该是应用做的事情),我想保持Cindy核心的简洁和高效,最好能够代替Socket/DatagramSocket的地位(简单应用的话也许还是使用这个更好 )。Cindy目前缺点是文档相对较少以及应用的项目比较少,希望这几篇文档能够稍微弥补以下Cindy在这文档方面的不足。
  • [/list:u]
       发表时间:2005-03-05  
    客户端Hello world example,发送Hello World!到服务器上:

    //使用tcp进行连接
    SocketSession session = new SocketSession();;
    //设置要连接的地址,这里是localhost 1234端口
    session.setSocketAddress(new InetSocketAddress("localhost",1234););;
    //设置事件产生器
    session.setEventGenerator(new AutoCloseEventGenerator(););;
    //开始连接
    session.start(true);;
    //发送Hello world!
    session.blockWrite(new ByteArrayMessage("Hello world!".getBytes();););;
    //关闭连接
    session.close(true);;



    服务器端Hello world example,接收客户端发过来的Hello World!,并打印出来:

    //建立一个普通的TCP服务
    SimpleServerSocketSession session = new SimpleServerSocketSession();;
    //设置事件产生器,如果和上面一段程序一起运行,应该共享同一个事件产生器以提高效率
    session.setEventGenerator(new AutoCloseEventGenerator(););;
    //设置要监听的端口
    session.setListenPort(1234);;
    //添加连接上SocketSession的事件监听器
    session.addSocketSessionListener(new SessionAdapter(); {
            //接收到消息,打印消息
            public void messageReceived(Session session, Message message); {
                System.out.println(message);;
            }
    });;
    //开始服务
    session.start(true);;
    0 请登录后投票
       发表时间:2005-03-05  
    生成一个Session的实例。

    如果是进行普通TCP连接,可以创建一个SocketSession的实例;如果进行SSL TCP连接,可以创建一个SecureSocketSession的实例;如果进行UDP单播,可以创建DatagramSession实例;如果进行UDP多播,可以创建SimulatedMulticastSession实例等等。

    Session的继承关系图可以参见:http://cindy.sourceforge.net/Session.gif

    假设使用普通TCP连接,则:
    SocketSession session = new SocketSession();;
    0 请登录后投票
       发表时间:2005-03-05  
    设置Session关联的事件生成器。

    //设置事件生成器,用于给Session生成相应事件
    session.setEventGenerator(new AutoCloseEventGenerator(););; 
    


    EventGenerator是一个事件生成器,用于为其关联的Session产生相应的事件。比如session收到数据,EventGenerator会给session一个接收到数据的event;如session关闭了,EventGenerator会给session一个关闭的event,不同的Session实现会根据不同的event做相应的操作。

    熟悉nio的用户可以阅读它的源码,它内部实现是用一个独立线程维持了一个内部Selector变量,在不停的进行内部循环,将产生的事件分发给其关联的session。所以一定要重用EventGenerator才能带来效率上的提高,不要每生成一个Session实例就关联一个新的EventGenerator,在大部分的情况下,所用的Session共用一个EventGenerator是一个比较好的做法。

    目前EventGenerator有两个实现,SimpleEventGenerator和AutoCloseEventGenerator,其区别在于SimpleEventGenerator需要手工关闭,而AutoCloseEventGenerator在其关联的所有Session都close后会自动关闭。(EventGenerator关闭后,其关联的Session都会关闭,模拟出的Session除外)

    EventGenerator/Event这两个接口的存在就是为了降低其与Session之间的耦合性。类似于消息机制,如果某个Session实现收到了一个其不知道的Event,就应该忽略该Event,只有收到了其可以识别的Event才去做相应的事情。
    0 请登录后投票
       发表时间:2005-03-05  
    设置消息识别器。

    //设置消息识别器,用于识别消息
    session.setMessageRecognizer(new ByteArrayMessageRecognizer(););; 


    MessageRecognizer接口是用于识别消息。一般网络协议都分为协议头和协议体,例如MSN的协议,前三个字节代表指令,后面的字节根据指令的不一样而不一样。

    VER 1 MSNP11 MSNP10 CVR0
    CVR 2 0x0409 winnt 5.1 i386 MSNMSGRBETAM2 7.0.0632 msmsgs msn@hotmail.com
    USR 3 TWN I msn@hotmail.com


    MessageRecognizer类就可以根据前三个字节来判断消息类型,返回相应消息类型的实例,比如VerMessage/CvrMessage/UsrMessage,然后利用多态来加载实际的网络流。每个Session目前默认的MessageRecognizer是ByteArrayMessageRecognizer,即把消息流就当作一个字节数组来识别。
    0 请登录后投票
       发表时间:2005-03-05  
    添加Listener。

    //设置分发者,分发SessionListener关心的事件
    session.setDispatcher(new SimpleDispatcher(););;
    //添加Listener
    session.addSessionListener(new SessionAdapter(); {
    
            //连接建立成功
            public void sessionEstablished(Session session); {
                    System.out.println("session established");;
                }
    
            //连接关闭
            public void sessionClosed(Session session); {
                    System.out.println("session closed");;
                }
    });;


    Dispatcher和SessionListener是关联在一起的,SessionListener是Session提供的应用的监听器,Session发生了相应的事件会分发给注册的监听器,而Dispatcher就是用来分发这些事件的。Session默认的Dispatcher就是SimpleDispatcher,是运行在当前线程的。

    在SessionListener中Sleep或者做一些耗时的操作,可能会阻塞后面的事件分发,如果要提高效率或者伸缩性,可以采用PooledOrderedDispatcher,用一个线程池来分发事件。(但是注意,还是不推荐在SessionListener中sleep阻塞)
    0 请登录后投票
       发表时间:2005-03-05  
    设置其他属性。

    //设置要连接的Socket地址
    session.setSocketAddress(new InetSocketAddress("localhost",1234););;


    setSocketAddress是SocketSession特有的方法,用来设置要连接的地址;如果是ServerSocketSession,则可以通过setListenPort或setListenAddress来设置要监听的端口,其他的各种Session实例有着其特殊的设置属性。

    特有的方法请参加Session的继承关系图
    0 请登录后投票
       发表时间:2005-03-06  
    //开始连接
    session.start(true);;


    传入的boolean类型参数代表是否阻塞。如果传入false,则方法立即返回,SocketSession开始异步连接,等到连接建立好了,会通过sessionEstablished事件分发给Listener;如果传入true,则该方法等到连接建立成功或建立失败后才返回,这时用户可以调用session.isAvailable()来判断连接是否建立成功。

    注意,如果传入false,则该行代码返回时,连接可能还正在建立,尚未建立成功。

    //关闭连接
    session.close(false);;


    传入的boolean类型参数代表是否阻塞。如果传入false,则方法立即返回,SocketSession开始异步关闭,等到连接关闭好了,会通过sessionClosed事件分发给Listener;如果传入true,则该方法在连接关闭后才会返回。

    注意,如果传入false,则该行代码返回时,连接可能尚未关闭成功。
    0 请登录后投票
       发表时间:2005-03-06  
    发送消息。在上面实例的SessionListener中,只是简单的在连接建立和连接关闭时打印了一行信息。如果要发送和接收字节流,则可以如以下代码所示:

    session.addSessionListener(new SessionAdapter(); {
    
                public void sessionEstablished(Session session); {
                    ByteArrayMessage message1 = new ByteArrayMessage();;
                    message1.setContent("message1".getBytes(););;
                    session.write(message1);;
    
                    ByteArrayMessage message2 = new ByteArrayMessage();;
                    message2.setContent("message2".getBytes(););;
                    System.out.println(session.blockWrite(message2););;
                }
    
                public void sessionClosed(Session session); {
                    System.out.println("session closed");;
                }
    
                public void messageSent(Session session, Message message); {
                    System.out.println(message);;
                }
            });;


    第一段生成了一个message1的ByteArrayMessage实例,并且通过session.write方法将其发送出去;第二段生成了一个message2的ByteArrayMessage实例,并且通过session.blockWrite方法将其发送出去。

    session.write方式是通过异步的方式来发送,该调用立即返回,等到消息真正发送完毕,会通过messageSent事件通知Listener;session.blockWrite是通过同步的方式来发送,该调用阻塞,直到消息发送完毕或发送失败才返回,并且返回消息发送是否成功。

    Session发送消息都是按照添加到发送队列的顺序来发送的,即使第一条消息是非阻塞发送,第二条消息是阻塞发送,Session的实现会保证只要第一条消息是在第二条消息之前添加到发送队列的,Session会先发送完第一条消息,再发送第二条消息。

    ByteArrayMessage是Cindy框架中所带的一个Message实现,至于应用程序消息,只要实现了Message接口即可以被Session所发送。Message接口有两个方法,toByteBuffer和readFromBuffer,toByteBuffer将该消息转换为字节流,readFromBuffer从字节流中读回该消息,类似于Java里面的序列化。

    toByteBuffer返回类型是一个ByteBuffer数组,这是从节约内存开销的角度设计的接口。有些情况下,一个Message的实现包括一个协议头和协议体,比如典型的实现是协议头占三四个字节,协议体就是一个几K长的字节数组,如果返回ByteBuffer类型,需要生成一个足够长的ByteBuffer,并将内容填充进去;但是返回ByteBuffer数组的话,则可以返回一个长度为2的ByteBuffer数组,第一个数组是协议头,第二个数组是用ByteBuffer.wrap(content)方法包装的ByteBuffer,有效的节约了内存。经过实际测试,如果存在大量的这类消息,在繁忙的服务器上可以明显减少垃圾回收次数。NIO的出现本身是基于效率上的考量,所以这里的设计也经过实际使用中的考量,希望应用者理解这个设计。
    0 请登录后投票
       发表时间:2005-03-06  
    Cindy接收消息的步骤是先通过MessageRecognizer返回消息的实例,然后通过该实例的readFromBuffer方法读入字节流。
    假设使用MSN Messenger的协议,字节流的前三个字节表示消息类型,后面的根据消息类型不一样,协议体不一样。可以为SocketSession设置如下的MessageRecognizer:(伪码)

    public Message recognize(Session session, ByteBuffer buffer); {
          if (buffer len < 3);  return;
          String s = get the first 3 chars;
          if (s == some protocol); then  return new MessageA();;
          else if (s == some protocol); then return new MessageB();;
          else 
                unknown message, may be wrong, close session;
    }


    在识别了相应的消息后通过该实例的readFromBuffer方法从字节流中读入数据,如果读入完成,则readFromBuffer应该返回true,如果字节流尚未接收完成,如只接收了一半,则应该返回false,等待剩余的字节流到达后再进行识别。

    消息识别完毕后会通过messageReceived事件分发的注册的Listener。

    值得注意的是,TCP是基于流的协议,而UDP是基于包的协议。所以如果想要在UDP中得到接收包的地址,请实现PacketMessage接口,相应的Session的自动将接收包的地址设置给该实例。发送时也是如此,如果不是connected UDP传输,在发送Message的时候,也请发送PacketMessage实例,否则会在运行期抛出异常。

    Cindy中内置的ByteArrayMessage就是继承自AbstractPacketMessage,在使用TCP和UDP的情况下都能运行良好;而内置的ByteArrayMessageRecognizer也是一个非常简单的MessageRecognizer,其实现仅仅是简单的:

    public Message recognize(Session session, ByteBuffer buffer); {
            if (buffer.hasRemaining();); {
                return new ByteArrayMessage();;
            }
            return null;
        }


    内置的MessageRecognizerChain则是一个链状的设计。考虑到将来可能出现基于Cindy的标准库,比如识别HTTP消息的库,识别RTSP消息的库,识别MSN消息的库等等,如果在一个应用中需要同时识别多种消息,可以重用这些标准库的MessageRecognizer,当第一个识别不出来后由后面的MessageRecognizer来识别,如:(伪码)

    MessageRecognizerChain chain = new MessageRecognizerChain();;
    chain.addMessageRecognizer(new HttpMessageRecognizer(););;
    chain.addMessageRecognizer(new RtspMessageRecognizer(););;
    chain.addMessageRecognizer(new MsnMessageRecognizer(););;
    session.setMessageRecognizer(chain);;
    0 请登录后投票
    论坛首页 Java企业应用版

    跳转论坛:
    Global site tag (gtag.js) - Google Analytics