`

Mina基础(二)

阅读更多

        Mina基础(一)中的简单实例,虽然实现了根据\r\n换行符编解码的功能,但是却存在以下问题:

        a.编解码器中编码类型Charset硬编码,不便调整;

        b.只能根据Windows的换行符\r\n解码,没有考虑其他操作系统的换行符,不灵活;

        c.解码器中定义了成员变量IoBuffer, 但Decoder实例是单例的,因此Decoder实例中的成员变量可以被多线程共享访问,可能会因为变量的可见性而造成数据异常。

        第3个bug是致命的,因此,必须首先解决。为什么要定义成员变量IoBuffer呢?因为数据接收并不是一次完成的;比如客户端发送一个请求有400个字节,先发送了200个字节,这时暂停某段时间,然后又发送了剩余200字节;在解码时,Decode的IoBuffer中先解码200个接收到的字节,此时,解码工作并未完成;但因为使用了java NIO,发生IO阻塞时会处理其他请求,此时就需要把先接收到的数据暂存在某个变量中,当剩余数据到达时,解码后追加在原来数据后面。

IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
	throws Exception {

        此时,问题出现了!每个IoSession都需要有自己的解码器实例;MINA确保同一时刻只有一个线程在执行decode() 函数,不允许多线程并发地执行解码函数,但它并不能保证每次解码过程都是同一线程在执行(两次解码用的可能是不同的线程)。假设第一块数据被线程1管理,这时还没接收到足够的数据以供解码,当接收到第二块数据时,被另一个线程2管理,此时可能会出现变量的可视化(Visibility)问题。

        因此,每个IoSession都需要独立保存解码器所解码时未完成的数据。办法就是保存在IoSession的属性中,每次解码时,都先从它的属性中拿出上次未完成的任务数据,把新数据追加在它的后面。

package com.bijian.study.mina.codec;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyTextLineCodecDecoder implements ProtocolDecoder {
	
	private Charset charset = Charset.forName("utf-8");
    // 定义常量值,作为每个IoSession中保存解码内容的key值
	private static String CONTEXT = MyTextLineCodecDecoder.class.getName()
			+ ".context";

	public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
			throws Exception {
		Context ctx = getContext(session);
		decodeAuto(ctx, in, out);
	}

	private Context getContext(IoSession session) {
		Context ctx = (Context) session.getAttribute(CONTEXT);
		if (ctx == null) {
			ctx = new Context();
			session.setAttribute(CONTEXT, ctx);
		}
		return ctx;
	}

	private void decodeAuto(Context ctx, IoBuffer in, ProtocolDecoderOutput out)
			throws CharacterCodingException {
		boolean mark = false;
		while (in.hasRemaining()) {
			byte b = in.get();
			switch (b) {
			case '\r':
				break;
			case '\n':
				mark = true;
				break; // 跳出switch
			default:
				ctx.getBuf().put(b);
			}

			if (mark) {
				IoBuffer t_buf = ctx.getBuf();
				t_buf.flip();
				try {
					out.write(t_buf.getString(charset.newDecoder()));
				} finally {
					t_buf.clear();
				}
			}
		}
	}

	public void dispose(IoSession session) throws Exception {
		Context ctx = (Context) session.getAttribute(CONTEXT);
		if (ctx != null) {
			session.removeAttribute(CONTEXT);
		}
	}

	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
	}

	private class Context {
		private IoBuffer buf;

		public Context() {
			buf = IoBuffer.allocate(100).setAutoExpand(true);
		}

		public IoBuffer getBuf() {
			return buf;
		}
	}
}
        代码解释:
        1.在解码器中定义一个内部类,内部类中有一个成员变量IoBuffer,用来存储每个IoSesssion解码的内容;
private class Context {
	private IoBuffer buf;

	public Context() {
		buf = IoBuffer.allocate(100).setAutoExpand(true);
	}

	public IoBuffer getBuf() {
		return buf;
	}
}
        2.当IoSession使用解码实例时,第一次使用则新建一个Context对象,保存在IoSession的Attribute中,把解码内容保存在Context对象的成员变量IoBuffer中;如果解码没结束,第二次使用解码实例时,从IoSession的Attribute取出Context对象,将解码内容追加在Context对象的成员变量IoBuffer中。 
        注意:IoSession的Attribute使用用一个同步的HashMap保存对象,因此定义了常量CONTEXT作为保存Context对象的Key值。
// 定义常量值,作为每个IoSession中保存解码内容的key值
private static String CONTEXT = MyTextLineCodecDecoder.class.getName()
		+ ".context";

public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
		throws Exception {
	Context ctx = getContext(session);
	decodeAuto(ctx, in, out);
}

private Context getContext(IoSession session) {
	Context ctx = (Context) session.getAttribute(CONTEXT);
	if (ctx == null) {
		ctx = new Context();
		session.setAttribute(CONTEXT, ctx);
	}
	return ctx;
}
        3.解码时,解码内容保存在Context对象的成员变量IoBuffer中,因此,一旦解码成功,要把成员变量IoBuffer重置;现在是请求/响应的单模式,不存在一个请求过来发送了多条记录的情况,所有重置前其实IoBuffer缓存内容已经为空。
private void decodeAuto(Context ctx, IoBuffer in, ProtocolDecoderOutput out)
		throws CharacterCodingException {
	boolean mark = false;
	while (in.hasRemaining()) {
		byte b = in.get();
		switch (b) {
		case '\r':
			break;
		case '\n':
			mark = true;
			break; // 跳出switch
		default:
			ctx.getBuf().put(b);
		}

		if (mark) {
			IoBuffer t_buf = ctx.getBuf();
			t_buf.flip();
			try {
				out.write(t_buf.getString(charset.newDecoder()));
			} finally {
				t_buf.clear();
			}
		}
	}
}
        4. 解码成功,则从IoSession的Attribute删除Context对象;
public void dispose(IoSession session) throws Exception {
	Context ctx = (Context) session.getAttribute(CONTEXT);
	if (ctx != null) {
		session.removeAttribute(CONTEXT);
	}
}
        查看Mina对TextLineCodec的实现源码会发现,根据换行符解码的消息最大长度是受限制的,默认最大长度是1024,相当于缓冲区最大能存放1K的数据。因此使用时,建议调整参数为2K。
        如果我们希望根据我们自己定义的文本换行符及编码格式编解码,则需要把它们作为参数传递给编解码器;完整代码如下:
package com.bijian.study.mina.codec;

import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

public class MyTextLineCodecDecoderII implements ProtocolDecoder {
	
	private Charset charset; // 编码格式

	private String delimiter; // 文本分隔符

	private IoBuffer delimBuf; // 文本分割符匹配的变量

	// 定义常量值,作为每个IoSession中保存解码任务的key值
	private static String CONTEXT = MyTextLineCodecDecoderII.class.getName()
			+ ".context";

	// 构造函数,必须指定Charset和文本分隔符
	public MyTextLineCodecDecoderII(Charset charset, String delimiter) {
		this.charset = charset;
		this.delimiter = delimiter;
	}

	public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
			throws Exception {
		Context ctx = getContext(session);
		if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值
			delimiter = "\r\n";
		}
		if (charset == null) {
			charset = Charset.forName("utf-8");
		}
		decodeNormal(ctx, in, out);
	}

	// 从IoSession中获取Context对象
	private Context getContext(IoSession session) {
		Context ctx;
		ctx = (Context) session.getAttribute(CONTEXT);
		if (ctx == null) {
			ctx = new Context();
			session.setAttribute(CONTEXT, ctx);
		}
		return ctx;
	}

	// 解码
	private void decodeNormal(Context ctx, IoBuffer in,
			ProtocolDecoderOutput out) throws CharacterCodingException {
		// 取出未完成任务中已经匹配的文本换行符的个数
		int matchCount = ctx.getMatchCount();
		
		// 设置匹配文本换行符的IoBuffer变量
		if (delimBuf == null) {
            IoBuffer tmp = IoBuffer.allocate(2).setAutoExpand(true);
            tmp.putString(delimiter, charset.newEncoder());
            tmp.flip();
            delimBuf = tmp;
        }
		
		int oldPos = in.position(); // 解码的IoBuffer中数据的原始信息
        int oldLimit = in.limit();
        while (in.hasRemaining()) { // 变量解码的IoBuffer
            byte b = in.get();
            if (delimBuf.get(matchCount) == b) { // 匹配第matchCount位换行符成功
                matchCount++;               
                if (matchCount == delimBuf.limit()) { // 当前匹配到字节个数与文本换行符字节个数相同,匹配结束
                    int pos = in.position();   // 获得当前匹配到的position(position前所有数据有效)
                    in.limit(pos);
                    in.position(oldPos);   // position回到原始位置

                    ctx.append(in);   // 追加到Context对象未完成数据后面

                    in.limit(oldLimit); // in中匹配结束后剩余数据
                    in.position(pos);

                    IoBuffer buf = ctx.getBuf();
                    buf.flip();
                    buf.limit(buf.limit() - matchCount);// 去掉匹配数据中的文本换行符
                    try {
                        out.write(buf.getString(ctx.getDecoder())); // 输出解码内容
                    } finally {
                        buf.clear(); // 释放缓存空间
                    }

                    oldPos = pos;
                    matchCount = 0;
                }
            } else {
            	// 如果matchCount==0,则继续匹配
            	// 如果matchCount>0,说明没有匹配到文本换行符的中的前一个匹配成功字节的下一个字节,
            	// 跳转到匹配失败字符处,并置matchCount=0,继续匹配
                in.position(in.position()-matchCount);
                matchCount = 0;  // 匹配成功后,matchCount置空
            }
        }
        
        // 把in中未解码内容放回buf中
        in.position(oldPos);
        ctx.append(in);

        ctx.setMatchCount(matchCount);
	}

	public void dispose(IoSession session) throws Exception {

	}

	public void finishDecode(IoSession session, ProtocolDecoderOutput out)
			throws Exception {
	}

	// 内部类,保存IoSession解码时未完成的任务
	private class Context {
		private CharsetDecoder decoder;
		private IoBuffer buf; // 保存真实解码内容
		private int matchCount = 0; // 匹配到的文本换行符个数

		private Context() {
			decoder = charset.newDecoder();
			buf = IoBuffer.allocate(80).setAutoExpand(true);
		}

		// 重置
		public void reset() {
			matchCount = 0;
			decoder.reset();
		}

		// 追加数据
		public void append(IoBuffer in) {
			getBuf().put(in);
		}

		// ======get/set方法=====================
		public CharsetDecoder getDecoder() {
			return decoder;
		}

		public IoBuffer getBuf() {
			return buf;
		}

		public int getMatchCount() {
			return matchCount;
		}

		public void setMatchCount(int matchCount) {
			this.matchCount = matchCount;
		}
	} // end class Context;
}
        编码器:
package com.bijian.study.mina.codec;

import java.nio.charset.Charset;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class MyTextLineCodecEncoderII implements ProtocolEncoder {
	
	private Charset charset; // 编码格式

	private String delimiter; // 文本分隔符

	public MyTextLineCodecEncoderII(Charset charset, String delimiter) {
		this.charset = charset;
		this.delimiter = delimiter;
	}

	public void encode(IoSession session, Object message,
			ProtocolEncoderOutput out) throws Exception {
		if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值
			delimiter = "\r\n";
		}
		if (charset == null) {
			charset = Charset.forName("utf-8");
		}

		String value = message.toString();
		IoBuffer buf = IoBuffer.allocate(value.length()).setAutoExpand(true);
		buf.putString(value, charset.newEncoder()); // 真实数据
		buf.putString(delimiter, charset.newEncoder()); // 文本换行符
		buf.flip();
		out.write(buf);
	}

	public void dispose(IoSession session) throws Exception {
	}
}
        编解码器工厂:
package com.bijian.study.mina.codec;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class MyTextLineCodecFactoryII implements ProtocolCodecFactory {

	private Charset charset; // 编码格式

	private String delimiter; // 文本分隔符

	public MyTextLineCodecFactoryII(Charset charset, String delimiter) {
		this.charset = charset;
		this.delimiter = delimiter;
	}

	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return new MyTextLineCodecDecoderII(charset, delimiter);
	}

	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return new MyTextLineCodecEncoderII(charset, delimiter);
	}
}
        服务端绑定过滤器:
package com.bijian.study.mina.server;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSessionConfig;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LogLevel;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

import com.bijian.study.mina.codec.MyTextLineCodecFactoryII;
import com.bijian.study.mina.handler.Demo01ServerHandler;

public class TestServer01 {
	
	private static Logger logger = Logger.getLogger(TestServer01.class);

	private static int PORT = 3005;

	public static void main(String[] args) {
		
		IoAcceptor acceptor = null;
		try {
			// 创建一个非阻塞的server端的Socket
			acceptor = new NioSocketAcceptor();

			// 设置过滤器
			acceptor.getFilterChain().addLast(
					"codec",
					new ProtocolCodecFilter(new MyTextLineCodecFactoryII(Charset.forName("utf-8"), "\r\n")));

			// 设置日志过滤器
			LoggingFilter lf = new LoggingFilter();
			lf.setMessageReceivedLogLevel(LogLevel.DEBUG);
			acceptor.getFilterChain().addLast("logger", lf);
			// 获得IoSessionConfig对象
			IoSessionConfig cfg = acceptor.getSessionConfig();
			// 读写通道10秒内无操作进入空闲状态
			cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100);

			// 绑定逻辑处理器
			acceptor.setHandler(new Demo01ServerHandler());
			// 绑定端口
			acceptor.bind(new InetSocketAddress(PORT));
			logger.info("服务端启动成功...     端口号为:" + PORT);
		} catch (Exception e) {
			logger.error("服务端启动异常....", e);
			e.printStackTrace();
		}
	}
}
        客户端绑定过滤器:
package com.bijian.study.mina.client;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.log4j.Logger;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.bijian.study.mina.codec.MyTextLineCodecFactoryII;
import com.bijian.study.mina.handler.DemoClientHandler;

public class TestClient01 {
	
	private static Logger logger = Logger.getLogger(TestClient01.class);

	private static String HOST = "127.0.0.1";

	private static int PORT = 3005;

	public static void main(String[] args) {
		// 创建一个非阻塞的客户端程序
		IoConnector connector = new NioSocketConnector();
		// 设置链接超时时间
		connector.setConnectTimeout(30000);
		// 添加过滤器
		connector.getFilterChain().addLast(
			"codec",
			new ProtocolCodecFilter(new MyTextLineCodecFactoryII(Charset.forName("utf-8"), "\r\n")));
		
		// 添加业务逻辑处理器类
		connector.setHandler(new DemoClientHandler());
		IoSession session = null;
		try {
			ConnectFuture future = connector.connect(new InetSocketAddress(
					HOST, PORT));// 创建连接
			future.awaitUninterruptibly();// 等待连接创建完成
			session = future.getSession();// 获得session
			session.write("hello mina");// 发送消息
		} catch (Exception e) {
			logger.error("客户端链接异常...", e);
		}

		session.getCloseFuture().awaitUninterruptibly();// 等待连接断开
		connector.dispose();
	}
}

        先运行服务端,再运行客户端,服务端输出如下:

2016-01-20 22:41:38,167 INFO  TestServer01  - 服务端启动成功...     端口号为:3005
2016-01-20 22:41:46,946 INFO  LoggingFilter  - CREATED
2016-01-20 22:41:46,948 INFO  Demo01ServerHandler  - 服务端与客户端创建连接...
2016-01-20 22:41:46,948 INFO  LoggingFilter  - OPENED
2016-01-20 22:41:46,948 INFO  Demo01ServerHandler  - 服务端与客户端连接打开...
2016-01-20 22:41:46,965 DEBUG ProtocolCodecFilter  - Processing a MESSAGE_RECEIVED for session 1
2016-01-20 22:41:46,971 DEBUG LoggingFilter  - RECEIVED: hello mina
2016-01-20 22:41:46,972 INFO  Demo01ServerHandler  - 服务端接收到的数据为:hello mina
2016-01-20 22:41:46,980 INFO  LoggingFilter  - SENT: Wed Jan 20 22:41:46 CST 2016
2016-01-20 22:41:46,980 INFO  Demo01ServerHandler  - 服务端发送信息成功...
2016-01-20 22:43:26,986 INFO  LoggingFilter  - IDLE
2016-01-20 22:43:26,987 INFO  Demo01ServerHandler  - 服务端进入空闲状态...

        客户端输出如下:

2016-01-20 22:41:46,983 DEBUG ProtocolCodecFilter  - Processing a MESSAGE_RECEIVED for session 1
2016-01-20 22:41:46,989 INFO  DemoClientHandler  - 客户端接收到的信息为:Wed Jan 20 22:41:46 CST 2016

        完整代码见附件MinaDemo03.rar。

 

学习资料:http://wenku.baidu.com/link?url=VyKQnsn4b0BDJ8cQlLUu9cvpGz-Iou_499U4lJE9I0s5nPPY5kF5BDd8qo1yRMOiqsM8wxDPEL_S0koiFp8v5y36G9OGJydC2C12juo0bTW

分享到:
评论

相关推荐

    mina框架资源包

    本资源包包含了实现Mina框架基础功能所需的组件,对于搭建即时通讯服务器至关重要。 1. **Mina框架核心**:`apache-mina-2.0.19-bin.zip` 这个文件是Mina框架的二进制发行版,包含了Mina运行所需的jar文件和其他可...

    基于 MINA 的 TLS/SSL NIO Socket 实现(二)

    本文将主要关注在Java中使用MINA来实现安全套接层(SSL)和传输层安全(TLS)协议,这两个协议是网络安全通信的基础,确保数据在网络中的加密传输。 首先,理解TLS/SSL的核心概念至关重要。它们都是为网络通信提供...

    MINA开发手册和JAR包

    - `minalib`:指的是MINA的库文件,即JAR包,是开发MINA应用的基础。 总结来说,这个压缩包提供了全面的MINA开发资源,包括必要的库文件和详细的文档,对于想要学习和使用MINA进行网络应用开发的人员来说非常有价值...

    websocket+java服务器(mina)

    1. **创建ServerBootstrap**:这是Mina服务器的基础,用于配置服务器的线程模型、处理链等。 2. **设置ChildHandler**:配置一个IoHandler,处理客户端的连接建立、消息接收和断开连接等事件。 3. **实现WebSocket...

    mina基础技术

    ### Mina基础技术知识点 #### 一、MINA框架简介 **1.1 MINA是什么?** Apache MINA是一个强大的网络应用框架,旨在帮助开发者轻松构建高性能和高扩展性的网络应用。它通过Java NIO(非阻塞I/O)提供了一个抽象的...

    Mina 框架研究与实现

    Mina框架的核心设计理念在于提供一个强大的基础架构,让开发者能够轻松创建高性能和高可用性的网络应用程序。这一框架已被广泛应用于多个领域,如Red5项目中的RTMP协议实现、SubEtha.SMTP项目中的SMTP协议、Apache ...

    apache-mina-2.0.7 含官方教程

    "MINA官方教程(中文版).docx"很可能包含了如何使用MINA框架进行网络编程的基础知识,包括但不限于: 1. **MINA的基本概念**:如Session、Filter、ProtocolDecoder和ProtocolEncoder等,这些都是MINA框架中的核心...

    mina HTTP协议实例

    这只是一个基础实例,实际应用中可能需要处理更复杂的场景,比如HTTPS安全连接、WebSocket双向通信等。 总而言之,Apache MINA为开发人员提供了一个强大的框架,简化了网络编程的复杂性。通过学习和实践MINA HTTP...

    Mina官网例子之时间服务器

    Mina基础概念** - **I/O模型**:Mina基于非阻塞I/O(NIO)模型,这种模型在处理大量并发连接时效率非常高,因为它允许在一个线程中同时处理多个连接,而不是为每个连接创建一个新的线程。 - **Filter Chain**:...

    MIna中转服务

    二、Mina中转服务工作原理 1. 阶段一:连接建立 客户端通过Mina客户端库(如Minaclient)与中转服务器建立TCP连接。这个过程涉及到Socket的创建、三次握手等网络通信的基础步骤。 2. 阶段二:数据传输 建立连接...

    Mina案例+使用文档.pdf

    #### 二、Mina功能与支持 Mina不仅支持基于Java NIO的TCP/UDP应用程序开发,还在最新的预览版中提供了对串口通信的支持。这一特性极大地扩展了Mina的应用场景,使其不仅仅局限于传统的网络应用程序,还适用于嵌入式...

    Mina官方教程_中文版.rar

    《Mina官方教程_中文版》是一份专为学习Mina框架的中文教育资源,它涵盖了Mina框架的基础知识、核心概念以及实际应用技巧。Mina(MinA Socket API)是Apache软件基金会的一个开源项目,主要设计用于构建高性能、高可...

    mina传文件案例,客户端加服务端

    二、Socket客户端基础 Socket是网络通信的基础,它是两台机器间进行数据交换的通道。在Java中,Socket类代表了客户端,ServerSocket类代表服务端。在本案例中,客户端将通过Socket连接到Mina服务端,发送文件请求,...

    mina服务器实例

    一、Mina基础概念 1. **事件驱动模型**:Mina采用I/O多路复用技术,如NIO(非阻塞I/O)和EPOLL,通过事件驱动模型处理网络连接,有效提升了并发处理能力。 2. **过滤器链**:Mina的过滤器链是其核心设计之一,它...

    Mina学习笔记

    **二、Mina基础** **2.1 IoService接口** IoService是Mina的核心接口,它代表了网络服务,可以是服务器(Acceptor)也可以是客户端(Connector)。IoService提供了一系列方法来管理和控制网络连接。 **2.1.1 类...

    高性能网络架构Mina框架 下载

    #### 二、Mina框架的特点 ##### 2.1 高性能与可扩展性 Mina采用了多线程异步处理机制,能够有效利用系统资源,支持大量的并发连接。同时,Mina还提供了丰富的插件机制,方便开发者根据业务需求进行功能扩展。 ###...

Global site tag (gtag.js) - Google Analytics