public class HelloServer { private Selector selector; private ByteBuffer byteBuffer = ByteBuffer.allocate(1024); private String name; public HelloServer() throws IOException { selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8888)); serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } public Selector getSelector() { return selector; } public void setSelector(Selector selector) { this.selector = selector; } // 开始监听 public void listen() { try { for (;;) { // 选择已经准备好的通道,返回通道数,这个值是上次调用select()后到现在这个阶段 //已经准备好的通道 int i = selector.select(); Iterator iter = selector.selectedKeys().iterator(); if (iter.hasNext()) { SelectionKey selectionKey = (SelectionKey) iter.next(); // 处理完一个就需要删除一个SelectionKey 不然的话,一直堆积,cpu100% iter.remove(); process(selectionKey); } } } catch (IOException e) { e.printStackTrace(); } } public void process(SelectionKey selectionKey) throws IOException { // 准备好接受新的套接字连接 这个只有ServerSocketChannel绑定的SelectionKey才有效 if (selectionKey.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) selectionKey .channel(); SocketChannel channel = server.accept(); // 设置非阻塞模式 channel.configureBlocking(false); // 得到client的socket后可以使其在OP_READ状态,下个时段的selector.select()中就是selectionKey.isReadable()的了,可以执行下面这个分支了 channel.register(selector, SelectionKey.OP_READ); System.out.println("accept:"+selectionKey.interestOps()+" "+selectionKey.readyOps()); } else if (selectionKey.isReadable()) { // 可读,下面就是client读数据 SocketChannel channel = (SocketChannel) selectionKey.channel(); // int count = 0; int count = channel.read(byteBuffer); if (count > 0) { byte[] bbb = new byte[1024]; byteBuffer.flip(); bbb = byteBuffer.array(); name = new String(bbb); System.out.println(name + "aaa"); byteBuffer.clear(); } // channel.close();这个可以关闭,可以不关闭,一般需要关闭(count==0),不然client的socket堆积的太多 // channel.register(selector, SelectionKey.OP_WRITE); // 可以在注册感兴趣的时间,这个是往客户端写,下个select() // 时段就可以往client里面写了 System.out.println("isReadable:"+selectionKey.interestOps()+" "+selectionKey.readyOps()); } else if (selectionKey.isWritable()) { System.out.println("isWritable:"+selectionKey.interestOps()); SocketChannel channel = (SocketChannel) selectionKey.channel(); byteBuffer.reset(); byteBuffer.put(name == null ? "acb".getBytes() : name.getBytes()); } } public static void main(String[] args) { int port = 8888; try { HelloServer server = new HelloServer(); System.out.println("listening on " + port); server.listen(); } catch (IOException e) { e.printStackTrace(); } } }
public class HelloClient { static InetSocketAddress ip = new InetSocketAddress("localhost", 8888); static ByteBuffer byteBuffer = ByteBuffer.allocate(1024); static class Message implements Runnable { protected String name; String msg = ""; public Message(String index) { this.name = index; } public void run() { try { long start = System.currentTimeMillis(); // 打开Socket通道 SocketChannel client = SocketChannel.open(); // 设置为非阻塞模式 client.configureBlocking(false); // 打开选择器 Selector selector = Selector.open(); // 注册连接服务端socket动作 client.register(selector, SelectionKey.OP_CONNECT); // 连接 client.connect(ip); // 分配内存 ByteBuffer buffer = ByteBuffer.allocate(1); int total = 0; _FOR: for (;;) { selector.select(); Iterator iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = (SelectionKey) iter.next(); iter.remove(); if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key .channel(); if (channel.isConnectionPending()) channel.finishConnect(); byte [] aaa = name.getBytes(); channel.write(buffer.wrap(aaa)); //写完可以监听读,server的响应就可以在下个select()时段检测到了 channel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key .channel(); int count = channel.read(buffer); if (count > 0) { total += count; buffer.flip(); while (buffer.remaining() > 0) { byte b = buffer.get(); msg += (char) b; } buffer.clear(); } else { client.close(); break _FOR; } } } } double last = (System.currentTimeMillis() - start) * 1.0 / 1000; System.out.println(msg + "used time :" + last + "s."); msg = ""; } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { String names[] = new String[10]; for (int index = 0; index < 10; index++) { names[index] = "jeff[" + index + "]"; new Thread(new Message(names[index])).start(); } } }
1 selector的selectedKeys()方法返回的是上次select()方法调用后到现在为止的放入到就绪集里面的SelectionKey
1. 将已取消键集中的每个键从所有键集中移除(如果该键是键集的成员),并注销其通道。此步骤使已取消键集成为空集。 2. 在开始进行选择操作时,应查询基础操作系统来更新每个剩余通道的准备就绪信息, 以执行由其键的相关集合所标识的任意操作。对于已为至少一个这样的操作准备就绪的通道, 执行以下两种操作之一: 如果该通道的键尚未在已选择键集中,则将其添加到该集合中,并修改其准备就绪操作集, 以准确地标识那些通道现在已报告为之准备就绪的操作。丢弃准备就绪操作集中以前记录的所有准备就绪信息。 如果该通道的键已经在已选择键集中,则修改其准备就绪操作集, 以准确地标识所有通道已报告为之准备就绪的新操作。保留准备就绪操作集以前记录的所有准备就绪信息; 换句话说,基础系统所返回的准备就绪操作集是和该键的当前准备就绪操作集按位分开 (bitwise-disjoined) 的。 3.如果在此步骤开始时键集中的所有键都有空的相关集合,则不会更新已选择键集和任意键的准备就绪操作集。 如果在步骤 (2) 的执行过程中要将任意键添加到已取消键集中,则处理过程如步骤 (1)。
public void run() { try { listen(); } catch (Exception x) { log.error("Unable to run replication listener.", x); } }
protected void listen() throws Exception { if (doListen()) { log.warn("ServerSocketChannel already started"); return; } setListen(true); while (doListen() && selector != null) { // this may block for a long time, upon return the // selected set contains keys of the ready channels try { events(); socketTimeouts(); int n = selector.select(getTcpSelectorTimeout()); if (n == 0) { continue; // nothing to do } // get an iterator over the set of selected keys Iterator it = selector.selectedKeys().iterator(); // look at each key in the selected set while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); // Is a new connection coming in? if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.socket().setReceiveBufferSize(getRxBufSize()); channel.socket().setSendBufferSize(getTxBufSize()); channel.socket().setTcpNoDelay(getTcpNoDelay()); channel.socket().setKeepAlive(getSoKeepAlive()); channel.socket().setOOBInline(getOoBInline()); channel.socket().setReuseAddress(getSoReuseAddress()); channel.socket().setSoLinger(getSoLingerOn(),getSoLingerTime()); channel.socket().setTrafficClass(getSoTrafficClass()); channel.socket().setSoTimeout(getTimeout()); Object attach = new ObjectReader(channel); registerChannel(selector, channel, SelectionKey.OP_READ, attach); } // is there data to read on this channel? if (key.isReadable()) { readDataFromSocket(key); } else { key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); } // remove key from selected set, it's been handled it.remove(); } } catch (java.nio.channels.ClosedSelectorException cse) { // ignore is normal at shutdown or stop listen socket } catch (java.nio.channels.CancelledKeyException nx) { log.warn("Replication client disconnected, error when polling key. Ignoring client."); } catch (Throwable x) { try { log.error("Unable to process request in NioReceiver", x); }catch ( Throwable tx ) { //in case an out of memory error, will affect the logging framework as well tx.printStackTrace(); } } } serverChannel.close(); if (selector != null) selector.close(); }
每一个event都是Runnable,比较怪的是在events()方法中,执行的是runnable的run()的方法,也不是线程的执行方 法.
protected void readDataFromSocket(SelectionKey key) throws Exception { NioReplicationTask task = (NioReplicationTask) getTaskPool().getRxTask(); if (task == null) { // No threads/tasks available, do nothing, the selection // loop will keep calling this method until a // thread becomes available, the thread pool itself has a waiting mechanism // so we will not wait here. if (log.isDebugEnabled()) log.debug("No TcpReplicationThread available"); } else { // invoking this wakes up the worker thread then returns //add task to thread pool task.serviceChannel(key); getExecutor().execute(task); } }
// loop forever waiting for work to do public synchronized void run() { if ( buffer == null ) { if ( (getOptions() & OPTION_DIRECT_BUFFER) == OPTION_DIRECT_BUFFER) { buffer = ByteBuffer.allocateDirect(getRxBufSize()); } else { buffer = ByteBuffer.allocate(getRxBufSize()); } } else { buffer.clear(); } if (key == null) { return; // just in case } if ( log.isTraceEnabled() ) log.trace("Servicing key:"+key); try { ObjectReader reader = (ObjectReader)key.attachment(); //如果SelectionKey上面没有reader的话,这个SelectionKey可以取消此键的通道到其选择器的注册 //以及关闭SelectionKey的channel和socket if ( reader == null ) { if ( log.isTraceEnabled() ) log.trace("No object reader, cancelling:"+key); cancelKey(key); } else { if ( log.isTraceEnabled() ) log.trace("Draining channel:"+key); drainChannel(key, reader); } } catch (Exception e) { //this is common, since the sockets on the other //end expire after a certain time. if ( e instanceof CancelledKeyException ) { //do nothing } else if ( e instanceof IOException ) { //dont spew out stack traces for IO exceptions unless debug is enabled. if (log.isDebugEnabled()) log.debug ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"].", e); else log.warn ("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed["+e.getMessage()+"]."); } else if ( log.isErrorEnabled() ) { //this is a real error, log it. log.error("Exception caught in TcpReplicationThread.drainChannel.",e); } cancelKey(key); } finally { } key = null; // done, ready for more, return to pool getTaskPool().returnWorker (this); }
protected void drainChannel (final SelectionKey key, ObjectReader reader) throws Exception { //从SelectionKey中得到数据 reader.setLastAccess(System.currentTimeMillis()); reader.access(); SocketChannel channel = (SocketChannel) key.channel(); int count; buffer.clear(); // make buffer empty // loop while data available, channel is non-blocking while ((count = channel.read (buffer)) > 0) { buffer.flip(); // make buffer readable if ( buffer.hasArray() ) reader.append(buffer.array(),0,count,false); else reader.append(buffer,count,false); buffer.clear(); // make buffer empty //do we have at least one package? if ( reader.hasPackage() ) break; } int pkgcnt = reader.count(); if (count < 0 && pkgcnt == 0 ) { //end of stream, and no more packages to process remoteEof(key); return; } ChannelMessage[] msgs = pkgcnt == 0? ChannelData.EMPTY_DATA_ARRAY : reader.execute(); registerForRead(key,reader);//register to read new data, before we send it off to avoid dead locks for ( int i=0; i<msgs.length; i++ ) { /** * Use send ack here if you want to ack the request to the remote * server before completing the request * This is considered an asynchronized request */ if (ChannelData.sendAckAsync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); try { if ( Logs.MESSAGES.isTraceEnabled() ) { try { Logs.MESSAGES.trace("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new java.sql.Timestamp(System.currentTimeMillis())); }catch ( Throwable t ) {} } //process the message ReceiverBase.messageDataReceived() getCallback().messageDataReceived(msgs[i]); /** * Use send ack here if you want the request to complete on this * server before sending the ack to the remote server * This is considered a synchronized request */ if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.ACK_COMMAND); }catch ( RemoteProcessException e ) { if ( log.isDebugEnabled() ) log.error("Processing of cluster message failed.",e); if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); }catch ( Exception e ) { log.error("Processing of cluster message failed.",e); if (ChannelData.sendAckSync(msgs[i].getOptions())) sendAck(key,channel,Constants.FAIL_ACK_COMMAND); } if ( getUseBufferPool() ) { BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage()); msgs[i].setMessage(null); } } if (count < 0) { remoteEof(key); return; } }
