`
shuofenglxy
  • 浏览: 194491 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MINA探究之IoFilterChain

    博客分类:
  • MINA
阅读更多

IoFilterChain主要是维护一个这样的容器,这个容器维护一个IoSesion和相对应的IoFilterChain(1对1关系),都是串行执行的。

IOFilterChain中主要有事件触发的一些方法和保存每一个具体IoFilter的Entry(存放一些具体IoFilter相关的信息,主要包括名字,IoFilter,下一个IoFilter等,以及一些相对应的操作)。

IoFilterChain的一个实现类是:DefaultIoFilterChain。主要实现的功能是给IoFilterChain中增删IoFilter等。具体实现的办法可以用过如下源码来看:

 

 

 

 public synchronized void addFirst(String name, IoFilter filter) {
        checkAddable(name);
        register(head, name, filter);
    }

    public synchronized void addLast(String name, IoFilter filter) {
        checkAddable(name);
        register(tail.prevEntry, name, filter);
    }

    public synchronized void addBefore(String baseName, String name,
            IoFilter filter) {
        EntryImpl baseEntry = checkOldName(baseName);
        checkAddable(name);
        register(baseEntry.prevEntry, name, filter);
    }

    public synchronized void addAfter(String baseName, String name,
            IoFilter filter) {
        EntryImpl baseEntry = checkOldName(baseName);
        checkAddable(name);
        register(baseEntry, name, filter);
    }

    public synchronized IoFilter remove(String name) {
        EntryImpl entry = checkOldName(name);
        deregister(entry);
        return entry.getFilter();
    }

    public synchronized void remove(IoFilter filter) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (e.getFilter() == filter) {
                deregister(e);
                return;
            }
            e = e.nextEntry;
        }
        throw new IllegalArgumentException("Filter not found: "
                + filter.getClass().getName());
    }

    public synchronized IoFilter remove(Class<? extends IoFilter> filterType) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (filterType.isAssignableFrom(e.getFilter().getClass())) {
                IoFilter oldFilter = e.getFilter();
                deregister(e);
                return oldFilter;
            }
            e = e.nextEntry;
        }
        throw new IllegalArgumentException("Filter not found: "
                + filterType.getName());
    }

    public synchronized IoFilter replace(String name, IoFilter newFilter) {
        EntryImpl entry = checkOldName(name);
        IoFilter oldFilter = entry.getFilter();
        entry.setFilter(newFilter);
        return oldFilter;
    }

    public synchronized void replace(IoFilter oldFilter, IoFilter newFilter) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (e.getFilter() == oldFilter) {
                e.setFilter(newFilter);
                return;
            }
            e = e.nextEntry;
        }
        throw new IllegalArgumentException("Filter not found: "
                + oldFilter.getClass().getName());
    }

    public synchronized IoFilter replace(
            Class<? extends IoFilter> oldFilterType, IoFilter newFilter) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (oldFilterType.isAssignableFrom(e.getFilter().getClass())) {
                IoFilter oldFilter = e.getFilter();
                e.setFilter(newFilter);
                return oldFilter;
            }
            e = e.nextEntry;
        }
        throw new IllegalArgumentException("Filter not found: "
                + oldFilterType.getName());
    }

    public synchronized void clear() throws Exception {
        List<IoFilterChain.Entry> l = new ArrayList<IoFilterChain.Entry>(
                name2entry.values());
        for (IoFilterChain.Entry entry : l) {
            try {
                deregister((EntryImpl) entry);
            } catch (Exception e) {
                throw new IoFilterLifeCycleException("clear(): "
                        + entry.getName() + " in " + getSession(), e);
            }
        }
    }

    private void register(EntryImpl prevEntry, String name, IoFilter filter) {
        EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry,
                name, filter);

        try {
            filter.onPreAdd(this, name, newEntry.getNextFilter());
        } catch (Exception e) {
            throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':'
                    + filter + " in " + getSession(), e);
        }

        prevEntry.nextEntry.prevEntry = newEntry;
        prevEntry.nextEntry = newEntry;
        name2entry.put(name, newEntry);

        try {
            filter.onPostAdd(this, name, newEntry.getNextFilter());
        } catch (Exception e) {
            deregister0(newEntry);
            throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':'
                    + filter + " in " + getSession(), e);
        }
    }

    private void deregister(EntryImpl entry) {
        IoFilter filter = entry.getFilter();

        try {
            filter.onPreRemove(this, entry.getName(), entry.getNextFilter());
        } catch (Exception e) {
            throw new IoFilterLifeCycleException("onPreRemove(): "
                    + entry.getName() + ':' + filter + " in " + getSession(), e);
        }

        deregister0(entry);

        try {
            filter.onPostRemove(this, entry.getName(), entry.getNextFilter());
        } catch (Exception e) {
            throw new IoFilterLifeCycleException("onPostRemove(): " private class HeadFilter extends IoFilterAdapter {
        @SuppressWarnings("unchecked")
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();
                if (remaining == 0) {
                    // Zero-sized buffer means the internal message
                    // delimiter.
                    s.increaseScheduledWriteMessages();
                } else {
                    s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            s.getWriteRequestQueue().offer(s, writeRequest);
            if (!s.isWriteSuspended()) {
                s.getProcessor().flush(s);
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void filterClose(NextFilter nextFilter, IoSession session)
                throws Exception {
            ((AbstractIoSession) session).getProcessor().remove(((AbstractIoSession) session));
        }
    }
  + entry.getName() + ':' + filter + " in " + getSession(), e); } } private void deregister0(EntryImpl entry) { EntryImpl prevEntry = entry.prevEntry; EntryImpl nextEntry = entry.nextEntry; prevEntry.nextEntry = nextEntry; nextEntry.prevEntry = prevEntry; name2entry.remove(entry.name); }

 

 

通过调用这些方法,完成自己对自定制的IoFilterChain的配置。

 

DefaultIoFilterChain实现中默认在头和尾设置了两个Filter。先看head,源码如下:

 

 

 private class HeadFilter extends IoFilterAdapter {
        @SuppressWarnings("unchecked")
        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            // Maintain counters.
            if (writeRequest.getMessage() instanceof IoBuffer) {
                IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                // I/O processor implementation will call buffer.reset()
                // it after the write operation is finished, because
                // the buffer will be specified with messageSent event.
                buffer.mark();
                int remaining = buffer.remaining();
                if (remaining == 0) {
                    // Zero-sized buffer means the internal message
                    // delimiter.
                    s.increaseScheduledWriteMessages();
                } else {
                    s.increaseScheduledWriteBytes(remaining);
                }
            } else {
                s.increaseScheduledWriteMessages();
            }

            s.getWriteRequestQueue().offer(s, writeRequest);
            if (!s.isWriteSuspended()) {
                s.getProcessor().flush(s);
            }
        }

        @SuppressWarnings("unchecked")
        @Override
        public void filterClose(NextFilter nextFilter, IoSession session)
                throws Exception {
            ((AbstractIoSession) session).getProcessor().remove(((AbstractIoSession) session));
        }
    }

 

 

Head主要的作用是做一个数据统计维持AbstractIoSession 中 scheduledWriteMessages这个统计量。然后将这个session放入 writeRequestQueue中等待写入。当这个IoFilter关闭时,则从IoProcessor中删除这个session以释放占用的相关资源。

 

tail相关源码如下:

 

 

private static class TailFilter extends IoFilterAdapter {

        @Override

        public void sessionCreated(NextFilter nextFilter, IoSession session)

                throws Exception {

            try {

                session.getHandler().sessionCreated(session);

            } finally {

                // Notify the related future.

                ConnectFuture future = (ConnectFuture) session

                        .removeAttribute(SESSION_CREATED_FUTURE);

                if (future != null) {

                    future.setSession(session);

                }

            }

        }

 

        @Override

        public void sessionOpened(NextFilter nextFilter, IoSession session)

                throws Exception {

            session.getHandler().sessionOpened(session);

        }

 

        @Override

        public void sessionClosed(NextFilter nextFilter, IoSession session)

                throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            try {

                s.getHandler().sessionClosed(session);

            } finally {

                try {

                    s.getWriteRequestQueue().dispose(session);

                } finally {

                    try {

                        s.getAttributeMap().dispose(session);

                    } finally {

                        try {

                            // Remove all filters.

                            session.getFilterChain().clear();

                        } finally {

                            if (s.getConfig().isUseReadOperation()) {

                                s.offerClosedReadFuture();

                            }

                        }

                    }

                }

            }

        }

 

        @Override

        public void sessionIdle(NextFilter nextFilter, IoSession session,

                IdleStatus status) throws Exception {

            session.getHandler().sessionIdle(session, status);

        }

 

        @Override

        public void exceptionCaught(NextFilter nextFilter, IoSession session,

                Throwable cause) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            try {

                s.getHandler().exceptionCaught(s, cause);

            } finally {

                if (s.getConfig().isUseReadOperation()) {

                    s.offerFailedReadFuture(cause);

                }

            }

        }

 

        @Override

        public void messageReceived(NextFilter nextFilter, IoSession session,

                Object message) throws Exception {

            AbstractIoSession s = (AbstractIoSession) session;

            if (!(message instanceof IoBuffer)) {

                s.increaseReadMessages(System.currentTimeMillis());

            } else if (!((IoBuffer) message).hasRemaining()) {

                s.increaseReadMessages(System.currentTimeMillis());

            }

 

            try {

                session.getHandler().messageReceived(s, message);

            } finally {

                if (s.getConfig().isUseReadOperation()) {

                    s.offerReadFuture(message);

                }

            }

        }

 

        @Override

        public void messageSent(NextFilter nextFilter, IoSession session,

                WriteRequest writeRequest) throws Exception {

            session.getHandler()

                    .messageSent(session, writeRequest.getMessage());

        }

 

        @Override

        public void filterWrite(NextFilter nextFilter, IoSession session,

                WriteRequest writeRequest) throws Exception {

            nextFilter.filterWrite(session, writeRequest);

        }

 

        @Override

        public void filterClose(NextFilter nextFilter, IoSession session)

                throws Exception {

            nextFilter.filterClose(session);

        }

    }

 

 

这个IoFilter主要是响应一些事件。具体通过方法名比较容易明白,这里不再罗嗦。

 

 

值得注意的是,所有的自定义的IoFilter插入都放在了head之后,tail之前。可以定义Logger,序列化工具,业务线程池等多个需要IoFilter。真正改进性能的地方可能就是业务处理的话,业务处理线程池,序列化的话,看能不能改进序列化的效率,如采用protocalBuffer这种东东了。具体的等谈Mina的Thread model的时候在展开一些池的使用场景吧。

 

关于IoFilter的使用可以直接参阅 http://mina.apache.org/iofilter.html 的内容来选择合适的filter来实现想要的功能。

 

 

分享到:
评论

相关推荐

    Mina开发之客户端

    《Mina开发之客户端详解》 Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的、高性能的网络应用框架,主要用于构建服务器端的网络应用程序。它简化了网络编程的复杂性,提供了基于事件驱动和异步I/O...

    mina的高级使用,mina文件图片传送,mina发送文件,mina报文处理,mina发送xml和json

    Apache Mina是一个开源的网络通信应用框架,主要应用于Java平台,它为高性能、高可用性的网络应用程序提供了基础架构。在本文中,我们将深入探讨Mina的高级使用,特别是在文件图片传送、文件发送、XML和JSON报文处理...

    Mina入门:mina版之HelloWorld

    **Mina入门:Mina版之HelloWorld** Apache Mina是一个开源项目,它提供了一个高度模块化、高性能的网络通信框架。Mina旨在简化网络应用的开发,支持多种传输协议,如TCP、UDP、HTTP、FTP等。在这个“Mina入门:Mina...

    apache-mina-2.0.4.rar_apache mina_mina

    1. **Filter Chain**:Mina的核心设计模式之一是过滤器链。每个连接都有一系列过滤器,它们按照顺序处理入站和出站事件。过滤器可以实现特定功能,如数据编码解码、安全验证、性能监控等。 2. **Session**:Session...

    Mina开发之服务器

    Mina开发之服务器的代码,详情请查看:http://www.cnblogs.com/getherBlog/p/3937196.html Mina开发之客户端的代码,详情请查看:http://www.cnblogs.com/getherBlog/p/3937196.html

    mina连接 mina心跳连接 mina断线重连

    Apache Mina是一个开源的网络通信框架,常用于构建高性能、高效率的服务端应用程序,尤其在Java平台上。在本文中,我们将深入探讨Mina的核心概念,包括连接管理、心跳机制以及断线重连策略。 首先,让我们理解"Mina...

    mina.zip内涵所有mina所需jar包

    Mina.jar可能包含了Mina的核心组件,如IoSession接口,IoFilterChain,以及各种I/O处理器等。此外,它可能还包含了支持不同协议(如TCP,UDP)的处理器,以及各种I/O事件的监听器。 在压缩包子文件的文件名称列表中...

    MINA_API+MINA_DOC+mina

    MINA (Java IO Network Application Framework) 是一个由Apache软件基金会开发的开源网络通信框架,主要应用于构建高性能、高可用性的网络服务器。这个压缩包包含了MINA API文档、自学手册以及开发指南,对于学习和...

    Mina+Socket通信

    Mina和Socket是两种常见的网络通信框架和技术,它们在Java编程环境中被广泛使用。本篇文章将深入探讨如何使用Mina与Socket实现通信,并提供客户端和服务端的实现代码概述。 Mina(全称“MINA: Minimalistic ...

    基于 MINA 的 TLS/SSL NIO Socket 实现(二)

    在MINA的网络编程中,我们通常创建一个IoFilterChain,并将SSLFilter添加到其中,以启用SSL/TLS支持。以下是一段创建SSLFilter并添加到过滤器链的基本代码示例: ```java SslFilter sslFilter = new SslFilter(ssl...

    apache-mina源码

    - **核心类库**:包含IoAcceptor、IoConnector、IoSession、IoFilterChain等核心类的实现。 - **协议处理**:MINA提供了一些预定义的协议处理器,如TCP、UDP的实现,以及一些基础的协议编码解码器。 - **示例应用**...

    mina2.0 含11个jar包

    mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...

    mina新手教程源码 mina+springboot+idea最简单的案例。

    mina新手案例,mina新手教程源码 mina+springboot最简单的案例。用的IDEA * mina服务端 * 1、添加@Controller注解和 @PostConstruct注解,代表启动springboot项目时也调用该类下的该方法, * 启动springboot项目...

    给予mina 协议进行大数据传输

    标题中的“给予mina协议进行大数据传输”指的是一种基于Java的网络通信框架——Apache MINA(Model-View-Controller for Network Applications)。MINA是Apache软件基金会的一个项目,它提供了一个高度可扩展和高...

    Java springboot 整合mina 框架,nio通讯基础教程,mina框架基础教程.zip

    Java SpringBoot 整合Mina框架,涉及到的核心技术主要包括Java NIO(非阻塞I/O)、Mina框架以及SpringBoot的集成应用。本教程旨在帮助开发者深入理解和掌握这些技术,并提供了一个可直接使用的基础平台框架。 Java ...

    Mina开发实例(服务端、客户端)DEMO

    Apache Mina是一个高度可扩展的网络通信框架,它允许开发者创建高性能、高效率的服务端和客户端应用程序。在Java世界中,Mina以其简洁的API和灵活性而受到青睐,尤其适用于处理大量的并发连接,如TCP/IP和UDP协议。...

    mina自定义编解码器详解

    **mina自定义编解码器详解** mina是一个Java开发的网络通信框架,广泛应用于TCP和UDP协议的服务器和客户端开发。在mina框架中,编解码器(Codec)扮演着至关重要的角色,它负责将应用层的数据转换为网络传输的字节...

    Mina官网例子之时间服务器

    **Mina官网例子之时间服务器** Apache Mina(Minimum Asynchronous Network)是一个高度可扩展的网络通信框架,它为各种协议提供了低级别的基础结构,包括TCP/IP和UDP/IP。Mina的目标是简化网络编程,使其变得高效...

    mina demo mina jar包

    Apache Mina是一个开源项目,它提供了一个高度可扩展的网络通信框架,用于简化开发高性能、高可用性的网络服务器和客户端应用程序。"Mina demo mina jar包"指的是使用Apache Mina框架创建的一个演示示例,这个示例...

    spring boot 整合mina 串口

    **Spring Boot 整合Mina实现串口通信详解** 在Java开发中,有时我们需要与硬件设备进行串口通信,例如读取传感器数据或控制工业设备。Spring Boot作为一款轻量级的框架,使得快速构建应用变得简单。而Mina则是一款...

Global site tag (gtag.js) - Google Analytics