`
农村外出务工男JAVA
  • 浏览: 105554 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

zookeeper事件监听实战

阅读更多

一:事件监听

    原生zookeeper的事件监听采用Watcher实现,不过Watcher监听是一次性的,如果需要继续监听该事件,必须重新注册。Curator中采用cache来封装了对事件的监听,包括监听节点,监听子节点等,下面分别做介绍
1.1 NodeCache
    NodeCache主要用来监听节点本身的变化,当节点的状态发生变更后,回调NodeCachaListener

  public interface NodeCacheListener{
    /**
     * Called when a change has occurred
     */
     public void     nodeChanged() throws Exception;
   }

    看一个例子:

 public class NodeCacheExample {

    private static final String PATH = "/nodeCache";

    private static CountDownLatch latch = new CountDownLatch(1);
   
    static NodeCache nodeCache;

    static CuratorFramework client;

    static {
       
        client = CuratorFrameworkFactory.newClient(
                "host:2181", 5000, 5000, new ExponentialBackoffRetry(
                        1000, 3));
        client.start();
    }

    public static void initCache() throws Exception {
        client.create().forPath(PATH);
        client.setData().forPath(PATH, "节点的初始值".getBytes());
        nodeCache = new NodeCache(client, PATH);
        EnsurePath ensurePath = client.newNamespaceAwareEnsurePath(PATH);
        ensurePath.ensure(client.getZookeeperClient());
        //设置成true,那么nodeCache在第一次启动的时候就会到zookeeper上去获取节点的数据内容,并保存在cache中
        nodeCache.start(true);
        startCache(nodeCache);
    }

    private static void startCache(final NodeCache cache) throws Exception {
       
        ChildData data = cache.getCurrentData();
       
        System.out.println("第一次启动获取到的内容:"  + new String(data.getData()));
       
        cache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("NodeCache changed,data is "
                        + new String(cache.getCurrentData().getData()));
                latch.countDown();
            }
        });
        Thread.sleep(2000);
        if(client.checkExists().forPath(PATH) != null){
            System.out.println("node is exist,准备给节点设置新的内容");
            client.setData().forPath(PATH, "节点新内容".getBytes());
        }
    }

    public static void main(String[] args) throws Exception {
        initCache();
        latch.await();
    }
} 

   运行结果如下:

 第一次启动获取到的内容:节点的初始值
 node is exist,准备给节点设置新的内容
 NodeCache changed,data is 节点新内容

  1.2 PathChildrenCache
    PathChildrenCache主要用来监听子节点,它有几个构造函数,参数最多的是下面这个

  public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)
    {
        this(client, path, cacheData, dataIsCompressed, new CloseableExecutorService(executorService));
    }

    参数解释:
    client:一个Curator客户端实例
    path:监听路径
    cacheData:boolean类型,如果是true,那么curator客户端在请求服务端的时候,会将监听节点的内容保存起来。而zookeeper原生的watcher监听是不会返回节点的内容的,只会返回节点状态,路径等
    dataIsCompressed:表示是否对数据进行压缩
    executorService:说明可以使用线程池来处理事件通知,当子节点数据发生变化时,会回调

  public interface PathChildrenCacheListener
{
    /**
     * Called when a change has occurred
     *
     * @param client the client
     * @param event describes the change
     * @throws Exception errors
     */
    public void     childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}

    并且不会对二级节点进行监听,来看一个例子

import java.util.concurrent.CountDownLatch;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 *
 * Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态,
 *  会包含最新的子节点, 子节点的数据和状态
 *
 * @author tanjie
 *
 */
public class PathCacheExample {

    private static final String PATH = "/cache";

    static PathChildrenCache cache;

    static CuratorFramework client;
   
    static CountDownLatch latch = new CountDownLatch(1);
   
    static CountDownLatch coutCountDownLatch = new CountDownLatch(5);

    static {
        client = CuratorFrameworkFactory.newClient(
                "host:2181", 5000, 5000, new ExponentialBackoffRetry(
                        1000, 3));
        client.start();
    }


    private static void startCache()
            throws Exception {
        cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        // 给当前节点创建子节点
        for (int i = 1; i <= 5; i++) {
            String newPath = PATH + "/child_" + i;
            String childNodeName = "child_" + i;
            if (client.checkExists().forPath(PATH) != null) {
                client.create().creatingParentsIfNeeded().forPath(newPath, childNodeName.getBytes());
            }
            coutCountDownLatch.countDown();
        }
        coutCountDownLatch.await();
        addlistener(cache);
        for (final ChildData childData : cache.getCurrentData()) {
            System.out.println("pathChildCache:" + "路径:" + childData.getPath()
                    + ",内容:" + new String(childData.getData()));
        }
        System.out.println("对父节点进行设置新值不会有通知事件发生........start");
        client.setData().forPath(PATH,"哈哈".getBytes());
        System.out.println("对父节点设置值 end..........");
        //改变子节点的数据
        System.out.println("准备删除子节点........start");
        client.delete().forPath( PATH + "/child_1");
        System.out.println("删除子节点........end,会有节点事件通知返回");
        Thread.sleep(2000);
        //改变二级节点的内容
        for(int j=1;j<=2;j++){
            String newPath = PATH + "/child_2/" + j;
            String childNodeName = "child_2" + j;
            if (client.checkExists().forPath(PATH) != null) {
                client.create().forPath(newPath, childNodeName.getBytes());
            }
        }
        addlistener(cache);
        System.out.println("准备删除二级节点......start");
        client.delete().forPath( PATH + "/child_2/" + 1);
        System.out.println("删除二级节点......end..不会有事件监听返回");
        latch.countDown();
       
    }

    private static void addlistener(final PathChildrenCache cache) {
        final PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorClient,
                    PathChildrenCacheEvent event) throws Exception {
                System.out.println("childNode path:"
                        + event.getData().getPath() + ",childNode data : "
                        + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(pathChildrenCacheListener);
    }

    public static void main(String[] args) throws Exception {
        startCache();
        latch.await();
    }
}

   运行结果如下:

pathChildCache:路径:/cache/child_1,内容:child_1
pathChildCache:路径:/cache/child_2,内容:child_2
pathChildCache:路径:/cache/child_3,内容:child_3
pathChildCache:路径:/cache/child_4,内容:child_4
pathChildCache:路径:/cache/child_5,内容:child_5
对父节点进行设置新值不会有通知事件发生........start
对父节点设置值 end..........
准备删除子节点........start
childNode path:/cache/child_1,childNode data : child_1
删除子节点........end,会有节点事件通知返回
准备删除二级节点......start
删除二级节点......end..不会有事件监听返回

1.3 TreeCache
    TreeCache即能监听节点也能监听节点的子节点变更

 public class TreeCacheExample {
   
    private static final String PATH = "/treeNodeCache";

    static TreeCache  treeCache;

    static CuratorFramework client;

    static {
        client = CuratorFrameworkFactory.newClient(
                "host:2181", 5000, 5000, new ExponentialBackoffRetry(
                        1000, 3));
        client.start();
    }
   
    public static void  initCache() throws Exception {
        treeCache = new TreeCache(client, PATH);
        client.create().forPath(PATH);
        treeCache.start();
        treeCache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event)
                    throws Exception {
                if(null != event.getType()){
                    System.out.println("节点状态发送了改变.内容为:" + new String(event.getData().getData()));
                }
            }
        });
       
        //添加子节点
        for(int i=1;i<=5;i++){
            String newPath = PATH + "/child_" + i;
            client.create().forPath(newPath,("child_"+i).getBytes());
        }
        Thread.sleep(3000);
        client.setData().forPath(PATH+ "/child_2", "我是给第二个子节点赋值".getBytes());
        if(null != client.checkExists().forPath(PATH)){
            client.setData().forPath(PATH,"我是来对父节点赋值的".getBytes());
        }
        Thread.sleep(1000);
        client.setData().forPath(PATH,"我是来对父节点第二次赋值的".getBytes());
    }
   
    public static void main(String[] args) throws Exception {
        initCache();
        Thread.sleep(Integer.MAX_VALUE);
    }

}

   运行结果如下:

节点状态发送了改变.内容为:child_1
节点状态发送了改变.内容为:child_2
节点状态发送了改变.内容为:child_3
节点状态发送了改变.内容为:child_4
节点状态发送了改变.内容为:child_5
节点状态发送了改变.内容为:我是给第二个子节点赋值
节点状态发送了改变.内容为:我是来对父节点赋值的
节点状态发送了改变.内容为:我是来对父节点第二次赋值的

    可以看到,只有监听的节点状态发送了变化,监听事件就会执行回调,并且也能监听器父节点且不用反复注册监听

 

1
1
分享到:
评论

相关推荐

    Zookeeper实战

    Watcher是事件监听器,可以注册在节点上,当节点发生变化时,Watcher将被触发。 在分布式环境中,Zookeeper被广泛用于实现诸如配置管理、命名服务、分布式锁、集群管理等功能。例如,它可以协调Hadoop、Kafka等...

    2. Zookeeper经典应用场景实战(一)

    ZooKeeper 的监听机制是基于事件驱动的,例如当节点发生变化时,Watcher 会触发相应的事件。 在实际开发中,需要根据实际情况选择合适的 Java 客户端 API,例如 ZooKeeper 官方的 Java 客户端 API 或者第三方的 ...

    4.zookeeper运维实战视频教程资料-详细课件笔记总结

    7.zookeeper的watch监听机制实战演示 8.zookeeper中三种角色、Zab协议(崩溃选举和数据同步) 9.zookeeper中锁的种类、读锁、写锁和羊群效应 10.CAP的三个概念和CAP理论 11.zookeeper集群节点为什么是奇数台 12....

    大数据之Zookeeper视频

    3. **Watcher**:Zookeeper提供了一种称为Watcher的机制,它是一种观察者模式的实现,用于监听Znode上的事件变化。当触发了Watcher后,Zookeeper会向客户端发送通知。 4. **Session**:客户端与Zookeeper服务器之间...

    zookeeper实战:ConfigServer代码样例

    《Zookeeper实战:ConfigServer代码样例解析》 在分布式系统中,Zookeeper作为一个高可用的分布式协调服务,被广泛应用于配置管理、命名服务、分布式锁等场景。本篇文章将聚焦于Zookeeper的一个典型应用——Config...

    深入探索Zookeeper:实战应用与高效策略

    **深入探索Zookeeper:实战应用与高效策略** Zookeeper是一个分布式的、开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的...

    ZooKeeper-分布式过程协同技术详解 和从Paxos到Zookeeper

    Watcher是一种事件监听机制,允许客户端注册对特定Znode的变更事件感兴趣,一旦发生改变,ZooKeeper会主动通知客户端。 在分布式一致性方面,ZooKeeper采用了类似Paxos的Zab协议(ZooKeeper Atomic Broadcast)。...

    zookeeper的极客学院的视频资料

    3. **API使用**:视频会演示如何使用Java API与Zookeeper进行交互,如创建、删除、读取ZNode,以及watcher事件监听等,这对于实际应用开发至关重要。 4. **Zookeeper的应用场景**:讲解Zookeeper在分布式环境中的...

    第6章 Zookeeper 2 6.1. Zookeeper入门 2 6.1.1. 概述 2 6.1.2. 特点 3 6.1.3

    6.3. Zookeeper实战(开发重点) 10 6.3.1. 分布式安装部署 10 6.3.2. 客户端命令行操作 11 6.3.3. API应用 18 6.3.4. 箭头服务器节点动态上下线案例(扩展) 25 6.4. Zookeeper内部原理 28 6.4.1. 节点类型 29 ...

    zookeeper实战:SingleWorker代码样例

    在这个"zookeeper实战:SingleWorker代码样例"中,我们将深入探讨如何使用ZooKeeper实现一个简单的SingleWorker模式,以及涉及的相关源码和工具。 首先,SingleWorker模式是ZooKeeper中常见的应用场景之一,常用于...

    zookeeper一站式学习资料

    Watcher是Zookeeper的事件监听机制,允许客户端注册对特定Znode的变更或事件监听,一旦发生变化,Zookeeper会立即通知客户端。 在实际应用中,Zookeeper广泛应用于Hadoop、HBase、Kafka等大数据处理框架,用于实现...

    03-05-05-分布式协调服务zookeeper应用实战1

    **Watcher**是Zookeeper提供的一种分布式事件通知机制,客户端可以注册Watcher监听特定事件,如节点创建、删除或数据变更。一旦事件触发,服务端会发送一次性通知,客户端需重新注册Watcher以持续监听。 对于Java...

    2021尚硅谷技术之Zookeeper笔记

    Watches是ZooKeeper的重要特性,允许客户端订阅ZNode的变更事件。当ZNode的状态改变时,所有注册的Watches都会被触发,发送一个单次的通知到客户端。 **5. 集群搭建** ZooKeeper集群通常由三个或五个节点组成,以...

    zookeeper分布式锁实例源码

    ZooKeeper 提供的分布式锁通常基于其原子的创建、删除节点和监视事件机制来实现。 1. **不可重入锁(Non-Recursive Lock)**: 不可重入锁不允许同一个线程再次获取已持有的锁。在ZooKeeper中,实现不可重入锁通常...

    zookeeper-3.4.5.tar.zip

    1. 会话与Watcher:客户端与Zookeeper建立会话,可以设置Watcher监听特定的数据节点,当节点发生变化时,Zookeeper会触发Watcher事件,通知客户端。 2. 数据模型:Zookeeper的数据模型是一个树形结构,每个节点称为...

    zookeeper客户端原理代码操作应用场景

    "05第五次课程"可能会讨论到Zookeeper在分布式环境中的应用案例,"06第六次课程"可能进一步深化了实际场景的应用,而"michael-vip"可能是一份关于Zookeeper进阶或实战的资料。 总的来说,Zookeeper客户端是实现...

    ZooKeeper分布式过程协同技术详解_new.pdf

    总的来说,《ZooKeeper分布式过程协同技术详解》是一本全面覆盖ZooKeeper理论和实践的著作,它不仅适合初学者了解ZooKeeper的基本原理,也对有一定经验的开发者提供了深入的技术细节和实战指导。通过阅读这本书,...

    Zookeeper.docx

    5. **clientPort**:客户端连接端口,Zookeeper监听客户端连接的端口,默认为2181。 【Zookeeper实战——分布式安装部署】 Zookeeper集群通常由一个leader和多个follower组成。Leader负责数据的更新和同步,...

    《Paxos到Zookeeper——分布式一致性原理与实践》高清完整版

    它的设计目标是简化分布式系统的开发,提供了一套简单易用的API,使得开发者可以轻松实现分布式锁、队列、事件监听等功能。 在《Paxos到Zookeeper》这本书中,作者会详细解析Paxos算法的原理,解释其背后的逻辑和...

    Zookeeper讲义.rar

    四、Zookeeper实战 在实际开发中,Zookeeper常用于Hadoop、Kafka、HBase等大数据组件的协调。例如,在Kafka中,Zookeeper负责管理主题分区分配、消费者组的协调以及Broker的选举。 五、总结 Zookeeper凭借其强大...

Global site tag (gtag.js) - Google Analytics