最近在为公司做一个消息中心的项目,项目中使用了Apache的MINA 1.7的版本,为消息中心做通讯接口。
[size=13px; line-height: 20px; ][size=x-small;] Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序,MINA 所支持的功能也在进一步的扩展中。[/size][/size]
使用MINA组件确实可以提高通讯的并发性能以及可靠性,在1.7版本中,MINA封装了JDK中的ByteBuffer,虽然调用上方便了不少,但是如果不正当的使用它,容易引起性能上的问题,甚至会出现Out of Memory的问题。这次我就遇到了Out of Memory的问题,后来经过我的仔细分析和调试,终于发现了这条臭虫。
首先来看一下如何搭建MINA的TCP的服务端:
// output log to console
org.apache.log4j.BasicConfigurator.configure();
acceptor = new SocketAcceptor();
// Create a service configuration
SocketAcceptorConfig cfg = new SocketAcceptorConfig();
cfg.setReuseAddress(true);
// add a protocol codec, the protocol is wrapper from 0x00 to 0xff, and between 0x00 and 0xff is a message.
cfg.getFilterChain().addLast(
"protocolFilter",
new ProtocolCodecFilter(
new WrapperTagProtocolCodecFactory()));
// add logger filter
cfg.getFilterChain().addLast("logger", new LoggingFilter());
acceptor
.bind(new InetSocketAddress(PORT), new IoHandlerAdapter(){
@Override
public void exceptionCaught(IoSession session, Throwable cause)
throws Exception {
// do something...
}
@Override
public void messageReceived(IoSession session, Object message)
throws Exception {
session.write("response something").join();
}
}, cfg);
这段代码只是作为测试用例而写的,从代码中我们可以看到,代码中创建了SocketAcceptor对象,并且使用了自定义的ProtocolCodecFilter,这个通讯协议比较简单,协议格式为"<0x00><Message><0xff>",传输的消息被0x00和0xff两个字节包裹着。那么读取这条时,当读到0x00时开始,读取到0xff结束。当未读取到0x00时而接收到了一些数据,这些读取到的数据将被忽略。WrapperTagProtocolCodecFactory是一个工厂类,他将负责创建解码对象和编码对象,解码对象负责解释读取中的协议数据,并将协议数据转换为消息。而编码对象是将消息转换为协议数据。
Socket通讯是一个不可靠的消息传输通道,网络上传输的数据,并不是一次性就从客户端送到服务端,或者是服务端送到客户端,而是通过数据链路层,将数据送过来,这样会产生数据将有可能通过多次送到目的地。而MINA中的组件使用了ByteBuffer对象作为数据接收缓存,所以需要制定合理的通讯协议,来确保接收端收到的数据是完整的,可靠的。
当然如上的协议并不能完全的避免数据传输所产生的问题,因为还有可能产生数据丢包等其他的问题,但这个协议适合数据流小的消息。
当Socket通道的消息经过了协议的过滤器之后,还将经过LoggingFilter的过滤,这里实际上就是将收到的消息记录到Logger中。之后,将送给IoHandler处理,上面的代码通过匿名类继承了IoHandlerAdapter,为了方便测试,这里只重载了messageReceived方法。当MINA收到消息之后,立即发送回应的数据到客户端。
接下来,我们看看WrapperTagProtocolCodecFactory创建的两个对象:
public class WrapperTagProtocolCodecFactory extends
DemuxingProtocolCodecFactory {
public WrapperTagProtocolCodecFactory(){
super.register(WrapperTagDecoder.class);
super.register(WrapperTagEncoder.class);
}
public WrapperTagProtocolCodecFactory(String charset){
super.register(new WrapperTagDecoder(charset));
super.register(new WrapperTagEncoder(charset));
}
}
此工厂创建了WrapperTagDecoder(解码类)和WrapperTagEncoder(编码类)两个对象。
WrapperTagDecoder
public class WrapperTagDecoder extends MessageDecoderAdapter {
private String charset = "gb2312";
private Logger logger = Logger.getLogger(WrapperTagDecoder.class);
public WrapperTagDecoder(String charset){
this.charset = charset;
}
public WrapperTagDecoder(){
}
public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
return MessageDecoderResult.OK;
}
public MessageDecoderResult decode(IoSession session, ByteBuffer in,
ProtocolDecoderOutput out) throws Exception {
InputStream stream = in.asInputStream();
int head = stream.read();
if(head == -1){
throw new IOException("read -1 byte");
}
logger.info("Read head 0x00.");
int b = -1;
ByteArrayOutputStream os = new ByteArrayOutputStream();
while((b = stream.read()) != 0xff){
os.write(b);
}
logger.info("Read rear 0xff.");
String message = new String(os.toByteArray(), charset);
out.write(message);
logger.debug("Read message:" + message + ", charset:" + charset);
return MessageDecoderResult.OK;
}
}
此类继承了MessageDecoderAdapter类,待会我们来看这个类的问题。
WrapperTagEncoder
public class WrapperTagEncoder implements MessageEncoder {
private static final Set<Class<?>> TYPES;
private String charset = "gb2312";
static {
Set<Class<?>> types = new HashSet<Class<?>>();
types.add(String.class);
TYPES = Collections.unmodifiableSet(types);
}
public WrapperTagEncoder(String charset){
this.charset = charset;
}
public WrapperTagEncoder(){
}
public Set<Class<?>> getMessageTypes() {
return TYPES;
}
public void encode(IoSession session, Object message,
ProtocolEncoderOutput out) throws Exception {
ByteBuffer buf = ByteBuffer.allocate(256);
// Enable auto-expand for easier encoding
buf.setAutoExpand(true);
buf.put((byte)0x00);
buf.put(message.toString().getBytes(charset));
buf.put((byte)0xff);
buf.flip();
out.write(buf);
}
从实现上来看,WrapperTagEncoder比WrapperTagDecoder要来简单一些。WrapperTagEncoder只需要负责写就行了,而不需要控制多段数据的问题。那么上述的WrapperTagDecoder会出现什么问题呢?
MINA的Socket是基于Java的NIO而实现的非阻塞式的通讯,从WrapperTagDecoder代码我们看到ByteBuffer中提供了一个方法asInputStream,也就是将ByteBuffer转换为输入流,这个倒没什么问题,问题是在于读取的时候,使用了阻塞式的调用方法,stream.read()。如果接收的数据大于了ByteBuffer预分配的容量时,会报出:
org.apache.camel.CamelException: org.apache.mina.filter.codec.ProtocolDecoderException: java.lang.OutOfMemoryError: Java heap space (Hexdump: .....一堆ByteBuffer已读取到的数据,16进制显示)
据我分析,ByteBuffer会预先分配一定容量的堆空间,MINA的ByteBuffer应该是直接初始化了JVM底层中的堆空间,ByteBuffer默认的堆空间应该是1024byte,从ByteBuffer的asInputStream的InputStream的实现代码中可以看到,read方法实际上调用的是get()方法,如下:
@Override
public int read() {
if (ByteBuffer.this.hasRemaining()) {
return ByteBuffer.this.get() & 0xff;
} else {
return -1;
}
}
@Override
public int read(byte[] b, int off, int len) {
int remaining = ByteBuffer.this.remaining();
if (remaining > 0) {
int readBytes = Math.min(remaining, len);
ByteBuffer.this.get(b, off, readBytes);
return readBytes;
} else {
return -1;
}
}
实际上MINA的底层实现中,他会先将已读取到缓存中的ByteBuffer先扔出来给Filter,如果此时ByteBuffer已满了,那么就需要MINA会扩展ByteBuffer的容量,在扩展的过程中,就不应该再去调用任何有关get或者read的方法,由于实现中,不断的在使用read循环读取ByteBuffer中的数据,这样效率降低了,而且容易导致阻塞,本身Filter的生命周期的时间是有限的,这样会扰乱MINA本身的处理机制。如果继续想使用read方法获取数据,应该使用read(byte[] b),以块的形式读取,如果返回了-1,那么直接就返回出去,并将已读到消息,缓存起来,直到读取到0xff时,才清除掉缓存,并送到ProtocolDecoderOutput对象中,接着他就会交给IoHandler进行处理。下面是改良后的WrapperTagDecoder。
public class WrapperTagDecoder extends MessageDecoderAdapter {
private String charset = "gb2312";
private Logger logger = Logger.getLogger(WrapperTagDecoder.class);
private boolean readRear = false;
private boolean readHead = false;
private int receivedBufferSize = 1024;
private ByteArrayOutputStream os = new ByteArrayOutputStream();
public WrapperTagDecoder(String charset){
this.charset = charset;
}
public int getReceivedBufferSize() {
return receivedBufferSize;
}
public void setReceivedBufferSize(int receivedBufferSize) {
this.receivedBufferSize = receivedBufferSize;
}
public WrapperTagDecoder(){
}
public MessageDecoderResult decodable(IoSession session, ByteBuffer in) {
return MessageDecoderResult.OK;
}
public MessageDecoderResult decode(IoSession session, ByteBuffer in,
ProtocolDecoderOutput out) throws Exception {
InputStream stream = in.asInputStream();
if(!readHead){
int head = stream.read();
if(head == -1){
throw new IOException("read -1 byte");
}
readHead = true;
logger.info("Read head 0x00.");
}
byte[] buffer = new byte[receivedBufferSize];
int length = stream.read(buffer);
if(length == -1)
return MessageDecoderResult.OK;
if(buffer[length - 1] == -1){
readRear = true;
logger.info("Read rear 0xff.");
os.write(buffer, 0, length - 1);
}else{
os.write(buffer, 0, length);
}
if(readRear){
String message = new String(os.toByteArray(), charset);
out.write(message);
logger.debug("Read message:" + message + ", charset:" + charset);
readRear = false;
readHead = false;
os.reset();
}
return MessageDecoderResult.OK;
}
}
从使用上来看,看上去似乎还在使用阻塞式的方式去取,但是当你看到InputStream的实现时,你会发现,MINA将他封装了一层,内部还是调用这get的方法。InputStream其实是给方便习惯使用了阻塞式编程的程序员而设计的,我也是从这边慢慢的习惯过来,如果要深入了解非阻塞的编程尤其是MINA的组件,可以多看看网上的一些例子,最直接的方式就是去查看MINA其中的一个解析类,org.apache.mina.filter.codec.textline.TextLineCodecFactory。这里面将ByteBuffer运用的炉火纯青,尤其是对position和limit方法运用的是那么的到位,以及内部缓存,看的我是一直叫好。
不过这里还是提醒大家一句,如果是高性能的并发应用,MINA的ByteBuffer从性能上来看并不好到哪里去,还不如Java中的ByteBuffer对象,尤其是对postion的调用,性能上比Java本身的慢了不少。但是MINA 2.x已抛弃了ByteBuffer的设计,使用了更为有效的缓存方式,我建议大家还是使用2.x的版本,由于公司的项目使用了Apache Camel的组件,目前camel-mina的版本封装的是1.7的版本,期待Apache大佬们能够更新到2.x的版本。也希望遇到此问题的朋友们能够得到一点帮助。
分享到:
相关推荐
1. 优化的内存管理:Mina ByteBuffer可能使用更高效的内存分配策略,例如直接内存分配,避免了Java对象在堆上的创建和垃圾收集开销。 2. 基于池化的管理:Mina通过缓冲区池来复用ByteBuffer实例,减少频繁的创建和...
深入理解Apache_Mina_(1)----_Mina的几个类 深入理解Apache_Mina_(2)----_与IoFilter相关的几个类 深入理解Apache_Mina_(3)----_与IoHandler相关的几个类 深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和...
在这个"Apache MINA2学习笔记DEMO"中,我们很可能会看到如何使用MINA来创建一个自定义协议的示例。自定义协议通常是为了满足特定应用的需求,例如高效的数据传输、安全性或者特定的编码格式。MINA允许开发者定义自己...
4. **Buffer**:MINA 提供了 ByteBuffer 和 IoBuffer 两种缓冲区,用于在网络通信中高效地读写数据。 **MINA 2.0.1 版本特点:** 1. **性能优化**:在 2.0.1 版本中,MINA 进行了多方面的性能提升,包括更快的数据...
Apache Mina之所以能够成为一个广泛使用的框架,很大程度上是因为它解决了开发者在处理大量网络连接时所面临的问题,并提供了一种易于理解、易于扩展的编程模型。通过理解这一区别,开发者可以更加明白Mina框架在...
Mina ByteBuffer的使用可以优化性能,减少内存复制,提高网络通信效率。 3. **配置Mina的线程模型**: Mina提供了多种线程模型以适应不同的应用场景。这些模型包括:简单模型(SimpleExecutor),每个连接分配一个...
在MINA中,ByteBuffer的使用是与Java NIO紧密相关的。Java NIO的ByteBuffer是一个高效的数据存储容器,可以以字节为单位读写数据。MINA提供了自己的ByteBuffer实现,它扩展了Java NIO的功能,以更好地适应网络通信的...
1. **Session**:这是MINA中的核心概念,代表一个网络连接。它封装了与特定连接相关的所有状态和操作。 2. **Filter Chain**:MINA采用过滤器链模式,允许开发者通过添加、删除或定制过滤器来处理输入和输出数据。...
3. **缓冲区管理**:MINA使用Buffer对象来存储和操作数据,它支持多种数据类型,如ByteBuffer和MinaBuffer,提供了高效的数据读写和管理机制。 4. **过滤器链**:MINA的Filter(过滤器)机制允许用户自定义多个处理...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个Java框架,用于构建高性能、高可用性的网络应用程序,特别是针对网络协议服务器。MINA 提供了一个统一的API,无论底层是TCP/IP还是UDP/...
6. **内存管理**:MINA使用DirectBuffer和ByteBuffer进行内存管理,减少了Java对象创建和垃圾收集的开销,提升了性能。 7. **API设计**:MINA的API设计简洁且易于使用,提供了丰富的接口和类,帮助开发者快速开发...
ByteBuffer byteBuffer = ByteBuffer.wrap(imageData); session.write(byteBuffer); } private byte[] loadImage() { // 实现加载图片并转化为字节数组的逻辑 } } ``` 在这个实例中,MINA不仅实现了图片的...
在IT行业中,Apache Mina是一个广泛使用的开源框架,主要用于创建高性能、高效率的网络应用程序。Mina提供了一种抽象层,使得开发者可以专注于业务逻辑,而不是底层的网络通信细节。本话题将深入探讨如何利用Mina...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个Java框架,专为开发高性能、高可用性的网络应用程序而设计。MINA提供了异步I/O处理能力,适用于TCP和UDP协议,广泛应用于服务器端应用,...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个Java框架,用于构建高性能、高可用性的网络应用程序,如服务器和客户端。MINA提供了异步的、事件驱动的网络通信API,简化了处理TCP/IP和...
Apache Mina是一个开源的网络通信框架,主要用于构建高性能、高可用性的网络应用程序。它提供了高度模块化的设计,允许开发者以简单的方式处理各种网络协议,如TCP/IP、UDP/IP以及SSL/TLS等。在深入解析Mina源码的...
在Java的网络编程中,Apache Mina是一个非常重要的框架,它提供了一个高度可扩展和高性能的网络应用程序开发基础。Mina允许开发者构建基于TCP、UDP等传输协议的服务器端和客户端应用,而无需关注底层的Socket编程...
3. **Message**:MINA中的Message代表了在网络中传输的数据,它可以是任何类型,如ByteBuffer、String、自定义对象等。 4. **Filters**:过滤器接口(如IoFilter)是处理数据流的核心,开发者可以通过实现这个接口...