`
wbj0110
  • 浏览: 1602725 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Zookeeper开源客户端框架Curator简介与示例

阅读更多

简介

        Curator最初由Netflix的Jordan Zimmerman开发, Curator提供了一套Java类库, 可以更容易的使用ZooKeeper.

        所谓ZooKeeper技巧(ZooKeeper Recipes),也可以称之为解决方案, 或者叫实现方案, 是指ZooKeeper的使用方法, 比如分布式的配置管理, Leader选举等

        Curator作为Apache ZooKeeper天生配套的组件。ZooKeeper的Java开发者自然而然的会选择它在项目中使用。

        官网链接:http://curator.apache.org/

提供的功能组件

  1. Framework 提供了一套高级的API, 简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制

  2. Client是ZooKeeper客户端的一个替代品, 提供了一些底层处理和相关的工具方法

  3. Recipes实现了通用ZooKeeper的recipe, 该组件建立在Framework的基础之上

  4. Utilities各种工具类

  5. Errors异常处理, 连接, 恢复等.

  6. Extensionscurator-recipes包实现了通用的技巧,这些技巧在ZooKeeper文档中有介绍。为了避免是这个包(package)变得巨大, recipes/applications将会放入一个独立的extension包下。并使用命名规则curator-x-name.

        Curator 编译好的类库被发布到Maven Center中。Curator包含几个artifact. 你可以根据你的需要在你的项目中加入相应的依赖。对于大多数开发者来说,引入curator-recipes这一个就足够了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.6.0</version>
</dependency
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.6.0</version>
</dependency>

 

代码示例

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
 
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
 
import com.google.common.base.Charsets;
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
 
 
/**
 * DateTime: 2015年1月9日 上午9:14:08
 *
 */
public class CuratorTest {
    private static String zkAddress = "hadoop02:2181,hadoop03:2181,hadoop04:2181";
 
 
    public static void main(String[] args) throws Exception {
        CuratorUtil curator = new CuratorUtil(zkAddress);
        curator.createNode("/zkroot/test1""你好abc11");
        curator.createNode("/zkroot/test2""你好abc22");
        curator.updateNode("/zkroot/test2""你好abc22");
        List<String> list = curator.listChildren("/zkroot");
        Map<String, String> map = curator.listChildrenDetail("/zkroot");
        // curator.deleteNode("/zkroot");
        // curator.destory();
        System.out.println("=========================================");
        for (String str : list) {
            System.out.println(str);
        }
 
        System.out.println("=========================================");
        for (Map.Entry<String, String> entry : map.entrySet()) {
            System.out.println(entry.getKey() + "=>" + entry.getValue());
        }
 
        // 增加监听
        curator.addWatch("/zkroot"false);
 
        TimeUnit.SECONDS.sleep(600);
    }
 
}
 
 
class CuratorUtil {
    private CuratorFramework client;
 
 
    public CuratorUtil(String zkAddress) {
        client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(10003));
        client.getCuratorListenable().addListener(new NodeEventListener());
        client.start();
    }
 
 
    /**
     * 创建node
     
     * @param nodeName
     * @param value
     * @return
     */
    public boolean createNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat == null) {
                String opResult = null;
                if (Strings.isNullOrEmpty(value)) {
                    opResult = getClient().create().creatingParentsIfNeeded().forPath(nodeName);
                }
                else {
                    opResult =
                            getClient().create().creatingParentsIfNeeded()
                                .forPath(nodeName, value.getBytes(Charsets.UTF_8));
                }
                suc = Objects.equal(nodeName, opResult);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }
 
 
    /**
     * 更新节点
     
     * @param nodeName
     * @param value
     * @return
     */
    public boolean updateNode(String nodeName, String value) {
        boolean suc = false;
        try {
            Stat stat = getClient().checkExists().forPath(nodeName);
            if (stat != null) {
                Stat opResult = getClient().setData().forPath(nodeName, value.getBytes(Charsets.UTF_8));
                suc = opResult != null;
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return suc;
    }
 
 
    /**
     * 删除节点
     
     * @param nodeName
     */
    public void deleteNode(String nodeName) {
        try {
            getClient().delete().deletingChildrenIfNeeded().forPath(nodeName);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
 
 
    /**
     * 找到指定节点下所有子节点的名称与值
     
     * @param node
     * @return
     */
    public Map<String, String> listChildrenDetail(String node) {
        Map<String, String> map = Maps.newHashMap();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            List<String> children = childrenBuilder.forPath(node);
            GetDataBuilder dataBuilder = getClient().getData();
            if (children != null) {
                for (String child : children) {
                    String propPath = ZKPaths.makePath(node, child);
                    map.put(child, new String(dataBuilder.forPath(propPath), Charsets.UTF_8));
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return map;
    }
 
 
    /**
     * 列出子节点的名称
     
     * @param node
     * @return
     */
    public List<String> listChildren(String node) {
        List<String> children = Lists.newArrayList();
        try {
            GetChildrenBuilder childrenBuilder = getClient().getChildren();
            children = childrenBuilder.forPath(node);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        return children;
    }
 
 
    /**
     * 增加监听
     
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf) throws Exception {
        if (isSelf) {
            getClient().getData().watched().forPath(node);
        }
        else {
            getClient().getChildren().watched().forPath(node);
        }
    }
 
 
    /**
     * 增加监听
     
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, Watcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }
 
 
    /**
     * 增加监听
     
     * @param node
     * @param isSelf
     *            true 为node本身增加监听 false 为node的子节点增加监听
     * @param watcher
     * @throws Exception
     */
    public void addWatch(String node, boolean isSelf, CuratorWatcher watcher) throws Exception {
        if (isSelf) {
            getClient().getData().usingWatcher(watcher).forPath(node);
        }
        else {
            getClient().getChildren().usingWatcher(watcher).forPath(node);
        }
    }
 
 
    /**
     * 销毁资源
     */
    public void destory() {
        if (client != null) {
            client.close();
        }
    }
 
 
    /**
     * 获取client
     
     * @return
     */
    public CuratorFramework getClient() {
        return client;
    }
 
}
 
 
// 监听器
final class NodeEventListener implements CuratorListener {
    @Override
    public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
        System.out.println(event.toString() + ".......................");
        final WatchedEvent watchedEvent = event.getWatchedEvent();
        if (watchedEvent != null) {
            System.out.println(watchedEvent.getState() + "=======================" + watchedEvent.getType());
            if (watchedEvent.getState() == KeeperState.SyncConnected) {
                switch (watchedEvent.getType()) {
                case NodeChildrenChanged:
                    // TODO
                    break;
                case NodeDataChanged:
                    // TODO
                    break;
                default:
                    break;
                }
            }
        }
    }

http://my.oschina.net/cloudcoder/blog/365484

分享到:
评论

相关推荐

    zookeeper客户端curator操作示例

    而Apache Curator是Facebook开源的一个ZooKeeper客户端库,它提供了更高级别的API,简化了ZooKeeper的使用,并增加了诸如连接管理、重试策略、事务操作等功能。 在Java开发中,Curator是使用ZooKeeper最常用且推荐...

    zookeeper示例代码。

    6. **Curator应用**:Curator是Facebook开源的ZooKeeper客户端库,它封装了ZooKeeper的操作,提供了更高级的API,如连接管理、故障恢复、分布式锁、队列等。使用Curator可以使ZooKeeper的使用更加便捷和稳定。 7. *...

    VIP-02 Zookeeper客户端使用与集群特性(1)

    Curator是Netflix公司开源的一个针对ZooKeeper的高级Java客户端框架。它不仅简化了ZooKeeper的使用,还提供了丰富的抽象和工具类,使得开发者能够更加专注于业务逻辑而不用过多关注ZooKeeper的具体实现细节。Curator...

    4、zookeeper的java三种客户端介绍-Curator(crud、事务操作、监听、分布式计数器、分布式锁)

    这篇文章主要介绍了Zookeeper的三种Java客户端:原生Java API、ZkClient以及Apache Curator,并着重讨论了Curator的特性与优势。 一、Zookeeper原生Java API Zookeeper的原生Java API提供了基本的CRUD(创建、读取...

    springboot-zookeeper-curator.rar

    SpringBoot与Zookeeper Curator整合详解 在分布式系统中,协调服务是至关重要的,Apache ZooKeeper和Curator作为其中的佼佼者,被广泛应用于配置管理、服务发现、分布式锁等场景。本教程将深入讲解如何在SpringBoot...

    zookeeper实践操作下载指南 1

    Curator是Facebook开源的一个Zookeeper客户端库,它提供了更高级别的抽象和实用工具,简化了Zookeeper的使用。例如,Curator提供了分布式锁、领导选举、配置管理等功能,使得开发人员能够更方便地构建可靠的分布式...

    基于zookeeper的分布式锁实现demo

    在示例代码中,我们看到使用了Curator框架,它是一个开源的Zookeeper客户端库,封装了Zookeeper的复杂操作,简化了Zookeeper编程。Curator提供了现成的分布式锁实现,如`InterProcessMutex`和`...

    使用curator实现zookeeper锁服务的示例分享

    Curator是Netflix开源的一个用于简化Zookeeper使用的Java客户端库,它提供了很多高级功能,包括分布式锁、事件监听、配方等。在这个示例中,我们将使用Curator的`InterProcessMutex`类来实现分布式互斥锁(Mutex)。...

    springcloud集成zookeeper的方法示例

    Curator 是 Netflix 公司开源的一个 ZooKeeper 客户端,与 ZooKeeper 提供的原生客户端相比,Curator 的抽象层次更高,简化了 ZooKeeper 客户端编程。 最后,我们可以使用 CuratorFramework 来访问和操作 ZooKeeper...

    zookeeper ui界面源码(github)

    这些API可能会封装ZooKeeper的命令行工具,如`zkCli.sh`,或者直接使用Java客户端库(zkclient或curator)与ZooKeeper服务器交互。 3. **数据可视化**:UI界面需要展示ZooKeeper集群的状态,如节点结构、节点数据、...

    zk实现的分布式锁相关代码

    首先,我们通过CuratorFrameworkFactory创建了一个CuratorFramework实例,这是与ZooKeeper交互的主要对象。在创建CuratorFramework实例时,我们使用了RetryNTimes策略,这意味着在遇到连接问题时,客户端会尝试重新...

    workflow:基于ZooKeeper和Curator的分布式工作流管理库,可启用分布式任务工作流

    Curator是LinkedIn贡献的一个ZooKeeper客户端库,它为Java开发者提供了更高级别的抽象,简化了与ZooKeeper的交互。Curator包括一系列的工具、策略和拦截器,用于管理ZooKeeper的数据和会话,从而降低了开发复杂性的...

    注册管理(zk基本运用)

    6. **实战应用**:在实际开发中,我们可以通过Apache Curator、Java原生API或者其他语言的Zookeeper客户端库来操作Zookeeper。例如,使用Curator的`ServiceDiscovery`接口可以简化服务注册和发现的流程。 总结,...

    DubboxTest.zip

    5. `zkclient`或`curator`目录:Zookeeper客户端库,用于与Zookeeper交互。 通过这个项目,开发者可以学习如何配置SpringBoot应用,将Dubbo服务暴露出去,以及如何编写服务消费者来调用这些服务。同时,Zookeeper的...

    java技术面试------项目.md

    - **Dubbo**:阿里巴巴开源的一款高性能、轻量级的微服务框架。 - **Spring Cloud**:基于Spring Boot实现的一套微服务云应用及管理框架。 #### 二、Dubbo开发流程 Dubbo是一个分布式服务框架,它能够帮助开发者...

    SpringBoot(2.1.6) + dubbo(apache 2.7.5) + zookeeper 集成并测试

    在本示例中,我们将探讨如何将SpringBoot 2.1.6与Apache Dubbo 2.7.5以及Zookeeper集成,以便构建一个服务提供者(Provider)和服务消费者(Consumer)。首先,我们需要理解这三个核心组件的角色: 1. **SpringBoot...

    zk:redis分布式锁.zip

    2. **框架方式**:除了直接使用Zookeeper API,还可以借助于像Curator这样的框架,它提供了更高级别的抽象和便利的功能,如Lock接口,简化了分布式锁的实现,降低了开发难度。 接下来是Redis的分布式锁。Redis是一...

    1000道 互联网Java工程师面试题 485页_PDF密码解除.pdf

    - **会话:**客户端与ZooKeeper服务器之间的连接称为会话。 - **心跳检测:**用于检测会话的有效性。 **12. 服务器角色** - **Leader:**负责接收数据更新请求。 - **Follower:**复制Leader的数据。 - **Observer...

Global site tag (gtag.js) - Google Analytics