`
flyPig
  • 浏览: 139637 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Mina框架剖析--静态篇

阅读更多
一、基础框架

IoService:IoService相当于是Mina的Socket层,负责所有SocketIO事件的注册,select,分发等。它位于org.apache.mina.core.service包内,它有两个子接口,表示Server端接收方的IoAcceptor和Client发起方的IoConnector,以及所有的实现类:
NioDatagramAcceptor/NioDatagramConnector:基于UDP的实现
NioSocketAcceptor/NioSocketConnector:基于TCP的实现
VmPipeAcceptor/VmPipeConnector:基于Pipe的实现

服务端初始化的方式一般是:
 
        SocketAcceptor acceptor = new NioSocketAcceptor();
        acceptor.setReuseAddress( true );
        acceptor.setHandler(new EchoProtocolHandler());
        acceptor.bind(new InetSocketAddress(PORT));

初始化NioSocketAcceptor的时候会做主要事情:
1.需要通过具体的实现提供的transportMetaData,来判断其中规定的session配置类(eg:DefaultSocketSessionConfig)是否来自接口SessionConfig.
public NioSocketAcceptor() {
        super(new DefaultSocketSessionConfig(), NioProcessor.class);
        ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
    }

2.默认启动了一个SimpleIoProcessorPool来包装NioProcessor.
    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,
            Class<? extends IoProcessor<T>> processorClass) {
        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass),
                true);
    }

而SimpleIoProcessorPool默认是启动CPU个数 +1个 NioProcess,并以数组形式管理。
    private static final int DEFAULT_SIZE = Runtime.getRuntime()
            .availableProcessors() + 1;
    private final IoProcessor<T>[] pool;    
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType) {
        this(processorType, null, DEFAULT_SIZE);
    }
public SimpleIoProcessorPool(Class<? extends IoProcessor<T>> processorType,
            Executor executor, int size) {
        if (processorType == null) {
            throw new NullPointerException("processorType");
        }
        if (size <= 0) {
            throw new IllegalArgumentException("size: " + size
                    + " (expected: positive integer)");
        }

        if (executor == null) {
            this.executor = executor = Executors.newCachedThreadPool();
            this.createdExecutor = true;
        } else {
            this.executor = executor;
            this.createdExecutor = false;
        }

        pool = new IoProcessor[size];

        boolean success = false;
        Constructor<? extends IoProcessor<T>> processorConstructor = null;
        boolean usesExecutorArg = true;
        ....
       }


在这里并没有指定Executor,因此用默认的Executors.newCachedThreadPool().这个ThreadPool管理着NioSocketAcceptor和所有的IoProcessor处理线程.

在这里,NioSocketAcceptor可以理解为一个Server,而NioProcessor就是其中的并行的处理程序。NioProcessor默认个数是CPU个数+1,这正好是属于Processor的selector的个数,它们专门处理OP_READ/OP_WRITE事件。在NioSocketAcceptor里面也有个Selector,它是专门用作处理OP_ACCEPT事件.这个分离设计使OP_ACCEPT不被读写事件所影响。因此,采用默认设置,MINA会启动 CPU个数+2 Selector.

每个NioSocketAcceptor内部有个Acceptor线程对象,NioSocketAcceptor会使用传入的或者生成的Executor来执行这个Acceptor。
每个NioProcessor内部也有个Processor线程对象,NioProcessor会使用传入的或者生成的Executor来执行这个Processor。

3.IoService初始化时,建立与监听器容器IoServiceListenerSupport的双向关联,注册匿名内部实现serviceActivationListener到监听器容器.

客户端的启动方式:
NioSocketConnector connector = new NioSocketConnector();
connector.getFilterChain().addLast( "logger", new LoggingFilter() );
connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
connector.setConnectTimeout(30);
connector.setHandler(new TimeClientHandler());
ConnectFuture cf = connector.connect(
new InetSocketAddress("127.0.0.1", 8833));

其基本过程跟NioSocketAcceptor差不多.

IoFilter
IoFilter也是一个接口,有点像ServletFilter,在事件被 IoHandler处理之前或之后进行一些特定的操作,比如记录日志(LoggingFilter)、压缩数据(CompressionFilter),SSL加密(SSLFilter)黑名单过滤(BlackListFilter),心跳检测(KeepAliveFilter)等等。这些都是MINA自带的。
实际项目中,有个东西肯定是存在的,就是ProtocolCodecFilter或者跟它类似的filter.我们不可能让IoHandler直接去操作字节流,所以在这一层,一定要把二进制流转成java对象或者文本,这样才能不枉费MINA的良好分层设计。

ProtocolCodecFilter
使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。
ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。在笔者所做的自定义协议环境下,在这里就需要重写ObjectSerializationCodecFactory。
TextLineCodecFactory就是 String跟ByteBuffer的转化,比如HTTP,RTSP这类纯文本协议。

IoFilter的顺序问题:IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。

ExecutorFilter
有一个比较重要的过滤器就是ExecutorFilter。前面已经知道,每个NioProcess对应的是一个Process内部线程对象,在Process的run方法里面会循环的调用process方法。也就是第一个事件没有执行完,第二个事件不会执行,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。ExecutorFilter的的作用就是将同一个类型的消息合并起来按顺序调用
public final void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) {
        if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            IoFilterEvent event = new IoFilterEvent(nextFilter,
                IoEventType.MESSAGE_RECEIVED, session, message); 
            fireEvent(event);
        } else {
            nextFilter.messageReceived(session, message);
        }
    }

从上面的代码可以看到,如果满足事件集合条件,就组装成IoFilterEvent交给Executor异步执行。这样在filter上就不会因为某个事件filter执行时间过长而block后面的事件。
protected void fireEvent(IoFilterEvent event) {
        executor.execute(event);
    }

public void fire() {
        IoSession session = getSession();
        NextFilter nextFilter = getNextFilter();
        IoEventType type = getType();

        if ( LOGGER.isDebugEnabled()) {
            LOGGER.debug( "Firing a {} event for session {}",type, session.getId() );
        }

        switch (type) {
        case MESSAGE_RECEIVED:
            Object parameter = getParameter();
            nextFilter.messageReceived(session, parameter);
            break;
        case MESSAGE_SENT:
            WriteRequest writeRequest = (WriteRequest)getParameter();
            nextFilter.messageSent(session, writeRequest);
            break;
        case WRITE:
            writeRequest = (WriteRequest)getParameter();
            nextFilter.filterWrite(session, writeRequest);
            break;
        case CLOSE:
            nextFilter.filterClose(session);
            break;
        case EXCEPTION_CAUGHT:
            Throwable throwable = (Throwable)getParameter();
            nextFilter.exceptionCaught(session, throwable);
            break;
        case SESSION_IDLE:
            nextFilter.sessionIdle(session, (IdleStatus) getParameter());
            break;
        case SESSION_OPENED:
            nextFilter.sessionOpened(session);
            break;
        case SESSION_CREATED:
            nextFilter.sessionCreated(session);
            break;
        case SESSION_CLOSED:
            nextFilter.sessionClosed(session);
            break;
        default:
            throw new IllegalArgumentException("Unknown event type: " + type);
        }
        
        if ( LOGGER.isDebugEnabled()) {
            LOGGER.debug( "Event {} has been fired for session {}", type, session.getId() );
        }
    }


实际上MINA是用filterChain的方式顺序调用所有注册的filter.默认的DefaultIoFilterChain
if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);

然后就会一个filter接一个的传下去,直到最后的尾调用IoHandler.

IoHandler
IoHanlder则是所有事件最终产生响应的位置,一般用来处理业务比如分析数据,数据库操作等。
void sessionCreated(IoSession session) throws Exception;
    void sessionOpened(IoSession session) throws Exception;
    void sessionClosed(IoSession session) throws Exception;
    void sessionIdle(IoSession session, IdleStatus status) throws Exception;
    void exceptionCaught(IoSession session, Throwable cause) throws Exception;
    void messageReceived(IoSession session, Object message) throws Exception;
    void messageSent(IoSession session, Object message) throws Exception;
messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。
messageSent是服务器发送消息的事件,一般情况下不会使用它。
sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能放在sessionOpened事件中。

IoSession
IoSession是一个接口,它提供了对当前连接的操作功能,还有用户定义属性的存储功能,就像HttpSession。一个IoSession就代表一个Client与Server的连接。IoSession是线程安全的,第一层子类AbstractIoSession内部用到了大量的lock机制,因此可以放心的使用而不用担心并发问题。常用的方法:
//
WriteFuture write(Object message)
CloseFuture close();

//属性相关操作 
Object getAttribute(String key);
Object setAttribute(String key, Object value);
Object removeAttribute(String key);
Set getAttributeKeys();
//连接状态操作
boolean isConnected();
boolean isClosing();
SocketAddress getRemoteAddress();
boolean isIdle(IdleStatus status);

主要方法可以分为三类:
连接操作
最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,实际传入的类型应该是最后一个filter处理后的结果。最初始的状态下,message是一个IoBuffer.
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());IoFilterChain filterChain = session.getFilterChain();
filterChain.fireMessageReceived(buf);


另外注意的是,write返回的WriteFuture类,这样调用IoSession.write方法是不会阻塞的。调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就看线程的调度处理了。非常典型的Future模式的应用。
如果需要确认消息是否成功的发送出去了,只需要wait一下,然后检查下future的状态。
WriteFuture future = session.write(xx);
future.awaitUninterruptibly();
boolean isCompleted = future.isWritten()

当调用write的时候,会通过跟read相反的次序依次传递,直至由IoService负责把数据发送给客户端.
如果在很短的时间里,对同一个IoSession进行了两次write操作,客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销。

属性存储操作
一般用于状态维护。参考HttpSession的作用。跟Attribute相关的4个方法都是跟这个相关的。

连接状态
最后面的4个方法全是连接属性的查询。没什么好说的。




  • 大小: 29 KB
分享到:
评论

相关推荐

    微信“小程序”开发的系统实现及前景分析 (1).pdf

    - 组件结构:小程序的组件结构在MINA框架下进行,该框架支持网络通信,组件结构灵活多样,适用于不同的应用场景。 4. 关键技术: - API:微信小程序底层框架构建基于node.js模块,API技术允许使用react、redux等...

    Java软件开发工程师简历模板.docx

    - **技术栈**:使用Hessian + Mina构建RPC框架,数据库采用MongoDB。 - **职责**:担任技术经理,负责系统的整体架构设计和业务开发工作,成功处理千万级数据和高并发访问需求。 #### 3. **金融商城项目** - **技术...

    myhttp:提供基于mina 的不依赖web容器对http协的支持

    ...然而,对于某些特定场景,如轻量级服务、嵌入式系统或者对性能有极致追求的应用,...通过理解和掌握`myhttp`的原理和实现,开发者可以更好地利用Mina框架进行网络服务开发,同时也为自定义网络协议处理提供了思路。

    Java中级面试题.docx

    - 常见的Java Socket框架有Apache Mina和Netty,这两个框架提供了高效、灵活的网络通信能力。 5. **Java反射机制**: - 反射允许在运行时动态创建和操作类、接口、字段和方法。通过Class对象,可以创建任何类的...

    微信小程序 在Chrome浏览器上运行以及WebStorm的使用

    7. 利用WebStorm的代码审查和静态分析工具,检查代码质量并发现潜在问题。 【MINA框架与编译过程】 微信小程序基于MINA框架,它是一个专为微信小程序设计的轻量级框架。MINA处理了小程序的渲染、生命周期管理和数据...

    JAVA实用组件集

    3. **9_3_数值处理组件**:这个组件专注于数值计算,可能包含统计分析、矩阵运算、复杂数运算等功能,例如Apache Commons Math库,能够帮助开发者进行科学计算和工程应用。 4. **7_文件操作组件**:文件操作组件...

    chatserver:网页在线聊天系统特定

    Java作为跨平台的后端开发语言,提供了丰富的库和框架,如Spring Boot用于快速开发,Netty作为高性能的网络应用框架,以及Apache Mina或Jetty等,这些都能帮助构建高效的聊天服务器。 在解压的文件"chatserver-...

Global site tag (gtag.js) - Google Analytics