本文主要现实mina的自定义协议,并且实现服务器和客户端的简单数据交互。
"mina协议的自定义"可参考本博Mina相关文章。
正题,所需要的基础类:
抽象协议类
请求协议
响应协议
(需要定制自己的协议格式)
协议编码解码工厂
协议编码
协议解码
客户端
客户端Handler
服务器
服务器Handler
/**
* 消息协议
*
* @author Simple
*
*/
public abstract class JAbsMessageProtocal {
public abstract byte getTag();// 消息协议类型 请求/响应
public abstract int getLength();// 消息协议数据长度
}
/**
* 报头:
* short tag:请求/响应
* int length:数据长度
* 报体:
* short methodCode:功能函数
* byte resultCode:结果码
* String content:数据内容
*/
/**
* 消息协议-请求
*
* @author Simple
*
*/
public class JMessageProtocalReq extends JAbsMessageProtocal {
private short functionCode;// 功能代码
private String content;// 请求内容
@Override
public int getLength() {
return 2 + (content == null ? 0 : content.getBytes().length);
}
@Override
public byte getTag() {
return JConstant.REQ;
}
public void setFunctionCode(short functionCode) {
this.functionCode=functionCode;
}
public short getFunctionCode() {
return functionCode;
}
public void setContent(String content) {
this.content=content;
}
public String getContent() {
return content;
}
@Override
public String toString() {
return "JMessageProtocalReq [content=" + content + ", functionCode=" + functionCode + ", getLength()=" + getLength()
+ ", getTag()=" + getTag() + "]";
}
}
/**
* 消息协议-响应
*
* @author Simple
*
*/
public class JMessageProtocalRes extends JAbsMessageProtocal {
private byte resultCode;// 结果码
private String content;// 响应内容
@Override
public int getLength() {
return 1 + (getContent() == null ? 0 : getContent().getBytes().length);
}
@Override
public byte getTag() {
return JConstant.RES;
}
public void setResultCode(byte resultCode) {
this.resultCode=resultCode;
}
public byte getResultCode() {
return resultCode;
}
public void setContent(String content) {
this.content=content;
}
public String getContent() {
return content;
}
@Override
public String toString() {
return "JMessageProtocalRes [content=" + content + ", resultCode=" + resultCode + ", getLength()=" + getLength()
+ ", getTag()=" + getTag() + "]";
}
}
/**
* JMessageProtocal解码编码工厂
*
* @author Simple
*
*/
public class JMessageProtocalCodecFactory implements ProtocolCodecFactory {
private final JMessageProtocalDecoder decoder;
private final JMessageProtocalEncoder encoder;
public JMessageProtocalCodecFactory(Charset charset) {
this.decoder=new JMessageProtocalDecoder(charset);
this.encoder=new JMessageProtocalEncoder(charset);
}
public ProtocolDecoder getDecoder(IoSession paramIoSession) throws Exception {
return decoder;
}
public ProtocolEncoder getEncoder(IoSession paramIoSession) throws Exception {
return encoder;
}
}
/**
* JMessageProtocal解码
* @author Simple
*
*/
public class JMessageProtocalDecoder extends ProtocolDecoderAdapter {
private Logger log=Logger.getLogger(JMessageProtocalDecoder.class);
private Charset charset;
public JMessageProtocalDecoder(Charset charset) {
this.charset=charset;
}
/**
* 解码
*/
public void decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception {
JAbsMessageProtocal absMP=null;
// 获取协议tag
byte tag=buf.get();
// 获取协议体长度
int length=buf.getInt();
// 取出协议体
byte[] bodyData=new byte[length];
buf.get(bodyData);
// 为解析数据做准备
// 检测协议
IoBuffer tempBuf=IoBuffer.allocate(100).setAutoExpand(true);
tempBuf.put(tag);
tempBuf.putInt(length);
tempBuf.put(bodyData);
tempBuf.flip();
if(!canDecode(tempBuf)) {
return;
}
// 协议体buf
IoBuffer bodyBuf=IoBuffer.allocate(100).setAutoExpand(true);
bodyBuf.put(bodyData);
bodyBuf.flip();
// 整个协议buf
IoBuffer allBuf=IoBuffer.allocate(100).setAutoExpand(true);
allBuf.put(tag);
allBuf.putInt(length);
allBuf.put(bodyData);
allBuf.flip();
//
if(tag == JConstant.REQ) {
JMessageProtocalReq req=new JMessageProtocalReq();
short functionCode=bodyBuf.getShort();
String content=bodyBuf.getString(charset.newDecoder());
req.setFunctionCode(functionCode);
req.setContent(content);
absMP=req;
} else if(tag == JConstant.RES) {
JMessageProtocalRes res=new JMessageProtocalRes();
byte resultCode=bodyBuf.get();
String content=bodyBuf.getString(charset.newDecoder());
res.setResultCode(resultCode);
res.setContent(content);
absMP=res;
} else {
log.error("未定义的Tag");
}
out.write(absMP);
}
// 是否可以解码
private boolean canDecode(IoBuffer buf) {
int protocalHeadLength=5;// 协议头长度
int remaining=buf.remaining();
if(remaining < protocalHeadLength) {
log.error("错误,协议不完整,协议头长度小于" + protocalHeadLength);
return false;
} else {
log.debug("协议完整");
// 获取协议tag
byte tag=buf.get();
if(tag == JConstant.REQ || tag == JConstant.RES) {
log.debug("Tag=" + tag);
} else {
log.error("错误,未定义的Tag类型");
return false;
}
// 获取协议体长度
int length=buf.getInt();
if(buf.remaining() < length) {
log.error("错误,真实协议体长度小于消息头中取得的值");
return false;
} else {
log.debug("真实协议体长度:" + buf.remaining() + " = 消息头中取得的值:" + length);
}
}
return true;
}
}
/**
* JMessageProtocal编码
* @author Simple
*
*/
public class JMessageProtocalEncoder extends ProtocolEncoderAdapter {
private Charset charset;
public JMessageProtocalEncoder(Charset charset) {
this.charset=charset;
}
/**
* 编码
*/
public void encode(IoSession session, Object object, ProtocolEncoderOutput out) throws Exception {
// new buf
IoBuffer buf=IoBuffer.allocate(2048).setAutoExpand(true);
// object --> AbsMP
JAbsMessageProtocal absMp=(JAbsMessageProtocal)object;
buf.put(absMp.getTag());
buf.putInt(absMp.getLength());
if(object instanceof JMessageProtocalReq) {// 请求协议
JMessageProtocalReq mpReq=(JMessageProtocalReq)object;
buf.putShort(mpReq.getFunctionCode());
buf.putString(mpReq.getContent(), charset.newEncoder());
} else if(object instanceof JMessageProtocalRes) {// 响应协议
JMessageProtocalRes mpRes=(JMessageProtocalRes)object;
buf.put(mpRes.getResultCode());
buf.putString(mpRes.getContent(), charset.newEncoder());
}
buf.flip();
out.write(buf);
}
}
/**
* MINA 客户端
*
* @author Simple
*
*/
public class MainClient {
@SuppressWarnings("unused")
private static Logger log=Logger.getLogger(MainClient.class);
private static final int PORT=9999;
public static void main(String[] args) {
NioSocketConnector connector=new NioSocketConnector();
DefaultIoFilterChainBuilder chain=connector.getFilterChain();
chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
connector.setHandler(new MinaClientHandler());
connector.setConnectTimeoutMillis(3000);
ConnectFuture cf=connector.connect(new InetSocketAddress("localhost", PORT));
cf.awaitUninterruptibly();// 等待连接创建完成
JMessageProtocalReq req=new JMessageProtocalReq();
req.setFunctionCode((short)1);
req.setContent("hello world!!!");
cf.getSession().write(req);
cf.getSession().getCloseFuture().awaitUninterruptibly();// 等待连接断开
connector.dispose();
}
}
/**
* MINA 客户端消息处理
*
* @author Simple
*
*/
public class MinaClientHandler extends IoHandlerAdapter {
private Logger log=Logger.getLogger(MinaClientHandler.class);
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
log.error(String.format("Client产生异常!"));
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
log.debug(String.format("来自Server[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
log.debug(String.format("向Server[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
}
@Override
public void sessionClosed(IoSession session) throws Exception {
log.debug(String.format("与Server[%s]断开连接!", session.getRemoteAddress()));
}
@Override
public void sessionCreated(IoSession session) throws Exception {
log.debug(String.format("与Server[%s]建立连接!", session.getRemoteAddress()));
}
@Override
public void sessionOpened(IoSession session) throws Exception {
log.debug(String.format("与Server[%s]打开连接!", session.getRemoteAddress()));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
log.debug(String.format("Client进入空闲状态!"));
}
}
/**
* MINA 服务器
*
* @author Simple
*
*/
public class MainServer {
private static Logger log=Logger.getLogger(MainServer.class);
private static final int PORT=9999;
public static void main(String[] args) throws Exception {
SocketAcceptor acceptor=new NioSocketAcceptor();// tcp/ip 接收者
DefaultIoFilterChainBuilder chain=acceptor.getFilterChain();// 过滤管道
chain.addLast("ProtocolCodecFilter", new ProtocolCodecFilter(new JMessageProtocalCodecFactory(Charset.forName("UTF-8"))));
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);// 读写通道10s内无操作进入空闲状态
acceptor.setHandler(new MinaServerHandler());// 设置handler
acceptor.bind(new InetSocketAddress(PORT));// 设置端口
log.debug(String.format("Server Listing on %s", PORT));
}
}
/**
* MINA 服务器消息处理
*
* @author Simple
*
*/
public class MinaServerHandler extends IoHandlerAdapter {
private Logger log=Logger.getLogger(MinaServerHandler.class);
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
log.error(String.format("Server产生异常!"));
}
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
log.debug(String.format("来自Client[%s]的消息:%s", session.getRemoteAddress(), message.toString()));
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
log.debug(String.format("向Client[%s]发送消息:%s", session.getRemoteAddress(), message.toString()));
}
@Override
public void sessionClosed(IoSession session) throws Exception {
log.debug(String.format("Client[%s]与Server断开连接!", session.getRemoteAddress()));
}
@Override
public void sessionCreated(IoSession session) throws Exception {
log.debug(String.format("Client[%s]与Server建立连接!", session.getRemoteAddress()));
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
log.debug(String.format("Server进入空闲状态!"));
}
@Override
public void sessionOpened(IoSession session) throws Exception {
log.debug(String.format("Client[%s]与Server打开连接!", session.getRemoteAddress()));
}
}
如果想深入了解mina,我感觉还得去读一下mina的源码,会有很大的收获。
分享到:
相关推荐
**Mina自定义协议简单实现** Apache Mina(Minimum Asynchronous Network)是一个开源的网络通信框架,它为Java开发者提供了一种高效、灵活且可扩展的框架,用于构建高性能的网络应用程序,如服务器和客户端应用。...
总的来说,Mina自定义协议通信示例是一个很好的学习资源,它涵盖了网络编程的核心概念,如事件驱动模型、非阻塞I/O和自定义编解码。通过深入理解并实践这个示例,开发者能更好地掌握Mina框架,并有能力解决复杂网络...
4. **过滤器架构**:Mina的过滤器链机制使得在数据传输过程中添加中间处理逻辑变得简单,用户可以自定义过滤器来实现数据的预处理、安全控制、日志记录等功能。 5. **跨平台性**:Apache Mina是用Java编写,因此...
2. **协议无关性**:Mina支持多种网络协议,如TCP/IP、UDP、SSL/TLS等,并且可以通过简单的API来实现自定义协议。这对于开发跨平台的网络服务非常有用。 3. **过滤器架构**:Mina的过滤器架构是其强大功能的关键。...
Java Mina框架是一款高度可扩展且高性能的网络应用开发框架,专为开发网络服务和协议处理应用程序而设计。它提供了一种简洁、高效的API,使得开发者可以轻松地创建基于TCP/IP和UDP/IP协议的服务器和客户端应用。Mina...
描述中提到的"熟悉解码协议"是指在使用Mina进行数据交互时,我们需要理解并实现数据的编解码过程。因为网络传输的数据通常是字节流,我们需要将其转换为我们能理解的格式,如JSON、XML或自定义协议。Mina的Filter...
6. **数据交互**:通过过滤器链进行数据编码和解码,处理器接收并处理来自对方的消息。 **JAR包管理:** 为了保持项目的整洁性和避免版本冲突,开发者通常会使用构建工具(如Maven、Gradle)来管理JAR包,它们能...
3. **协议无关性**:Mina库允许开发者轻松实现自定义协议,只需关注业务逻辑,而无需关心底层网络细节。 4. **高性能**:经过优化的I/O处理和事件模型,使得Mina在处理大量并发连接时表现出色。 5. **稳定性**:...
开发者可以通过继承Mina提供的`IoSession`、`ProtocolDecoderOutput`、`ProtocolEncoderOutput`等接口,实现自定义的解码器和编码器来处理这些自定义协议包。 **编码器(Encoder)与解码器(Decoder):** 在Mina中...
6. **Protocol Buffers**:MINA提供了协议编解码机制,允许开发者自定义协议格式。这使得MINA能灵活地支持各种网络协议,如HTTP、FTP、SMTP等。 在`apache-mina-2.0.16`这个版本中,我们可以看到以下主要内容: - ...
总结来说,这个"mina 实现简单通讯"项目展示了一个基于Apache MINA的网络通信示例,涵盖了服务端和客户端的创建、自定义的协议编解码器、以及必要的网络配置。这为学习和理解MINA框架提供了一个基础平台,并且可以在...
- **长连接**:指客户端与服务端建立连接后保持连接状态,进行多次数据交互,适用于需要频繁通信的应用场景。 - **短连接**:每次通信结束后即断开连接,下次通信重新建立,适用于通信频率低或数据量小的场景。 ###...
例如,它可以处理TCP/IP协议栈中的HTTP、FTP或其他自定义协议。 4. **Event-driven Model**: Mina基于事件驱动模型,当网络事件发生(如连接建立、数据接收、连接关闭等)时,会触发相应的回调方法。 5. **NIO ...
开发者可以自定义过滤器来实现特定功能,如身份验证、数据编码解码等。 3. **ProtocolCodecFactory**:用于将网络数据流转换成应用程序可以理解的对象,以及将应用程序的数据对象转换回网络数据流。 4. **...
- **ProtocolDecoder** 和 **ProtocolEncoder**:这两个接口用于解码接收到的数据和编码要发送的数据,可以根据实际需求实现特定的协议解析。 2. **在Android中使用MINA** - **配置项目**:首先需要将MINA库导入...
- **定义协议编解码器**:创建一个实现ProtocolCodecFactory接口的类,定义数据的编码和解码规则。 3. **Mina客户端实现** - **创建Connector**:客户端使用的是Connector,类似于ServerBootstrap,配置客户端...
- 实现自定义过滤器可以实现编解码、压缩、加密等功能。 - **IoHandler接口** - 处理所有I/O事件的中心接口。 - 实现此接口可定义如何处理会话创建、关闭、消息接收等事件。 #### 四、Mina中的长连接与短连接 ...
通过运行服务端和客户端,可以观察数据交互的过程,理解MINA如何处理网络事件和数据传输。此外,你还可以尝试修改代码,添加新的功能,进一步提升对MINA的理解。 通过这个项目,开发者不仅可以学习到MINA框架的基础...
MINA可以通过TCP长连接来实现这一点,TCP是一种面向连接的、可靠的传输协议,适合用于需要持续交互的数据通信。 2. **长连接**:长连接是指客户端和服务端建立连接后,保持连接状态不关闭,直到应用明确要求断开或...
Mina的Decoder和Encoder将负责将这些协议数据转化为可处理的对象,以便于业务逻辑层进行处理。同时,Spring的AOP可以用来实现消息的安全性和日志记录。 在业务逻辑层,我们可以使用Spring的Service和Repository组件...