`
iwinit
  • 浏览: 455101 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
文章分类
社区版块
存档分类
最新评论

[ZooKeeper]分布式Session创建

阅读更多

前面几篇zookeeper的文章简单分析了执行流程,接下来打算从横向来分析一下zk的一些特性,先从session开始。
这一篇http://iwinit.iteye.com/blog/1754611分析了单机情况下session建立,在集群环境下建立session不太一样,是一个proposal的过程,先假设集群由leader,followerA,followerB组成,我们的client去连followerA。follower和leader初始化之后,初始化的sessionTracker不一样,leader中是SessionTrackerImpl,follower中是LearnerSessionTracker,主要区别和类同点:
1.follower中不会启动超时检查线程,只是简单得记录了session信息,主要数据结构是

 

//session更新信息
HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
//session超时信息
private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;

 而leader会启动超时线程,而且数据结构也多一些

//session实体
 HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
//同一个超时时间点的session,给超时线程用
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
//session超时信息
 ConcurrentHashMap<Long, Integer> sessionsWithTimeout;

 2.addSession时,leader会创建session实体,follower只是简单的记录了一下session信息

 

3.sessionId初始化算法一样

 

//1字节server_id+当前时间的后5个字节+2字节0,保证全局唯一
 public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >> 8;
        nextSid =  nextSid | (id <<56);
        return nextSid;
    }

 连下来client开始创建session

 

1.客户端发送ConnectionRequest给followerA
2.followerA处理

                   session超时时间协商

       

//处理session时间,minSessionTimeout为ticktime2倍,maxSessionTimeout为ticktime20倍
	int sessionTimeout = connReq.getTimeOut();
        byte passwd[] = connReq.getPasswd();
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
        // We don't want to receive any packets until we are sure that the
        // session is setup
        cnxn.disableRecv();
	//客户端发送的sessionid,重试时不为0
        long sessionId = connReq.getSessionId();
	//客户端重试,则reopen,后文分析
        if (sessionId != 0) {
            long clientSessionId = connReq.getSessionId();
            LOG.info("Client attempting to renew session 0x"
                    + Long.toHexString(clientSessionId)
                    + " at " + cnxn.getRemoteSocketAddress());
            serverCnxnFactory.closeSession(sessionId);
            cnxn.setSessionId(sessionId);
            reopenSession(cnxn, sessionId, passwd, sessionTimeout);
        } 
	//创建session
	else {
            LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
            createSession(cnxn, passwd, sessionTimeout);
        }

             sessionid和密码初始化

long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
		//sessionid递增
		long sessionId = sessionTracker.createSession(timeout);
		//随机密码
		Random r = new Random(sessionId ^ superSecret);
		r.nextBytes(passwd);
		//4个字节的超时时间
		ByteBuffer to = ByteBuffer.allocate(4);
		to.putInt(timeout);
		cnxn.setSessionId(sessionId);
		//异步提交执行链
		submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
		return sessionId;
    	}

                   提交请求

   private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
            int xid, ByteBuffer bb, List<Id> authInfo) {
	//初始化时,xid为0,bb为4个字节的session超时时间
        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
        submitRequest(si);
    }

              根据之前http://iwinit.iteye.com/blog/1777109分析的Processor链图,FollowerRequestProcessor执行。和create请求一样,createSession是事务请求需要投票,FollowerA发送投票packet给leader。

3.leader处理,learnerHandler中收到request请求,提交leader的processor链

                    PrepRequestProcessor处理

//头信息
	 request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
                                    zks.getTime(), type);
	....
	case OpCode.createSession:
                request.request.rewind();
                int to = request.request.getInt();
		//txn信息,就是一个session超时
                request.txn = new CreateSessionTxn(to);
                request.request.rewind();
		//leader中创建session
                zks.sessionTracker.addSession(request.sessionId, to);
		//owner属于followerA
                zks.setOwner(request.sessionId, request.getOwner());
                break;

             之后ProposalRequestProcessor发起投票,并写入log

4.followerA和followerB处理投票

public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
                hdr.getType(), null, null);
        request.hdr = hdr;
        request.txn = txn;
        request.zxid = hdr.getZxid();
	//添加到pending队列
        if ((request.zxid & 0xffffffffL) != 0) {
            pendingTxns.add(request);
        }
	/写入log,并发送ack包给leader
        syncProcessor.processRequest(request);
    }

 5.leader收到投票,发起commit,然后自己commit

6.leader的FinalRequestProcessor处理,添加session,不需要返回client数据

public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
        ProcessTxnResult rc;
        int opCode = hdr.getType();
        long sessionId = hdr.getClientId();
	//createSession不需要修改db状态,啥都不做
        rc = getZKDatabase().processTxn(hdr, txn);
	//又添加了一次session
        if (opCode == OpCode.createSession) {
            if (txn instanceof CreateSessionTxn) {
                CreateSessionTxn cst = (CreateSessionTxn) txn;
                sessionTracker.addSession(sessionId, cst
                        .getTimeOut());
            } else {
                LOG.warn("*****>>>>> Got "
                        + txn.getClass() + " "
                        + txn.toString());
            }
        } else if (opCode == OpCode.closeSession) {
            sessionTracker.removeSession(sessionId);
        }
        return rc;
    }

 7.followerA和followerB处理commit,区别在于CommitRequestProcessor中,followerA中的Request会带上connection信息而followerB中的reqesut没有connection信息。所以在FinalRequestProcessor中,followerB创立完session就返回了,而followerA还需要写回client响应

最后看下leader的sessionTracker超时机制,构造SessionTrackerImpl

 public SessionTrackerImpl(SessionExpirer expirer,
            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
            long sid)
    {
        super("SessionTracker");
        this.expirer = expirer;
        this.expirationInterval = tickTime;
        this.sessionsWithTimeout = sessionsWithTimeout;
	//下一个检查点,向上取整为expirationInterval倍数,expirationInterval就是配置的ticktime
        nextExpirationTime = roundToInterval(System.currentTimeMillis());
        this.nextSessionId = initializeNextSession(sid);
        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
            addSession(e.getKey(), e.getValue());
        }
    }

 主线程循环

    synchronized public void run() {
        try {
            while (running) {
		//下一个超时点没到,就等待
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
		//同一个时间点超时的session
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
		//超时session处理
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }
		//下一个check point
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

 session更新时

    synchronized public boolean touchSession(long sessionId, int timeout) {
        ......
	//session对象
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
	//这个session的下一个超时点,向上取整为ticktime倍数
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
	//时间点比老的时间还小,不更新
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
	//先从老的超时set中remove掉,再添加到新的set中,超时线程会定时check
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
	//下个超时点
        s.tickTime = expireTime;
	//添加到新的超时set中
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }

 简单小节
1.session创建需要投票处理
2.结果是每台server上的内存中都会建立相同的session记录
3.sessionid通过serverid+时间保证唯一
4.session超时检查由leader负责,以ticktime定时检查
5.session更新时,会修改自己这个session所属的超时set,超时时间是ticktime倍数

分享到:
评论

相关推荐

    zookeeper实现分布式session sample

    本示例将探讨如何利用Zookeeper实现分布式session。 1. **Zookeeper的基本概念** - Zookeeper是一个分布式服务框架,主要用于解决分布式应用中的数据一致性问题。 - 它提供了一种树形的数据结构,节点称为Znode,...

    基于ZooKeeper的分布式Session实现

    本文将深入探讨如何利用ZooKeeper实现分布式Session,并通过分析提供的"基于ZooKeeper的分布式Session实现.doc"文档,解析其实现原理与步骤。 首先,理解ZooKeeper的基本概念至关重要。ZooKeeper是一个高可用、高...

    ZooKeeper-分布式过程协同技术详解 和从Paxos到Zookeeper

    首先,ZooKeeper的核心概念包括节点(Znode)、会话(Session)和Watcher。Znode是数据存储的基本单元,它们类似于文件系统的文件和目录,有生命周期管理,并支持版本控制和ACL(访问控制列表)。会话是客户端与...

    基于ZooKeeper的分布式Session实现_已发布.docx

    使用ZooKeeper实现分布式Session,可以利用其提供的API来创建和管理Session。当用户登录后,其Session信息会被存储在ZooKeeper的一个特定znode下,并在各个服务节点之间共享。通过设置Watcher,服务节点可以实时监听...

    zookeeper分布session式实现

    ### 基于ZooKeeper的分布式Session实现详解 #### 1. 认识ZooKeeper ZooKeeper,形象地被称为“动物园管理员”,在分布式系统中扮演着至关重要的角色。随着企业级应用系统的不断扩展,传统的单体架构难以应对日益...

    ZooKeeper 分布式

    3. 性能优化:合理设置ZooKeeper的配置参数,如session超时时间、心跳间隔等,以适应不同应用场景的需求。 总的来说,《ZooKeeper:分布式过程协同技术详解》这本书详细阐述了如何利用ZooKeeper来解决分布式系统中...

    基于zookeeper的分布式锁简单实现

    4. **异常处理**:Zookeeper提供了Session超时机制,当客户端与服务器失去连接时,其创建的临时节点会被自动删除。因此,如果客户端因网络问题断开连接,锁会自动释放,防止死锁。 在提供的压缩包中,可能包含了...

    ZooKeeper分布式过程协同技术详

    《ZooKeeper:分布式过程协同技术详解》 ZooKeeper,作为一款开源的分布式协调服务...在实际工作中,开发者可以根据需求选择合适的ZooKeeper操作,如创建、删除ZNode,设置数据监听等,以实现高效且稳定的分布式协同。

    zookeeper分布式调度工具

    3. **会话(Session)**:客户端与ZooKeeper服务器之间的连接被称为会话。会话期间,ZooKeeper保证所有事务操作的顺序性。 4. **Watch机制**:ZooKeeper允许客户端对某个节点设置监听(Watch),当该节点发生变化时...

    ZooKeeper分布式系统协调 v3.6.3.gz

    **ZooKeeper分布式系统协调详解** ZooKeeper是一款开源的分布式协调服务,它是由雅虎创建并贡献给Apache Software Foundation的项目。在分布式系统中,ZooKeeper扮演着至关重要的角色,它提供了一种可靠的方式来...

    从PAXOS到ZOOKEEPER分布式一致性原理与实践

    《从PAXOS到ZOOKEEPER分布式一致性原理与实践》这本书深入探讨了分布式系统中的一个重要议题——一致性。一致性是确保分布式系统中各个节点数据同步的关键,它在高可用、高性能的分布式服务中起着至关重要的作用。...

    从Paxos到Zookeeper分布式一致性原理与实践.rar

    《从Paxos到Zookeeper分布式一致性原理与实践》是一本深入探讨分布式系统一致性问题的著作,涵盖了从理论基础Paxos算法到实际应用Zookeeper的全面内容。这本书旨在帮助读者理解分布式一致性的重要性,以及如何在实践...

    Zookeeper实现分布式锁

    4. **zkLockTest**:这个文件很可能是实现Zookeeper分布式锁的测试代码,可能包含客户端的连接、锁的获取和释放、异常处理等逻辑。通过对这个测试代码的分析和运行,我们可以深入理解Zookeeper分布式锁的工作机制。 ...

    ZookeeperNet实现分布式锁

    在分布式系统中,数据一致性与协调是至关重要的问题,Zookeeper作为一个分布式的、开放源码的协调服务,为分布式应用提供了高效且可靠的集群管理、配置维护、命名服务、分布式同步和组服务等功能。本篇文章将重点...

    zookeeper之分布式环境搭建.docx

    - **会话(Session)**:客户端与 ZooKeeper 服务器之间建立的连接称为会话。当客户端与服务器的连接断开或会话超时,客户端创建的所有临时节点都会被删除。 - **观察者(Watcher)**:客户端可以在特定的 ZNode 上...

    对zookeeper的分布式概念进行讲解

    《Zookeeper分布式概念详解》 Zookeeper,一个由Apache Hadoop项目孵化的开源分布式协调服务,是分布式应用程序的重要基础设施。它提供了一种简单易用的接口,用于管理分布式环境中的命名服务、配置管理、集群同步...

    秒杀项目之服务调用&分布式session对应的完整项目

    秒杀项目是一个典型的高并发、低延迟的电商应用场景,它涉及到多个关键技术,包括服务调用和分布式Session管理。在这个项目中,我们主要关注如何在微服务架构下有效地处理这些问题。 一、服务调用 1. **API ...

    springboot+dubbo分布式架构,提供分布式缓存、分布式锁、分布式Session、读写分离

    - Java语言的分布式系统架构。...- 公共功能:公共功能(基类、数据访问组件、读写分离、分布式session、HTTP客户端、日志服务、队列服务、支付服务组件、redis缓存、Web安全等等)、公共配置、工具类。 适合接口编程。

Global site tag (gtag.js) - Google Analytics