`
uniseraph
  • 浏览: 83408 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

[mina指南]诡异的IoFilterChain实现

阅读更多
IoFilterChain位于通讯层与业务层之间,负责将byte[]转化成业务层需要的业务逻辑bean,在mina框架中起着承前启后的作用。




DefaultIoFilterChain的构建
在初始话的时候,DefaultIoFilterChain的构造函数如下:
    public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new NullPointerException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }


整体结构图:


IoFilterChain初始化的典型代码
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
   
        acceptor.setHandler(  new TimeServerHandler() );


初始化以后的结构图为


Message received流程
1. NioProcessor的工作者线程一旦发现有数据来则分别处理各个Session上的数据;
public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    boolean selected = select(1000);

                    nSessions += add();
                    updateTrafficMask();

                    if (selected) {
                        process();
                    }

          //以下省略



    private void process() throws Exception {
        for (Iterator<T> i = selectedSessions(); i.hasNext();) {
            process(i.next());
            i.remove();
        }
    }
    private void process(T session) {

        if (isReadable(session) && session.getTrafficMask().isReadable()) {
            read(session);
        }

        if (isWritable(session) && session.getTrafficMask().isWritable()) {
            scheduleFlush(session);
        }
    }

2. 读取session上的所有数据,传给session的IoFilterChain
 private void read(T session) {
            //读取socket的数据,具体省略

            if (readBytes > 0) {


                session.getFilterChain().fireMessageReceived(buf);
          

                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }


3. DefaultIoFilterChain调用HeadFilter的
    public void fireMessageReceived(Object message) {
        if (message instanceof IoBuffer) {
            session.increaseReadBytes(
                    ((IoBuffer) message).remaining(),
                    System.currentTimeMillis());
        }

        Entry head = this.head;
        callNextMessageReceived(head, session, message);
    }

    private void callNextMessageReceived(
            Entry entry, IoSession session, Object message) {
        try {
            entry.getFilter().messageReceived(
                    entry.getNextFilter(), session, message);
        } catch (Throwable e) {
            fireExceptionCaught(e);
        }
    }


4. 如上图head.getFilter()得到的是HeadFilter,注意EntryImpl的构造函数
  public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new NullPointerException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }

      private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
                String name, IoFilter filter) {
            if (filter == null) {
                throw new NullPointerException("filter");
            }
            if (name == null) {
                throw new NullPointerException("name");
            }

            this.prevEntry = prevEntry;
            this.nextEntry = nextEntry;
            this.name = name;
            this.filter = filter;
            this.nextFilter = new NextFilter() {
                public void sessionCreated(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionCreated(nextEntry, session);
                }

                public void sessionOpened(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionOpened(nextEntry, session);
                }

                public void sessionClosed(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionClosed(nextEntry, session);
                }

                public void sessionIdle(IoSession session, IdleStatus status) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionIdle(nextEntry, session, status);
                }

                public void exceptionCaught(IoSession session, Throwable cause) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextExceptionCaught(nextEntry, session, cause);
                }

                public void messageReceived(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageReceived(nextEntry, session, message);
                }

                public void messageSent(IoSession session,
                        WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageSent(nextEntry, session, writeRequest);
                }

                public void filterWrite(IoSession session,
                        WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }

                public void filterClose(IoSession session) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterClose(nextEntry, session);
                }

                public void filterSetTrafficMask(IoSession session,
                        TrafficMask trafficMask) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterSetTrafficMask(nextEntry, session, trafficMask);
                }
            };
        }





5. HeadFilter除了调用LogingFilter.MessageRecevied什么都不做
        
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) {
            nextFilter.messageReceived(session, message);
        }


6. LoggingFilter和ProtocolCodecFilter都是在执行自己的逻辑之后调用下一个IoFilter
    public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        log(IoEventType.MESSAGE_RECEIVED, "RECEIVED: {}", message);
        nextFilter.messageReceived(session, message);
    }



7. 在TailFilter的MessageReceved则将消息转发给Session的handler,完成传递
        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;
            if (!(message instanceof IoBuffer)) {
                s.increaseReadMessages(System.currentTimeMillis());
            } else if (!((IoBuffer) message).hasRemaining()) {
                s.increaseReadMessages(System.currentTimeMillis());
            }

            try {
                session.getHandler().messageReceived(s, message);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerReadFuture(message);
                }
            }
        }


IoFilter事件执行


IoFilter一共处理10种事件
1 created
2 opened
3 closed
4 sessionIdle
5 exceptionCaught
6 messageReceived
7 messageSent
8 fireWrite
9 fireFilterClose
10 filterSetTrafficMask

其中1-7种事件都是从前往后执行,依次为:
HeadFilter->filter1->filter2->...->filterN->TailFilter->IoHandler

而8-10正好相反,如fireWrite的执行顺序为:
iosession.write->TailFilter->filterN->....-->filter1->HeadFilter







  • 大小: 116.1 KB
  • 大小: 86 KB
  • 大小: 96 KB
分享到:
评论
3 楼 uniseraph 2011-11-01  
ubuntu dia
2 楼 thomescai 2011-10-14  
请问,你这个类图用什么工具画得?
1 楼 nextw3 2011-03-03  
MINA为什么要采用这种过滤链结构那?请赐教

相关推荐

    MINA长连接框架实现通讯

    在"MINA长连接框架实现通讯"的场景中,我们要讨论的核心知识点包括以下几个部分: 1. **MINA框架基础**:MINA是一个基于NIO(Non-blocking I/O)的框架,它提供了一种处理大量并发连接的方式,通过非阻塞I/O模型,...

    基于 MINA 的 TLS/SSL NIO Socket 实现(二)

    在本篇博文中,我们将深入探讨如何利用Apache MINA库实现基于TLS/SSL的NIO(非阻塞I/O)Socket通信。MINA是一个高度可扩展的网络应用框架,广泛用于构建高性能、高并发的网络应用程序,如服务器端的TCP和UDP服务。...

    Mina 框架研究与实现

    ### Mina框架研究与实现 #### 引言 在当今高度网络化的世界中,服务器端程序面临着前所未有的挑战,特别是当需要同时处理成百上千的客户端连接时。这不仅要求服务器具备高性能,还必须保证高可用性。Mina框架正是...

    mina 实现简单通讯

    在这个“mina 实现简单通讯”的项目中,我们看到了一个基于MINA的基本通信实现,涵盖了服务端和客户端的交互。 首先,MINA的核心组件包括`IoSession`,它是网络连接的抽象,包含了与特定连接相关的所有信息,如输入...

    Mina自定义协议简单实现

    **Mina自定义协议简单实现** Apache Mina(Minimum Asynchronous Network)是一个开源的网络通信框架,它为Java开发者提供了一种高效、灵活且可扩展的框架,用于构建高性能的网络应用程序,如服务器和客户端应用。...

    Mina消息发送简单实现

    基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。

    Mina+Socket通信

    本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic Application Networking API”)是Apache软件基金会的一个开源项目,它为开发者提供了一...

    Mina实现长连接和短连接实例

    在这个实例中,我们将探讨如何使用Mina实现长连接和短连接。 首先,理解长连接和短连接的概念至关重要。在TCP/IP通信中,短连接(Short Connection)是指一次数据传输完成后立即关闭连接,而长连接(Long ...

    Apache_MINA_2_用户指南.pdf

    Apache MINA 2 用户指南 Apache MINA 2 是一个基于 Java 语言的网络应用框架,旨在帮助开发者快速构建高性能、可靠、可扩展的网络应用程序。该框架提供了一个灵活的架构,使得开发者可以轻松地构建各种类型的网络...

    Apache MINA 2.0 用户指南中英文对照阅读版[带书签]

    本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...

    mina服务器和客服端实现

    《mina服务器和客户端实现详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为开发者提供了构建高性能、高可用性的网络应用程序的基础。在本文中,我们将深入探讨如何利用...

    mina服务器--实现纯文本和非纯文本的加密通讯

    在这个主题中,“mina服务器--实现纯文本和非纯文本的加密通讯”涉及了如何使用MINA来确保数据传输的安全性。 MINA提供了丰富的API和工具,使得开发者可以方便地创建基于TCP和UDP协议的服务器端和客户端应用。在...

    websocket+java服务器(mina)

    以上就是关于使用Mina框架实现WebSocket服务器的关键知识点,通过深入理解和实践,你可以构建出高性能、可扩展的WebSocket服务。提供的压缩包文件可能包含了实现示例代码和HTML客户端,可以帮助你更直观地理解整个...

    apache-mina-2.0.4.rar_apache mina_mina

    6. **Transport Layer**:Mina支持多种传输层实现,如TCP、UDP等,这些都抽象为IoAcceptor和IoConnector接口,方便开发者使用。 深入研究源码,你可能会关注以下方面: - **mina-core**模块:这是Mina的核心库,...

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    Mina的ProtocolDecoder和ProtocolEncoder接口是实现这一过程的关键。 5. **Mina发送XML和JSON** XML和JSON作为常见的数据交换格式,经常在分布式系统中使用。Mina可以通过集成如JAXB(Java Architecture for XML ...

    使用mina框架实现cmpp2.0服务端

    **使用mina实现CMPP2.0服务端的关键点** 1. **连接管理**:使用Mina的Acceptor来监听特定端口,接受来自移动网关的连接请求。每个连接通常对应一个独立的工作线程,处理来自客户端的CMPP请求。 2. **会话建立**:...

    基于MINA2实现的UDP双向通信源码

    本源码是《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战》一文的服务端实现(MINA2版),详见:http://www.52im.net/thread-378-1-1.html

    Apache MINA 2.0 用户指南( 缺第一章节)

    ### Apache MINA 2.0 用户指南:基础知识 #### 基础概念介绍 Apache MINA 2.0 是一款高性能且易于使用的网络应用程序框架,它简化了开发人员在网络编程方面的负担,允许开发者专注于应用程序的核心功能,而不是底层...

    MINA_API+MINA_DOC+mina

    这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和理解MINA框架有极大的帮助。 首先,`MINA-2.0.0-API.chm` 文件是MINA 2.0版本的API帮助文档,它是以CHM(Compiled Help Manual)格式编译的Windows...

Global site tag (gtag.js) - Google Analytics