`
Donald_Draper
  • 浏览: 980116 次
社区版块
存档分类
最新评论

Mina 过滤链抽象实现

    博客分类:
  • Mina
阅读更多
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
引言:
在过滤链默认构建器文章我们看了一下过滤链默认构建器和过滤链的定义,先来回顾一下:
     过滤器链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。我们可以通过滤器名name或过滤器实例ioFilter或过滤器类型获取相应的过滤器或过滤器对应的Entry。fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。
     DefaultIoFilterChainBuilder用entries列表(CopyOnWriteArrayList<DefaultIoFilterChainBuilder.EntryImpl>)来管理过滤器;添加过滤器,移除过滤器,及判断是否包含过滤器都是依赖于CopyOnWriteArrayList的相关功能。buildFilterChain方法是将默认过滤器链构建器的过滤器集合中的过滤器添加到指定的过滤链IoFilterChain上。DefaultIoFilterChainBuilder的过滤器EntryImpl中的getNextFilter并没有实际作用,即无效,这就说明了DefaultIoFilterChainBuilder只用于在创建会话时,构建过滤器链。创建完毕后,对过滤器链构建器的修改不会影响到会话实际的过滤器链IoFilterChain(SocketFilterChain,DatagramFilterChain...)。
今天我们来看一下过滤链的抽象实现AbstractIoFilterChain:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilter;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoFilterLifeCycleException;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoFilter.NextFilter;
import org.apache.mina.common.IoFilter.WriteRequest;
import org.apache.mina.util.ByteBufferUtil;
import org.apache.mina.util.SessionLog;

/**
 * An abstract implementation of {@link IoFilterChain} that provides
 * common operations for developers to implement their own transport layer.
 * <p>
AbstractIoFilterChain未过滤链的抽象实现,为开发者提供了传输层的一般操作。
开发者仅需要实现#doWrite方法,当过滤器拦截到会话发送消息,则调用此方法。
 * The only method a developer should implement is
 * {@link #doWrite(IoSession, IoFilter.WriteRequest)}.  This method is invoked
 * when filter chain is evaluated for
 * {@link IoFilter#filterWrite(NextFilter, IoSession, IoFilter.WriteRequest)} and
 * finally to be written out.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public abstract class AbstractIoFilterChain implements IoFilterChain {
    /**
     * A session attribute that stores a {@link ConnectFuture} related with
     * the {@link IoSession}.  {@link AbstractIoFilterChain} clears this
     * attribute and notifies the future when {@link #fireSessionOpened(IoSession)}
     * or {@link #fireExceptionCaught(IoSession, Throwable)} is invoked
     */
    public static final String CONNECT_FUTURE = AbstractIoFilterChain.class
            .getName()
            + ".connectFuture";
    private final IoSession session;//过滤链关联会话
    //HashMap<String,EntryImpl>,key为过滤器名,value为过滤器Entry
    private final Map name2entry = new HashMap();
    private final EntryImpl head;//过滤链头
    private final EntryImpl tail;//过滤链尾
    protected AbstractIoFilterChain(IoSession session) {
        if (session == null) {
            throw new NullPointerException("session");
        }
        //初始化Session
        this.session = session;
	//初始化过滤链头为HeadFilter EntryImpl
        head = new EntryImpl(null, null, "head", new HeadFilter());
	//初始化过滤链尾为TailFilter EntryImpl
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }
}

从上来看AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中
用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry;
过滤链头为HeadFilter,链尾为TailFilter。

先来看一下过滤器Entry的实现
 private class EntryImpl implements Entry {
        private EntryImpl prevEntry;//前驱
        private EntryImpl nextEntry;//后继
        private final String name;//过滤器名
        private final IoFilter filter;//关联过滤器
        private final NextFilter nextFilter;//过滤器后继
        private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
                String name, IoFilter filter) {
            if (filter == null) {
                throw new NullPointerException("filter");
            }
            if (name == null) {
                throw new NullPointerException("name");
            }
            this.prevEntry = prevEntry;
            this.nextEntry = nextEntry;
            this.name = name;
            this.filter = filter;
            this.nextFilter = new NextFilter() {
                public void sessionCreated(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionCreated(nextEntry, session);
                }
                public void sessionOpened(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionOpened(nextEntry, session);
                }
                public void sessionClosed(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionClosed(nextEntry, session);
                }
                public void sessionIdle(IoSession session, IdleStatus status) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionIdle(nextEntry, session, status);
                }
                public void exceptionCaught(IoSession session, Throwable cause) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextExceptionCaught(nextEntry, session, cause);
                }
                public void messageReceived(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageReceived(nextEntry, session, message);
                }
                public void messageSent(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageSent(nextEntry, session, message);
                }
                public void filterWrite(IoSession session,
                        WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }
                public void filterClose(IoSession session) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterClose(nextEntry, session);
                }
            };
        }
        public String getName() {
            return name;
        }
        public IoFilter getFilter() {
            return filter;
        }
        public NextFilter getNextFilter() {
            return nextFilter;
        }
        public String toString() {
            return "(" + getName() + ':' + filter + ')';
        }
}

我们来看一个EntryImpl的后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,我们挑一个来看:
//AbstractIoFilterChain-EntryImpl
//会话创建事件
 public void sessionCreated(IoSession session) {
     //后去当前Entry的后继Entry
     Entry nextEntry = EntryImpl.this.nextEntry;
      //将会话创建时间传递给Entry对应的过滤器

     callNextSessionCreated(nextEntry, session);
 }

//AbstractIoFilterChain
 //将会话创建时间传递给Entry对应的过滤器
 private void callNextSessionCreated(Entry entry, IoSession session) {
        try {
            entry.getFilter().sessionCreated(entry.getNextFilter(), session);
        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }

其他相关的IoHandler和IoSession事件处理与此方法相似。
//AbstractIoFilterChain-EntryImpl
 public void filterClose(IoSession session) {
    Entry nextEntry = EntryImpl.this.prevEntry;
    callPreviousFilterClose(nextEntry, session);
}

//AbstractIoFilterChain
private void callPreviousFilterClose(Entry entry, IoSession session) {
     try {
         entry.getFilter().filterClose(entry.getNextFilter(), session);
     } catch (Throwable e) {
         fireExceptionCaught(session, e);
     }
 }

从callPreviousFilterClose方法来看虽然从方法面为向前转递FilterClose事件,实际上是
传给当前过滤器,并转发给其后继过滤器,这个是因为过滤链HashMap<String,EntryImpl>是双向。
这里我们只贴处理其他事件的代码,不多解释
//AbstractIoFilterChain
 private void callNextSessionOpened(Entry entry, IoSession session) {
        try {
            entry.getFilter().sessionOpened(entry.getNextFilter(), session);
        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }
private void callNextSessionClosed(Entry entry, IoSession session) {
        try {
            entry.getFilter().sessionClosed(entry.getNextFilter(), session);

        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }
 private void callNextSessionIdle(Entry entry, IoSession session,
            IdleStatus status) {
        try {
            entry.getFilter().sessionIdle(entry.getNextFilter(), session,
                    status);
        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }
private void callNextMessageReceived(Entry entry, IoSession session,
            Object message) {
        try {
            entry.getFilter().messageReceived(entry.getNextFilter(), session,
                    message);
        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }
private void callNextMessageSent(Entry entry, IoSession session,
            Object message) {
        try {
            entry.getFilter().messageSent(entry.getNextFilter(), session,
                    message);
        } catch (Throwable e) {
            fireExceptionCaught(session, e);
        }
    }
 private void callNextExceptionCaught(Entry entry, IoSession session,
            Throwable cause) {
        try {
            entry.getFilter().exceptionCaught(entry.getNextFilter(), session,
                    cause);
        } catch (Throwable e) {
            SessionLog.warn(session,
                    "Unexpected exception from exceptionCaught handler.", e);
        }
    }
private void callPreviousFilterWrite(Entry entry, IoSession session,
            WriteRequest writeRequest) {
        try {
            entry.getFilter().filterWrite(entry.getNextFilter(), session,
                    writeRequest);
        } catch (Throwable e) {
            writeRequest.getFuture().setWritten(false);
            fireExceptionCaught(session, e);
        }
    }

EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。
再来看一下过滤链头HeadFilter
//HeadFilter
 private class HeadFilter extends IoFilterAdapter {
        public void sessionCreated(NextFilter nextFilter, IoSession session) {
            nextFilter.sessionCreated(session);
        }

        public void sessionOpened(NextFilter nextFilter, IoSession session) {
            nextFilter.sessionOpened(session);
        }

        public void sessionClosed(NextFilter nextFilter, IoSession session) {
            nextFilter.sessionClosed(session);
        }

        public void sessionIdle(NextFilter nextFilter, IoSession session,
                IdleStatus status) {
            nextFilter.sessionIdle(session, status);
        }

        public void exceptionCaught(NextFilter nextFilter, IoSession session,
                Throwable cause) {
            nextFilter.exceptionCaught(session, cause);
        }

        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) {
            nextFilter.messageReceived(session, message);
        }

        public void messageSent(NextFilter nextFilter, IoSession session,
                Object message) {
            nextFilter.messageSent(session, message);
        }

        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {
            if (session.getTransportType().getEnvelopeType().isAssignableFrom(
                    writeRequest.getMessage().getClass())) {
                doWrite(session, writeRequest);
            } else {
                throw new IllegalStateException(
                        "Write requests must be transformed to "
                                + session.getTransportType().getEnvelopeType()
                                + ": " + writeRequest);
            }
        }
        public void filterClose(NextFilter nextFilter, IoSession session)
                throws Exception {
            doClose(session);
        }
}

从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//会话写操作
public void filterWrite(NextFilter nextFilter, IoSession session,
        WriteRequest writeRequest) throws Exception {
    if (session.getTransportType().getEnvelopeType().isAssignableFrom(
            writeRequest.getMessage().getClass())) {
        doWrite(session, writeRequest);
    } else {
        throw new IllegalStateException(
                "Write requests must be transformed to "
                        + session.getTransportType().getEnvelopeType()
                        + ": " + writeRequest);
    }
}

//AbstractIoFilterChain,待子类扩展
protected abstract void doWrite(IoSession session, WriteRequest writeRequest)
            throws Exception;

//HeadFilter
//会话关闭
public void filterClose(NextFilter nextFilter, IoSession session)
        throws Exception {
    doClose(session);
}

//AbstractIoFilterChain,待子类扩展
protected abstract void doClose(IoSession session) throws Exception;

从上面来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。

再来看一下过滤链尾为TailFilter
 private static class TailFilter extends IoFilterAdapter {
        public void sessionCreated(NextFilter nextFilter, IoSession session)
                throws Exception {
	   //直接调用会话处理器IoHandler的sessionCreated
            session.getHandler().sessionCreated(session);
        }
        public void sessionOpened(NextFilter nextFilter, IoSession session)
                throws Exception {
            try {
                session.getHandler().sessionOpened(session);
            } finally {
                // Notify the related ConnectFuture
                // if the session is created from SocketConnector.
		//如果是SocketConnector创建的会话,则通知相关ConnectFuture
                ConnectFuture future = (ConnectFuture) session
                        .removeAttribute(CONNECT_FUTURE);
                if (future != null) {
                    future.setSession(session);
                }
            }
        }
        public void sessionClosed(NextFilter nextFilter, IoSession session)
                throws Exception {
            try {
                session.getHandler().sessionClosed(session);
            } finally {
                // Remove all filters.会话关闭,清除过滤链
                session.getFilterChain().clear();
            }
        }
        public void sessionIdle(NextFilter nextFilter, IoSession session,
                IdleStatus status) throws Exception {
            session.getHandler().sessionIdle(session, status);
        }
        public void exceptionCaught(NextFilter nextFilter, IoSession session,
                Throwable cause) throws Exception {
            session.getHandler().exceptionCaught(session, cause);
        }
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            try {
                session.getHandler().messageReceived(session, message);
            } finally {
	        //如果发送消息对象为ByteBuffer,则释放buffer
                ByteBufferUtil.releaseIfPossible(message);
            }
        }
        public void messageSent(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            try {
                session.getHandler().messageSent(session, message);
            } finally {
	         //如果发送消息对象为ByteBuffer,则释放buffer
                ByteBufferUtil.releaseIfPossible(message);
            }
        }
        public void filterWrite(NextFilter nextFilter, IoSession session,
                WriteRequest writeRequest) throws Exception {
            nextFilter.filterWrite(session, writeRequest);
        }
        public void filterClose(NextFilter nextFilter, IoSession session)
                throws Exception {
            nextFilter.filterClose(session);
        }
    }

来看一下释放buffer:
package org.apache.mina.util;

import org.apache.mina.common.ByteBuffer;

/**
 * ByteBuffer utility.
 * 
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ByteBufferUtil {
   /**
     * Increases the internal reference count of this buffer to defer
     * automatic release.  You have to invoke {@link #release()} as many
     * as you invoked this method to release this buffer.
     记录buffer内部引用的计数器自增
     *
     */
    public static void acquireIfPossible(Object message) {
        if (message instanceof ByteBuffer) {
            ((ByteBuffer) message).acquire();
        }
    }
    //如果消息对象为ByteBuffer,则释放空间
    public static void releaseIfPossible(Object message) {
        if (message instanceof ByteBuffer) {
            ((ByteBuffer) message).release();
        }
    }
    private ByteBufferUtil() {
    }
}

从上面来看,TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。

再来看AbstractIoFilterChain的添加过滤器相关操作:
添加过滤器到链头
public synchronized void addFirst(String name, IoFilter filter) {
        checkAddable(name);//检查过滤器在过滤链上是否存在
        register(head, name, filter);
}
/**
 * Checks the specified filter name is already taken and throws an exception if already taken.
 //检查过滤器在过滤链上是否存在
 */
private void checkAddable(String name) {
    if (name2entry.containsKey(name)) {
        throw new IllegalArgumentException(
                "Other filter is using the same name '" + name + "'");
    }
}
private void register(EntryImpl prevEntry, String name, IoFilter filter) {
    //根据filter的前驱prevEntry和后继prevEntry.nextEntry,构造EntryImpl
    EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry,
            name, filter);

    try {
        //触发过滤器onPreAdd事件
        filter.onPreAdd(this, name, newEntry.getNextFilter());
    } catch (Exception e) {
        throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':'
                + filter + " in " + getSession(), e);
    }

    prevEntry.nextEntry.prevEntry = newEntry;
    prevEntry.nextEntry = newEntry;
    //添加过滤器name与Entry映射到过滤链
    name2entry.put(name, newEntry);

    try {
     //触发过滤器onPostAdd事件
        filter.onPostAdd(this, name, newEntry.getNextFilter());
    } catch (Exception e) {
        deregister0(newEntry);
        throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':'
                + filter + " in " + getSession(), e);
    }
}
//添加过滤器到链尾
public synchronized void addLast(String name, IoFilter filter) {
        checkAddable(name);
        register(tail.prevEntry, name, filter);
}
//添加过滤器到baseName过滤器的前面
 public synchronized void addBefore(String baseName, String name,
        IoFilter filter) {
    EntryImpl baseEntry = checkOldName(baseName);//获取baseName对应的过滤器
    checkAddable(name);//检查过滤器是否存在
    register(baseEntry.prevEntry, name, filter);//注册到过滤链上
}
 /**
  * Throws an exception when the specified filter name is not registered in this chain.
  *获取baseName对应的过滤器
  * @return An filter entry with the specified name.
  */
 private EntryImpl checkOldName(String baseName) {
     EntryImpl e = (EntryImpl) name2entry.get(baseName);
     if (e == null) {
         throw new IllegalArgumentException("Unknown filter name:"
                 + baseName);
     }
     return e;
 }
//添加过滤器到baseName过滤器的后面
public synchronized void addAfter(String baseName, String name,
        IoFilter filter) {
    EntryImpl baseEntry = checkOldName(baseName);
    checkAddable(name);
    register(baseEntry, name, filter);
}

从上可以看出添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同。

再来看AbstractIoFilterChain移除过滤器操作:
public synchronized IoFilter remove(String name) {
    EntryImpl entry = checkOldName(name);
    deregister(entry);//反注册name过滤器对应的Entry

    return entry.getFilter();
}
//反注册过滤器Entry
private void deregister(EntryImpl entry) {
      IoFilter filter = entry.getFilter();
      try {
         //触发过滤器onPreRemove事件
          filter.onPreRemove(this, entry.getName(), entry.getNextFilter());
      } catch (Exception e) {
          throw new IoFilterLifeCycleException("onPreRemove(): "
                  + entry.getName() + ':' + filter + " in " + getSession(), e);
      }
      //委托给deregister0完成实际的移除工作
      deregister0(entry);

      try {
          //触发过滤器onPostRemove事件
          filter.onPostRemove(this, entry.getName(), entry.getNextFilter());
      } catch (Exception e) {
          throw new IoFilterLifeCycleException("onPostRemove(): "
                  + entry.getName() + ':' + filter + " in " + getSession(), e);
      }
  }
private void deregister0(EntryImpl entry) {
      EntryImpl prevEntry = entry.prevEntry;
      EntryImpl nextEntry = entry.nextEntry;
      prevEntry.nextEntry = nextEntry;
      nextEntry.prevEntry = prevEntry;

      name2entry.remove(entry.name);
  }

从上可以移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,
从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
再来看其他操作:
//清空过滤链
 public synchronized void clear() throws Exception {
        //遍历过滤链,移除过滤器
        Iterator it = new ArrayList(name2entry.keySet()).iterator();
        while (it.hasNext()) {
            this.remove((String) it.next());
        }
    }
//获取过滤链依附Io会话
  public IoSession getSession() {
        return session;
    }
//获取name对应的过滤器Entry
    public Entry getEntry(String name) {
        Entry e = (Entry) name2entry.get(name);
        if (e == null) {
            return null;
        }
        return e;
    }
//获取name对应的过滤器
    public IoFilter get(String name) {
        Entry e = getEntry(name);
        if (e == null) {
            return null;
        }

        return e.getFilter();
    }
//获取name对应的过滤器后继NextFilter
    public NextFilter getNextFilter(String name) {
        Entry e = getEntry(name);
        if (e == null) {
            return null;
        }

        return e.getNextFilter();
    }
 //获取过滤链上的所有过滤器(正序,添加顺序),从头到尾
  public List getAll() {
        List list = new ArrayList();
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            list.add(e);
            e = e.nextEntry;
        }

        return list;
    }
 //获取过滤链上的所有过滤器(逆序),从尾到头
    public List getAllReversed() {
        List list = new ArrayList();
        EntryImpl e = tail.prevEntry;
        while (e != head) {
            list.add(e);
            e = e.prevEntry;
        }
        return list;
    }
//判断是否包含name过滤器
    public boolean contains(String name) {
        return getEntry(name) != null;
    }
//判断是否包含过滤器filter
    public boolean contains(IoFilter filter) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (e.getFilter() == filter) {
                return true;
            }
            e = e.nextEntry;
        }
        return false;
    }
//判断是否包含指定类型filterType的过滤器
    public boolean contains(Class filterType) {
        EntryImpl e = head.nextEntry;
        while (e != tail) {
            if (filterType.isAssignableFrom(e.getFilter().getClass())) {
                return true;
            }
            e = e.nextEntry;
        }
        return false;
    }

在来看触发事件的相关方法:
//从链头传递SessionCreated
 public void fireSessionCreated(IoSession session) {
        Entry head = this.head;
        callNextSessionCreated(head, session);
    }
 public void fireSessionOpened(IoSession session) {
        Entry head = this.head;
        callNextSessionOpened(head, session);
    }
 public void fireSessionIdle(IoSession session, IdleStatus status) {
        Entry head = this.head;
        callNextSessionIdle(head, session, status);
    }
    //消息接收,从链头到链尾-》Iohanlder(这个过程handler有参数处理相关事件)
   public void fireMessageReceived(IoSession session, Object message) {
        Entry head = this.head;
        callNextMessageReceived(head, session, message);
    }
public void fireMessageSent(IoSession session, WriteRequest request) {
        try {
	    //写回写请求结果,及消息已发送
            request.getFuture().setWritten(true);
        } catch (Throwable t) {
            fireExceptionCaught(session, t);
        }

        Entry head = this.head;
        callNextMessageSent(head, session, request.getMessage());
    }
    //移除发生,如果是SocketConnector重建的会话,则从会话中移除CONNECT_FUTURE属性,并设置Future异常
    public void fireExceptionCaught(IoSession session, Throwable cause) {
        // Notify the related ConnectFuture
        // if the session is created from SocketConnector.
        ConnectFuture future = (ConnectFuture) session
                .removeAttribute(CONNECT_FUTURE);
        if (future == null) {
            Entry head = this.head;
            callNextExceptionCaught(head, session, cause);
        } else {
            // Please note that this place is not the only place that
            // calls ConnectFuture.setException().
            future.setException(cause);
        }
    }
     public void fireSessionClosed(IoSession session) {
        // Update future.
        try {
	    //通过会话的closeFuture,会话已经关闭
            session.getCloseFuture().setClosed();
        } catch (Throwable t) {
            fireExceptionCaught(session, t);
        }

        // And start the chain.
        Entry head = this.head;
        callNextSessionClosed(head, session);
    }
  //消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发  //送消息,handler并不处理会话事件)
  public void fireFilterWrite(IoSession session, WriteRequest writeRequest) {
        Entry tail = this.tail;
        callPreviousFilterWrite(tail, session, writeRequest);
    }
 //触发会话关闭事件从链尾到链头-
 public void fireFilterClose(IoSession session) {
        Entry tail = this.tail;
        callPreviousFilterClose(tail, session);
    }

从上面可以看出,IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler有参数处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,
  handler并不处理会话事件)
总结:

      AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
      EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
     HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
     TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
     添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
     过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件)


附:
//IoFilterChain
/**
 * A container of {@link IoFilter}s that forwards {@link IoHandler} events
 * to the consisting filters and terminal {@link IoHandler} sequentially.
 * Every {@link IoSession} has its own {@link IoFilterChain} (1-to-1 relationship). 
 * IoFilterChain是IoFilter的容器,用于转发IoHandler的事件到包含过滤器和Io处理器的链。
 (IoService->IoProcessor->IoFilter->IoFilter->...->IoHandler)
 */
public interface IoFilterChain
{
    
    /**
     * Represents a name-filter pair that an {@link IoFilterChain} contains.
     * *IoFilterChain包含的IOFilter对
     * @author The Apache Directory Project (mina-dev@directory.apache.org)
     */
    public interface Entry {
        /**
         * Returns the name of the filter.
	 过滤器名
         */
        String getName();

        /**
         * Returns the filter.
	 当前过滤器
         */
        IoFilter getFilter();

        /**
         * Returns the {@link NextFilter} of the filter.
	 过滤器后继
         * 
         * @throws IllegalStateException if the {@link NextFilter} is not available
         */
        NextFilter getNextFilter();
    }
    public abstract IoSession getSession();//
    /**
     * Returns the parent {@link IoSession} of this chain.
     返回过滤链依附的Io会话
     * @return {@link IoSession}
     */
    IoSession getSession();

    /**
     * Returns the {@link Entry} with the specified <tt>name</tt> in this chain.
     * @return <tt>null</tt> if there's no such name in this chain
     根据过滤器name获取Entry
     */
    Entry getEntry(String name);

    /**
     * Returns the {@link IoFilter} with the specified <tt>name</tt> in this chain.
     * @return <tt>null</tt> if there's no such name in this chain
     根据过滤器name获取IoFilter
     */
    IoFilter get(String name);

    /**
     * Returns the {@link NextFilter} of the {@link IoFilter} with the
     * specified <tt>name</tt> in this chain.
     * @return <tt>null</tt> if there's no such name in this chain
     根据过滤器name获取IoFilter的后继过滤器NextFilter
     */
    NextFilter getNextFilter(String name);

    /**
     * Returns the list of all {@link Entry}s this chain contains.
     返回所有Entry(添加顺序)
     */
    List getAll();

    /**
     * Returns the reversed list of all {@link Entry}s this chain contains.
     获取所有Entry倒序(LIFO)
     */
    List getAllReversed();

    /**
     * Returns <tt>true</tt> if this chain contains an {@link IoFilter} with the
     * specified <tt>name</tt>.
     判断是否包含name对应的IoFilter
     */
    boolean contains(String name);

    /**
     * Returns <tt>true</tt> if this chain contains the specified <tt>filter</tt>.
     判断是否包含IoFilter类型的实例
     */
    boolean contains(IoFilter filter);

    /**
     * Returns <tt>true</tt> if this chain contains an {@link IoFilter} of the
     * specified <tt>filterType</tt>.
     判断是否包含Class类型的过滤器
     */
    boolean contains(Class filterType);

    /**
     * Adds the specified filter with the specified name at the beginning of this chain.
     * @throws IoFilterLifeCycleException
     *             if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or
     *             {@link IoFilter#init()} throws an exception.
     添加过滤器到过滤链的头部
     */
    void addFirst(String name, IoFilter filter);

    /**
     * Adds the specified filter with the specified name at the end of this chain.
     * @throws IoFilterLifeCycleException
     *             if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or
     *             {@link IoFilter#init()} throws an exception.
      添加过滤器到过滤链的尾部
     */
    void addLast(String name, IoFilter filter);

    /**
     * Adds the specified filter with the specified name just before the filter whose name is
     * <code>baseName</code> in this chain.
     * @throws IoFilterLifeCycleException
     *             if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or
     *             {@link IoFilter#init()} throws an exception.
      添加过滤器到baseName过滤器的前面
     */
    void addBefore(String baseName, String name, IoFilter filter);

    /**
     * Adds the specified filter with the specified name just after the filter whose name is
     * <code>baseName</code> in this chain.
     * @throws IoFilterLifeCycleException
     *             if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or
     *             {@link IoFilter#init()} throws an exception.
     添加过滤器到baseName过滤器的后面
     */
    void addAfter(String baseName, String name, IoFilter filter);

    /**
     * Removes the filter with the specified name from this chain.
     * @throws IoFilterLifeCycleException
     *             if {@link IoFilter#onPostRemove(IoFilterChain, String, NextFilter)} or
     *             {@link IoFilter#destroy()} throws an exception.
     移除name对应的过滤器
     */
    IoFilter remove(String name);

    /**
     * Removes all filters added to this chain.清空过滤器链
     * @throws Exception if {@link IoFilter#onPostRemove(IoFilterChain, String, NextFilter)} thrown an exception.
     */
    void clear() throws Exception;

    /**
     * Fires a {@link IoHandler#sessionCreated(IoSession)} event.  Most users don't need to
     * call this method at all.  Please use this method only when you implement a new transport
     * or fire a virtual event.
     通知IoHandler#sessionCreated创建事件。用户必须要调用这个方法。仅在实现一个新的transport或
     通知一个虚拟事件时,才调用此方法。
     */
    public void fireSessionCreated(IoSession session);

    /**
     * Fires a {@link IoHandler#sessionOpened(IoSession)} event.  Most users don't need to call
     * this method at all.  Please use this method only when you implement a new transport or
     * fire a virtual event.
     */
    public void fireSessionOpened(IoSession session);

    /**
     * Fires a {@link IoHandler#sessionClosed(IoSession)} event.  Most users don't need to call
     * this method at all.  Please use this method only when you implement a new transport or
     * fire a virtual event.
     */
    public void fireSessionClosed(IoSession session);

    /**
     * Fires a {@link IoHandler#sessionIdle(IoSession, IdleStatus)} event.  Most users don't
     * need to call this method at all.  Please use this method only when you implement a new
     * transport or fire a virtual event.
     */
    public void fireSessionIdle(IoSession session, IdleStatus status);

    /**
     * Fires a {@link #fireMessageReceived(IoSession, Object)} event.  Most users don't need to
     * call this method at all.  Please use this method only when you implement a new transport
     * or fire a virtual event.
     */
    public void fireMessageReceived(IoSession session, Object message);

    /**
     * Fires a {@link IoHandler#sessionOpened(IoSession)} event.  Most users don't need to call
     * this method at all.  Please use this method only when you implement a new transport or
     * fire a virtual event.
     */
    public void fireMessageSent(IoSession session, WriteRequest request);

    /**
     * Fires a {@link IoHandler#exceptionCaught(IoSession, Throwable)} event.  Most users don't
     * need to call this method at all.  Please use this method only when you implement a new
     * transport or fire a virtual event.
     */
    public void fireExceptionCaught(IoSession session, Throwable cause);

    /**
     * Fires a {@link IoSession#write(Object)} event.  Most users don't need to call this
     * method at all.  Please use this method only when you implement a new transport or fire a
     * virtual event.
     通知IoSession#write事件
     */
    public void fireFilterWrite(IoSession session, WriteRequest writeRequest);

    /**
     * Fires a {@link IoSession#close()} event.  Most users don't need to call this method at
     * all.  Please use this method only when you implement a new transport or fire a virtual
     * event.
     通知IoSession#close事件
     */
    public void fireFilterClose(IoSession session);
}

过滤链IoFilterChain的fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。过滤链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。
0
1
分享到:
评论

相关推荐

    MINA长连接框架实现通讯

    MINA的核心组件包括Acceptor(服务器端接收器)、Session(会话)和FilterChain(过滤器链)等。 2. **MINA Server端实现**:在MINA中,Server端主要负责监听指定的端口,接收到客户端连接请求后,创建一个Session...

    mina服务器--实现纯文本和非纯文本的加密通讯

    SSLEngine是SSL/TLS协议的抽象实现,允许用户在自定义的网络协议上添加安全层。开发者需要设置证书、密钥和其他安全参数,然后在MINA的过滤器链中使用SSLEngine进行数据的加密和解密。 3. **过滤器链**:MINA的核心...

    apache-mina-2.0.4.rar_apache mina_mina

    通过分析源码,你可以了解到Mina如何实现非阻塞I/O、如何调度任务、如何组织过滤器链,以及如何处理各种网络事件。这对于优化和重构Mina代码以提升性能将非常有帮助。 对于希望提高网络应用性能或熟悉Java NIO编程...

    Mina+Socket通信

    文件"MinaSocket"可能包含了实现上述功能的详细代码,包括服务端的Acceptor配置、过滤器设置、事件处理器编写,以及客户端的Socket连接、数据发送和接收等。通过阅读和理解这些代码,你可以更好地掌握Mina与Socket...

    websocket+java服务器(mina)

    2. **Filter Chain**:Mina通过过滤器链处理进来的消息。开发者可以自定义过滤器来实现特定功能,如身份验证、数据编码解码等。 3. **ProtocolCodecFactory**:用于将网络数据流转换成应用程序可以理解的对象,以及...

    apache mina实现多人聊天室

    总的来说,通过Apache Mina实现的多人聊天室项目,不仅展示了Mina如何处理网络通信,还涉及到并发编程、协议解析、过滤器链等多方面的技术知识。对于学习和理解网络编程以及Mina框架的工作原理,这是一个非常有价值...

    mina2.0.3源代码

    MINA的过滤器链机制允许开发者插入自定义的处理逻辑在数据传输过程中。每个过滤器都可以对数据进行读取、修改或写入,这样可以实现解码、编码、安全加密、流量控制等功能。过滤器链的设计提高了代码的可重用性和...

    mina库文件

    2. **过滤器链**:MINA 的过滤器链是其设计的一个亮点,它提供了一种可插拔的架构,允许开发者插入自定义的过滤器来处理数据。这些过滤器可以实现数据编码、解码、认证、加密等功能,增加了系统的灵活性和可扩展性。...

    mina源码+例子mina-2.0.0-M6.zip

    5. **丰富的过滤器链**:MINA中的过滤器链机制允许在数据传输过程中添加自定义处理逻辑,例如数据编码解码、安全加密、压缩等。 6. **跨平台兼容**:由于MINA是用Java编写的,因此它可以在任何支持Java的平台上运行...

    mina2.0.9jar包下载

    3. **协议独立**:Mina提供了抽象层,使得开发者可以专注于业务逻辑,而不必关心底层网络协议的实现。例如,你可以用相同的代码处理TCP和UDP协议。 4. **过滤器链**:Mina的过滤器链机制允许开发者插入自定义的过滤...

    MINA源码与例子

    通过阅读源码,你可以深入了解MINA的设计思想和实现细节,例如如何实现非阻塞I/O、过滤器链的工作原理、以及如何自定义协议编码解码器等。 学习MINA源码可以帮助你: 1. **理解网络编程**:MINA是基于Java NIO实现...

    Mina源码解析

    Mina的核心设计理念之一是过滤器链(Filter Chain),它借鉴了Servlet的过滤器模型。每个过滤器都可以在数据传输过程中进行拦截和处理,如数据编码、解码、安全检查等。过滤器之间通过`Filter.nextFilter`方法串联...

    MINA1.7包(源码)

    2. **Filter Chain**:MINA的过滤器链机制,允许开发者插入自定义的过滤器来处理进/出站数据,实现数据预处理、安全检查等功能。 3. **Transport Layer**:MINA提供了多种传输层实现,如NioSocketAcceptor和...

    MINA 2.0.9源码

    通过阅读MINA 2.0.9的源码,你可以了解如何实现高效的网络通信,包括事件驱动的设计模式、多线程处理、数据包的拆分和组合,以及如何利用过滤器链实现复杂的功能。此外,源码分析还能帮助你掌握如何自定义Filter和...

    mina依赖jar包

    5. **可扩展性**:MINA的过滤器链机制允许用户自定义过滤器,实现数据编码、解码、安全加密等功能,增强了框架的灵活性和可扩展性。 6. **跨平台性**:MINA完全由Java编写,因此具有良好的跨平台特性,可以在任何...

    mina文档说明书

    2. **Filter**:MINA使用过滤器链模式来处理网络数据。过滤器是处理I/O事件或数据的小型组件,它们按照特定顺序排列,数据在过滤器链中逐个传递,每个过滤器可以添加、修改或转发数据。 3. **Protocol Buffers**:...

    mina源代码学习提供下载

    4. **过滤器链**:MINA引入了过滤器的概念,数据在传输过程中会经过一系列过滤器,每个过滤器可以执行特定的操作,如编码、解码、日志记录等。这种设计使得模块化和复用变得简单。 5. **多线程支持**:MINA支持多...

    mina即时聊天demo

    - **Filter链**: Mina采用过滤器链设计模式,每个Filter处理一部分业务逻辑,形成一个完整的处理流程。 - **ProtocolCodec**: 用于数据编码和解码,确保网络传输的数据能够在两端正确解析。 5. **Android集成** ...

    Mina框架入门介绍

    总的来说,Apache Mina通过提供高度抽象的网络通信接口,简化了网络应用的开发工作,让开发者可以专注于业务逻辑,而不是底层网络通信的复杂性。通过IoSession、IoHandler和IoFilter的组合,Mina构建了一个灵活、...

    mina服务器实例

    2. **过滤器链**:Mina的过滤器链是其核心设计之一,它允许开发者插入自定义的过滤器来处理数据,这些过滤器可以实现如数据编码解码、安全认证等功能。 3. **协议支持**:Mina对多种网络协议如TCP/IP、UDP、SSL/TLS...

Global site tag (gtag.js) - Google Analytics