这篇看看ZooKeeper如何管理Session。 Session相关的接口如下:
Session: 表示session的实体类,维护sessionId和timeout两个主要状态
SessionTracker: Session生命周期管理相关的操作
SessionExpier: Session过期的操作
先看看Session接口和它的实现类SessionImpl,维护了5个属性:sessionId, timeout表示超时时间,tickTime表示客户端和服务器的心跳时间,isClosing表示是否关闭,owner表示对应的客户端
- public static interface Session {
- long getSessionId();
- int getTimeout();
- boolean isClosing();
- }
- public static class SessionImpl implements Session {
- SessionImpl(long sessionId, int timeout, long expireTime) {
- this.sessionId = sessionId;
- this.timeout = timeout;
- this.tickTime = expireTime;
- isClosing = false;
- }
- final long sessionId;
- final int timeout;
- long tickTime;
- boolean isClosing;
- Object owner;
- public long getSessionId() { return sessionId; }
- public int getTimeout() { return timeout; }
- public boolean isClosing() { return isClosing; }
- }
SessionTracker的实现类是SessionTrackerImpl,它是一个单独运行的线程,根据tick周期来批量检查处理当前的session。SessionTrackerImpl直接继承了Thread类,它的定义和构造函数如下,几个主要的属性:
expirer是SessionExpirer的实现类
expirationInterval表示过期的周期,可以看到它的值是tickTime,即如果服务器端在tickTime里面没有收到客户端的心跳,就认为该session过期了
sessionsWithTimeout是一个ConcurrentHashMap,维护了一组sessionId和它对应的timeout过期时间
nextExpirationTime表示下次过期时间,线程会在nextExpirationTime时间来批量过期session
nextSessionId是根据sid计算出的下一个新建的sessionId
sessionById这个HashMap保存了sessionId和Session对象的映射
sessionSets这个HashMap保存了一个过期时间和一组保存在SessionSet中的Session的映射,用来批量清理过期的Session
- public interface SessionTracker {
- public static interface Session {
- long getSessionId();
- int getTimeout();
- boolean isClosing();
- }
- public static interface SessionExpirer {
- void expire(Session session);
- long getServerId();
- }
- long createSession(int sessionTimeout);
- void addSession(long id, int to);
- boolean touchSession(long sessionId, int sessionTimeout);
- void setSessionClosing(long sessionId);
- void shutdown();
- void removeSession(long sessionId);
- void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException;
- void setOwner(long id, Object owner) throws SessionExpiredException;
- void dumpSessions(PrintWriter pwriter);
- }
- public class SessionTrackerImpl extends Thread implements SessionTracker {
- private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);
- HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
- HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
- ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
- long nextSessionId = 0;
- long nextExpirationTime;
- int expirationInterval;
- public SessionTrackerImpl(SessionExpirer expirer,
- ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
- long sid)
- {
- super("SessionTracker");
- this.expirer = expirer;
- this.expirationInterval = tickTime;
- this.sessionsWithTimeout = sessionsWithTimeout;
- nextExpirationTime = roundToInterval(System.currentTimeMillis());
- this.nextSessionId = initializeNextSession(sid);
- for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
- addSession(e.getKey(), e.getValue());
- }
- }
看一下SessionTrackerImpl这个线程的run方法实现,实现了批量处理过期Session的逻辑
1. 如果下一次过期时间nextExpirationTime大于当前时间,那么当前线程等待nextExpirationTime - currentTime时间
2. 如果到了过期时间,就从sessionSets里面把当前过期时间对应的一组SessionSet取出
3. 批量关闭和过期这组session
4. 把当前过期时间nextExpirationTime 加上 expirationInterval作为下一个过期时间nextExpiration,继续循环
其中expirer.expire(s)这个操作,这里的expirer的实现类是ZooKeeperServer,它的expire方法会给给客户端发送session关闭的请求
- // SessionTrackerImpl
- synchronized public void run() {
- try {
- while (running) {
- currentTime = System.currentTimeMillis();
- if (nextExpirationTime > currentTime) {
- this.wait(nextExpirationTime - currentTime);
- continue;
- }
- SessionSet set;
- set = sessionSets.remove(nextExpirationTime);
- if (set != null) {
- for (SessionImpl s : set.sessions) {
- setSessionClosing(s.sessionId);
- expirer.expire(s);
- }
- }
- nextExpirationTime += expirationInterval;
- }
- } catch (InterruptedException e) {
- LOG.error("Unexpected interruption", e);
- }
- LOG.info("SessionTrackerImpl exited loop!");
- }
- // ZookeeperServer
- public void expire(Session session) {
- long sessionId = session.getSessionId();
- LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
- + ", timeout of " + session.getTimeout() + "ms exceeded");
- close(sessionId);
- }
- private void close(long sessionId) {
- submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
- }
再看一下创建Session的过程
1. createSession方法只需要一个sessionTimeout参数来指定Session的过期时间,会把当前全局的nextSessionId作为sessionId传给addSession方法
2. addSession方法先把sessionId和过期时间的映射添加到sessionsWithTimeout这个Map里面来,如果在sessionById这个Map里面没有找到对应sessionId的session对象,就创建一个Session对象,然后放到sessionById Map里面。最后调用touchSession方法来设置session的过期时间等信息
3. touchSession方法首先判断session状态,如果关闭就返回。计算当前session的过期时间,如果是第一次touch这个session,它的tickTime会被设置成它的过期时间expireTime,然后把它加到对应的sessuibSets里面。如果不是第一次touch,那么它的tickTime会是它当前的过期时间,如果还没过期,就返回。如果过期了,就重新计算一个过期时间,并设置给tickTime,然后从对应的sessionSets里面先移出,再加入到新的sessionSets里面。 touchSession方法主要是为了更新session的过期时间。
- synchronized public long createSession(int sessionTimeout) {
- addSession(nextSessionId, sessionTimeout);
- return nextSessionId++;
- }
- synchronized public void addSession(long id, int sessionTimeout) {
- sessionsWithTimeout.put(id, sessionTimeout);
- if (sessionsById.get(id) == null) {
- SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
- sessionsById.put(id, s);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "SessionTrackerImpl --- Adding session 0x"
- + Long.toHexString(id) + " " + sessionTimeout);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "SessionTrackerImpl --- Existing session 0x"
- + Long.toHexString(id) + " " + sessionTimeout);
- }
- }
- touchSession(id, sessionTimeout);
- }
- synchronized public boolean touchSession(long sessionId, int timeout) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.CLIENT_PING_TRACE_MASK,
- "SessionTrackerImpl --- Touch session: 0x"
- + Long.toHexString(sessionId) + " with timeout " + timeout);
- }
- SessionImpl s = sessionsById.get(sessionId);
- // Return false, if the session doesn't exists or marked as closing
- if (s == null || s.isClosing()) {
- return false;
- }
- long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
- if (s.tickTime >= expireTime) {
- // Nothing needs to be done
- return true;
- }
- SessionSet set = sessionSets.get(s.tickTime);
- if (set != null) {
- set.sessions.remove(s);
- }
- s.tickTime = expireTime;
- set = sessionSets.get(s.tickTime);
- if (set == null) {
- set = new SessionSet();
- sessionSets.put(expireTime, set);
- }
- set.sessions.add(s);
- return true;
- }
SessionTracker这个接口主要被ZooKeeperServer这个类来使用,ZooKeeperServer表示ZooKeeper的服务器类,负责维护ZooKeeper服务器状态。
在ZooKeeperServer的startup方法中,如果sessionTracker对象为空,就先创建一个SessionTracker对象,然后调用startSessionTracker方法启动SessionTrackerImpl这个线程
- public void startup() {
- if (sessionTracker == null) {
- createSessionTracker();
- }
- startSessionTracker();
- setupRequestProcessors();
- registerJMX();
- synchronized (this) {
- running = true;
- notifyAll();
- }
- }
- protected void createSessionTracker() {
- sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
- tickTime, 1);
- }
- protected void startSessionTracker() {
- ((SessionTrackerImpl)sessionTracker).start();
- }
在ZooKeeperServer的shutdown方法中,调用sessionTracker的shutdown方法来关闭sessionTrackerImpl线程
- public void shutdown() {
- LOG.info("shutting down");
- // new RuntimeException("Calling shutdown").printStackTrace();
- this.running = false;
- // Since sessionTracker and syncThreads poll we just have to
- // set running to false and they will detect it during the poll
- // interval.
- if (sessionTracker != null) {
- sessionTracker.shutdown();
- }
- if (firstProcessor != null) {
- firstProcessor.shutdown();
- }
- if (zkDb != null) {
- zkDb.clear();
- }
- unregisterJMX();
- }
- // SessionTrackerImpl
- public void shutdown() {
- LOG.info("Shutting down");
- running = false;
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
- "Shutdown SessionTrackerImpl!");
- }
- }
ZooKeeperServer的createSession方法给连接ServerCnxn创建一个对应的session,然后给客户端发送一个创建了session的请求
- long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
- long sessionId = sessionTracker.createSession(timeout);
- Random r = new Random(sessionId ^ superSecret);
- r.nextBytes(passwd);
- ByteBuffer to = ByteBuffer.allocate(4);
- to.putInt(timeout);
- cnxn.setSessionId(sessionId);
- submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
- return sessionId;
- }
ZooKeeperServer的reopenSession会给断开了连接后又重新连接的session更新状态,使session继续可用
1. 如果session的密码不对,调用finishSessionInit方法来关闭session,如果密码正确,调用revalidateSession方法
2. revalidateSession方法会调用sessionTracker的touchSession,如果session已经过期,rc = false,如果session未过期,更新session的过期时间信息。最后也调用finishSessionInit方法
3. finishSessionInit方法会给客户端发送响应对象ConnectResponse,如果验证不通过,会关闭连接 cnxn.sendBuffer(ServerCnxnFactory.closeConn)。验证通过,调用cnxn.enableRecv(); 方法来设置连接状态,使服务器端连接注册SelectionKey.OP_READ事件,准备接收客户端请求
- public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
- int sessionTimeout) throws IOException {
- if (!checkPasswd(sessionId, passwd)) {
- finishSessionInit(cnxn, false);
- } else {
- revalidateSession(cnxn, sessionId, sessionTimeout);
- }
- }
- protected void revalidateSession(ServerCnxn cnxn, long sessionId,
- int sessionTimeout) throws IOException {
- boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
- "Session 0x" + Long.toHexString(sessionId) +
- " is valid: " + rc);
- }
- finishSessionInit(cnxn, rc);
- }
- public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
- // register with JMX
- try {
- if (valid) {
- serverCnxnFactory.registerConnection(cnxn);
- }
- } catch (Exception e) {
- LOG.warn("Failed to register with JMX", e);
- }
- try {
- ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
- : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
- // longer valid
- valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- bos.writeInt(-1, "len");
- rsp.serialize(bos, "connect");
- if (!cnxn.isOldClient) {
- bos.writeBool(
- this instanceof ReadOnlyZooKeeperServer, "readOnly");
- }
- baos.close();
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.remaining() - 4).rewind();
- cnxn.sendBuffer(bb);
- if (!valid) {
- LOG.info("Invalid session 0x"
- + Long.toHexString(cnxn.getSessionId())
- + " for client "
- + cnxn.getRemoteSocketAddress()
- + ", probably expired");
- cnxn.sendBuffer(ServerCnxnFactory.closeConn);
- } else {
- LOG.info("Established session 0x"
- + Long.toHexString(cnxn.getSessionId())
- + " with negotiated timeout " + cnxn.getSessionTimeout()
- + " for client "
- + cnxn.getRemoteSocketAddress());
- cnxn.enableRecv();
- }
- } catch (Exception e) {
- LOG.warn("Exception while establishing session, closing", e);
- cnxn.close();
- }
- }
- // NIOServerCnxn
- public void enableRecv() {
- synchronized (this.factory) {
- sk.selector().wakeup();
- if (sk.isValid()) {
- int interest = sk.interestOps();
- if ((interest & SelectionKey.OP_READ) == 0) {
- sk.interestOps(interest | SelectionKey.OP_READ);
- }
- }
- }
- }
相关推荐
ZooKeeper的源代码提供了深入了解其实现细节的机会,对于学习分布式系统原理和Java NIO编程有很大帮助。源码中包含了服务器端的处理逻辑、客户端的API实现、ZAB协议的实现等。 ### 六、应用场景 ZooKeeper广泛应用...
在实际项目中,开发者可以通过解压`zookeeper-3.4.5`压缩包,了解ZooKeeper的源代码,学习其实现原理,以便更好地运用到自己的分布式系统设计中。 总之,ZooKeeper 3.4.5 在服务治理和分布式部署中的作用不可忽视,...
10. **集群管理**:Zookeeper可以用来监控和管理分布式系统的节点状态,比如监控服务的上线、下线,或者健康检查。 在实际应用中,Zookeeper常被用于Hadoop、HBase、Kafka等分布式系统中,作为它们的协调器,确保...
Apache ZooKeeper 是一个高度可靠的分布式协调系统,广泛应用于云原生环境中的服务发现...总之,Apache Zookeeper 在分布式系统中扮演着重要角色,理解和掌握其原理与使用方法,对提升系统架构和运维能力具有显著作用。
1. **源代码**:包含了ZooKeeper的全部源代码,允许开发者深入理解其工作原理,进行自定义修改或扩展。 2. **文档**:可能包括用户手册、API参考、开发者指南等,帮助用户和开发者快速熟悉ZooKeeper的使用和开发。 3...
在分布式系统中,ZooKeeper 被广泛用于实现命名服务、配置管理、集群同步、领导者选举等多种功能。 在 ZooKeeper 3.4.14 版本中,这个稳定性的声明意味着它经过了大量的测试和社区验证,适合在生产环境中使用。相比...
在分布式系统中,Zookeeper常被用来解决分布式锁、配置管理、服务发现等问题。 Redis则是一款开源的高性能键值对数据库,它支持多种数据结构如字符串、哈希、列表、集合、有序集合等,而且具备高速读写能力,通常...
这是一款Zookeeper的3.3.6版本的压缩包,通常包含了Zookeeper的源代码、编译脚本、配置文件、文档以及相关的依赖库。解压后,我们可以看到包括bin(二进制可执行文件)、conf(配置文件)、src(源代码)等目录,...
Zookeeper 的设计目标是简化分布式环境下的数据一致性问题,使得在分布式系统中实现高可用性和扩展性变得更加容易。 **Zookeeper 的主要特点** 1. **原子性**:Zookeeper 的所有操作都是原子性的,一次操作要么...
ZooKeeper-3.4.5-cdh5.11.0的tar包是针对这个特定CDH版本的打包文件,通常包含源代码、编译好的二进制文件、配置文件以及相关的文档。 **ZooKeeper核心概念与功能:** 1. **节点(Znode)**:ZooKeeper的数据存储...
ZooKeeper是一个分布式协调服务,它为分布式应用提供一致性服务,如命名服务、配置管理、组服务、分布式锁和分布式队列等。...同时,了解并熟练掌握ZooKeeper的基本概念和操作对于管理和协调分布式系统至关重要。
而“zookeeper-trunk”可能是一个源码仓库,包含Zookeeper的源代码,适合深入理解其内部机制和进行二次开发。 通过深入学习这些资料,你可以掌握如何配置和管理Zookeeper集群,如何使用Zookeeper API进行数据操作,...
结合以上信息,我们可以推测`score.session`可能是一个Python库,它专注于分布式系统中的会话管理,可能集成或与Apache ZooKeeper协同工作,提供在云环境中安全、可靠的会话存储和服务发现。开发者可以利用这个库在...
Zookeeper-3.4.10.zip 文件是 Apache ZooKeeper 的一个重要版本,包含了该版本的源代码、构建文件以及相关的校验文件。 1. **Zookeeper 的核心概念** - **节点(Znode)**:Zookeeper 数据存储的基本单位,类似于...
5. **分布式**:这暗示该扩展可能设计用于处理分布式系统中的会话一致性问题,比如在多个服务器之间同步用户会话或管理Cookie,以确保用户在不同服务器间的体验连续性。 根据压缩包子文件的文件名称“flask-session...
这个版本的压缩包名为`ZooKeeper-3.4.5.tar.gz`,包含了完整的源代码和必要的文档,供开发者研究和部署。 **ZooKeeper的基本概念** 1. **节点(ZNode)**:ZooKeeper的数据存储结构类似于文件系统,由一系列的节点...
ZooKeeper 被广泛应用于大数据、云计算和其他分布式系统中,用于管理命名服务、配置管理、集群同步、领导者选举等任务。 标题 "apache-zookeeper-3.5.8-bin.zip" 指的是 Apache ZooKeeper 的一个特定版本——3.5.8...
Zookeeper在大数据生态系统中扮演着重要角色,它被广泛应用于Hadoop、HBase、Kafka等众多分布式系统中,作为数据共享、服务发现、配置管理等核心功能的基石。 这个"zookeeper-3.5.3-beta.tar.gz"压缩包包含了...
在`pyramid_pluggable_session-0.0.0a2.tar.gz`压缩包中,包含了这个扩展的源代码和其他相关文件。解压后,开发者可以通过Python的setuptools工具进行安装,并在Pyramid项目中导入和配置,以启用会话管理功能。安装...
6. **src** 目录:虽然不在压缩包中,但在源码树里,此目录包含了 ZooKeeper 的源代码,可供开发者查看和调试。 7. **zkpython** 和 **zkperl** 目录:提供了 Python 和 Perl 的客户端接口,使得这些语言可以方便地...