`

Netty源码学习-ReplayingDecoder

阅读更多
ReplayingDecoder是FrameDecoder的子类,不熟悉FrameDecoder的,可以先看看
http://bylijinnan.iteye.com/blog/1982618

API说,ReplayingDecoder简化了操作,比如:

FrameDecoder在decode时,需要判断数据是否接收完全:

public class IntegerHeaderFrameDecoder extends FrameDecoder {

   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf) throws Exception {

     if (buf.readableBytes() < 4) {
        return null;
     }

     buf.markReaderIndex();
     int length = buf.readInt();

     if (buf.readableBytes() < length) {
        buf.resetReaderIndex();
        return null;
     }

     return buf.readBytes(length);
   }
 }




而ReplayingDecoder则隐藏了这些判断,好像数据已经接收完全了一样:


 public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<VoidEnum> {

   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           VoidEnum state) throws Exception {
	 int length = buf.readInt();
     return buf.readBytes(length);
   }
 }


可见,ReplayingDecoder使用起来要简单、直观

除此之外,
很多时候frame的结构不会是“length + content”这么简单,如果结构复杂了,
FrameDecoder的if语句就会非常多,且先到达的sub-frame就会被多次decode
ReplayingDecoder允许用户自定义state,表示decode到哪一部分了,从而下
一次数据到达时,直接从nextState开始decode

那ReplayingDecoder这是怎么做到“好像数据已经接收完全了一样”?
事实上它跟FrameDecoder的思路是一样的,只是它在数据接收不完全时,
“悄悄地失败”。这主要是通过ReplayingDecoderBuffer来实现的:

 class ReplayingDecoderBuffer implements ChannelBuffer {

    private static final Error REPLAY = new ReplayError();

    private final ReplayingDecoder<?> parent;
    private boolean terminated;

	//ReplayingDecoder作为参数
    ReplayingDecoderBuffer(ReplayingDecoder<?> parent) {
        this.parent = parent;
    }

	/*ReplayingDecoderBuffer实际上是FrameDecoder里面的cumulation(类型为ChannelBuffer)
	FrameDecoder里面,对数据的累积接收,以及数据的读取,都是在cumulation,
	而ReplayingDecoderBuffer对cumulation进行了封装,在数据不完全时,读取操作都会抛出Error
	*/
    private ChannelBuffer buf() {
        return parent.internalBuffer();
    }
	
	public byte getByte(int index) {
        checkIndex(index);
        return buf().getByte(index);
    }
	
	//数据接收不完全时,抛出Error,在ReplayingDecoder里捕获
	private void checkIndex(int index) {
        if (index > buf().writerIndex()) {
            throw REPLAY;
        }
    }
 
 
 


 public abstract class ReplayingDecoder<T extends Enum<T>>
        extends FrameDecoder {

	//创建了ReplayingDecoderBuffer
    private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
    private T state;	
	
	//标记着下一次数据读取的开始位置,也代表着已经decode到哪一部分了
    private int checkpoint;
	
	protected void checkpoint() {
        ChannelBuffer cumulation = this.cumulation;
        if (cumulation != null) {
            checkpoint = cumulation.readerIndex();
        } else {
            checkpoint = -1; // buffer not available (already cleaned up)
        }
    }
	
    protected void checkpoint(T state) {
        checkpoint();
        setState(state);
    }

	protected T setState(T newState) {
        T oldState = state;
        state = newState;
        return oldState;
    }
	
	//这个方法交由子类实现,子类可以定义更详细、语义更丰富的state
	 protected abstract Object decode(ChannelHandlerContext ctx,
            Channel channel, ChannelBuffer buffer, T state) throws Exception;
			
	//重点在这个方法,注意对ReplayError的处理
	private void callDecode(
            ChannelHandlerContext context, Channel channel,
            ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
        while (input.readable()) {
            int oldReaderIndex = checkpoint = input.readerIndex();
            Object result = null;
            T oldState = state;
            try {
			
				//如果数据接收还不完全,decode方法就会抛出ReplayError
				//因为decode方法会调用replayableInput(类型为ReplayingDecoderBuffer)的getXXX方法
                result = decode(context, channel, replayableInput, state);
                if (result == null) {
					//省略
                }
            } catch (ReplayError replay) {
			
				//数据不充足,回退到checkpoint,下一次messageReceived再试
                // Return to the checkpoint (or oldPosition) and retry.
                int checkpoint = this.checkpoint;
                if (checkpoint >= 0) {
                    input.readerIndex(checkpoint);
                } else {
                    // Called by cleanup() - no need to maintain the readerIndex
                    // anymore because the buffer has been released already.
                }
            }

            if (result == null) {
                // Seems like more data is required.
                // Let us wait for the next notification.
                break;
            }

            // A successful decode
            unfoldAndFireMessageReceived(context, remoteAddress, result);
        }
    }
	
 


ReplayingDecoder的使用,API给出了一个简单的例子:
 
  public class IntegerHeaderFrameDecoder
      extends ReplayingDecoder<MyDecoderState> {

   private int length;

   public IntegerHeaderFrameDecoder() {
     // Set the initial state.
     super(MyDecoderState.READ_LENGTH);
   }

   protected Object decode(ChannelHandlerContext ctx,
                           Channel channel,
                           ChannelBuffer buf,
                           MyDecoderState state) throws Exception {
     switch (state) {
     case READ_LENGTH:
       length = buf.readInt();
	   
	   //设置下一次的decode从哪一个state开始
       checkpoint(MyDecoderState.READ_CONTENT);
     case READ_CONTENT:
       ChannelBuffer frame = buf.readBytes(length);
       checkpoint(MyDecoderState.READ_LENGTH);
       return frame;
     default:
       throw new Error("Shouldn't reach here.");
     }
   }
 }
 
 


注意到上面的switch语句是不需要break的,因为case READ_LENGTH后:
1.如果数据不充足,那就会在buf.readBytes(length)里面抛出ReplayError,相当于有了break语句
2.如果数据充足,那就接着decode,最终成功并 return frame

除了API以外,下面这两篇文章写得非常好:
http://biasedbit.com/netty-tutorial-replaying-decoder/
http://biasedbit.com/an-enhanced-version-of-replayingdecoder-for-netty/

其中第二个例子,文章中给的代码不是很完整,我补全并做了一个简单的测试:
https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/handler/replay





0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics