代碼分爲客戶端、服務端以及調試代碼
客戶端:
package test.nio.tcp;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
 * Minimal blocking NIO TCP client.
 *
 * <p>Connects to a server and prints every chunk of data the server sends,
 * until the server closes the connection. The client itself never sends
 * anything.
 *
 * <p>Usage: {@code AsyncClient [host [port]]}. Without arguments it connects to
 * 127.0.0.1:7890, the default port of the echo server in this sample.
 */
class AsyncClient
{
    /** Capacity in bytes of the receive buffer; a single read never yields more. */
    private static final int MAX_LENGTH = 1024;

    /** Server host name or address; may be overridden by the first CLI argument. */
    private static String host = "127.0.0.1";

    /** Server port; may be overridden by the second CLI argument. */
    private static int port = 7890;

    private SocketChannel sc;
    private final ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);

    /**
     * Connects to {@code host:port} and prints incoming data until the server
     * closes the connection or an I/O error occurs. I/O errors are reported on
     * stderr; the channel is always closed before the constructor returns.
     */
    public AsyncClient()
    {
        try
        {
            sc = SocketChannel.open();
            // The channel is in blocking mode, so connect() only returns once
            // the connection is established (or throws).
            sc.connect(new InetSocketAddress(host, port));
            System.out.println("connection has been established!...");

            boolean open = true;
            while (open)
            {
                open = receive() >= 0;
            }
        }
        catch (IOException ioe)
        {
            ioe.printStackTrace();
        }
        finally
        {
            closeChannel();
        }
    }

    /**
     * Reads one chunk of data from the server and prints it. If the server has
     * closed the connection, a notice is printed instead.
     *
     * @throws IOException if reading from the channel fails
     */
    public void Rec() throws IOException
    {
        receive();
    }

    /**
     * Performs one blocking read and prints what was received.
     *
     * @return the number of bytes read, or -1 if the server closed the connection
     * @throws IOException if reading from the channel fails
     */
    private int receive() throws IOException
    {
        r_buff.clear();
        int count = sc.read(r_buff);
        if (count < 0)
        {
            // End of stream: every further read would return -1 immediately,
            // so the caller must stop instead of looping.
            System.out.println("connection closed by server.");
            return count;
        }
        r_buff.flip();
        byte[] temp = new byte[r_buff.remaining()];
        r_buff.get(temp);
        System.out.println("reply is [" + count + "] long, and content is: "
                + new String(temp));
        return count;
    }

    /** Closes the channel if it was opened; a failure to close is only reported. */
    private void closeChannel()
    {
        if (sc == null)
        {
            return;
        }
        try
        {
            sc.close();
        }
        catch (IOException closeFailure)
        {
            closeFailure.printStackTrace();
        }
    }

    /**
     * Entry point.
     *
     * @param args optional {@code host} and {@code port}; the defaults are
     *             127.0.0.1 and 7890
     * @throws NumberFormatException if the port argument is not an integer
     */
    public static void main(String args[])
    {
        if (args.length > 0)
        {
            host = args[0];
        }
        if (args.length > 1)
        {
            port = Integer.parseInt(args[1]);
        }
        new AsyncClient();
    }
}
服務端:
package test.nio.tcp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Single-threaded, selector-based TCP echo server.
 *
 * <p>Constructing an instance starts a thread that listens on {@code port}
 * (7890 unless overridden on the command line) and writes back to each client
 * whatever that client sent.
 */
class AsyncServer implements Runnable
{
    /** Capacity in bytes of the shared read and write buffers. */
    private static final int BUFFER_SIZE = 1024;

    private final ByteBuffer r_buff = ByteBuffer.allocate(BUFFER_SIZE);
    private final ByteBuffer w_buff = ByteBuffer.allocate(BUFFER_SIZE);
    private static int port = 7890;

    /** Starts the server loop on a new thread. */
    public AsyncServer()
    {
        new Thread(this).start();
    }

