`
zhangwei_david
  • 浏览: 474797 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ZooKeeper源码分析(二)

zk 
阅读更多

 

  上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码

 

 

/**
   这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器
 *
 */
public class ClientCnxn {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);

    private static final String ZK_SASL_CLIENT_USERNAME =
        "zookeeper.sasl.client.username";

    /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为
	 */
    private static boolean disableAutoWatchReset;
    static {
        disableAutoWatchReset =
            Boolean.getBoolean("zookeeper.disableAutoWatchReset");
        if (LOG.isDebugEnabled()) {
            LOG.debug("zookeeper.disableAutoWatchReset is "
                    + disableAutoWatchReset);
        }
    }

    static class AuthData {
        AuthData(String scheme, byte data[]) {
            this.scheme = scheme;
            this.data = data;
        }

        String scheme;

        byte data[];
    }
	
    private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();

    /**
	 *哪些已经发送出去的目前正在等待响应的包
     */
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    /**
     * 那些需要发送的包
     */
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

	// 超时时间
    private int connectTimeout;

    /**
	 *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求
     */
    private volatile int negotiatedSessionTimeout;
	
	// 读取超时时间
    private int readTimeout;

	// 会话超时时间
    private final int sessionTimeout;

	// ZooKeeper
    private final ZooKeeper zooKeeper;

	//客户端监视器管理器
    private final ClientWatchManager watcher;

	//会话ID
    private long sessionId;

	//会话密钥
    private byte sessionPasswd[] = new byte[16];

    // 是否只读
    private boolean readOnly;

    final String chrootPath;
   // 发送线程
    final SendThread sendThread;
	// 事件回调线程
    final EventThread eventThread;

    /**
     * Set to true when close is called. Latches the connection such that we
     * don't attempt to re-connect to the server if in the middle of closing the
     * connection (client sends session disconnect to server as part of close
     * operation)
     */
    private volatile boolean closing = false;
    
    /**
	  一组客户端可以连接的Zk主机
     */
    private final HostProvider hostProvider;

    /**
	 * 第一次和读写服务器建立连接时设置为true,之后不再改变。
	   这个值用来处理客户端没有sessionId连接只读模式服务器的场景.
	   客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以
	   当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话
	   如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 
     */
    volatile boolean seenRwServerBefore = false;


    public ZooKeeperSaslClient zooKeeperSaslClient;

    public long getSessionId() {
        return sessionId;
    }

    public byte[] getSessionPasswd() {
        return sessionPasswd;
    }

    public int getSessionTimeout() {
        return negotiatedSessionTimeout;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();

        SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress();
        SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress();
        sb
            .append("sessionid:0x").append(Long.toHexString(getSessionId()))
            .append(" local:").append(local)
            .append(" remoteserver:").append(remote)
            .append(" lastZxid:").append(lastZxid)
            .append(" xid:").append(xid)
            .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount())
            .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount())
            .append(" queuedpkts:").append(outgoingQueue.size())
            .append(" pendingresp:").append(pendingQueue.size())
            .append(" queuedevents:").append(eventThread.waitingEvents.size());

        return sb.toString();
    }

   

    /**
     * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用
     * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器
     */
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
            throws IOException {
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
             clientCnxnSocket, 0, new byte[16], canBeReadOnly);
    }

    
    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        //客户端实例
        this.zooKeeper = zooKeeper;
        // 客户端Watcher管理器
        this.watcher = watcher;
        //sessionId
        this.sessionId = sessionId;
        //会话密钥
        this.sessionPasswd = sessionPasswd;
        //会话超时时间
        this.sessionTimeout = sessionTimeout;
        //服务器地址列表管理器
        this.hostProvider = hostProvider;
         //根路径
        this.chrootPath = chrootPath;
       // 连接超时时间是会话超时时间和服务器数量的比值
        connectTimeout = sessionTimeout / hostProvider.size();
       // 读超时时间是会话超时时间的2/3
        readTimeout = sessionTimeout * 2 / 3;
		
        readOnly = canBeReadOnly;
        //创建发送和事件处理线程
        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }

   
    public static boolean getDisableAutoResetWatch() {
        return disableAutoWatchReset;
    }
   
    public static void setDisableAutoResetWatch(boolean b) {
        disableAutoWatchReset = b;
    }
	//启动发送和事件处理线程
    public void start() {
        sendThread.start();
        eventThread.start();
    }

 

 

 

// 事件处理线程
class EventThread extends Thread {
	    //等待处理的事件
        private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<Object>();

		 /**
		  *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。
		  **/
        private volatile KeeperState sessionState = KeeperState.Disconnected;

       private volatile boolean wasKilled = false;
       private volatile boolean isRunning = false;

        EventThread() {
		   // 构造一个线程名
            super(makeThreadName("-EventThread"));
            setUncaughtExceptionHandler(uncaughtExceptionHandler);
			// 设置为守护线程
            setDaemon(true);
        }
        // 
        public void queueEvent(WatchedEvent event) {
			// 如果WatchedEvent的类型是None状态是sessionStat的值则不处理
            if (event.getType() == EventType.None
                    && sessionState == event.getState()) {
                return;
            }
			// 获取事件的状态
            sessionState = event.getState();

            // 构建一个基于事件的监视器
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // 排队pair,稍后处理
            waitingEvents.add(pair);
        }
		
		// 排队Packet
       public void queuePacket(Packet packet) {
          if (wasKilled) {
             synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet);
                else processEvent(packet);
             }
          } else {
             waitingEvents.add(packet);
          }
       }

        public void queueEventOfDeath() {
            waitingEvents.add(eventOfDeath);
        }

        @Override
        public void run() {
           try {
              isRunning = true;
              while (true) {
			     //从等待处理的事件队列中获取事件
                 Object event = waitingEvents.take();
				 
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

		// 真正处理事件的入口,主要是回调处理
       private void processEvent(Object event) {
          try {
			// 如果事件是WatcherSetEventPair
              if (event instanceof WatcherSetEventPair) {
                  //每个监视器都会处理这个事件
                  WatcherSetEventPair pair = (WatcherSetEventPair) event;
                  for (Watcher watcher : pair.watchers) {
                      try {
                          watcher.process(pair.event);
                      } catch (Throwable t) {
                          LOG.error("Error while calling watcher ", t);
                      }
                  }
              } else {
                  Packet p = (Packet) event;
                  int rc = 0;
				  // 获取客户端路径
                  String clientPath = p.clientPath;
                  if (p.replyHeader.getErr() != 0) {
                      rc = p.replyHeader.getErr();
                  }
                  if (p.cb == null) {
                      LOG.warn("Somehow a null cb got to EventThread!");
                  } else if (p.response instanceof ExistsResponse
                          || p.response instanceof SetDataResponse
                          || p.response instanceof SetACLResponse) {
						 // 获取回调对象
                      StatCallback cb = (StatCallback) p.cb;
					  // 如果处理成功回调方法会传入响应状态,否则响应状态为null
                      if (rc == 0) {
                          if (p.response instanceof ExistsResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((ExistsResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetDataResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetDataResponse) p.response)
                                              .getStat());
                          } else if (p.response instanceof SetACLResponse) {
                              cb.processResult(rc, clientPath, p.ctx,
                                      ((SetACLResponse) p.response)
                                              .getStat());
                          }
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetDataResponse) {
                      DataCallback cb = (DataCallback) p.cb;
                      GetDataResponse rsp = (GetDataResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getData(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetACLResponse) {
                      ACLCallback cb = (ACLCallback) p.cb;
                      GetACLResponse rsp = (GetACLResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getAcl(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null,
                                  null);
                      }
                  } else if (p.response instanceof GetChildrenResponse) {
                      ChildrenCallback cb = (ChildrenCallback) p.cb;
                      GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.response instanceof GetChildren2Response) {
                      Children2Callback cb = (Children2Callback) p.cb;
                      GetChildren2Response rsp = (GetChildren2Response) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx, rsp
                                  .getChildren(), rsp.getStat());
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null, null);
                      }
                  } else if (p.response instanceof CreateResponse) {
                      StringCallback cb = (StringCallback) p.cb;
                      CreateResponse rsp = (CreateResponse) p.response;
                      if (rc == 0) {
                          cb.processResult(rc, clientPath, p.ctx,
                                  (chrootPath == null
                                          ? rsp.getPath()
                                          : rsp.getPath()
                                    .substring(chrootPath.length())));
                      } else {
                          cb.processResult(rc, clientPath, p.ctx, null);
                      }
                  } else if (p.cb instanceof VoidCallback) {
                      VoidCallback cb = (VoidCallback) p.cb;
                      cb.processResult(rc, clientPath, p.ctx);
                  }
              }
          } catch (Throwable t) {
              LOG.error("Caught unexpected throwable", t);
          }
       }
    }

 

5
3
分享到:
评论

相关推荐

    Zookeeper源码分析.epub

    Zookeeper源码分析.epub

    Zookeeper源码分析

    《Zookeeper源码分析》 Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,提供诸如命名服务、配置管理、集群同步、领导者选举等核心功能。本文将深入剖析Zookeeper的工作原理,以及其内部实现的FastLeader...

    zookeeper源码分析

    第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...

    zookeeper源码阅读

    ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...

    zookeeper 3.6.3 源码下载

    **源码结构分析** ZooKeeper的源码主要分为以下几个部分: 1. **协议层**:包含ZAB协议的实现,处理消息的发送和接收,保证数据的最终一致性。 2. **服务器端**:包括ServerCnxnFactory(连接工厂)、ZooKeeper...

    ZooKeeper源码阅读

    ### ZooKeeper源码阅读知识点详解 #### 一、ZooKeeper客户端初始化与使用 ##### 客户端初始化 ZooKeeper客户端的初始化是通过构造函数完成的,具体为: ```java ZooKeeper zookeeper = new ZooKeeper(host, time...

    08_尚硅谷技术之Zookeeper(源码解析)V3.3.pdf

    为了理解和分析Zookeeper如何通过Paxos算法保证数据的一致性,我们可以通过具体的实例来进一步解析。例如,假设有五个议员需要就税率问题进行决议。议员A1提出税率提案,其他议员需要响应。如果A1能够获得大多数议员...

    Zookeeper源码剖析:深入理解Leader选举机制

    **Zookeeper源码剖析:深入理解Leader选举机制** 在分布式协调服务Zookeeper中,Leader选举是其核心功能之一,确保了服务的高可用性和一致性。本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要...

    ZooKeeper源码Eclipse工程项目

    源码分析对于理解其内部工作原理至关重要,尤其是对于开发人员和运维人员来说,能深入源码有助于解决实际问题。 该项目名为“ZooKeeper源码Eclipse工程项目”,意味着它是专门为Eclipse IDE准备的,可以直接导入...

    zookeeper源码

    **ZooKeeper源码分析** ZooKeeper的源码结构清晰,主要分为以下几个部分: 1. **Server端**:包含ZooKeeper服务器的所有组件,如ZooKeeperServer、QuorumPeer、ZKDatabase等。ZooKeeperServer负责处理客户端请求,...

    zookeeper ui界面源码(github)

    总的来说,通过分析这个开源项目,我们可以深入理解ZooKeeper的工作原理,学习前端开发、RESTful API设计、分布式系统监控等方面的知识,同时也能参与到开源社区,提升自己的技术水平和协作能力。

    zookeeper 源码导入eclipse时下载的cache包等文件

    在开发过程中,为了深入理解ZooKeeper的工作原理,我们通常会对其进行源码分析。 当我们将ZooKeeper的源码导入Eclipse这样的集成开发环境时,会遇到一些依赖问题。Eclipse本身并不具备完整的构建工具链,所以我们...

    ZooKeeper-:ZooKeeper源码剖析

    ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...

    dubbo+zookeeper案例,dubbo和Zookeeper详解,Java源码.zip

    《Dubbo与Zookeeper深度解析:Java源码实践》 在现代分布式系统中,服务治理是不可或缺的一部分。Dubbo和Zookeeper作为两个重要的组件,...通过分析源码,你可以掌握服务治理的核心思想,提升解决分布式问题的能力。

    zookeeper限制ip版

    总结,本文详细介绍了如何在Zookeeper 3.4.14版本中实现IP黑白名单功能,从需求分析到源码改造,再到功能测试,覆盖了整个开发流程,旨在帮助读者理解和实践Zookeeper的安全管理。通过这样的定制化改造,我们可以更...

    第四课:zookeeper ZAB协议实现源码分析1

    在本课程中,我们将深入探讨Zookeeper的ZAB协议实现,并通过源码分析来理解其启动流程、快照与事务日志的存储结构。Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase等。本节主要关注...

    03-05-08-zookeeper源码之watcher原理分析1

    在本文中,我们将深入探讨Zookeeper中的Watcher原理,这是一个关键特性,它允许客户端对Zookeeper中的数据变化进行实时监控。Watcher机制是Zookeeper提供的一种事件通知模型,它使得客户端能够及时响应Zookeeper中...

    zookeeper-3.4.9的源码

    源码分析: 1. **项目结构**:ZooKeeper的源码主要分为`src/java/main`和`src/java/test`两个部分,前者是生产代码,后者是测试代码。`src/java/main`下又包含了`org.apache.zookeeper`这个主要的包,里面包含了...

    zookeeper 例子 源码 jar

    学习和分析ZooKeeper源码有助于开发者更好地理解分布式协调服务的实现,从而在实际项目中更有效地使用和优化ZooKeeper。例如,通过研究源码,你可以了解如何自定义Watcher,或者如何在高并发环境下优化ZooKeeper的...

Global site tag (gtag.js) - Google Analytics