`
iwinit
  • 浏览: 454792 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[ZooKeeper]Client Session失效

阅读更多

前面一篇文章提到zookeeper server端主动发现session超时并清理session信息,关闭连接,接下来看看client端如何试图恢复session的。关于client端代码分析见前文http://iwinit.iteye.com/blog/1754611 。
由于session被清理,此时server端已经没有session信息了。而由于连接被关闭,client会抛出异常

if (sockKey.isReadable()) {
		//返回-1
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }

 SendThread处理异常

 else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } 
			//连接被关闭了			
			else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else {
                            LOG.warn(
                                    "Session 0x"
                                            + Long.toHexString(getSessionId())
                                            + " for server "
                                            + clientCnxnSocket.getRemoteSocketAddress()
                                            + ", unexpected error"
                                            + RETRY_CONN_MSG, e);
                        }
			//清理连接,失败当前请求
                        cleanup();
			//此时state还是CONNECTED,发送DISCONNECTED状态通知
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

 接下来client重新寻找下一个server进行session恢复,此时client的sessionId和password仍然是上一次创建的session信息。

//此处返回true,因为连接已经被置为null了
			if (!clientCnxnSocket.isConnected()) {
			//重连时,先sleep一下
                        if(!isFirstConnect){
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
			//重新开始连接
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

 取下一个server

addr = hostProvider.next(1000);

 之后就和新建session时类似,区别是发送ConnectRequest时sessionid和password都是老的

//新建session成功时,seenRwServerBefore已经被置为true
long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);

 接下来server端处理

//此时sessionId不为0
 long sessionId = connReq.getSessionId();
        if (sessionId != 0) {
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
		//先关闭老的连接,如果有的话,删除watch
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            createSession(cnxn, passwd, sessionTimeout);
        }

 重启session

    public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
            int sessionTimeout) throws IOException {
	//检查密码,如果不一样,则结束session,返回client一个为0的sessionid。如果sessionid为0,则为false
        if (!checkPasswd(sessionId, passwd)) {
            finishSessionInit(cnxn, false);
        } 
	//密码正确,再校验下session是否还有效,这里不同的server处理不一样
	else {
            revalidateSession(cnxn, sessionId, sessionTimeout);
        }
    }

 重试的server如果是leader

    @Override
    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
        int sessionTimeout) throws IOException {
	//父类中通过sessionTrack检查
        super.revalidateSession(cnxn, sessionId, sessionTimeout);
        try {
            // setowner as the leader itself, unless updated
            // via the follower handlers
            setOwner(sessionId, ServerCnxn.me);
        } catch (SessionExpiredException e) {
            // this is ok, it just means that the session revalidation failed.
        }
    }
}

 父类revalidateSession方法

    protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {
        boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(sessionId) +
                    " is valid: " + rc);
        }
        finishSessionInit(cnxn, rc);
    }

 leader的SessionTracker为SessionTrackerImpl,touchSession方法如下

synchronized public boolean touchSession(long sessionId, int timeout) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.CLIENT_PING_TRACE_MASK,
                                     "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);
        }
	//因为session超时,session已经被删掉了,此处返回null,所以检查结果是false
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
.....

 对于leader来说session检查的结果是false。

如果是follower,其校验方法

protected void revalidateSession(ServerCnxn cnxn, long sessionId,
            int sessionTimeout) throws IOException {
	//需要询问leader,session是否还有效
        getLearner().validateSession(cnxn, sessionId, sessionTimeout);
    }

 follower询问session是否有效

     */
    void validateSession(ServerCnxn cnxn, long clientId, int timeout)
            throws IOException {
        LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeLong(clientId);
        dos.writeInt(timeout);
        dos.close();
	//REVALIDATE包用来检查session是否还有效
        QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
                .toByteArray(), null);
        pendingRevalidations.put(clientId, cnxn);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.SESSION_TRACE_MASK,
                                     "To validate session 0x"
                                     + Long.toHexString(clientId));
        }
        writePacket(qp, true);
    } 

 leader端处理REVALIDATE包

case Leader.REVALIDATE:
                    bis = new ByteArrayInputStream(qp.getData());
                    dis = new DataInputStream(bis);
                    long id = dis.readLong();
                    int to = dis.readInt();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream dos = new DataOutputStream(bos);
                    dos.writeLong(id);
			//这里由于session已经被删掉,返回false
                    boolean valid = leader.zk.touch(id, to);
                    if (valid) {
                        try {
                            //set the session owner
                            // as the follower that
                            // owns the session
                            leader.zk.setOwner(id, this);
                        } catch (SessionExpiredException e) {
                            LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
                        }
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                                 ZooTrace.SESSION_TRACE_MASK,
                                                 "Session 0x" + Long.toHexString(id)
                                                 + " is valid: "+ valid);
                    }
			//结果是false
                    dos.writeBoolean(valid);
                    qp.setData(bos.toByteArray());
                    queuedPackets.add(qp);
                    break;

 follower处理返回结果

case Leader.REVALIDATE:
            revalidate(qp);
            break;
protected void revalidate(QuorumPacket qp) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(qp
                .getData());
        DataInputStream dis = new DataInputStream(bis);
        long sessionId = dis.readLong();
        boolean valid = dis.readBoolean();
        ServerCnxn cnxn = pendingRevalidations
        .remove(sessionId);
        if (cnxn == null) {
            LOG.warn("Missing session 0x"
                    + Long.toHexString(sessionId)
                    + " for validation");
        } else {
            zk.finishSessionInit(cnxn, valid);
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                    ZooTrace.SESSION_TRACE_MASK,
                    "Session 0x" + Long.toHexString(sessionId)
                    + " is valid: " + valid);
        }
    }

 可以看到无论是leader还是follower最后都会调用zk.finishSessionInit(cnxn, valid)处理,而由于session已经失效,所以valid为false

public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        ......

        try {
		//由于valid是false,所以返回给client的sessionid为0,password为空
            ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
                    : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                            // longer valid
                            valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
            bos.writeInt(-1, "len");
            rsp.serialize(bos, "connect");
            if (!cnxn.isOldClient) {
                bos.writeBool(
                        this instanceof ReadOnlyZooKeeperServer, "readOnly");
            }
            baos.close();
            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            bb.putInt(bb.remaining() - 4).rewind();
            cnxn.sendBuffer(bb);    

            ......
    }

 client端处理ConnectResponse

void readConnectResult() throws IOException {
        ......
	//被server重置为0了
        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

 

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
		//sessionid为0,抛出SessionExpiredException异常
            if (negotiatedSessionTimeout <= 0) {
		//state设为CLOSED,这个行为将关闭SendThread
                state = States.CLOSED;
		//Expired状态通知
                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
		//这个行为将关闭EventThread
                eventThread.queueEventOfDeath();
		//抛出异常
                throw new SessionExpiredException(
                        "Unable to reconnect to ZooKeeper service, session 0x"
                                + Long.toHexString(sessionId) + " has expired");
            }
           ......
        }

 SendThread处理SessionExpiredException,关闭SendThread

 while (state.isAlive()) {
			......
			// this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } 
			......
			//关闭连接,失败所有请求
                        cleanup();
			//此时state已经被置为CLOSED,SendThread将退出
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
		}
		//清理
		cleanup();
		//关闭selector
            	clientCnxnSocket.close();
		    if (state.isAlive()) {
		        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
		                Event.KeeperState.Disconnected, null));
		    }
		    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
		                             "SendThread exitedloop.");

 清理动作

private void cleanup() {
		//关闭socket
            clientCnxnSocket.cleanup();
		//发送完等待响应的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED
            synchronized (pendingQueue) {
                for (Packet p : pendingQueue) {
                    conLossPacket(p);
                }
                pendingQueue.clear();
            }
		//等待发送的请求失败,此时由于state是CLOSED,所以异常信息是SESSIONEXPIRED
            synchronized (outgoingQueue) {
                for (Packet p : outgoingQueue) {
                    conLossPacket(p);
                }
                outgoingQueue.clear();
            }
        }

 

private void conLossPacket(Packet p) {
        if (p.replyHeader == null) {
            return;
        }
        switch (state) {
        case AUTH_FAILED:
            p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
            break;
	//关闭的时候,是SESSIONEXPIRED异常码
        case CLOSED:
            p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
            break;
	//其他是CONNECTIONLOSS异常码
        default:
            p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
        }
        finishPacket(p);
    }

 EventThread关闭

public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
		//kill signal
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
			//等所有通知都发完再退出
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down");
        }

 之后client将不可用,所有请求都将发送的时候都将收到SESSIONEXPIRED异常码,因为queuePacket的时候一个判断

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration)
    {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
		//此时状态为CLOSED
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }

 从以上分析可知,SESSIONEXPIRED异常码是比较严重的事件,之后这个zookeeper实例不可用了,如果需要恢复,则需要重新创建zookeeper实例。而CONNECTIONLOSS异常码是比较常见的,比如网络暂时中断的时候,这个状态码下zookeeper会自动重连恢复,因为server端还保留着session信息。

3
0
分享到:
评论
2 楼 iwinit 2013-04-09  
panggezi 写道

不错不错,学习学习
1 楼 panggezi 2013-04-09  

相关推荐

    zookeeper-server-client 简单例子

    ZooKeeper zookeeper = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, watchedEvent -&gt; { if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { connectedSemaphore.countDown(); } ...

    zookeeper客户端

    - **会话(Session)**: Zookeeper客户端与服务器之间的连接称为会话。当客户端与服务器的连接断开时,如果会话超时未重新连接,会话将会失效,所有基于该会话创建的临时节点也会被删除。 - **Watcher**: Watcher是...

    Zookeeper.docx

    5. **clientPort**:客户端连接端口,Zookeeper监听客户端连接的端口,默认为2181。 【Zookeeper实战——分布式安装部署】 Zookeeper集群通常由一个leader和多个follower组成。Leader负责数据的更新和同步,...

    zookeeper 3.4.6版本

    1. 会话(Session):在Zookeeper中,客户端与服务器之间的连接称为会话。会话具有一定的超时时间,如果在超时时间内客户端没有发送心跳,服务器将认为会话失效。 2. 节点(ZNode):Zookeeper的数据存储结构是树形...

    zookeeper-api基础.docx

    #### 三、Zookeeper Client API 详解 在实际开发中,大多数情况下我们使用 Java 语言来编写 Zookeeper 客户端程序。此外,Zookeeper 也支持其他语言的绑定,比如 Python 和 C 语言等。接下来我们将详细介绍 Java ...

    ZooKeeper面试题(2020最新版)-重点.pdf

    Zookeeper 提供了 Java 官方客户端,此外还有 Curator、ZkClient、Zookeeper-Client-Common 等第三方客户端库。 25. chubby 是什么,和 zookeeper 比你怎么看?Chubby 是 Google 开发的一个分布式锁服务,与 ...

    搭建分布式Zookeeper

    8. **优化配置**:根据实际的业务需求和硬件资源,可能需要对Zookeeper的配置进行一些优化,比如调整session超时时间、增加日志级别、调整内存分配等。 以上就是搭建分布式Zookeeper的基本过程。在实际应用中,...

    02_尚硅谷大数据之Zookeeper安装1

    它是Zookeeper通信的基础时间单位,同时也是session超时计算的参考值。 2. **initLimit**: - 设定了集群中follower与leader进行初始连接时的最大心跳数。此参数限制了follower在选举新leader时的连接时间。 3. *...

    zookeepr

    const client = ZooKeeper.createClient('localhost:2181'); client.connect(); client.on('connected', () =&gt; { console.log('Connected to the Zookeeper cluster.'); // 创建节点 client.create('/my-node',...

    zoo-keeper

    虽然ZooKeeper官方客户端是用Java编写的,但在JavaScript环境中,可以通过Node.js的第三方库如`node-zookeeper-client`来实现与ZooKeeper的交互。该库提供API来创建、读取、更新和删除Znodes,以及设置和触发Watcher...

    dubbo-demo:dubbo-demo

    2. **会话(Session)**:客户端与 Zookeeper 服务器建立的连接,会话具有超时时间,如果超过这个时间没有通信,则会话失效。 3. **Watch 事件**:客户端可以设置监听 Zookeeper 上节点的变更,一旦发生变化,...

Global site tag (gtag.js) - Google Analytics