前面几篇zookeeper的文章简单分析了执行流程,接下来打算从横向来分析一下zk的一些特性,先从session开始。
这一篇http://iwinit.iteye.com/blog/1754611分析了单机情况下session建立,在集群环境下建立session不太一样,是一个proposal的过程,先假设集群由leader,followerA,followerB组成,我们的client去连followerA。follower和leader初始化之后,初始化的sessionTracker不一样,leader中是SessionTrackerImpl,follower中是LearnerSessionTracker,主要区别和类同点:
1.follower中不会启动超时检查线程,只是简单得记录了session信息,主要数据结构是
//session更新信息 HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>(); //session超时信息 private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
而leader会启动超时线程,而且数据结构也多一些
//session实体 HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>(); //同一个超时时间点的session,给超时线程用 HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>(); //session超时信息 ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
2.addSession时,leader会创建session实体,follower只是简单的记录了一下session信息
3.sessionId初始化算法一样
//1字节server_id+当前时间的后5个字节+2字节0,保证全局唯一 public static long initializeNextSession(long id) { long nextSid = 0; nextSid = (System.currentTimeMillis() << 24) >> 8; nextSid = nextSid | (id <<56); return nextSid; }
连下来client开始创建session
1.客户端发送ConnectionRequest给followerA
2.followerA处理
session超时时间协商
//处理session时间,minSessionTimeout为ticktime2倍,maxSessionTimeout为ticktime20倍 int sessionTimeout = connReq.getTimeOut(); byte passwd[] = connReq.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; } int maxSessionTimeout = getMaxSessionTimeout(); if (sessionTimeout > maxSessionTimeout) { sessionTimeout = maxSessionTimeout; } cnxn.setSessionTimeout(sessionTimeout); // We don't want to receive any packets until we are sure that the // session is setup cnxn.disableRecv(); //客户端发送的sessionid,重试时不为0 long sessionId = connReq.getSessionId(); //客户端重试,则reopen,后文分析 if (sessionId != 0) { long clientSessionId = connReq.getSessionId(); LOG.info("Client attempting to renew session 0x" + Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress()); serverCnxnFactory.closeSession(sessionId); cnxn.setSessionId(sessionId); reopenSession(cnxn, sessionId, passwd, sessionTimeout); } //创建session else { LOG.info("Client attempting to establish new session at " + cnxn.getRemoteSocketAddress()); createSession(cnxn, passwd, sessionTimeout); }
sessionid和密码初始化
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) { //sessionid递增 long sessionId = sessionTracker.createSession(timeout); //随机密码 Random r = new Random(sessionId ^ superSecret); r.nextBytes(passwd); //4个字节的超时时间 ByteBuffer to = ByteBuffer.allocate(4); to.putInt(timeout); cnxn.setSessionId(sessionId); //异步提交执行链 submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null); return sessionId; }
提交请求
private void submitRequest(ServerCnxn cnxn, long sessionId, int type, int xid, ByteBuffer bb, List<Id> authInfo) { //初始化时,xid为0,bb为4个字节的session超时时间 Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); submitRequest(si); }
根据之前http://iwinit.iteye.com/blog/1777109分析的Processor链图,FollowerRequestProcessor执行。和create请求一样,createSession是事务请求需要投票,FollowerA发送投票packet给leader。
3.leader处理,learnerHandler中收到request请求,提交leader的processor链
PrepRequestProcessor处理
//头信息 request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type); .... case OpCode.createSession: request.request.rewind(); int to = request.request.getInt(); //txn信息,就是一个session超时 request.txn = new CreateSessionTxn(to); request.request.rewind(); //leader中创建session zks.sessionTracker.addSession(request.sessionId, to); //owner属于followerA zks.setOwner(request.sessionId, request.getOwner()); break;
之后ProposalRequestProcessor发起投票,并写入log
4.followerA和followerB处理投票
public void logRequest(TxnHeader hdr, Record txn) { Request request = new Request(null, hdr.getClientId(), hdr.getCxid(), hdr.getType(), null, null); request.hdr = hdr; request.txn = txn; request.zxid = hdr.getZxid(); //添加到pending队列 if ((request.zxid & 0xffffffffL) != 0) { pendingTxns.add(request); } /写入log,并发送ack包给leader syncProcessor.processRequest(request); }
5.leader收到投票,发起commit,然后自己commit
6.leader的FinalRequestProcessor处理,添加session,不需要返回client数据
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { ProcessTxnResult rc; int opCode = hdr.getType(); long sessionId = hdr.getClientId(); //createSession不需要修改db状态,啥都不做 rc = getZKDatabase().processTxn(hdr, txn); //又添加了一次session if (opCode == OpCode.createSession) { if (txn instanceof CreateSessionTxn) { CreateSessionTxn cst = (CreateSessionTxn) txn; sessionTracker.addSession(sessionId, cst .getTimeOut()); } else { LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString()); } } else if (opCode == OpCode.closeSession) { sessionTracker.removeSession(sessionId); } return rc; }
7.followerA和followerB处理commit,区别在于CommitRequestProcessor中,followerA中的Request会带上connection信息而followerB中的reqesut没有connection信息。所以在FinalRequestProcessor中,followerB创立完session就返回了,而followerA还需要写回client响应
最后看下leader的sessionTracker超时机制,构造SessionTrackerImpl
public SessionTrackerImpl(SessionExpirer expirer, ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime, long sid) { super("SessionTracker"); this.expirer = expirer; this.expirationInterval = tickTime; this.sessionsWithTimeout = sessionsWithTimeout; //下一个检查点,向上取整为expirationInterval倍数,expirationInterval就是配置的ticktime nextExpirationTime = roundToInterval(System.currentTimeMillis()); this.nextSessionId = initializeNextSession(sid); for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) { addSession(e.getKey(), e.getValue()); } }
主线程循环
synchronized public void run() { try { while (running) { //下一个超时点没到,就等待 currentTime = System.currentTimeMillis(); if (nextExpirationTime > currentTime) { this.wait(nextExpirationTime - currentTime); continue; } //同一个时间点超时的session SessionSet set; set = sessionSets.remove(nextExpirationTime); //超时session处理 if (set != null) { for (SessionImpl s : set.sessions) { setSessionClosing(s.sessionId); expirer.expire(s); } } //下一个check point nextExpirationTime += expirationInterval; } } catch (InterruptedException e) { LOG.error("Unexpected interruption", e); } LOG.info("SessionTrackerImpl exited loop!"); }
session更新时
synchronized public boolean touchSession(long sessionId, int timeout) { ...... //session对象 SessionImpl s = sessionsById.get(sessionId); // Return false, if the session doesn't exists or marked as closing if (s == null || s.isClosing()) { return false; } //这个session的下一个超时点,向上取整为ticktime倍数 long expireTime = roundToInterval(System.currentTimeMillis() + timeout); //时间点比老的时间还小,不更新 if (s.tickTime >= expireTime) { // Nothing needs to be done return true; } //先从老的超时set中remove掉,再添加到新的set中,超时线程会定时check SessionSet set = sessionSets.get(s.tickTime); if (set != null) { set.sessions.remove(s); } //下个超时点 s.tickTime = expireTime; //添加到新的超时set中 set = sessionSets.get(s.tickTime); if (set == null) { set = new SessionSet(); sessionSets.put(expireTime, set); } set.sessions.add(s); return true; }
简单小节
1.session创建需要投票处理
2.结果是每台server上的内存中都会建立相同的session记录
3.sessionid通过serverid+时间保证唯一
4.session超时检查由leader负责,以ticktime定时检查
5.session更新时,会修改自己这个session所属的超时set,超时时间是ticktime倍数
相关推荐
本示例将探讨如何利用Zookeeper实现分布式session。 1. **Zookeeper的基本概念** - Zookeeper是一个分布式服务框架,主要用于解决分布式应用中的数据一致性问题。 - 它提供了一种树形的数据结构,节点称为Znode,...
本文将深入探讨如何利用ZooKeeper实现分布式Session,并通过分析提供的"基于ZooKeeper的分布式Session实现.doc"文档,解析其实现原理与步骤。 首先,理解ZooKeeper的基本概念至关重要。ZooKeeper是一个高可用、高...
首先,ZooKeeper的核心概念包括节点(Znode)、会话(Session)和Watcher。Znode是数据存储的基本单元,它们类似于文件系统的文件和目录,有生命周期管理,并支持版本控制和ACL(访问控制列表)。会话是客户端与...
使用ZooKeeper实现分布式Session,可以利用其提供的API来创建和管理Session。当用户登录后,其Session信息会被存储在ZooKeeper的一个特定znode下,并在各个服务节点之间共享。通过设置Watcher,服务节点可以实时监听...
### 基于ZooKeeper的分布式Session实现详解 #### 1. 认识ZooKeeper ZooKeeper,形象地被称为“动物园管理员”,在分布式系统中扮演着至关重要的角色。随着企业级应用系统的不断扩展,传统的单体架构难以应对日益...
3. 性能优化:合理设置ZooKeeper的配置参数,如session超时时间、心跳间隔等,以适应不同应用场景的需求。 总的来说,《ZooKeeper:分布式过程协同技术详解》这本书详细阐述了如何利用ZooKeeper来解决分布式系统中...
4. **异常处理**:Zookeeper提供了Session超时机制,当客户端与服务器失去连接时,其创建的临时节点会被自动删除。因此,如果客户端因网络问题断开连接,锁会自动释放,防止死锁。 在提供的压缩包中,可能包含了...
《ZooKeeper:分布式过程协同技术详解》 ZooKeeper,作为一款开源的分布式协调服务...在实际工作中,开发者可以根据需求选择合适的ZooKeeper操作,如创建、删除ZNode,设置数据监听等,以实现高效且稳定的分布式协同。
3. **会话(Session)**:客户端与ZooKeeper服务器之间的连接被称为会话。会话期间,ZooKeeper保证所有事务操作的顺序性。 4. **Watch机制**:ZooKeeper允许客户端对某个节点设置监听(Watch),当该节点发生变化时...
**ZooKeeper分布式系统协调详解** ZooKeeper是一款开源的分布式协调服务,它是由雅虎创建并贡献给Apache Software Foundation的项目。在分布式系统中,ZooKeeper扮演着至关重要的角色,它提供了一种可靠的方式来...
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》这本书深入探讨了分布式系统中的一个重要议题——一致性。一致性是确保分布式系统中各个节点数据同步的关键,它在高可用、高性能的分布式服务中起着至关重要的作用。...
《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,涵盖了从理论基础Paxos算法到实际应用Zookeeper的全面内容。这本书旨在帮助读者理解分布式一致性的重要性,以及如何在实践...
4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...
在分布式系统中,数据一致性与协调是至关重要的问题,Zookeeper作为一个分布式的、开放源码的协调服务,为分布式应用提供了高效且可靠的集群管理、配置维护、命名服务、分布式同步和组服务等功能。本篇文章将重点...
- **会话(Session)**:客户端与 ZooKeeper 服务器之间建立的连接称为会话。当客户端与服务器的连接断开或会话超时,客户端创建的所有临时节点都会被删除。 - **观察者(Watcher)**:客户端可以在特定的 ZNode 上...
《Zookeeper分布式概念详解》 Zookeeper,一个由Apache Hadoop项目孵化的开源分布式协调服务,是分布式应用程序的重要基础设施。它提供了一种简单易用的接口,用于管理分布式环境中的命名服务、配置管理、集群同步...
秒杀项目是一个典型的高并发、低延迟的电商应用场景,它涉及到多个关键技术,包括服务调用和分布式Session管理。在这个项目中,我们主要关注如何在微服务架构下有效地处理这些问题。 一、服务调用 1. **API ...
- Java语言的分布式系统架构。...- 公共功能:公共功能(基类、数据访问组件、读写分离、分布式session、HTTP客户端、日志服务、队列服务、支付服务组件、redis缓存、Web安全等等)、公共配置、工具类。 适合接口编程。