背景:
10月29号,系统切换的时候,发生了master信息的变更,有个监听master信息的系统未收到zookeeper的通知。
分析:
原代码如下所示, 监听系统中用以下代码来监听path中数据的变更。当数据发生变更的时候,会回调process方法,然后处理相应的业务。(我们使用的是curator的jar包)
client = CuratorFrameworkFactory.newClient("*:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); try { client.getData().usingWatcher(this).inBackground().forPath("/id/master"); } catch (Exception e) { throw new RuntimeException(e); }
问题点:
以上代码再第一次发生变更的时候,可以接收到zookeeper的变更通知,但是在zookeeper server会将watcher删除,在以后的变更中将不会继续通知。
源码分析:
当调用forPath函数时将会调用GetDataBuilderImpl的forPath方法,这个方法中会异步处理操作
@Override public byte[] forPath(String path) throws Exception { path = client.fixForNamespace(path); byte[] responseData = null; if ( backgrounding.inBackground() ) { client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); } else { responseData = pathInForeground(path); } return responseData; }
异步调用GetDataBuilderImpl的performBackgroundOperation方法。然后会调用appcahe原生zookeeper的jar包。
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { if ( watching.isWatched() ) { client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext()); } else { client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); } }
然后会调用zookeeper.java中的getData函数,发送请求到zookeeper server. setWatch为true
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
zookeeper server接收到客户端的getData请求时,会调用FinalRequestProcessor类中的以下代码处理
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; }
接下去会调用DataTree中的getData函数,这个函数中会将监听的节点加入到WatcherManager中,这样我们就添加了对某个路径的监听
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; } }
当另一个客户端调用setData方法时,Zookeeper客户端将会调用DataTree类中的setData方法。这个方法将会变更数据,然后调用triggerWatch方法,通知变更。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
然后调用WatcherManager中的triggerWatch方法,方法中将会删除监听。然后调用watcher的process方法
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
调用NIOServerCnxn类中的process方法,发送通知到客户端。
synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
客户端接收到notification时,会调用ClientCnxn类的下面代码处理。queueEvent方法中会调用watcher.materialize方法确定watcherEvent的处理类
if (replyHdr.getXid() == -1) { WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } public void queueEvent(WatchedEvent event) { // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
调用zookeeper的materialize方法获取watcher对象,并且重watcher列表中删除。
@Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { case None: result.add(defaultWatcher); boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { dataWatches.clear(); } } synchronized(existWatches) { for(Set<Watcher> ws: existWatches.values()) { result.addAll(ws); } if (clear) { existWatches.clear(); } } synchronized(childWatches) { for(Set<Watcher> ws: childWatches.values()) { result.addAll(ws); } if (clear) { childWatches.clear(); } } return result; }
总结:
从上面源码,我们可以看出,zookeeper的每次只能监听一次变更。
解决:
curator为我们封装了重复监听的类NodeCache.java. 它是怎么实现的呢?
NodeCache nc = new NodeCache(zkclient.getClient(), path); nc.start(true);
start方法会获取初始值,然后调用reset方法
public void start(boolean buildInitial) throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); ensurePath.ensure(client.getZookeeperClient()); client.getConnectionStateListenable().addListener(connectionStateListener); if ( buildInitial ) { internalRebuild(); } reset(); }
reset方法会调用zookeeper的forpath方法,并用包装的watcher类
private void reset() throws Exception { if ( (state.get() == State.STARTED) && isConnected.get() ) { client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } }
当配置发生变化时,watcher类中会重新调用reset方法,再次监听
private final CuratorWatcher watcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { reset(); } };
相关推荐
ZooKeeper通过 zab(ZooKeeper Atomic Broadcast)协议保证了强一致性,而Leader选举则是ZooKeeper集群中实现高可用性的关键过程,确保即使有服务器故障,服务仍能继续运行。 总的来说,ZooKeeper是一个强大的工具...
### Zookeeper 使用总结 #### ZOOKEEPER 概述 - **Zookeeper 介绍** Zookeeper 是一个分布式协调服务框架,旨在简化分布式应用程序的开发。它提供了一个高性能的协同工作系统,使得开发者能够专注于应用程序的...
- **集群管理**:Zookeeper 可以监控集群中节点的状态,当节点发生变化时,可以通过事件通知其他节点,实现集群的动态扩展和故障恢复。 ### Zookeeper 集群模式 在实际生产环境中,Zookeeper 通常运行在真正的集群...
总结,ZooKeeper是分布式系统中不可或缺的组件,它的稳定性和高性能使其在大数据、云计算等领域广泛应用。理解并熟练掌握ZooKeeper的原理和使用,对于Linux运维人员来说至关重要,能够帮助构建更加可靠的分布式服务...
总结来说,Zookeeper-3.4.8是一个强大的分布式协调服务,其安装包中的`zookeeper-3.4.8.jar`和`slf4j-api-1.6.1.jar`是运行和管理Zookeeper所必需的。通过理解和掌握Zookeeper的工作原理及其配置,开发者可以有效地...
在使用ZooKeeper时,需要注意一些最佳实践,例如,为了保证高可用性,通常会部署一个由多个节点组成的ZooKeeper集群,每个节点都应配置奇数个,以防止因节点故障导致半数以上节点不可用的情况。此外,合理规划ZNode...
总结,Zookeeper 3.4.6作为一款强大的分布式协调服务,对于搭建Dubbo这样的分布式应用至关重要。了解其安装、配置、集群搭建以及与Dubbo的集成方法,是成功构建和维护分布式系统的关键步骤。通过深入理解Zookeeper的...
总结来说,Zookeeper在Kafka中的作用不可忽视,它是Kafka实现高效、可靠、可扩展的关键因素。通过理解Zookeeper的工作原理,我们可以更好地优化Kafka集群的配置和管理,提高系统的稳定性和性能。在实际操作中,我们...
同时,Zookeeper与Dubbo的结合,使得服务治理变得更加简便,例如服务注册与发现、负载均衡、故障转移等功能,大大提升了分布式服务的可扩展性和可靠性。 在Zookeeper 3.4.6的压缩包`zookeeper-3.4.6.tar`中,包含了...
总结,Zookeeper 3.4.12 是一个强大的分布式协调框架,它为开发者提供了构建高可用分布式系统的基础。了解并熟练掌握其核心概念和用法,对于构建可扩展、容错的分布式应用至关重要。通过深入学习和实践,我们可以更...
例如,在Hadoop中,ZooKeeper用于管理NameNode的元数据,防止单点故障;在Spark中,它负责任务调度和集群管理;在HBase中,它帮助管理RegionServer的状态和分配。 总的来说,从Paxos到ZooKeeper的演进体现了分布式...
总结,Zookeeper客户端图形化界面为Zookeeper的管理和维护提供了便利,通过直观的图形展示和便捷的操作,使得Zookeeper的管理变得更加高效和人性化。无论是开发人员还是运维人员,都能从中受益,提升工作效率。在...
Dubbo的优秀服务治理功能和Zookeeper的协调能力共同确保了系统的高效运行和故障恢复。在使用过程中,根据项目需求和兼容性选择合适的组件版本是关键,同时保持对最新技术趋势的关注也是必要的,以便及时利用新功能和...
这种设计使得Zookeeper具有高可用性,即使部分服务器故障,系统仍能正常运行。 三、Zookeeper数据模型 Zookeeper的数据模型是一个层次化的命名空间,类似于文件系统的目录结构,称为ZNode。每个ZNode都可以存储数据...
总结,Zookeeper 3.4.13是分布式系统中不可或缺的工具,其强大的功能和稳定的性能使得它在云计算、大数据等领域广泛应用。理解和掌握Zookeeper的使用,对于构建和维护大规模分布式系统具有重要意义。
**Zookeeper中文开发指南** ...总结来说,“Zookeeper中文开发指南”是一本全面介绍Zookeeper的参考资料,它将帮助开发者深入理解Zookeeper的原理、使用方法和最佳实践,对于构建和管理分布式系统具有很高的参考价值。
总结,Zookeeper for Windows 3.4.8虽然提供了在Windows环境下的运行可能,但考虑到其性能和稳定性,推荐在生产环境中使用Linux版本。通过理解和掌握Zookeeper的基本概念、安装配置以及使用方法,开发者能够更好地在...