前面分析知道session超时由leader负责,假设某个session长时间没心跳超时,SessionTrackImpl入口
if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } }
session失效是一个proposal过程
private void close(long sessionId) { submitRequest(null, sessionId, OpCode.closeSession, 0, null, null); }
leader段执行处理链,PrepRequestProcessor执行,closeSession的request没有txn内容
case OpCode.closeSession: // We don't want to do this check since the session expiration thread // queues up this operation without being the session owner. // this request is the last of the session so it should be ok //zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); //session下的所有临时节点,需要删除,添加到outstandingChanges队列 HashSet<String> es = zks.getZKDatabase() .getEphemerals(request.sessionId); synchronized (zks.outstandingChanges) { for (ChangeRecord c : zks.outstandingChanges) { if (c.stat == null) { // Doing a delete es.remove(c.path); } else if (c.stat.getEphemeralOwner() == request.sessionId) { es.add(c.path); } } for (String path2Delete : es) { addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path2Delete, null, 0, null)); } //设置状态为closing zks.sessionTracker.setSessionClosing(request.sessionId); } LOG.info("Processed session termination for sessionid: 0x" + Long.toHexString(request.sessionId)); break;
之后leader通过ProposalRequestProcessor发起投票,并写入log,follower处理投票,写入log并ack。然后leader commit请求,leader和follower都进入FinalRequestProcessor处理。先是datatree处理
case OpCode.closeSession: killSession(header.getClientId(), header.getZxid()); break;
void killSession(long session, long zxid) { // the list is already removed from the ephemerals // so we do not have to worry about synchronizing on // the list. This is only called from FinalRequestProcessor // so there is no need for synchronization. The list is not // changed here. Only create and delete change the list which // are again called from FinalRequestProcessor in session下所有临时节点删除. //sequence HashSet<String> list = ephemerals.remove(session); if (list != null) { for (String path : list) { try { deleteNode(path, zxid); if (LOG.isDebugEnabled()) { LOG .debug("Deleting ephemeral node " + path + " for session 0x" + Long.toHexString(session)); } } catch (NoNodeException e) { LOG.warn("Ignoring NoNodeException for path " + path + " while removing ephemeral for dead session 0x" + Long.toHexString(session)); } } } }
然后FinalRequestProcessor删除session
else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); }
然后FinalRequestProcessor关闭连接
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) { ServerCnxnFactory scxn = zks.getServerCnxnFactory(); // this might be possible since // we might just be playing diffs from the leader if (scxn != null && request.cnxn == null) { // calling this if we have the cnxn results in the client's // close session response being lost - we've already closed // the session/socket here before we can send the closeSession // in the switch block below scxn.closeSession(request.sessionId); return; } }
closeSession就是把session从内存数据结构中删除。
遍历连接集合,关闭sessionid对应的那个连接
private void closeSessionWithoutWakeup(long sessionId) { HashSet<NIOServerCnxn> cnxns; synchronized (this.cnxns) { cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone(); } for (NIOServerCnxn cnxn : cnxns) { if (cnxn.getSessionId() == sessionId) { try { cnxn.close(); } catch (Exception e) { LOG.warn("exception during session close", e); } break; } } }
以上就完成了server端session超时的处理
1.leader发起closeSession的Proposal
2.该session对应的server的FinalRequestProcessor中关闭连接
3.server端发起的session超时处理不需要返回client信息,如果是client主动关闭session,则需要返回closeConn的信息,然后再关闭连接
相关推荐
本示例将探讨如何利用Zookeeper实现分布式session。 1. **Zookeeper的基本概念** - Zookeeper是一个分布式服务框架,主要用于解决分布式应用中的数据一致性问题。 - 它提供了一种树形的数据结构,节点称为Znode,...
首先,ZooKeeper的核心概念包括节点(Znode)、会话(Session)和Watcher。Znode是数据存储的基本单元,它们类似于文件系统的文件和目录,有生命周期管理,并支持版本控制和ACL(访问控制列表)。会话是客户端与...
3. 性能优化:合理设置ZooKeeper的配置参数,如session超时时间、心跳间隔等,以适应不同应用场景的需求。 总的来说,《ZooKeeper:分布式过程协同技术详解》这本书详细阐述了如何利用ZooKeeper来解决分布式系统中...
4. **异常处理**:Zookeeper提供了Session超时机制,当客户端与服务器失去连接时,其创建的临时节点会被自动删除。因此,如果客户端因网络问题断开连接,锁会自动释放,防止死锁。 在提供的压缩包中,可能包含了...
**ZooKeeper分布式系统协调详解** ZooKeeper是一款开源的分布式协调服务,它是由雅虎创建并贡献给Apache Software Foundation的项目。在分布式系统中,ZooKeeper扮演着至关重要的角色,它提供了一种可靠的方式来...
4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...
- **Leader节点的角色**:在ZooKeeper集群中,Session超时的检测主要由Leader节点负责执行。 - **定时任务**:Leader节点内部有一个定时任务,以`tickTime`的频率定期检查所有Session的状态,一旦发现某个Session...
《Zookeeper分布式概念详解》 Zookeeper,一个由Apache Hadoop项目孵化的开源分布式协调服务,是分布式应用程序的重要基础设施。它提供了一种简单易用的接口,用于管理分布式环境中的命名服务、配置管理、集群同步...
- **会话(Session)**:客户端与 ZooKeeper 服务器之间建立的连接称为会话。当客户端与服务器的连接断开或会话超时,客户端创建的所有临时节点都会被删除。 - **观察者(Watcher)**:客户端可以在特定的 ZNode 上...
2. **会话(Session)**:客户端与ZooKeeper服务器建立的连接被称为会话。会话具有超时时间,如果在设定的时间内客户端与服务器间无任何通信,会话将自动结束。会话期间,ZooKeeper保证在会话存活状态下所有的操作都...
Zookeeper的核心概念主要包括节点(Znode)、会话(Session)、观察者模式(Watcher)等。Znode是Zookeeper数据存储的基本单元,具有版本号、ACL权限控制和临时/持久两种类型。会话是客户端与Zookeeper服务器之间的...
Zookeeper的核心概念包括节点(ZNode)、会话(Session)和观察者模式(Watcher)。ZNode是Zookeeper中的基本数据单元,类似于文件系统中的文件或目录,可以存储数据并拥有权限控制。每个ZNode都有一个唯一的路径...
3. **会话(Session)**:客户端与Zookeeper服务器建立的连接称为会话。会话期间,服务器会监控客户端的状态,并在会话超时或客户端断开连接时通知客户端。 4. **Watcher**:Zookeeper的核心特性之一,允许客户端...
7. **性能优化**:Zookeeper的性能优化包括调整数据缓存策略、合理设置session超时时间、优化网络通信等。 8. **故障恢复**:Zookeeper的高可用性体现在自动故障检测和转移上,当Leader故障时,Follower会通过选举...
3. **调整参数**:根据实际负载情况,调整Zookeeper的session超时时间、心跳间隔等参数。 **总结** Zookeeper是分布式环境中的重要组件,其核心价值在于提供了一种可靠的分布式协调机制。理解和掌握Zookeeper的...
如果服务器与客户端的连接中断,但会话仍然有效,只要在会话超时前重新建立连接,Zookeeper可以恢复到中断前的状态。 3. **Watcher机制**: Watcher是Zookeeper的一个强大特性,允许客户端注册监听特定ZNode的变化,...
- **ZooKeeper配置**:合理设置ZooKeeper的session超时、数据同步间隔等参数对性能和稳定性至关重要。 - **容错与恢复**:设计良好的故障恢复策略,确保集群在节点失效时仍能正常工作。 - **安全性**:合理设置...
在分布式系统中,管理和协调服务的状态是至关重要的,这就是 Apache ZooKeeper 的作用所在。ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它为分布式计算提供了一个高可用、高性能的解决方案。本...
6. **会话(Session)**:客户端与ZooKeeper建立会话,会话期间,ZooKeeper保证对客户端的操作顺序。如果服务器与客户端的连接断开,ZooKeeper会尝试重连,或者在会话超时前选举新的领导者。 7. **配置管理**:...
1. Session超时后需手动重连。 2. Watcher注册一次性,触发后需重新注册。 3. 不支持递归创建节点。 4. 异常处理复杂,异常类型多,开发者难以处理。 5. 只提供byte[]数据交互,未针对对象级别提供序列化。 6. 创建...