Watcher机制:目的是为ZK客户端操作提供一种类似于异步获得数据的操作.
1)在创建Zookeeper实例时,允许接收一个watcher参数,此参数将会赋值给watchMnanger.defaultWatcher,成为当前客户端的默认Watcher.需要注意此watcher和其他watcher不同,此wather主要是响应"与链接状态转换"有关的事件(比如,"建立链接","链接关闭"等,参见KeeperState).此默认watcher有zk client本地持有且生命周期伴随整个zookeeper实例,而不是"一次触发即消亡",当Client收到EventType,NONE类型的消息时,则会触发这个"默认wather"被执行..(参见:消息类型)
2)ZKWatchManager是客户端watcher管理器,负责跟踪多种watcher,watcher被分为dataWatches,existWatches,childWatches.每种类型的watcher将会被存在各自的Map中(key为path,value为Set<Watcher>,由此可见,在一个path上一种类型操作重复注册同一个watcher对象,事实上只会生效一次,不同的watcher对象是可以的).记住:这些watcher只是一些存根,由ZKWatchManager负责管理,并不会随请求发送给server,而只会发给server此请求类型是否注册了watch(源码:request.setWatch(boolean))
3)对于setData,exist,getChildren操作,都可以接收boolean类型的watcher标识和Watcher对象,boolean类型告知请求使用defaultWatcher对象注册事件.
4)在ZKDatabase中,包括一个DataTree,此dataTree持有对nodes以及相关的watcher的数据.在server端,WatcherManager是管理client注册的watcher,它只管理dataWatches和childWatches,没有对exist类型的watch.其数据结构为HashSet<path,Set<Watcher>>,和ZKWatchManager一致.(对于exist类型的请求,sever端将其watch加入dataWatches中,这个很好理解)
5)请求到达server之后,在FinalRequestProcessor中,将会处理各种请求,如果检测到request.getWatch()为true,即请求要求注册watch,那么将会把ServerCnxn和path关联起来,加入到WatherManager相应的列表中.
6)客户端的请求响应之后,由SendThread.readResponse()处理响应,如果响应code为成功且此请求中注册了watch,那么将会把此wath添加到响应的watch列表中。
7)ServerCnxn(抽象类)实现了Watcher接口,每个client在server端都对应一个ServerCnxn,此类(子类)是client请求/响应的处理器,不过所有的请求最终还是由一个线程负责通信。在ServerCnxn处理请求时出现异常或者client关闭,将会导致ServerCnxn调用close()方法,此方法中有个分支操作就是从DataTree中的两种watches列表中删除其关联的watch。
8)WatcherManager是server端watch管理器,此类包含2个不同的数据结构用来存储watch以方便查询,其中一个是watch2path为HashMap<Watcher, HashSet<String>>;另一个是watchTable为HashMap<String, HashSet<Watcher>>。其实这2个map保存的数据一样,只是查询的场景不同;这2个map将会被同时操作。
9)DataTree持有2个WatchManager对象,分别为dataWatches用于管理注册data操作的watch,childWatches用于管理注册child操作的watch。
10)WatchManager中还有一个很重要的操作,trigerWatch(String path,EvenType type),当server接受到例如createNode/deleteNode/setData等操作时,将会操作ZKDatabase来操作DataTree中的数据,当然dataTree的数据改动,将会触发相应patch(节点)上的watch(有可能一个操作会导致多种watch被触发),trigerWatch就是在这些时机下被调用。此操作中就是从watchManager中将相应path下注册的watch移除,并依次调用watch.process()。此process()做了一件事情,就是向client发送一个nofication消息,此消息中包含一个WatchEvent对象,此对象封装了事件的类型/path等。
11)客户端接受到nofication,并反序获取WatchEvent,然后和server端的watcherManager一样,ZKWatcherManager根据event类型,从相应的一个或多个watches列表中分别移除相应path的watch,并将这些“移除”的watches再次封装成一个WatcherSetEventPair,此对象持有event和watches集合。最后将此pair加入event队列。
12)client的EventThread将会不断轮询,从event队列中获取pair,并遍历pair中关联的watcher,依次调用watcher的process()方法。。当然此watcher的process方法是client用户自己实现的,因为watcher对象是client用户在实例化zookeeper时包括各种操作时交付给zookeeper的。所以用户应该根据自己的需要,在client受到event时做自己的处理。
F1.Watch生命周期
- Zookeeper提供了如下几种可以"注册watch"的操作:exist,getChildren,getData;而对于create,setData,delete是有可能触发"watcher"的操作.
- 客户端并不会把用户创建的watcher对象传递给Server,而是传递给server一个标记(boolean值)告知server此请求所涉及到的patch上是否有watcher..
- 对于client端请求是队列化的,即一个操作阻塞直到server端响应.(异步操作稍后介绍,它不阻塞)
- Server对Client的每个请求的响应体中,都会明确告知此次响应的类型(是正常操作响应还是"事件",操作对应的xid,结果类型,错误信息等等);如果响应体中没有错误信息且其他校验正常的话,我们认为此次请求被正确的执行了.
- 可能考虑到在Client与Server端传递wath对象所带来的程序复杂度,ZK采取了"分制"的方式,在Client端和Server端分别采取了不同的技巧来保存Watch列表;(参见上述)
- Server在接收Client请求时,会检测此次request体中是否持有watcher信息,如果有,则会导致Server端的watcher列表中新增一个此path关联的watch,只有exist/getChildren/getData会导致此操作.记住watcher信息将会被保存在ZKDatabase中(内存中,而非持久,ZKDatabase会持久Session/ACL/Data).
- 那么对于create/setData/delete请求,将会触发watcher列表的检测,比如create操作,创建一个path,在实际的数据存储结束后,将会在watch列表中遍历是否有此path所关联的watches,如果有,则依次触发.
- 触发watch其实很简单,对于server端而言,它持有了每个path所关联的watch列表,而且每个watch实例正是一个ServerCnxn对象(每个Client与Server的连接处理器,就是一个ServerCnxn对象),因为触发一个watcher将是便捷性将是显而易见的,直接将此watcher事件所对应的path/类型直接通过IO的方式发送出去;因此哪个Client注册了事件,将会被响应的ServerCnxn处理;集群中每个Server几乎会在同一时间向Client交付事件消息.可能因为网络的问题,不可确保他们能够在极短的时间差内都获得事件.
- "插队",是因为对于watcher事件,将不再和其他Client操作放在同一队列中,而是直接通过IO发送,因为ServerCnxn处理client响应是同步的(方法是同步方法),即事件信息将会在当前packet发送之后被立即发送.
- 事件一旦被server触发,将会在watcher列表中删除,因此watcher是一次性的(同一个path下的同一类型watcher).我们不能依赖wathcer来全权检测数据的变更,因为网络断开可能会导致事件通知的丢失;当事件被触发之后,server端将删除事件,即使client端再次注册watcher,那么"上一次事件"和"重新注册事件"这段事件内,仍然有可能数据已经变更.(备注:Watcher watch = watchTable.remove(path);watch.process();首先从watchtable中移除watch,然后再将watch信息发送给client端,即使在发送时网络异常,watch也不会再次put到watchTable中,事实上此时watch已经被消费.)
- Client接收到Event响应结果之后,将会把此消息体放在eventQueue中,等待EventThread去remove并触发.
- EventThread将event队列中的事件,逐个移除并处理,每移除一个event,都会导致Client本地维护的watcher列表删除相应的watcher(根据path和event类型决定),移除之后并获取到Client维护的watcher对象(此对象就是先前的操作中注册的watcher),watcher对象明确了回调方法,此时将会执行watcher.process(),那么调用者的业务方法将会在此刻被执行.[对于业务方法被执行,从整个周期中,我们可以认为是异步的].
- 对于节点的create操作,将会触发先前注册的"exist""getChildren"事件被触发;对于节点的delete操作,将会触发先前注册的"exsit""getChildren"事件被触发;对于节点的setData操作,将会触发先前注册的"getData"事件被触发......每个触发的事件都会包含事件的类型(比如:nodeCreate,nodeDelete等),对于用户自定义的watch.process()方法中可以根据事件类型做特定的处理.
- 对于Server端遇到session关闭,连接关闭等异常时,都会触发和此连接(ServerCnxn)关联的watch列表.
- 不过对于Client端却做了"弥补";"zookeeper.disableAutoWatchReset"这个系统参数的意义就是"是否关闭watch自动重置";如果此参数为false(即为开启"自动重置"),那么在Client端遇到连接异常(比如重连操作)时,都会将本地已有的watcher列表全部发送给Server(此操作称为"setWatches"),如果连接成功,那么新的server仍然会持有watcher列表,接下来事件将会被如期触发,就像网络异常根本就没发生一样..那么为什么ZK没有默认开启此参数呢?可能考虑到这是个双刃剑,Client有可能在网络异常时会做其他的操作(因为网络异常,最终也会触发一个本地的Event,Client可以在此Event中做自定义操作);也有可能在网络异常期间,Cluster中的数据已经被改变,极有可能这些事件中的部分事件已经被错过,即使接下来被触发,也将不能正确的反应目前的现状.如果你期望获得正确的结果,要么重新注册watcher,要么检测现有的数据是否已经改变.
Zookeeper客户端不仅提供了同步操作,还有异步操作,对于create/delete/exist/setData等,ZK分别提供了同步和异步方法,我们上述了解到的,都是同步操作,简单做如下列举:
public Stat exists(String path,Watcher watcher):同步方法,检测path是否存在,如果存在则返回节点的全信息,否则返回null.如果此后此path被创建或者删除,则触发watcher.
public void exist(String path,Watcher watcher,StatCallback cb,Object ctx):这个方法就是异步的,它需要指定一个StatCallback实例,以便在请求被处理之后,异步的执行callback操作.
我相信你一定知道如何将调用过程设计为"异步"[提示:异步即为操作队列话 + callback调用].
在Zookeeper中,同步方法样例:
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); //将请求加入队列,此队列将会被SendThread操作,并依此发送请求. Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); //直接阻塞当前请求 synchronized (packet) { while (!packet.finished) { packet.wait();//此处阻塞,直到响应,响应被接受后,会对此packet.notify()调用. } } return r;//返回处理的结果 }
那么对于异步操作,只调用queuePacket(....)将请求添加到队列,然后exist方法就直接返回了.不过在响应被成功接收后,会额外的检测此packet是否有callback,如果有,就立即执行:
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } //此处就是检测callback if (p.cb == null) { synchronized (p) { p.finished = true; p.notifyAll(); } } else { p.finished = true; eventThread.queuePacket(p);//将异步调用packet添加到事件队列,依此被处理. } }
到目前为止,watcher机制我们已经走到"头"了...
相关推荐
3. **Watcher机制**:Watcher是Zookeeper的一种事件监听机制,当Zookeeper中的数据发生变化时,注册了对应事件的Watcher会接收到通知,实现事件的异步回调。 二、Zookeeper工作原理 1. **主从复制模式**:...
5. 事件通知:Zookeeper提供了watcher机制,可以实时监听数据变化,并触发回调,实现节点间的异步通信。 三、Zookeeper的工作原理 Zookeeper采用Paxos算法的简化版本ZAB(Zookeeper Atomic Broadcast)协议来保证...
Zookeeper中的`Watcher`是异步事件处理机制,可以监听节点的创建、删除、数据变更等事件。当事件触发时,会调用预先注册的回调方法。注意,Watchers是一次性的,节点状态改变后需重新注册。 4. **会话与会话事件**...
7.3.2 Watcher异步调用原理 Zookeeper的Watcher是异步执行的,这样可以减少客户端的等待时间。 7.3.3 Watcher异常处理 在Watcher机制中,如果出现异常,Zookeeper会提供相应的处理策略。 7.4 ACL机制 ACL机制...
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的...在实际项目中,还需要结合具体的业务需求,理解和掌握ZooKeeper的选举算法、会话机制、Watcher机制等核心概念,以充分利用ZooKeeper的能力。
ZooKeeper中的Watcher机制是一种数据变更通知机制,它允许客户端注册数据变更的监听器,当数据发生变更时,客户端会收到异步的通知。Watcher的实现分为客户端注册Watcher和服务端处理Watcher两个部分。服务端接收、...
此外,Zookeeper的Java API还提供了异步操作接口,如`AsyncCreate`、`AsyncDelete`等,允许开发者在不阻塞主线程的情况下处理Zookeeper操作。 在实际应用中,Zookeeper常被用于实现分布式锁、分布式队列、分布式...
本文将深入探讨ZooKeeper的.NET客户端源码实现,以`ClientTests`类为例,解析其工作原理和关键功能。 1. **连接管理**: ZooKeeper客户端首先需要与服务器建立连接。在.NET客户端中,这通常通过创建`ZooKeeper`...
1. **Zookeeper连接与会话**:在使用Java API连接Zookeeper时,需要创建`ZooKeeper`对象,通常提供服务器地址列表、会话超时时间和Watcher对象。会话是Zookeeper中客户端和服务端之间的一个状态保持连接,会话期间...
Zookeeper的Java原生API是开发人员在Java环境中与Zookeeper进行交互的主要工具。Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行...
本文将深入探讨如何使用Java客户端API来对Zookeeper的Znode进行增删改查操作,并讨论同步与异步两种方式的使用。 首先,要使用Java客户端API访问Zookeeper,你需要引入相关的依赖。在Maven项目中,可以在pom.xml...
### Zookeeper概述与核心知识点详解 #### 一、Zookeeper简介 **Zookeeper**是一款开源的分布式协调服务框架,主要用于解决分布式系统中的数据一致性问题。它通过提供一系列的基础功能(如配置维护、命名服务、集群...
- 服务调用是否阻塞:默认同步阻塞调用,也支持异步调用。 - 注册中心的选择:推荐使用ZooKeeper。 - 序列化框架:默认使用Hessian,也支持其他如JSON、Java自带序列化等。 - 服务失效踢出原理:通过心跳检测和...
- **Watcher**:Watcher是Zookeeper的一个重要特性,它是一种异步的通知机制,允许客户端对特定节点进行监视,当节点状态发生变化时,Zookeeper会通知已注册的Watcher。 2. **Zookeeper的主要操作** - **读操作**...
什么是ZooKeeper ? ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置...
在Java客户端实现中,通常开发者会通过实现特定的监听器接口(如Watcher接口),并将其实例注册到Zookeeper客户端来响应特定的事件。 目前,常见的监听事件包括: 1. 监听节点数据的变化,使用getpath[watch]方法来...
ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode...
- 服务调用可能是阻塞的,但可以通过异步调用优化。 - ZooKeeper常作为Dubbo的服务注册中心,也可以选择Eureka等其他选项。 - 服务失效踢出基于心跳检测和注册中心监控。 5. **Elasticsearch面试题** - Elastic...
- **触发时机**:当数据发生变化时,ZooKeeper会异步发送Watcher事件到客户端。 #### 7、服务端处理Watcher实现 - **处理流程**:服务端接收到Watcher事件后,将其放入队列中,然后异步处理这些事件。 - **异步...
ZookeeperWatcher机制--数据变更通知** - **Watcher机制:**用于监控节点的变化。 - **事件触发:**当被监视的节点发生变化时,触发Watcher事件。 **6. 客户端注册Watcher实现** - **注册Watcher:**客户端可以...