`
兩ting
  • 浏览: 78414 次
  • 性别: Icon_minigender_2
  • 来自: 成都
社区版块
存档分类
最新评论

Mina原理解析

    博客分类:
  • Mina
阅读更多

客户端通信过程
1.通过SocketConnector同服务器端建立连接
2.链接建立之后I/O的读写交给了I/O Processor线程,I/O Processor是多线程的
3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议
4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程
5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道
IoFilterChain作为消息过滤链
1.读取的时候是从低级协议到高级协议的过程,一般来说从byte字节逐渐转换成业务对象的过程
2.写入的时候一般是从业务对象到字节byte的过程
IoSession贯穿整个通信过程的始终

整个过程可以用一个图来表现

消息箭头都是有NioProcessor-N线程发起调用,默认情况下也在NioProcessor-N线程中执行

类图
http://mina.apache.org/class-diagrams.html#ClassDiagrams-ProtocolDecoderclassdiagram

Connector
作为连接客户端,SocketConector用来和服务器端建立连接,连接成功,创建IoProcessor Thread(不能超过指定的processorCount),Thread由指定的线程池进行管理,IoProcessor 利用NIO框架对IO进行处理,同时创建IoSession。连接的建立是通过Nio的SocketChannel进行。

NioSocketConnector connector = new NioSocketConnector(processorCount);
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一个I/O通道

Acceptor
作为服务器端的连接接受者,SocketAcceptor用来监听端口,同客户端建立连接,连接建立之后的I/O操作全部交给IoProcessor进行处理
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.bind( new InetSocketAddress(PORT) );
Protocol
利用IoFilter,对消息进行解码和编码,如以下代码通过 MyProtocolEncoder 将java对象转成byte串,通过MyProtocalDecoder 将byte串恢复成java对象

Java代码 复制代码 收藏代码
  1. connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalFactory()));  
  2. ......  
  3. public class MyProtocalFactory implements ProtocolCodecFactory {  
  4.  ProtocolEncoderAdapter encoder = new MyProtocolEncoder();  
  5.  ProtocolDecoder decoder = new MyProtocalDecoder() ;  
  6.  public ProtocolDecoder getDecoder(IoSession session) throws Exception {  
  7.   return decoder;  
  8.  }  
  9.  public ProtocolEncoder getEncoder(IoSession session) throws Exception {  
  10.   return encoder;  
  11.  }  
  12. }  
  13. ......  
  14. public class MyProtocalDecoder extends ProtocolDecoderAdapter  {  
  15.   
  16.  public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
  17.    throws Exception {  
  18.     
  19.   int  id  = in.getInt();  
  20.   int  len = in.getInt();  
  21.   byte[]  dst = new byte[len];  
  22.     
  23.   in.get(dst);  
  24.     
  25.   String name = new String(dst,"GBK");  
  26.     
  27.   Item item = new Item();  
  28.   item.setId(id);  
  29.   item.setName(name);  
  30.   out.write(item);  
  31.  }  
  32. }  
  33. ......  
  34. public class MyProtocolEncoder extends ProtocolEncoderAdapter {  
  35.   
  36.  public void encode(IoSession session, Object message,  
  37.    ProtocolEncoderOutput out) throws Exception {  
  38.   Item item = (Item)message;  
  39.   int byteLen = 8 + item.getName().getBytes("GBK").length ;  
  40.   IoBuffer buf = IoBuffer.allocate(byteLen);  
  41.   buf.putInt(item.getId());  
  42.   buf.putInt(item.getName().getBytes("GBK").length);  
  43.   buf.put(item.getName().getBytes("GBK"));  
  44.   buf.flip();  
  45.   out.write(buf);  
  46.     
  47.  }  
  48. }  
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyProtocalFactory()));
......
public class MyProtocalFactory implements ProtocolCodecFactory {
 ProtocolEncoderAdapter encoder = new MyProtocolEncoder();
 ProtocolDecoder decoder = new MyProtocalDecoder() ;
 public ProtocolDecoder getDecoder(IoSession session) throws Exception {
  return decoder;
 }
 public ProtocolEncoder getEncoder(IoSession session) throws Exception {
  return encoder;
 }
}
......
public class MyProtocalDecoder extends ProtocolDecoderAdapter  {

 public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
   throws Exception {
  
  int  id  = in.getInt();
  int  len = in.getInt();
  byte[]  dst = new byte[len];
  
  in.get(dst);
  
  String name = new String(dst,"GBK");
  
  Item item = new Item();
  item.setId(id);
  item.setName(name);
  out.write(item);
 }
}
......
public class MyProtocolEncoder extends ProtocolEncoderAdapter {

 public void encode(IoSession session, Object message,
   ProtocolEncoderOutput out) throws Exception {
  Item item = (Item)message;
  int byteLen = 8 + item.getName().getBytes("GBK").length ;
  IoBuffer buf = IoBuffer.allocate(byteLen);
  buf.putInt(item.getId());
  buf.putInt(item.getName().getBytes("GBK").length);
  buf.put(item.getName().getBytes("GBK"));
  buf.flip();
  out.write(buf);
  
 }
}


handler
具体处理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。
connector.setHandler(new MyHandler());MyHandler继承IoHandlerAdapter类或者实现IoHandler接口.事件最终由IoProcessor线程发动调用。
Processor
I/O处理器、允许多线程读写,开发过程中只需要指定线程数量,Processor通过Nio框架进行I/O的续写操作,Processor包含了Nio的Selector的引用。这点也正是mina的优势,如果直接用Nio编写,则需要自己编写代码来实现类似Processor的功能。正因为I/O Processor是异步处理读写的,所以我们有时候需要识别同一个任务的消息,比如一个任务包括发送消息,接收消息,反馈消息,那么我们需要在制定消息格式的时候,消息头里能包含一个能识别是同一个任务的id。
I/O Porcessor线程数的设置 :如果是SocketConnector,则可以在构造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一样的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool());
processorCount为最大Porcessor线程数,这个值可以通过性能测试进行调优,默认值是cpu核数量+1(Runtime.getRuntime().availableProcessors() + 1)。
比较奇怪的是,每个IoProcessor在创建的时候会本地自己和自己建立一个连接?

IoSession
IoSession是用来保持IoService的上下文,一个IoService在建立Connect之后建立一个IoSession(一个连接一个session),IoSession的生命周期从Connection建立到断开为止
IoSession做两件事情:
1.通过IoSession可以获取IoService的所有相关配置对象(持有对IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用)
2.通过IoSession.write 是数据写出的入口

关于线程
http://mina.apache.org/configuring-thread-model.html
ThreadModel 1.x版本的mina还有线程模式选项在2.x之后就没有了
1.x版本指定线程模式
SocketConnectorConfig cfg = new SocketConnectorConfig();
cfg.setThreadModel(ThreadModel.MANUAL);

MINA有3种worker线程
Acceptor、Connector、I/O processor 线程
Acceptor Thread
一般作为服务器端链接的接收线程,实现了接口IoService,线程的数量就是创建SocketAcceptor 的数量
Connector Thread
一般作为客户端的请求建立链接线程,实现了接口IoService,维持了一个和服务器端Acceptor的一个链接,线程数量就是创建SocketConnector 的数量

Mina的SocketAcceptor和SocketConnector均是继承了BaseIoService,是对IoService的两种不同的实现
I/O processor Thread
作为I/O真正处理的线程,存在于服务器端和客户端,用来处理I/O的读写操作,线程的数量是可以配置的,默认最大数量是CPU个数+1
服务器端:在创建SocketAcceptor的时候指定ProcessorCount
SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
客户端:在创建SocketConnector 的时候指定ProcessorCount
SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());
I/O Processor Thread,是依附于IoService,类似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector这个线程允许CPU+1个I/O Processor Thread
NioProcessor虽然是多线程,但是对与一个连接的时候业务处理只会使用一个线程进行处理(Processor线程对于一个客户端连接只使用一个线程NioProcessor-n)如果handler的业务比较耗时,会导致NioProcessor线程堵塞 ,在2个客户端同时连接上来的时候会创建第2个(前提是第1个NioProcessor正在忙),创建的最大数量由Acceptor构造方法的时候指定。如果:一个客户端连接同服务器端有很多通信,并且I/O的开销不大,但是Handler处理的业务时间比较长,那么需要采用独立的线程模式,在FilterChain的最后增加一个ExecutorFitler:
acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
这样可以保证processor和handler的线程是分开的,否则:客户端发送3个消息,而服务器对于每个消息要处理10s左右,那么这3个消息是被串行处理,在处理第一个消息的时候,后面的消息将被堵塞,同样反过来客户端也有同样的问题。

客户端Porcessor堵塞测试情况:
1.以下代码在建立连接后连续发送了5个消息(item)
 

Java代码 复制代码 收藏代码
  1. ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));  
  2.                 future.awaitUninterruptibly();  
  3.                 session = future.getSession();  
  4.                 Item item = new Item();  
  5.                 item.setId(12345);  
  6.                 item.setName("hi");  
  7.                 session.write(item);  
  8.                 session.write(item);  
  9.                 session.write(item);  
  10.                 session.write(item);  
  11.                 session.write(item);  
ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));
                future.awaitUninterruptibly();
                session = future.getSession();
                Item item = new Item();
                item.setId(12345);
                item.setName("hi");
                session.write(item);
                session.write(item);
                session.write(item);
                session.write(item);
                session.write(item);




2.在handle的messageSent方法进行了延时处理,延时3秒

 

Java代码 复制代码 收藏代码
  1. public void messageSent(IoSession session, Object message) throws Exception {  
  2.       Thread.sleep(3000);  
  3.       System.out.println(message);  
  4.         
  5.   }  
  public void messageSent(IoSession session, Object message) throws Exception {
        Thread.sleep(3000);
        System.out.println(message);
        
    }



3.测试结果
5个消息是串行发送,都由同一个IoPorcessor线程处理
             

Java代码 复制代码 收藏代码
  1. session.write(item);  
  2.               session.write(item);  
  3.               session.write(item);  
  4.               session.write(item);  
  5.               session.write(item);  
  session.write(item);
                session.write(item);
                session.write(item);
                session.write(item);
                session.write(item);


服务器端每隔3秒收到一个消息。因为调用是由IoProcessor触发,而一个connector只会使用一个IoProcessor线程

4.增加ExecutorFilter,ExecutorFilter保证在处理handler的时候是独立线程
connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));
5.测试结果
4个session.wirte变成了并行处理,服务器端同时收到了5条消息

分享到:
评论

相关推荐

    Mina 2.0快速入门与源码解析

    ### Mina 2.0快速入门与源码解析 #### Mina 2.0 快速入门 Mina 2.0 是一个基于Java NIO技术的高效、可伸缩的网络通信框架,广泛应用于服务器端开发。Mina 2.0 的设计目标是为开发者提供一个易于使用的API,使得...

    mina自定义编解码器详解

    本文将深入探讨mina编解码器的工作原理,提供一个典型编解码器的示例,并解析其代码。 1. **mina框架基础** - Mina提供了一个高效的、事件驱动的网络应用程序框架,简化了网络编程,尤其是TCP和UDP通信。 - 它...

    Apache mina源代码框架解析

    在这个文档中,我们将简要分析Mina 2.0框架的源代码,并通过一个简单的时钟服务器示例来了解其工作原理。 首先,我们来看`MinaTimeServer`类。这个类是Mina服务器的主入口点,它创建了一个`NioSocketAcceptor`实例...

    Mina客户端服务器Demo

    这个Demo是学习Mina网络通信开发的绝佳起点,不仅可以帮助你深入理解Mina的工作原理,还能够指导你如何将Mina应用到实际项目中,构建高性能的网络服务。通过阅读和分析源代码,你将能够更熟练地运用Mina进行网络应用...

    mina即时聊天demo

    在本“mina即时聊天demo”中,我们可以通过学习如何利用Mina来构建一个简单的即时聊天系统,了解其核心机制和工作原理。 1. **Mina框架基础** Mina的核心在于它的I/O处理模型,即NIO(Non-blocking I/O)。NIO允许...

    ftpserver mina框架,适合学习

    这个框架非常适合初学者学习,因为它提供了清晰的结构和丰富的文档,帮助理解网络通信和FTP协议的工作原理。 Apache Mina是一个高度可扩展的网络通信框架,支持多种协议如TCP、UDP等,并且适用于Java开发。它抽象出...

    Mina 1.1.7 示例源码(apache.mina.example)

    在Mina 1.1.7版本中,提供的示例源码是学习和理解Mina框架工作原理及其实现各种网络协议的重要资源。这个压缩包"apache.mina.example"包含了一系列的示例项目,可以帮助开发者快速上手并深入理解Mina。 1. **Mina...

    Mina2中文文档

    根据提供的信息,我们可以详细解析与"Mina2中文文档"相关的各个关键知识点: ### Mina2中文文档概述 #### Introduction Mina2是一个基于NIO(Non-blocking I/O)的高效网络通信框架,由Apache软件基金会维护。该...

    mina源代码学习提供下载

    在这个“mina源代码学习”资料中,你将有机会深入理解MINA的核心原理和实现机制。 MINA的核心特性包括: 1. **非阻塞I/O**:MINA基于Java NIO(Non-blocking Input/Output)库,实现了事件驱动和异步通信模型。...

    mina-2.0.4 source code

    《mina-2.0.4源码解析:深入理解Java NIO通信框架》 Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高度可扩展的、高性能的Java网络应用程序框架,主要用于构建服务器端应用。MINA...

    mina HTTP协议实例

    5. **源码分析**:阅读MINA的源码可以帮助我们更深入地理解其工作原理。例如,我们可以研究AbstractIoSession是如何管理会话状态的,或者HttpDecoder和HttpEncoder如何实现HTTP协议的解析和编码。 6. **工具使用**...

    MINA 协议解码过滤器

    在`mina src`压缩包中,可能包含MINA框架的源代码,你可以通过阅读这些源码来深入理解MINA的工作原理,特别是过滤器和解码器的实现。这对于学习MINA、理解和定制自己的网络服务非常有帮助。同时,结合提供的博客链接...

    mina实现登录功能

    首先,理解MINA的基本工作原理至关重要。MINA的核心是IoSession对象,它代表了服务器和客户端之间的持久连接。当客户端连接到服务器时,MINA会创建一个新的IoSession实例,通过这个实例,服务器可以发送和接收数据。...

    mina 上传文件和文件名

    在IT行业中,网络通信是至关重要的一个领域,而Apache MINA...理解MINA的核心原理和API使用,能帮助我们构建稳定、安全的网络服务。在实际应用中,还需要关注性能优化、错误处理和安全性等方面,以确保服务的稳定运行。

    Mina Server调试工具

    Mina Server调试工具是针对Apache Mina框架...同时,这也是一种学习Mina框架原理和最佳实践的有效途径,对于提升开发者的技术水平大有裨益。在实际项目中,结合源代码和这款工具,将有助于实现更高质量的网络服务开发。

    mina TCP、UDP通讯

    在本文中,我们将深入探讨mina如何实现TCP和UDP的通信,并通过实例来进一步理解其工作原理。 首先,TCP(传输控制协议)是一种面向连接的、可靠的传输协议,它确保数据包按照正确的顺序到达目的地,且无遗漏。mina...

    MIna中转服务

    本文将深入探讨Mina中转服务的概念、工作原理以及实际应用。 一、Mina中转服务概念 Mina中转服务,简单来说,就是利用Mina框架构建的一种网络服务,能够处理客户端的请求并转发到相应的服务端,起到数据传输的桥梁...

    mina作为服务器的用法

    通过阅读这些资料,你可以逐步了解Mina的原理和实践,避免踩坑,快速上手。 总之,Apache Mina是一个强大的网络通信框架,尤其适用于需要高效处理并发连接的服务器开发。通过理解上述知识点,你将能够熟练地运用...

    Mina官网例子之时间服务器

    通过这个简单的例子,你可以学习到Mina的基本工作原理,以及如何构建和扩展基于Mina的应用。在实际项目中,Mina可以用于构建复杂的网络应用,如FTP服务器、HTTP服务器,甚至是自定义的二进制协议通信系统。

Global site tag (gtag.js) - Google Analytics