`

zookeeper学习(七)

 
阅读更多

       最近看到一个利用ZK的Watch机制实现Barrier的例子,因为Watch是一个很典型的类似观察者模式的机制,程序中很巧妙的使用一个Integer做为互斥量(mutex)。触发watch的process的时候,notifyAll。开始看的时候有一点晕,之后想了想恍然大悟,既然所有的类都继承自Object类,那么当然所有的类都会继承Object的wait,notify和notifyAll方法了。(基本类型如int,float等是不继承自Object的;但是数组是继承自Object的)。       

       实现Barrier的例子中用执行完enter或者leave之后,调用Integer型变量mutex的wait方法阻塞本线程,等待被唤醒。在watch的process中调用notifyAll,即一旦发布者有所变化会触发process方法。

         先把程序贴出来:

    public class Barrier implements Watcher {  
      
        private static final String addr = "10.20.156.49:2181";  
        private ZooKeeper           zk   = null;  
        private Integer             mutex;  
        private int                 size = 0;  
        private String              root;  
      
        public Barrier(String root, int size){  
            this.root = root;  
            this.size = size;  
      
            try {  
                zk = new ZooKeeper(addr, 10 * 1000, this);  
                mutex = new Integer(-1);  
                Stat s = zk.exists(root, false);  
                if (s == null) {  
                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);  
                }  
      
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
      
        }  
      
        public synchronized void process(WatchedEvent event) {  
            synchronized (mutex) {  
                mutex.notify();  
            }  
        }  
      
        public boolean enter(String name) throws Exception {  
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);  
            while (true) {  
                synchronized (mutex) {  
                    List<String> list = zk.getChildren(root, true);  
                    if (list.size() < size) {  
                        mutex.wait();  
                    } else {  
                        return true;  
                    }  
                }  
            }  
        }  
      
        public boolean leave(String name) throws KeeperException, InterruptedException {  
            zk.delete(root + "/" + name, 0);  
            while (true) {  
                synchronized (mutex) {  
                    List<String> list = zk.getChildren(root, true);  
                    if (list.size() > 0) {  
                        mutex.wait();  
                    } else {  
                        return true;  
                    }  
                }  
            }  
        }  
      
    }   

测试代码;

    public class BarrierTest {  
      
        public static void main(String args[]) throws Exception {  
            for (int i = 0; i < 3; i++) {  
                Process p = new Process("Thread-" + i, new Barrier("/test/barrier", 3));  
                p.start();  
            }  
        }  
    }  
      
    class Process extends Thread {  
      
        private String  name;  
        private Barrier barrier;  
      
        public Process(String name, Barrier barrier){  
            this.name = name;  
            this.barrier = barrier;  
        }  
      
        @Override  
        public void run() {  
            try {  
                barrier.enter(name);  
                System.out.println(name + " enter");  
                Thread.sleep(1000 + new Random().nextInt(2000));  
                barrier.leave(name);  
                System.out.println(name + " leave");  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
    }  

 

*****************************以上是实例程序******************************

接下来,我自己试了一下Integer的wait方法,却发生了运行时错误:

我的程序是这样的:

       public static void main(String args[]){
		Integer mutex=new Integer(1);
		try {
				mutex.wait(5000);
			System.out.println("OK");
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
//运行结果:
Exception in thread "main" java.lang.IllegalMonitorStateException
	at java.lang.Object.wait(Native Method)

 于是我查了一下原因;

   违法的监控状态异常。当某个线程试图等待一个自己并不拥有的对象(O)的监控器或者通知其他线程等待该对象(O)的监控器时,抛出该异常。    原因是:在对某个对象上调用wait()方法进行线程等待(让其他竞争执行该代码的线程上锁)时,没有对该对象执行同步操作。

所以,给mutex.wait同步就行了:

public static void main(String args[]){
		Integer mutex=new Integer(1);
		try {
			synchronized (mutex) {
				mutex.wait(5000);
			}
			System.out.println("OK");
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
//运行结果(5秒钟之后输出);
OK

 

分享到:
评论

相关推荐

    zookeeper 系列整理总结

    ZooKeeper的命令行工具(系列之七)是与ZooKeeper交互的重要手段,允许用户查看、创建、更新和删除Znodes,以及进行其他管理操作。 在系列之八和九中,介绍了ZooKeeper的基本操作和API使用,开发者可以使用这些API...

    zookeeper 入门学习(PPT)

    #### 七、Zookeeper命令行工具示例 使用Zookeeper命令行工具可以方便地进行一些基本的操作,如查看状态、创建节点等。启动命令行工具的方法如下: ```sh bin/zkCli.sh -server 127.0.0.1:2181 ``` 以上就是关于...

    zookeeper, 分布式系统服务ZooKeeper的学习历程.zip

    七、ZooKeeper与其他分布式框架的集成 ZooKeeper常被用于Hadoop、HBase、Kafka等大数据处理框架中,作为它们的协调中心,确保服务的稳定性和数据一致性。 总结: ZooKeeper作为分布式系统的核心组件,其强大的协调...

    zookeeper指南

    在开始学习之前,读者需要具备良好的Java基础,因为Zookeeper服务器运行于Java虚拟机(JVM)之上,同时还需要对分布式处理和Linux环境有一定的了解。 #### 五、版权与免责声明 本教程版权归TutorialsPoint(I)Pvt.Ltd...

    zookeeper-3.4.8源码包

    《深入剖析Zookeeper 3.4.8源码》 Zookeeper作为一个分布式协调服务,是Apache Hadoop项目的重要组成部分,广泛应用于...通过学习源码,开发者可以更好地利用Zookeeper解决实际问题,并为未来的系统设计提供灵感。

    Zookeeper入门到精通 教学视频及文档

    《Zookeeper入门到精通》教学视频及文档涵盖了分布式协调...通过学习《Zookeeper入门到精通》的教学视频和文档,你将全面掌握Zookeeper的基础知识,并能灵活运用到实际的分布式系统开发和运维中,提升你的IT技能水平。

    hadoop_zookeeper-3.4.10.rar linux用

    《Hadoop ZooKeeper 3.4.10在Linux环境中的应用详解》 Apache ZooKeeper是Hadoop生态系统中一个至关重要的组件,它是一款...通过深入学习和实践,我们可以充分利用ZooKeeper的能力,提升分布式应用的稳定性和可靠性。

    第二课:zookeeper客户端使用与集群特性.docx

    ZooKeeper 提供了 Java 与 C 两种语言的客户端,我们将学习 Java 客户端。要使用 ZooKeeper 客户端,需要引入最新的 Maven 依赖项: &lt;groupId&gt;org.apache.zookeeper &lt;artifactId&gt;zookeeper &lt;version&gt;3.5.5 二...

    ZooKeeper-分布式过程协同技术详解 PDF 高清完整版

    第二部分“ZooKeeper核心机制”是书中的核心,包括第三章至第七章。这一部分详细讲解了Zookeeper的数据模型(如ZNode、路径、数据版本等)、会话与 watches、原子操作、领导者选举以及数据同步等关键机制。读者将...

    ZooKeeper:Zookeeper学习与总结

    **七、ZooKeeper与其他分布式工具的配合** ZooKeeper常与其他分布式系统结合使用,如Hadoop、Kafka、HBase等,它们依赖ZooKeeper进行元数据管理、集群协调、故障恢复等任务。 **八、ZooKeeper的监控和调优** ...

    zookeeper.docx教程

    ### Zookeeper概述与核心知识点详解 #### 一、Zookeeper简介 **Zookeeper**是一款开源的分布式协调服务框架,主要用于解决分布式系统中的数据...通过对上述知识点的学习,相信你已经对Zookeeper有了更加深入的理解。

    zookeeper入门到精通架构高级课程

    ### ZooKeeper基础知识与核心概念 #### 一、ZooKeeper简介 ZooKeeper是一个高度可用且分布式的协调服务,主要用于解决分布式环境...希望读者能够通过学习掌握ZooKeeper的核心技术,并在未来的工作中发挥其巨大的价值。

    Zookeeper安装手册

    接下来,你可以深入学习Zookeeper的API和命令,以及如何在分布式应用中有效地使用它。 在开发过程中,Zookeeper可以帮助解决很多分布式系统中的问题,如分布式锁、队列管理、状态同步等。理解Zookeeper的工作原理和...

    java源码剖析-Zookeeper-SourceAnalysis:Zookeeper源码剖析

    《深入解析Zookeeper源码:Java视角下的分布式协调机制》 在分布式系统中,Zookeeper以其高可用性、强一致性...学习Zookeeper源码,不仅可以提升我们的技术深度,也有助于我们在实际开发中更好地应用和定制Zookeeper。

    Hadoop学习必看路线图2.pdf

    - **Zookeeper客户端工具**:学习如何使用Zookeeper自带的客户端工具进行集群管理。 - **Java API操作**:编写Java代码来创建、修改和监控Zookeeper中的节点。 - **分布式协作服务实践**:通过实际项目加深对...

    strom的学习记录,包括安装过程、整体介绍、监控页面等

    ### Apache Storm 学习记录与安装指南 #### 一、Apache Storm 概述 Apache Storm 是一款免费且开源的分布式实时计算系统。它能够保证每个消息都能够被处理,并且能够非常方便地做到水平扩展。Storm 的设计使得它...

    八斗学习材料.docx,hadoop的 学习体系总结

    #### 七、Kafka环境安装 Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。它最初由LinkedIn公司开发,后来成为Apache项目的一部分。 - **Kafka架构**:理解Kafka的基本架构,包括生产者、消费者、Broker等...

    基于SpringBoot的学习社区,实现登录注册、发帖评论、点赞私聊、热帖排行等功能+源代码+文档说明

    配置mysql、七牛云等信息。 打开zookeeper、kafka、elasticsearch、redis。 F:\JavaTools\redis-2.8.9&gt;redis-server.exe redis.windows.conf F:\JavaTools\kafka_2.12-2.3.0&gt;bin\windows\zookeeper-server-...

Global site tag (gtag.js) - Google Analytics