前一篇文章分析了server端主动超时session的情况,接下来看一下client和server网络暂时中断的情况。
1.和server主动关闭连接一样,client抛出EndOfStreamException异常,此时客户端状态还是CONNECTED
2.SendThread处理异常,清理连接,将当前所有请求置为失败,错误码是CONNECTIONLOSS
3.发送Disconnected状态通知
4.选下一个server重连
5.连上之后发送ConnectRequest,sessionid和password是当前session的数据
6.server端处理,分leader和follower,由于此时client端重试比较快,session还没超时,所以leader和follower端session校验成功。如果这个时候session正好超时了,则校验失败,client会抛出sessionExpired异常并退出
7.server端返回成功的ConnectResponse
8.client收到相应,发送SyncConnected状态通知给watcher
9.client发送SetWatches包,重建watch
//可以通过配置禁止重建watch if (!disableAutoWatchReset) { //当前的所有watch List<String> dataWatches = zooKeeper.getDataWatches(); List<String> existWatches = zooKeeper.getExistWatches(); List<String> childWatches = zooKeeper.getChildWatches(); if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()) { //发送重建请求 SetWatches sw = new SetWatches(lastZxid, prependChroot(dataWatches), prependChroot(existWatches), prependChroot(childWatches)); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.setWatches); h.setXid(-8); Packet packet = new Packet(h, new ReplyHeader(), sw, null, null); outgoingQueue.addFirst(packet); } }
10.server端收到setWatches请求,如果是follower,直接进入FinalRequestProcessor处理,无需proposal
case OpCode.setWatches: { lastOp = "SETW"; SetWatches setWatches = new SetWatches(); // XXX We really should NOT need this!!!! request.request.rewind(); ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); long relativeZxid = setWatches.getRelativeZxid(); //添加watch zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), setWatches.getExistWatches(), setWatches.getChildWatches(), cnxn); break; }
//添加watch的时候判断watch是否需要触发 public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches, Watcher watcher) { for (String path : dataWatches) { DataNode node = getNode(path); WatchedEvent e = null; if (node == null) { e = new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path); } else if (node.stat.getCzxid() > relativeZxid) { e = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, path); } else if (node.stat.getMzxid() > relativeZxid) { e = new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path); } if (e != null) { watcher.process(e); } else { this.dataWatches.addWatch(path, watcher); } } for (String path : existWatches) { DataNode node = getNode(path); WatchedEvent e = null; if (node == null) { // This is the case when the watch was registered } else if (node.stat.getMzxid() > relativeZxid) { e = new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path); } else { e = new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, path); } if (e != null) { watcher.process(e); } else { this.dataWatches.addWatch(path, watcher); } } for (String path : childWatches) { DataNode node = getNode(path); WatchedEvent e = null; if (node == null) { e = new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path); } else if (node.stat.getPzxid() > relativeZxid) { e = new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, path); } if (e != null) { watcher.process(e); } else { this.childWatches.addWatch(path, watcher); } } }
11.如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在
再来看看客户端主动超时Session和心跳的情况,SendThread主线程
public void run() { clientCnxnSocket.introduce(this,sessionId); clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); //selector的select超时时间,每次循环都会重新计算 int to; long lastPingRwServer = System.currentTimeMillis(); while (state.isAlive()) { try { ...... //session建立之后,to为读超时减去读空闲时间 if (state.isConnected()) { ...... to = readTimeout - clientCnxnSocket.getIdleRecv(); } else { to = connectTimeout - clientCnxnSocket.getIdleRecv(); } //如果client长时间没收到server的packet,会导致读空闲时间很长,超过读超时,直接抛出异常 if (to <= 0) { throw new SessionTimeoutException( "Client session timed out, have not heard from server in " + clientCnxnSocket.getIdleRecv() + "ms" + " for sessionid 0x" + Long.toHexString(sessionId)); } //session建立之后,发送心跳 if (state.isConnected()) { //如果写频繁,则写空闲时间很少,不用发送心跳 int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend(); //写少,发心跳 if (timeToNextPing <= 0) { sendPing(); //上次发送时间 clientCnxnSocket.updateLastSend(); } //写繁忙,不用发送心跳 else { if (timeToNextPing < to) { to = timeToNextPing; } } } ..... //每次doTransport都会更新now,lastHeard和lastSend则取决于是否有读写请求 clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); } catch (Throwable e) { .... clientCnxnSocket.updateNow(); clientCnxnSocket.updateLastSendAndHeard(); } } } ..... }
心跳包,xid为-2
private void sendPing() { lastPingSentNs = System.nanoTime(); RequestHeader h = new RequestHeader(-2, OpCode.ping); queuePacket(h, null, null, null, null, null, null, null, null); }
server端处理ping包,如果是follower直接进入FinalRequestProcessor处理
case OpCode.ping: { zks.serverStats().updateLatency(request.createTime); lastOp = "PING"; cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp, request.createTime, System.currentTimeMillis()); //心跳包的响应xid也是-2 cnxn.sendResponse(new ReplyHeader(-2, zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response"); return; }
如果是leader,则多了一层PrepRequestProcessor的处理,检查session是否还在
client收到心跳包响应,啥事不做
if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; }
以上可以看出
1.心跳包只有写空闲时才会发送
2.每次transport的时候都会更新当前时间now
3.lastHeard和lastSend取决于是否有读写请求
4.客户端session超时和连接关闭CONNECTIONLOSS处理是一样的,都会导致重试
相关推荐
zktools是针对ZooKeeper开发的一款客户端连接工具,它使得与ZooKeeper交互变得更加便捷和高效。这款工具通常包含了一些实用的功能,如查看ZNode(ZooKeeper中的数据节点)信息,执行数据操作,监控会话状态,以及...
- **会话(Session)**: 客户端与Zookeeper服务器之间的连接,会话期间客户端可以接收服务器的通知和保持状态。 - **watcher事件**: 客户端注册在特定Znode上的监听器,当Znode有变化时,Zookeeper会触发事件通知...
本文将详细探讨“Zookeeper客户端连接工具”,特别是其中的ZooInspector,以及它在源码分析、分布式系统理解和云原生环境中的应用。 一、Zookeeper客户端工具概述 Zookeeper提供了多种客户端工具,包括命令行...
《Zookeeper连接工具ZkTools详解》 Zookeeper作为一个分布式协调服务,在云原生环境中扮演着至关重要的角色。它提供了一种可靠的方式来管理和维护配置信息、命名服务、集群同步、分布式锁等。为了方便开发者与...
记录一次自己碰到的问题, 亲测有效 zookeeper客户端会话频繁超时解决方案 broker无法连接客户端触发自动关闭解决方案 zookeeper客户端会话频繁超时解决方案 broker无法连接客户端触发自动关闭解决方案
如果会话超时或网络断开,客户端需要重新建立连接并恢复已设置的 Watch。 6. **命令行客户端 (zkCli.sh)** ZooKeeper 提供了一个简单的命令行工具 `zkCli.sh`,通过它可以直接与 ZooKeeper 交互,执行基本的操作如...
标题中的“Zookeeper的客户端使用”指的是Zookeeper提供的多种客户端工具,包括命令行客户端(zkCli)和图形化客户端。这些客户端允许用户与Zookeeper服务器交互,创建、删除、更新和查询Zookeeper中的节点数据。...
3. "zookeeper连接":指客户端通过特定协议(如TCP/IP)与Zookeeper服务器建立连接,进行数据读写和状态查询等操作。 **压缩包子文件的文件名称列表:** "prettyZoo-win.msi" 是一个Windows安装程序文件,用于在...
如果网络中断,会话可能会超时,但Zookeeper提供了会话恢复机制。 3. ** watches**:Zookeeper中的watches是一种一次性触发的通知机制。当客户端对某个节点设置watch后,一旦该节点的数据发生变化或被删除,客户端...
Zookeeper服务和客户端工具是Apache ZooKeeper的重要组成部分,它是一个分布式的、开放源码的分布式应用程序协调服务。这个压缩包包含Windows版本的Zookeeper服务以及客户端工具,方便在Windows环境下进行配置、管理...
- **`sessionTimeout`**:指定了客户端与服务器之间的通信超时时间,即客户端在没有向服务器发送心跳包的情况下多久会被认为是断开连接的。这个参数可以通过`minSessionTimeout`和`maxSessionTimeout`两个参数来进行...
ZNodes分为临时节点和永久节点,临时节点在创建它的客户端断开连接后自动删除,而永久节点则一直存在,除非被显式删除。 **3. 功能特性** - **命名服务**:Zookeeper可以为分布式服务提供全局唯一的ID,使得分布式...
ZooInspector是Zookeeper生态系统中的一个实用工具,它作为一个轻量级的客户端,专为管理和监控Zookeeper集群而设计。这款工具支持在Windows和Linux操作系统上运行,由Java语言开发,使得开发者和系统管理员能够便捷...
某大神基于springcloud开发的zookeeper浏览器,使用java即可启动。 下载jar包直接运行 1. java -jar zookeeper-explorer-1.0.2-RELEASE-exec.jar --server.port=8099 --server.context-path=/zookeeper-explorer --...
- **安装和配置**:下载并安装相应的Zookeeper可视化工具,根据工具文档配置连接到Zookeeper集群的参数,如服务器地址、端口等。 - **连接Zookeeper**:启动工具,输入配置好的连接信息,连接到Zookeeper服务器。 - ...
5. **异常处理**: 客户端需要处理会话超时、网络中断等异常情况,以便在重新连接后能恢复锁状态。 **Zookeeper2017客户端库** 文件名为`zookeeper2017`可能指的是2017年版本的Zookeeper客户端库。在实际应用中,...
本文将详细介绍 ZooKeeper 客户端的使用和集群特性,包括客户端简介、客户端连接参数说明、客户端 CRUD、客户端监听、集群架构说明、集群配置及参数说明、选举投票机制、主从复制机制等知识点。 一、客户端 API ...
源码中可能包含`Connect`方法,该方法负责初始化连接,包括设置会话超时时间、处理心跳机制和重连策略。 2. **会话管理**: 客户端与服务器之间的会话是状态感知的,会话超时可能导致重新连接。`ClientTests`可能...