这是使用apache.org下面开源的curator框架写的一个Leader选举的工具类。对于 curator 的介绍,如下:
本程序用到的pom.xml
<org.slf4j.version>1.6.6</org.slf4j.version> <commons-logging.version>1.1.1</commons-logging.version> <log4j.version>1.2.14</log4j.version> <guava.version>17.0</guava.version> <curator-version>2.6.0</curator-version> <!-- curator start --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator-version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>${curator-version}</version> </dependency> <!-- curator end --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>${guava.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${org.slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${org.slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${org.slf4j.version}</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency>
下面是程序:
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.CharMatcher; import java.util.UUID; /** * <pre> * ZKLeaderUtils.java * @author kanpiaoxue<br> * @version 1.0 * Create Time 2014年8月7日 下午1:04:18<br> * Description : Zookeeper Leader选举工具类 * </pre> */ public class ZKLeaderUtils { private static final Logger LOGGER = LoggerFactory .getLogger(ZKLeaderUtils.class); private ZKLeaderUtils() { } /** * <pre> * @param connectionString 链接Zookeeper Cluster的字符串,如:127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181,127.0.0.4:2181,127.0.0.5:2181 * @param leaderPath Zookeeper 上面创建挂在Leader选举使用的路径 * @param sessionTimeoutMs 它是 session 失效的时间,单位:毫秒。这个参数直接影响备选Leader之间切换间隔时间。 * 如果该时间设置过长,则当前Leader挂掉之后,备选的Leader不能立刻启动,需要等待sessionTimeoutMs这么长的时间才能确定之前的Leader已经死亡,备选Leader才能进行选举。 * 如果该时间设置过短,当前Leader与ZK集群通信发生闪断,这个闪断的时间大于sessionTimeoutMs的时间,Leader准备在闪断之后进行尝试连接ZK集群。在这个过程中,其他备选的Leader感知到当前Leader * 的session已经失效,则进行选举并产生新的Leader。这个时候,就会出现问题:已有的Leader并没有真正的死亡,只不过是与ZK的集群之间发生网络的闪断而已,它的业务依然正常运行中;而现有的备选Leader不知道 * 原有的Leader并没有真的死亡,从而选举出Leader并接手业务逻辑。会出现多Leader共存的现象。这是致命的问题。所以要设置恰当的 sessionTimeoutMs 时间,来规避这个问题。 * @param connectionTimeoutMs 链接超时的时间,单位:毫秒 * @param baseSleepTimeMs 链接断开之后进行重新连接的间隔时间,单位:毫秒 * @param maxRetries 链接断开之后进行重新连接的最多次数 * @throws Exception * 功能:等待成为Leader * 算法: * 为每个备选的Leader的ZK节点分配一个唯一ID。该ID由 UUID.randomUUID() 产生。 * 当本节点不是Leader的时候,本方法会一直阻塞直到本节点成为Leader。 * 如果ZK中没有 leaderPath 这个路径,本方法不会自动创建这个路径。 * </pre> */ public static void waitForLeader(String connectionString, String leaderPath, int sessionTimeoutMs, int connectionTimeoutMs, int baseSleepTimeMs, int maxRetries) throws Exception { String id = CharMatcher.anyOf("-").removeFrom( UUID.randomUUID().toString()); CuratorFramework zkCient = CuratorFrameworkFactory.newClient( connectionString, sessionTimeoutMs, connectionTimeoutMs, new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)); LOGGER.info(String.format("I'm [%s]. start to work", id)); zkCient.start(); zkCient.getZookeeperClient().blockUntilConnectedOrTimedOut(); @SuppressWarnings("resource") LeaderLatch leaderLatch = new LeaderLatch(zkCient, leaderPath, id); leaderLatch.start(); LOGGER.info("leaderLatch has startted and is waitting for being leader."); LOGGER.info(String.format("I'm [%s]. I think that the leader is %s", id, leaderLatch.getLeader())); leaderLatch.await(); LOGGER.info(String.format("I'm [%s]. I'm the leader now! %s", id, leaderLatch.getLeader())); } public static void main(String[] args) throws Exception { String connectionString = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181"; String leaderPath = "/leader"; int sessionTimeoutMs = 10 * 1000; int connectionTimeoutMs = 10 * 1000; int baseSleepTimeMs = 1; int maxRetries = 20000; waitForLeader(connectionString, leaderPath, sessionTimeoutMs, connectionTimeoutMs, baseSleepTimeMs, maxRetries); System.out.println("I'm the leader. I can do something here.") } }
相关推荐
**Zookeeper源码剖析:深入理解Leader选举机制** 在分布式协调服务Zookeeper中,Leader选举是其核心功能之一,确保了服务的高可用性和一致性。本文将深入Zookeeper的源码,探讨Leader选举的实现机制。 **为什么要...
在Zookeeper中,可以通过Leader选举算法来选择集群中的Leader节点,而FlashLeaderElection算法是其中的一种实现。 Zookeeper的Watch机制能够使客户端在指定的节点发生变化时得到通知。ACL控制则提供了一种安全机制...
它支持多种同步协议,如Leader选举、锁服务等,帮助分布式系统中的节点保持数据一致性,保证服务的正确性和稳定性。例如,通过Zookeeper实现的分布式锁,可以防止多个节点同时执行同一操作,避免数据冲突。 此外,...
Zookeeper基于 zab 协议实现强一致性,并采用类文件系统的数据模型,使得操作简单直观。 二、Zookeeper 3.4.10 版本特性 1. 性能优化:3.4.10 版本对性能进行了大量优化,包括客户端连接、数据同步、请求处理等...
1. 集群选举:Kafka 使用ZooKeeper 进行Controller选举,Controller负责处理分区 Leader 的选举和Brokers的增删。 2. 主题和分区元数据管理:ZooKeeper 存储了所有主题(Topic)和分区(Partition)的元数据信息,...
3. **集群管理**:Zookeeper可以作为集群管理工具,实现节点加入和离开的动态管理,以及Leader选举等功能。 4. **分布式锁**:通过Zookeeper可以实现可重入的分布式锁,确保分布式环境下的资源独占访问。 5. **队列...
8. **Leader选举与分布式锁**: - ZooKeeper常用于实现分布式环境下的领导选举,利用ZNode的创建和删除特性。 - 分布式锁可以通过创建临时节点并监控其他节点的变化来实现。 在实际开发中,`lib`文件夹通常包含...
最后,`guava-17.0.jar`是Google Guava库,提供了大量的Java实用工具类,如集合、并发、缓存、I/O等,Curator使用Guava来增强其功能,如使用Guava的缓存来缓存Zookeeper节点的数据。 总的来说,这些JAR文件构建了一...
- **选举Leader**: 在Kafka的Broker中,通过ZooKeeper进行Leader选举。 - **消费者协调**: 管理消费者组的订阅和分配分区。 3. **集群搭建步骤** - **安装Java**: Kafka和ZooKeeper都需要Java环境,确保JDK已...
1. **集群管理**: Zookeeper存储了Kafka集群的元数据,包括broker列表、partition分配和leader选举信息。 2. **Topic管理**: 新建、删除和修改topic的元数据操作都会通过Zookeeper完成。 3. **Partition Leader选举*...
在分布式环境中,ZooKeeper被用来实现命名服务、配置管理、集群同步、 leader选举等任务。例如,在Hadoop中,它用于管理HDFS的命名空间和协调MapReduce作业的调度;在Kafka中,它用于管理主题分区和选举broker领导者...
2. **表达能力**:Zookeeper 可用于实现多种分布式协调机制,例如分布式锁、分布式队列以及领导者选举等。 3. **高可用性**:通过部署在多台服务器上的集群模式,确保即使某些服务器宕机也能正常运行。 4. **松耦合*...
在Zookeeper中,Paxos算法被用来选举 Leader 和维护集群的状态一致性。Paxos算法通常包括提议者(Proposer)、接受者(Acceptor)和学习者(Learner)三个角色。在Zookeeper中,这些角色分别对应于客户端、服务器...
- **选举算法**: 当Leader失效时,`Election`类负责新的Leader选举。基于Fast Leader Election算法,它能够快速确定新的领导者。 - **Watcher的实现**: `ZooKeeper`客户端库中的`WatcherManager`管理所有注册的...
3. `server.x`: 对于多节点集群,你需要在`zoo.cfg`中定义所有服务器的地址和端口,如`server.1=localhost:2888:3888`,其中1是服务器ID,2888是follower和leader之间通信的端口,3888是选举时使用的端口。...
主要涉及的核心类有`ZooKeeper`客户端接口、`ZKDatabase`数据存储、`ZooKeeperServer`服务器实现和`QuorumPeer`集群通信等。 **工具** 在日常使用和维护Zookeeper时,除了官方提供的zkCli工具外,还有一些第三方...
ZooKeeper的核心架构,如ZNode、Watcher、ACL和Leader选举机制等,是其能够提供高效一致性服务的关键。书中不仅详细阐述了这些特性的设计和工作原理,还提供了实际的源码分析,帮助读者深入理解ZooKeeper是如何通过...
Apache Curator 是一个高度封装的 ZooKeeper Java 客户端库,它简化了与 ZooKeeper 交互的复杂性,提供了更高级别的抽象和实用工具。ZooKeeper 是一个分布式的,开放源码的协调服务,用于分布式应用程序,提供命名...
- Zookeeper 集群通常由多个服务器节点组成,通过选举机制确定领导者(Leader)和跟随者(Follower),保证高可用性和一致性。 - 客户端通过 Zookeeper API 连接集群,创建、读取、更新或删除 Znode,以及设置和...
Zookeeper的运作机制类似于一个类Unix文件系统,同时具备通知机制。在Zookeeper中,数据存储在称为Znode的节点中,这些节点类似于文件系统中的文件和目录。用户可以对Znode设置监听,当Znode的状态发生变化时,...