`
Donald_Draper
  • 浏览: 980110 次
社区版块
存档分类
最新评论

NIO-TCP通信实例(单线程,多线程Server)

    博客分类:
  • NIO
阅读更多
Java Socket通信实例:http://donald-draper.iteye.com/blog/2356695
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
Java Nio系列教程;http://www.iteye.com/magazines/132-Java-NIO
NIO-TCP简单实例:http://donald-draper.iteye.com/admin/blogs/2369044
在这篇文章之前用BIO实现过TCP的通信,即Java Socket通信实例这篇文章,那边文章
主要利用BIO的ServerSocket和Socket实现加法和乘法的实现,今天我们来用NIO的
ServerSocketChannel和SocketChannel来实现加法和乘法;协议基本一致,做了一点修改
如下:




下面我们来具体的实现:
协议常量类:
package nio.socketchannel;

/**
 * 协议常量
 * @author donald
 * 2017年4月13日
 * 下午10:49:27
 */
public class ProtocolConstants {
	/**
	 * 加法协议编码
	 */
	public static final String SUM_PROTOCOL_300000 = "300000";
	/**
	 * 乘法协议编码
	 */
	public static final String MULTI_PROTOCOL_300100 = "300100";
	/**
	 * 计算结果
	 */
	public static final String ACK_PROTOCOL_300200 = "300200";
	/**
	 * 服务器解析协议失败
	 */
	public static final String ACK_PROTOCOL_300300 = "300300";
	/**
	 * 协议编码长度
	 */
	public static final int PROTOCOL_CODE_LENGTH = 6;
	/**
	 * 协议操作数长度
	 */
	public static final int OPERATE_NUM_LENGTH = 4;
	/**
	 * 字符集
	 */
	public static final String CHARSET_UTF8 = "UTF-8";
}

服务端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;


public class NIOServerCalculate {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Server
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOServerCalculate server = new NIOServerCalculate();
		server.initServer(HOST,PORT);
		server.listen();
	}
	/**
	 * get the ServerSocket and finish some initial work
	 * @param port
	 * @throws IOException
	 */
	public void initServer(String host, int port) throws IOException{
		//get the ServerSocket
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		// set no blocking mode
		serverChannel.configureBlocking(false);
		//bind the port
		serverChannel.socket().bind(new InetSocketAddress(host, port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		serverChannel.register(selector,SelectionKey.OP_ACCEPT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println("=========The Server is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isAcceptable()){
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"decodeProtol");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the client
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;//协议
		ByteBuffer proctolCodeBuffer = null;//协议编码
		proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
		ByteBuffer dataBuffer = null;//协议内容:操作数
		dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
		proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer};
		System.out.println("========read caculate proctol from Client=======");
//		channel.read(proctols);
		while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
//		channel.shutdownInput();
		proctolCodeBuffer.flip();
		dataBuffer.flip();
		byte[] proctolCodeBytes = proctolCodeBuffer.array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		int firstNum = 0;
		int secondNum = 0;
		int result = 0;
		if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){
			System.out.println("========the protocol is sum algorithm=======");
			firstNum = dataBuffer.getInt();
			secondNum = dataBuffer.getInt();
			System.out.println("operate num is:"+firstNum+","+secondNum);
			result = firstNum*secondNum;
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
			dataBuffer.clear();
			//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
//			dataBuffer.compact()
			dataBuffer.putInt(result);
			proctolCodeBuffer.flip();
			dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
			channel.write(proctols);
		}
		else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){
			System.out.println("========the protocol is multiply algorithm=======");
			firstNum = dataBuffer.getInt();
			secondNum = dataBuffer.getInt();
			System.out.println("operate num is:"+firstNum+","+secondNum);
			result = firstNum*secondNum;
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
			proctolCodeBuffer.flip();
			dataBuffer.clear();
			//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
//			dataBuffer.compact()
			dataBuffer.putInt(result);
			dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
			channel.write(proctols);
		}
		else{
			System.out.println("========server decode procotol fail......");
			proctolCodeBuffer.clear();
			proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8));
			proctolCodeBuffer.flip();
			dataBuffer.clear();
			dataBuffer.putInt(0);
			dataBuffer.flip();
			channel.write(proctols);
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}


加法客户端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;

/**
 * 加法计算
 * @author donald
 * 2017年4月10日
 * 下午9:32:57
 */
public class NIOClientSum {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Client
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOClientSum client = new NIOClientSum();
		client.initClient(HOST,PORT);
		client.listen();
	}
	/**
	 * get the Socket and finish some initial work
	 * @param ip Server ip
	 * @param port connect Server port
	 * @throws IOException
	 */
	public void initClient(String ip,int port) throws IOException{
		//get the Socket
		SocketChannel channel = SocketChannel.open();
		// set no blocking mode
		channel.configureBlocking(false);
		//connect the Server
		channel.connect(new InetSocketAddress(ip,port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		channel.register(selector,SelectionKey.OP_CONNECT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings("rawtypes")
	public void listen() throws IOException{
		System.out.println("===========The Sum Client is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isConnectable()){
					SocketChannel channel = (SocketChannel)key.channel();
                    //during connecting, finish the connect
                    if(channel.isConnectionPending()){
                    	channel.finishConnect();
                    }
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
					ByteBuffer[] proctols = null;//协议
					proctols = new ByteBuffer[2];
					ByteBuffer proctolCodeBuffer = null;//协议编码
					proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
//					proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8"));
					System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length);
					proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8));
					System.out.println("ProtocolCode length:"+proctolCodeBuffer.position());
					proctols[0] = proctolCodeBuffer;
					proctolCodeBuffer.flip();
					ByteBuffer dataBuffer = null;//协议内容:操作数
					dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
					dataBuffer.putInt(15);
					dataBuffer.putInt(6);
					System.out.println("data length:"+dataBuffer.position());
					proctols[1] = dataBuffer;
					dataBuffer.flip();
					channel.write(proctols);//将缓冲区的内容发送到通道,
//					channel.shutdownOutput();
					System.out.println("=======write proctols to channel");
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"calculateResult");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the server
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;
		proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)};
		System.out.println("========read caculate result from Server=======");
//		channel.read(proctols);
		while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
		proctols[0].flip();
		proctols[1].flip();
		byte[] proctolCodeBytes = proctols[0].array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){
			int result = proctols[1].getInt();
			System.out.println("========the calculated result from server:"+result);
		}else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){
			System.out.println("========server decode procotol fail......");
		}
		else {
			System.out.println("========unknow error ...");
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}



乘法客户端:
package nio.socketchannel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;

/**
 * 加法计算
 * @author donald
 * 2017年4月10日
 * 下午9:32:57
 */
public class NIOClientSum {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	//manager the channel
	private Selector selector;
	/**
	 * stat Client
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOClientSum client = new NIOClientSum();
		client.initClient(HOST,PORT);
		client.listen();
	}
	/**
	 * get the Socket and finish some initial work
	 * @param ip Server ip
	 * @param port connect Server port
	 * @throws IOException
	 */
	public void initClient(String ip,int port) throws IOException{
		//get the Socket
		SocketChannel channel = SocketChannel.open();
		// set no blocking mode
		channel.configureBlocking(false);
		//connect the Server
		channel.connect(new InetSocketAddress(ip,port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		channel.register(selector,SelectionKey.OP_CONNECT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings("rawtypes")
	public void listen() throws IOException{
		System.out.println("===========The Sum Client is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isConnectable()){
					SocketChannel channel = (SocketChannel)key.channel();
                    //during connecting, finish the connect
                    if(channel.isConnectionPending()){
                    	channel.finishConnect();
                    }
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
					ByteBuffer[] proctols = null;//协议
					proctols = new ByteBuffer[2];
					ByteBuffer proctolCodeBuffer = null;//协议编码
					proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
//					proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8"));
					System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length);
					proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8));
					System.out.println("ProtocolCode length:"+proctolCodeBuffer.position());
					proctols[0] = proctolCodeBuffer;
					proctolCodeBuffer.flip();
					ByteBuffer dataBuffer = null;//协议内容:操作数
					dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
					dataBuffer.putInt(15);
					dataBuffer.putInt(6);
					System.out.println("data length:"+dataBuffer.position());
					proctols[1] = dataBuffer;
					dataBuffer.flip();
					channel.write(proctols);//将缓冲区的内容发送到通道,
//					channel.shutdownOutput();
					System.out.println("=======write proctols to channel");
//					channel.register(this.selector, SelectionKey.OP_READ);
					channel.register(this.selector, SelectionKey.OP_READ,"calculateResult");
				}
				else if (key.isReadable()) read(key);
			}
			
		}
	}
	/**
	 * deal with the data come from the server
	 * @param key
	 * @throws IOException 
	 */
	public void read(SelectionKey key) throws IOException{
		SocketChannel channel = (SocketChannel) key.channel();
		String  attachedInfo = (String) key.attachment();
		System.out.println("========socketChannel attachedInfo:"+attachedInfo);
		ByteBuffer[] proctols = null;
		proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)};
		System.out.println("========read caculate result from Server=======");
//		channel.read(proctols);
		while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){
			channel.read(proctols);//待读取完成协议才解析
		}
		proctols[0].flip();
		proctols[1].flip();
		byte[] proctolCodeBytes = proctols[0].array();
		String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
		if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){
			int result = proctols[1].getInt();
			System.out.println("========the calculated result from server:"+result);
		}else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){
			System.out.println("========server decode procotol fail......");
		}
		else {
			System.out.println("========unknow error ...");
		}
		/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//		channel.shutdownOutput();
		/*关闭通道*/
//		channel.close();
		/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
	}
	
}

先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is multiply algorithm=======
operate num is:17,8

加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90

乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
在上面的测试中,channel.shutdownOutput()关闭Connection,即关闭到通道的连接,
和channel.close()关闭通道时,SocketChannel通道会有一个OP_READ事件,至于为什么,
暂时不知道,以后我们会在后面的文章中,在研究一下。
另外在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。
put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为
清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,这个概念,在ByteBuffer详解文章中已经讲过了,不记得可以再看看。

上面的Server端,以单线程处理Client端的计算请求,下面我们把它改写成多线程的形式,
Server端只处理连接请求,计算的处理单独交给一个线程来处理:
多线程Server如下:
package nio.handler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import socket.ProtocolConstants;

/**
 * Server
 * @author donald
 * 2017年4月13日
 * 下午11:14:28
 */
public class NIOServerCalculateX {
	private static final String HOST = "192.168.32.126";
	private static final int PORT = 10000;
	private static ExecutorService exec= null;
	static {
		exec = Executors.newFixedThreadPool(2);
	}
	
	//manager the channel
	private Selector selector;
	/**
	 * stat Server
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException{
		NIOServerCalculateX server = new NIOServerCalculateX();
		server.initServer(HOST,PORT);
		server.listen();
	}
	/**
	 * get the ServerSocket and finish some initial work
	 * @param port
	 * @throws IOException
	 */
	public void initServer(String host, int port) throws IOException{
		//get the ServerSocket
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		// set no blocking mode
		serverChannel.configureBlocking(false);
		//bind the port
		serverChannel.socket().bind(new InetSocketAddress(host, port));
		//get the channel manager
		this.selector = Selector.open();
		//Register the channel to manager and bind the event
		serverChannel.register(selector,SelectionKey.OP_ACCEPT);
		}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println("=========The Server is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
				if(key.isAcceptable()){
					ServerSocketChannel server = (ServerSocketChannel)key.channel();
					SocketChannel channel = server.accept();
					channel.configureBlocking(false);
					System.out.println("=========channel is Connected:"+channel.isConnected());
					System.out.println("=========channel is Open:"+channel.isOpen());
					System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending());
//					channel.register(this.selector, SelectionKey.OP_READ);
					HanlderNioSocketChannel hanlderNioSocketChannel= new HanlderNioSocketChannel();
					channel.register(hanlderNioSocketChannel.getSelector(), SelectionKey.OP_READ,"decodeProtol");
					exec.submit(hanlderNioSocketChannel);
				}
			}
			
		}
	}	
}


计算处理线程:
package nio.handler;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

import socket.ProtocolConstants;
/**
 * 处理SocketChannel读事件
 * @author donald
 * 2017年4月11日
 * 下午10:32:55
 */
public class HanlderNioSocketChannel implements Runnable{
	private Selector selector;
	private String threadName;
	public HanlderNioSocketChannel() {
		super();
		try {
			this.selector = Selector.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
		threadName = Thread.currentThread().getName();
	}
	public Selector getSelector() {
		return selector;
	}
	public void setSelector(Selector selector) {
		this.selector = selector;
	}

	@Override
	public void run() {
		try {
			listen();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	/**
	 * use asking mode to listen the event of selector
	 * @throws IOException 
	 */
	@SuppressWarnings({ "rawtypes" })
	public void listen() throws IOException{
		System.out.println(threadName+"=========The Server Calculate is start!===========");
		while(true){
			selector.select();
			Iterator ite =  this.selector.selectedKeys().iterator();
			while(ite.hasNext()){
				SelectionKey key = (SelectionKey)ite.next();
				ite.remove();
		        if (key.isReadable()) {
					read(key);
				}
			}
			
		}
	}	
	private void read(SelectionKey key){
		try {
			SocketChannel channel = (SocketChannel) key.channel();
			String  attachedInfo = (String) key.attachment();
			System.out.println(threadName+"========socketChannel attachedInfo:"+attachedInfo);
			ByteBuffer[] proctols = null;//协议
			ByteBuffer proctolCodeBuffer = null;//协议编码
			proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH);
			ByteBuffer dataBuffer = null;//协议内容:操作数
			dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH);
			proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer};
			System.out.println(threadName+"========read caculate proctol from Client=======");
	//		channel.read(proctols);
			while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){
				channel.read(proctols);//待读取完成协议才解析
			}
	//		channel.shutdownInput();
			proctolCodeBuffer.flip();
			dataBuffer.flip();
			byte[] proctolCodeBytes = proctolCodeBuffer.array();
			String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim();
			int firstNum = 0;
			int secondNum = 0;
			int result = 0;
			if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){
				System.out.println(threadName+"========the protocol is sum algorithm=======");
				firstNum = dataBuffer.getInt();
				secondNum = dataBuffer.getInt();
				System.out.println("operate num is:"+firstNum+","+secondNum);
				result = firstNum*secondNum;
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
				dataBuffer.clear();
				//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
	//			dataBuffer.compact()
				dataBuffer.putInt(result);
				proctolCodeBuffer.flip();
				dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
				channel.write(proctols);
				
			}
			else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){
				System.out.println(threadName+"========the protocol is multiply algorithm=======");
				firstNum = dataBuffer.getInt();
				secondNum = dataBuffer.getInt();
				System.out.println("operate num is:"+firstNum+","+secondNum);
				result = firstNum*secondNum;
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8));
				proctolCodeBuffer.flip();
				dataBuffer.clear();
				//针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面
	//			dataBuffer.compact()
				dataBuffer.putInt(result);
				dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中
				channel.write(proctols);
			}
			else{
				System.out.println(threadName+"========server decode procotol fail......");
				proctolCodeBuffer.clear();
				proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8));
				proctolCodeBuffer.flip();
				dataBuffer.clear();
				dataBuffer.putInt(0);
				dataBuffer.flip();
				channel.write(proctols);
			}
			/*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/
//			channel.shutdownOutput();
			/*关闭通道*/
	        //channel.close();
			/*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is multiply algorithm=======
operate num is:17,8


加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90

乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136

总结:
在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,再次写数据是将覆盖以前的数据。

  • 大小: 51.5 KB
分享到:
评论

相关推荐

    JAVA-NIO程序设计完整实例

    - **高并发服务器**: NIO的选择器机制使得一个线程可以处理多个客户端连接,适合构建高性能的服务器端应用。 - **文件批量处理**: 使用FileChannel的transferTo()和transferFrom()方法,可以高效地进行文件的复制和...

    tcp.zip_java Tcp _java nio_java nio TCP_tcp_tcp java

    TCP(传输控制协议)是一种面向连接、可靠的、基于字节流的传输层通信协议,而NIO(非阻塞I/O)则是Java提供的一种高效处理I/O操作的方式。在这个“tcp.zip”压缩包中,我们可能找到了关于使用Java实现TCP服务器和...

    Java Socket编程实例(四)- NIO TCP实践

    本实例主要关注NIO在TCP中的应用,它允许更高效的资源管理和处理多个连接,特别适合高并发场景。我们将探讨以下几个关键知识点: 1. **NIO(Non-blocking Input/Output)**: 与传统的BIO不同,NIO是非阻塞的,这...

    三个分别由单线程 多线程 线程池实现的简单网关

    标题中的“三个分别由单线程、多线程、线程池实现的简单网关”涉及到的是并发处理的三种常见模型。在IT行业中,尤其是在服务器端编程和高性能系统设计中,如何有效地处理并发请求是至关重要的。让我们逐一探讨这三个...

    java网络编程TCP 多线程连接例子

    在这个例子中,我们将深入理解如何在Java中实现TCP网络编程,以及如何利用多线程来处理并发连接。 首先,TCP网络编程涉及到客户端和服务器端的交互。在Java中,我们可以使用`java.net.Socket`类来创建客户端连接,`...

    基于JAVA IO, NIO, Netty, 多线程并发实战源码.zip

    Java IO、NIO和Netty是Java平台中用于处理输入/输出操作的重要组件,而多线程并发则是提升程序性能和响应能力的关键技术。在这个压缩包"基于JAVA IO, NIO, Netty, 多线程并发实战源码.zip"中,我们可以期待找到一些...

    深入理解Apache_Mina_(5)----_配置Mina的线程模型[归类].pdf

    在Mina的非阻塞I/O(NIO)模式下,存在三种主要的工作线程,它们在NIO Socket中起到不同的作用: 1. **Acceptor Thread**:这个线程负责接收来自客户端的连接请求,并将这些连接转发给I/O Processor线程处理。...

    java NIO实例

    传统的Java I/O模型(BIO)在处理大量并发连接时效率较低,因为它基于阻塞模式,一个线程只能处理一个连接,而NIO则允许单个线程同时处理多个连接,大大提高了性能。 `NIOServer.java`和`NIOClient.java`这两个文件...

    java nio 尚硅谷 12讲 new

    在尚硅谷的12讲课程中,这些知识点将通过实例演示和详细解释,让学习者掌握Java NIO的精髓,并能够实际应用到项目开发中,提升系统的性能和并发处理能力。通过系统学习,开发者将更好地理解Java NIO的优势,并能在...

    基于java的TCP通信程序

    本篇文章将深入探讨“基于Java的TCP通信程序”以及如何实现点对点的多通道通信。 TCP是一种面向连接的、可靠的传输协议,它确保了数据的顺序传输和错误检查。在Java中,我们主要使用`java.net`包中的`Socket`和`...

    NIO学习-Java源代码分享(含netty)

    3. **Selector**:Selector允许单线程检查多个Channel的事件(如连接打开、数据到达、连接关闭等)。这样,一个线程就可以管理多个Channel,极大地提高了并发处理能力。使用Selector,开发者可以实现高效率的服务器...

    Client-Server-App.zip_between_client

    为了优化服务器性能,服务器端可能采用了多线程或异步I/O模型(如Java NIO)来处理多个客户端请求,以提高并发处理能力。 综上所述,这个压缩包提供的代码示例涉及了客户端-服务器应用开发的基础,包括TCP/IP通信...

    java nio教程pdf

    - 使用Selector来处理多个通道的事件,实现单线程管理多个连接。 - 对于文件操作,使用FileChannel进行高效的数据传输和文件操作。 以上知识点详细介绍了Java NIO的核心组件及其工作原理和应用场景,有助于理解和...

    Apache-Mina-Server-2.0中文参考手册V1.0.docx

    Apache Mina Server 2.0 是一款强大的网络通信框架,主要设计用于构建高性能、高度可扩展的网络应用程序。它的核心特点在于提供事件驱动和异步IO操作,这得益于其默认基于Java NIO(非阻塞I/O)的底层实现。Mina有两...

    一个NIO服务端,客户端的例子

    选择器允许单个线程处理多个通道的事件,提高了系统资源的利用率。 这个例子可能包含一个简单的Netty服务器和客户端实现,它们展示了如何利用NIO进行通信。在服务器端,首先会创建一个ServerBootstrap,然后设置...

    基于Java的实例开发源码-Java Socket通信实现.zip

    因此,服务器端的Socket通信常与多线程结合使用,每个连接创建一个新的线程来处理,这样服务器可以并发地服务多个客户端。 8. **TCP特性**:TCP是一种面向连接的、可靠的传输协议,它保证了数据的顺序和完整性。这...

    基于Java的实例源码-NIO网络框架 xSocket.zip

    xSocket是一个基于Java NIO实现的网络通信框架,它提供了高性能、稳定可靠的网络连接管理。 1. **Java NIO基础**: - **通道(Channels)**:通道是数据传输的路径,可以连接到不同类型的I/O设备,如文件、套接字...

    java实现Socket通信之单线程服务

    本文将深入探讨如何使用Java实现单线程服务端的Socket通信。 首先,我们来理解Socket的概念。Socket在编程中被视为网络通信的端点,它包含了IP地址和端口号,用于标识网络上的特定服务。在TCP/IP模型中,Socket是...

    基于Java回顾之网络通信的应用分析

    NIO引入了选择器(Selector)和通道(Channel)的概念,允许单线程同时处理多个连接,提高了服务器的并发性能。例如,使用Selector可以监控多个SocketChannel,当某个通道准备好进行读写时,Selector会通知应用程序...

Global site tag (gtag.js) - Google Analytics