1、zookeeper session是什么
客户端与服务端的任何交换操作都与会话相关,包括:临时节点的生命周期,客户端的请求顺序执行以及Watcher通知机制。
2、客户端操作
2.1、Zookeeper客户端一次创建会话的过程。
2.2、构造会话请求
long sessId = (seenRwServerBefore) ? sessionId : 0; ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd); outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));
2.3、处理Response
ConnectResponse conRsp = new ConnectResponse(); conRsp.deserialize(bbia, "connect"); this.sessionId = conRsp.getSessionId(); sendThread.onConnected(conRsp.getTimeOut(),this.sessionId,conRsp.getPasswd(), isRO);
2.3.1、更新客户端Timeout等信息
void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException { negotiatedSessionTimeout = _negotiatedSessionTimeout; if (negotiatedSessionTimeout <= 0) { state = States.CLOSED; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null)); eventThread.queueEventOfDeath(); throw new SessionExpiredException( "Unable to reconnect to ZooKeeper service, session 0x" + Long.toHexString(sessionId) + " has expired"); } if (!readOnly && isRO) { LOG.error("Read/write client got connected to read-only server"); } readTimeout = negotiatedSessionTimeout * 2 / 3; connectTimeout = negotiatedSessionTimeout / hostProvider.size(); hostProvider.onConnected(); sessionId = _sessionId; sessionPasswd = _sessionPasswd; state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED; seenRwServerBefore |= !isRO; LOG.info("Session establishment complete on server " + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x" + Long.toHexString(sessionId) + ", negotiated timeout = " + negotiatedSessionTimeout + (isRO ? " (READ-ONLY mode)" : "")); KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected; eventThread.queueEvent(new WatchedEvent( Watcher.Event.EventType.None, eventState, null)); }
2.4、发送Ping请求
客户端当sessionTimeout/3(readTimeout=sessionTimeout * 2 /3 )之内都没有发送通信请求后,会主动发起一个ping请求,以维持会话状态.
if (state.isConnected()) { int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend(); if (timeToNextPing <= 0) { sendPing(); clientCnxnSocket.updateLastSend(); } else { if (timeToNextPing < to) { to = timeToNextPing; } } }
3、服务端操作
3.1、服务端会话处理流程示意图
3.2、协商sessionTimeout
2Ticktime <= sessionTimeout <= 20Ticktime(tickTime默认等于2s)
int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout);
3.3、判断是否需要创建新会话
long sessionId = connReq.getSessionId(); if (sessionId != 0) { long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x"+ Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); }
3.4、创建sessionId
每次客户端创建新会话时,zookeeper都会为其分配一个全局唯一的sessionID。
public static long initializeNextSession(long id) { long nextSid = 0; nextSid = (System.currentTimeMillis() << 24) >> 8; nextSid = nextSid | (id <<56); return nextSid; } synchronized public long createSession(int sessionTimeout) { addSession(nextSessionId, sessionTimeout); return nextSessionId++; }
3.5、注册会话
SessionTracker中维护两个数据结构sessionsWithTimeout和sessionsById,前者根据sessionID保存了所有会话超时时间,后者则根据sessionID保存了会话实体。
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>(); ConcurrentHashMap<Long, Integer> sessionsWithTimeout; synchronized public void addSession(long id, int sessionTimeout) { sessionsWithTimeout.put(id, sessionTimeout); if (sessionsById.get(id) == null) { SessionImpl s = new SessionImpl(id, sessionTimeout, 0); sessionsById.put(id, s); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Adding session 0x" + Long.toHexString(id) + " " + sessionTimeout); } } else { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Existing session 0x" + Long.toHexString(id) + " " + sessionTimeout); } } touchSession(id, sessionTimeout); }
3.6、激活会话
3.6.1、分桶策略
将类似的会话放在同一区块中进行管理,以便于不同区块的隔离,相同区块的统一处理。
分配的原则:会话的下一次超时的时间点(ExpirationTime, ExpirationInterval=tickTime 单位:毫秒)。
ExpirationTime = CurrentTime + SessionTimeout
ExpirationTime = (ExpirationTime / ExpirationInterval +1) x ExpirationInterval
以ExpirationTime为key存入Map中,实现了分桶。
3.6.2、激活
客户端会向服务端发送ping请求来保持会话的有效,服务端收到心跳会不断的更新对应的客户端会话。
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>(); private long roundToInterval(long time) { // We give a one interval grace period return (time / expirationInterval + 1) * expirationInterval; } synchronized public boolean touchSession(long sessionId, int timeout) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, "SessionTrackerImpl --- Touch session: 0x" + Long.toHexString(sessionId) + " with timeout " + timeout); } SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } long expireTime = roundToInterval(System.currentTimeMillis() + timeout); if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } s.tickTime = expireTime; set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; }
3.7、会话清理
3.7.1、会话超时检测
分桶策略的数据迁移是被动触发,没有做迁移的sessionId会一直保存在之前的桶中,SessionTracker线程会检测到过期(没有迁移的)的会话。
3.7.2、清理过程
1、标记会话为“已关闭”
2、发起“会话关闭”请求
3、收集要清理的临时节点
4、添加“节点删除”事务
5、删除临时节点
6、移除会话
7、关闭NioServerCnxn
synchronized public void run() { try { while (running) { currentTime = System.currentTimeMillis(); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; } SessionSet set; set = sessionSets.remove(nextExpirationTime); if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } } nextExpirationTime += expirationInterval; } } catch (InterruptedException e) { LOG.error("Unexpected interruption", e); } LOG.info("SessionTrackerImpl exited loop!"); } synchronized public void setSessionClosing(long sessionId) { if (LOG.isTraceEnabled()) { LOG.info("Session closing: 0x" + Long.toHexString(sessionId)); } SessionImpl s = sessionsById.get(sessionId); if (s == null) { return; } s.isClosing = true; } public void expire(Session session) { long sessionId = session.getSessionId(); LOG.info("Expiring session 0x" + Long.toHexString(sessionId) + ", timeout of " + session.getTimeout() + "ms exceeded"); close(sessionId); } private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); }
PrepRequestProcessor.pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) case OpCode.closeSession: // We don't want to do this check since the session expiration thread // queues up this operation without being the session owner. // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); HashSet<String> es = zks.getZKDatabase() .getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { for (ChangeRecord c : zks.outstandingChanges) { if (c.stat == null) { // Doing a delete es.remove(c.path); } else if (c.stat.getEphemeralOwner() == request.sessionId) { es.add(c.path); } } for (String path2Delete : es) { addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path2Delete, null, 0, null)); } zks.sessionTracker.setSessionClosing(request.sessionId); } LOG.info("Processed session termination for sessionid: 0x" + Long.toHexString(request.sessionId)); break;
FinalRequestProcessor.processRequest(Request request) if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) { ServerCnxnFactory scxn = zks.getServerCnxnFactory(); // this might be possible since // we might just be playing diffs from the leader if (scxn != null && request.cnxn == null) { // calling this if we have the cnxn results in the client's // close session response being lost - we've already closed // the session/socket here before we can send the closeSession // in the switch block below scxn.closeSession(request.sessionId); return; } }
相关推荐
- **Session的迁移与同步:** 在ZooKeeper中存储的Session数据可以通过同步机制在不同的服务器之间共享,保证了数据的一致性。 - **高可用性保证:** 通过ZooKeeper集群提供的选举机制,可以确保即使部分节点故障,...
- Zookeeper提供了多种watch机制,可以监控Znode的状态变化,这对于实现分布式session至关重要。 2. **分布式session的需求** - 在传统的单体应用中,session信息存储在服务器的内存中,用户在同一个应用内的不同...
本文将深入探讨如何利用ZooKeeper实现分布式Session,并通过分析提供的"基于ZooKeeper的分布式Session实现.doc"文档,解析其实现原理与步骤。 首先,理解ZooKeeper的基本概念至关重要。ZooKeeper是一个高可用、高...
1.0 zookeeper 是什么? 1.1 zookeeper 提供了什么? 1.2 zookeeper 文件系统 1.3 zookeeper 的四种类型的 znode 1.4 zookeeper 通知机制 1.5 zookeeper 有哪些应用场景?...2.8 zk 的 session 机制
### ZooKeeper会话超时及重连机制详解 #### 一、ZooKeeper会话超时机制 在分布式系统中,ZooKeeper作为一款流行的协调服务框架,为开发者提供了高效且可靠的协调服务。其中,会话管理是ZooKeeper的重要组成部分之...
相比之下,ZooKeeper 提供了一种更强大的机制来处理分布式系统的协调问题,包括会话管理。ZooKeeper 的会话模型具有以下特点: 1. **高可用性**:ZooKeeper 集群自身是高度可用的,由多个节点组成,每个节点都可以...
【基于ZooKeeper的分布式Session实现】 ZooKeeper是一个分布式协调服务,源于Apache Hadoop项目,现已成为一个独立的子项目。它旨在提供高可用性、高性能的协调服务,适用于分布式环境中的命名、配置管理、同步和组...
2. **会话(Session)**:客户端与ZooKeeper服务器建立的连接称为会话,会话期间,客户端可以发送请求并接收响应,会话具有超时机制。 3. **Watcher**:ZooKeeper提供了一种事件监听机制,称为Watcher,允许客户端...
首先,ZooKeeper的核心概念包括节点(Znode)、会话(Session)和Watcher。Znode是数据存储的基本单元,它们类似于文件系统的文件和目录,有生命周期管理,并支持版本控制和ACL(访问控制列表)。会话是客户端与...
3. **会话(Session)**: 用户与Zookeeper服务器之间的连接称为会话,会话期间,Zookeeper服务器会保持与客户端的连接状态,如果会话超时或网络中断,会话结束。 **Zookeeper管理工具的功能** 1. **集群监控**: ...
- **会话(Session)**:客户端与 ZooKeeper 服务器之间的连接称为会话。会话期间,服务器会监控客户端的活动,并在会话超时或客户端断开连接时执行相应的操作。 - **Watcher 事件监听**:Zookeeper 支持 Watcher ...
Zookeeper 的核心概念包括节点(Znode)、会话(Session)和观察者(Watcher)。Znode 是 Zookeeper 数据存储的基本单位,类似于文件系统中的节点,分为临时节点和永久节点两种。会话是客户端与 Zookeeper 服务器...
Session机制用于维护客户端与Zookeeper服务器之间的连接。Session的实现涉及客户端与服务器端的交互,以及Session超时的处理。 7.2.1 Session的实现 Session在Zookeeper中是一种抽象,用于表示客户端与服务器之间的...
2. **会话(Session)**:客户端与Zookeeper服务器之间的连接称为会话,会话期间客户端可以发送请求并接收响应。如果会话因网络中断而断开,Zookeeper提供了会话恢复机制。 3. **Watcher**:Watcher是Zookeeper的一...
2. **会话(Session)**:客户端与Zookeeper服务器建立的连接称为会话,会话期间,服务器会持续监控客户端的状态,如果客户端因为网络故障断开连接,Zookeeper会根据预设的超时时间决定是否关闭该会话。 3. **...
4. **Session**:客户端与Zookeeper服务器之间的连接被称为Session,Session有超时机制,如果客户端长时间无响应,Session会超时并断开。 5. **Quorum**:Zookeeper通过Quorum机制实现高可用性,集群中的大多数...
- **会话(Session)**:客户端与Zookeeper服务器之间的连接称为会话,会话期间,服务器保证对客户端的请求进行有序处理。 - **Watcher**:Zookeeper的一个重要特性是Watcher机制,它允许客户端注册监听事件,当特定...
8. **Watch机制**:Watch是Zookeeper的一种事件监听机制,客户端可以为任何节点设置Watch,当该节点发生变化时,Zookeeper会向设置Watch的客户端发送通知,实现数据变更的实时同步。 9. **安全性**:Zookeeper支持...