重点注意:有说明的地方。
package nio.chat;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Server extends Thread {
private Selector selector;
private ServerSocketChannel serverChannel;
private ByteBuffer wBuffer=ByteBuffer.allocate(1024); //共享 buffer 在多 链接时候会产生错误
private ByteBuffer rBuffer=ByteBuffer.allocate(1024);
public Server(ServerSocketChannel sc) throws Exception {
serverChannel=sc;
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void run(){
while (true){
try{
if (selector.select(1000) == 0){
System.out.print(".");
continue;
}
Iterator<SelectionKey> ki = selector.selectedKeys().iterator();
while (ki.hasNext()){
SelectionKey key = ki.next();
if (key.isAcceptable()){
handleAccept(key);
}else
if (key.isReadable()){
handleRead(key);
}else
if (key.isWritable()){
handleWrite(key);
}
ki.remove();
}
}catch(Exception e){
e.printStackTrace();
}
}
}
private void handleWrite(SelectionKey key){
SocketChannel cc = (SocketChannel) key.channel();
ByteBuffer buf= (ByteBuffer)(key.attachment());
//byte []b={5,'H','E','L','L','O'};
//buf=ByteBuffer.wrap(b);
try{
buf.flip();
cc.write(buf);
if(buf.hasRemaining()==false){
// key.interestOps(SelectionKey.OP_READ);
byte []b=buf.array();
System.out.println("---->response finished!,msg="+new String(b,1,b[0],"utf-8"));
buf.clear();
SelectionKey s=cc.register(selector, SelectionKey.OP_READ, rBuffer);
System.out.println("---toRead,selectionKey="+s);
}
}catch(Exception e){
try{
cc.close();
}catch(Exception e1){};
e.printStackTrace();
System.out.println("---closed!---");
}
}
private void handleRead(SelectionKey key){
SocketChannel cc = (SocketChannel) key.channel();
ByteBuffer buf= (ByteBuffer)(key.attachment());
try{
int read=cc.read(buf);
byte []bs=buf.array();
int pos=buf.position();
if(pos>0 && pos==(int)bs[0]+1){
String msg=new String(bs,1,bs[0],"utf-8");
System.out.println("---->Recv:"+msg);
buf.clear();
String msg1="收到"+(pos-1)+"bytes,content("+msg+")";
bs=msg1.getBytes("utf-8");
byte []bl=new byte[bs.length+1];
bl[0]=(byte)bs.length;
System.arraycopy(bs, 0, bl, 1, bs.length);
wBuffer.put(bl);
//key.interestOps(SelectionKey.OP_WRITE);
SelectionKey s=cc.register(selector, SelectionKey.OP_WRITE, wBuffer);
System.out.println("---toWrite, selectionKey="+s);
}
if(read==-1){
System.out.println("--遇到结束--");
}
}catch(Exception e){
try{
cc.close();
}catch(Exception e1){};
e.printStackTrace();
System.out.println("---closed!---");
}
}
private void handleAccept(SelectionKey key) {
try{
SocketChannel cc = ((ServerSocketChannel) key.channel()).accept();
cc.configureBlocking(false);
SelectionKey s=cc.register(selector, SelectionKey.OP_READ, rBuffer);
System.out.println("---toRead, selectionKey="+s);
//key.interestOps(SelectionKey.OP_READ);
System.out.println("--Connect from "+cc.getRemoteAddress());
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String []args) throws Exception {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(Client.port));
serverChannel.configureBlocking(false);
Server sc=new Server(serverChannel);
sc.start();
}
}
package nio.chat;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Client extends Thread{
private Selector selector;
private SocketChannel channel;
private BufferedReader br = null;
private ByteBuffer readBuffer=ByteBuffer.allocate(1024);
public Client(SocketChannel c) throws Exception {
this.channel=c;
selector=Selector.open();
channel.register(selector, SelectionKey.OP_WRITE|SelectionKey.OP_READ,null);
}
public void run() {
SelectionKey key;
boolean ced=false;
int loop=0;
while(true){
try{
if(channel.finishConnect() && ced==false){
ced=true;
}
if(ced==false){
continue;
}
int is=selector.select(); //先获取就绪数,如不调用本方法, 这 selectedKeys() 得到不就绪的 SelectionKey
if(is==0){
continue;
}
Iterator<SelectionKey> ks = selector.selectedKeys().iterator();
while (ks.hasNext()){
key = ks.next();
if (key.isWritable() ){
send();
}
if(key.isReadable()){
read();
}
ks.remove(); //从 selector 删除, 如不删除,会导致下次 selectedKeys() 无法加入就绪的 SelectionKey
}
loop++;
}catch(Exception e){
e.printStackTrace();
break;
}
}
System.out.println("---end of chat-----");
try{
channel.close();
}catch(Exception e){
e.printStackTrace();
}
}
private void read() throws Exception {
ByteBuffer bu=ByteBuffer.allocate(20);
int read=channel.read(bu);
for(int i=0;i<read;i++){
readBuffer.put(bu.get(i));
int len=readBuffer.get(0);
if(len==readBuffer.position()-1){
byte []bs=readBuffer.array();
System.out.println("\r\n--收到:"+new String(bs,1,len,"utf-8"));
readBuffer.clear();
}
}
}
static int nLine=0;
static long lastTime=0;
private void send(){
nLine++;
if(System.currentTimeMillis()-lastTime<10*1000){
return;
}
lastTime=System.currentTimeMillis();
try{
String line=String.valueOf(nLine);
byte []bs=line.getBytes("utf-8");
int len=bs.length;
byte []b={(byte)len};
ByteBuffer b1=ByteBuffer.wrap(b);
ByteBuffer b2=ByteBuffer.wrap(bs);
while(b1.hasRemaining()){
channel.write(b1);
}
while(b2.hasRemaining()){
channel.write(b2);
}
System.out.println("\t 发送完毕!,msg="+line);
}catch(Exception e){
e.printStackTrace();
System.exit(0);
}
}
public static String host="127.0.0.1";
public static int port=8001;
public static void main(String []args) throws Exception {
SocketChannel clntChan = SocketChannel.open();
clntChan.configureBlocking(false);
Client dealer=new Client(clntChan);
clntChan.connect(new InetSocketAddress(host, port));
dealer.start();
}
}