`

基于库zkclient 的leader选举代码实现(最粗糙级C)

阅读更多

 

ZooKeeper进行领导者选举是比较容易的。

 

伪代码表示:

zkclient:

<1>判定是否存在/zxeample/leader路径

<2>如果不存在,那么试图创建一个会话znode(Ephemeral Path)(path = /zxeample/leader,data=client id)

 

<2.1>创建成功,标识自己是leader

<2.2>创建不成功(包括异常)转向<1>

<3>如果存在path=/zxeample/leader,标识自己是slave,(可能需要与leader进行通信)

<4>如果自己是slave,那么监控该znode的data change事件。(用于当leader挂了,事件通知模型,就会产生事件触发通知,从而进行新的选举领导者)

 

基于java开源org.I0Itec.zkclient库实现,更简单。kafka也是基于这个实现leader选举的,不过是scala写的。

 

测试方法:

(1)启动ZooKeeper server

(2)启动zkCli

 (3)启动程序,

构建10个线程,每个线程都是一个ZkClient,

(4)然后在zkCli中,使用命令rmr /zxexample/leader

 

总结:尚有2个不如人意之处.创建znode有冲突,因为存在多个client同时创建,单只有一个成功,其余失败(逻辑正确),但是会打印很多异常。第二,线程是用sleep,因此,其实是一直在循环,即轮询,而没有消息驱动的方式。

 

 

package zkexam;

import java.security.SecureRandom;
import java.util.concurrent.Callable;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * choose a server as a Leader(Master),while other servers are slaves.
 * 
 * @author Free
 *
 */
public class ServerElect {
	SecureRandom rand = new SecureRandom();

	public ServerElect() {

	}

	public static class Leader {
		ZkClient leader;

		// byte[] data;
		public ZkClient getClient() {
			return leader;
		}

		public void setClient(ZkClient leaderClient) {
			this.leader = leaderClient;
		}
	}

	Leader selectLeader(ZkClient... client) {
		if (client == null || client.length < 0) {
			throw new IllegalArgumentException(
					"no zookeeper client need to be selected as leader.");

		}
		Leader leader = new Leader();
		do {
			int i = rand.nextInt() % (client.length);
			try {
				client[i].createEphemeral("/zxexample/leader", "I am leader "
						+ i);
				leader.setClient(client[i]);
				for (int j = 0; j < client.length && j != i; j++) {

				}
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		} while (true);
		return leader;
	}

	public class MyWatcher<T> implements Watcher {
		Callable<T> callback;

		MyWatcher(Callable<T> c) {
			callback = c;
		}

		@Override
		public void process(WatchedEvent event) {
			org.apache.zookeeper.Watcher.Event.EventType eventType = event
					.getType();
			switch (eventType) {
			case NodeDeleted:
				try {
					callback.call();
				} catch (Exception e) {
					e.printStackTrace();
				}
				break;
			default:
				break;
			}
		}

	}

	public static class LeaderChangeListener implements IZkDataListener {
		ZkClient client;

		public LeaderChangeListener(ZkClient client_) {
			client = client_;
		}

		/**
		 * Called when the leader information stored in zookeeper has changed. Record the new leader in memory
		 * 
		 * @throws Exception
		 *             On any error.
		 */
		public void handleDataChange(String dataPath, Object data) {

			System.out.println("a new leader is elected.");
		}

		@Override
		public void handleDataDeleted(String dataPath) throws Exception {
			System.out.println(dataPath + ":data is deleted.");
		}
	}

	public static class zkClientThread extends Thread {
		final static String path = "/zxexample/leader";
		ZkClient client;
		long maxMsToWaitUntilConnected;
		volatile boolean isFirstTime = true;
		volatile boolean isLeader;
		String data;

		// Watcher watcher;
		public zkClientThread(ZkClient client_, String name) {
			super(name);
			client = client_;

		}

		public void start() {
			super.start();
		}

		public void tryLeader() {
			try {
				data = getName();
				if (!client.exists(path)) {
					try {
						client.createEphemeral(path, data);
					} catch (ZkNoNodeException e) {
						String parentDir = path.substring(0,
								path.lastIndexOf('/'));
						if (parentDir.length() != 0) {
							client.createPersistent(parentDir, true);
						}
						client.createEphemeral(path, data);
					}
					isLeader = true;
					System.out.println("I am leader :" + getName());
				}
			} catch (Exception e) {
				e.printStackTrace();
				isFirstTime = true;
				isLeader = false;
			}
		}

		public void run() {
			while (true) {
				if (client.exists(path)) {
					if (isFirstTime) {
						Object obj = client.readData(path);
						if (obj == null || !obj.toString().equals(getName())) {
							tryLeader();
						} else {
							// client.subscribeDataChanges(path,
							// new LeaderChangeListener(client));
							// wait leader ,and communication to leader;
							client.watchForData(path);
						}
						isFirstTime = false;
					}
				} else {
					tryLeader();
				}
				try {
					Thread.sleep(1000);
				} catch (InterruptedException e) {
					break;
				}
			}
		}
	}

	public static void main(String args[]) {
		int curClientCount = 10;
		ZkClient[] client = new ZkClient[curClientCount];
		zkClientThread[] zkThreads = new zkClientThread[curClientCount];
		for (int i = 0; i < curClientCount; i++) {

			client[i] = new ZkClient("127.0.0.1:2181", 218100);
			zkThreads[i] = new zkClientThread(client[i], "zk-" + i);
		}
		for (int i = 0; i < zkThreads.length; i++) {
			zkThreads[i].start();
		}
	}
}

 

 

 

I am leader :zk-6
I am leader :zk-5
I am leader :zk-6
org.I0Itec.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:55)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
	at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.createEphemeral(ZkClient.java:328)
	at zkexam.ServerElect$zkClientThread.tryLeader(ServerElect.java:141)
	at zkexam.ServerElect$zkClientThread.run(ServerElect.java:169)
Caused by: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783)
	at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308)
	at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
	... 4 more
I am leader :zk-3

 

 

分享到:
评论

相关推荐

    YOLOv12:以注意力为中心的实时目标检测器.pdf

    YOLOv12:以注意力为中心的实时目标检测器

    GO语言基础语法指令教程

    GO语言基础语法指令教程

    MATLAB代码实现:分布式电源接入对配电网运行影响深度分析与评估,MATLAB代码分析:分布式电源接入对配电网运行影响评估,MATLAB代码:分布式电源接入对配电网影响分析 关键词:分布式电源 配电

    MATLAB代码实现:分布式电源接入对配电网运行影响深度分析与评估,MATLAB代码分析:分布式电源接入对配电网运行影响评估,MATLAB代码:分布式电源接入对配电网影响分析 关键词:分布式电源 配电网 评估 参考文档:《自写文档,联系我看》参考选址定容模型部分; 仿真平台:MATLAB 主要内容:代码主要做的是分布式电源接入场景下对配电网运行影响的分析,其中,可以自己设置分布式电源接入配电网的位置,接入配电网的有功功率以及无功功率的大小,通过牛顿拉夫逊法求解分布式电源接入后的电网潮流,从而评价分布式电源接入前后的电压、线路潮流等参数是否发生变化,评估配电网的运行方式。 代码非常精品,是研究含分布式电源接入的电网潮流计算的必备程序 ,分布式电源; 配电网; 接入影响分析; 潮流计算; 牛顿拉夫逊法; 电压评估; 必备程序。,基于MATLAB的分布式电源对配电网影响评估系统

    三相光伏并网逆变器:Mppt最大功率跟踪与800V中间母线电压的电力转换技术,三相光伏并网逆变器:实现最大功率跟踪与800V中间母线电压的优化处理,三相光伏并网逆变器 输入光伏Mppt 最大功率跟踪

    三相光伏并网逆变器:Mppt最大功率跟踪与800V中间母线电压的电力转换技术,三相光伏并网逆变器:实现最大功率跟踪与800V中间母线电压的优化处理,三相光伏并网逆变器 输入光伏Mppt 最大功率跟踪中间母线电压800V 后级三相光伏并网逆变器 ,三相光伏并网逆变器; 输入光伏Mppt; 最大功率跟踪; 中间母线电压800V; 后级逆变器,三相光伏并网逆变器:MPPT最大功率跟踪800V母线电压

    基于SSM的车位销售平台设计与实现.zip(毕设&课设&实训&大作业&竞赛&项目)

    项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用

    西门子博途三部十层电梯程序案例解析:基于Wincc RT Professional V14及更高版本的应用探索,西门子博途三部十层电梯程序案例解析:基于Wincc RT Professional画面与

    西门子博途三部十层电梯程序案例解析:基于Wincc RT Professional V14及更高版本的应用探索,西门子博途三部十层电梯程序案例解析:基于Wincc RT Professional画面与V14及以上版本技术参考,西门子1200博途三部十层电梯程序案例,加Wincc RT Professional画面三部十层电梯程序,版本V14及以上。 程序仅限于参考资料使用。 ,西门子;1200博途;三部十层电梯程序案例;Wincc RT Professional;V14以上程序版本。,西门子V14+博途三部十层电梯程序案例:Wincc RT Pro专业画面技术解析

    基于舆情数据的知识图谱推荐可视化系统论文,全原创,免费分享

    基于舆情数据的知识图谱推荐可视化系统论文,全原创,免费分享

    基于Vivado源码的AM包络检调制解调与FIR滤波器设计在FPGA上的实现,基于Zynq-7000和Artix-7系列的AM包络检调制解调源码及Vivado环境下的实现,AM包络检调制解调,Viva

    基于Vivado源码的AM包络检调制解调与FIR滤波器设计在FPGA上的实现,基于Zynq-7000和Artix-7系列的AM包络检调制解调源码及Vivado环境下的实现,AM包络检调制解调,Vivado源码 FPGA的AM调制解调源码,其中FIR滤波器根据MATLAB设计。 【AM_jietiao】文件是基于zynq-7000系列,但没有涉及AD与DA,只是单纯的仿真。 【AM包络检调制解调_Vivado源码】文件基于Artix-7系列,从AD读入信号后,进行AM调制,并解调DA输出。 ,AM包络检调制解调;Vivado源码;FPGA;AM调制解调源码;FIR滤波器;MATLAB设计;Zynq-7000系列;Artix-7系列;AD读入信号;DA输出,AM包络调制解调源码:Zynq-7000与Artix-7 FPGA的不同实现

    rdtyfv、ijij

    yugy

    2025山东大学:DeepSeek应用与部署(部署方案大全+API调用+业务应用)-80页.pptx

    2025山东大学:DeepSeek应用与部署(部署方案大全+API调用+业务应用)-80页.pptx

    chromedriver-mac-x64-135.0.7023.0(Dev).zip

    chromedriver-mac-x64-135.0.7023.0(Dev).zip

    基于单片机protues仿真的433MHz无线模块编解码收发通信测试(仿真图、源代码)

    基于单片机protues仿真的433MHz无线模块编解码收发通信测试(仿真图、源代码) 该设计为单片机protues仿真的433MHz无线模块收发通信测试; 1、433M超再生收发模块; 2、在仿真图中是把发射MCU的P2_7腿直接输入到接收MCU的INT0实现编码解码的; 3、通过433MHz无线模块实现无线通信的编解码功能; 4、按键控制指令; 5、液晶屏显示收发状态和信息;

    车机安卓版好用的应用管理app

    资源说说明; 自带文件管理 adb操作以及应用管理等等的功能。 操作性对比其他应用较好。 参阅博文: https://blog.csdn.net/mg668/article/details/145689511?spm=1001.2014.3001.5352

    软件工程课程设计前端.zip

    项目工程资源经过严格测试运行并且功能上ok,可实现复现复刻,拿到资料包后可实现复现出一样的项目,本人系统开发经验充足(全栈全领域),有任何使用问题欢迎随时与我联系,我会抽时间努力为您解惑,提供帮助 【资源内容】:包含源码+工程文件+说明等。答辩评审平均分达到96分,放心下载使用!可实现复现;设计报告也可借鉴此项目;该资源内项目代码都经过测试运行,功能ok 【项目价值】:可用在相关项目设计中,皆可应用在项目、毕业设计、课程设计、期末/期中/大作业、工程实训、大创等学科竞赛比赛、初期项目立项、学习/练手等方面,可借鉴此优质项目实现复刻,设计报告也可借鉴此项目,也可基于此项目来扩展开发出更多功能 【提供帮助】:有任何使用上的问题欢迎随时与我联系,抽时间努力解答解惑,提供帮助 【附带帮助】:若还需要相关开发工具、学习资料等,我会提供帮助,提供资料,鼓励学习进步 下载后请首先打开说明文件(如有);整理时不同项目所包含资源内容不同;项目工程可实现复现复刻,如果基础还行,也可在此程序基础上进行修改,以实现其它功能。供开源学习/技术交流/学习参考,勿用于商业用途。质量优质,放心下载使用

    智慧图书管理系统(源码+数据库+论文)java开发springboot框架javaweb,可做计算机毕业设计或课程设计

    智慧图书管理系统(源码+数据库+论文)java开发springboot框架javaweb,可做计算机毕业设计或课程设计 【功能需求】 本系统分为读者、管理员2个角色 读者可以进行注册登录、浏览图书以及留言、图书借阅、图书归还、图书续借、个人中心、论坛交流、等功能 管理员可以进行读者管理、图书管理、论坛论坛回复管理、图书借阅管理(下架、库存管理、修改、删除)、轮播图管理 【环境需要】 1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。 2.IDE环境:IDEA,Eclipse,Myeclipse都可以。 3.tomcat环境:Tomcat 7.x,8.x,9.x版本均可 4.数据库:MySql 5.7/8.0等版本均可; 【购买须知】 本源码项目经过严格的调试,项目已确保无误,可直接用于课程实训或毕业设计提交。里面都有配套的运行环境软件,讲解视频,部署视频教程,一应俱全,可以自己按照教程导入运行。附有论文参考,使学习者能够快速掌握系统设计和实现的核心技术。

    三相APFC电路与单相Boost PFC电路仿真模型:电压外环电流内环双闭环控制研究,三相电路仿真模型:探索APFC电路、单相PFC电路及BoostPFC电路的动态特性与双闭环控制策略,APFC电路

    三相APFC电路与单相Boost PFC电路仿真模型:电压外环电流内环双闭环控制研究,三相电路仿真模型:探索APFC电路、单相PFC电路及BoostPFC电路的动态特性与双闭环控制策略,APFC电路,单相PFC电路,单相BoostPFC电路仿真模型。 网侧220V 50Hz,输出电压设置为50Hz。 电压外环电流内环双闭环控制仿真模型 ,APFC电路; 单相PFC电路; 单相BoostPFC电路仿真模型; 网侧电压; 220V 50Hz; 输出电压50Hz; 电压外环电流内环双闭环控制仿真模型。,基于APFC电路的单相Boost PFC仿真模型:网侧电压220V/50Hz下电压电流双闭环控制的研究与应用

    MATLAB环境下ADMM算法在分布式调度中的应用:比较并行与串行算法(Jocobi与Gaussian Seidel)的优化效果与实现细节-基于YALMIP和GUROBI的仿真平台复刻参考文档的研究

    MATLAB环境下ADMM算法在分布式调度中的应用:比较并行与串行算法(Jocobi与Gaussian Seidel)的优化效果与实现细节——基于YALMIP和GUROBI的仿真平台复刻参考文档的研究结果。,MATLAB下ADMM算法在分布式调度中的并行与串行算法应用:基于YALMIP与GUROBI的仿真研究,MATLAB代码:ADMM算法在分布式调度中的应用 关键词:并行算法(Jocobi)和串行算法(Gaussian Seidel, GS) 参考文档:《主动配电网分布式无功优化控制方法》《基于串行和并行ADMM算法的电-气能量流分布式协同优化》 仿真平台:MATLAB YALMIP GUROBI 主要内容:ADMM算法在分布式调度中的应用 复刻参考文档 ,关键词:ADMM算法; 分布式调度; 并行算法(Jocobi); 串行算法(Gaussian Seidel, GS); MATLAB代码; YALMIP; GUROBI; 主动配电网; 无功优化控制方法; 能量流分布式协同优化。,MATLAB实现:ADMM算法在分布式调度中的并行与串行优化应用

    “考虑P2G、碳捕集与碳交易机制的综合能源系统优化调度模型研究”,考虑电转气P2G与碳捕集设备的热电联供综合能源系统优化调度模型研究(含碳交易机制与四种算例场景分析),考虑P2G和碳捕集设备的热电联供

    “考虑P2G、碳捕集与碳交易机制的综合能源系统优化调度模型研究”,考虑电转气P2G与碳捕集设备的热电联供综合能源系统优化调度模型研究(含碳交易机制与四种算例场景分析),考虑P2G和碳捕集设备的热电联供综合能源系统优化调度模型 摘要:代码主要做的是一个考虑电转气P2G和碳捕集设备的热电联供综合能源系统优化调度模型,模型耦合CHP热电联产单元、电转气单元以及碳捕集单元,并重点考虑了碳交易机制,建立了综合能源系统运行优化模型,与目前市面上的代码不同,本代码完全复现了文档中所提出的四种算例场景,没有对比算例,买过去也没有任何意义,四种算例主要包括: 1)t不包括P2G、CCS、以及碳交易 2)t包括P2G,但是不包括CCS以及碳交易 3)t包括P2G和CCS,但是不包括碳交易 4)t包括P2G、CCS以及碳交易 且最终的实现效果与文档进行对比后,虽然数值无法100%一致,但是结果以及数值曲线,几乎完全一样,此版本为目前市面上最好的园区综合能源调度代码,没有之一 ,考虑电转气(P2G); 碳捕集设备; 热电联供综合能源系统; 优化调度模型; 碳交易机制; CHP热电联产单元; 耦合模型; 算

    FS-LDM培训材料(DAY_2)_NCR数据仓库事业部.ppt

    FS-LDM培训材料(DAY_2)_NCR数据仓库事业部.ppt

    专题 平面向量的数量积(学生版)20250222.pdf

    专题 平面向量的数量积(学生版)20250222.pdf

Global site tag (gtag.js) - Google Analytics