`
jaesonchen
  • 浏览: 310035 次
  • 来自: ...
社区版块
存档分类
最新评论

从ZooKeeper源代码看如何实现分布式系统(三)高性能的网络编程

 
阅读更多

对网络编程来说,最基本的三要素是IO, 协议(编解码),服务器端线程模型。这篇来看看ZooKeeper是如何实现高性能的网络程序。

 

IO模型

ZooKeeper默认提供了两种网络IO的实现,一个是Java原生的NIO,一个是基于Netty的IO。先从ServerCnxn这个抽象类看起,它表示一个从客户端到服务器端的网络连接。ServerCnxn实现了Stat服务器端统计接口,Watcher接口。 Watcher接口里面定义了KeeperState和EventType这两个枚举类型。

ServerCnxn有两个默认实现类,一个是基于JDK原生NIO的NIOServerCnxn,一个是基于Netty的NettyServerCnxn。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // ServerCnxn的属性  
  2. public abstract class ServerCnxn implements Stats, Watcher {  
  3.     protected abstract ServerStats serverStats();  
  4.       
  5.     protected final Date established = new Date();  
  6.   
  7.     protected final AtomicLong packetsReceived = new AtomicLong();  
  8.     protected final AtomicLong packetsSent = new AtomicLong();  
  9.   
  10.     protected long minLatency;  
  11.     protected long maxLatency;  
  12.     protected String lastOp;  
  13.     protected long lastCxid;  
  14.     protected long lastZxid;  
  15.     protected long lastResponseTime;  
  16.     protected long lastLatency;  
  17.   
  18.     protected long count;  
  19.     protected long totalLatency;  
  20.   
  21. }  


重点看一下NIOServerCnxn,它处理和客户端的连接。它的唯一构造函数需要4个参数:ZooKeeperServer, SocketChannel, SelectionKey, NIOServerCnxnFactory。NIOServerCnxn本质上是对SocketChannel的封装,它提供了对SocketChannel读写的方法。

 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,  
  2.             SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {  
  3.         this.zkServer = zk;  
  4.         this.sock = sock;  
  5.         this.sk = sk;  
  6.         this.factory = factory;  
  7.         if (this.factory.login != null) {  
  8.             this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);  
  9.         }  
  10.         if (zk != null) {   
  11.             outstandingLimit = zk.getGlobalOutstandingLimit();  
  12.         }  
  13.         sock.socket().setTcpNoDelay(true);  
  14.         /* set socket linger to false, so that socket close does not 
  15.          * block */  
  16.         sock.socket().setSoLinger(false, -1);  
  17.         InetAddress addr = ((InetSocketAddress) sock.socket()  
  18.                 .getRemoteSocketAddress()).getAddress();  
  19.         authInfo.add(new Id("ip", addr.getHostAddress()));  
  20.         sk.interestOps(SelectionKey.OP_READ);  
  21.     }  

 

NIOServerCnxn的核心方法是doIO, 它实现了SelectionKey被Selector选出后,SocketChannel如何进行读写

SocketChannel从客户端读数据的过程:

1.    NIOServerCnxc维护了两个读数据的ByteBuffer, 一个是 lenBuffer = ByteBuffer.allocate(4), 4个字节的ByteBuffer,表示是否是4个字符的命令消息,比如ruok, conf这种命令。ByteBuffer incomingBuffer表示用来存放读数据ByteBuffer, 初始状态下incomingBuffer指向lenBuffer

2. SocketChannel先向incomingBuffer写入数据,如果写入的长度小于0就抛异常。如果正常写入,并且incomingBuffer写满了,如果incomingBuffer是指向lenBuffer,表示这次读的是4个字节的长度。

3. readLength方法会判断是否是4字符命令,首先调用checkFourLetterWord来判断是否是4字符命令

4. 在checkFourLetterWord中,如果是4字符命令,就调用对应的线程CommandThread,启动单独的线程去执行对应的命令,具体的如何写会在后面说.

如果不是4字符命令,就会incomingBuffer分配对应长度的ByteBuffer, incomingBuffer = ByteBuffer.allocate(len); 不在指向lenBuffer

5. 如果不是4字符命令,进入到readPayload分支。在readPayload判断incomingBuffer是否满包,如果不是,就尝试读一次SocketChanel。如果这时候满包了,就调用flip方法切换到读模式,如果是第一次读到请求,就进入readConnectRequest,如果不是就进入到readRequest。 最后 incomingBuffer = lenBuffer; 再次指向lenBuffer,读下一个请求。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1.  void doIO(SelectionKey k) throws InterruptedException {  
  2.         try {  
  3.             if (isSocketOpen() == false) {  
  4.                 LOG.warn("trying to do i/o on a null socket for session:0x"  
  5.                          + Long.toHexString(sessionId));  
  6.   
  7.                 return;  
  8.             }  
  9.             if (k.isReadable()) {  
  10.                 int rc = sock.read(incomingBuffer);  
  11.                 if (rc < 0) {  
  12.                     throw new EndOfStreamException(  
  13.                             "Unable to read additional data from client sessionid 0x"  
  14.                             + Long.toHexString(sessionId)  
  15.                             + ", likely client has closed socket");  
  16.                 }  
  17.                 if (incomingBuffer.remaining() == 0) {  
  18.                     boolean isPayload;  
  19.                     if (incomingBuffer == lenBuffer) { // start of next request  
  20.                         incomingBuffer.flip();  
  21.                         isPayload = readLength(k);  
  22.                         incomingBuffer.clear();  
  23.                     } else {  
  24.                         // continuation  
  25.                         isPayload = true;  
  26.                     }  
  27.                     if (isPayload) { // not the case for 4letterword  
  28.                         readPayload();  
  29.                     }  
  30.                     else {  
  31.                         // four letter words take care  
  32.                         // need not do anything else  
  33.                         return;  
  34.                     }  
  35.                 }  
  36.             }  
  37. ............  
  38.     }  
  39.   
  40. private boolean readLength(SelectionKey k) throws IOException {  
  41.         // Read the length, now get the buffer  
  42.         int len = lenBuffer.getInt();  
  43.         if (!initialized && checkFourLetterWord(sk, len)) {  
  44.             return false;  
  45.         }  
  46.         if (len < 0 || len > BinaryInputArchive.maxBuffer) {  
  47.             throw new IOException("Len error " + len);  
  48.         }  
  49.         if (zkServer == null) {  
  50.             throw new IOException("ZooKeeperServer not running");  
  51.         }  
  52.         incomingBuffer = ByteBuffer.allocate(len);  
  53.         return true;  
  54.     }  
  55.   
  56.  private boolean checkFourLetterWord(final SelectionKey k, final int len)  
  57.     throws IOException  
  58.     {  
  59.         // We take advantage of the limited size of the length to look  
  60.         // for cmds. They are all 4-bytes which fits inside of an int  
  61.         String cmd = cmd2String.get(len);  
  62.         if (cmd == null) {  
  63.             return false;  
  64.         }  
  65.         LOG.info("Processing " + cmd + " command from "  
  66.                 + sock.socket().getRemoteSocketAddress());  
  67.         packetReceived();  
  68.   
  69.         /** cancel the selection key to remove the socket handling 
  70.          * from selector. This is to prevent netcat problem wherein 
  71.          * netcat immediately closes the sending side after sending the 
  72.          * commands and still keeps the receiving channel open.  
  73.          * The idea is to remove the selectionkey from the selector 
  74.          * so that the selector does not notice the closed read on the 
  75.          * socket channel and keep the socket alive to write the data to 
  76.          * and makes sure to close the socket after its done writing the data 
  77.          */  
  78.         if (k != null) {  
  79.             try {  
  80.                 k.cancel();  
  81.             } catch(Exception e) {  
  82.                 LOG.error("Error cancelling command selection key ", e);  
  83.             }  
  84.         }  
  85.   
  86.         final PrintWriter pwriter = new PrintWriter(  
  87.                 new BufferedWriter(new SendBufferWriter()));  
  88.         if (len == ruokCmd) {  
  89.             RuokCommand ruok = new RuokCommand(pwriter);  
  90.             ruok.start();  
  91.             return true;  
  92.         } else if (len == getTraceMaskCmd) {  
  93.             TraceMaskCommand tmask = new TraceMaskCommand(pwriter);  
  94.             tmask.start();  
  95.             return true;  
  96.         } else if (len == setTraceMaskCmd) {  
  97.             int rc = sock.read(incomingBuffer);  
  98.             if (rc < 0) {  
  99.                 throw new IOException("Read error");  
  100.             }  
  101.   
  102.             incomingBuffer.flip();  
  103.             long traceMask = incomingBuffer.getLong();  
  104.             ZooTrace.setTextTraceLevel(traceMask);  
  105.             SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);  
  106.             setMask.start();  
  107.             return true;  
  108.         } else if (len == enviCmd) {  
  109.             EnvCommand env = new EnvCommand(pwriter);  
  110.             env.start();  
  111.             return true;  
  112.         } else if (len == confCmd) {  
  113.             ConfCommand ccmd = new ConfCommand(pwriter);  
  114.             ccmd.start();  
  115.             return true;  
  116.         } else if (len == srstCmd) {  
  117.             StatResetCommand strst = new StatResetCommand(pwriter);  
  118.             strst.start();  
  119.             return true;  
  120.         } else if (len == crstCmd) {  
  121.             CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);  
  122.             crst.start();  
  123.             return true;  
  124.         } else if (len == dumpCmd) {  
  125.             DumpCommand dump = new DumpCommand(pwriter);  
  126.             dump.start();  
  127.             return true;  
  128.         } else if (len == statCmd || len == srvrCmd) {  
  129.             StatCommand stat = new StatCommand(pwriter, len);  
  130.             stat.start();  
  131.             return true;  
  132.         } else if (len == consCmd) {  
  133.             ConsCommand cons = new ConsCommand(pwriter);  
  134.             cons.start();  
  135.             return true;  
  136.         } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {  
  137.             WatchCommand wcmd = new WatchCommand(pwriter, len);  
  138.             wcmd.start();  
  139.             return true;  
  140.         } else if (len == mntrCmd) {  
  141.             MonitorCommand mntr = new MonitorCommand(pwriter);  
  142.             mntr.start();  
  143.             return true;  
  144.         } else if (len == isroCmd) {  
  145.             IsroCommand isro = new IsroCommand(pwriter);  
  146.             isro.start();  
  147.             return true;  
  148.         }  
  149.         return false;  
  150.     }  
  151.   
  152.  private void readPayload() throws IOException, InterruptedException {  
  153.         if (incomingBuffer.remaining() != 0) { // have we read length bytes?  
  154.             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok  
  155.             if (rc < 0) {  
  156.                 throw new EndOfStreamException(  
  157.                         "Unable to read additional data from client sessionid 0x"  
  158.                         + Long.toHexString(sessionId)  
  159.                         + ", likely client has closed socket");  
  160.             }  
  161.         }  
  162.   
  163.         if (incomingBuffer.remaining() == 0) { // have we read length bytes?  
  164.             packetReceived();  
  165.             incomingBuffer.flip();  
  166.             if (!initialized) {  
  167.                 readConnectRequest();  
  168.             } else {  
  169.                 readRequest();  
  170.             }  
  171.             lenBuffer.clear();  
  172.             incomingBuffer = lenBuffer;  
  173.         }  
  174.     }  

 

NIOServerCnxn写数据的过程如下:

1. 创建一个LinkedBlockingQueue<ByteBuffer>类型的outgoingBuffers来优化写,可以一次写多个ByteBuffer

2. 如果SelectionKey是因为写消息被Selector选中 的,先判断outgoingBuffers的长度是否大于0,如果大于0,就把outgoingBuffers中的ByteBuffer的数据都复制到factory.directBuffer这个直接内存的缓冲区中,如果directBuffer满了或者outgoingBuffers都已经复制到directBuffer了,就调用它的flip方法把它切换到读模式,然后把它的数据写入到SocketChannel中去。

由此可见,每次写的时候,都是从directBuffer写到SocketChannel中去的,利用直接内存优化了写操作。

写完后清理一下outgoingBuffers,把已经写完的ByteBuffer清理掉

3. 如果outgoingBuffers都写完了,就把SocketChannel切换到读模式中,关闭对写标志位的监听。如果没写完,继续监听写请求。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. void doIO(SelectionKey k) throws InterruptedException {  
  2.         try {  
  3.             if (isSocketOpen() == false) {  
  4.                 LOG.warn("trying to do i/o on a null socket for session:0x"  
  5.                          + Long.toHexString(sessionId));  
  6.   
  7.                 return;  
  8.             }  
  9.             .......  
  10.             if (k.isWritable()) {  
  11.                  
  12.                 if (outgoingBuffers.size() > 0) {  
  13.                       
  14.                     ByteBuffer directBuffer = factory.directBuffer;  
  15.                     directBuffer.clear();  
  16.   
  17.                     for (ByteBuffer b : outgoingBuffers) {  
  18.                         if (directBuffer.remaining() < b.remaining()) {  
  19.                               
  20.                             b = (ByteBuffer) b.slice().limit(  
  21.                                     directBuffer.remaining());  
  22.                         }  
  23.                           
  24.                         int p = b.position();  
  25.                         directBuffer.put(b);  
  26.                         b.position(p);  
  27.                         if (directBuffer.remaining() == 0) {  
  28.                             break;  
  29.                         }  
  30.                     }  
  31.                      
  32.                     directBuffer.flip();  
  33.   
  34.                     int sent = sock.write(directBuffer);  
  35.                     ByteBuffer bb;  
  36.   
  37.                     // Remove the buffers that we have sent  
  38.                     while (outgoingBuffers.size() > 0) {  
  39.                         bb = outgoingBuffers.peek();  
  40.                         if (bb == ServerCnxnFactory.closeConn) {  
  41.                             throw new CloseRequestException("close requested");  
  42.                         }  
  43.                         int left = bb.remaining() - sent;  
  44.                         if (left > 0) {  
  45.                               
  46.                             bb.position(bb.position() + sent);  
  47.                             break;  
  48.                         }  
  49.                         packetSent();  
  50.                           
  51.                         sent -= bb.remaining();  
  52.                         outgoingBuffers.remove();  
  53.                     }  
  54.                     // ZooLog.logTraceMessage(LOG,  
  55.                     // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,  
  56.                     // outgoingBuffers.size() = " + outgoingBuffers.size());  
  57.                 }  
  58.   
  59.                 synchronized(this.factory){  
  60.                     if (outgoingBuffers.size() == 0) {  
  61.                         if (!initialized  
  62.                                 && (sk.interestOps() & SelectionKey.OP_READ) == 0) {  
  63.                             throw new CloseRequestException("responded to info probe");  
  64.                         }  
  65.                         sk.interestOps(sk.interestOps()  
  66.                                 & (~SelectionKey.OP_WRITE));  
  67.                     } else {  
  68.                         sk.interestOps(sk.interestOps()  
  69.                                 | SelectionKey.OP_WRITE);  
  70.                     }  
  71.                 }  
  72.             }  
  73.         } catch (CancelledKeyException e) {  
  74.             LOG.warn("Exception causing close of session 0x"  
  75.                     + Long.toHexString(sessionId)  
  76.                     + " due to " + e);  
  77.             if (LOG.isDebugEnabled()) {  
  78.                 LOG.debug("CancelledKeyException stack trace", e);  
  79.             }  
  80.             close();  
  81.         } catch (CloseRequestException e) {  
  82.             // expecting close to log session closure  
  83.             close();  
  84.         } catch (EndOfStreamException e) {  
  85.             LOG.warn("caught end of stream exception",e); // tell user why  
  86.   
  87.             // expecting close to log session closure  
  88.             close();  
  89.         } catch (IOException e) {  
  90.             LOG.warn("Exception causing close of session 0x"  
  91.                     + Long.toHexString(sessionId)  
  92.                     + " due to " + e);  
  93.             if (LOG.isDebugEnabled()) {  
  94.                 LOG.debug("IOException stack trace", e);  
  95.             }  
  96.             close();  
  97.         }  
  98.     }  


NIOServerCnxn写操作的入口方法有两个,一个是同步IO的sendBufferSync, 一个是NIO的sendBuffer。

 

1.基于同步IO的 sendBufferSync方法直接把SocketChannel设置为阻塞模式,然后直接写到Socket中去。上面提到的相应4字符命令的场景,就是使用了sendBufferSync的方法,直接写。

2. sendBuffer方法使用了NIO,它主要是因为使用了outgoingBuffers队列来优化写操作,可以一次写多个ByteBuffer。写的时候,先加入到outgoingBuffers,然后设置SelectionKey的写标志位,这样在下次Selector执行select方法时,可以进行写的动作

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. void sendBufferSync(ByteBuffer bb) {  
  2.        try {  
  3.            /* configure socket to be blocking 
  4.             * so that we dont have to do write in  
  5.             * a tight while loop 
  6.             */  
  7.            sock.configureBlocking(true);  
  8.            if (bb != ServerCnxnFactory.closeConn) {  
  9.                if (sock.isOpen()) {  
  10.                    sock.write(bb);  
  11.                }  
  12.                packetSent();  
  13.            }   
  14.        } catch (IOException ie) {  
  15.            LOG.error("Error sending data synchronously ", ie);  
  16.        }  
  17.     }  
  18.       
  19.     public void sendBuffer(ByteBuffer bb) {  
  20.         try {  
  21.             if (bb != ServerCnxnFactory.closeConn) {  
  22.                 // We check if write interest here because if it is NOT set,  
  23.                 // nothing is queued, so we can try to send the buffer right  
  24.                 // away without waking up the selector  
  25.                 if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {  
  26.                     try {  
  27.                         sock.write(bb);  
  28.                     } catch (IOException e) {  
  29.                         // we are just doing best effort right now  
  30.                     }  
  31.                 }  
  32.                 // if there is nothing left to send, we are done  
  33.                 if (bb.remaining() == 0) {  
  34.                     packetSent();  
  35.                     return;  
  36.                 }  
  37.             }  
  38.   
  39.             synchronized(this.factory){  
  40.                 sk.selector().wakeup();  
  41.                 if (LOG.isTraceEnabled()) {  
  42.                     LOG.trace("Add a buffer to outgoingBuffers, sk " + sk  
  43.                             + " is valid: " + sk.isValid());  
  44.                 }  
  45.                 outgoingBuffers.add(bb);  
  46.                 if (sk.isValid()) {  
  47.                     sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);  
  48.                 }  
  49.             }  
  50.               
  51.         } catch(Exception e) {  
  52.             LOG.error("Unexpected Exception: ", e);  
  53.         }  
  54.     }  



 

协议(编解码)

ZooKeeper使用Apache jute来序列化和反序列化Java对象,把Java对象序列化成二进制数据在网络中传播。在上一篇从ZooKeeper源代码看如何实现分布式系统(二)数据的高可用存储 中已经介绍了Apache Jute,这里不再赘述,简单看一下ZooKeeperServer是如何处理收到的数据包的,可以看到如何把二进制的请求序列化成Java对象来使用。

1. 先用ByteBufferInputStream来将incomingBuffer封装成流,然后用Jute的接口读到RequestHeader对象,这个对象实现了Jute的Record接口

2. RequestHeader只有两个属性,xid表示事务id,type表示请求的类型

3. 如果是auth类型的请求,从incomingBuffer中读取数据,反序列化到AuthPacket中,然后调用AuthenticationProvider来进行认证

4.如果是sasl的请求,执行相应的代码

5. 对于其他的事务请求,构造一个Request对象,进入到submitRequest方法去执行相应的事务请求。

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. // ZooKeeperServer  
  2. public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {  
  3.         // We have the request, now process and setup for next  
  4.         InputStream bais = new ByteBufferInputStream(incomingBuffer);  
  5.         BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);  
  6.         RequestHeader h = new RequestHeader();  
  7.         h.deserialize(bia, "header");  
  8.         // Through the magic of byte buffers, txn will not be  
  9.         // pointing  
  10.         // to the start of the txn  
  11.         incomingBuffer = incomingBuffer.slice();  
  12.         if (h.getType() == OpCode.auth) {  
  13.             LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());  
  14.             AuthPacket authPacket = new AuthPacket();  
  15.             ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);  
  16.             String scheme = authPacket.getScheme();  
  17.             AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);  
  18.             Code authReturn = KeeperException.Code.AUTHFAILED;  
  19.             if(ap != null) {  
  20.                 try {  
  21.                     authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());  
  22.                 } catch(RuntimeException e) {  
  23.                     LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);  
  24.                     authReturn = KeeperException.Code.AUTHFAILED;                     
  25.                 }  
  26.             }  
  27.             if (authReturn!= KeeperException.Code.OK) {  
  28.                 if (ap == null) {  
  29.                     LOG.warn("No authentication provider for scheme: "  
  30.                             + scheme + " has "  
  31.                             + ProviderRegistry.listProviders());  
  32.                 } else {  
  33.                     LOG.warn("Authentication failed for scheme: " + scheme);  
  34.                 }  
  35.                 // send a response...  
  36.                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,  
  37.                         KeeperException.Code.AUTHFAILED.intValue());  
  38.                 cnxn.sendResponse(rh, nullnull);  
  39.                 // ... and close connection  
  40.                 cnxn.sendBuffer(ServerCnxnFactory.closeConn);  
  41.                 cnxn.disableRecv();  
  42.             } else {  
  43.                 if (LOG.isDebugEnabled()) {  
  44.                     LOG.debug("Authentication succeeded for scheme: "  
  45.                               + scheme);  
  46.                 }  
  47.                 LOG.info("auth success " + cnxn.getRemoteSocketAddress());  
  48.                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,  
  49.                         KeeperException.Code.OK.intValue());  
  50.                 cnxn.sendResponse(rh, nullnull);  
  51.             }  
  52.             return;  
  53.         } else {  
  54.             if (h.getType() == OpCode.sasl) {  
  55.                 Record rsp = processSasl(incomingBuffer,cnxn);  
  56.                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());  
  57.                 cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?  
  58.             }  
  59.             else {  
  60.                 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),  
  61.                   h.getType(), incomingBuffer, cnxn.getAuthInfo());  
  62.                 si.setOwner(ServerCnxn.me);  
  63.                 submitRequest(si);  
  64.             }  
  65.         }  
  66.         cnxn.incrOutstandingRequests(h);  
  67.     }  


可以看到ZooKeeper的请求分为了两部分,RequestHeader表示消息头,剩余部分表示消息体。消息头标示了消息的类型。

 

 

线程模型

 

ZooKeeper提供了两种服务器端的线程模型,一种是基于原生NIO的reactor模型,一种是基于Netty的reactor模型。我们看一下基于NIO的reactor模型。

NIOServerCnxnFactory封装了Selector对象来做事件分发。NIOServerCnxnFactory本身实现了Runnable接口来作为一个可运行的线程。它还维护了一个线程,来使它本身作为一个单独的线程运行。

1. configure方法创建了一个守护线程,并且创建了ServerSocketChannel,注册到了Selector上去监听ACCEPT事件

2. 维护了一个HashMap,由客户端IP映射到来自该IP的NIOServerCnxn连接对象。

3. start方法启动线程,开始监听端口来响应客户端请求

4. run方法就是reactor模型的EventLoop,Selector每隔1秒执行一次select方法来处理IO请求,并分发到对应的SocketChannel中去。可以看到在分发请求的时候并没有创建新的线程

所以NIOServerCnxnFactory是一个最简单的单线程的reactor模型,由一个线程来进行IO事件的分发,以及IO的读写

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {  
  2.     ServerSocketChannel ss;  
  3.   
  4.     final Selector selector = Selector.open();  
  5.   
  6.     Thread thread;  
  7.   
  8.     public void configure(InetSocketAddress addr, int maxcc) throws IOException {  
  9.         configureSaslLogin();  
  10.   
  11.         thread = new Thread(this"NIOServerCxn.Factory:" + addr);  
  12.         thread.setDaemon(true);  
  13.         maxClientCnxns = maxcc;  
  14.         this.ss = ServerSocketChannel.open();  
  15.         ss.socket().setReuseAddress(true);  
  16.         LOG.info("binding to port " + addr);  
  17.         ss.socket().bind(addr);  
  18.         ss.configureBlocking(false);  
  19.         ss.register(selector, SelectionKey.OP_ACCEPT);  
  20.     }  
  21.   
  22.  final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =  
  23.         new HashMap<InetAddress, Set<NIOServerCnxn>>( );  
  24.   
  25. public void start() {  
  26.         // ensure thread is started once and only once  
  27.         if (thread.getState() == Thread.State.NEW) {  
  28.             thread.start();  
  29.         }  
  30.     }  
  31.  private void addCnxn(NIOServerCnxn cnxn) {  
  32.         synchronized (cnxns) {  
  33.             cnxns.add(cnxn);  
  34.             synchronized (ipMap){  
  35.                 InetAddress addr = cnxn.sock.socket().getInetAddress();  
  36.                 Set<NIOServerCnxn> s = ipMap.get(addr);  
  37.                 if (s == null) {  
  38.                     // in general we will see 1 connection from each  
  39.                     // host, setting the initial cap to 2 allows us  
  40.                     // to minimize mem usage in the common case  
  41.                     // of 1 entry --  we need to set the initial cap  
  42.                     // to 2 to avoid rehash when the first entry is added  
  43.                     s = new HashSet<NIOServerCnxn>(2);  
  44.                     s.add(cnxn);  
  45.                     ipMap.put(addr,s);  
  46.                 } else {  
  47.                     s.add(cnxn);  
  48.                 }  
  49.             }  
  50.         }  
  51.     }  
  52. public void run() {  
  53.         while (!ss.socket().isClosed()) {  
  54.             try {  
  55.                 selector.select(1000);  
  56.                 Set<SelectionKey> selected;  
  57.                 synchronized (this) {  
  58.                     selected = selector.selectedKeys();  
  59.                 }  
  60.                 ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(  
  61.                         selected);  
  62.                 Collections.shuffle(selectedList);  
  63.                 for (SelectionKey k : selectedList) {  
  64.                     if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {  
  65.                         SocketChannel sc = ((ServerSocketChannel) k  
  66.                                 .channel()).accept();  
  67.                         InetAddress ia = sc.socket().getInetAddress();  
  68.                         int cnxncount = getClientCnxnCount(ia);  
  69.                         if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){  
  70.                             LOG.warn("Too many connections from " + ia  
  71.                                      + " - max is " + maxClientCnxns );  
  72.                             sc.close();  
  73.                         } else {  
  74.                             LOG.info("Accepted socket connection from "  
  75.                                      + sc.socket().getRemoteSocketAddress());  
  76.                             sc.configureBlocking(false);  
  77.                             SelectionKey sk = sc.register(selector,  
  78.                                     SelectionKey.OP_READ);  
  79.                             NIOServerCnxn cnxn = createConnection(sc, sk);  
  80.                             sk.attach(cnxn);  
  81.                             addCnxn(cnxn);  
  82.                         }  
  83.                     } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {  
  84.                         NIOServerCnxn c = (NIOServerCnxn) k.attachment();  
  85.                         c.doIO(k);  
  86.                     } else {  
  87.                         if (LOG.isDebugEnabled()) {  
  88.                             LOG.debug("Unexpected ops in select "  
  89.                                       + k.readyOps());  
  90.                         }  
  91.                     }  
  92.                 }  
  93.                 selected.clear();  
  94.             } catch (RuntimeException e) {  
  95.                 LOG.warn("Ignoring unexpected runtime exception", e);  
  96.             } catch (Exception e) {  
  97.                 LOG.warn("Ignoring exception", e);  
  98.             }  
  99.         }  
  100.         closeAll();  
  101.         LOG.info("NIOServerCnxn factory exited run method");  
  102.     }  


 

 

分享到:
评论

相关推荐

    基于ssm redis solr dubbo zookeeper mysql等大型分布式电商系统.zip

    【压缩包子文件的文件名称列表】: "content_code" 提示我们这个压缩包可能包含了整个项目的源代码,用户可以下载并研究系统是如何利用上述技术实现的。通过查看和分析代码,学习者可以深入理解这些技术在实际项目中...

    分布式系统实验报告之一

    实验报告中提到的关键源代码可能是实现上述分布式系统特性的实例,例如: 1. RPC框架的客户端和服务器端代码,展示了请求和响应的交互过程。 2. 负载均衡器的实现,可能包含调度算法的代码片段。 3. 一致性算法的...

    netty、redis、zookeeper高并发实战-源代码

    这个源代码包很可能包含了使用Java操作Netty、Redis和ZooKeeper的具体实现,包括但不限于Netty服务器的搭建、Redis数据的存取操作、ZooKeeper的客户端接口调用等。通过研究这些源代码,开发者可以学习如何在实际项目...

    ZOOKEEPER3.4.5

    在实际项目中,开发者可以通过解压`zookeeper-3.4.5`压缩包,了解ZooKeeper的源代码,学习其实现原理,以便更好地运用到自己的分布式系统设计中。 总之,ZooKeeper 3.4.5 在服务治理和分布式部署中的作用不可忽视,...

    ylf,分布式商城系统——基于ssm+zookeeper+dubbo

    这些源代码可能涵盖了业务逻辑、数据访问、服务接口定义、配置文件等多个方面,开发者可以通过阅读和学习这些源码,了解和掌握分布式系统的设计理念和实现细节。 总的来说,ylf分布式商城系统结合了Java Web开发的...

    netty,redis,zookeeper高-netty_redis_zookeeper_source_code.zip

    总的来说,这个压缩包提供了一个绝佳的学习资源,对于想深入了解网络编程、数据存储和分布式协调的开发者来说,Netty、Redis和Zookeeper的源代码是宝贵的参考资料。通过研究这些源码,你可以提升自己的技能,更好地...

    基于Spring Boot、Redis、Dubbo、Zookeeper、Vue前后端分离、分布式架构的个人运动健康管理系统

    文件“content_code”可能包含了整个系统的源代码,包括Spring Boot后端服务代码、Dubbo接口定义、Zookeeper配置、Redis连接代码、MySQL数据库脚本、Vue前端项目代码等。这些代码遵循良好的编程规范,经过严格的测试...

    分布式秒杀系统

    分布式秒杀系统是一种在高并发环境下处理大量用户请求的技术,常用于电商平台的限时...这涉及到如限流算法(如漏桶、令牌桶)、分布式事务处理、数据一致性保证等多个高级主题,是理解分布式系统设计和实践的重要案例。

    分布式框架资源专题资料

    分布式框架资源专题资料涵盖了一系列关于分布式系统、Zookeeper、云原生以及Netty网络库的深入讲解。...这些内容对于想要在IT领域深入发展,特别是涉及分布式系统架构和高性能网络编程的开发者来说,都是宝贵的资源。

    云计算中的分布式文件系统.pdf

    后来Google发表了GFS和MapReduce的论文,为Nutch提供了启发,并最终Nutch开发出了NDFS和MapReduce的开源实现,成为了非常优秀的分布式系统基础架构。 开源项目如Hadoop、HBase、ZooKeeper等,都是云计算和分布式...

    基于Python实现的一个简单的分布式高并发RPC框架+源代码+文档说明

    &gt; + RPC在微服务、分布式系统、Web服务器方面应用太广泛了,需要对底层通信过程有基本认识 &gt; + Nignx、Hadoop、K8s、Tensorflow等系统或软件的底层源码大多是关于RPC的 &gt; + 可以更加熟悉地使用已有的RPC框架,甚至...

    dubbo+netty打造高性能的RPC

    同时,通过Netty的高性能网络编程能力,可以优化RPC通信过程,提升服务调用的效率。http和https则用于构建Web接口,允许客户端通过HTTP请求与服务端进行交互,而https确保了这些交互的安全性。 【标签】"dubbo ...

    Java 分布式项(SSM、分布式Dubbo、全文检索Solr、Vue、Zookeeper

    在提供的`content_code`压缩包中,可能包含了上述技术的源代码示例,这为学习和理解这些技术提供了实际的操作素材。对于初学者来说,可以通过阅读和运行这些代码来加深对分布式系统架构的理解;对于开发者来说,这些...

    基于Golang的分布式态势感知系统系统.zip

    Golang以其高效的性能、内置的并发支持和内存安全特性,成为了构建高性能分布式系统的理想选择。其轻量级的goroutine和channel机制使得编写并发程序变得简单,同时,静态类型的编译语言特性保证了代码的稳定性和可...

    apache-zookeeper-3.7.0-bin.zip

    Apache ZooKeeper 是一个分布式协调服务,它为分布式应用程序提供了一个高度可用、高性能的框架,用于管理命名空间、配置信息和同步服务。Zookeeper 的核心概念是基于节点(ZNode)的树形数据结构,允许各个分布式...

    分布式锁简单实现

    分布式锁是一种在分布式系统中实现同步访问资源的关键技术。它允许多个节点在同一时间对共享资源进行互斥访问,确保在高并发环境下数据的一致性和完整性。在这个“分布式锁简单实现”项目中,开发者提供了一个基本的...

    《Java中间件技术及其应用开发》-李华飚-源代码

    书中的源代码可能涉及以上这些中间件技术的具体实现和应用场景,通过阅读和理解这些代码,读者可以深入学习Java中间件技术的工作原理,并提升实际开发能力。对于想从事Java后端开发或者对Java中间件感兴趣的开发者来...

    Node.js-nodejszookeeperthrift实现服务的高可用

    Node.js是一种流行的JavaScript运行环境,常用于构建高性能的网络应用。Zookeeper是Apache的一个分布式协调服务,它提供了诸如配置管理、命名服务、分布式同步等功能,对于实现服务的高可用性至关重要。Thrift则是一...

    课程设计运动会管理系统的源代码.zip

    Dubbo是阿里巴巴开源的高性能服务框架,它提供了服务注册、发现、调用、负载均衡等功能,使得多个微服务能够相互通信。而Zookeeper则作为一个协调工具,用于存储和管理Dubbo服务的元数据,确保服务的高可用性和一致...

Global site tag (gtag.js) - Google Analytics