- 浏览: 193637 次
- 性别:
- 来自: 南京
文章分类
最新评论
1.前言
curator由Netflix的工程师开发,主要目的为了基于zookeeper的应用变得简单可靠,在2013年成为apache的顶级项目。curator基于zookeeper,但提供了更高级别的
API抽象以及工具集,并对zookeeper提供的常用功能进行了封装和扩充,例如leader选举、分布式锁、服务发现、缓存等功能,从而使开发者在实现这些功能时不用在实现
哪些无聊的程式化代码段,也减少出错的可能性。
2.本文主要讲解如何利用curator实现leader选举,并对curator提供的两种实现形式进行对比
2.1 通过LeaderSelector进行leader选举
建议通过LeaderSelectorListenerAdapter实现,当某个实例成为leader后,会调用对应实例的takeLeadership方法,此方法执行期间,此实例一直占着leader权,
当takeLeadership方法执行结束,实例自动释放leader权,所有实例重新进行leader选举
执行类
执行结果
2.2 通过LeaderLatch实现leader选举,具体实现LeaderLatchListener来实现,
当实例成为leader后,会调用isLeader()方法,之后除非此实例连接不到zookeeper,否侧将一直占着leader权,当失去leader权后会调用notLeader()方法,为了模拟
选举过程,我们追加了一个ScheduledExecutorService来周期性的自己释放leader权
启动类
执行结果
curator由Netflix的工程师开发,主要目的为了基于zookeeper的应用变得简单可靠,在2013年成为apache的顶级项目。curator基于zookeeper,但提供了更高级别的
API抽象以及工具集,并对zookeeper提供的常用功能进行了封装和扩充,例如leader选举、分布式锁、服务发现、缓存等功能,从而使开发者在实现这些功能时不用在实现
哪些无聊的程式化代码段,也减少出错的可能性。
2.本文主要讲解如何利用curator实现leader选举,并对curator提供的两种实现形式进行对比
2.1 通过LeaderSelector进行leader选举
建议通过LeaderSelectorListenerAdapter实现,当某个实例成为leader后,会调用对应实例的takeLeadership方法,此方法执行期间,此实例一直占着leader权,
当takeLeadership方法执行结束,实例自动释放leader权,所有实例重新进行leader选举
package chengf.falcon.curator.leader; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter; import org.apache.curator.framework.recipes.leader.LeaderSelector; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:26:54 * @Description: TODO * @version V1.0 */ public class SelectorClient extends LeaderSelectorListenerAdapter implements Closeable { private final String name; private final LeaderSelector leaderSelector; private final AtomicInteger leaderCount = new AtomicInteger(); public SelectorClient(CuratorFramework client, String path, String name) { this.name = name; // 利用一个给定的路径创建一个leader selector // 执行leader选举的所有参与者对应的路径必须一样 // 本例中SelectorClient也是一个LeaderSelectorListener,但这不是必须的。 leaderSelector = new LeaderSelector(client, path, this); // 在大多数情况下,我们会希望一个selector放弃leader后还要重新参与leader选举 leaderSelector.autoRequeue(); } /** * 启动当前实例参与leader选举 * * @throws IOException */ public void start() throws IOException { // leader选举是在后台处理的,所以这个方法会立即返回 leaderSelector.start(); } @Override public void close() throws IOException { leaderSelector.close(); } /** * 当前实例成为leader时,会执行下面的方法,这个方法执行结束后,当前实例就自动释放leader了,所以在想放弃leader前此方法不能结束 */ @Override public void takeLeadership(CuratorFramework client) throws Exception { final int waitSeconds = (int) (5 * Math.random()) + 1; System.out.println(name + " 现在是leader了,持续成为leader " + waitSeconds + " 秒."); System.out.println(name + " 之前已经成为了 " + leaderCount.getAndIncrement() + " 次leader."); try { Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds)); } catch (InterruptedException e) { System.err.println(name + " was interrupted."); Thread.currentThread().interrupt(); } finally { System.out.println(name + " 释放leader权.\n"); } } }
执行类
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package chengf.falcon.curator.leader; import com.google.common.collect.Lists; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.List; public class LeaderSelectorExample { private static final int CLIENT_QTY = 10; private static final String PATH = "/examples/leader"; public static void main(String[] args) throws Exception { // all of the useful sample code is in ExampleClient.java System.out.println("创建 " + CLIENT_QTY + " 个客户端, 公平的参与leader选举,成为leader后,会等待一个随机的时间(几秒中),之后释放leader权,所有实例重新进行leader选举。"); List<CuratorFramework> clients = Lists.newArrayList(); List<SelectorClient> examples = Lists.newArrayList(); TestingServer server = new TestingServer(); try { for (int i = 0; i < CLIENT_QTY; ++i) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); clients.add(client); SelectorClient example = new SelectorClient(client, PATH, "Client #" + i); examples.add(example); client.start(); example.start(); } System.out.println("按 enter/return 来退出\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); } finally { System.out.println("关闭程序..."); for (SelectorClient exampleClient : examples) { CloseableUtils.closeQuietly(exampleClient); } for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } CloseableUtils.closeQuietly(server); } } }
执行结果
创建 10 个客户端, 公平的参与leader选举,成为leader后,会等待一个随机的时间(几秒中),之后释放leader权,所有实例重新进行leader选举。 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". SLF4J: Defaulting to no-operation MDCAdapter implementation. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. 按 enter/return 来退出 Client #2 现在是leader了,持续成为leader 3 秒. Client #2 之前已经成为了 0 次leader. Client #2 释放leader权. Client #4 现在是leader了,持续成为leader 3 秒. Client #4 之前已经成为了 0 次leader. Client #4 释放leader权. Client #5 现在是leader了,持续成为leader 5 秒. Client #5 之前已经成为了 0 次leader. Client #5 释放leader权. Client #9 现在是leader了,持续成为leader 3 秒. Client #9 之前已经成为了 0 次leader. Client #9 释放leader权. Client #6 现在是leader了,持续成为leader 3 秒. Client #6 之前已经成为了 0 次leader. Client #6 释放leader权. Client #1 现在是leader了,持续成为leader 4 秒. Client #1 之前已经成为了 0 次leader. Client #1 释放leader权. Client #8 现在是leader了,持续成为leader 1 秒. Client #8 之前已经成为了 0 次leader. Client #8 释放leader权. Client #7 现在是leader了,持续成为leader 3 秒. Client #7 之前已经成为了 0 次leader. Client #7 释放leader权. Client #0 现在是leader了,持续成为leader 2 秒. Client #0 之前已经成为了 0 次leader. Client #0 释放leader权. Client #3 现在是leader了,持续成为leader 5 秒. Client #3 之前已经成为了 0 次leader. Client #3 释放leader权. Client #2 现在是leader了,持续成为leader 4 秒. Client #2 之前已经成为了 1 次leader. Client #2 释放leader权. Client #4 现在是leader了,持续成为leader 3 秒. Client #4 之前已经成为了 1 次leader. Client #4 释放leader权. Client #5 现在是leader了,持续成为leader 3 秒. Client #5 之前已经成为了 1 次leader. Client #5 释放leader权. Client #9 现在是leader了,持续成为leader 3 秒. Client #9 之前已经成为了 1 次leader. Client #9 释放leader权. Client #6 现在是leader了,持续成为leader 1 秒. Client #6 之前已经成为了 1 次leader. Client #6 释放leader权. Client #1 现在是leader了,持续成为leader 4 秒. Client #1 之前已经成为了 1 次leader. Client #1 释放leader权. Client #8 现在是leader了,持续成为leader 5 秒. Client #8 之前已经成为了 1 次leader. 关闭程序... Client #8 was interrupted. Client #8 释放leader权.
2.2 通过LeaderLatch实现leader选举,具体实现LeaderLatchListener来实现,
当实例成为leader后,会调用isLeader()方法,之后除非此实例连接不到zookeeper,否侧将一直占着leader权,当失去leader权后会调用notLeader()方法,为了模拟
选举过程,我们追加了一个ScheduledExecutorService来周期性的自己释放leader权
/** * */ package chengf.falcon.curator.leader; import java.io.IOException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; /** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:26:54 * @Description: TODO * @version V1.0 */ public class LatchClient implements LeaderLatchListener { private final String name; private LeaderLatch leaderLatch; private CuratorFramework client; public LatchClient(CuratorFramework client, String path, String name) throws Exception { this.name = name; this.client = client; leaderLatch = new LeaderLatch(client, path); leaderLatch.addListener(this); leaderLatch.start(); } /* * (non-Javadoc) * * @see * org.apache.curator.framework.recipes.leader.LeaderLatchListener#isLeader( * ) */ /** * 成为leader后会执行下面的方法,方法执行完后不会释放leader权 */ @Override public void isLeader() { System.out.println(name + " is now the leader. "); } /** * 失去leader后执行下面的方法 */ @Override public void notLeader() { System.out.println(name + " is not the leader. "); } public void stop() { try { leaderLatch.close(); } catch (IOException e) { } } public boolean hasLeadership() { return leaderLatch.hasLeadership(); } public CuratorFramework getClient() { return client; } public String getName() { return name; } }
启动类
/** * */ package chengf.falcon.curator.leader; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import org.apache.curator.utils.CloseableUtils; import com.google.common.collect.Lists; /** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:25:47 * @Description: TODO * @version V1.0 */ public class LeaderLatchExample { private static final int CLIENT_QTY = 10; private static final String PATH = "/examples/leader"; public static void main(String[] args) throws Exception { // all of the useful sample code is in ExampleClient.java System.out.println("创建 " + CLIENT_QTY + " 个客户端, 公平的参与leader选举,成为leader后,将一直占用此leader权。直到四秒中后主动放弃leader权"); List<CuratorFramework> clients = Lists.newArrayList(); TestingServer server = new TestingServer(); List<LatchClient> examples = Lists.newArrayList(); int i = 0; try { for (; i < CLIENT_QTY; ++i) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); clients.add(client); LatchClient example = new LatchClient(client, PATH, "Client #" + i); examples.add(example); client.start(); } // 不调用该方法则某实例成为leader后将一直持续占有leader权 ScheduledExecutorService se = giveUpLeader(examples); System.out.println("按 enter/return 来退出\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); se.shutdown(); } finally { System.out.println("关闭程序..."); for (LatchClient exampleClient : examples) { exampleClient.stop(); } for (CuratorFramework client : clients) { CloseableUtils.closeQuietly(client); } CloseableUtils.closeQuietly(server); } } /** * 主动放弃leader权,并新创建一个实例参与leader选举 * @param examples */ private static ScheduledExecutorService giveUpLeader(List<LatchClient> examples) { ScheduledExecutorService se = Executors.newScheduledThreadPool(1); se.scheduleAtFixedRate(()->{ examples.stream().filter(t-> t.hasLeadership()).findFirst().ifPresent(t-> { examples.remove(t); LatchClient example; try { example = new LatchClient(t.getClient(), PATH, t.getName()); examples.add(example); } catch (Exception e) { } //当前实例主动放弃leader权 System.out.println(t.getName() + " 现在主动放弃leader权"); t.stop();}); }, 4, 4, TimeUnit.SECONDS); return se; } }
执行结果
创建 10 个客户端, 公平的参与leader选举,成为leader后,将一直占用此leader权。直到四秒中后主动放弃leader权 SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder". SLF4J: Defaulting to no-operation MDCAdapter implementation. SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details. 按 enter/return 来退出 Client #3 is now the leader. Client #3 现在主动放弃leader权 Client #2 is now the leader. Client #2 现在主动放弃leader权 Client #9 is now the leader. Client #9 现在主动放弃leader权 Client #4 is now the leader. Client #4 现在主动放弃leader权 Client #1 is now the leader. Client #1 现在主动放弃leader权 Client #5 is now the leader. Client #5 现在主动放弃leader权 Client #7 is now the leader. Client #7 现在主动放弃leader权 Client #8 is now the leader. Client #8 现在主动放弃leader权 Client #6 is now the leader. Client #6 现在主动放弃leader权 Client #0 is now the leader. 关闭程序...
相关推荐
Curator的Leader选举食谱提供了这样的机制,确保在任何时刻只有一个节点成为领导者。 2. **分布式锁**:在多节点环境中,同步访问共享资源是常见的需求。Curator的分布式锁食谱可以防止多个节点同时修改同一数据,...
2. `curator-recipes-2.8.0.jar`:这个库包含了Curator的各种高级功能或“食谱”,如锁机制(共享锁、独占锁)、队列(等待队列、分配队列)、Leader选举、分布式计数器等。这些食谱使得实现分布式一致性策略变得...
9. ** recipes**:Curator 提供了许多预定义的 recipes,如队列、栈、分布式开关、服务发现等,它们都是基于 ZooKeeper 原生特性的高级抽象。 10. **故障恢复策略**:Curator 提供了多种重试策略(如 `RetryOneTime...
比如:集群管理(Leader选举)、共享锁、队列、Counter等等。可以总结Curator主要解决以下三类问题: 封装ZK Client与Server之间的连接处理; 提供了一套Fluent风格的操作API; 提供ZK各种应用场景的抽象封装...
Curator提供了一系列预定义的“Recipes”,这些是现成的解决方案,用于解决常见的分布式问题,如 Leader选举、分布式锁、队列和缓存等。 5. **缓存机制**: Curator包含数据缓存功能,可以缓存ZooKeeper节点的...
4. **配方(Recipes)**:Curator提供了一系列的“配方”,如Leader选举、分布式计数器、队列、分布式事件等,这些都是基于Zookeeper构建的高级功能,简化了开发工作。 5. **缓存**:Curator有内置的数据缓存机制,...
Curator提供了一系列高级配方,如分布式计数器、队列、 Leader选举、分布式命名服务等,这些配方可以帮助开发者快速构建分布式应用。 6. **重试策略**: 当ZooKeeper操作失败时,Curator可以配置不同的重试策略,...
2. **Recipes**:Curator 包含一系列预定义的“食谱”(Recipes),这些是常见的分布式协调任务的实现,如分布式锁、队列、领导选举等。这些食谱大大降低了开发复杂分布式系统的难度。 - **Distributed Lock**:...
1. **Leader选举**:展示如何使用 Curator 的 Leader选举机制,确保在分布式环境中只有一个节点作为领导者。 2. **分布式锁**:演示如何实现分布式锁,防止多个节点同时执行相同操作。 3. **配方(Recipes)**:...
- ** recipes**:预定义的高级功能集合,如 Leader选举、分布式锁、队列、缓存等。 3. **Curator recipes**: - **Leader Election**:确保集群中只有一个节点作为领导者,其他节点成为追随者。 - **Distributed...
Leader选举是Zookeeper集群中的重要环节,它分为集群初始化启动时的选举和运行期间的Leader重新选举两种情况。在深入理解选举机制之前,我们需要先了解Zookeeper节点的四种状态以及事务ID的概念。 1. 节点状态: -...
- Curator提供了一些预定义的模式,如Leader选举、分布式计数器、队列和锁等,简化了常见的分布式编程问题。 6. **Curator Recipes** - `Leader Election`:实现分布式环境中的领导者选举。 - `Locks`:提供...
FastPaxos通过引入Leader选举机制解决了这一问题,即集群中只允许一个Leader进行提议的提交。在实际应用中,Zookeeper并未直接使用Paxos算法,而是采用了ZAB(Zookeeper Atomic Broadcast)一致性协议来实现数据一致性...
ZooKeeper 是一个开源的分布式协调服务,提供了一致性服务,用于分布式应用程序实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。 ZooKeeper 的...
- **封装常用功能**:Curator提供了一系列的API,用于实现诸如Leader选举、分布式锁、分布式计数器等功能,这大大减轻了开发者的工作量。 - **会话管理**:Curator内部实现了会话的自动重连机制,当ZooKeeper会话...
1. **Master/Leader选举**: 应用可以通过创建临时有序节点,根据节点顺序选举master。 2. **分布式锁**:提供读锁和写锁,保证多线程并发访问时的数据一致性。 3. **分布式队列**: 实现多个节点间的同步操作,如发布...
Zookeeper 集群通过选举机制,当 Leader 节点宕机时,会自动选举新的 Leader,保证服务的连续性。 19. zookeeper 负载均衡和 nginx 负载均衡区别Zookeeper 提供的是分布式协调服务,可以用于实现服务发现、负载均衡...
它提供了一种基于发布/订阅模式的消息系统,可以实现命名服务、配置管理、集群管理、分布式锁等功能。在Java开发中,Zookeeper与Java API的结合使用,使得其在分布式应用中发挥着重要作用。 一、Zookeeper的基本...
- **Leader选举**:利用`LeaderSelector`实现服务实例间的选举。 6. **实际应用** - **Hadoop**:Zookeeper在Hadoop中用于协调NameNode和DataNode,确保集群的稳定性。 - **Kafka**:管理生产者和消费者的元数据...
在Zookeeper中,可以通过Leader选举算法来选择集群中的Leader节点,而FlashLeaderElection算法是其中的一种实现。 Zookeeper的Watch机制能够使客户端在指定的节点发生变化时得到通知。ACL控制则提供了一种安全机制...