`

Zookeeper 源码分析-leader选举

 
阅读更多

选举的算法可以参考:http://blog.csdn.net/xhh198781/article/details/10949697

 

假设配置中有两个server

server.1=localhost:2888:3888

server.2=localhost:2889:3888

 

由前文可以,zookeeper在选举leader之前会先调用下面的代码,首先设置currentVote为myid,即一开始会选举自己为leader。如果electionType = 0,myid=1, Responder线程将监听在2888这个UDP端口上,处理其他节点的请求。

  synchronized public void startLeaderElection() {
        currentVote = new Vote(myid, getLastLoggedZxid());
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }

 

 

LeaderElection

如果electionAlg=0,将使用LeaderElection算法。LeaderElection会调用lookForLeader方法,先对每个peer询问他们选举的leader,然后调用countVotes查看那个节点胜出,并将它设置成currentVote。如果超过半数的人选举这个节点,则选举成功。

  public Vote lookForLeader() throws InterruptedException {
            self.setCurrentVote(new Vote(self.getId(),
                    self.getLastLoggedZxid()));
         
            int xid = epochGen.nextInt();
            while (self.isRunning()) {
                requestBuffer.putInt(xid);
                requestPacket.setLength(4);
                HashSet<Long> heardFrom = new HashSet<Long>();
                for (QuorumServer server : self.getVotingView().values()) {
                    LOG.info("Server address: " + server.addr);
                    requestPacket.setSocketAddress(server.addr);
                    s.send(requestPacket);
                        responsePacket.setLength(responseBytes.length);
                        s.receive(responsePacket);
                        long peerId = responseBuffer.getLong();
                        heardFrom.add(peerId);
                        Vote vote = new Vote(responseBuffer.getLong(),
                                responseBuffer.getLong());
                        InetSocketAddress addr =
                                (InetSocketAddress) responsePacket
                                .getSocketAddress();
                        votes.put(addr, vote);
                }

                ElectionResult result = countVotes(votes, heardFrom);
                // ZOOKEEPER-569:
                // If no votes are received for live peers, reset to voting 
                // for ourselves as otherwise we may hang on to a vote 
                // for a dead peer                 
                if (votes.size() == 0) {                    
                    self.setCurrentVote(new Vote(self.getId(),
                            self.getLastLoggedZxid()));
                } else {
                    if (result.winner.id >= 0) {
                        self.setCurrentVote(result.vote);
                        // To do: this doesn't use a quorum verifier
                        if (result.winningCount > (self.getVotingView().size() / 2)) {
                            self.setCurrentVote(result.winner);
                             self.setPeerState((current.id == self.getId())
                                        ? ServerState.LEADING: ServerState.FOLLOWING);
                                if (self.getPeerState() == ServerState.FOLLOWING) {
                                    Thread.sleep(100);
                                }    								
                        }
					}
                }
			    
    }

 

 protected ElectionResult countVotes(HashMap<InetSocketAddress, Vote> votes, HashSet<Long> heardFrom) {
        ElectionResult result = new ElectionResult();
        // Initialize with null vote
        result.vote = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
        result.winner = new Vote(Long.MIN_VALUE, Long.MIN_VALUE);
        Collection<Vote> votesCast = votes.values();
        // First make the views consistent. Sometimes peers will have
        // different zxids for a server depending on timing.
        for (Iterator<Vote> i = votesCast.iterator(); i.hasNext();) {
            Vote v = i.next();
            if (!heardFrom.contains(v.id)) {
                // Discard votes for machines that we didn't hear from
                i.remove();
                continue;
            }
            for (Vote w : votesCast) {
                if (v.id == w.id) {
                    if (v.zxid < w.zxid) {
                        v.zxid = w.zxid;
                    }
                }
            }
        }

        HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
        // Now do the tally,选出zxid最大,且id最大的vote,作为下一轮选举的对象
        for (Vote v : votesCast) {
            Integer count = countTable.get(v);
            if (count == null) {
                count = Integer.valueOf(0);
            }
            countTable.put(v, count + 1);
            if (v.id == result.vote.id) {
                result.count++;
            } else if (v.zxid > result.vote.zxid
                    || (v.zxid == result.vote.zxid && v.id > result.vote.id)) {
                result.vote = v;
                result.count = 1;
            }
        }
        result.winningCount = 0;
        LOG.info("Election tally: ");
        //挑选出被选举最多的节点作为winner
        for (Entry<Vote, Integer> entry : countTable.entrySet()) {
            if (entry.getValue() > result.winningCount) {
                result.winningCount = entry.getValue();
                result.winner = entry.getKey();
            }
            LOG.info(entry.getKey().id + "\t-> " + entry.getValue());
        }
        return result;
    }

 

    FastLeaderElection(另参考http://blog.csdn.net/xhh198781/article/details/6619203

是标准的fast paxos的实现,它首先向所有Server提议自己要成为leader,当其它Server收到提议以后,解决 epoch 和 zxid 的冲突,并接受对方的提议,然后向对方发送接受提议完成的消息。

 

FastLeaderElection算法通过异步的通信方式来收集其它节点的选票,同时在分析选票时又根据投票者的当前状态来作不同的处理,以加快Leader的选举进程。

每个Server都一个接收线程池和一个发送线程池在没有发起选举时,这两个线程池处于阻塞状态,直到有消息到来时才解除阻塞并处理消息,同时每个Serve r都有一个选举线程(可以发起选举的线程担任)

1). 主动发起选举端(选举线程)的处理

首先自己的 logicalclock1,然后生成notification消息,并将消息放入发送队列中, 系统中配置有几个Server就生成几条消息,保证每个Server都能收到此消息,如果当前Server 的状态是LOOKING就一直循环检查接收队列是否有消息,如果有消息,根据消息中对方的状态进行相应的处理。

2).主动发送消息端(发送线程池)的处理

将要发送的消息由Notification消息转换成ToSend消息,然后发送对方,并等待对方的回复。

3). 被动接收消息端(接收线程池)的处理

将收到的消息转换成Notification消息放入接收队列中,如果对方Serverepoch小于logicalclock则向其发送一个消息(让其更新epoch);如果对方Server处于Looking状态,自己则处于FollowingLeading状态,则也发送一个消息(当前Leader已产生,让其尽快收敛)

 /**
     * Starts a new round of leader election. Whenever our QuorumPeer
     * changes its state to LOOKING, this method is invoked, and it
     * sends notifications to all other peers.
     */
    public Vote lookForLeader() throws InterruptedException {
       HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
	    //发送notification给每个节点,告述他们自己当前选举的人,默认一开始选举自己   
		sendNotifications();

		/*
		 * Loop in which we exchange notifications until we find a leader
		 */

		while ((self.getPeerState() == ServerState.LOOKING) &&
				(!stop)){
			//当前节点启动了几个接收线程,用于接收其他节点选举的结果,并将选举的结果存到recvqueue中
			Notification n = recvqueue.poll(notTimeout,
					TimeUnit.MILLISECONDS);

			switch (n.state) {
				case LOOKING:
					// If notification > current, replace and send messages out
					if (n.epoch > logicalclock) {
						logicalclock = n.epoch;
						recvset.clear();
						//Check if a pair (server id, zxid) succeeds our current vote.
						if(totalOrderPredicate(n.leader, n.zxid,
								getInitId(), getInitLastLoggedZxid()))
							updateProposal(n.leader, n.zxid);
						else
							updateProposal(getInitId(),
									getInitLastLoggedZxid());
						sendNotifications();
					} else if (n.epoch < logicalclock) {
						if(LOG.isDebugEnabled()){
							LOG.debug("Notification epoch is smaller than logicalclock. n.epoch = " + n.epoch
									+ ", Logical clock" + logicalclock);
						}
						break;
					//如果n.zxid大于proposedZxid,或者n.zxid等于proposedZxid且leader id大于proposedLeader 的id, 那么选举n.leader
					} else if (totalOrderPredicate(n.leader, n.zxid,
							proposedLeader, proposedZxid)) {
						LOG.info("Updating proposal");
						updateProposal(n.leader, n.zxid);
						sendNotifications();
					}
 
					/*
					 * Only proceed if the vote comes from a replica in the
					 * voting view.
					 */
					if(self.getVotingView().containsKey(n.sid)){
						recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));

						//If have received from all nodes, then terminate
						if ((self.getVotingView().size() == recvset.size()) &&
								(self.getQuorumVerifier().getWeight(proposedLeader) != 0)){
							self.setPeerState((proposedLeader == self.getId()) ?
									ServerState.LEADING: learningState());
							leaveInstance();
							return new Vote(proposedLeader, proposedZxid);
						//如果满足结束的条件,将进入结束等待阶段
						} else if (termPredicate(recvset,
								new Vote(proposedLeader, proposedZxid,
										logicalclock))) {

							// Verify if there is any change in the proposed leader
							while((n = recvqueue.poll(finalizeWait,
									TimeUnit.MILLISECONDS)) != null){
								if(totalOrderPredicate(n.leader, n.zxid,
										proposedLeader, proposedZxid)){
									recvqueue.put(n);
									break;
								}
							}

							/*
							 * This predicate is true once we don't read any new
							 * relevant message from the reception queue
							 */
							if (n == null) {
								self.setPeerState((proposedLeader == self.getId()) ?
										ServerState.LEADING: learningState());
								if(LOG.isDebugEnabled()){
									LOG.debug("About to leave FLE instance: Leader= "
										+ proposedLeader + ", Zxid = " +
										proposedZxid + ", My id = " + self.getId()
										+ ", My state = " + self.getPeerState());
								}

								leaveInstance();
								return new Vote(proposedLeader,
										proposedZxid);
							}
						}
					}				
		}
    }

 

分享到:
评论

相关推荐

    apache-zookeeper-3.5.8-bin.zip

    在"apache-zookeeper-3.5.8-bin.zip"这个压缩包中,包含了Apache ZooKeeper 3.5.8版本的源码、编译后的二进制文件以及相关的配置和脚本。 1. **Zookeeper 的核心概念** - **节点(ZNode)**: ZooKeeper 数据模型由...

    zookeeper 3.6.3 源码下载

    1. **主备选举(Leader Election)**:ZooKeeper 集群由多个服务器组成,通过选举选出一个领导者(Leader),其他服务器作为follower。领导者负责处理所有的写操作,而读操作可以从任何服务器读取。 2. **原子广播...

    Zookeeper源码剖析:深入理解Leader选举机制

    **Zookeeper源码剖析:深入理解Leader选举机制** 在分布式协调服务Zookeeper中,Leader选举是其核心功能之一,确保了服务的高可用性和一致性。本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要...

    03-05-07-zookeeper原理之Leader选举源码分析1

    Zookeeper 是一个分布式协调服务,源自 Google 的 Chubby,它主要解决了在分布式环境中如何选举主节点(Master Server)的问题,确保一致性。为了实现这一目标,Zookeeper 采用了基于 Paxos 算法的 ZAB(Zookeeper ...

    zookeeper-release-3.4.14.zip

    5. **Leader选举** Zookeeper的集群需要选举出一个Leader来负责处理客户端的写请求和协调集群中的数据同步。在3.4.14版本中,选举算法已经非常成熟,能够快速、稳定地选出新的Leader,确保服务的连续性。 6. **...

    C# 关于zookeeper主从选举的源码

    5. **源码分析**: - 分析源码时,重点关注类的设计,如`ZookeeperNode`、`ZookeeperLeader`和`ZookeeperFollower`,它们分别代表Zookeeper的节点、领导者和跟随者。 - 查看节点间如何交换信息(如使用TCP套接字...

    zookeeper3.6.0-linux版本

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的接口和性能高效、功能稳定的系统提供给用户。...

    zookeeper-3.4.9的源码

    源码分析: 1. **项目结构**:ZooKeeper的源码主要分为`src/java/main`和`src/java/test`两个部分,前者是生产代码,后者是测试代码。`src/java/main`下又包含了`org.apache.zookeeper`这个主要的包,里面包含了...

    apache-zookeeper-3.7.0.rar

    6. ** Leader选举**:在Zookeeper集群中,有一个角色叫Leader,它负责处理所有的写操作。如果Leader宕机,其他节点会进行新的Leader选举。 7. **Quorum机制**:Zookeeper通过多数派Quorum策略来保证系统的高可用性...

    zookeeper-3.4.5

    2. 分析ZAB协议的实现,特别是领导者选举和事务处理流程。 3. 学习客户端API的使用,了解客户端如何与服务器通信。 4. 深入理解Zookeeper的持久化机制,包括内存数据结构和磁盘数据的交互。 5. 探究Zookeeper的监控...

    zookeeper系列1:入门

    集群中的每个节点都保存整个数据树的副本,通过选举机制确定一个领导者(Leader),其余节点为follower。当客户端进行读写操作时,会由Leader处理并广播到所有Follower,确保数据的一致性。 **3. Zookeeper的主要...

    笔记_zookeeper_源码.zip

    - **选举算法**: 当Leader失效时,`Election`类负责新的Leader选举。基于Fast Leader Election算法,它能够快速确定新的领导者。 - **Watcher的实现**: `ZooKeeper`客户端库中的`WatcherManager`管理所有注册的...

    hadoop_zookeeper-3.4.10.rar linux用

    ZooKeeper采用的是主从(Leader-Follower)架构,由一个或多个服务器节点组成集群。每个节点都可以作为客户端与ZooKeeper进行交互,但只有一个节点作为领导者,负责处理所有的写操作,并通过ZAB(ZooKeeper Atomic ...

    【DT-BigData】Zookeeper-3.4.5.gz

    在DT(Data Technology)时代,大数据处理和分析已经成为企业决策的重要支持,而Zookeeper在其中扮演了关键角色。本文将围绕Zookeeper 3.4.5版本,深入探讨其核心概念、功能特性以及在大数据环境下的应用。 一、...

    zookeeper第四节课原理源码分析资料1

    总的来说,Zookeeper的源码分析涵盖了从服务启动、配置解析、日志清理、集群通信到故障恢复等多个层面,深入了解这些原理对于优化和维护Zookeeper集群至关重要。通过本节课的学习,我们可以更好地理解Zookeeper如何...

    zookeeper-3.3.6.tar.gz

    9. **Leader选举**:在集群中,选举出一个领导者(Leader)来处理所有的写操作,其他服务器(Follower)负责读操作。如果 Leader 故障,会触发新的选举过程。 10. **应用示例**:Zookeeper在分布式环境中常用于命名...

    从Paxos到ZooKeeper 清晰扫描版pdf加源码

    书中将详细介绍ZooKeeper的主要特性和架构,包括ZNode(数据节点)、Watcher(观察器)、ACL(访问控制列表)以及Leader选举机制等,并通过源码分析揭示其实现细节。 源码部分将帮助读者从实际代码层面理解Paxos...

    zookeeper-3.4.6

    通过对源码的深入研究,我们可以理解其内部实现机制,如Watch事件通知、ZNode的生命周期管理、选举算法等。 6. **应用场景** Zookeeper广泛应用于大数据领域,如Hadoop、HBase、Kafka等,也常被用作分布式锁、服务...

    zookeeper 服务监控和管理

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的接口和性能高效、功能稳定的系统提供给用户。...

Global site tag (gtag.js) - Google Analytics