`
simple1024
  • 浏览: 74314 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Mina自定义协议-实现数据交互

    博客分类:
  • Mina
阅读更多

 

本文主要现实mina的自定义协议,并且实现服务器和客户端的简单数据交互。

 

"mina协议的自定义"可参考本博Mina相关文章。

 

正题,所需要的基础类:

抽象协议类

请求协议

响应协议

(需要定制自己的协议格式)

 

协议编码解码工厂

协议编码

协议解码

 

客户端

客户端Handler

 

服务器

服务器Handler

 

 

/**
 * 消息协议
 * 
 * @author Simple
 * 
 */
public abstract class JAbsMessageProtocal {

  public abstract byte getTag();// 消息协议类型  请求/响应

  public abstract int getLength();// 消息协议数据长度
}

/**
 * 报头:
 *    short tag:请求/响应
 *    int   length:数据长度
 * 报体:
 *    short   methodCode:功能函数
 *    byte    resultCode:结果码
 *    String  content:数据内容
 */
 

 

 

/**
 * 消息协议-请求
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalReq extends JAbsMessageProtocal {

  private short functionCode;// 功能代码

  private String content;// 请求内容

  @Override
  public int getLength() {
    return 2 + (content == null ? 0 : content.getBytes().length);
  }

  @Override
  public byte getTag() {
    return JConstant.REQ;
  }

  public void setFunctionCode(short functionCode) {
    this.functionCode=functionCode;
  }

  public short getFunctionCode() {
    return functionCode;
  }

  public void setContent(String content) {
    this.content=content;
  }

  public String getContent() {
    return content;
  }

  @Override
  public String toString() {
    return "JMessageProtocalReq [content=" + content + ", functionCode=" + functionCode + ", getLength()=" + getLength()
      + ", getTag()=" + getTag() + "]";
  }
}
 

 

 

/**
 * 消息协议-响应
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalRes extends JAbsMessageProtocal {

  private byte resultCode;// 结果码

  private String content;// 响应内容

  @Override
  public int getLength() {
    return 1 + (getContent() == null ? 0 : getContent().getBytes().length);
  }

  @Override
  public byte getTag() {
    return JConstant.RES;
  }

  public void setResultCode(byte resultCode) {
    this.resultCode=resultCode;
  }

  public byte getResultCode() {
    return resultCode;
  }

  public void setContent(String content) {
    this.content=content;
  }

  public String getContent() {
    return content;
  }

  @Override
  public String toString() {
    return "JMessageProtocalRes [content=" + content + ", resultCode=" + resultCode + ", getLength()=" + getLength()
      + ", getTag()=" + getTag() + "]";
  }
}

 

/**
 * JMessageProtocal解码编码工厂
 * 
 * @author Simple
 * 
 */
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {

  private final JMessageProtocalDecoder decoder;

  private final JMessageProtocalEncoder encoder;

  public JMessageProtocalCodecFactory(Charset charset) {
    this.decoder=new JMessageProtocalDecoder(charset);
    this.encoder=new JMessageProtocalEncoder(charset);
  }

  public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
    return decoder;
  }

  public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
    return encoder;
  }
}

 

/**
 * JMessageProtocal解码
 * @author Simple
 *
 */
public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {

  private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);

  private Charset charset;

  public JMessageProtocalDecoder(Charset charset) {
    this.charset=charset;
  }

  /**
   * 解码
   */
  public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {
    JAbsMessageProtocal absMP=null;
    // 获取协议tag
    byte tag=buf.get();
    // 获取协议体长度
    int length=buf.getInt();
    // 取出协议体
    byte[] bodyData=new byte[length];
    buf.get(bodyData);
    // 为解析数据做准备
    // 检测协议
    IoBuffer tempBuf=IoBuffer.allocate(100).setAutoExpand(true);
    tempBuf.put(tag);
    tempBuf.putInt(length);
    tempBuf.put(bodyData);
    tempBuf.flip();
    if(!canDecode(tempBuf)) {
      return;
    }
    // 协议体buf
    IoBuffer bodyBuf=IoBuffer.allocate(100).setAutoExpand(true);
    bodyBuf.put(bodyData);
    bodyBuf.flip();
    // 整个协议buf
    IoBuffer allBuf=IoBuffer.allocate(100).setAutoExpand(true);
    allBuf.put(tag);
    allBuf.putInt(length);
    allBuf.put(bodyData);
    allBuf.flip();
    //
    if(tag == JConstant.REQ) {
      JMessageProtocalReq req=new JMessageProtocalReq();
      short functionCode=bodyBuf.getShort();
      String content=bodyBuf.getString(charset.newDecoder());
      req.setFunctionCode(functionCode);
      req.setContent(content);
      absMP=req;
    } else if(tag == JConstant.RES) {
      JMessageProtocalRes res=new JMessageProtocalRes();
      byte resultCode=bodyBuf.get();
      String content=bodyBuf.getString(charset.newDecoder());
      res.setResultCode(resultCode);
      res.setContent(content);
      absMP=res;
    } else {
      log.error("未定义的Tag");
    }
    out.write(absMP);
  }

  // 是否可以解码
  private boolean canDecode(IoBuffer buf) {
    int protocalHeadLength=5;// 协议头长度
    int remaining=buf.remaining();
    if(remaining < protocalHeadLength) {
      log.error("错误,协议不完整,协议头长度小于" + protocalHeadLength);
      return false;
    } else {
      log.debug("协议完整");
      // 获取协议tag
      byte tag=buf.get();
      if(tag == JConstant.REQ || tag == JConstant.RES) {
        log.debug("Tag=" + tag);
      } else {
        log.error("错误,未定义的Tag类型");
        return false;
      }
      // 获取协议体长度
      int length=buf.getInt();
      if(buf.remaining() < length) {
        log.error("错误,真实协议体长度小于消息头中取得的值");
        return false;
      } else {
        log.debug("真实协议体长度:" + buf.remaining() + " = 消息头中取得的值:" + length);
      }
    }
    return true;
  }
}

 

/**
 * JMessageProtocal编码
 * @author Simple
 *
 */
public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {

  private Charset charset;

  public JMessageProtocalEncoder(Charset charset) {
    this.charset=charset;
  }

  /**
   * 编码
   */
  public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {
    // new buf
    IoBuffer buf=IoBuffer.allocate(2048).setAutoExpand(true);
    // object --> AbsMP
    JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;
    buf.put(absMp.getTag());
    buf.putInt(absMp.getLength());
    if(object instanceof JMessageProtocalReq) {// 请求协议
      JMessageProtocalReq mpReq=(JMessageProtocalReq)object;
      buf.putShort(mpReq.getFunctionCode());
      buf.putString(mpReq.getContent(), charset.newEncoder());
    } else if(object instanceof JMessageProtocalRes) {// 响应协议
      JMessageProtocalRes mpRes=(JMessageProtocalRes)object;
      buf.put(mpRes.getResultCode());
      buf.putString(mpRes.getContent(), charset.newEncoder());
    }
    buf.flip();
    out.write(buf);
  }
}

 

/**
 * MINA 客户端
 * 
 * @author Simple
 * 
 */
public class MainClient {

  @SuppressWarnings("unused")
  private static Logger log=Logger.getLogger(MainClient.class);

  private static final int PORT=9999;

  public static void main(String[] args) {
    NioSocketConnector connector=new NioSocketConnector();
    DefaultIoFilterChainBuilder chain=connector.getFilterChain();
    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
    connector.setHandler(new MinaClientHandler());
    connector.setConnectTimeoutMillis(3000);
    ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));
    cf.awaitUninterruptibly();// 等待连接创建完成
    JMessageProtocalReq req=new JMessageProtocalReq();
    req.setFunctionCode((short)1);
    req.setContent("hello world!!!");
    cf.getSession().write(req);
    cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开
    connector.dispose();
  }
}

 

/**
 * MINA 客户端消息处理
 * 
 * @author Simple
 * 
 */
public class MinaClientHandler extends IoHandlerAdapter {

  private Logger log=Logger.getLogger(MinaClientHandler.class);

  @Override
  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    log.error(String.format("Client产生异常!"));
  }

  @Override
  public void messageReceived(IoSession session, Object message) throws Exception {
    log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void messageSent(IoSession session, Object message) throws Exception {
    log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void sessionClosed(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionCreated(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionOpened(IoSession session) throws Exception {
    log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    log.debug(String.format("Client进入空闲状态!"));
  }
}

 

/**
 * MINA 服务器
 * 
 * @author Simple
 * 
 */
public class MainServer {

  private static Logger log=Logger.getLogger(MainServer.class);

  private static final int PORT=9999;

  public static void main(String[] args) throws Exception {
    SocketAcceptor acceptor=new NioSocketAcceptor();// tcp/ip 接收者
    DefaultIoFilterChainBuilder chain=acceptor.getFilterChain();// 过滤管道
    chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
    acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10s内无操作进入空闲状态
    acceptor.setHandler(new MinaServerHandler());// 设置handler
    acceptor.bind(new InetSocketAddress(PORT));// 设置端口
    log.debug(String.format("Server Listing on %s", PORT));
  }
}

 

/**
 * MINA 服务器消息处理
 * 
 * @author Simple
 * 
 */
public class MinaServerHandler extends IoHandlerAdapter {

  private Logger log=Logger.getLogger(MinaServerHandler.class);

  @Override
  public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
    log.error(String.format("Server产生异常!"));
  }

  @Override
  public void messageReceived(IoSession session, Object message) throws Exception {
    log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void messageSent(IoSession session, Object message) throws Exception {
    log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
  }

  @Override
  public void sessionClosed(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionCreated(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));
  }

  @Override
  public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    log.debug(String.format("Server进入空闲状态!"));
  }

  @Override
  public void sessionOpened(IoSession session) throws Exception {
    log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));
  }
}

 

 

如果想深入了解mina,我感觉还得去读一下mina的源码,会有很大的收获。

3
2
分享到:
评论
1 楼 fengqingyebai 2015-11-02  
感谢分享谢谢

相关推荐

    Mina自定义协议简单实现

    **Mina自定义协议简单实现** Apache Mina(Minimum Asynchronous Network)是一个开源的网络通信框架,它为Java开发者提供了一种高效、灵活且可扩展的框架,用于构建高性能的网络应用程序,如服务器和客户端应用。...

    Mina自定义协议通信的示例

    总的来说,Mina自定义协议通信示例是一个很好的学习资源,它涵盖了网络编程的核心概念,如事件驱动模型、非阻塞I/O和自定义编解码。通过深入理解并实践这个示例,开发者能更好地掌握Mina框架,并有能力解决复杂网络...

    apache-mina-2.0.7-bin.tar

    4. **过滤器架构**:Mina的过滤器链机制使得在数据传输过程中添加中间处理逻辑变得简单,用户可以自定义过滤器来实现数据的预处理、安全控制、日志记录等功能。 5. **跨平台性**:Apache Mina是用Java编写,因此...

    apache-mina-2.0.21-src.zip

    2. **协议无关性**:Mina支持多种网络协议,如TCP/IP、UDP、SSL/TLS等,并且可以通过简单的API来实现自定义协议。这对于开发跨平台的网络服务非常有用。 3. **过滤器架构**:Mina的过滤器架构是其强大功能的关键。...

    java mina框架全套

    Java Mina框架是一款高度可扩展且高性能的网络应用开发框架,专为开发网络服务和协议处理应用程序而设计。它提供了一种简洁、高效的API,使得开发者可以轻松地创建基于TCP/IP和UDP/IP协议的服务器和客户端应用。Mina...

    socket 与 mina 交互数据

    描述中提到的"熟悉解码协议"是指在使用Mina进行数据交互时,我们需要理解并实现数据的编解码过程。因为网络传输的数据通常是字节流,我们需要将其转换为我们能理解的格式,如JSON、XML或自定义协议。Mina的Filter...

    Mina框架+常用JAR包

    6. **数据交互**:通过过滤器链进行数据编码和解码,处理器接收并处理来自对方的消息。 **JAR包管理:** 为了保持项目的整洁性和避免版本冲突,开发者通常会使用构建工具(如Maven、Gradle)来管理JAR包,它们能...

    Android-MinaSocket一款基于Mina的Socket长连接库

    3. **协议无关性**:Mina库允许开发者轻松实现自定义协议,只需关注业务逻辑,而无需关心底层网络细节。 4. **高性能**:经过优化的I/O处理和事件模型,使得Mina在处理大量并发连接时表现出色。 5. **稳定性**:...

    apache mina 框架实例

    开发者可以通过继承Mina提供的`IoSession`、`ProtocolDecoderOutput`、`ProtocolEncoderOutput`等接口,实现自定义的解码器和编码器来处理这些自定义协议包。 **编码器(Encoder)与解码器(Decoder):** 在Mina中...

    apache-mina源码

    6. **Protocol Buffers**:MINA提供了协议编解码机制,允许开发者自定义协议格式。这使得MINA能灵活地支持各种网络协议,如HTTP、FTP、SMTP等。 在`apache-mina-2.0.16`这个版本中,我们可以看到以下主要内容: - ...

    websocket+java服务器(mina)

    开发者可以自定义过滤器来实现特定功能,如身份验证、数据编码解码等。 3. **ProtocolCodecFactory**:用于将网络数据流转换成应用程序可以理解的对象,以及将应用程序的数据对象转换回网络数据流。 4. **...

    mina 实现简单通讯

    总结来说,这个"mina 实现简单通讯"项目展示了一个基于Apache MINA的网络通信示例,涵盖了服务端和客户端的创建、自定义的协议编解码器、以及必要的网络配置。这为学习和理解MINA框架提供了一个基础平台,并且可以在...

    Mina2.0.7原代码,去掉slf4j代码

    例如,它可以处理TCP/IP协议栈中的HTTP、FTP或其他自定义协议。 4. **Event-driven Model**: Mina基于事件驱动模型,当网络事件发生(如连接建立、数据接收、连接关闭等)时,会触发相应的回调方法。 5. **NIO ...

    Android-基于ApacheMINA进行封装实现AndroidAPP作为服务器客户端进行通讯

    - **ProtocolDecoder** 和 **ProtocolEncoder**:这两个接口用于解码接收到的数据和编码要发送的数据,可以根据实际需求实现特定的协议解析。 2. **在Android中使用MINA** - **配置项目**:首先需要将MINA库导入...

    mina服务器和客服端实现

    - **定义协议编解码器**:创建一个实现ProtocolCodecFactory接口的类,定义数据的编码和解码规则。 3. **Mina客户端实现** - **创建Connector**:客户端使用的是Connector,类似于ServerBootstrap,配置客户端...

    mina初步学习笔记

    - 实现自定义过滤器可以实现编解码、压缩、加密等功能。 - **IoHandler接口** - 处理所有I/O事件的中心接口。 - 实现此接口可定义如何处理会话创建、关闭、消息接收等事件。 #### 四、Mina中的长连接与短连接 ...

    mina框架中socket使用,有服务端和客户端。

    通过运行服务端和客户端,可以观察数据交互的过程,理解MINA如何处理网络事件和数据传输。此外,你还可以尝试修改代码,添加新的功能,进一步提升对MINA的理解。 通过这个项目,开发者不仅可以学习到MINA框架的基础...

    mina及时推送客户端服务端实现

    MINA可以通过TCP长连接来实现这一点,TCP是一种面向连接的、可靠的传输协议,适合用于需要持续交互的数据通信。 2. **长连接**:长连接是指客户端和服务端建立连接后,保持连接状态不关闭,直到应用明确要求断开或...

    mina+spring实现多人聊天室程序

    Mina的Decoder和Encoder将负责将这些协议数据转化为可处理的对象,以便于业务逻辑层进行处理。同时,Spring的AOP可以用来实现消息的安全性和日志记录。 在业务逻辑层,我们可以使用Spring的Service和Repository组件...

Global site tag (gtag.js) - Google Analytics