`
Donald_Draper
  • 浏览: 980142 次
社区版块
存档分类
最新评论

Mina 协议编解码过滤器三(会话write与消息接收过滤)

    博客分类:
  • Mina
阅读更多
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
引言:
    前面我们看了协议编解码过滤器的所涉及到的相关概念,先回顾一下:
协议编解码过滤器关联一个协议编解码工厂,协议编解码工厂用于创建协议编码和解码器;协议编码器将上层消息,编码成二级制或特定协议格式的数据,写到协议编码器输出的字节队列中,flush字节队列中的数据(filterWrite)给下一个过滤器。协议解码器将接收到的二级制或特定协议格式的数据,解码成上层消息,存放到协议解码器输出的消息队列,flush将消息队列中的消息传给后继过滤器的messageReceived方法。协议编解码过滤器ProtocolCodecFilter默认的协议编码输出为ProtocolEncoderOutputImpl,协议解码输出为SimpleProtocolDecoderOutput。
结构如下:
ProtocolCodecFilter extends IoFilterAdapter
   --ProtocolCodecFactory
      --ProtocolEncoder
         --ProtocolEncoderOutput(ProtocolEncoderOutputImpl)
      --ProtocolDecoder
         --ProtocolDecoderOutput(SimpleProtocolDecoderOutput)

今天我们正式进入来分析协议编解码过滤器的实现。
//ProtocolCodecFilter
/**
 * An {@link IoFilter} which translates binary or protocol specific data into
 * message object and vice versa using {@link ProtocolCodecFactory},
 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ProtocolCodecFilter extends IoFilterAdapter
{
    private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/filter/codec/ProtocolCodecFilter);
    private static final Class EMPTY_PARAMS[] = new Class[0];
    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
    //编码器属性key
    private static final AttributeKey ENCODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoder");
    //编码器输出属性key
    private static final AttributeKey ENCODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoderOut");
    //解码器属性key
    private static final AttributeKey DECODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoder");  
    //解码器输出属性key
    private static final AttributeKey DECODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoderOut");
    private final ProtocolCodecFactory factory;//协议编解码器工厂
}

先来看构造:
//根据协议编解码器构造协议编解码过滤器
 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
        if (factory == null) {
            throw new NullPointerException("factory");
        }
        this.factory = factory;
    }
//根据协议编码器和解码器构造协议编解码过滤器
    public ProtocolCodecFilter(final ProtocolEncoder encoder,
            final ProtocolDecoder decoder) {
        if (encoder == null) {
            throw new NullPointerException("encoder");
        }
        if (decoder == null) {
            throw new NullPointerException("decoder");
        }
        factory = new ProtocolCodecFactory() {
            public ProtocolEncoder getEncoder() {
                return encoder;
            }
            public ProtocolDecoder getDecoder() {
                return decoder;
            }
        };
    }
//根据协议编解码类构造协议编解码过滤器
public ProtocolCodecFilter(final Class encoderClass,
            final Class decoderClass) {
        if (encoderClass == null) {
            throw new NullPointerException("encoderClass");
        }
        if (decoderClass == null) {
            throw new NullPointerException("decoderClass");
        }
	//如果协议编解码类型参数非ProtocolEncoder,ProtocolDecoder
	//抛出非法参数异常
        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
            throw new IllegalArgumentException("encoderClass: "
                    + encoderClass.getName());
        }
        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
            throw new IllegalArgumentException("decoderClass: "
                    + decoderClass.getName());
        }
	//获取协议编解码器无参构造
        try {
            encoderClass.getConstructor(EMPTY_PARAMS);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "encoderClass doesn't have a public default constructor.");
        }
        try {
            decoderClass.getConstructor(EMPTY_PARAMS);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "decoderClass doesn't have a public default constructor.");
        }
       //根据协议编码器和解码器类型创建协议编解码实例,构造协议编解码工厂
        factory = new ProtocolCodecFactory() {
            public ProtocolEncoder getEncoder() throws Exception {
                return (ProtocolEncoder) encoderClass.newInstance();
            }

            public ProtocolDecoder getDecoder() throws Exception {
                return (ProtocolDecoder) decoderClass.newInstance();
            }
        };
}

从上面可以看出,协议编解码过滤器构造,说到底就是初始化协议编码器工厂。
再来看过滤器的相关方法
 public void onPreAdd(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
	    //如果过滤器链已经包含协议编解码过滤器,则抛出非法状态异常
        if (parent.contains(ProtocolCodecFilter.class)) {
            throw new IllegalStateException(
                    "A filter chain cannot contain more than one ProtocolCodecFilter.");
        }
    }

从onPreAdd方法来看,一个过滤链上不能存在两个协议编解码器,即唯一。

再来看会话发送消息

public void filterWrite(NextFilter nextFilter, IoSession session,
            WriteRequest writeRequest) throws Exception {
	//从写请求获取消息
        Object message = writeRequest.getMessage();
	//如果为字节buffer,传给后继过滤器
        if (message instanceof ByteBuffer) {
            nextFilter.filterWrite(session, writeRequest);
            return;
        }
        //从会话获取协议编码器,及协议编码输出
        ProtocolEncoder encoder = getEncoder(session);
        ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
                nextFilter, writeRequest);

        try {
	    //编码器编码消息
            encoder.encode(session, message, encoderOut);
	    //编码输出flush消息队列
            encoderOut.flush();
	    //传递消息到下一个过滤
            nextFilter.filterWrite(session, new WriteRequest(
                    new MessageByteBuffer(writeRequest.getMessage()),
                    writeRequest.getFuture(), writeRequest.getDestination()));
        } catch (Throwable t) {
            ProtocolEncoderException pee;
            if (t instanceof ProtocolEncoderException) {
                pee = (ProtocolEncoderException) t;
            } else {
                pee = new ProtocolEncoderException(t);
            }
            throw pee;
        }
    }

来看这几点
1.
 //从会话获取协议编码器,及协议编码输出
  ProtocolEncoder encoder = getEncoder(session);
  ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
          nextFilter, writeRequest);

//获取协议编码器,这个很简单不讲了
private ProtocolEncoder getEncoder(IoSession session) throws Exception {
        ProtocolEncoder encoder = (ProtocolEncoder) session
                .getAttribute(ENCODER);
        if (encoder == null) {
            encoder = factory.getEncoder();
            session.setAttribute(ENCODER, encoder);
        }
        return encoder;
    }
    //获取协议编码器输出
    private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
            NextFilter nextFilter, WriteRequest writeRequest) {
        return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
    }

2.
//传递消息到下一个过滤
nextFilter.filterWrite(session, new WriteRequest(
        new MessageByteBuffer(writeRequest.getMessage()),
        writeRequest.getFuture(), writeRequest.getDestination()));

//继承字节buffer代理,这个在前面已看过,不在讲
private static class MessageByteBuffer extends ByteBufferProxy {
    private final Object message;

    private MessageByteBuffer(Object message) {
        super(EMPTY_BUFFER);
        this.message = message;
    }

    public void acquire() {
        // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
    }

    public void release() {
        // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
    }
}

从会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,
协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。
再来看messageSent,已一看就明白:
public void messageSent(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        if (message instanceof HiddenByteBuffer) {
            return;
        }
        if (!(message instanceof MessageByteBuffer)) {
            nextFilter.messageSent(session, message);
            return;
        }
        nextFilter.messageSent(session, ((MessageByteBuffer) message).message);
    }

再来看接收消息
public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
	//如果消息非字节buffer,则直接传给后继过滤器
        if (!(message instanceof ByteBuffer)) {
            nextFilter.messageReceived(session, message);
            return;
        }
        ByteBuffer in = (ByteBuffer) message;
	//如果字节buffer为空,直接返回
        if (!in.hasRemaining()) {
            in.release();
            return;
        }
        //获取解码器,及解码器输出
        ProtocolDecoder decoder = getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
        int oldPos = in.position();
        try {
            synchronized (decoderOut) {
	        //解码字节buffer
                decoder.decode(session, in, decoderOut);
            }
        } catch (Throwable t) {
            ProtocolDecoderException pde;
            if (t instanceof ProtocolDecoderException) {
                pde = (ProtocolDecoderException) t;
            } else {
                pde = new ProtocolDecoderException(t);
            }

            if (pde.getHexdump() == null) {
                int curPos = in.position();
                in.position(oldPos);
                pde.setHexdump(in.getHexDump());
                in.position(curPos);
            }
            throw pde;
        } finally {
            try {
                // Release the read buffer.
		//释放字节buffer
                in.release();
            } finally {
	        //flush解码器输出消息队列
                decoderOut.flush();
            }
        }
    }

messageReceived方法有1点要关注,
//获取解码器,及解码器输出
ProtocolDecoder decoder = getDecoder(session);
ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);

//这个与协议编码器和编码器输出相似
  private ProtocolDecoder getDecoder(IoSession session) throws Exception {
        ProtocolDecoder decoder = (ProtocolDecoder) session
                .getAttribute(DECODER);
        if (decoder == null) {
            decoder = factory.getDecoder();
            session.setAttribute(DECODER, decoder);
        }
        return decoder;
    }

    private ProtocolDecoderOutput getDecoderOut(IoSession session,
            NextFilter nextFilter) {
        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
        if (out == null) {
            out = new SimpleProtocolDecoderOutput(session, nextFilter);
            session.setAttribute(DECODER_OUT, out);
        }

        return out;
    }
 

从上面可以看出,会话接收消息messageReceived,如果消息非字节buffer,则直接传给
后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。
再来看会话关闭
 public void sessionClosed(NextFilter nextFilter, IoSession session)
            throws Exception {
        // Call finishDecode() first when a connection is closed.
	//获取解码器及解码器输出
        ProtocolDecoder decoder = getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
        try {
	    //解码器,解码会话未解码数据
            decoder.finishDecode(session, decoderOut);
        } catch (Throwable t) {
            ProtocolDecoderException pde;
            if (t instanceof ProtocolDecoderException) {
                pde = (ProtocolDecoderException) t;
            } else {
                pde = new ProtocolDecoderException(t);
            }
            throw pde;
        } finally {
            // Dispose all.
	    //释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列
            disposeEncoder(session);
            disposeDecoder(session);
            disposeDecoderOut(session);
            decoderOut.flush();
        }
        nextFilter.sessionClosed(session);
}

来看方法的最后资源释放:
//释放解码器资源
private void disposeEncoder(IoSession session) {
        //移除会将编码器属性
        ProtocolEncoder encoder = (ProtocolEncoder) session
                .removeAttribute(ENCODER);
        if (encoder == null) {
            return;
        }

        try {
	   //释放会话编码器相关资源
            encoder.dispose(session);
        } catch (Throwable t) {
            SessionLog.warn(session, "Failed to dispose: "
                    + encoder.getClass().getName() + " (" + encoder + ')');
        }
    }
   //从会话移除解码器属性,释放会话解码器资源
    private void disposeDecoder(IoSession session) {
        ProtocolDecoder decoder = (ProtocolDecoder) session
                .removeAttribute(DECODER);
        if (decoder == null) {
            return;
        }

        try {
            decoder.dispose(session);
        } catch (Throwable t) {
            SessionLog.warn(session, "Falied to dispose: "
                    + decoder.getClass().getName() + " (" + decoder + ')');
        }
    }
//从会话移除解码输出属性
 private void disposeDecoderOut(IoSession session) {
        session.removeAttribute(DECODER_OUT);
    }

从上面来看会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,
最后释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。

我们来看移除协议编解码过滤器onPostRemove
public void onPostRemove(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        disposeEncoder(parent.getSession());//从会话移除编码器属性,释放会话编码器相关资源
        disposeDecoder(parent.getSession());//从会话移除解码器属性,释放会话解码器资源
        disposeDecoderOut(parent.getSession());//从会话移除解码输出属性
    }

从上面可以看出,从过滤链移除协议编解码过滤器后,要释放会话编解码器,
及解码输出属性,并释放相关的资源。

总结:
协议编解码过滤器构造,主要是初始化协议编码器工厂。一个过滤链上不能存在两个协议编解码器,即唯一。会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。会话接收消息messageReceived,如果消息非字节buffer,则直接传给后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,最后从会话移除编码器,解码器及解码器输出属性,释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。协议编解码器过滤器从过滤链移除后,要释放会话编解码器,及解码输出属性,并释放相关的资源。


0
0
分享到:
评论

相关推荐

    apache-mina源码

    6. **Protocol Buffers**:MINA提供了协议编解码机制,允许开发者自定义协议格式。这使得MINA能灵活地支持各种网络协议,如HTTP、FTP、SMTP等。 在`apache-mina-2.0.16`这个版本中,我们可以看到以下主要内容: - ...

    Mina开发之客户端

    4. **创建和添加Protocol Codec**: 配置一个编解码器工厂,例如TCPCodecFactory,将其添加到过滤器链中。 5. **设置Event Handler**: 定义一个实现了IoHandler接口的类,处理客户端的网络事件。 6. **发送和接收...

    mina2.0相关jar包

    - **数据交换**:利用IoSession的read()和write()方法进行数据的读取和发送,同时通过过滤器进行数据的编码和解码。 - **异常处理**:MINA提供了一套完整的异常处理机制,可以捕获并处理网络通信过程中的异常情况...

    Mina 2.0快速入门与源码解析

    - **ProtocolCodecFilter**: 编码解码过滤器,这里使用 `TextLineCodecFactory` 来处理文本行编码。 - **TimeServerHandler**: 自定义的处理器类,负责处理客户端发送的消息和异常等。 **1.2 处理器类** (`...

    Mina入门:mina版之HelloWorld

    主要组件包括Session(会话)、Filter(过滤器)和ProtocolHandler(协议处理器)。 1. **Session**:在Mina中,Session代表了应用程序与远程客户端之间的一个连接。它包含了会话的状态信息以及用于读写数据的方法...

    Mina通信框架应用示例

    - Filter链:MINA采用过滤器链模式,允许开发者插入自定义的过滤器来处理数据,如编码、解码、安全等。 - IoSession:表示网络连接,包含与连接相关的状态信息和操作。 - IoProcessor:负责实际的I/O操作,如读写...

    mina学习笔记,记录所有API

    2. **FilterChain**: MINA采用过滤器链模式处理网络事件,每个过滤器可以对数据进行处理或转发给下一个过滤器。过滤器可以用于数据编码解码、安全处理、性能监控等。 3. **IoHandler**: 这是处理网络事件的主要接口...

    mina2.0 英文版 中文版 官方文档

    6. **Filter Chain**:MINA 的过滤器链机制允许开发者插入多个过滤器,实现数据的预处理和后处理,如日志记录、安全检查等。 7. **线程模型**:MINA 允许开发者自定义线程模型,如单线程、多线程或无守护线程模型,...

    Mina Tcp实例

    4. **ProtocolCodecFilter**:Mina提供了一个编码解码过滤器,用于将应用层的数据转换成字节流,以便在网络中传输。在TCP实例中,你可能需要自定义编码器和解码器,以适应你的消息格式。 5. **Buffer**:Mina使用...

    mina 框架demo

    这些过滤器可以用于日志记录、编解码、压缩等多种功能,极大地方便了网络应用的开发。 - **强大的错误处理和管理功能**:Mina提供了强大的异常管理和会话管理功能,使得开发者能够更加专注于业务逻辑而不是底层网络...

    mina_tcp客户端jar.zip

    5. **创建过滤器链**:MINA的过滤器链机制允许你在数据传输过程中添加处理逻辑,如加密、压缩等。在TCP客户端中,你可以添加`ProtocolCodecFilter`以使用自定义的解码器和编码器。 6. **建立连接**:使用`...

    mina socket 代码

    4. **高度可定制**:MinA的架构允许用户根据需求自定义过滤器,实现数据编码解码、安全策略等功能。 5. **强大的API**:MinA的API设计简洁,易于理解和使用。 在实际开发中,`bssvirtual`可能是一个项目或者模块的...

    NIO_MINA学习例子_源代码

    2. **MINA过滤器机制**:分析过滤器类,理解它们如何在数据传输过程中发挥作用,比如数据的压缩、加密或者协议解析。 3. **MINA会话管理**:了解Session对象如何维护客户端连接的状态,包括打开、关闭、读写事件的...

    Mina中文参考手册-API

    3. IoFilter:提供了一组拦截器接口,可以实现日志输出、黑名单过滤、数据的编码(write方向)与解码(read方向)等,其中数据的编码与解码是使用Mina时最需要关注的部分。 4. IoHandler:这是业务逻辑的实现接口,...

    Apache_Mina_Server_2.0中文参考手册

    3. IoFilter:这个接口定义了一组拦截器,用于处理日志输出、黑名单过滤等,以及数据的编码(write方向)和解码(read方向)。编码和解码操作是Mina框架中非常核心的部分,因为它涉及到网络中数据传输的格式转换。 ...

Global site tag (gtag.js) - Google Analytics