`
Donald_Draper
  • 浏览: 981722 次
社区版块
存档分类
最新评论

MINA 编解码器实例

    博客分类:
  • Mina
阅读更多
MINA TCP简单通信实例:http://donald-draper.iteye.com/blog/2375297
网络中传输的数据为二进制byte,如何将Java的对象转换成二进制byte序列,这就需要协议编码器和解码器;上一篇文章中我们用的协议编解器工厂为TextLineCodecFactory,TextLineCodecFactory中有一个
文本行编码器TextLineEncoder(ProtocolEncoderAdapter),文本行解码器TextLineDecoder(ProtocolDecoderAdapter);
今天我们来看一个编解码器的实例:
首先我们定义一个短信发送的协议为:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 13688888888
R: 18866666666
L:19
Hello Sms Server...

第一行为短信状态,第二行为发送者,第三行为接受者,第四行为发送内容长度,最后所有内容为发送内容。
这个协议和HTTP协议有点像,首先是状态行,头部,内容。
定义消息:
package mina.tcp.message;


/**
 * 短信
 * @author donald 
 * 2017年5月19日 
 * 下午10:46:36
 */
public class SmsInfo {
	/** 发送者 */
	private String sender;
	/** 接受者 */
	private String receiver;
	/** 短信内容 */
	private String message;
	public String getSender() {
		return sender;
	}
	public void setSender(String sender) {
		this.sender = sender;
	}
	public String getReceiver() {
		return receiver;
	}
	public void setReceiver(String receiver) {
		this.receiver = receiver;
	}
	public String getMessage() {
		return message;
	}
	public void setMessage(String message) {
		this.message = message;
	}
}

编码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * 短信编码器
 * @author donald
 * 2017年5月19日
 * 下午10:55:48
 */
public class CumulativeSmsEncoder extends ProtocolEncoderAdapter {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsEncoder.class);
	private final Charset charset;
	private final CharsetEncoder charsetEncoder;
	public CumulativeSmsEncoder(Charset charset) {
		this.charset = charset;
		charsetEncoder = charset.newEncoder();
	}
	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		
		IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
		String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";//状态行
		String sender = sms.getSender();
		String receiver = sms.getReceiver();
		String smsContent = sms.getMessage();
		//组装发送内容,我们以\n来分隔
		buffer.putString(statusLine + '\n', charsetEncoder);//状态行
		buffer.putString("S: " + sender + '\n', charsetEncoder);//短信发送者
		buffer.putString("R: " + receiver + '\n', charsetEncoder);//短信接受者
		buffer.putString("L: " + (smsContent.getBytes(charset).length) + "\n", charsetEncoder);//内容长度
		buffer.putString(smsContent, charsetEncoder);//内容
		//切换读写模式
		buffer.flip();
		out.write(buffer);
		log.info("========短信编码器编码完毕....");
	}
}

编码器继承ProtocolEncoderAdapter;
解码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * 短信解码器
 * @author donald
 * 2017年5月19日
 * 下午11:01:50
 */
public class CumulativeSmsDecoder extends CumulativeProtocolDecoder {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsDecoder.class);
	private final CharsetDecoder charsetDecoder;
	public CumulativeSmsDecoder(Charset charset) {
		charsetDecoder = charset.newDecoder();
	}
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		IoBuffer buffer = IoBuffer.allocate(1024).setAutoExpand(true);
		int matchCount = 0;//记录短信每一行的字节数
		String statusLine = "", sender = "", receiver = "", length = "", sms = "";
		int line = 0;//短信行计数器
		while (in.hasRemaining()) {
			byte b = in.get();
			buffer.put(b);
			// 10 为\n的ASCII编码,短信一行信息读完
			if (b == 10 && line < 4) {
				matchCount++;//一行读取完毕,字节数加1为\n
				if (line == 0) {//状态行
					buffer.flip();
					statusLine = buffer.getString(matchCount, charsetDecoder);
					//剔除最后一个换行符\n
					statusLine = statusLine.substring(0, statusLine.length() - 1);
					log.debug("========短信状态行:"+statusLine);
					matchCount = 0;//重置短信行字节序列计数器
					buffer.clear();//清除短信行字节序列计数器
				}
				if (line == 1) {//短信发送者
					buffer.flip();
					sender = buffer.getString(matchCount, charsetDecoder);
					sender = sender.substring(0, sender.length() - 1);
					log.debug("========短信发送者:"+sender);
					matchCount = 0;
					buffer.clear();
				}
				if (line == 2) {//短信接受者
					buffer.flip();
					receiver = buffer.getString(matchCount, charsetDecoder);
					receiver = receiver.substring(0, receiver.length() - 1);
					log.debug("========短信接受者:"+receiver);
					matchCount = 0;
					buffer.clear();
				}
				if (line == 3) {//短信内容长度
					buffer.flip();
					length = buffer.getString(matchCount, charsetDecoder);
					length = length.substring(0, length.length() - 1);
					log.debug("========短信内容长度:"+length.split(": ")[1]);
					matchCount = 0;
					buffer.clear();
				}
				line++;//短信一行读取完毕
			} else if (line == 4) {//短信内容
				matchCount++;
				//读取短信内容,读到与短息内容长度length相同的字节数时,解析内容
				if (matchCount == Long.parseLong(length.split(": ")[1])) {
					buffer.flip();
					sms = buffer.getString(matchCount, charsetDecoder);
					log.debug("========短信内容:"+sms);
					buffer.clear();
					matchCount = 0;
					line++;
					break;
				}
			}else{
				matchCount++;//一行没读完,记录读取的字节数
			}
		}
		//组装短信
		SmsInfo smsInfo = new SmsInfo();
		smsInfo.setSender(sender.split(": ")[1]);
		smsInfo.setReceiver(receiver.split(": ")[1]);
		smsInfo.setMessage(sms);
		out.write(smsInfo);
		log.info("========短信解码器解码完毕....");
		/*不再调用解码器解码方法解码数据,如果这次数据未读完,及缓冲区还有数据,则
		保存在会话中,以便与下一次的数据合并处理,如果数据读取完,则清空缓冲区。*/
		return false;
	}
}

解码器继承的为CumulativeProtocolDecoder,即可累计的协议解码器,这个我们先用,后面在具体的讲。
协议编解码器工厂:
package mina.tcp.coder;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * 短信编码解码工厂
 * @author donald 
 * 2017年5月19日 
 * 下午10:53:37
 */
public class CumulativeSmsCodecFactory implements ProtocolCodecFactory {
	private final CumulativeSmsEncoder encoder;
	private final CumulativeSmsDecoder decoder;

	public CumulativeSmsCodecFactory() {
		this(Charset.defaultCharset());
	}

	public CumulativeSmsCodecFactory(Charset charSet) {
		this.encoder = new CumulativeSmsEncoder(charSet);
		this.decoder = new CumulativeSmsDecoder(charSet);
	}
	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return decoder;
	}
	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return encoder;
	}
}

sever:
package mina.tcp.main;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory;
import mina.tcp.handler.SmsServerHandler;
/**
 * SmsServer
 * @author donald
 * 2017年5月19日
 * 下午10:16:29
 */
public class SmsServer {
	private static final Logger log = LoggerFactory.getLogger(SmsServer.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int readBufferSize = 2048;
	private static final  int idleTime = 10;
	private static final Charset charset = Charset.forName("UTF-8");
	public static void main(String[] args) throws IOException {
		 IoAcceptor acceptor=new NioSocketAcceptor();
		//配置socket会话
		 SocketSessionConfig socketSessionConfig = (SocketSessionConfig) acceptor.getSessionConfig();
		 socketSessionConfig.setReadBufferSize(readBufferSize);
		 socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,idleTime);
		 //配置过滤器
		 DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain();
		 LoggingFilter loggingFilter = new LoggingFilter();
		 defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
		 CumulativeSmsCodecFactory cmccSipcCodecFactory = new CumulativeSmsCodecFactory(charset);
		 ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(cmccSipcCodecFactory);
		 defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);
		 //配置NioSocketAcceptor处理器
		 SmsServerHandler smsServerHandler = new SmsServerHandler();
		 acceptor.setHandler(smsServerHandler);
		 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
		 acceptor.bind(inetSocketAddress);
		 log.info("=========SmsServer is start============");
	}
}

注意:协议编解码器要包装成协议编解码器工厂ProtocolCodecFactory,然后以ProtocolCodecFilter形式添加到过滤链中。
server handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * SmsServerHandler
 * @author donald
 * 2017年5月19日
 * 下午10:45:26
 */
public class SmsServerHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsServerHandler.class);

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from "+sms.getSender()+" is:" + sms.getMessage());
		SmsInfo ackSms = new SmsInfo();
		ackSms.setSender(sms.getReceiver());
		ackSms.setReceiver(sms.getSender());
		ackSms.setMessage("收到...");
		session.write(ackSms);
		log.info("===回复短信已发送...");
	}
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		log.error("===会话异常:"+cause.getMessage());
		cause.printStackTrace();
		session.closeNow();
	}
}

这里我们只是实现需要关注的方法

client:
package mina.tcp.main;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory;
import mina.tcp.handler.SmsClientHandler;
/**
 * SmsClient
 * @author donald
 * 2017年5月19日
 * 下午10:27:30
 */
public class SmsClient {
	private static final Logger log = LoggerFactory.getLogger(SmsClient.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int connectTimeoutMillis = 30000;
	private static final Charset charset = Charset.forName("UTF-8");
	public static void main(String[] args) {
		IoConnector connector=new NioSocketConnector();
		 connector.setConnectTimeoutMillis(connectTimeoutMillis);
		//配置过滤器
		 DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = connector.getFilterChain();
		 LoggingFilter loggingFilter = new LoggingFilter();
		 defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
		 CumulativeSmsCodecFactory cmccSipcCodecFactory = new CumulativeSmsCodecFactory(charset);
		 ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(cmccSipcCodecFactory);
		 defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);
		//配置NioSocketConnector处理器
		 SmsClientHandler smsClientHandler = new SmsClientHandler();
		 connector.setHandler(smsClientHandler);
		 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
		 connector.connect(inetSocketAddress);
		 log.info("=========SmsClient is start============");
	}
}


client handler:


package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * SmsClientHandler
 * @author donald
 * 2017年5月19日
 * 下午10:30:24
 */
public class SmsClientHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsClientHandler.class);
	@Override
	public void sessionOpened(IoSession session) {
		SmsInfo sms = new SmsInfo();
		sms.setSender("13688888888");
		sms.setReceiver("18866666666");
		sms.setMessage("Hello Sms Server...");
		session.write(sms);
		log.info("===短信已发送...");
	}
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from "+sms.getSender()+" is:" + sms.getMessage());
	}
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		log.error("===会话异常:"+cause.getMessage());
		cause.printStackTrace();
		session.closeNow();
	}
}

启动server,client控制台输出:
server:
[INFO ] 2017-05-21 12:11:40 mina.tcp.main.SmsServer =========SmsServer is start============
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=89 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:11:46 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信状态行:M sip:wap.fetion.com.cn SIP-C/2.0
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信发送者:S: 13688888888
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信接受者:R: 18866666666
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容长度:19
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容:Hello Sms Server...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信解码器解码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsServerHandler ===message received from 13688888888 is:Hello Sms Server...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsEncoder ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsServerHandler ===回复短信已发送...
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@68fae256
[INFO ] 2017-05-21 12:11:56 org.apache.mina.filter.logging.LoggingFilter IDLE


client:

[INFO ] 2017-05-21 12:11:46 mina.tcp.main.SmsClient =========SmsClient is start============
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsEncoder ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsClientHandler ===短信已发送...
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@107dfdb8
[INFO ] 2017-05-21 12:11:46 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:11:46 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信状态行:M sip:wap.fetion.com.cn SIP-C/2.0
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信发送者:S: 18866666666
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信接受者:R: 13688888888
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容长度:9
[DEBUG] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信内容:收到...
[INFO ] 2017-05-21 12:11:46 mina.tcp.coder.CumulativeSmsDecoder ========短信解码器解码完毕....
[INFO ] 2017-05-21 12:11:46 mina.tcp.handler.SmsClientHandler ===message received from 18866666666 is:收到...

在上面的解码器中我们用到的两个状态变量line和matchCount,用于记录解码短信的状态,这两个状态是在解码器中保存,当客户端将一条短信分多次发送,比如先发短信状态行和发送者,再发接收者,短信内容长度和内容,解码器的状态变量line和matchCount将
不能完全起作用,为了应对这种情况我们可以将状态变量line和matchCount保存到会话中,下次再接收数据时,恢复会话中的解码器状态,继续解码。
现在对解码器和进行改造:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.AttributeKey;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * 短信解码器
 * @author donald
 * 2017年5月20日
 * 下午10:55:33
 */
public class CumulativeSmsDecoder2 extends CumulativeProtocolDecoder {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsDecoder2.class);
	private final CharsetDecoder charsetDecoder;
	private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

	public CumulativeSmsDecoder2(Charset charset) {
		charsetDecoder = charset.newDecoder();
	}

	@Override
	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
		//从会话获取解码器状态上下文
		Context ctx = getContext(session);
		//从上下文获取已经解码的短信相关信息
		int matchCount = ctx.getMatchCount();
		int line = ctx.getLine();
		IoBuffer buffer = ctx.innerBuffer;
		String statusLine = ctx.getStatusLine(), sender = ctx.getSender(), 
				receiver = ctx.getReceiver(),length = ctx.getLength(), 
				sms = ctx.getSms();
		while (in.hasRemaining()) {
			byte b = in.get();
			buffer.put(b);
			matchCount++;
			if (line < 4 && b == 10) {
				if (line == 0) {//状态行
					buffer.flip();
					statusLine = buffer.getString(matchCount, charsetDecoder);
					statusLine = statusLine.substring(0, statusLine.length() - 1);
					matchCount = 0;
					buffer.clear();
					ctx.setStatusLine(statusLine);
				}
				if (line == 1) {//短信发送者
					buffer.flip();
					sender = buffer.getString(matchCount, charsetDecoder);
					sender = sender.substring(0, sender.length() - 1);
					matchCount = 0;
					buffer.clear();
					ctx.setSender(sender);
				}
				if (line == 2) {//短信接受者
					buffer.flip();
					receiver = buffer.getString(matchCount, charsetDecoder);
					receiver = receiver.substring(0, receiver.length() - 1);
					matchCount = 0;
					buffer.clear();
					ctx.setReceiver(receiver);
				}
				if (line == 3) {//短信内容长度
					buffer.flip();
					length = buffer.getString(matchCount, charsetDecoder);
					length = length.substring(0, length.length() - 1);
					matchCount = 0;
					buffer.clear();
					ctx.setLength(length);
				}
				line++;
			} else if (line == 4) {//短信内容,读到与短息内容长度length相同的字节数时,解析内容
				if (matchCount == Long.parseLong(length.split(": ")[1])) {
					buffer.flip();
					sms = buffer.getString(matchCount, charsetDecoder);
					ctx.setSms(sms);
					ctx.setMatchCount(matchCount);
					ctx.setLine(line);
					break;
				}
			}
			ctx.setMatchCount(matchCount);
			ctx.setLine(line);
		}
		//一条短信解码完毕,组装短信,重置解码器上下文
		if (ctx.getLine() == 4 && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx.getMatchCount()) {
			SmsInfo smsObject = new SmsInfo();
			smsObject.setSender(sender.split(": ")[1]);
			smsObject.setReceiver(receiver.split(": ")[1]);
			smsObject.setMessage(sms);
			out.write(smsObject);
			log.info("========短信解码器解码一条短信完毕....");
			ctx.reset();
			/*检查是否读取数据,没有则非法状态异常;
			已经消费了数据,如果缓冲区有数据,则继续读取缓冲区数据并解码*/
			return true;
		} else {
			/*不再调用解码器解码方法解码数据,如果这次数据未读完,及缓冲区还有数据,则
			保存在会话中,以便与下一次的数据合并处理,如果数据读取完,则清空缓冲区。*/
			return false;
		}
	}
    /**
     * 从会话获取短信解码器上下文
     * @param session
     * @return
     */
	private Context getContext(IoSession session) {
		Context context = (Context) session.getAttribute(CONTEXT);
		if (context == null) {
			context = new Context();
			session.setAttribute(CONTEXT, context);
		}
		return context;
	}
    /**
     * 记录解码器的上下文
     */
	private class Context {
		private final IoBuffer innerBuffer;
		private String statusLine = "";
		private String sender = "";
		private String receiver = "";
		private String length = "";
		private String sms = "";
		private int matchCount = 0;
		private int line = 0;

		public Context() {
			innerBuffer = IoBuffer.allocate(1024).setAutoExpand(true);
		}
		public int getMatchCount() {
			return matchCount;
		}
		public void setMatchCount(int matchCount) {
			this.matchCount = matchCount;
		}

		public int getLine() {
			return line;
		}
		public void setLine(int line) {
			this.line = line;
		}
		public String getStatusLine() {
			return statusLine;
		}
		public void setStatusLine(String statusLine) {
			this.statusLine = statusLine;
		}
		public String getSender() {
			return sender;
		}
		public void setSender(String sender) {
			this.sender = sender;
		}
		public String getReceiver() {
			return receiver;
		}
		public void setReceiver(String receiver) {
			this.receiver = receiver;
		}
		public String getLength() {
			return length;
		}
		public void setLength(String length) {
			this.length = length;
		}
		public String getSms() {
			return sms;
		}
		public void setSms(String sms) {
			this.sms = sms;
		}
        /**
         * 重置解码器上下文状态
         */
		public void reset() {
			this.innerBuffer.clear();
			this.matchCount = 0;
			this.line = 0;
			this.statusLine = "";
			this.sender = "";
			this.receiver = "";
			this.length = "";
			this.sms = "";
		}
	}

}

为了模拟将一条短信分两次发送的情况,我们来改造编码器:
package mina.tcp.coder;

import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * 短信编码器
 * @author donald
 * 2017年5月19日
 * 下午10:55:48
 */
public class CumulativeSmsEncoder2 extends ProtocolEncoderAdapter {
	private static final Logger log = LoggerFactory.getLogger(CumulativeSmsEncoder2.class);
	private final Charset charset;
	private final CharsetEncoder charsetEncoder;
	public CumulativeSmsEncoder2(Charset charset) {
		this.charset = charset;
		charsetEncoder = charset.newEncoder();
	}
	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		
		IoBuffer buffer = IoBuffer.allocate(1024).setAutoExpand(true);
		String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";//状态行
		String sender = sms.getSender();
		String receiver = sms.getReceiver();
		String smsContent = sms.getMessage();
		//组装发送内容,我们以\n来分隔
		buffer.putString(statusLine + '\n', charsetEncoder);//状态行
		buffer.putString("S: " + sender + '\n', charsetEncoder);//短信发送者
		//在这里先发送短信状态行和短信发送者行,
		//再发送短息其他内容,已测试解码器上下文是否有效
		buffer.flip();
		out.write(buffer);
		IoBuffer bufferx = IoBuffer.allocate(1024).setAutoExpand(true);
		bufferx.putString("R: " + receiver + '\n', charsetEncoder);//短信接受者
		bufferx.putString("L: " + (smsContent.getBytes(charset).length) + "\n", charsetEncoder);//内容长度
		bufferx.putString(smsContent, charsetEncoder);//内容
		//切换读写模式
		bufferx.flip();
		out.write(bufferx);
		log.info("========短信编码器编码完毕....");
	}
}

修改协议编解码器工厂:
package mina.tcp.coder;

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * 短信编码解码工厂
 * @author donald 
 * 2017年5月19日 
 * 下午10:53:37
 */
public class CumulativeSmsCodecFactory2 implements ProtocolCodecFactory {
	private final CumulativeSmsEncoder2 encoder;
	private final CumulativeSmsDecoder2 decoder;

	public CumulativeSmsCodecFactory2() {
		this(Charset.defaultCharset());
	}

	public CumulativeSmsCodecFactory2(Charset charSet) {
		this.encoder = new CumulativeSmsEncoder2(charSet);
		this.decoder = new CumulativeSmsDecoder2(charSet);
	}
	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		return decoder;
	}
	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return encoder;
	}
}

修改server:

package mina.tcp.main;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory2;
import mina.tcp.handler.SmsServerHandler2;
/**
 * SmsServer
 * @author donald
 * 2017年5月19日
 * 下午10:16:29
 */
public class SmsServer2 {
	private static final Logger log = LoggerFactory.getLogger(SmsServer2.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int readBufferSize = 2048;
	private static final  int idleTime = 10;
	private static final Charset charset = Charset.forName("UTF-8");
	public static void main(String[] args) throws IOException {
		 IoAcceptor acceptor=new NioSocketAcceptor();
		//配置socket会话
		 SocketSessionConfig socketSessionConfig = (SocketSessionConfig) acceptor.getSessionConfig();
		 socketSessionConfig.setReadBufferSize(readBufferSize);
		 socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,idleTime);
		 //配置过滤器
		 DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain();
		 LoggingFilter loggingFilter = new LoggingFilter();
		 defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
		 CumulativeSmsCodecFactory2 cmccSipcCodecFactory2 = new CumulativeSmsCodecFactory2(charset);
		 ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(cmccSipcCodecFactory2);
		 defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);
		 //配置NioSocketAcceptor处理器
		 SmsServerHandler2 smsServerHandler2 = new SmsServerHandler2();
		 acceptor.setHandler(smsServerHandler2);
		 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
		 acceptor.bind(inetSocketAddress);
		 log.info("=========SmsServer2 is start============");
	}
}

server handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * SmsServerHandler
 * @author donald
 * 2017年5月19日
 * 下午10:45:26
 */
public class SmsServerHandler2 extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsServerHandler2.class);

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from "+sms.getSender()+" is:" + sms.getMessage());
		SmsInfo ackSms = new SmsInfo();
		ackSms.setSender(sms.getReceiver());
		ackSms.setReceiver(sms.getSender());
		ackSms.setMessage("收到...");
		session.write(ackSms);
		log.info("===回复短信已发送...");
	}
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		log.error("===会话异常:"+cause.getMessage());
		cause.printStackTrace();
		session.closeNow();
	}

}


修改client:

package mina.tcp.main;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.coder.CumulativeSmsCodecFactory2;
import mina.tcp.handler.SmsClientHandler2;
/**
 * SmsClient
 * @author donald
 * 2017年5月19日
 * 下午10:27:30
 */
public class SmsClient2 {
	private static final Logger log = LoggerFactory.getLogger(SmsClient2.class);
	private static final  String ip = "192.168.31.153";
	private static final  int port = 9122;
	private static final  int connectTimeoutMillis = 30000;
	private static final Charset charset = Charset.forName("UTF-8");
	public static void main(String[] args) {
		IoConnector connector=new NioSocketConnector();
		 connector.setConnectTimeoutMillis(connectTimeoutMillis);
		//配置过滤器
		 DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = connector.getFilterChain();
		 LoggingFilter loggingFilter = new LoggingFilter();
		 defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter);
		 CumulativeSmsCodecFactory2 cmccSipcCodecFactory2 = new CumulativeSmsCodecFactory2(charset);
		 ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(cmccSipcCodecFactory2);
		 defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);
		//配置NioSocketConnector处理器
		 SmsClientHandler2 smsClientHandler2 = new SmsClientHandler2();
		 connector.setHandler(smsClientHandler2);
		 InetSocketAddress inetSocketAddress = new InetSocketAddress(ip,port);
		 connector.connect(inetSocketAddress);
		 log.info("=========SmsClient2 is start============");
	}
}

client handler:
package mina.tcp.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import mina.tcp.message.SmsInfo;
/**
 * SmsClientHandler
 * @author donald
 * 2017年5月19日
 * 下午10:30:24
 */
public class SmsClientHandler2 extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(SmsClientHandler2.class);
	@Override
	public void sessionOpened(IoSession session) {
		SmsInfo sms = new SmsInfo();
		sms.setSender("13688888888");
		sms.setReceiver("18866666666");
		sms.setMessage("first...");
		session.write(sms);
		sms.setMessage("second...");
		session.write(sms);
		log.info("===短信已发送...");
	}
	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		SmsInfo sms = (SmsInfo) message;
		log.info("===message received from "+sms.getSender()+" is:" + sms.getMessage());
	}
	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		log.error("===会话异常:"+cause.getMessage());
		cause.printStackTrace();
		session.closeNow();
	}
	
}


启动server,client控制台输出:
server:
[INFO ] 2017-05-21 12:27:09 mina.tcp.main.SmsServer2 =========SmsServer2 is start============
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=77 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===message received from 13688888888 is:first...
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===回复短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@4ab24098
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===message received from 13688888888 is:second...
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsServerHandler2 ===回复短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@7caee177

client:
[INFO ] 2017-05-21 12:27:14 mina.tcp.main.SmsClient2 =========SmsClient2 is start============
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter CREATED
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter OPENED
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsEncoder2 ========短信编码器编码完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===短信已发送...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@41785b00
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter SENT: mina.tcp.message.SmsInfo@41785b00
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===message received from 18866666666 is:收到...
[INFO ] 2017-05-21 12:27:14 org.apache.mina.filter.logging.LoggingFilter RECEIVED: HeapBuffer[pos=0 lim=78 cap=2048: 4D 20 73 69 70 3A 77 61 70 2E 66 65 74 69 6F 6E...]
[DEBUG] 2017-05-21 12:27:14 org.apache.mina.filter.codec.ProtocolCodecFilter Processing a MESSAGE_RECEIVED for session 1
[INFO ] 2017-05-21 12:27:14 mina.tcp.coder.CumulativeSmsDecoder2 ========短信解码器解码一条短信完毕....
[INFO ] 2017-05-21 12:27:14 mina.tcp.handler.SmsClientHandler2 ===message received from 18866666666 is:收到...
  • lib.rar (1022.7 KB)
  • 下载次数: 2
0
1
分享到:
评论

相关推荐

    mina自定义编解码器详解

    - 文件`example`可能包含了一个使用mina编写的服务器或客户端程序,其中包含了自定义编解码器的应用实例。 - `MinaCodec`可能是一个包含编码器和解码器的类,我们来详细分析其工作流程: - 在编码器中,通常有一...

    mina编解码器Demo

    《mina编解码器Demo详解》 mina是一个高性能、异步事件驱动的网络通信框架,主要应用于Java平台,常用于构建TCP和UDP服务。在本文中,我们将深入探讨MINA框架中的编解码器(Codec)概念,并通过提供的`mina_server`...

    mina自定义编码解码器

    本文将深入探讨如何在Mina中自定义编码解码器,这对于实现特定的网络通信协议至关重要。 首先,了解Mina的编码解码器架构是必要的。Mina使用了Chain of Responsibility设计模式,通过Filter链来处理进来的数据。...

    Mina开发实例(服务端、客户端)DEMO

    此外,Mina支持多种编码和解码器,如LineDelimiterFilter(按行分割数据)、ProtobufDecoder(解析Google Protobuf格式数据)等。在实际应用中,根据数据格式选择合适的过滤器,可以有效地处理网络传输中的数据。 ...

    apache mina 框架实例

    以下是对Mina框架实例、自定义协议包、编码器和解码器的详细解释: **Apache Mina框架实例:** Apache Mina框架的核心思想是基于事件驱动和非阻塞I/O,这使得它在处理大量并发连接时表现出色。在Mina中,开发者可以...

    mina解码器

    在这个场景中,"mina解码器"是指使用MINA框架来实现自定义的编码和解码逻辑,以处理网络数据的收发。 解码器(Decoder)和编码器(Encoder)是MINA框架中用于处理数据传输的核心组件。它们分别负责将接收到的原始...

    Mina断包,粘包处理(完整实例,有客户端,服务端)

    在Mina中,解决这个问题的关键在于定义正确的协议编码器和解码器。例如,你可以使用FixedLengthFrameDecoder或LineBasedFrameDecoder来确保每个接收到的数据块都能正确地组合成原始消息。 2. **粘包**: 相反,...

    mina入门实例

    在这个实例中,`MyProtocolCodecFactory`是一个自定义的编码解码器,用于将业务数据转换为字节流,以便在网络上传输。你需要根据实际需求实现这个工厂类。 在完成上述代码后,运行服务器端,然后启动客户端,即可...

    Mina长连接短连接实例

    Mina提供了多种内置的编码器和解码器,可以根据实际需求选择或自定义。 3. **FilterChain**:这是一个处理链,允许我们插入自定义的过滤器来处理进/出站事件,实现业务逻辑或日志记录等功能。 4. **IoHandler**:...

    apache mina实例免费下载

    过滤器链则提供了数据处理的模块化方式,每个过滤器可以执行特定的操作,如数据编码、解码、安全加密等。 在实际应用中,Apache MINA常用于构建服务器端应用,如聊天服务器、游戏服务器、文件传输服务等。通过MINA...

    mina2框架+实例教程

    在提供的压缩包中,可能包含了基于Mina2的实例代码,这些代码通常会展示如何创建服务器、连接客户端、定义IoHandler和过滤器、进行数据编码解码等操作。例如,可能会有一个简单的Echo Server示例,它接收客户端发送...

    minaDemo的实例

    "MinaDemo"项目可能还包括自定义的编码器和解码器,例如`ProtocolEncoder`和`ProtocolDecoder`,用于将业务数据转化为适合网络传输的格式,并在接收到网络数据时还原成原始信息。 此外,MINA支持事件驱动模型,通过...

    mina2服务端客户端实例,保证能够正常运行

    MINA提供了过滤器链的概念,允许开发者在数据传输过程中添加自定义处理逻辑,如数据编码解码、安全加密等。 学习和研究这个实例,开发者可以了解到MINA如何处理异步I/O事件,如何定义自定义的协议处理类,以及如何...

    队列,mina通信的结合实例

    本实例将探讨如何将队列与MINA通信框架结合,以实现高效、可靠的网络服务。以下是关于这两个概念及其结合使用的详细说明。 **队列(Queue)** 队列是一种数据结构,遵循先进先出(FIFO,First In First Out)原则...

    mina HTTP协议实例

    4. **编码与解码**:MINA提供了ByteToMessageDecoder和MessageToByteEncoder接口,用于数据的解码和编码。在HTTP协议中,我们需要实现这两个接口来处理HTTP报文的二进制和文本转换。 5. **源码分析**:阅读MINA的...

    mina 实现简单通讯

    在本实例中,可能自定义了一个简单的编解码器,以适应特定的通信协议,可能是基于固定长度或基于特定分隔符的数据包。 此外,“工具类”可能包含了处理网络操作辅助方法,例如数据包的序列化和反序列化,或者错误...

    mina学习基础-入门实例-传输定长报文(三)

    Apache Mina是一个开源...这个过程涉及到了Mina的Server创建、过滤器链构建、编码解码器设计以及事件驱动模型的运用。通过这样的实例学习,可以帮助开发者更好地理解和应用Mina框架,提高网络应用的开发效率和安全性。

    MINA客户端与服务端通信实例

    4. 将解码器、编码器和处理器设置到`IoAcceptor`,并启动监听特定端口。 客户端的实现包括: 1. 创建一个`IoConnector`,它是MINA客户端的核心,负责建立到服务器的连接。 2. 同样,定义解码器和编码器,并配置到...

    Mina Tcp实例

    在TCP实例中,你可能需要自定义编码器和解码器,以适应你的消息格式。 5. **Buffer**:Mina使用缓冲区对象来存储和操作网络数据。通常,你会在编码和解码过程中与Buffer对象打交道。 接下来,我们来看看如何创建一...

    mina test实例一个

    2. **FilterChain**: 过滤器链是 Mina 的一大特色,它允许开发者通过添加、删除或修改过滤器来处理网络通信的不同阶段,如数据编码解码、安全加密等。 3. **ProtocolCodecFactory**: 用于编码和解码网络数据的工厂...

Global site tag (gtag.js) - Google Analytics