原生的zookeeper API在获得节点数据的时候,可以通过getData方法获取,并且可以传入一个watcher对象,表示对当前节点进行监听,当节点的数据发送了改变,客户端会收到server端的通知事件告知该节点状态发生了改变。下面我们根据getData()方法来看看zk是如何进行事件监听处理的。
1.1 Packet
1.2 SendThead
1.3 EventThread
public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException { ..................... // the watch contains the un-chroot path WatchRegistration wcb = null; if (watcher != null) { wcb = new DataWatchRegistration(watcher, clientPath); } final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); ReplyHeader r = cnxn.submitRequest(h, request, response, wcb); .................... return response.getData(); }
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) throws InterruptedException { ReplyHeader r = new ReplyHeader(); Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration); synchronized (packet) { while (!packet.finished) { packet.wait(); } } return r; }
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { .............. synchronized (outgoingQueue) { packet = new Packet(h, r, request, response, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } } sendThread.getClientCnxnSocket().wakeupCnxn(); return packet; }
(4) SendThread.readResponse()方法接收服务器端返回
class SendThread extends Thread { ............ void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); .............. try { if (packet.requestHeader.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid() + " with err " + + replyHdr.getErr() + " expected Xid " + packet.requestHeader.getXid() + " for a packet with details: " + packet ); } ................ } finally { finishPacket(packet); } }
其调用ClientCnxnSocket的sendPacket方法将封装好的packet对象传入, 源码如下:
@Override void sendPacket(Packet p) throws IOException { SocketChannel sock = (SocketChannel) sockKey.channel(); if (sock == null) { throw new IOException("Socket is null!"); } p.createBB(); ByteBuffer pbb = p.bb; sock.write(pbb); } public void createBB() { try { ............ if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } ........ } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } }
源码里的finishPacket()方法会将从刚刚我们封装的packet中取出来,通过调用 p.watchRegistration.register()会将我们暂时保存的Watcher对象
private void finishPacket(Packet p) { if (p.watchRegistration != null) { p.watchRegistration.register(p.replyHeader.getErr()); } ............ }
class DataWatchRegistration extends WatchRegistration { public DataWatchRegistration(Watcher watcher, String clientPath) { super(watcher, clientPath); } @Override protected Map<String, Set<Watcher>> getWatches(int rc) { return watchManager.dataWatches; } }
private static class ZKWatchManager implements ClientWatchManager { private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>(); ............. }
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); } return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
@Override synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
class SendThread extends Thread { private long lastPingSentNs; private final ClientCnxnSocket clientCnxnSocket; private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true; //用来接收服务端的事件 void readResponse(ByteBuffer incomingBuffer) throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; } if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } //反序列化 WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else if (serverPath.length() > chrootPath.length()) event.setPath(serverPath.substring(chrootPath.length())); else { LOG.warn("Got server path " + event.getPath() + " which is too short for chroot path " + chrootPath); } } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } eventThread.queueEvent( we ); return; } } }
public void queueEvent(WatchedEvent event) { if (event.getType() == EventType.None && sessionState == event.getState()) { return; } sessionState = event.getState(); // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); } }
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { case None: //...................... synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { dataWatches.clear(); } } return result; case NodeDataChanged: //.............. case NodeCreated: synchronized (dataWatches) { addTo(dataWatches.remove(clientPath), result); } synchronized (existWatches) { addTo(existWatches.remove(clientPath), result); } break; } return result; } }
@Override public void run() { try { isRunning = true; while (true) { Object event = waitingEvents.take(); if (event == eventOfDeath) { wasKilled = true; } else { processEvent(event); } if (wasKilled) synchronized (waitingEvents) { if (waitingEvents.isEmpty()) { isRunning = false; break; } } } } catch (InterruptedException e) { LOG.error("Event thread exiting due to interruption", e); } LOG.info("EventThread shut down"); }
《Zookeeper 3.4.5:原始解析与源码剖析》 Zookeeper,作为一款分布式协调服务,广泛应用于分布式系统中,为集群提供统一命名服务、配置管理、分布式锁和组服务等核心功能。本解析主要针对Zookeeper 3.4.5版本,...
五、ZooKeeper监控与故障恢复 ZooKeeper提供了监控接口,允许用户监控服务器状态、连接状态等。当节点出现故障时,可以通过监控信息快速定位问题并进行恢复。 六、ZooKeeper的应用场景 1. 分布式锁:利用临时节点...
### Dubbo源码解析2 #### 一、源码阅读路径 在开始深入解析Dubbo源码之前,首先需要明确的是,Dubbo虽然代码量不算庞大,但是它涉及的技术领域非常广泛,对于初学者来说,可能需要具备一定的前置知识才能更好地...
《Dubbo与Zookeeper深度解析:Java源码实践》 在现代分布式系统中,服务治理是不可或缺的一部分。Dubbo和Zookeeper作为两个重要的组件,分别承担了服务治理和注册中心的角色,它们共同构建了高效、稳定的微服务体系...
通过阅读源码,我们可以了解到如何与Zookeeper服务器通信,如何解析和展示数据,以及如何实现图形化操作的逻辑。这对于理解和开发类似的分布式协调服务管理工具具有很大的价值。 总结来说,Zookeeper管理工具是管理...
- **集群管理**:Zookeeper 可以监控集群中节点的状态,当节点发生变化时,可以通过事件通知其他节点,实现集群的动态扩展和故障恢复。 ### Zookeeper 集群模式 在实际生产环境中,Zookeeper 通常运行在真正的集群...
本篇文章将深入探讨如何在Zookeeper 3.4.14版本中添加IP黑白名单功能,并详细解析源码改造过程。 1. **IP限制需求分析** 在默认情况下,Zookeeper允许任何IP地址进行连接。为了提高系统的安全性,我们需要实现对...
《深入解析Zookeeper源码》 Zookeeper是一个分布式协调服务,广泛应用于分布式系统中,如Hadoop、HBase、Kafka等。它提供了一种可靠的方式来管理分布式环境中的配置信息、命名服务、集群状态同步以及分布式锁等功能...
《Zookeeper 3.4.6:分布式协调服务的核心解析》 Zookeeper,作为一个高度可靠的分布式协调服务,是Apache Hadoop项目的重要组成部分。在版本3.4.6中,它继续为分布式应用程序提供了稳定和高效的服务。这个版本的...
### HBase源码解析与开发实战 #### 一、HBase简介 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的 Google 论文 “Bigtable:一个结构化数据的分布式存储系统”。就像 Bigtable 利用了...
《Zookeeper 3.4.9:分布式协调服务的核心解析》 Zookeeper,这个在分布式系统领域中举足轻重的名字,是Apache软件基金会的一个关键项目,它为分布式应用程序提供了高效且可靠的协调服务。Zookeeper 3.4.9版本是其...
描述中提到,"快速查看zookeeper内容" 指的是prettyZoo的主要功能之一,即能迅速地浏览Zookeeper服务器上的节点信息,包括节点的数据、子节点等,这对于监控和调试Zookeeper服务非常有用。"git 地址 ...
《Zookeeper-3.4.7:分布式协调服务的核心解析》 Zookeeper是Apache软件基金会的一个开源项目,作为分布式协调服务,它在分布式系统中扮演着至关重要的角色。Zookeeper-3.4.7是该项目的一个稳定版本,包含了丰富的...
《Zookeeper 3.5.3-beta:分布式协调服务的核心解析》 Zookeeper,作为Apache基金会的一个顶级项目,是一款高效、可靠的分布式协调服务框架。3.5.3-beta版本是Zookeeper的重要迭代,旨在提供更稳定的性能和更多的新...
《Dubbo与Zookeeper在分布式环境中的应用解析》 Dubbo和Zookeeper是两个在分布式系统中广泛应用的技术,它们在构建高效、可扩展的服务架构中起着关键作用。本篇文章将深入探讨这两个技术以及如何结合使用,以实现一...
《Zookeeper 3.4.14:Dubbo 集成与配置解析》 Apache ZooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将...