最近看到一个利用ZK的Watch机制实现Barrier的例子,因为Watch是一个很典型的类似观察者模式的机制,程序中很巧妙的使用一个Integer做为互斥量(mutex)。触发watch的process的时候,notifyAll。开始看的时候有一点晕,之后想了想恍然大悟,既然所有的类都继承自Object类,那么当然所有的类都会继承Object的wait,notify和notifyAll方法了。(基本类型如int,float等是不继承自Object的;但是数组是继承自Object的)。
实现Barrier的例子中用执行完enter或者leave之后,调用Integer型变量mutex的wait方法阻塞本线程,等待被唤醒。在watch的process中调用notifyAll,即一旦发布者有所变化会触发process方法。
先把程序贴出来:
public class Barrier implements Watcher { private static final String addr = "10.20.156.49:2181"; private ZooKeeper zk = null; private Integer mutex; private int size = 0; private String root; public Barrier(String root, int size){ this.root = root; this.size = size; try { zk = new ZooKeeper(addr, 10 * 1000, this); mutex = new Integer(-1); Stat s = zk.exists(root, false); if (s == null) { zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } public synchronized void process(WatchedEvent event) { synchronized (mutex) { mutex.notify(); } } public boolean enter(String name) throws Exception { zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() < size) { mutex.wait(); } else { return true; } } } } public boolean leave(String name) throws KeeperException, InterruptedException { zk.delete(root + "/" + name, 0); while (true) { synchronized (mutex) { List<String> list = zk.getChildren(root, true); if (list.size() > 0) { mutex.wait(); } else { return true; } } } } }
测试代码;
public class BarrierTest { public static void main(String args[]) throws Exception { for (int i = 0; i < 3; i++) { Process p = new Process("Thread-" + i, new Barrier("/test/barrier", 3)); p.start(); } } } class Process extends Thread { private String name; private Barrier barrier; public Process(String name, Barrier barrier){ this.name = name; this.barrier = barrier; } @Override public void run() { try { barrier.enter(name); System.out.println(name + " enter"); Thread.sleep(1000 + new Random().nextInt(2000)); barrier.leave(name); System.out.println(name + " leave"); } catch (Exception e) { e.printStackTrace(); } } }
*****************************以上是实例程序******************************
接下来,我自己试了一下Integer的wait方法,却发生了运行时错误:
我的程序是这样的:
public static void main(String args[]){ Integer mutex=new Integer(1); try { mutex.wait(5000); System.out.println("OK"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //运行结果: Exception in thread "main" java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method)
于是我查了一下原因;
违法的监控状态异常。当某个线程试图等待一个自己并不拥有的对象(O)的监控器或者通知其他线程等待该对象(O)的监控器时,抛出该异常。 原因是:在对某个对象上调用wait()方法进行线程等待(让其他竞争执行该代码的线程上锁)时,没有对该对象执行同步操作。
所以,给mutex.wait同步就行了:
public static void main(String args[]){ Integer mutex=new Integer(1); try { synchronized (mutex) { mutex.wait(5000); } System.out.println("OK"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } //运行结果(5秒钟之后输出); OK
相关推荐
ZooKeeper的命令行工具(系列之七)是与ZooKeeper交互的重要手段,允许用户查看、创建、更新和删除Znodes,以及进行其他管理操作。 在系列之八和九中,介绍了ZooKeeper的基本操作和API使用,开发者可以使用这些API...
#### 七、Zookeeper命令行工具示例 使用Zookeeper命令行工具可以方便地进行一些基本的操作,如查看状态、创建节点等。启动命令行工具的方法如下: ```sh bin/zkCli.sh -server 127.0.0.1:2181 ``` 以上就是关于...
七、ZooKeeper与其他分布式框架的集成 ZooKeeper常被用于Hadoop、HBase、Kafka等大数据处理框架中,作为它们的协调中心,确保服务的稳定性和数据一致性。 总结: ZooKeeper作为分布式系统的核心组件,其强大的协调...
在开始学习之前,读者需要具备良好的Java基础,因为Zookeeper服务器运行于Java虚拟机(JVM)之上,同时还需要对分布式处理和Linux环境有一定的了解。 #### 五、版权与免责声明 本教程版权归TutorialsPoint(I)Pvt.Ltd...
《深入剖析Zookeeper 3.4.8源码》 Zookeeper作为一个分布式协调服务,是Apache Hadoop项目的重要组成部分,广泛应用于...通过学习源码,开发者可以更好地利用Zookeeper解决实际问题,并为未来的系统设计提供灵感。
《Zookeeper入门到精通》教学视频及文档涵盖了分布式协调...通过学习《Zookeeper入门到精通》的教学视频和文档,你将全面掌握Zookeeper的基础知识,并能灵活运用到实际的分布式系统开发和运维中,提升你的IT技能水平。
《Hadoop ZooKeeper 3.4.10在Linux环境中的应用详解》 Apache ZooKeeper是Hadoop生态系统中一个至关重要的组件,它是一款...通过深入学习和实践,我们可以充分利用ZooKeeper的能力,提升分布式应用的稳定性和可靠性。
ZooKeeper 提供了 Java 与 C 两种语言的客户端,我们将学习 Java 客户端。要使用 ZooKeeper 客户端,需要引入最新的 Maven 依赖项: <groupId>org.apache.zookeeper <artifactId>zookeeper <version>3.5.5 二...
第二部分“ZooKeeper核心机制”是书中的核心,包括第三章至第七章。这一部分详细讲解了Zookeeper的数据模型(如ZNode、路径、数据版本等)、会话与 watches、原子操作、领导者选举以及数据同步等关键机制。读者将...
**七、ZooKeeper与其他分布式工具的配合** ZooKeeper常与其他分布式系统结合使用,如Hadoop、Kafka、HBase等,它们依赖ZooKeeper进行元数据管理、集群协调、故障恢复等任务。 **八、ZooKeeper的监控和调优** ...
### Zookeeper概述与核心知识点详解 #### 一、Zookeeper简介 **Zookeeper**是一款开源的分布式协调服务框架,主要用于解决分布式系统中的数据...通过对上述知识点的学习,相信你已经对Zookeeper有了更加深入的理解。
### ZooKeeper基础知识与核心概念 #### 一、ZooKeeper简介 ZooKeeper是一个高度可用且分布式的协调服务,主要用于解决分布式环境...希望读者能够通过学习掌握ZooKeeper的核心技术,并在未来的工作中发挥其巨大的价值。
接下来,你可以深入学习Zookeeper的API和命令,以及如何在分布式应用中有效地使用它。 在开发过程中,Zookeeper可以帮助解决很多分布式系统中的问题,如分布式锁、队列管理、状态同步等。理解Zookeeper的工作原理和...
《深入解析Zookeeper源码:Java视角下的分布式协调机制》 在分布式系统中,Zookeeper以其高可用性、强一致性...学习Zookeeper源码,不仅可以提升我们的技术深度,也有助于我们在实际开发中更好地应用和定制Zookeeper。
- **Zookeeper客户端工具**:学习如何使用Zookeeper自带的客户端工具进行集群管理。 - **Java API操作**:编写Java代码来创建、修改和监控Zookeeper中的节点。 - **分布式协作服务实践**:通过实际项目加深对...
### Apache Storm 学习记录与安装指南 #### 一、Apache Storm 概述 Apache Storm 是一款免费且开源的分布式实时计算系统。它能够保证每个消息都能够被处理,并且能够非常方便地做到水平扩展。Storm 的设计使得它...
#### 七、Kafka环境安装 Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。它最初由LinkedIn公司开发,后来成为Apache项目的一部分。 - **Kafka架构**:理解Kafka的基本架构,包括生产者、消费者、Broker等...
配置mysql、七牛云等信息。 打开zookeeper、kafka、elasticsearch、redis。 F:\JavaTools\redis-2.8.9>redis-server.exe redis.windows.conf F:\JavaTools\kafka_2.12-2.3.0>bin\windows\zookeeper-server-...