上一节分析了ZooKeeper的部分代码,下面我们看看客户端网络连接器的部分代码
/** 这个类管理客户端的socket I/O。ClientCnxn维护一个可用服务器列表可以根据需要透明地切换服务器 * */ public class ClientCnxn { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class); private static final String ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username"; /** 客户端在会话重连接时自动复位监视器,这个操作允许客户端通过设置环境变量zookeeper.disableAutoWatchReset=true来关闭这个行为 */ private static boolean disableAutoWatchReset; static { disableAutoWatchReset = Boolean.getBoolean("zookeeper.disableAutoWatchReset"); if (LOG.isDebugEnabled()) { LOG.debug("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset); } } static class AuthData { AuthData(String scheme, byte data[]) { this.scheme = scheme; this.data = data; } String scheme; byte data[]; } private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>(); /** *哪些已经发送出去的目前正在等待响应的包 */ private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); /** * 那些需要发送的包 */ private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>(); // 超时时间 private int connectTimeout; /** *客户端与服务器协商的超时时间,以毫秒为单位。这是真正的超时时间,而不是客户端的超时请求 */ private volatile int negotiatedSessionTimeout; // 读取超时时间 private int readTimeout; // 会话超时时间 private final int sessionTimeout; // ZooKeeper private final ZooKeeper zooKeeper; //客户端监视器管理器 private final ClientWatchManager watcher; //会话ID private long sessionId; //会话密钥 private byte sessionPasswd[] = new byte[16]; // 是否只读 private boolean readOnly; final String chrootPath; // 发送线程 final SendThread sendThread; // 事件回调线程 final EventThread eventThread; /** * Set to true when close is called. Latches the connection such that we * don't attempt to re-connect to the server if in the middle of closing the * connection (client sends session disconnect to server as part of close * operation) */ private volatile boolean closing = false; /** 一组客户端可以连接的Zk主机 */ private final HostProvider hostProvider; /** * 第一次和读写服务器建立连接时设置为true,之后不再改变。 这个值用来处理客户端没有sessionId连接只读模式服务器的场景. 客户端从只读服务器收到一个假的sessionId,这个sessionId对于其他服务器是无效的。所以 当客户端寻找一个读写服务器时,它在连接握手时发送0代替假的sessionId,建立一个新的,有效的会话 如果这个属性是false(这就意味着之前没有找到过读写服务器)则表示非0的sessionId是假的否则就是有效的 */ volatile boolean seenRwServerBefore = false; public ZooKeeperSaslClient zooKeeperSaslClient; public long getSessionId() { return sessionId; } public byte[] getSessionPasswd() { return sessionPasswd; } public int getSessionTimeout() { return negotiatedSessionTimeout; } @Override public String toString() { StringBuilder sb = new StringBuilder(); SocketAddress local = sendThread.getClientCnxnSocket().getLocalSocketAddress(); SocketAddress remote = sendThread.getClientCnxnSocket().getRemoteSocketAddress(); sb .append("sessionid:0x").append(Long.toHexString(getSessionId())) .append(" local:").append(local) .append(" remoteserver:").append(remote) .append(" lastZxid:").append(lastZxid) .append(" xid:").append(xid) .append(" sent:").append(sendThread.getClientCnxnSocket().getSentCount()) .append(" recv:").append(sendThread.getClientCnxnSocket().getRecvCount()) .append(" queuedpkts:").append(outgoingQueue.size()) .append(" pendingresp:").append(pendingQueue.size()) .append(" queuedevents:").append(eventThread.waitingEvents.size()); return sb.toString(); } /** * 创建一个连接对象。真正的网路连接直到需要的时候才建立。start()方法在执行构造方法后一定要调用 * 这个构造方法在ZooKeeper的初始化时调用,用于初始化一个客户端网路管理器 */ public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException { this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16], canBeReadOnly); } public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) { //客户端实例 this.zooKeeper = zooKeeper; // 客户端Watcher管理器 this.watcher = watcher; //sessionId this.sessionId = sessionId; //会话密钥 this.sessionPasswd = sessionPasswd; //会话超时时间 this.sessionTimeout = sessionTimeout; //服务器地址列表管理器 this.hostProvider = hostProvider; //根路径 this.chrootPath = chrootPath; // 连接超时时间是会话超时时间和服务器数量的比值 connectTimeout = sessionTimeout / hostProvider.size(); // 读超时时间是会话超时时间的2/3 readTimeout = sessionTimeout * 2 / 3; readOnly = canBeReadOnly; //创建发送和事件处理线程 sendThread = new SendThread(clientCnxnSocket); eventThread = new EventThread(); } public static boolean getDisableAutoResetWatch() { return disableAutoWatchReset; } public static void setDisableAutoResetWatch(boolean b) { disableAutoWatchReset = b; } //启动发送和事件处理线程 public void start() { sendThread.start(); eventThread.start(); }
// 事件处理线程 class EventThread extends Thread { //等待处理的事件 private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>(); /** *这个是真正的排队会话的状态,知道事件处理线程真正处理事件并将其返回给监视器。 **/ private volatile KeeperState sessionState = KeeperState.Disconnected; private volatile boolean wasKilled = false; private volatile boolean isRunning = false; EventThread() { // 构造一个线程名 super(makeThreadName("-EventThread")); setUncaughtExceptionHandler(uncaughtExceptionHandler); // 设置为守护线程 setDaemon(true); } // public void queueEvent(WatchedEvent event) { // 如果WatchedEvent的类型是None状态是sessionStat的值则不处理 if (event.getType() == EventType.None && sessionState == event.getState()) { return; } // 获取事件的状态 sessionState = event.getState(); // 构建一个基于事件的监视器 WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // 排队pair,稍后处理 waitingEvents.add(pair); } // 排队Packet public void queuePacket(Packet packet) { if (wasKilled) { synchronized (waitingEvents) { if (isRunning) waitingEvents.add(packet); else processEvent(packet); } } else { waitingEvents.add(packet); } } public void queueEventOfDeath() { waitingEvents.add(eventOfDeath); } @Override public void run() { try { isRunning = true; while (true) { //从等待处理的事件队列中获取事件 Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down"); } // 真正处理事件的入口,主要是回调处理 private void processEvent(Object event) { try { // 如果事件是WatcherSetEventPair if (event instanceof WatcherSetEventPair) { //每个监视器都会处理这个事件 WatcherSetEventPair pair = (WatcherSetEventPair) event; for (Watcher watcher : pair.watchers) { try { watcher.process(pair.event); } catch (Throwable t) { LOG.error("Error while calling watcher ", t); } } } else { Packet p = (Packet) event; int rc = 0; // 获取客户端路径 String clientPath = p.clientPath; if (p.replyHeader.getErr() != 0) { rc = p.replyHeader.getErr(); } if (p.cb == null) { LOG.warn("Somehow a null cb got to EventThread!"); } else if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse) { // 获取回调对象 StatCallback cb = (StatCallback) p.cb; // 如果处理成功回调方法会传入响应状态,否则响应状态为null if (rc == 0) { if (p.response instanceof ExistsResponse) { cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response) .getStat()); } else if (p.response instanceof SetDataResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response) .getStat()); } else if (p.response instanceof SetACLResponse) { cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response) .getStat()); } } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetDataResponse) { DataCallback cb = (DataCallback) p.cb; GetDataResponse rsp = (GetDataResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getData(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetACLResponse) { ACLCallback cb = (ACLCallback) p.cb; GetACLResponse rsp = (GetACLResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getAcl(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof GetChildrenResponse) { ChildrenCallback cb = (ChildrenCallback) p.cb; GetChildrenResponse rsp = (GetChildrenResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren()); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.response instanceof GetChildren2Response) { Children2Callback cb = (Children2Callback) p.cb; GetChildren2Response rsp = (GetChildren2Response) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, rsp .getChildren(), rsp.getStat()); } else { cb.processResult(rc, clientPath, p.ctx, null, null); } } else if (p.response instanceof CreateResponse) { StringCallback cb = (StringCallback) p.cb; CreateResponse rsp = (CreateResponse) p.response; if (rc == 0) { cb.processResult(rc, clientPath, p.ctx, (chrootPath == null ? rsp.getPath() : rsp.getPath() .substring(chrootPath.length()))); } else { cb.processResult(rc, clientPath, p.ctx, null); } } else if (p.cb instanceof VoidCallback) { VoidCallback cb = (VoidCallback) p.cb; cb.processResult(rc, clientPath, p.ctx); } } } catch (Throwable t) { LOG.error("Caught unexpected throwable", t); } } }
相关推荐
Zookeeper源码分析.epub
《Zookeeper源码分析》 Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,提供诸如命名服务、配置管理、集群同步、领导者选举等核心功能。本文将深入剖析Zookeeper的工作原理,以及其内部实现的FastLeader...
第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...
ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...
**源码结构分析** ZooKeeper的源码主要分为以下几个部分: 1. **协议层**:包含ZAB协议的实现,处理消息的发送和接收,保证数据的最终一致性。 2. **服务器端**:包括ServerCnxnFactory(连接工厂)、ZooKeeper...
### ZooKeeper源码阅读知识点详解 #### 一、ZooKeeper客户端初始化与使用 ##### 客户端初始化 ZooKeeper客户端的初始化是通过构造函数完成的,具体为: ```java ZooKeeper zookeeper = new ZooKeeper(host, time...
为了理解和分析Zookeeper如何通过Paxos算法保证数据的一致性,我们可以通过具体的实例来进一步解析。例如,假设有五个议员需要就税率问题进行决议。议员A1提出税率提案,其他议员需要响应。如果A1能够获得大多数议员...
**Zookeeper源码剖析:深入理解Leader选举机制** 在分布式协调服务Zookeeper中,Leader选举是其核心功能之一,确保了服务的高可用性和一致性。本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要...
源码分析对于理解其内部工作原理至关重要,尤其是对于开发人员和运维人员来说,能深入源码有助于解决实际问题。 该项目名为“ZooKeeper源码Eclipse工程项目”,意味着它是专门为Eclipse IDE准备的,可以直接导入...
**ZooKeeper源码分析** ZooKeeper的源码结构清晰,主要分为以下几个部分: 1. **Server端**:包含ZooKeeper服务器的所有组件,如ZooKeeperServer、QuorumPeer、ZKDatabase等。ZooKeeperServer负责处理客户端请求,...
总的来说,通过分析这个开源项目,我们可以深入理解ZooKeeper的工作原理,学习前端开发、RESTful API设计、分布式系统监控等方面的知识,同时也能参与到开源社区,提升自己的技术水平和协作能力。
在开发过程中,为了深入理解ZooKeeper的工作原理,我们通常会对其进行源码分析。 当我们将ZooKeeper的源码导入Eclipse这样的集成开发环境时,会遇到一些依赖问题。Eclipse本身并不具备完整的构建工具链,所以我们...
ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...
《Dubbo与Zookeeper深度解析:Java源码实践》 在现代分布式系统中,服务治理是不可或缺的一部分。Dubbo和Zookeeper作为两个重要的组件,...通过分析源码,你可以掌握服务治理的核心思想,提升解决分布式问题的能力。
总结,本文详细介绍了如何在Zookeeper 3.4.14版本中实现IP黑白名单功能,从需求分析到源码改造,再到功能测试,覆盖了整个开发流程,旨在帮助读者理解和实践Zookeeper的安全管理。通过这样的定制化改造,我们可以更...
在本课程中,我们将深入探讨Zookeeper的ZAB协议实现,并通过源码分析来理解其启动流程、快照与事务日志的存储结构。Zookeeper是一款分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase等。本节主要关注...
在本文中,我们将深入探讨Zookeeper中的Watcher原理,这是一个关键特性,它允许客户端对Zookeeper中的数据变化进行实时监控。Watcher机制是Zookeeper提供的一种事件通知模型,它使得客户端能够及时响应Zookeeper中...
源码分析: 1. **项目结构**:ZooKeeper的源码主要分为`src/java/main`和`src/java/test`两个部分,前者是生产代码,后者是测试代码。`src/java/main`下又包含了`org.apache.zookeeper`这个主要的包,里面包含了...
学习和分析ZooKeeper源码有助于开发者更好地理解分布式协调服务的实现,从而在实际项目中更有效地使用和优化ZooKeeper。例如,通过研究源码,你可以了解如何自定义Watcher,或者如何在高并发环境下优化ZooKeeper的...