`
jean7155
  • 浏览: 62918 次
  • 性别: Icon_minigender_2
  • 来自: 上海
社区版块
存档分类
最新评论

Apache Mina: 自定义codec

阅读更多
本例子根据mina自带的例子:sumup改写。

1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。

2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。

3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。

2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
public abstract class AbstractMessage implements Serializable {
	
    private int sequence;

	public int getSequence() {
		return sequence;
	}

	public void setSequence(int sequence) {
		this.sequence = sequence;
	}
}

AddMessage.java
public class AddMessage extends AbstractMessage {

	private static final long serialVersionUID = -735205238699949292L;
	
	private int value;
	
	public AddMessage(){
		
	}
	
	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	@Override
	public String toString() {
		return "AddMessage [value=" + value + ", getSequence()="
				+ getSequence() + "]";
	}

}

ResultMessage.java
public class ResultMessage extends AbstractMessage {

	private static final long serialVersionUID = 7431899532938146290L;
	
	private boolean ok;
	private int value;
	
	public ResultMessage(){
		
	}

	public boolean isOk() {
		return ok;
	}

	public void setOk(boolean ok) {
		this.ok = ok;
	}

	public int getValue() {
		return value;
	}

	public void setValue(int value) {
		this.value = value;
	}

	@Override
	public String toString() {
		return "ResultMessage [ok=" + ok + ", value=" + value
				+ ", getSequence()=" + getSequence() + "]";
	}

}


2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
public class AddMessageEncoder<T extends AddMessage> implements MessageEncoder<T> {

	@Override
	public void encode(IoSession session, T message, ProtocolEncoderOutput out)
			throws Exception {
		
		IoBuffer buf = IoBuffer.allocate(16);
	    buf.setAutoExpand(true); // Enable auto-expand for easier encoding

	    // Encode a header
	    buf.putInt(message.getSequence());
	    buf.putInt(message.getValue());

	    buf.flip();
	    out.write(buf);
		
	}

}

ResultMessageEncoder.java
public class ResultMessageEncoder<T extends ResultMessage> implements MessageEncoder<T> {

	@Override
	public void encode(IoSession session, T message, ProtocolEncoderOutput out)
			throws Exception {
		
		IoBuffer buf = IoBuffer.allocate(16);
	    buf.setAutoExpand(true); // Enable auto-expand for easier encoding

	    
	    if(message.isOk()){
	    	buf.putShort((short) Constants.RESULT_OK);
	    	buf.putInt(message.getSequence());
	    	buf.putInt(message.getValue());
	    }else{
	    	
	    	buf.putShort((short)Constants.RESULT_ERROR);
	    	buf.putInt(message.getSequence());
	    	buf.putInt(0);
	    }

	    buf.flip();
	    out.write(buf);
		
	}

}


3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
public class AddMessageDecoder implements MessageDecoder {
	

	@Override
	public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
		
		if (in.remaining() < 1) {
            return MessageDecoderResult.NEED_DATA;
        }

        // Return NOT_OK if not matches.
        return MessageDecoderResult.OK;
	}

	@Override
	public MessageDecoderResult decode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if (in.remaining() < 1) {
			return MessageDecoderResult.NEED_DATA;
	    }
		
		int sequence = in.getInt();
		int value = in.getInt();

        AddMessage m = new AddMessage();
        m.setSequence(sequence);
        m.setValue(value);
        
        out.write(m);
        
        return MessageDecoderResult.OK;

        
	}

	@Override
	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
		// TODO Auto-generated method stub

	}

}

ResultMessageDecoder.java
public class ResultMessageDecoder implements MessageDecoder {

	@Override
	public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
		
		if (in.remaining() < 1) {
            return MessageDecoderResult.NEED_DATA;
        }
		
		int code = in.getShort();
		
		if(code==Constants.RESULT_OK){
			return MessageDecoderResult.OK;
		}

        // Return NOT_OK if not matches.
        return MessageDecoderResult.NOT_OK;
	}

	@Override
	public MessageDecoderResult decode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if (in.remaining() < 1) {
			return MessageDecoderResult.NEED_DATA;
	    }
		
		int code = in.getShort();
		int sequence = in.getInt();
		int value = in.getInt();

        ResultMessage m = new ResultMessage();
        if(code==Constants.RESULT_OK){
        	m.setOk(true);
        	m.setSequence(sequence);
            m.setValue(value);
        }else{
        	m.setOk(false);
        }
        	
        out.write(m);
        
        return MessageDecoderResult.OK;

        
	}

	@Override
	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
		// TODO Auto-generated method stub

	}

}


4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
public class DemoProtocolCodecFactory extends DemuxingProtocolCodecFactory {
	
	public DemoProtocolCodecFactory(boolean isServer){
		if(isServer){
			super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class);
			super.addMessageDecoder(AddMessageDecoder.class);
		}else{
			super.addMessageEncoder(AddMessage.class, AddMessageEncoder.class);
			super.addMessageDecoder(ResultMessageDecoder.class);
		}
	}

}


5) 创建server端服务和业务处理ServerIoHandler
Server.java
public class Server {
	
	public void init() throws IOException{

		IoAcceptor acceptor = new NioSocketAcceptor();
		
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DemoProtocolCodecFactory(Constants.IS_SERVER)));
		
        acceptor.setHandler(new ServerIoHandler());
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);

        acceptor.setDefaultLocalAddress(new InetSocketAddress(Constants.PORT));
        acceptor.bind();// 启动监听  

	}
	

	public static void main(String[] args) throws IOException {
		Server server = new Server();
		server.init();
	}

}

ServerIoHandler.java
public class ServerIoHandler extends IoHandlerAdapter {
	
	private static final String SUM_KEY = "sum";

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		System.out.println(cause.getMessage());
		cause.printStackTrace();
		session.close(true);
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		AddMessage add = (AddMessage) message;
		
		int sum = ((Integer)session.getAttribute(SUM_KEY)).intValue();
		int value = add.getValue();
		
		long total = (long) sum + value;
		
		if(total>Integer.MAX_VALUE || total<Integer.MIN_VALUE){
			ResultMessage result = new ResultMessage();
			result.setSequence(add.getSequence());
			result.setOk(false);
			session.write(result);
		}else{
			sum = (int) total;
			session.setAttribute(SUM_KEY, sum);
			
			ResultMessage result = new ResultMessage();
			result.setSequence(add.getSequence());
			result.setOk(true);
			result.setValue(sum);
			session.write(result);
		}
		
		//System.out.println("total=" + total);
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		session.close(true);
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		session.setAttribute(SUM_KEY, new Integer(0));

	}
}


6) 创建Client端连接和业务处理ClientIoHandler
Client.java
public class Client {
	
	public void init() throws InterruptedException{
		NioSocketConnector connector = new NioSocketConnector();

        // Configure the service.
        connector.setConnectTimeoutMillis(Constants.CONNECT_TIMEOUT);
        connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new DemoProtocolCodecFactory(!Constants.IS_SERVER)));
        connector.getFilterChain().addLast("logger", new LoggingFilter());

        connector.setHandler(new ClientIoHandler());

        IoSession session;
        for (;;) {
            try {
                ConnectFuture future = connector.connect(new InetSocketAddress(Constants.HOSTNAME, Constants.PORT));
                future.awaitUninterruptibly();
                session = future.getSession();
                break;
            } catch (RuntimeIoException e) {
                System.err.println("Failed to connect.");
                e.printStackTrace();
                Thread.sleep(5000);
            }
        }

        // wait until the summation is done
        session.getCloseFuture().awaitUninterruptibly();
        
        connector.dispose();
	}

	/**
	 * @param args
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws InterruptedException {
		Client client = new Client();
		client.init();

	}

}


ClientIoHandler.java
public class ClientIoHandler extends IoHandlerAdapter {
	
	private List<Integer> values = new ArrayList<Integer>();

	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		session.close(true);
	}

	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		
		ResultMessage rm = (ResultMessage) message;
        if (rm.isOk()) {

        	if (rm.getSequence() == values.size() - 1) {
        		System.out.println("the sum is " + rm.getValue());
                session.close(true);
                //finished = true;
            }
        } else {
            System.out.println("Server error, disconnecting...");
            session.close(true);
            //finished = true;
        }
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		
		init();
		
	    for (int i = 0; i < values.size(); i++) {
	    	int _value = ((Integer) values.get(i)).intValue();
	    	
            AddMessage m = new AddMessage();
            m.setSequence(i);
            m.setValue(_value);
            
            session.write(m);
        }
	}

	
	private void init(){
		for(int i=0;i<3;i++){
			int _value = i*100 + 1;
			values.add(new Integer(_value));
		}
	}
}

7. Constants.java
public class Constants {
	public final static int PORT = 9123;
	public static final String HOSTNAME = "localhost";
    public static final long CONNECT_TIMEOUT = 30*1000L; // 30 seconds
	public final static boolean IS_SERVER = true;
    public static final int RESULT_OK = 0;

    public static final int RESULT_ERROR = 1;
}
分享到:
评论

相关推荐

    Apache MINA 2.0 用户指南( 缺第一章节)

    import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class EchoServer { private IoAcceptor acceptor = new ...

    Apache MINA 线程模型配置

    ### Apache MINA线程模型配置详解 #### 一、线程模型配置介绍 Apache MINA 是一个用于构建网络应用程序的高性能、高可靠性的框架。它提供了丰富的功能来简化网络编程,包括TCP/IP 和 UDP/IP 协议的支持。线程模型...

    mina开发手册与mina完全自学手册.rar

    4. **Protocol Codec**:编码和解码是网络通信中的重要环节,Mina提供了ProtocolCodec接口,允许开发者自定义协议的编码和解码规则。 5. **服务端与客户端构建**:学习如何使用Mina API创建服务端和客户端,监听...

    mina自定义编码器-自行做会话累积

    实现这个功能,我们需要创建一个新的编码器类,继承自Mina提供的基础编码器,如`org.apache.mina.filter.codec.ProtocolEncoder`或`org.apache.mina.filter.codec.ProtocolCodecFilter`。 在编码器的实现中,我们...

    ApacheMina入门

    Apache Mina是一个开源项目,由Apache软件基金会维护,它是一个网络通信框架,专注于提供高性能、高度可扩展的网络应用程序开发工具。本篇文章将带你深入理解Apache Mina的基础知识,包括其设计理念、核心组件以及...

    Apache mina框架入门教程

    Apache Mina 框架是一个强大的网络通信应用框架,它主要针对基于TCP/IP和UDP/IP的协议栈,同时也支持Java对象序列化和其他通信方式。Mina 的核心设计目标是帮助开发者快速构建高性能、高可扩展性的网络应用。它采用...

    Apache_Mina_Server_2.0.rar_mina

    4. **过滤器机制**:Mina的过滤器链允许开发者在数据传输过程中添加自定义逻辑,如数据加密、身份验证、日志记录等。过滤器可以按顺序或条件执行,增强了系统的可扩展性和灵活性。 5. **强大的会话管理**:Mina的...

    apache mina详细介绍,适合新手入门mina

    ### Apache Mina 详解 #### 一、Apache Mina 概述 Apache Mina 是一个高性能、可扩展的网络通信框架,它简化了基于 TCP/IP 和 UDP/IP 协议栈的网络应用程序的开发过程。Mina 提供了一个事件驱动、异步的操作模式,...

    Apache Mina

    import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MyServer { public static void main(String[] args) { ...

    Mina 1.1.7 示例源码(apache.mina.example)

    Apache Mina是一个开源框架,主要用于构建高性能、高可用性的网络通信应用。在Mina 1.1.7版本中,提供的示例源码是学习和理解Mina框架工作原理及其实现各种网络协议的重要资源。这个压缩包"apache.mina.example"包含...

    apache下的mina框架的源码

    Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高度可扩展且高性能的网络应用程序框架,主要用于简化网络编程,尤其是TCP/IP和UDP/IP协议的应用。MINA的设计目标是为开发者提供一个...

    mina入门实例

    Apache Mina是一个开源框架,主要用于简化网络应用的开发,尤其是基于TCP和UDP协议的应用。它提供了高度可扩展的事件驱动模型,使得开发者可以专注于业务逻辑,而无需关注底层的网络通信细节。本入门实例将带你了解...

    mina入门例子

    import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; import org.apache.mina....

    Mina 2.0快速入门与源码解析

    import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class ...

    mina框架的demo 入门,开发

    import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MinaServer { ...

    mina编解码示例

    在Mina中,编码和解码是通过自定义的`Codec`实现的,它允许开发者将原始字节流转换为有意义的对象,反之亦然。 首先,我们需要理解Mina的核心概念。`Session`代表了网络连接,`Filter`是处理I/O事件的组件,而`...

    Mina2.0完全剖析,完全自学手册

    import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class Demo1Server { private static final org.slf4j.Logger ...

    Mina-2.0.7

    4. **丰富的API**:Mina提供了一套全面的API,包括Filter Chain(过滤器链)、Session(会话)和Protocol Codec(协议编解码器)等接口,使得开发者可以通过组合不同的组件来实现复杂的功能。 5. **模块化设计**:...

Global site tag (gtag.js) - Google Analytics