- 浏览: 980110 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
Java Socket通信实例:http://donald-draper.iteye.com/blog/2356695
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
Java Nio系列教程;http://www.iteye.com/magazines/132-Java-NIO
NIO-TCP简单实例:http://donald-draper.iteye.com/admin/blogs/2369044
在这篇文章之前用BIO实现过TCP的通信,即Java Socket通信实例这篇文章,那边文章
主要利用BIO的ServerSocket和Socket实现加法和乘法的实现,今天我们来用NIO的
ServerSocketChannel和SocketChannel来实现加法和乘法;协议基本一致,做了一点修改
如下:
下面我们来具体的实现:
协议常量类:
服务端:
加法客户端:
乘法客户端:
先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is multiply algorithm=======
operate num is:17,8
加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
在上面的测试中,channel.shutdownOutput()关闭Connection,即关闭到通道的连接,
和channel.close()关闭通道时,SocketChannel通道会有一个OP_READ事件,至于为什么,
暂时不知道,以后我们会在后面的文章中,在研究一下。
另外在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。
put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为
清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,这个概念,在ByteBuffer详解文章中已经讲过了,不记得可以再看看。
上面的Server端,以单线程处理Client端的计算请求,下面我们把它改写成多线程的形式,
Server端只处理连接请求,计算的处理单独交给一个线程来处理:
多线程Server如下:
计算处理线程:
先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is multiply algorithm=======
operate num is:17,8
加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
总结:
在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,再次写数据是将覆盖以前的数据。
Java NIO ByteBuffer详解:http://donald-draper.iteye.com/blog/2357084
Java Nio系列教程;http://www.iteye.com/magazines/132-Java-NIO
NIO-TCP简单实例:http://donald-draper.iteye.com/admin/blogs/2369044
在这篇文章之前用BIO实现过TCP的通信,即Java Socket通信实例这篇文章,那边文章
主要利用BIO的ServerSocket和Socket实现加法和乘法的实现,今天我们来用NIO的
ServerSocketChannel和SocketChannel来实现加法和乘法;协议基本一致,做了一点修改
如下:
下面我们来具体的实现:
协议常量类:
package nio.socketchannel; /** * 协议常量 * @author donald * 2017年4月13日 * 下午10:49:27 */ public class ProtocolConstants { /** * 加法协议编码 */ public static final String SUM_PROTOCOL_300000 = "300000"; /** * 乘法协议编码 */ public static final String MULTI_PROTOCOL_300100 = "300100"; /** * 计算结果 */ public static final String ACK_PROTOCOL_300200 = "300200"; /** * 服务器解析协议失败 */ public static final String ACK_PROTOCOL_300300 = "300300"; /** * 协议编码长度 */ public static final int PROTOCOL_CODE_LENGTH = 6; /** * 协议操作数长度 */ public static final int OPERATE_NUM_LENGTH = 4; /** * 字符集 */ public static final String CHARSET_UTF8 = "UTF-8"; }
服务端:
package nio.socketchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import socket.ProtocolConstants; public class NIOServerCalculate { private static final String HOST = "192.168.32.126"; private static final int PORT = 10000; //manager the channel private Selector selector; /** * stat Server * @param args * @throws IOException */ public static void main(String[] args) throws IOException{ NIOServerCalculate server = new NIOServerCalculate(); server.initServer(HOST,PORT); server.listen(); } /** * get the ServerSocket and finish some initial work * @param port * @throws IOException */ public void initServer(String host, int port) throws IOException{ //get the ServerSocket ServerSocketChannel serverChannel = ServerSocketChannel.open(); // set no blocking mode serverChannel.configureBlocking(false); //bind the port serverChannel.socket().bind(new InetSocketAddress(host, port)); //get the channel manager this.selector = Selector.open(); //Register the channel to manager and bind the event serverChannel.register(selector,SelectionKey.OP_ACCEPT); } /** * use asking mode to listen the event of selector * @throws IOException */ @SuppressWarnings({ "rawtypes" }) public void listen() throws IOException{ System.out.println("=========The Server is start!==========="); while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if(key.isAcceptable()){ ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); System.out.println("=========channel is Connected:"+channel.isConnected()); System.out.println("=========channel is Open:"+channel.isOpen()); System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending()); // channel.register(this.selector, SelectionKey.OP_READ); channel.register(this.selector, SelectionKey.OP_READ,"decodeProtol"); } else if (key.isReadable()) read(key); } } } /** * deal with the data come from the client * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel) key.channel(); String attachedInfo = (String) key.attachment(); System.out.println("========socketChannel attachedInfo:"+attachedInfo); ByteBuffer[] proctols = null;//协议 ByteBuffer proctolCodeBuffer = null;//协议编码 proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH); ByteBuffer dataBuffer = null;//协议内容:操作数 dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH); proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer}; System.out.println("========read caculate proctol from Client======="); // channel.read(proctols); while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){ channel.read(proctols);//待读取完成协议才解析 } // channel.shutdownInput(); proctolCodeBuffer.flip(); dataBuffer.flip(); byte[] proctolCodeBytes = proctolCodeBuffer.array(); String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim(); int firstNum = 0; int secondNum = 0; int result = 0; if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){ System.out.println("========the protocol is sum algorithm======="); firstNum = dataBuffer.getInt(); secondNum = dataBuffer.getInt(); System.out.println("operate num is:"+firstNum+","+secondNum); result = firstNum*secondNum; proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8)); dataBuffer.clear(); //针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面 // dataBuffer.compact() dataBuffer.putInt(result); proctolCodeBuffer.flip(); dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中 channel.write(proctols); } else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){ System.out.println("========the protocol is multiply algorithm======="); firstNum = dataBuffer.getInt(); secondNum = dataBuffer.getInt(); System.out.println("operate num is:"+firstNum+","+secondNum); result = firstNum*secondNum; proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8)); proctolCodeBuffer.flip(); dataBuffer.clear(); //针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面 // dataBuffer.compact() dataBuffer.putInt(result); dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中 channel.write(proctols); } else{ System.out.println("========server decode procotol fail......"); proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8)); proctolCodeBuffer.flip(); dataBuffer.clear(); dataBuffer.putInt(0); dataBuffer.flip(); channel.write(proctols); } /*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/ // channel.shutdownOutput(); /*关闭通道*/ // channel.close(); /*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/ } }
加法客户端:
package nio.socketchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import socket.ProtocolConstants; /** * 加法计算 * @author donald * 2017年4月10日 * 下午9:32:57 */ public class NIOClientSum { private static final String HOST = "192.168.32.126"; private static final int PORT = 10000; //manager the channel private Selector selector; /** * stat Client * @param args * @throws IOException */ public static void main(String[] args) throws IOException{ NIOClientSum client = new NIOClientSum(); client.initClient(HOST,PORT); client.listen(); } /** * get the Socket and finish some initial work * @param ip Server ip * @param port connect Server port * @throws IOException */ public void initClient(String ip,int port) throws IOException{ //get the Socket SocketChannel channel = SocketChannel.open(); // set no blocking mode channel.configureBlocking(false); //connect the Server channel.connect(new InetSocketAddress(ip,port)); //get the channel manager this.selector = Selector.open(); //Register the channel to manager and bind the event channel.register(selector,SelectionKey.OP_CONNECT); } /** * use asking mode to listen the event of selector * @throws IOException */ @SuppressWarnings("rawtypes") public void listen() throws IOException{ System.out.println("===========The Sum Client is start!==========="); while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if(key.isConnectable()){ SocketChannel channel = (SocketChannel)key.channel(); //during connecting, finish the connect if(channel.isConnectionPending()){ channel.finishConnect(); } channel.configureBlocking(false); System.out.println("=========channel is Connected:"+channel.isConnected()); System.out.println("=========channel is Open:"+channel.isOpen()); System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending()); ByteBuffer[] proctols = null;//协议 proctols = new ByteBuffer[2]; ByteBuffer proctolCodeBuffer = null;//协议编码 proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH); // proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8")); System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length); proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8)); System.out.println("ProtocolCode length:"+proctolCodeBuffer.position()); proctols[0] = proctolCodeBuffer; proctolCodeBuffer.flip(); ByteBuffer dataBuffer = null;//协议内容:操作数 dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH); dataBuffer.putInt(15); dataBuffer.putInt(6); System.out.println("data length:"+dataBuffer.position()); proctols[1] = dataBuffer; dataBuffer.flip(); channel.write(proctols);//将缓冲区的内容发送到通道, // channel.shutdownOutput(); System.out.println("=======write proctols to channel"); // channel.register(this.selector, SelectionKey.OP_READ); channel.register(this.selector, SelectionKey.OP_READ,"calculateResult"); } else if (key.isReadable()) read(key); } } } /** * deal with the data come from the server * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel) key.channel(); String attachedInfo = (String) key.attachment(); System.out.println("========socketChannel attachedInfo:"+attachedInfo); ByteBuffer[] proctols = null; proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)}; System.out.println("========read caculate result from Server======="); // channel.read(proctols); while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){ channel.read(proctols);//待读取完成协议才解析 } proctols[0].flip(); proctols[1].flip(); byte[] proctolCodeBytes = proctols[0].array(); String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim(); if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){ int result = proctols[1].getInt(); System.out.println("========the calculated result from server:"+result); }else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){ System.out.println("========server decode procotol fail......"); } else { System.out.println("========unknow error ..."); } /*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/ // channel.shutdownOutput(); /*关闭通道*/ // channel.close(); /*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/ } }
乘法客户端:
package nio.socketchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import socket.ProtocolConstants; /** * 加法计算 * @author donald * 2017年4月10日 * 下午9:32:57 */ public class NIOClientSum { private static final String HOST = "192.168.32.126"; private static final int PORT = 10000; //manager the channel private Selector selector; /** * stat Client * @param args * @throws IOException */ public static void main(String[] args) throws IOException{ NIOClientSum client = new NIOClientSum(); client.initClient(HOST,PORT); client.listen(); } /** * get the Socket and finish some initial work * @param ip Server ip * @param port connect Server port * @throws IOException */ public void initClient(String ip,int port) throws IOException{ //get the Socket SocketChannel channel = SocketChannel.open(); // set no blocking mode channel.configureBlocking(false); //connect the Server channel.connect(new InetSocketAddress(ip,port)); //get the channel manager this.selector = Selector.open(); //Register the channel to manager and bind the event channel.register(selector,SelectionKey.OP_CONNECT); } /** * use asking mode to listen the event of selector * @throws IOException */ @SuppressWarnings("rawtypes") public void listen() throws IOException{ System.out.println("===========The Sum Client is start!==========="); while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if(key.isConnectable()){ SocketChannel channel = (SocketChannel)key.channel(); //during connecting, finish the connect if(channel.isConnectionPending()){ channel.finishConnect(); } channel.configureBlocking(false); System.out.println("=========channel is Connected:"+channel.isConnected()); System.out.println("=========channel is Open:"+channel.isOpen()); System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending()); ByteBuffer[] proctols = null;//协议 proctols = new ByteBuffer[2]; ByteBuffer proctolCodeBuffer = null;//协议编码 proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH); // proctolCodeBuffer = ByteBuffer.wrap(new String("300000").getBytes("UTF-8")); System.out.println("ProtocolCode String length:"+ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8).length); proctolCodeBuffer.put(ProtocolConstants.SUM_PROTOCOL_300000.getBytes(ProtocolConstants.CHARSET_UTF8)); System.out.println("ProtocolCode length:"+proctolCodeBuffer.position()); proctols[0] = proctolCodeBuffer; proctolCodeBuffer.flip(); ByteBuffer dataBuffer = null;//协议内容:操作数 dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH); dataBuffer.putInt(15); dataBuffer.putInt(6); System.out.println("data length:"+dataBuffer.position()); proctols[1] = dataBuffer; dataBuffer.flip(); channel.write(proctols);//将缓冲区的内容发送到通道, // channel.shutdownOutput(); System.out.println("=======write proctols to channel"); // channel.register(this.selector, SelectionKey.OP_READ); channel.register(this.selector, SelectionKey.OP_READ,"calculateResult"); } else if (key.isReadable()) read(key); } } } /** * deal with the data come from the server * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel) key.channel(); String attachedInfo = (String) key.attachment(); System.out.println("========socketChannel attachedInfo:"+attachedInfo); ByteBuffer[] proctols = null; proctols = new ByteBuffer[]{ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH),ByteBuffer.allocate(ProtocolConstants.OPERATE_NUM_LENGTH)}; System.out.println("========read caculate result from Server======="); // channel.read(proctols); while(proctols[0].position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && proctols[1].position() != ProtocolConstants.OPERATE_NUM_LENGTH){ channel.read(proctols);//待读取完成协议才解析 } proctols[0].flip(); proctols[1].flip(); byte[] proctolCodeBytes = proctols[0].array(); String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim(); if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300200)){ int result = proctols[1].getInt(); System.out.println("========the calculated result from server:"+result); }else if(proctolCode.equals(ProtocolConstants.ACK_PROTOCOL_300300)){ System.out.println("========server decode procotol fail......"); } else { System.out.println("========unknow error ..."); } /*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/ // channel.shutdownOutput(); /*关闭通道*/ // channel.close(); /*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/ } }
先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
========socketChannel attachedInfo:decodeProtol
========read caculate proctol from Client=======
========the protocol is multiply algorithm=======
operate num is:17,8
加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
在上面的测试中,channel.shutdownOutput()关闭Connection,即关闭到通道的连接,
和channel.close()关闭通道时,SocketChannel通道会有一个OP_READ事件,至于为什么,
暂时不知道,以后我们会在后面的文章中,在研究一下。
另外在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。
put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为
清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,这个概念,在ByteBuffer详解文章中已经讲过了,不记得可以再看看。
上面的Server端,以单线程处理Client端的计算请求,下面我们把它改写成多线程的形式,
Server端只处理连接请求,计算的处理单独交给一个线程来处理:
多线程Server如下:
package nio.handler; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import socket.ProtocolConstants; /** * Server * @author donald * 2017年4月13日 * 下午11:14:28 */ public class NIOServerCalculateX { private static final String HOST = "192.168.32.126"; private static final int PORT = 10000; private static ExecutorService exec= null; static { exec = Executors.newFixedThreadPool(2); } //manager the channel private Selector selector; /** * stat Server * @param args * @throws IOException */ public static void main(String[] args) throws IOException{ NIOServerCalculateX server = new NIOServerCalculateX(); server.initServer(HOST,PORT); server.listen(); } /** * get the ServerSocket and finish some initial work * @param port * @throws IOException */ public void initServer(String host, int port) throws IOException{ //get the ServerSocket ServerSocketChannel serverChannel = ServerSocketChannel.open(); // set no blocking mode serverChannel.configureBlocking(false); //bind the port serverChannel.socket().bind(new InetSocketAddress(host, port)); //get the channel manager this.selector = Selector.open(); //Register the channel to manager and bind the event serverChannel.register(selector,SelectionKey.OP_ACCEPT); } /** * use asking mode to listen the event of selector * @throws IOException */ @SuppressWarnings({ "rawtypes" }) public void listen() throws IOException{ System.out.println("=========The Server is start!==========="); while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if(key.isAcceptable()){ ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); System.out.println("=========channel is Connected:"+channel.isConnected()); System.out.println("=========channel is Open:"+channel.isOpen()); System.out.println("=========channel is ConnectionPending:"+channel.isConnectionPending()); // channel.register(this.selector, SelectionKey.OP_READ); HanlderNioSocketChannel hanlderNioSocketChannel= new HanlderNioSocketChannel(); channel.register(hanlderNioSocketChannel.getSelector(), SelectionKey.OP_READ,"decodeProtol"); exec.submit(hanlderNioSocketChannel); } } } } }
计算处理线程:
package nio.handler; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import socket.ProtocolConstants; /** * 处理SocketChannel读事件 * @author donald * 2017年4月11日 * 下午10:32:55 */ public class HanlderNioSocketChannel implements Runnable{ private Selector selector; private String threadName; public HanlderNioSocketChannel() { super(); try { this.selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); } threadName = Thread.currentThread().getName(); } public Selector getSelector() { return selector; } public void setSelector(Selector selector) { this.selector = selector; } @Override public void run() { try { listen(); } catch (IOException e) { e.printStackTrace(); } } /** * use asking mode to listen the event of selector * @throws IOException */ @SuppressWarnings({ "rawtypes" }) public void listen() throws IOException{ System.out.println(threadName+"=========The Server Calculate is start!==========="); while(true){ selector.select(); Iterator ite = this.selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = (SelectionKey)ite.next(); ite.remove(); if (key.isReadable()) { read(key); } } } } private void read(SelectionKey key){ try { SocketChannel channel = (SocketChannel) key.channel(); String attachedInfo = (String) key.attachment(); System.out.println(threadName+"========socketChannel attachedInfo:"+attachedInfo); ByteBuffer[] proctols = null;//协议 ByteBuffer proctolCodeBuffer = null;//协议编码 proctolCodeBuffer = ByteBuffer.allocate(ProtocolConstants.PROTOCOL_CODE_LENGTH); ByteBuffer dataBuffer = null;//协议内容:操作数 dataBuffer = ByteBuffer.allocate(2*ProtocolConstants.OPERATE_NUM_LENGTH); proctols = new ByteBuffer[]{proctolCodeBuffer,dataBuffer}; System.out.println(threadName+"========read caculate proctol from Client======="); // channel.read(proctols); while(proctolCodeBuffer.position() != ProtocolConstants.PROTOCOL_CODE_LENGTH && dataBuffer.position() != 2*ProtocolConstants.OPERATE_NUM_LENGTH){ channel.read(proctols);//待读取完成协议才解析 } // channel.shutdownInput(); proctolCodeBuffer.flip(); dataBuffer.flip(); byte[] proctolCodeBytes = proctolCodeBuffer.array(); String proctolCode = new String(proctolCodeBytes,ProtocolConstants.CHARSET_UTF8).trim(); int firstNum = 0; int secondNum = 0; int result = 0; if(proctolCode.equals(ProtocolConstants.SUM_PROTOCOL_300000)){ System.out.println(threadName+"========the protocol is sum algorithm======="); firstNum = dataBuffer.getInt(); secondNum = dataBuffer.getInt(); System.out.println("operate num is:"+firstNum+","+secondNum); result = firstNum*secondNum; proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8)); dataBuffer.clear(); //针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面 // dataBuffer.compact() dataBuffer.putInt(result); proctolCodeBuffer.flip(); dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中 channel.write(proctols); } else if(proctolCode.equals(ProtocolConstants.MULTI_PROTOCOL_300100)){ System.out.println(threadName+"========the protocol is multiply algorithm======="); firstNum = dataBuffer.getInt(); secondNum = dataBuffer.getInt(); System.out.println("operate num is:"+firstNum+","+secondNum); result = firstNum*secondNum; proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300200.getBytes(ProtocolConstants.CHARSET_UTF8)); proctolCodeBuffer.flip(); dataBuffer.clear(); //针对数据太大,缓冲区一次装不完的情况,将缓冲区中,未写完的数据,移到缓冲区的前面 // dataBuffer.compact() dataBuffer.putInt(result); dataBuffer.flip();//切换写模式到读模式,从缓冲区读取数据,写到通道中 channel.write(proctols); } else{ System.out.println(threadName+"========server decode procotol fail......"); proctolCodeBuffer.clear(); proctolCodeBuffer.put(ProtocolConstants.ACK_PROTOCOL_300300.getBytes(ProtocolConstants.CHARSET_UTF8)); proctolCodeBuffer.flip(); dataBuffer.clear(); dataBuffer.putInt(0); dataBuffer.flip(); channel.write(proctols); } /*关闭Connection,即关闭到通道的连接,再次write将抛出异常*/ // channel.shutdownOutput(); /*关闭通道*/ //channel.close(); /*注意上面两个方法,测试时,不要开启;测试开启的话,Server端,会有一个OP_READ事件*/ } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
先启动服务端,再启动加法和乘法客户端,控制台数输出为:
服务端:
=========The Server is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is sum algorithm=======
operate num is:15,6
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
main=========The Server Calculate is start!===========
main========socketChannel attachedInfo:decodeProtol
main========read caculate proctol from Client=======
main========the protocol is multiply algorithm=======
operate num is:17,8
加法客户端:
===========The Sum Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode String length:6
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:90
乘法客户端:
===========The Multiply Client is start!===========
=========channel is Connected:true
=========channel is Open:true
=========channel is ConnectionPending:false
ProtocolCode length:6
data length:8
=======write proctols to channel
========socketChannel attachedInfo:calculateResult
========read caculate result from Server=======
========the calculated result from server:136
总结:
在操作缓冲区Buffer时,要注意从通道读数据到缓冲区,及写缓冲区,或从缓冲区写数据到通道,即读取缓冲区,缓冲区读写模式转换是要调用flip函数,进行切换模式,
limit定位到position位置,然后position回到0;意思为缓冲区可读可写的数据量。put操作为写缓存区,get操作为读缓存区,当重用缓冲区,记得clear缓冲区,clear并不为清空缓冲区,至少将position至少为0,mark为-1,limit为capacity,再次写数据是将覆盖以前的数据。
发表评论
-
文件通道解析二(文件锁,关闭通道)
2017-05-16 23:17 1065文件通道解析一(读写操作,通道数据传输等):http://do ... -
文件通道解析一(读写操作,通道数据传输等)
2017-05-16 10:04 1164Reference定义(PhantomRefere ... -
文件通道创建方式综述
2017-05-15 17:39 1066Reference定义(PhantomReference,Cl ... -
文件读写方式简单综述后续(文件,流构造)
2017-05-14 23:04 1480Java Socket通信实例:http://donald-d ... -
文件读写方式简单综述
2017-05-14 11:13 1135Java Socket通信实例:http://donald-d ... -
FileChanne定义
2017-05-12 23:28 936文件读写方式简单综述:http://donald-draper ... -
SeekableByteChannel接口定义
2017-05-11 08:43 1235ByteChannel,分散聚集通道接口的定义(SocketC ... -
FileChannel示例
2017-05-11 08:37 992前面我们看过socket通道,datagram通道,以管道Pi ... -
PipeImpl解析
2017-05-11 08:41 932ServerSocketChannel定义:http://do ... -
Pipe定义
2017-05-10 09:07 904Channel接口定义:http://donald-drape ... -
NIO-Pipe示例
2017-05-10 08:47 905PipeImpl解析:http://donald-draper ... -
DatagramChannelImpl 解析四(地址绑定,关闭通道等)
2017-05-10 08:27 776DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析三(多播)
2017-05-10 08:20 1893DatagramChannelImpl 解析一(初始化):ht ... -
NIO-UDP实例
2017-05-09 12:32 1585DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析二(报文发送与接收)
2017-05-09 09:03 1405DatagramChannelImpl 解析一(初始化):ht ... -
DatagramChannelImpl 解析一(初始化)
2017-05-08 21:52 1407Channel接口定义:http://donald-drape ... -
MembershipKeyImpl 简介
2017-05-08 09:11 923MembershipKey定义:http://donald-d ... -
DatagramChannel定义
2017-05-07 23:13 1228Channel接口定义:http://donald-drape ... -
MulticastChanne接口定义
2017-05-07 13:45 1136NetworkChannel接口定义:ht ... -
MembershipKey定义
2017-05-06 16:20 916package java.nio.channels; i ...
相关推荐
- **高并发服务器**: NIO的选择器机制使得一个线程可以处理多个客户端连接,适合构建高性能的服务器端应用。 - **文件批量处理**: 使用FileChannel的transferTo()和transferFrom()方法,可以高效地进行文件的复制和...
TCP(传输控制协议)是一种面向连接、可靠的、基于字节流的传输层通信协议,而NIO(非阻塞I/O)则是Java提供的一种高效处理I/O操作的方式。在这个“tcp.zip”压缩包中,我们可能找到了关于使用Java实现TCP服务器和...
本实例主要关注NIO在TCP中的应用,它允许更高效的资源管理和处理多个连接,特别适合高并发场景。我们将探讨以下几个关键知识点: 1. **NIO(Non-blocking Input/Output)**: 与传统的BIO不同,NIO是非阻塞的,这...
标题中的“三个分别由单线程、多线程、线程池实现的简单网关”涉及到的是并发处理的三种常见模型。在IT行业中,尤其是在服务器端编程和高性能系统设计中,如何有效地处理并发请求是至关重要的。让我们逐一探讨这三个...
在这个例子中,我们将深入理解如何在Java中实现TCP网络编程,以及如何利用多线程来处理并发连接。 首先,TCP网络编程涉及到客户端和服务器端的交互。在Java中,我们可以使用`java.net.Socket`类来创建客户端连接,`...
Java IO、NIO和Netty是Java平台中用于处理输入/输出操作的重要组件,而多线程并发则是提升程序性能和响应能力的关键技术。在这个压缩包"基于JAVA IO, NIO, Netty, 多线程并发实战源码.zip"中,我们可以期待找到一些...
在Mina的非阻塞I/O(NIO)模式下,存在三种主要的工作线程,它们在NIO Socket中起到不同的作用: 1. **Acceptor Thread**:这个线程负责接收来自客户端的连接请求,并将这些连接转发给I/O Processor线程处理。...
传统的Java I/O模型(BIO)在处理大量并发连接时效率较低,因为它基于阻塞模式,一个线程只能处理一个连接,而NIO则允许单个线程同时处理多个连接,大大提高了性能。 `NIOServer.java`和`NIOClient.java`这两个文件...
在尚硅谷的12讲课程中,这些知识点将通过实例演示和详细解释,让学习者掌握Java NIO的精髓,并能够实际应用到项目开发中,提升系统的性能和并发处理能力。通过系统学习,开发者将更好地理解Java NIO的优势,并能在...
本篇文章将深入探讨“基于Java的TCP通信程序”以及如何实现点对点的多通道通信。 TCP是一种面向连接的、可靠的传输协议,它确保了数据的顺序传输和错误检查。在Java中,我们主要使用`java.net`包中的`Socket`和`...
3. **Selector**:Selector允许单线程检查多个Channel的事件(如连接打开、数据到达、连接关闭等)。这样,一个线程就可以管理多个Channel,极大地提高了并发处理能力。使用Selector,开发者可以实现高效率的服务器...
为了优化服务器性能,服务器端可能采用了多线程或异步I/O模型(如Java NIO)来处理多个客户端请求,以提高并发处理能力。 综上所述,这个压缩包提供的代码示例涉及了客户端-服务器应用开发的基础,包括TCP/IP通信...
- 使用Selector来处理多个通道的事件,实现单线程管理多个连接。 - 对于文件操作,使用FileChannel进行高效的数据传输和文件操作。 以上知识点详细介绍了Java NIO的核心组件及其工作原理和应用场景,有助于理解和...
Apache Mina Server 2.0 是一款强大的网络通信框架,主要设计用于构建高性能、高度可扩展的网络应用程序。它的核心特点在于提供事件驱动和异步IO操作,这得益于其默认基于Java NIO(非阻塞I/O)的底层实现。Mina有两...
选择器允许单个线程处理多个通道的事件,提高了系统资源的利用率。 这个例子可能包含一个简单的Netty服务器和客户端实现,它们展示了如何利用NIO进行通信。在服务器端,首先会创建一个ServerBootstrap,然后设置...
因此,服务器端的Socket通信常与多线程结合使用,每个连接创建一个新的线程来处理,这样服务器可以并发地服务多个客户端。 8. **TCP特性**:TCP是一种面向连接的、可靠的传输协议,它保证了数据的顺序和完整性。这...
xSocket是一个基于Java NIO实现的网络通信框架,它提供了高性能、稳定可靠的网络连接管理。 1. **Java NIO基础**: - **通道(Channels)**:通道是数据传输的路径,可以连接到不同类型的I/O设备,如文件、套接字...
本文将深入探讨如何使用Java实现单线程服务端的Socket通信。 首先,我们来理解Socket的概念。Socket在编程中被视为网络通信的端点,它包含了IP地址和端口号,用于标识网络上的特定服务。在TCP/IP模型中,Socket是...
NIO引入了选择器(Selector)和通道(Channel)的概念,允许单线程同时处理多个连接,提高了服务器的并发性能。例如,使用Selector可以监控多个SocketChannel,当某个通道准备好进行读写时,Selector会通知应用程序...