一、Mina对编解码的支持
我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);对于所要发送的数据,我们需要转换为计算机所能处理的二进制数据,此过程称为编码(encode)。
Mina对数据的编解码提供了良好的支持,它提供了过滤器ProtocolCodecFilter支持编码和解码过程,可以查看包org.apache.mina.filter.codec下的代码。
看下此过滤器的调用,代码很简单:
// 加入编解码过滤器
chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));
实现原理
ProtocolCodecFilter包含一个编码器和解码器工厂:
//编码器和解码器工厂
private final ProtocolCodecFactory factory;
此工厂可以通过构造方法传入,具体构造方法可以具体看源码,比较简单,此处不做详细介绍。
主要看下解码和编码过程,解码应该是消息接收到,我们程序对消息进行处理时进行的,此时我们想到ProtocolCodecFilter应该覆盖messageReceived方法。编码应该是发送消息时,需要将我们的业务数据结构转换为二进制数据,此时我们想到ProtocolCodecFilter应该覆盖filterWrite方法。
解码过程
前面已经说了,解码过程就是将二进制数据转换为我们可以识别的数据结构,所以messageReceived方法一开始就有个判断:
//对于解码,消息类型必须是IoBuffer类型的,如果不是,转向下个filter
if (!(message instanceof IoBuffer)) {
nextFilter.messageReceived(session, message);
return;
}
解码的核心操作:
//处理消息,如果buffer中还有数据,就处理数据
while (in.hasRemaining()) {
int oldPos = in.position();
try {
synchronized (decoderOut) {
//进行解码操作。
decoder.decode(session, in, decoderOut);
}
…………
}
我们需要实现解码器ProtocolDecoder接口,主要实现解码方法decode。可以参考TextLineDecoder类的实现,下面代码是本人实际项目中的实现:
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
final int packHeadLength = 2;
// 先获取上次的处理上下文,其中可能有未处理完的数据
Context ctx = getContext(session);
// 先把当前buffer中的数据追加到Context的buffer当中
ctx.append(in);
// 把position指向0位置,把limit指向原来的position位置
IoBuffer buf = ctx.getBuffer();
buf.flip();
// 当前剩余长度大于2
while (buf.remaining() >= packHeadLength) {
buf.mark();
if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("eb")) {
if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("93")) {
buf.reset();
if(buf.remaining()<11){
break;
}
byte[] dataArray = new byte[11];
buf.get(dataArray, 0, 11);
if (SensorData.checkData(dataArray)) {
SensorData data = new SensorData(dataArray);
out.write(data);
// 回应客户端
byte[] b = new byte[2];
b[0] = ByteConvertUtil.uniteBytes("eb");
b[1] = ByteConvertUtil.uniteBytes("93");
session.write(IoBuffer.wrap(b));
}
} else {
continue;
}
} else {
continue;
}
}
//断包处理,将剩余数据放入CONTEXT中
if (buf.hasRemaining()) {
IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);
temp.put(buf);
temp.flip();
buf.clear();
buf.put(temp);
} else {
buf.clear();
}
}
顺便说下,我们最好要把我们的的数据包的格式提前定义好,了解了数据包的格式我们才能更好的进行数据的编解码。定义好数据包格式一方面方便编解码,另一方面可以解决下面要说的粘包和断包的问题。
数据包的定义有很多种方式,这里说下我所用过的两种方式:
1.固定消息长度,消息头+消息体+校验码。此方式相对简单,表示的内容也比较少。
2.不定消息长度,消息头+消息长度+消息体。此方式可以无限消息长度,比较灵活。
解码出一个消息体后,需要将数据通过ProtocolDecoderOutput的write方法写入到队列(queue)里面去:
public void write(Object message) {
if (message == null) {
throw new IllegalArgumentException("message");
}
//将消息写入队列
messageQueue.add(message);
}
真正执行消息向下传递是通过flush方法:
public void flush(NextFilter nextFilter, IoSession session) {
Queue<Object> messageQueue = getMessageQueue();
// 取出队列里面的消息向下传递
while (!messageQueue.isEmpty()) {
nextFilter.messageReceived(session, messageQueue.poll());
}
}
编码过程
看了上面的解码过程,编码过程就不难理解了,编码过程只不过是解码过程的逆向过程,同样在filterWrite方法里有消息类型的判断:
//消息如果已经是IoBuffer,就不需要再进行编码
if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
nextFilter.filterWrite(session, writeRequest);
return;
}
编码:
// 进行数据编码
encoder.encode(session, message, encoderOut);
此处编码实现可以参考TextLineEncoder的编码实现,比较简单,此处就不多做解释了。
同样编码也是通过write到一个队列中,然后通过flush写入到后面的过滤器中的。
二、Mina对粘包和断包的处理
上面说了mina对编解码的支持,在解码过程中,不得不面对的一个问题就是TCP的粘包和断包,先说下什么是粘包和断包。
TCP通讯是面向数据流的通讯,我们将数据流理解为一支竹竿,数据包就相当于竹竿中的每一节,那么我们的解码过程就相当于对竹竿进行分解的过程。竹竿就是多个数据包的“粘包”,断包就是指竹节中间断开,我们需要将它拼接成为一个完整的竹节,如果不能拼接起来就要废弃这部分。
粘包:
断包:
对粘包的处理相对比较简单,只需要依据数据包的格式进行数据流的分割即可;对于断包的处理我们需要将断包的数据保存起来,等待接收下次的数据进行拼接。
通常情况下我们要考虑粘包和断包同时出现的情况下的解码代码编写。有两种实现方式:
1.继承CumulativeProtocolDecoder类,实现doDecode方法。
2.实现ProtocolDecoder接口,自己解决粘包和断包的问题。
先看下CumulativeProtocolDecoder的实现。
它有一个成员变量BUFFER:
//存放断包数据
private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");
doDecode方法一方面判断数据包是否符合解码要求(数据包可能过短,数据包格式不合要求都可能不能通过解码要求),不符合刚返回false;另一方面对于符合解码要求的数据进行数据解码,并返回true。可以参考ImageRequestDecoder类的实现。
看下它的decode方法实现:
public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (!session.getTransportMetadata().hasFragmentation()) {
while (in.hasRemaining()) {
// 判断是否符合解码要求,不符合则中断并返回
if (!doDecode(session, in, out)) {
break;
}
}
return;
}
boolean usingSessionBuffer = true;
// 取得上次断包数据
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
// If we have a session buffer, append data to that; otherwise
// use the buffer read from the network directly.
if (buf != null) { // 如果有断包数据
boolean appended = false;
// Make sure that the buffer is auto-expanded.
if (buf.isAutoExpand()) {
try {
// 将断包数据和当前传入的数据进行拼接
buf.put(in);
appended = true;
} catch (IllegalStateException e) {
// A user called derivation method (e.g. slice()),
// which disables auto-expansion of the parent buffer.
} catch (IndexOutOfBoundsException e) {
// A user disabled auto-expansion.
}
}
if (appended) {
buf.flip();// 如果是拼接的数据,将buf置为读模式
} else {
// Reallocate the buffer if append operation failed due to
// derivation or disabled auto-expansion.
//如果buf不是可自动扩展的buffer,刚通过数据拷贝的方式将断包数据和当前数据进行拼接
buf.flip();
IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);
newBuf.order(buf.order());
newBuf.put(buf);
newBuf.put(in);
newBuf.flip();
buf = newBuf;
// Update the session attribute.
session.setAttribute(BUFFER, buf);
}
} else {
buf = in;
usingSessionBuffer = false;
}
for (;;) {
int oldPos = buf.position();
boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作
if (decoded) {
// 如果符合解码要求并进行了解码操作,
// 则当前position和解码前的position不可能一样
if (buf.position() == oldPos) {
throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");
}
// 如果已经没有数据,则退出循环
if (!buf.hasRemaining()) {
break;
}
} else {// 如果不符合解码要求,则退出循环
break;
}
}
// if there is any data left that cannot be decoded, we store
// it in a buffer in the session and next time this decoder is
// invoked the session buffer gets appended to
if (buf.hasRemaining()) {
if (usingSessionBuffer && buf.isAutoExpand()) {
buf.compact();
} else {
//如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。
storeRemainingInSession(buf, session);
}
} else {
if (usingSessionBuffer) {
removeSessionBuffer(session);
}
}
}
上面的处理过程可以这样理解:
1.取得断包数据,如果有断包数据,就和当前数据拼接。
2.进行数据解码操作。
3.将可以进行解码操作的数据解码完成后,如果还有数据,则将剩余数据存入session中,等待下次数据到来,从步骤1开始再次执行。
通过继承ProtocolDecoder,实现decode方法,自己处理粘包和断包的方式其实和CumulativeProtocolDecoder类的实现原理是类似的,此处实现可以参考类TextLineDecoder,内部类Context保存了上下文信息,同样是保存在了sesion中的,具体实现方式大家可以仔细阅读代码。
三、总结
基于TCP的通讯协议才有可能产生粘包和断包的情况,粘包和断包的产生有多种原因,处理好粘包和断包的问题是网络编程必然面对的情况,对于这块的处理,大家如果有什么好的想法可以一起讨论。
每天进步一点点,不做无为的码农。。。。。
2012年6月17日星期日
码农虎虎
http://weibo.com/hurtigf
http://www.lifanghu.com/
wslfh2005@163.com
相关推荐
总的来说,理解和处理Mina中的断包和粘包问题,需要深入理解网络通信协议、数据编码解码原理以及Mina框架的工作机制。通过分析Android-Mina-master项目中的代码,我们可以学习到如何在实际应用中解决这些问题,提高...
总的来说,Apache Mina为解决网络通信中的断包和粘包问题提供了强大的工具。通过理解和应用mina_optimize代码,你可以更好地掌握如何在实际项目中规避这些问题,提升网络通信的稳定性和可靠性。在实际开发中,结合...
在这个"apache-mina源码"中,我们可以深入理解MINA的设计原理和实现细节。 MINA的核心概念包括: 1. **IoSession**:IoSession是MINA中的核心组件,代表了服务端和客户端之间的连接。它包含了会话的状态信息,如...
本文将深入探讨mina编解码器的工作原理,提供一个典型编解码器的示例,并解析其代码。 1. **mina框架基础** - Mina提供了一个高效的、事件驱动的网络应用程序框架,简化了网络编程,尤其是TCP和UDP通信。 - 它...
4. **ProtocolCodec**:为了处理不同协议的数据编码和解码,Mina提供了ProtocolCodec接口。你可以自定义实现,以适应你的特定协议需求。 5. **Executor**:Mina使用Executor服务来管理和调度任务,确保异步操作的...
apache-mina-2.0.4 架包 源码 学习教程.apache mina是Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序...
深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和联系 深入理解Apache_Mina_(5)----_配置Mina的线程模型 深入理解Apache_Mina_(6)----_Java_Nio_ByteBuffer与Mina_ByteBuffer的区别(类图) 相信你们也愿意去...
mina编解码示例是基于Apache Mina框架的一个实践项目,主要展示了如何在Mina中进行数据的编码和解码操作。Apache Mina是一个高度可扩展的网络通信框架,广泛应用于开发高性能、高并发的网络应用程序,如TCP/IP和UDP...
深入研究"apache-mina-2.0.4"源码,我们可以学习到MINA如何实现非阻塞I/O,过滤器链的构建和事件传播机制,以及如何定制和扩展MINA以满足特定需求。例如,可以查看IoSession的实现,了解其如何管理会话状态;研究...
这个示例集合是为了学习和交流而准备的,开发者可以通过分析源码了解Mina如何处理网络通信,如何构建过滤器链,以及如何实现自定义的协议编码解码器。同时,这些示例也提供了实际操作的机会,便于开发者在遇到问题...
3. **Protocol Codec**:Apache Mina提供了多种编码器和解码器,如ByteToMessageDecoder和MessageToByteEncoder,用于处理不同协议的数据格式转换。 4. **NIO(Non-blocking I/O)**:Mina利用Java的NIO库,实现高效...
Apache MINA...总之,Apache MINA 2.0.7版的二进制包和源码提供了全面的工具集,帮助开发者构建高效、灵活且可扩展的网络应用程序。无论是初学者还是经验丰富的开发者,都能从中受益,提升网络编程的能力。
Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)
通过深入学习和实践这个Apache Mina入门Demo,你将掌握如何利用Mina构建网络应用,并了解其核心特性和工作原理,这对于从事Java网络编程或者需要处理大规模并发连接的开发者来说是非常有价值的。
最近一直在看 Mina 的源码,用了 Mina 这么长时间,说实话,现在才开始对 Mina 有了一 些 深刻的理解,关于 Mina 的基本知识的介绍,这里就不多说了,网上已经有很多不错的文 章 都对 Mina 做了较深刻的剖析,现在...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个Java框架,用于构建高性能、...在实际项目中,配合源码阅读和测试,能更好地理解和掌握MINA FtpServer的工作原理及其在复杂环境中的应用。
3. **Mina2源码分析**(Mina2源码分析.doc):源码分析文档通常由经验丰富的开发者编写,通过深入剖析MINA的源代码,揭示其内部工作原理,帮助开发者理解MINA如何实现非阻塞I/O,以及如何高效地处理网络连接和数据...
过滤器(Filter)是Mina的另一个重要组件,它们按照预定义的顺序对数据进行处理,如解码、编码、安全检查等,极大地简化了网络应用的开发。 **4. 丰富的协议支持** Apache Mina支持多种网络协议,如TCP、UDP、SSL/...