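The previous article described how the ZooKeeper server actively detects a session timeout, cleans up the session information, and closes the connection. This article looks at how the client then tries to recover the session. For the client-side code analysis, see the earlier post http://iwinit.iteye.com/blog/1754611 .
Because the session has been cleaned up, the server no longer holds any information about it. And because the connection has been closed, the client's next read fails and it throws an exception: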
if (sockKey.isReadable()) {
    // read() returns -1 because the server has closed the socket
    int rc = sock.read(incomingBuffer);
    if (rc < 0) {
        throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely server has closed socket");
    }
SendThread handles the exception:
else {
    // this is ugly, you have a better way speak up
    if (e instanceof SessionExpiredException) {
        LOG.info(e.getMessage() + ", closing socket connection");
    } else if (e instanceof SessionTimeoutException) {
        LOG.info(e.getMessage() + RETRY_CONN_MSG);
    } else if (e instanceof EndOfStreamException) {
        // this is our case: the connection was closed by the server
        LOG.info(e.getMessage() + RETRY_CONN_MSG);
    } else if (e instanceof RWServerFoundException) {
        LOG.info(e.getMessage());
    } else {
        LOG.warn(
                "Session 0x"
                        + Long.toHexString(getSessionId())
                        + " for server "
                        + clientCnxnSocket.getRemoteSocketAddress()
                        + ", unexpected error"
                        + RETRY_CONN_MSG, e);
    }
    // clean up the connection and fail the in-flight requests
    cleanup();
    // state is still CONNECTED here, so a Disconnected notification is queued
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(
                Event.EventType.None,
                Event.KeeperState.Disconnected,
                null));
    }
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
}
Next the client picks another server and tries to recover the session. At this point the client's sessionId and password are still those of the previously created session.
// the condition is true here: the connection was set to null, so isConnected() returns false
if (!clientCnxnSocket.isConnected()) {
    // when reconnecting, sleep for a random interval first
    if (!isFirstConnect) {
        try {
            Thread.sleep(r.nextInt(1000));
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    }
    // don't re-establish connection if we are closing
    if (closing || !state.isAlive()) {
        break;
    }
    // start connecting again
    startConnect();
    clientCnxnSocket.updateLastSendAndHeard();
}
Take the next server:
addr = hostProvider.next(1000);
From here on the flow is similar to creating a new session. The difference is that the ConnectRequest carries the old sessionId and password:
// seenRwServerBefore was set to true when the session was first created successfully
long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
Next, the server handles the request:
// sessionId is non-zero in this case
long sessionId = connReq.getSessionId();
if (sessionId != 0) {
    long clientSessionId = connReq.getSessionId();
    LOG.info("Client attempting to renew session 0x"
            + Long.toHexString(clientSessionId)
            + " at " + cnxn.getRemoteSocketAddress());
    // close the old connection first, if there is one, and remove its watches
    serverCnxnFactory.closeSession(sessionId);
    cnxn.setSessionId(sessionId);
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);
} else {
    LOG.info("Client attempting to establish new session at "
            + cnxn.getRemoteSocketAddress());
    createSession(cnxn, passwd, sessionTimeout);
}
Reopening the session:
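public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
        int sessionTimeout) throws IOException {
    // Check the password. On a mismatch, finish session initialization as invalid,
    // which returns a sessionId of 0 to the client. checkPasswd also returns false
    // when sessionId is 0.
    if (!checkPasswd(sessionId, passwd)) {
        finishSessionInit(cnxn, false);
    } else {
        // The password is correct; now check whether the session is still valid.
        // Each server type handles this differently.
        revalidateSession(cnxn, sessionId, sessionTimeout);
    }
}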
If the server being retried is the leader:
@Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
    // the superclass checks the session through the sessionTracker
    super.revalidateSession(cnxn, sessionId, sessionTimeout);
    try {
        // setowner as the leader itself, unless updated
        // via the follower handlers
        setOwner(sessionId, ServerCnxn.me);
    } catch (SessionExpiredException e) {
        // this is ok, it just means that the session revalidation failed.
    }
}
The superclass revalidateSession method:
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
    boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(sessionId)
                        + " is valid: " + rc);
    }
    finishSessionInit(cnxn, rc);
}
The leader's SessionTracker is a SessionTrackerImpl. Its touchSession method looks like this:
synchronized public boolean touchSession(long sessionId, int timeout) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.CLIENT_PING_TRACE_MASK,
                "SessionTrackerImpl --- Touch session: 0x"
                        + Long.toHexString(sessionId) + " with timeout " + timeout);
    }
    // the session timed out and was already removed, so get() returns null
    // here and the check evaluates to false
    SessionImpl s = sessionsById.get(sessionId);
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }
    .....
So on the leader, the session check returns false.
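The leader-side check comes down to a map lookup: a session that has been expired is simply no longer in the table. The sketch below models only that lookup. `SessionTableSketch` and its methods are hypothetical names made up for illustration, not ZooKeeper classes, and the expiry-time bookkeeping that the real touchSession performs after the lookup is left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup at the top of touchSession; not ZooKeeper's class.
final class SessionTableSketch {
    // session id -> "is closing" flag
    private final Map<Long, Boolean> closingById = new HashMap<>();

    synchronized void add(long sessionId) {
        closingById.put(sessionId, false);
    }

    // Marks an existing session as closing; unknown ids are ignored.
    synchronized void markClosing(long sessionId) {
        closingById.computeIfPresent(sessionId, (id, closing) -> true);
    }

    // Models server-side expiry: the entry is removed entirely.
    synchronized void expire(long sessionId) {
        closingById.remove(sessionId);
    }

    // Returns false when the session is unknown (for example, expired) or closing.
    synchronized boolean touch(long sessionId) {
        Boolean closing = closingById.get(sessionId);
        return closing != null && !closing;
    }
}
```

Once `expire` has run for an id, `touch` can never succeed for that id again, which is why a reconnect after expiry cannot revive the session.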
If the server is a follower, its validation method is:
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
    // the follower has to ask the leader whether the session is still valid
    getLearner().validateSession(cnxn, sessionId, sessionTimeout);
}
The follower asks the leader whether the session is still valid:
void validateSession(ServerCnxn cnxn, long clientId, int timeout)
        throws IOException {
    LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(baos);
    dos.writeLong(clientId);
    dos.writeInt(timeout);
    dos.close();
    // the REVALIDATE packet asks the leader whether the session is still valid
    QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1,
            baos.toByteArray(), null);
    pendingRevalidations.put(clientId, cnxn);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "To validate session 0x"
                        + Long.toHexString(clientId));
    }
    writePacket(qp, true);
}
The leader handles the REVALIDATE packet:
case Leader.REVALIDATE:
    bis = new ByteArrayInputStream(qp.getData());
    dis = new DataInputStream(bis);
    long id = dis.readLong();
    int to = dis.readInt();
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    dos.writeLong(id);
    // the session has already been removed, so touch() returns false
    boolean valid = leader.zk.touch(id, to);
    if (valid) {
        try {
            //set the session owner
            // as the follower that
            // owns the session
            leader.zk.setOwner(id, this);
        } catch (SessionExpiredException e) {
            LOG.error("Somehow session " + Long.toHexString(id)
                    + " expired right after being renewed! (impossible)", e);
        }
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(id)
                        + " is valid: " + valid);
    }
    // the result written back is false
    dos.writeBoolean(valid);
    qp.setData(bos.toByteArray());
    queuedPackets.add(qp);
    break;
The follower handles the reply:
case Leader.REVALIDATE:
    revalidate(qp);
    break;

protected void revalidate(QuorumPacket qp) throws IOException {
    ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
    DataInputStream dis = new DataInputStream(bis);
    long sessionId = dis.readLong();
    boolean valid = dis.readBoolean();
    ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
    if (cnxn == null) {
        LOG.warn("Missing session 0x"
                + Long.toHexString(sessionId)
                + " for validation");
    } else {
        zk.finishSessionInit(cnxn, valid);
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG,
                ZooTrace.SESSION_TRACE_MASK,
                "Session 0x" + Long.toHexString(sessionId)
                        + " is valid: " + valid);
    }
}
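The payloads in this round trip are tiny: the follower writes the session id (a long) and the timeout (an int), and the leader answers with the same id followed by a boolean. Here is a minimal sketch of just that byte layout, using only `java.io`. The class and method names are invented for illustration, and the QuorumPacket envelope around the payload is not modelled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper, not part of ZooKeeper: models only the REVALIDATE payload bytes.
final class RevalidatePayloadSketch {

    // Follower -> leader: session id followed by the requested timeout (8 + 4 bytes).
    static byte[] encodeRequest(long sessionId, int timeout) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeLong(sessionId);
            dos.writeInt(timeout);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Leader -> follower: session id followed by the validity flag (8 + 1 bytes).
    static byte[] encodeReply(long sessionId, boolean valid) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeLong(sessionId);
            dos.writeBoolean(valid);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return baos.toByteArray();
    }

    // Both payloads start with the session id.
    static long readSessionId(byte[] payload) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(payload))) {
            return dis.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the validity flag that follows the session id in a reply.
    static boolean readValid(byte[] reply) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(reply))) {
            dis.readLong(); // skip the session id
            return dis.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The follower uses the id in the reply to find the waiting connection in pendingRevalidations, which is why the leader echoes it back instead of sending the boolean alone.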
As shown, both the leader and the follower end up calling zk.finishSessionInit(cnxn, valid), and because the session has already expired, valid is false:
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
    ......
    try {
        // valid is false, so the client receives a timeout of 0,
        // a sessionId of 0 and an all-zero password
        ConnectResponse rsp = new ConnectResponse(0,
                valid ? cnxn.getSessionTimeout() : 0,
                valid ? cnxn.getSessionId() : 0, // send 0 if session is no longer valid
                valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        bos.writeInt(-1, "len");
        rsp.serialize(bos, "connect");
        if (!cnxn.isOldClient) {
            bos.writeBool(
                    this instanceof ReadOnlyZooKeeperServer, "readOnly");
        }
        baos.close();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
        bb.putInt(bb.remaining() - 4).rewind();
        cnxn.sendBuffer(bb);
        ......
}
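One detail in finishSessionInit is easy to miss: the response is written with a placeholder length of -1, and the first four bytes are patched with the real body length only after serialization. A minimal sketch of that length-prefix technique with plain `java.nio`, where `LengthPrefixSketch` and `frame` are hypothetical names and the body is an arbitrary byte array, not a serialized ConnectResponse:

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the "reserve 4 bytes, patch the length afterwards" framing.
final class LengthPrefixSketch {

    // Returns a buffer laid out as [4-byte big-endian body length][body], positioned at 0.
    static ByteBuffer frame(byte[] body) {
        byte[] out = new byte[4 + body.length];      // first 4 bytes are reserved for the length
        System.arraycopy(body, 0, out, 4, body.length);
        ByteBuffer bb = ByteBuffer.wrap(out);        // position 0, limit out.length
        bb.putInt(bb.remaining() - 4);               // overwrite the prefix with the body length
        bb.rewind();                                 // back to position 0, ready to be sent
        return bb;
    }
}
```

The `- 4` excludes the prefix itself, so the receiver can read one int and then know exactly how many more bytes belong to this message.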
The client handles the ConnectResponse:
void readConnectResult() throws IOException {
    ......
    // the server has reset the sessionId to 0
    this.sessionId = conRsp.getSessionId();
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
            conRsp.getPasswd(), isRO);
}
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    // for an expired session the server returns a timeout of 0 (along with
    // sessionId 0), so a SessionExpiredException is thrown
    if (negotiatedSessionTimeout <= 0) {
        // set state to CLOSED, which makes SendThread exit its loop
        state = States.CLOSED;
        // queue the Expired state notification
        eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null));
        // this shuts down the EventThread
        eventThread.queueEventOfDeath();
        // throw the exception
        throw new SessionExpiredException(
                "Unable to reconnect to ZooKeeper service, session 0x"
                        + Long.toHexString(sessionId) + " has expired");
    }
    ......
}
SendThread handles the SessionExpiredException and then shuts down:
while (state.isAlive()) {
    ......
    // this is ugly, you have a better way speak up
    if (e instanceof SessionExpiredException) {
        LOG.info(e.getMessage() + ", closing socket connection");
    }
    ......
    // close the connection and fail all requests
    cleanup();
    // state is already CLOSED here, so no Disconnected event is queued
    // and the SendThread loop will exit
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(
                Event.EventType.None,
                Event.KeeperState.Disconnected,
                null));
    }
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
}
// clean up
cleanup();
// close the selector
clientCnxnSocket.close();
if (state.isAlive()) {
    eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
            Event.KeeperState.Disconnected, null));
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
        "SendThread exitedloop.");
The cleanup itself:
private void cleanup() {
    // close the socket
    clientCnxnSocket.cleanup();
    // fail the requests that were sent and are awaiting a response;
    // state is CLOSED, so the error code is SESSIONEXPIRED
    synchronized (pendingQueue) {
        for (Packet p : pendingQueue) {
            conLossPacket(p);
        }
        pendingQueue.clear();
    }
    // fail the requests still waiting to be sent;
    // state is CLOSED, so the error code is SESSIONEXPIRED
    synchronized (outgoingQueue) {
        for (Packet p : outgoingQueue) {
            conLossPacket(p);
        }
        outgoingQueue.clear();
    }
}
private void conLossPacket(Packet p) {
    if (p.replyHeader == null) {
        return;
    }
    switch (state) {
    case AUTH_FAILED:
        p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
        break;
    // when the client is closed, the error code is SESSIONEXPIRED
    case CLOSED:
        p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
        break;
    // everything else gets CONNECTIONLOSS
    default:
        p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
    }
    finishPacket(p);
}
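The mapping in conLossPacket is worth isolating, because it decides what the application sees. The sketch below reproduces it as a pure function. The `State` enum is a reduced, hypothetical stand-in for the client's States, and the integer constants are the values of KeeperException.Code for these three errors.

```java
// Hypothetical, self-contained model of the state -> error code mapping in conLossPacket.
final class ConnLossCodeSketch {

    // Reduced stand-in for the client's connection states.
    enum State { CONNECTING, CONNECTED, CLOSED, AUTH_FAILED }

    // Integer values of the corresponding KeeperException.Code entries.
    static final int CONNECTIONLOSS = -4;
    static final int SESSIONEXPIRED = -112;
    static final int AUTHFAILED = -115;

    // Picks the error code that a failed request is completed with.
    static int errorFor(State state) {
        switch (state) {
            case AUTH_FAILED:
                return AUTHFAILED;
            case CLOSED:
                return SESSIONEXPIRED;   // the handle is dead; retrying on it is pointless
            default:
                return CONNECTIONLOSS;   // the client will try to reconnect by itself
        }
    }
}
```

For example, `ConnLossCodeSketch.errorFor(ConnLossCodeSketch.State.CONNECTED)` yields -4, while the same call with `State.CLOSED` yields -112. The same request therefore fails differently depending on whether the disconnect happened before or after the session was declared expired.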
EventThread shuts down:
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // kill signal
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            if (wasKilled) {
                // exit only after every pending notification has been delivered
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down");
}
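The EventThread loop is an instance of the "poison pill" shutdown pattern, with one twist: seeing the pill does not stop the loop immediately, it only stops it once the queue is empty. A minimal sketch of that behaviour on a plain `BlockingQueue` follows. `PoisonPillSketch`, `EVENT_OF_DEATH` and `drain` are illustrative names, and the loop runs on the calling thread instead of a dedicated one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the EventThread loop: process events until the death marker
// has been seen AND the queue is empty.
final class PoisonPillSketch {

    // Sentinel compared by identity, like eventOfDeath in the client.
    static final Object EVENT_OF_DEATH = new Object();

    // Drains the queue on the calling thread and returns the processed events in order.
    static List<Object> drain(BlockingQueue<Object> queue) {
        List<Object> processed = new ArrayList<>();
        boolean wasKilled = false;
        try {
            while (true) {
                Object event = queue.take();
                if (event == EVENT_OF_DEATH) {
                    wasKilled = true;          // remember the signal, keep draining
                } else {
                    processed.add(event);      // stands in for processEvent(event)
                }
                if (wasKilled && queue.isEmpty()) {
                    break;                     // nothing left to deliver
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return processed;
    }
}
```

Because the Expired event is queued before the death marker, this ordering guarantees the application's watcher sees Expired before the thread exits.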
After this the client is unusable. Every request fails with the SESSIONEXPIRED error code as soon as it is submitted, because of a check in queuePacket:
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration) {
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        // state is CLOSED at this point, so the packet fails immediately
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}
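From the analysis above, SESSIONEXPIRED is a fairly serious event: the ZooKeeper instance is unusable afterwards, and recovering requires creating a new ZooKeeper instance. CONNECTIONLOSS, by contrast, is fairly common, for example during a brief network interruption. In that case the client reconnects and recovers automatically, because the server still holds the session information.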
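In application code this usually turns into a small rule: ignore Disconnected and wait, but on Expired throw the old handle away and build a new one. The sketch below shows only that decision, under loud assumptions: `ExpiryRecoverySketch`, its `Event` enum and the `Supplier`-based factory are all hypothetical, and a real application would create a new ZooKeeper instance inside the factory and close the old one.

```java
import java.util.function.Supplier;

// Hypothetical wrapper that replaces a client handle when its session expires.
// C stands in for the real client type; nothing here calls ZooKeeper itself.
final class ExpiryRecoverySketch<C> {

    // Reduced stand-in for the connection states a watcher is notified about.
    enum Event { DISCONNECTED, SYNC_CONNECTED, EXPIRED }

    private final Supplier<C> factory;
    private C current;

    ExpiryRecoverySketch(Supplier<C> factory) {
        this.factory = factory;
        this.current = factory.get();
    }

    // Called from the watcher with each connection-state notification.
    synchronized void onEvent(Event event) {
        if (event == Event.EXPIRED) {
            // The old handle can never recover, so build a fresh one.
            current = factory.get();
        }
        // DISCONNECTED: nothing to do, the existing handle reconnects by itself.
    }

    synchronized C current() {
        return current;
    }
}
```

A new handle means a new session, so ephemeral nodes and watches that belonged to the expired session are gone and have to be set up again by the application.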