`
flyPig
  • 浏览: 139819 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

Mina框架剖析--动态篇

阅读更多
一切从启动开始,MINA服务端启动代码:
    private void start(int port) throws IOException, InstantiationException,
            IllegalAccessException, ClassNotFoundException {
        //1
        NioSocketAcceptor acceptor = new NioSocketAcceptor(5);//5个NioProcessor
        Executor threadPool = Executors
                .newFixedThreadPool(100);// 固定100个的线程池
        
        //2
        acceptor.getFilterChain().addLast("exector",
                new ExecutorFilter(threadPool));
        acceptor.getFilterChain().addLast("codec",
                new ProtocolCodecFilter(new TestEncoder(), new TestDecoder()));
        LoggingFilter filter = new LoggingFilter();
        filter.setExceptionCaughtLogLevel(LogLevel.DEBUG);
        filter.setMessageReceivedLogLevel(LogLevel.DEBUG);
        filter.setMessageSentLogLevel(LogLevel.DEBUG);
        filter.setSessionClosedLogLevel(LogLevel.DEBUG);
        filter.setSessionCreatedLogLevel(LogLevel.DEBUG);
        filter.setSessionIdleLogLevel(LogLevel.DEBUG);
        filter.setSessionOpenedLogLevel(LogLevel.DEBUG);
        acceptor.getFilterChain().addLast("logger", filter);
        
        //3
        acceptor.setReuseAddress(true);//ServerSocket.setReuseAddress
        acceptor.setBacklog(128);
        acceptor.setDefaultLocalAddress(new InetSocketAddress(port));
        // 加入处理器(Handler)到Acceptor
        acceptor.setHandler(new TestHandler());

        //4
        acceptor.getSessionConfig().setReuseAddress(true);// 设置每一个非主监听连接的端口可以重用Socket.setReuseAddress
        acceptor.getSessionConfig().setReceiveBufferSize(1024);// 设置输入缓冲区的大小Socket.setReceiveBufferSize
        acceptor.getSessionConfig().setSendBufferSize(10240);// 设置输出缓冲区的大小
        // 设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出Socket.setSendBufferSize
        acceptor.getSessionConfig().setTcpNoDelay(true);
        // 设置主服务监听端口的监听队列的最大值为128,如果当前已经有128个连接,再新的连接来将被reject
        
        //5
        acceptor.bind();
    }

步骤1:初始化NioSocketAcceptor和Global的ThreadPoolExecutor.
步骤2:初始化3个filter,其中ExecutorFilter用Global的ThreadPool实现。TestDecoder/TestEncoder是实现ProtocolEncoder、ProtocolDecoder的类。
步骤3:设置acceptor的一些参数,其实最终是设置的ServerSocket类的参数。
步骤4:设置SessionConfig的参数,最终是设置到established的Socket类的参数。
步骤5:启动监听开始干活。

在bind里面具体做的事情就是:
bind(SocketAddress)-->bindInternal-->startupAcceptor:启动AbstractPollingIoAcceptor.Acceptor.run使用executor [Executor]的线程,注册OP_ACCEPT,然后wakeup selector。这个里面有一个异步处理就是AbstractPollingIoAcceptor里面有一个registerQueue和cancelQueue。bind就是往registerQueue里面put,unbind就是往cancelQueue里面put.AbstractPollingIoAcceptor.Acceptor每次的循环,会先检查registerQueue,调用
完processHandles(必须有Accept事件发生),然后检查cancelQueue.


一旦有连接进来(OP_Accept)就会
1.构建NioSocketSession,对应SocketChannal,NioSocketSession持有对IoService,Processor池,SocketChannel,SessionConfig,IoHandler的所有引用。
SocketChannel ch = handle.accept();        
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch);

public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel ch) {
        this.service = service;
        this.processor = processor;
        this.ch = ch;
        this.handler = service.getHandler();
        this.config.setAll(service.getSessionConfig());
    }

2.初始化Session里面的attributeMap,WriteRequestQueue。
((AbstractIoSession) session).setAttributeMap(session.getService()                   .getSessionDataStructureFactory().getAttributeMap(session));
((AbstractIoSession) session).setWriteRequestQueue(session
                    .getService().getSessionDataStructureFactory()
                    .getWriteRequestQueue(session));

3.session.getProcessor().add(session)将Session加入到了newSessions这个queue中,Processor会从里面poll数据执行。
session.getProcessor().add(session);

 public final void add(T session) {
        if (isDisposing()) {
            throw new IllegalStateException("Already disposed.");
        }

        // Adds the session to the newSession queue and starts the worker
        newSessions.add(session);
        startupProcessor();
    }

可以总结一下:
1. NioSocketAcceptor就是可以大致理解为ServerSocketChannel的异步模型包装,专门处理OP_ACCEPT事件。内部有一个AbstractPollingIoAcceptor.Acceptor线程对象,由Executors来执行。
2. 一个NioSocketAcceptor对应了多个NioProcessor。NioProcessor是专门处理OP_READ事件,一个NioProcessor对应一个SocketChannel。内部的AbstractPollingIoProcessor.Processor线程对象也是由Executors执行。

这个时候所有的处理就转入到了AbstractPollingIoProcessor.Processor.run 它的主要流程:
for (;;) {     
       ......
       int selected = selector(final SELECT_TIMEOUT = 1000L);
       nSessions += handleNewSessions();
       updateTrafficMask();
       .......
       if (selected > 0) {
          process();
       }
       ......
}:
可以看到,目前还没看到OP_READ注册,接下来就是做这个事情
   private int handleNewSessions() {
        int addedSessions = 0;
        for (;;) {
            T session = newSessions.poll();
            if (session == null) {
                // All new sessions have been handled
                break;
            }
            if (addNow(session)) {
                // A new session has been created 
                addedSessions++;
            }
        }
        return addedSessions;
    }  
    private boolean addNow(T session) {
        boolean registered = false;
        boolean notified = false;
        
        try {
            init(session);
            registered = true;

            // Build the filter chain of this session.
            session.getService().getFilterChainBuilder().buildFilterChain(
                    session.getFilterChain());
            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
            // in AbstractIoFilterChain.fireSessionOpened().
            ((AbstractIoService) session.getService()).getListeners()
                    .fireSessionCreated(session);
            notified = true;
        } catch (Throwable e) {
           ....
        }
        return registered;
    }
    protected void init(NioSession session) throws Exception {
        SelectableChannel ch = (SelectableChannel) session.getChannel();
        ch.configureBlocking(false);
        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
   public void fireSessionCreated(IoSession session) {
      ....
      IoFilterChain filterChain = session.getFilterChain(); 
      filterChain.fireSessionCreated();
      filterChain.fireSessionOpened();
      ....
    }
    }

在init方法里面,终于看到了期待的OP_READ注册。然后是FilterChain的调用。在addNow方法里面调用了fireSessionCreated方法,这里会按照注册的顺序调用IoFilter.sessionCreated,最终是IoHandler.sessionCreated.
再调用fireSessionOpened方法,同样会按照注册IoFilter的顺序调用IoFilter.sessionOpened,最终是IoHandler.sessionOpened.
再额外说下FilterChain的机制,主要的属性:
private final EntryImpl head;
private final EntryImpl tail;
private final Map<String, Entry> name2entry = new HashMap<String, Entry>();

正向就是head--> all registered filter -->tail  [fireSessionCreated/fireSessionOpened/fireMessageReceived都是这个顺序]
反向就是tail--> all registered filter -->head
[fireFilterWrite/fireFilterClose就是这个顺序]

既然事件注册了,create/open的处理也完了,那下一轮循环的selected > 0就成立了,进入process的处理流程
process()-->Iterate all session-channal -->read(session) 这个read方法是AbstractPollingIoProcessor.private void read(T session)方法。
个人感觉,这里有很明显的性能问题。
首先看具体的process方法:
 private void process() throws Exception {
        for (Iterator<T> i = selectedSessions(); i.hasNext();) {
            T session = i.next();
            process(session);
            i.remove();
        }
    }

在这里,MINA是遍历所有的有OP_READ事件的session,然后read再fireMessageReceived到我们的IoHandler.messageReceived中,即使我们注册了ExecutorFilter,它也只是让fireMessageReceived方法能被极快的处理。真正的read过程并不是并发处理,这样一来很明显后来的请求要等前面的请求被处理完才能得到处理,这样会造成后来请求的延迟.即使把NioProcessor数目变多也于事无补,因为并发数量大于NioProcessor数目后,总有至少一个NioProcessor上的session总会有排队的现象。

然后还有第二个问题,仔细看read方法
   private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
        final boolean hasFragmentation = session.getTransportMetadata()
                .hasFragmentation();
        try {
            int readBytes = 0;
            int ret;
            try {
                if (hasFragmentation) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }
            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain();
                filterChain.fireMessageReceived(buf);
                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }           
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            ....
        }
    }

read(session)的主要执行流程是
read data to buffer --> if readBytes> 0 --> IoFilterChain.fireMessageReceived(buf)  IoHandler.messageReceived将在其中被调用。个人怀疑,如果client发送消息是采用消息分片方式发送,一般在client端资源不够或要报文太大,在这种情况下IoHandler.messageReceived,结果就是消息体被分割了若干份,等于我们在IoHandler.message会被调用多次,每次的message都是一个报文片,这个时候就悲剧了。

转到IoFilterChain.fireMessageReceived(buf),责任链模式无需多说,按照注册的顺序调用所有的IoFilter.messageReceived,最后会调用到IoHandler.messageReceived.需要注意的是,传入的message对象是个Object类型,具体的类型是最后一个Filter处理后的结果。但是一般上说来,只有ProtocolCodecFilter才会去更改message type.

在IoHandler.messageReceived里面就是我们具体的业务逻辑。在这里我们很可能需要做 写操作,一般会IoSession.write(Object message) 它返回的是WriteFuture。简单看看write会做什么事情。
 public WriteFuture write(Object message, SocketAddress remoteAddress) {
        .....
        // Now, we can write the message. First, create a future
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);
        .....
        return writeFuture;
    }

在这里面会调用filterChain.fireFilterWrite,它会按照IoFilter注册的反向顺序调用IoFilter.filterWrite,按照IoSession.write-->tail-->registered Filter-->head的顺序,最后会调用filterChain里面的headFilter.它的filterWrite实现大概如下:
.....
s.getWriteRequestQueue().offer(s, writeRequest);
            if (!s.isWriteSuspended()) {
                s.getProcessor().flush(s);
            }
            .....

还记得前面NioSocketAcceptor处理OP_ACCEPT的时候在IoSession里面初始化的那个WriteRequestQueue吗,就在这里offer进去,然后把session add到NioProcessor的flushingSessions里面去,然后wakeup Processor.
 public final void flush(T session) {
        boolean needsWakeup = flushingSessions.isEmpty();
        if (scheduleFlush(session) && needsWakeup) {
            wakeup();
        }
    }

    private boolean scheduleFlush(T session) {
        if (session.setScheduledForFlush(true)) {
            // add the session to the queue
            flushingSessions.add(session);
            return true;
        }
        return false;
    }


如果我们没有用到ExecutorFilter,其实所有从read开始的处理全部在单线程环境下包括IoSession.write,所以以Processor的线程环境看,就是process代码处理完。
如果用到了ExecutorFilter,也就是process内部从fireMessageReceived开始是异步处理的。
但是这都不影响整个流程。总之,我们的焦点又切回到了AbstractPollingIoProcessor.Processor.run 看看它在process后要做的事情。
long currentTime = System.currentTimeMillis();
                    flush(currentTime);
                    nSessions -= removeSessions();
                    notifyIdleSessions(currentTime);

private void flush(long currentTime) {
             .....
             for (;;) {
             T session = flushingSessions.poll();
             boolean flushedAll = flushNow(session, currentTime);
                        if (flushedAll
                                && !session.getWriteRequestQueue().isEmpty(session)
                                && !session.isScheduledForFlush()) {
                            scheduleFlush(session);
                        }
             }

 private boolean flushNow(T session, long currentTime) {
final WriteRequestQueue writeRequestQueue = session
                .getWriteRequestQueue();
.....
// Clear OP_WRITE
setInterestedInWrite(session, false);
req = writeRequestQueue.poll(session);
Object message = req.getMessage();
if (message instanceof IoBuffer) {
                    localWrittenBytes = writeBuffer(session, req,
                            hasFragmentation, maxWrittenBytes - writtenBytes,
                            currentTime);
                    if (localWrittenBytes > 0
                            && ((IoBuffer) message).hasRemaining()) {
                        // the buffer isn't empty, we re-interest it in writing 
                        writtenBytes += localWrittenBytes;
                        setInterestedInWrite(session, true);
                        return false;
                    }
                }
.....
}

获取到IoSession.writeRequestQueue --> poll--> write buffer to channel
在细看下writeBuffer这个方法。
private int writeBuffer(T session, WriteRequest req,
            boolean hasFragmentation, int maxLength, long currentTime)
            throws Exception {
        IoBuffer buf = (IoBuffer) req.getMessage();
        int localWrittenBytes = 0;
        if (buf.hasRemaining()) {
            int length;
            if (hasFragmentation) {
                length = Math.min(buf.remaining(), maxLength);
            } else {
                length = buf.remaining();
            }
            for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
                localWrittenBytes = write(session, buf, length);
                if (localWrittenBytes != 0) {
                    break;
                }
            }
        }
        session.increaseWrittenBytes(localWrittenBytes, currentTime);
        if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
            // Buffer has been sent, clear the current request.
            buf.reset();
            fireMessageSent(session, req);
        }
        return localWrittenBytes;
    }

如果顺利写入所有字节到channel,则reset buffer -->fireMessageSent-->head(Null messageSent) -->registered Filter-->tail-->IoHandler.messageSent
如果还有写入不完整则setInterestedInWrite --> register OP_WRITE-->process OP_WRITE-->add to flushingSessions

关闭连接的处理就简单的描述下
如果关闭连接,IoSession.close-->tail.filterClose-->registered Filter-->head.filterClose-->AbstractPollingIoProcessor.remove(Session) -->add to removingSessions -->wake up
然后NioProcessor在process处理完后,会调用
nSessions -= removeSessions();

AbstractPollingIoProcessor.Processor.run.removeSessions -->AbstractPollingIoProcessor.removeNow -->clearWriteRequestQueue-->destroy(Session) -->fireSessionDestroyed-->head(Null sessionClosed)-->registered Filter-->tail.sessionClosed-->IoHandler.sessionClosed

Note:如果用MINAC客户端Connector,关闭是一定要调用
connector.dispose(); 

该方法通过调用ExecutorService的shutdown()方法停止业务处理线程,并设置内部disposed标志位标识需要停止连接管理器。
分享到:
评论
1 楼 yinlei126 2013-09-03  
这么好的文章没有评论,谢谢

相关推荐

    MINA-2.0.0-M3

    标题中的"MINA-2.0.0-M3"指的是MINA框架的第2.0.0-M3版本。这个版本是一个里程碑版本(M3,Milestones),通常在正式版本发布之前,用于收集用户反馈和进行测试。 MINA的核心设计目标是提供一个灵活、可扩展的网络...

    mina源码+例子mina-2.0.0-M6.zip

    - **src** 目录:包含了MINA框架的源代码,通过阅读源代码,你可以深入理解MINA的设计思想和实现细节。 - **examples** 目录:包含了一系列示例项目,这些项目展示了如何使用MINA来创建不同类型的网络服务,如简单的...

    mina框架测试jar-lib

    标题中的"mina框架测试jar-lib"表明这是一个关于Mina框架的测试库,其中包含了一些必要的JAR文件。描述中提到的"三个jar包:jcl-over-slf4j, mina-core, slf4j-api"是这个测试库的核心组成部分,它们在Mina框架的...

    mina2.0.7所有jar

    2. **mina-example-2.0.7.jar**:这个文件包含了一系列MINA的示例代码,可以帮助开发者快速理解和学习如何使用MINA框架来构建实际应用。 3. **mina-statemachine-2.0.7.jar**:MINA的状态机模块,提供了一种模型来...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    最后,“MinaDemo”可能是Mina框架的一个示例项目,你可以通过运行和分析这个项目,进一步掌握Mina的实战应用。 总结来说,本教程将引导你从理论到实践,掌握Java NIO的基本原理,理解Mina框架的使用,以及如何在...

    Apache MINA框架相关资料

    标题中的“Apache MINA框架相关资料”涵盖了对MINA框架的全面学习材料,包括中文参考手册、源码分析、API文档和与Spring框架的整合指南。 1. **中文参考手册**(Apache_Mina_Server_2.0中文参考手册V1.0.pdf):这...

    Mina 框架源码解析-构建简单通信程序

    本篇文章将深入解析Mina框架的源码,通过构建一个简单的心跳通信程序,来理解其核心机制。 首先,我们来看一下心跳通信的基本概念。在分布式系统中,心跳机制是保持连接状态和检测网络故障的重要手段。心跳包是周期...

    Mina2.0框架源码剖析

    Mina框架的优势在于其高度模块化的设计,允许开发者根据需求自由组合过滤器和处理器,实现复杂的应用逻辑。此外,Mina还支持多种协议,包括HTTP、FTP、SMTP等,为开发者提供了丰富的选择。 通过深入分析Mina2.0的...

    Mina2.0框架源码剖析.pdf

    Mina2.0框架源码剖析 Mina2.0是一个基于Java的网络应用框架,提供了一个简洁、灵活的API,帮助开发者快速构建高性能的网络应用程序。下面是Mina2.0框架源码剖析的相关知识点: 一、Mina2.0框架概述 Mina2.0是一个...

    基于Android开发MINA框架使用详解

    本教程将深入探讨如何在Android项目中集成MINA框架,以及如何解决在实际开发中可能遇到的问题,如中文乱码和消息接收不全等问题。 1. **MINA框架基础** - MINA的核心是其异步事件驱动模型,它允许开发者通过回调...

    mina框架demo

    标题中的"mina框架demo"是指使用MINA框架创建的一个示例项目,通常这个项目会包含一系列的代码和配置,演示了如何利用MINA来构建网络应用程序。这样的示例对于初学者来说是非常有价值的,因为它可以帮助理解MINA的...

    mina框架实例

    在实际应用中,"mina框架实例"的`src`目录中的代码可能会展示如何创建一个简单的MINA服务器,监听端口,接收连接,以及如何处理来自客户端的数据。开发者可以通过分析这些代码来学习MINA的基本用法,如创建Filter、...

    mina框架中socket使用,有服务端和客户端。

    通过分析这些代码,你可以更深入地理解MINA框架的使用。 8. **学习与实践**:这个项目作为一个完整的示例,非常适合初学者研究和学习MINA框架。通过运行服务端和客户端,可以观察数据交互的过程,理解MINA如何处理...

    Android平台MINA框架使用详细解析

    MINA框架在Android开发中的应用,可以极大地简化网络通信的实现,尤其是对于需要处理大量并发连接和数据传输的场景。 首先,我们需要理解如何在Android Studio中集成MINA框架。这通常涉及将MINA的JAR库或AAR包导入...

    apache-mina-2.0.4.rar_apache mina_mina

    Apache Mina是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。这个"apache-mina-2.0.4.rar"压缩包包含的是Apache Mina 2.0.4版本的源代码,是深入理解和定制Mina的...

    MINA框架使用jar包(Android)

    在Android上使用MINA框架,首先需要引入相应的jar包。这里提供的jar包有log4j-1.2.14.jar,它是一个广泛使用的日志记录库,用于收集和输出应用程序运行时的信息。slf4j-android-1.6.1-RC1.jar和slf4j-api-1.7.6.jar...

    mina2.0.19所需的11个jar包

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高度可扩展且功能丰富的网络应用程序框架,它为Java开发人员提供了创建高性能、高效率的网络应用程序的工具。MINA的主要目标是简化网络...

    socket mina测试框架

    - 在测试过程中,Mina框架会帮助捕获和处理网络通信中的异常,同时结合日志系统,记录关键信息,方便后期分析问题和优化。 9. **可扩展性**: - Mina测试框架允许自定义过滤器和处理器,方便扩展新的功能或适配...

    mina框架详解

    # Mina框架详解 ## 一、Mina框架概述 Mina框架,全称为Apache Mina Server,是一款基于Java的高效、可扩展性强大的网络通信应用框架。它主要支持TCP/IP与UDP/IP协议栈,同时也提供了序列化服务、虚拟机管道通信等...

Global site tag (gtag.js) - Google Analytics