    /**
     * Opens the listening channel and runs the select loop until a fatal error
     * occurs. The selector and the listening channel are closed on exit.
     */
    public void run()
    {
        Selector s = null;
        ServerSocketChannel ssc = null;
        try
        {
            // Create the listening channel and make it non-blocking.
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            // Create the selector that watches all channels.
            s = Selector.open();
            ssc.socket().bind(new InetSocketAddress(port));
            // The listening channel is only interested in new connections.
            ssc.register(s, SelectionKey.OP_ACCEPT);
            System.out.println("echo server has been set up ......");
            while (true)
            {
                if (s.select() == 0)
                {
                    // Woken up without any ready channel.
                    continue;
                }
                Iterator<SelectionKey> it = s.selectedKeys().iterator();
                while (it.hasNext())
                {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(s, key);
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally
        {
            shutdown(s, ssc);
        }
    }

    /**
     * Handles one ready key: accepts a pending connection or echoes pending data.
     * A read/write failure on a client closes only that client.
     */
    private void dispatch(Selector s, SelectionKey key) throws IOException
    {
        if (!key.isValid())
        {
            return;
        }
        if (key.isAcceptable())
        {
            acceptClient(s, (ServerSocketChannel) key.channel());
        }
        else if (key.isReadable())
        {
            try
            {
                DealwithData(key);
            }
            catch (IOException ioe)
            {
                // E.g. connection reset: drop this client, keep serving the others.
                ioe.printStackTrace();
                closeClient(key);
            }
        }
    }

    /** Accepts a pending connection and registers it for read events. */
    private void acceptClient(Selector s, ServerSocketChannel server) throws IOException
    {
        SocketChannel sc = server.accept();
        if (sc == null)
        {
            // A non-blocking accept() returns null when nothing is pending.
            return;
        }
        sc.configureBlocking(false);
        // DealwithData() is invoked whenever this socket becomes readable.
        sc.register(s, SelectionKey.OP_READ);
    }

    /** Closes the channel behind {@code key}; closing also cancels the key. */
    private void closeClient(SelectionKey key)
    {
        try
        {
            key.channel().close();
        }
        catch (IOException closeFailure)
        {
            closeFailure.printStackTrace();
        }
    }

    /** Closes the selector and the listening channel if they were opened. */
    private void shutdown(Selector s, ServerSocketChannel ssc)
    {
        try
        {
            if (s != null)
            {
                s.close();
            }
        }
        catch (IOException closeFailure)
        {
            closeFailure.printStackTrace();
        }
        try
        {
            if (ssc != null)
            {
                ssc.close();
            }
        }
        catch (IOException closeFailure)
        {
            closeFailure.printStackTrace();
        }
    }

    /**
     * Reads the data currently available on the key's channel (up to the buffer
     * capacity) and echoes it back. If the client has closed its end, the
     * channel is closed so that its key stops being reported as readable.
     *
     * @param key a readable key whose channel is a {@link SocketChannel}
     * @throws IOException if reading, writing or closing the channel fails
     */
    public void DealwithData(SelectionKey key) throws IOException
    {
        SocketChannel sc = (SocketChannel) key.channel();
        r_buff.clear();
        int count;
        // Read until no more data is available (0), the buffer is full (0),
        // or the client closed the connection (-1).
        do
        {
            count = sc.read(r_buff);
        }
        while (count > 0);

        r_buff.flip();
        if (r_buff.hasRemaining())
        {
            // Copy what was received into the write buffer and send it back.
            w_buff.clear();
            w_buff.put(r_buff);
            w_buff.flip();
            EchoToClient(sc);
        }
        w_buff.clear();
        r_buff.clear();

        if (count < 0)
        {
            // End of stream: without closing, select() would report this key
            // as readable on every iteration and the loop would spin.
            sc.close();
        }
    }

    /**
     * Writes the whole content of the write buffer to the client.
     *
     * <p>NOTE(review): on a non-blocking channel this loop spins while the
     * socket send buffer is full; a client that never reads could stall the
     * server here. A complete fix needs per-connection queues and OP_WRITE.
     *
     * @param sc the client channel to write to
     * @throws IOException if writing fails
     */
    public void EchoToClient(SocketChannel sc) throws IOException
    {
        while (w_buff.hasRemaining())
        {
            sc.write(w_buff);
        }
    }

    /**
     * Entry point.
     *
     * @param args optional listening port as the first argument
     */
    public static void main(String args[])
    {
        if (args.length > 0)
        {
            port = Integer.parseInt(args[0]);
        }
        new AsyncServer();
    }
}
本地調試:
package test.nio.tcp;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
*
* @author Administrator
* @version
*/
/**
 * Debug harness for non-blocking I/O: accepts TCP connections on the local
 * host, port 7890, and prints the state of every selection key it sees.
 * Data sent by clients is read and discarded.
 */
public class NBTest
{
    /** Port the debug server listens on. */
    private static final int PORT = 7890;

    /** Size in bytes of the scratch buffer used to discard client data. */
    private static final int SCRATCH_SIZE = 1024;

    /** Creates new NBTest */
    public NBTest()
    {
    }

    /**
     * Binds to the local host on {@link #PORT} and runs the select loop.
     * Does not return normally; the selector and the listening channel are
     * closed if the loop is left by an exception.
     *
     * @throws Exception if setting up or running the selector fails
     */
    public void startServer() throws Exception
    {
        Selector selector = Selector.open();
        try
        {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            try
            {
                serve(selector, ssc);
            }
            finally
            {
                ssc.close();
            }
        }
        finally
        {
            selector.close();
        }
    }

    /** Binds {@code ssc}, registers it with {@code selector} and polls forever. */
    private void serve(Selector selector, ServerSocketChannel ssc) throws IOException
    {
        int channels = 0;
        // Bind the channel to PORT on the local host address.
        InetAddress localHost = InetAddress.getLocalHost();
        System.out.println("Bind Host[" + localHost.getHostName() + "] Port[" + PORT + "]");
        ssc.socket().bind(new InetSocketAddress(localHost, PORT));
        // Switch to non-blocking mode.
        ssc.configureBlocking(false);
        // Register the channel and the event we are interested in.
        SelectionKey s = ssc.register(selector, SelectionKey.OP_ACCEPT);
        printKeyInfo(s);
        while (true) // poll forever
        {
            debug("NBTest: Starting select");
            // select() blocks until at least one registered event has occurred.
            int nKeys = selector.select();
            if (nKeys <= 0)
            {
                debug("NBTest: Select finished without any keys.");
                continue;
            }
            debug("NBTest: Number of keys after select operation: " + nKeys);
            Iterator<SelectionKey> i = selector.selectedKeys().iterator();
            while (i.hasNext())
            {
                s = i.next();
                printKeyInfo(s);
                debug("NBTest: Nr Keys in selector: " + selector.keys().size());
                // A handled key must be removed from the selected-key set.
                i.remove();
                if (s.isAcceptable())
                {
                    System.out.println("Receive a new socket.");
                    SocketChannel sc = ((ServerSocketChannel) s.channel()).accept();
                    if (sc == null)
                    {
                        // A non-blocking accept() returns null when nothing is pending.
                        debug("NBTest: accept() returned no connection");
                        continue;
                    }
                    sc.configureBlocking(false);
                    // Only OP_READ: a connected socket is almost always
                    // writable, so OP_WRITE interest would make select()
                    // return immediately on every iteration.
                    sc.register(selector, SelectionKey.OP_READ);
                    System.out.println(++channels);
                }
                else
                {
                    debug("NBTest: Channel not acceptable");
                    drain(s);
                }
            }
        }
    }

    /**
     * Reads and discards everything currently available on a readable client
     * channel, and closes the channel on end of stream or I/O error. Without
     * this the key would stay ready and select() would never block again.
     */
    private static void drain(SelectionKey key)
    {
        if (!key.isValid() || !key.isReadable())
        {
            return;
        }
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer scratch = ByteBuffer.allocate(SCRATCH_SIZE);
        try
        {
            int total = 0;
            int n = client.read(scratch);
            while (n > 0)
            {
                total += n;
                scratch.clear();
                n = client.read(scratch);
            }
            if (total > 0)
            {
                debug("NBTest: Discarded " + total + " byte(s)");
            }
            if (n < 0)
            {
                debug("NBTest: Peer closed the connection");
                client.close();
            }
        }
        catch (IOException ioe)
        {
            debug("NBTest: Closing channel after I/O error: " + ioe);
            try
            {
                client.close();
            }
            catch (IOException closeFailure)
            {
                debug("NBTest: Close failed: " + closeFailure);
            }
        }
    }

    /** Prints a debug message to stdout. */
    private static void debug(String s)
    {
        System.out.println(s);
    }

    /** Prints attachment presence, ready flags, validity and interest ops of a key. */
    private static void printKeyInfo(SelectionKey sk)
    {
        String s = "Att: " + (sk.attachment() == null ? "no" : "yes")
                + ", Read: " + sk.isReadable()
                + ", Acpt: " + sk.isAcceptable()
                + ", Cnct: " + sk.isConnectable()
                + ", Wrt: " + sk.isWritable()
                + ", Valid: " + sk.isValid()
                + ", Ops: " + sk.interestOps();
        debug(s);
    }

    /**
     * @param args the command line arguments
     */
    public static void main (String args[])
    {
        NBTest nbTest = new NBTest();
        try
        {
            nbTest.startServer();
        }
        catch(Exception e)
        {
            e.printStackTrace();
        }
    }
}