`

基于库zkclient 的leader选举代码实现(最粗糙级C)

阅读更多

 

ZooKeeper进行领导者选举是比较容易的。

 

伪代码表示:

zkclient:

<1>判定是否存在/zxeample/leader路径

<2>如果不存在,那么试图创建一个会话znode(Ephemeral Path)(path = /zxeample/leader,data=client id)

 

<2.1>创建成功,标识自己是leader

<2.2>创建不成功(包括异常)转向<1>

<3>如果存在path=/zxeample/leader,标识自己是slave,(可能需要与leader进行通信)

<4>如果自己是slave,那么监控该znode的data change事件。(用于当leader挂了,事件通知模型,就会产生事件触发通知,从而进行新的选举领导者)

 

基于java开源org.I0Itec.zkclient库实现,更简单。kafka也是基于这个实现leader选举的,不过是scala写的。

 

测试方法:

(1)启动ZooKeeper server

(2)启动zkCli

 (3)启动程序,

构建10个线程,每个线程都是一个ZkClient,

(4)然后在zkCli中,使用命令rmr /zxexample/leader

 

总结:尚有2个不如人意之处.创建znode有冲突,因为存在多个client同时创建,单只有一个成功,其余失败(逻辑正确),但是会打印很多异常。第二,线程是用sleep,因此,其实是一直在循环,即轮询,而没有消息驱动的方式。

 

 

package zkexam;

import java.security.SecureRandom;
import java.util.concurrent.Callable;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * choose a server as a Leader(Master),while other servers are slaves.
 * 
 * @author Free
 *
 */
public class ServerElect {
	SecureRandom rand = new SecureRandom();

	public ServerElect() {

	}

	public static class Leader {
		ZkClient leader;

		// byte[] data;
		public ZkClient getClient() {
			return leader;
		}

		public void setClient(ZkClient leaderClient) {
			this.leader = leaderClient;
		}
	}

	Leader selectLeader(ZkClient... client) {
		if (client == null || client.length < 0) {
			throw new IllegalArgumentException(
					"no zookeeper client need to be selected as leader.");

		}
		Leader leader = new Leader();
		do {
			int i = rand.nextInt() % (client.length);
			try {
				client[i].createEphemeral("/zxexample/leader", "I am leader "
						+ i);
				leader.setClient(client[i]);
				for (int j = 0; j < client.length && j != i; j++) {

				}
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		} while (true);
		return leader;
	}

	public class MyWatcher<T> implements Watcher {
		Callable<T> callback;

		MyWatcher(Callable<T> c) {
			callback = c;
		}

		@Override
		public void process(WatchedEvent event) {
			org.apache.zookeeper.Watcher.Event.EventType eventType = event
					.getType();
			switch (eventType) {
			case NodeDeleted:
				try {
					callback.call();
				} catch (Exception e) {
					e.printStackTrace();
				}
				break;
			default:
				break;
			}
		}

	}

	public static class LeaderChangeListener implements IZkDataListener {
		ZkClient client;

		public LeaderChangeListener(ZkClient client_) {
			client = client_;
		}

		/**
		 * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
		 * 
		 * @throws Exception
		 *             On any error.
		 */
		public void handleDataChange(String dataPath, Object data) {

			System.out.println("a new leader is elected.");
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			System.out.println(dataPath + ":data is deleted.");
		}
	}

	public static class zkClientThread extends Thread {
		final static String path = "/zxexample/leader";
		ZkClient client;
		long maxMsToWaitUntilConnected;
		volatile boolean isFirstTime = true;
		volatile boolean isLeader;
		String data;

		// Watcher watcher;
		public zkClientThread(ZkClient client_, String name) {
			super(name);
			client = client_;

		}

		public void start() {
			super.start();
		}

		public void tryLeader() {
			try {
				data = getName();
				if (!client.exists(path)) {
					try {
						client.createEphemeral(path, data);
					} catch (ZkNoNodeException e) {
						String parentDir = path.substring(0,
								path.lastIndexOf('/'));
						if (parentDir.length() != 0) {
							client.createPersistent(parentDir, true);
						}
						client.createEphemeral(path, data);
					}
					isLeader = true;
					System.out.println("I am leader :" + getName());
				}
			} catch (Exception e) {
				e.printStackTrace();
				isFirstTime = true;
				isLeader = false;
			}
		}

		public void run() {
			while (true) {
				if (client.exists(path)) {
					if (isFirstTime) {
						Object obj = client.readData(path);
						if (obj == null || !obj.toString().equals(getName())) {
							tryLeader();
						} else {
							// client.subscribeDataChanges(path,
							// new LeaderChangeListener(client));
							// wait leader ,and communication to leader;
							client.watchForData(path);
						}
						isFirstTime = false;
					}
				} else {
					tryLeader();
				}
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					break;
				}
			}
		}
	}

	public static void main(String args[]) {
		int curClientCount = 10;
		ZkClient[] client = new ZkClient[curClientCount];
		zkClientThread[] zkThreads = new zkClientThread[curClientCount];
		for (int i = 0; i < curClientCount; i++) {

			client[i] = new ZkClient("127.0.0.1:2181", 218100);
			zkThreads[i] = new zkClientThread(client[i], "zk-" + i);
		}
		for (int i = 0; i < zkThreads.length; i++) {
			zkThreads[i].start();
		}
	}
}

 

 

 

I am leader :zk-6
I am leader :zk-5
I am leader :zk-6
org.I0Itec.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:55)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
	at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.createEphemeral(ZkClient.java:328)
	at zkexam.ServerElect$zkClientThread.tryLeader(ServerElect.java:141)
	at zkexam.ServerElect$zkClientThread.run(ServerElect.java:169)
Caused by: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
	... 4 more
I am leader :zk-3

 

 

分享到:
评论

相关推荐

    面试官:说一说Zookeeper中Leader选举机制.doc

    Zookeeper 中 Leader 选举机制 在分布式系统中,Leader 选举机制是非常重要的一环,Zookeeper 作为一个广泛应用于分布式系统的协调服务,自然也有自己的 Leader 选举机制。下面,我们将通过一个有趣的面试对话来...

    fast paxos算法与zookeeper leader选举源代码分析.doc

    fast paxos算法与zookeeper leader选举源代码分析.doc

    Zookeeper源码剖析:深入理解Leader选举机制

    本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要阅读Zookeeper源码?** 1. **提升技术功底**:通过阅读源码,可以学习到优秀的设计思想,了解解决复杂问题的策略,以及常见的设计模式,从而...

    在分布式环境中Leader选举互斥锁和读写锁该如何实现

    ### 分布式环境中Leader选举、互斥锁与读写锁的实现 #### 一、Leader选举 在分布式系统中,Leader选举是一种常见的机制,用于在多个节点之间选择一个领导者(Leader)。领导者通常负责协调和管理其他节点的工作。...

    zookeeperMaster选举以及数据同步代码

    在分布式系统中,Zookeeper是一个至关重要的组件,它主要用于实现分布式服务管理,提供诸如配置管理、命名服务、集群同步、 leader选举等核心功能。在这个"zookeeperMaster选举以及数据同步代码"项目中,我们将深入...

    完整jar包资源,COULD NOT FIND zkclient,包缺失使用

    ZooKeeper是由Apache基金会开发的一个分布式协调服务,它为分布式应用提供统一的命名服务、配置管理、集群同步、 leader选举等服务。ZooKeeper基于简单的ZNode数据模型,每个ZNode存储有限的数据,并可以有子ZNode,...

    Raft - 基于共识的分布式数据库协同算法及其在 Neo4j 集群中的实现.pdf

    Raft是一种基于共识的分布式数据库协同算法,用于解决分布式系统中的Leader选举、日志复制、故障恢复等问题。Raft算法在Neo4j集群中的实现,使得Neo4j能够提供高可用性和高性能的分布式数据库服务。 Raft算法的核心...

    leader-selector-demo.rar

    本文将基于"leader-selector-demo.rar"提供的示例代码,深入探讨如何利用Zookeeper进行 Leader 选举,并解释相关知识点。 首先,Zookeeper 是一个分布式的、开放源码的协调服务,用于分布式应用的数据存储、命名...

    基于Wavelet_leader_省略_映射算法的回转支承自适应特征提取

    "基于Wavelet_leader_省略_映射算法的回转支承自适应特征提取" 本文介绍了一种基于Wavelet_leader方法和优化的等距映射算法(HGWO-ISOMAP)的回转支承自适应特征提取方法。该方法旨在解决回转支承振动信号微弱,...

    sobhuza:Stable Leader选举算法的实现

    【标题】:“sobhuza:Stable Leader选举算法的实现” 在分布式系统中,领导者选举是一个关键的组件,用于确保一致性并协调集群中的节点操作。sobhuza项目是基于Erlang语言实现的一个稳定领导者选举算法。Erlang是一...

    工资管理系统源代码----C语言

    工资管理系统源代码是用于自动化处理员工薪资计算、记录和管理的程序。在这个系统中,使用了C语言...通过这个实例,学习者可以深入了解如何使用C语言实现简单的数据库操作,以及如何设计和实现一个基本的业务逻辑流程。

    cpp-Raft核心算法的一个轻量级C实现可作为一个复制库

    NuRaft 库是这个轻量级 C++ 实现的核心,它包含了以下关键组件: 1. **服务器状态机(Server State Machine)**:每个节点都有一个状态机,用于处理客户端的请求并存储最终一致的状态。 2. **日志(Log)**:存储了...

    03-05-07-zookeeper原理之Leader选举源码分析1

    为了实现这一目标,Zookeeper 采用了基于 Paxos 算法的 ZAB(Zookeeper Atomic Broadcast)协议。 Paxos 算法是由 Leslie Lamport 提出的一种分布式一致性算法,它能够在不可靠的网络环境中确保多个节点就某个提案...

    redis-leader-by-lock:使用Redis Lock轻松实现集群领导者选举

    Redis-leader-by-lock 使用Redis Lock轻松实现集群领导者选举仅使用Spring-Boot和Redis动机几乎所有使用Spring Boot进行领导力选举的例子都转到Hazelcast(现在已弃用)和Zookeeper(在简单情况下过强)中的Spring ...

    zkClient:实现zookeeper客户端,实现客户端连接,重连,基本数据操作,监听等功能

    zkclient 项目项目介绍:zkclient 是对zookeeper java客户端进行的封装,主要实现了连接、断线重连,watch事件改为listen监听事件,分布式锁等注意: 使用时需要自行编译安装到maven或打成jar使用使用方式:...

    redis-leader:用redis选举leader

    Redis 领导者Redis 支持的领导人选举要求Redis 2.6.12安装 npm install redis-leader例子 var Leader = require ( 'redis-leader' ) ;应用程序接口新领导者(redis,选项) 创建一个新的领导者redis是标识锁的字符串...

    选举leader1

    在Zookeeper中,选举机制是其核心组件之一,确保集群中始终存在一个领导者(Leader),负责处理所有的客户端请求和维护数据一致性。这里我们将深入探讨Zookeeper的FastLeader选举算法。 FastLeader选举算法旨在快速...

    用MATLAB编写的leader_follower算法实现聚类的函数.希望能对大家有帮助....zip

    `leader_follower_matrix.m` 文件很可能是实现 Leader-Follower 算法的核心代码。在MATLAB中,通常会使用矩阵操作来高效地执行这种计算密集型任务。以下是对这个算法的详细解析: 1. **算法概述**: Leader-...

    C# 关于zookeeper主从选举的源码

    本项目是基于C#实现的Zookeeper主从选举的源码,对于理解Zookeeper的工作原理以及如何在C#环境中实现选举逻辑具有很高的参考价值。下面将详细探讨相关知识点。 1. **Zookeeper简介**: - ZooKeeper是一个分布式的...

Global site tag (gtag.js) - Google Analytics