开始读zookeeper代码,首先启动zookeeper,看到
java -Dzookeeper.log.dir=. -Dzookeeper.root.logger=INFO,CONSOLE -cp /home/uniseraph/dev/zookeeper-3.3.3/bin/../build/classes:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../build/lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../zookeeper-3.3.3.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/log4j-1.2.15.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../lib/jline-0.9.94.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ivy-2.1.0.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../src/java/lib/ant-eclipse-1.0-jvm1.2.jar:/home/uniseraph/dev/zookeeper-3.3.3/bin/../conf: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /home/uniseraph/dev/zookeeper-3.3.3/bin/../conf/zoo.cfg
确认入口函数在QuorumPeerMain。
1. 去掉一些注释,找到关键初始化点
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
main.initializeAndRun(args);
}
2. 解析输入参数,选择cluster/standalone
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
if (args.length == 1 && config.servers.size() > 0) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
3 runFromConfig读取配置文件,进行系统初始化
3.1 注册一个MBean
ManagedUtil.registerLog4jMBeans();
3.2 初始化一个连接工厂,这里将是关键,为nio socket server进行一些初始化动作
NIOServerCnxn.Factory cnxnFactory =
new NIOServerCnxn.Factory(config.getClientPortAddress(),
config.getMaxClientCnxns());
public Factory(InetSocketAddress addr, int maxcc) throws IOException {
super("NIOServerCxn.Factory:" + addr);
setDaemon(true);
maxClientCnxns = maxcc;
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
ss.register(selector, SelectionKey.OP_ACCEPT);
}
3.3 初始化一个QuorumPeer,将配置文件中参数赋值给它,并启动之。QuorumPeer负责处理quorum protol,选出leader。
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
quorumPeer.setQuorumPeers(config.getServers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.start();
public synchronized void start() {
try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
cnxnFactory.start();
startLeaderElection();
super.start();
}
3.3.1 quorumPeer.start启动时候先加载硬盘上的zk数据;
try {
zkDb.loadDataBase();
} catch(IOException ie) {
LOG.fatal("Unable to load database on disk", ie);
throw new RuntimeException("Unable to run quorum server ", ie);
}
3.3.2 启动factory的nio 监听线程,开始循环
cnxnFactory.start();
因为Factory继承自Thread,所以新起一个线程执行。
public void run() {
while (!ss.socket().isClosed()) {
try {
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
} catch (RuntimeException e) {
LOG.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
clear();
LOG.info("NIOServerCnxn factory exited run method");
}
3.3.2.1 首先在一个无限循环中,nio selector进行监听,每1秒或者有数据来就唤醒一次。
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
3.3.2.2 如果是有链接请求来了,则accept之,并创建链接上下文,且在selector中注册SelectionKey.OP_READ事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
LOG.warn("Too many connections from " + ia
+ " - max is " + maxClientCnxns );
sc.close();
} else {
LOG.info("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
sk.attach(cnxn);
addCnxn(cnxn);
}
}
3.3.2.3 如果是READ/WRITE事件,处理之
else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
}
NIOServerCnxn处理另外分析;
3.3 启动select leader算法
startLeaderElection();
分享到:
相关推荐
本示例代码提供了对ZooKeeper核心特性的实践,涵盖了以下几个关键知识点: 1. **ZooKeeper对象**:在Zookeeper中,基本的对象包括`ZooKeeper`客户端实例和`ZNode`( ZooKeeper 节点)。`ZooKeeper`客户端用于与...
在本资料中,"zookeeper 操作代码 部分可用"可能是指包含了一些使用ZooKeeper API进行操作的示例代码,但这些代码可能并非全部都能正常运行。 首先,我们需要理解ZooKeeper的主要功能:它提供了一种有序、可靠的...
dubbo+zookeeper例子代码和部署说明,demo文件下载,包含zookeeper安装文件,dubbo的监控war已经dubbo的源码
同时,阅读官方文档(https://zookeeper.apache.org/doc/current/index.html)以及相关的博客和教程,如提供的博文链接(https://gaojingsong.iteye.com/blog/2320437),这些资源将帮助你更好地理解Zookeeper的工作...
### ZooKeeper源码阅读知识点详解 #### 一、ZooKeeper客户端初始化与使用 ##### 客户端初始化 ZooKeeper客户端的初始化是通过构造函数完成的,具体为: ```java ZooKeeper zookeeper = new ZooKeeper(host, time...
在这个"zookeeperMaster选举以及数据同步代码"项目中,我们将深入探讨Zookeeper如何进行主节点选举以及如何与MySQL数据库进行数据同步。 首先,我们要理解Zookeeper的Master选举机制。在分布式环境中,通常需要一个...
Zookeeper权限控制是Apache ZooKeeper系统中的一个重要特性,它允许管理员和用户对Zookeeper的数据节点进行细粒度的访问控制,以确保数据的安全性。在Java开发中,理解和使用Zookeeper的权限控制对于构建分布式系统...
它提供了一种模型-视图-控制器(MVC)架构模式,使开发者能够将业务逻辑、数据和用户界面分离,提高代码的可维护性和复用性。SpringMVC提供了注解驱动的编程模型,简化了开发流程,并支持RESTful API设计,便于前后...
zookeeper 经典应用设计 锁、同步和队列分析
在本文中,我们将深入探讨Zookeeper客户端的工作原理,如何通过代码进行操作,并探讨其在实际应用中的场景。 首先,让我们理解Zookeeper客户端的基本原理。Zookeeper客户端通过TCP连接与服务器建立会话。这个会话...
通过这些代码,你可以学习如何实际操作Zookeeper,理解其基本API用法,以及如何将这些基础功能应用到实际的分布式系统中。同时,这也能帮助你深入理解分布式系统的协调机制和设计思路,对于构建高可用、高性能的...
Zookeeper集群安装是一个关键步骤,尤其对于分布式系统和大数据应用来说,它作为协调服务起着至关重要的作用。Apache ZooKeeper是一个开源的分布式服务框架,它主要用于管理大型分布式系统的配置信息、命名服务、...
apache-zookeeper-3.5.10-bin 环境搭配 ...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在$zookeeper_home\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
《Zookeeper实战:ConfigServer代码样例解析》 在分布式系统中,Zookeeper作为一个高可用的分布式协调服务,被广泛应用于配置管理、命名服务、分布式锁等场景。本篇文章将聚焦于Zookeeper的一个典型应用——Config...
Zookeeper 3.5.7 源代码
标题中的“ActiveMQ与Zookeeper集群测试代码”指的是一个实验或示例项目,旨在演示如何结合这两个组件来构建高可用的消息传递环境。Zookeeper在这里的角色可能是用来管理ActiveMQ集群的状态,实现节点间的选举和故障...
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调...ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下...同时,源码阅读也有助于开发者调试问题,定制功能,甚至为Zookeeper贡献代码。
在压缩包`zookeeperCase`中,可能包含了实现这些功能的Java代码示例,包括创建Zookeeper客户端连接、创建和删除ZNode、监听节点变化、实现分布式锁逻辑以及服务注册和发现的相关类。通过学习和分析这些代码,可以...
在IT行业中,ZooKeeper是一个广泛使用的分布式协调服务,它由...通过阅读提供的博客文章和分析`Demo1.java`和`Demo2.java`的代码,我们可以深入学习ZooKeeper的API用法和实际应用场景,提升在分布式环境下的编程能力。