`
zhangwei_david
  • 浏览: 477822 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ZooKeeper源码分析(一)

 
阅读更多

   为了更好地了解ZooKeeper客户端的工作原理,首先需要从客户端的会话创建过程学起。

  初始化阶段:

 

  •   初始化ZooKeeper对象

        通过调用ZooKeeper的构造方法实例化一个ZooKeeper对象,在初始化过程中会创建一个客户端Watcher管理器ClientWatcherManager

 

  • 设置会话默认Watcher

      如果构造方法中传入了一个Watcher对象,那么客户端将这个Watcher对象作为默认Watcher保存到ClientWatcherManager中

 

  • 构造ZK服务器地址列表管理器HostProvider

        对于构造器中传入的服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中    

 

  • 创建并初始化客户端网络连接

          ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn的同时还会初始化客户端两个核心队列outGoingQueue和pendingQueue,分别作为客户端请求组发送队列和服务端 响应等待队列

 

  • 初始化SendThread和EventThread

        客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务器端之间所有网络IO,后者用户进行客户端的事件处理。

 

   下面就从代码上去分析这个过程

 

 

 

 

 

/**
  这是ZooKeeper客户端库的主类。使用ZK服务,应用必须收银实例化ZooKeeper类。
  所有迭代都是通过调用ZooKeeper类的方法完成。
  这个类的方法是除了特殊说明以外都是线程安全的。
   一旦和服务器建立连接,客户端就分配到一个会话ID(session ID).客户端将周期性发送心跳信号到服务器来保持会话有效。
   只要客户端的会话ID有效,应用都可以通过客户端调用ZooKeeper的接口
   如果因为一些原因导致客户端长时间向服务器发送心跳信号失败(例如,超过会话的超时时间),服务器将会话过期,同时会话ID也失效。客户端对象将不再可用。为了调用ZooKeeper 接口,应用必须创建一个新的客户端。
   如果ZooKeeper服务器的客户端的当前连接失败或其他原因没有响应,客户端在会话ID过期前会自动连接其他服务器。如果成功连接其他服务器,应用可以继续使用客户端。
   ZooKeeper的接口方法是同步或异步的。同步方法一直挂起到服务器响应。异步方法只是将发送的请求排队并立即返回。它使用一个回调对象,无论请求执行成功与否都会执行回调方法返回一个结果码指示结果状态。
  一些ZooKeeper API 成功被调用可以在ZK服务器的数据节点上注监视器。其他ZooKeeper API 成功调用可以触发监哪些监视器。一旦一个监视器被触发,时间将被传递到第一步注册监视器的客户端。每个监视器只能够被触发一次
  因此,一旦一个时间被传递到客户端的监视器,它将被注销。
   客户端需要一个实现Watcher接口的对象去处理传递到客户端的事件。
  当客户端丢失当前连接后重新连接服务器,所有被认为是触发的监视器,但是没有送达的事件将丢失。
   为了模式这个场景,客户端将产生一个特殊的事件,去告诉事件处理器连接被删除。这个特殊的事件的类型是EventNone 状态是KeeperStateDiscounnected
***/
public class ZooKeeper {

    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";

    protected final ClientCnxn cnxn;
    private static final Logger LOG;
    static {
        //Keep these two lines together to keep the initialization order explicit
        LOG = LoggerFactory.getLogger(ZooKeeper.class);
        Environment.logEnv("Client environment:", LOG);
    }

    public ZooKeeperSaslClient getSaslClient() {
        return cnxn.zooKeeperSaslClient;
    }
	
	// 初始化客户端Watcher管理器,在初始化ZooKeeper时就对其进行初始化
    private final ZKWatchManager watchManager = new ZKWatchManager();

	// 获取数据监视器列表
    List<String> getDataWatches() {
        synchronized(watchManager.dataWatches) {
            List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
            return rc;
        }
    }
	// 获取Exist监视器列表
    List<String> getExistWatches() {
        synchronized(watchManager.existWatches) {
            List<String> rc =  new ArrayList<String>(watchManager.existWatches.keySet());
            return rc;
        }
    }
	// 获取子节点监视器列表
    List<String> getChildWatches() {
        synchronized(watchManager.childWatches) {
            List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
            return rc;
        }
    }

    /**
	  管理监视器 & 处理由ClientCnxn对象生成的事件。这个实现作为ZooKeeper的一个嵌套类,这样可以避免公开方法被作为ZooKeeper 客户端API的一部分被公开。
     */
    private static class ZKWatchManager implements ClientWatchManager {
	    // 数据节点监视器
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
		// exist 监视器
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
		// 子节点监视器
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
        // 默认监视器
        private volatile Watcher defaultWatcher;
	
		// 添加监视器
        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }

        /* 
		 * 返回事件需要通知的监视器列表,管理器不能通知监视器,如果监视器已经被触发,管理器将更新它内部的结构。
		   这样做的目的是被调用者目前也可能在以后的某个时间是负责通知的事件的监视。
         * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
         *                                                        Event.EventType, java.lang.String)
         */
        @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
			// 需要通知的监视器
            Set<Watcher> result = new HashSet<Watcher>();
		   // 时间类型
            switch (type) {
			// 事件类型为None通知所有监视器
            case None:
				
                result.add(defaultWatcher);
				
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;
				
                synchronized(dataWatches) {
					// 添加所有数据监视器
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
					// 如果需要清空则清空数据监视器
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }
	
                return result;
            case NodeDataChanged:
			// 数据节点被创建
            case NodeCreated:
				// 通知数据节点监视器和exist监视器,同时将其删除
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
				//子节点列表变化事件,通知子节点变化监视器,同时删除
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
				// 节点删除事件
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }
    }

    /**
     * 为一个特殊的路径注册监视器.
     */
    abstract class WatchRegistration {
	    // 监视器
        private Watcher watcher;
		// 客户端路径
        private String clientPath;
		// 构造方法
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }
        // 获取路径监视器的映射关系
        abstract protected Map<String, Set<Watcher>> getWatches(int rc);

        /**
		 *  在一个路径上注册监视器
         */
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
        /**
		 *  基于结果码判断是否需要添加监视器 
         */
        protected boolean shouldAddWatch(int rc) {
            return rc == 0;
        }
    }

    /** Handle the special case of exists watches - they add a watcher
     * even in the case where NONODE result code is returned.
     */
    class ExistsWatchRegistration extends WatchRegistration {
        public ExistsWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return rc == 0 ?  watchManager.dataWatches : watchManager.existWatches;
        }

        @Override
        protected boolean shouldAddWatch(int rc) {
            return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
        }
    }

    class DataWatchRegistration extends WatchRegistration {
        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
    }

    class ChildWatchRegistration extends WatchRegistration {
        public ChildWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.childWatches;
        }
    }

    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;
		
	    // 判断连接是否存活,只要不是CLOSED也不是AUTH_FAILED都是存活的
        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }

        /**
         * 判断是否和服务器见有连接,如果状态是CONNECTED或者是CONNECTEDREADONLY都表示有连接
         * */
        public boolean isConnected() {
            return this == CONNECTED || this == CONNECTEDREADONLY;
        }
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
        throws IOException
    {
        this(connectString, sessionTimeout, watcher, false);
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
		// 2 设置默认监视器
        watchManager.defaultWatcher = watcher;
		//创建一个连接字符串的解析器
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
		// 3 使用解析器解析出的服务器地址构建一个HostProvider
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
		// 4 构建一个客户端连接对象		
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
		// 启动连接
        cnxn.start();
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd)
        throws IOException
    {   
	 // 默认 canBeReadOnly=false
        this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
    }

  
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout
                + " watcher=" + watcher
                + " sessionId=" + Long.toHexString(sessionId)
                + " sessionPasswd="
                + (sessionPasswd == null ? "<null>" : "<hidden>"));

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
        cnxn.seenRwServerBefore = true; // since user has provided sessionId
        cnxn.start();
    }

 

 

 

  • 大小: 28.2 KB
  • 大小: 19.7 KB
5
1
分享到:
评论

相关推荐

    Zookeeper源码分析.epub

    Zookeeper源码分析.epub

    Zookeeper源码分析

    《Zookeeper源码分析》 Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,提供诸如命名服务、配置管理、集群同步、领导者选举等核心功能。本文将深入剖析Zookeeper的工作原理,以及其内部实现的FastLeader...

    zookeeper源码分析

    第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...

    zookeeper源码阅读

    ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...

    zookeeper 3.6.3 源码下载

    **源码结构分析** ZooKeeper的源码主要分为以下几个部分: 1. **协议层**:包含ZAB协议的实现,处理消息的发送和接收,保证数据的最终一致性。 2. **服务器端**:包括ServerCnxnFactory(连接工厂)、ZooKeeper...

    ZooKeeper源码阅读

    ### ZooKeeper源码阅读知识点详解 #### 一、ZooKeeper客户端初始化与使用 ##### 客户端初始化 ZooKeeper客户端的初始化是通过构造函数完成的,具体为: ```java ZooKeeper zookeeper = new ZooKeeper(host, time...

    08_尚硅谷技术之Zookeeper(源码解析)V3.3.pdf

    为了理解和分析Zookeeper如何通过Paxos算法保证数据的一致性,我们可以通过具体的实例来进一步解析。例如,假设有五个议员需要就税率问题进行决议。议员A1提出税率提案,其他议员需要响应。如果A1能够获得大多数议员...

    ZooKeeper源码Eclipse工程项目

    源码分析对于理解其内部工作原理至关重要,尤其是对于开发人员和运维人员来说,能深入源码有助于解决实际问题。 该项目名为“ZooKeeper源码Eclipse工程项目”,意味着它是专门为Eclipse IDE准备的,可以直接导入...

    zookeeper ui界面源码(github)

    ZooKeeper UI界面源码是基于GitHub开源的项目,它为Apache ZooKeeper提供了一个图形化的用户界面,方便管理员监控和管理ZooKeeper集群。Apache ZooKeeper是一个分布式协调服务,广泛应用于分布式系统中,如分布式...

    zookeeper源码

    **ZooKeeper源码分析** ZooKeeper的源码结构清晰,主要分为以下几个部分: 1. **Server端**:包含ZooKeeper服务器的所有组件,如ZooKeeperServer、QuorumPeer、ZKDatabase等。ZooKeeperServer负责处理客户端请求,...

    Zookeeper源码剖析:深入理解Leader选举机制

    **Zookeeper源码剖析:深入理解Leader选举机制** 在分布式协调服务Zookeeper中,Leader选举是其核心功能之一,确保了服务的高可用性和一致性。本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要...

    zookeeper 源码导入eclipse时下载的cache包等文件

    在开发过程中,为了深入理解ZooKeeper的工作原理,我们通常会对其进行源码分析。 当我们将ZooKeeper的源码导入Eclipse这样的集成开发环境时,会遇到一些依赖问题。Eclipse本身并不具备完整的构建工具链,所以我们...

    dubbo+zookeeper案例,dubbo和Zookeeper详解,Java源码.zip

    《Dubbo与Zookeeper深度解析:Java源码实践》 在现代分布式系统中,服务治理是不可或缺的一部分。Dubbo和Zookeeper作为两个重要的组件,...通过分析源码,你可以掌握服务治理的核心思想,提升解决分布式问题的能力。

    ZooKeeper-:ZooKeeper源码剖析

    ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...

    zookeeper限制ip版

    Apache ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。Zookeeper 在微服务、大数据等领域的应用广泛,尤其在...

    03-05-08-zookeeper源码之watcher原理分析1

    在本文中,我们将深入探讨Zookeeper中的Watcher原理,这是一个关键特性,它允许客户端对Zookeeper中的数据变化进行实时监控。Watcher机制是Zookeeper提供的一种事件通知模型,它使得客户端能够及时响应Zookeeper中...

    第四课:zookeeper ZAB协议实现源码分析1

    在本课程中,我们将深入探讨Zookeeper的ZAB协议实现,并通过源码分析来理解其启动流程、快照与事务日志的存储结构。Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase等。本节主要关注...

    zookeeper 例子 源码 jar

    学习和分析ZooKeeper源码有助于开发者更好地理解分布式协调服务的实现,从而在实际项目中更有效地使用和优化ZooKeeper。例如,通过研究源码,你可以了解如何自定义Watcher,或者如何在高并发环境下优化ZooKeeper的...

    zookeeper-3.4.9的源码

    源码分析: 1. **项目结构**:ZooKeeper的源码主要分为`src/java/main`和`src/java/test`两个部分,前者是生产代码,后者是测试代码。`src/java/main`下又包含了`org.apache.zookeeper`这个主要的包,里面包含了...

Global site tag (gtag.js) - Google Analytics